흐르는 시간의 블로그...

7가지 동시성 모델

2016. 9. 22. 10:42


읽은 책에 대해 다 정리하고 있지는 못하지만...

우쨋건 지난주까지 우격다짐으로 읽어보긴 한 책이다.


출판사링크: http://www.hanbit.co.kr/store/books/look.php?p_code=B3745244799


책에 대해서는 페북의 광고와 팟캐스트 나프다에서 접했다.

오래전부터 동시성 코딩에 대해 관심이 많기는 했다.


이미 15년 전쯤? 구매해서 가장 많이 활용했던 책도 Thread 관련 책이었다.


Multithreading Applications in Win32

Multithreading Applications in Win32

 

저 책의 저자 서문즈음에 나온다. 원문은 기억나지 않지만 대략 이런 내용이다.

"쓰레드는 반드시 네 손 위에서 쓰도록 하라"

모르거나 능력밖의 범주 혹은 통제범위 밖에서는 절대 사용하지 말라는 경고였다.

개인적인 학습의 목적이라면 저런 경고가 괜한 걱정일지 모른다.

그러나 상용 프로그래밍 개발에서는 진리와 같다.


쓸 얘기도 별로 없지만 다시 "7가지 동시성 모델"에 대해 얘기해보자.

내가 걸어온 개발자의 길에서는 상당히 생소한 책이었다.

내용은 충분히 가치가 있었다.

그러나 해당 언어들에 대해 생소하고 예제를 직접 다 실행하고 분석해보지 않은다면 와 닿지 않을 수 있다.


책에서 설명하는 언어에 대해 간단히 적어보자.

  • 클로저 - Java VM 기반의 함수형 언어
  • Elixir - Erlang 기반의 함수형 언어 

클로저 예제를 돌려 보기 위해 이것저것 뒤져보았다.


Elixir도 편하게 예제를 실행해 볼 수 없을까 찾아봤다.

  • http://elixir-lang.org/install.html - Elixir 설치
  • http://elixir-lang.org/getting-started/introduction.html#installation - 시작 부분으로 윈도우에서는 설치 후 iex.bat --werl 명령을 통해 얼랭의 콘솔을 이용하여 더 편리하게 작업 가능하다

참고로 Elixir의 몇몇 예제들은 버전문제인지 모르겠으나 책의 내용처럼 실행이 되진 않았다.
Clojure의 경우 자바의 jar 파일을 요구하는? 경우 환경 설정에 오래걸릴듯 하여 지나쳤다.


  "7가지 동시성 모델"을 한번에 이해하지 못하더라도 분명 읽어볼만 하다. 나처럼 평생 C와 C++로 함수형 언어를 겪어보지 못한 경우라면 말이다. 최근 Modern C++ 책을 보면서 람다와 같은 경우를 접하긴 했으나 저 책은 개념적으로 이해하기에 충분한 도움을 준다.

새롭게 "GO 언어"를 접해보고 있는 "7가지 동시성 모델"책에서 다루는 통신 순차 프로세스( CSP, Communicating Sequential Processes)에 대한 설명이 조금 나온다. 동시성에 대한 부분은 빅 데이터와 그 분석에 기인한바가 크다고 생각한다. map-reduce... 

책에서 얻은 아이디어 중 하나는 GPGPU에 대한 부분이다.
GIS 영역의 일과 GPGPU에서 처리하고자 했던 이미지 프로세싱은 같은 영역의 일이다.
지도와 이미지를 표시한다고 해서 같다고 생각하는 것이 아니라
이미지 프로세싱 자체가 위치와 경계, 계산의 영역이고 GIS 영역 또한 동일하다.

http://philliplemons.com/2014/09/14/ray-casting-algorithm/ - Ray Casting 알고리즘에 대한 부분이다. 실제로 이미지 프로세싱에서도 쓰이지만 GIS에서 2D Polygon 내의 점의 유무를 확인하는데 쓰인다.

이런 부분을 GPU를 이용할 경우 좋은 성능을 낼것이다. 물론 GPU를 활용할 수 있을때에 해당하겠지만...

HW의 변화에 따라 앞으로 동시성 프로그래밍에 대한 요구는 거세게 불어닥칠 것이고 5년 내에 일반화 될 것이다.
개발자로 그에 대한 대비는 분명히 해야할 것으로 보인다.
...




일전에 pcqueue를 간단히 만들었었다.
창피해서 코드를 올리지 말까 하다가 올려본다.
실제 사용중에 있다.
허접한 코드를 개선해서 사용해도 좋다.
요기 출처를 달아주고 개선된 부분이 있으면 공유해 주면 더 바랄게 없다.
허접하다고 너무 심하게 구박하지 말아줬으면 좋겠다.

아래는 해당 pcqueue의 템플릿 코드이다.
실제 호출은 템픗릿 소스의 아래에 올려 놓겠다.

//---------------------------------------------------------------------------------------------------------------------
// pcqueue.hpp
//
// Asker's product consumer queue
//---------------------------------------------------------------------------------------------------------------------
#pragma once

#include <iostream>
#include <vector>
#include <pthread.h>

#include "errno.h"

#include "common.hpp"

using namespace std;
//---------------------------------------------------------------------------------------------------------------------
//---------------------------------------------------------------------------------------------------------------------
//---------------------------------------------------------------------------------------------------------------------
template <class T>
class CPCQueue
{
private:
	pthread_mutex_t ProducerLock;
	pthread_mutex_t ConsumerLock;

	vector<T*> ProducerQueue;
	vector<T*> ConsumerQueue;

private:
	bool LockProducer()
	{
		int _nError = pthread_mutex_lock( &ProducerLock );
		if ( _nError )
		{
			cout << "error in lock producer. return : " << _nError << ", err :" << strerror(errno) << endl;
			formatlog( LOG_CRIT, "error in lock producer. return(%d) ErrNo(%d) ErrMsg(%s)", _nError, errno, strerror(errno));
			return false;
		}
	
		return true;
	}

	bool UnlockProducer()
	{
		int _nError = pthread_mutex_unlock( &ProducerLock );
		if ( _nError )
		{
			cout << "error in unlock producer. return : " << _nError << ", err :" << strerror(errno) << endl;
			formatlog( LOG_CRIT, "error in unlock producer. return(%d) ErrNo(%d) ErrMsg(%s)", _nError, errno, strerror(errno));
			return false;
		}
	
		return true;
	}

	bool LockConsumer()
	{
		int _nError = pthread_mutex_lock( &ConsumerLock );
		if ( _nError )
		{
			cout << "error in lock consumer. return : " << _nError << ", err :" << strerror(errno) << endl;
			formatlog( LOG_CRIT, "error in lock consumer. return(%d) ErrNo(%d) ErrMsg(%s)", _nError, errno, strerror(errno));
			return false;
		}

		return true;
	}

	bool UnlockConsumer()
	{
		int _nError = pthread_mutex_unlock( &ConsumerLock );
		if ( _nError )
		{
			cout << "error in unlock consumer. return : " << _nError << ", err :" << strerror(errno) << endl;
			formatlog( LOG_CRIT, "error in unlock consumer. return(%d) ErrNo(%d) ErrMsg(%s)", _nError, errno, strerror(errno));
			return false;
		}

		return true;
	}


public:
	CPCQueue(){}
	virtual ~CPCQueue()
	{
		for ( class vector<T*>::iterator _iter = ProducerQueue.begin(); _iter != ProducerQueue.end(); _iter++)
			delete *_iter;

		for ( class vector<T*>::iterator _iter = ConsumerQueue.begin(); _iter != ConsumerQueue.end(); _iter++)
			delete *_iter;

		ProducerQueue.clear();
		ConsumerQueue.clear();
	}

	bool Init()
	{
		int _ErrorNo = pthread_mutex_init( &ProducerLock, NULL);
		if (_ErrorNo)
			return false;
	
		_ErrorNo = pthread_mutex_init( &ConsumerLock, NULL);
		if (_ErrorNo)
			return false;
	
		return true;
	}

	// Product에 하나를 추가한다
	bool AddProduct( T* _Job )
	{
		if ( false == LockProducer())
		{
			// lock 실패
			formatlog( LOG_CRIT, "CPCQueue::%s(%d) Lock Fail. fatal issue.", __func__, __LINE__);
			return false;
		}

		ProducerQueue.push_back(_Job);
	 
		if ( false == UnlockProducer())
		{
			// unlock 실패
			formatlog( LOG_CRIT, "CPCQueue::%s(%d) Unlock Fail. fatal issue.", __func__, __LINE__);
			return false;
		}

		return true;
	}

	// Consumer 쪽에서 하나씩 빼낸다.
	bool PopFromConsumer(T** _Value)
	{
		*_Value = NULL;

		if ( false == LockConsumer())
		{
			// lock 실패
			formatlog( LOG_CRIT, "CPCQueue::%s(%d) Lock Fail. fatal issue.", __func__, __LINE__);
			return false;
		}

		if (!ConsumerQueue.empty())
		{
			*_Value = ConsumerQueue.front();
			ConsumerQueue.erase(ConsumerQueue.begin());
		}

		if ( false == UnlockConsumer())
		{
			// unlock 실패
			formatlog( LOG_CRIT, "CPCQueue::%s(%d) Unlock Fail. fatal issue.", __func__, __LINE__);
			return false;
		}

		return true;
	}

	// 합할 것이 하나도 없는 경우 false를 반환한다
	bool MoveProducerToConsumer(int& _MoveCount)
	{
		if ( false == LockConsumer())
		{
			// lock 실패
			formatlog( LOG_CRIT, "CPCQueue::%s(%d) Lock Fail. fatal issue.", __func__, __LINE__);
			return false;
		}

		if ( false == LockProducer())
		{
			// lock 실패
			formatlog( LOG_CRIT, "CPCQueue::%s(%d) Lock Fail. fatal issue.", __func__, __LINE__);
			return false;
		}

		_MoveCount = ProducerQueue.size();
		if (!ProducerQueue.empty())
		{
			for ( class vector<T*>::const_iterator _iter = ProducerQueue.begin(); _iter != ProducerQueue.end(); _iter++)
				ConsumerQueue.push_back(*_iter);
	
			ProducerQueue.clear();
		}

		if ( false == UnlockProducer())
		{
			// unlock 실패
			formatlog( LOG_CRIT, "CPCQueue::%s(%d) Unlock Fail. fatal issue.", __func__, __LINE__);
			return false;
		}

		if ( false == UnlockConsumer())
		{
			// unlock 실패
			formatlog( LOG_CRIT, "CPCQueue::%s(%d) Unlock Fail. fatal issue.", __func__, __LINE__);
			return false;
		}

		return true;
	}

	int GetConsumerSize() const
	{
		return ConsumerQueue.size();
	}

};
//---------------------------------------------------------------------------------------------------------------------



class CDataThread: public CUbiThread
{
private:
	CPCQueue<COnePacket> PCQueue;
	int ThreadIndex;

	CURL* ctx;

public:
	bool Terminated;

public:
	CDataThread();
	virtual ~CDataThread();

	bool init();
	void SetIndex(int _Index);
	bool AddProduct( COnePacket* _OnePacket);

	virtual void Run();

private:
	bool DoPacketProcess(COnePacket* _OnePacket);
	CUbiParser* MakeParser(COnePacket* _OnePacket);
}

//---------------------------------------------------------------------------------------------------------------------
CDataThread::CDataThread()
{
}
//---------------------------------------------------------------------------------------------------------------------
CDataThread::~CDataThread()
{
}
//---------------------------------------------------------------------------------------------------------------------
bool CDataThread::init()
{
	return PCQueue.Init();
}
//---------------------------------------------------------------------------------------------------------------------
void CDataThread::SetIndex(int _Index)
{
	ThreadIndex = _Index;
}
//---------------------------------------------------------------------------------------------------------------------
bool CDataThread::AddProduct( COnePacket* _OnePacket )
{
	return PCQueue.AddProduct(_OnePacket);
}
//---------------------------------------------------------------------------------------------------------------------
void CDataThread::Run()
{
	int _Count = 0;
	int _JobCount = 0;
	int _SleepMicro;
	COnePacket* _OnePacket = NULL;

	Terminated = false;
	while(!Terminated)
	{
		try
		{
			// Customer Queue에 Producer Queue의 내용을 가져오게 한다
			if (!PCQueue.MoveProducerToConsumer(_Count))
				formatlog( LOG_FATAL, "CDataThread::%s(%d) %10d Thread Error in Move PCQueue data", __func__, __LINE__, ThreadIndex); //, typeid(T).name());
			else
				formatlog( LOG_INFO, "CDataThread::%s(%d) %10d Thread %2d %3d remain.", __func__, __LINE__, ThreadIndex, _Count, PCQueue.GetConsumerSize());
		}
		catch(exception &_innerException)
		{
			formatlog( LOG_CRIT, "Exception on CDataThread::%s(%d). %2d Thread what(%s)", __func__, __LINE__, ThreadIndex, _innerException.what());
		}

		// Customer Queue를 비울때 까지 작업을 처리 한다
		_JobCount = 0;
		while(true)
		{
			_OnePacket = NULL;

			// 처리 간격에 일정 시간을 둔다
			if (!PCQueue.PopFromConsumer(&_OnePacket))
			{
				formatlog( LOG_FATAL, "CDataThread::%s(%d). %2d Thread Error in Get Data From PCQueue", __func__, __LINE__, ThreadIndex);
				break;
			}

			if ( NULL == _OnePacket)
				break;

			//1 실질적인 패킷 처리!!
			DoPacketProcess(_OnePacket);
			_JobCount++;

			delete _OnePacket;
		}

		usleep(500000 );
	}
}



Producer Consumer 큐를 만들었다.

이와 함께 Job을 가상화 하여 일정한 규칙(동일 DB Insert)을 가지는 다양한 Job을 처리하게 하였다.

그리고 그것도 템플릿으로 또 다시 묶었다. (너무 과잉인가.. ㅠ.ㅠ)


문제는 테스트 코드에서는 문제가 없는데...

상용 코드에서는 자꾸 에러가 나는 것이다.


pthread_mutex_lock()을 콜하면 "130"을 반환하고 errno로 "0"을 줬다.

프로그램은 lock이 걸려서 더 진행되지 않았다.


스택오버플로우도 뒤져보고 해서...

뭐 테스트 쓰레드도 만들고 다 해봤는데 또 잘 된다.


결론은!!!!

상용코드에서 init()을 콜하지 않았다. ㅠ.ㅠ

해당 init 코드에서 pthread_mutex_init()을 통해 mutex를 초기화 해야 하는데

초기화 하지 않은 mutex를 사용하려니 문제를 발생 시킨 것이다.


그런데 그게 다른 에러가 아니라... 130번이 반환되서 고생한 것이었다. ㅠ.ㅠ

"EOWNERDEAD"