흐르는 시간의 블로그...

일전에 pcqueue를 간단히 만들었었다.
창피해서 코드를 올리지 말까 하다가 올려본다.
실제 사용중에 있다.
허접한 코드를 개선해서 사용해도 좋다.
요기 출처를 달아주고 개선된 부분이 있으면 공유해 주면 더 바랄게 없다.
허접하다고 너무 심하게 구박하지 말아줬으면 좋겠다.

아래는 해당 pcqueue의 템플릿 코드이다.
실제 호출은 템픗릿 소스의 아래에 올려 놓겠다.

//---------------------------------------------------------------------------------------------------------------------
// pcqueue.hpp
//
// Asker's product consumer queue
//---------------------------------------------------------------------------------------------------------------------
#pragma once

#include <iostream>
#include <vector>
#include <pthread.h>

#include "errno.h"

#include "common.hpp"

using namespace std;
//---------------------------------------------------------------------------------------------------------------------
//---------------------------------------------------------------------------------------------------------------------
//---------------------------------------------------------------------------------------------------------------------
template <class T>
class CPCQueue
{
private:
	pthread_mutex_t ProducerLock;
	pthread_mutex_t ConsumerLock;

	vector<T*> ProducerQueue;
	vector<T*> ConsumerQueue;

private:
	bool LockProducer()
	{
		int _nError = pthread_mutex_lock( &ProducerLock );
		if ( _nError )
		{
			cout << "error in lock producer. return : " << _nError << ", err :" << strerror(errno) << endl;
			formatlog( LOG_CRIT, "error in lock producer. return(%d) ErrNo(%d) ErrMsg(%s)", _nError, errno, strerror(errno));
			return false;
		}
	
		return true;
	}

	bool UnlockProducer()
	{
		int _nError = pthread_mutex_unlock( &ProducerLock );
		if ( _nError )
		{
			cout << "error in unlock producer. return : " << _nError << ", err :" << strerror(errno) << endl;
			formatlog( LOG_CRIT, "error in unlock producer. return(%d) ErrNo(%d) ErrMsg(%s)", _nError, errno, strerror(errno));
			return false;
		}
	
		return true;
	}

	bool LockConsumer()
	{
		int _nError = pthread_mutex_lock( &ConsumerLock );
		if ( _nError )
		{
			cout << "error in lock consumer. return : " << _nError << ", err :" << strerror(errno) << endl;
			formatlog( LOG_CRIT, "error in lock consumer. return(%d) ErrNo(%d) ErrMsg(%s)", _nError, errno, strerror(errno));
			return false;
		}

		return true;
	}

	bool UnlockConsumer()
	{
		int _nError = pthread_mutex_unlock( &ConsumerLock );
		if ( _nError )
		{
			cout << "error in unlock consumer. return : " << _nError << ", err :" << strerror(errno) << endl;
			formatlog( LOG_CRIT, "error in unlock consumer. return(%d) ErrNo(%d) ErrMsg(%s)", _nError, errno, strerror(errno));
			return false;
		}

		return true;
	}


public:
	CPCQueue(){}
	virtual ~CPCQueue()
	{
		for ( class vector<T*>::iterator _iter = ProducerQueue.begin(); _iter != ProducerQueue.end(); _iter++)
			delete *_iter;

		for ( class vector<T*>::iterator _iter = ConsumerQueue.begin(); _iter != ConsumerQueue.end(); _iter++)
			delete *_iter;

		ProducerQueue.clear();
		ConsumerQueue.clear();
	}

	bool Init()
	{
		int _ErrorNo = pthread_mutex_init( &ProducerLock, NULL);
		if (_ErrorNo)
			return false;
	
		_ErrorNo = pthread_mutex_init( &ConsumerLock, NULL);
		if (_ErrorNo)
			return false;
	
		return true;
	}

	// Product에 하나를 추가한다
	bool AddProduct( T* _Job )
	{
		if ( false == LockProducer())
		{
			// lock 실패
			formatlog( LOG_CRIT, "CPCQueue::%s(%d) Lock Fail. fatal issue.", __func__, __LINE__);
			return false;
		}

		ProducerQueue.push_back(_Job);
	 
		if ( false == UnlockProducer())
		{
			// unlock 실패
			formatlog( LOG_CRIT, "CPCQueue::%s(%d) Unlock Fail. fatal issue.", __func__, __LINE__);
			return false;
		}

		return true;
	}

	// Consumer 쪽에서 하나씩 빼낸다.
	bool PopFromConsumer(T** _Value)
	{
		*_Value = NULL;

		if ( false == LockConsumer())
		{
			// lock 실패
			formatlog( LOG_CRIT, "CPCQueue::%s(%d) Lock Fail. fatal issue.", __func__, __LINE__);
			return false;
		}

		if (!ConsumerQueue.empty())
		{
			*_Value = ConsumerQueue.front();
			ConsumerQueue.erase(ConsumerQueue.begin());
		}

		if ( false == UnlockConsumer())
		{
			// unlock 실패
			formatlog( LOG_CRIT, "CPCQueue::%s(%d) Unlock Fail. fatal issue.", __func__, __LINE__);
			return false;
		}

		return true;
	}

	// 합할 것이 하나도 없는 경우 false를 반환한다
	bool MoveProducerToConsumer(int& _MoveCount)
	{
		if ( false == LockConsumer())
		{
			// lock 실패
			formatlog( LOG_CRIT, "CPCQueue::%s(%d) Lock Fail. fatal issue.", __func__, __LINE__);
			return false;
		}

		if ( false == LockProducer())
		{
			// lock 실패
			formatlog( LOG_CRIT, "CPCQueue::%s(%d) Lock Fail. fatal issue.", __func__, __LINE__);
			return false;
		}

		_MoveCount = ProducerQueue.size();
		if (!ProducerQueue.empty())
		{
			for ( class vector<T*>::const_iterator _iter = ProducerQueue.begin(); _iter != ProducerQueue.end(); _iter++)
				ConsumerQueue.push_back(*_iter);
	
			ProducerQueue.clear();
		}

		if ( false == UnlockProducer())
		{
			// unlock 실패
			formatlog( LOG_CRIT, "CPCQueue::%s(%d) Unlock Fail. fatal issue.", __func__, __LINE__);
			return false;
		}

		if ( false == UnlockConsumer())
		{
			// unlock 실패
			formatlog( LOG_CRIT, "CPCQueue::%s(%d) Unlock Fail. fatal issue.", __func__, __LINE__);
			return false;
		}

		return true;
	}

	int GetConsumerSize() const
	{
		return ConsumerQueue.size();
	}

};
//---------------------------------------------------------------------------------------------------------------------



class CDataThread: public CUbiThread
{
private:
	CPCQueue<COnePacket> PCQueue;
	int ThreadIndex;

	CURL* ctx;

public:
	bool Terminated;

public:
	CDataThread();
	virtual ~CDataThread();

	bool init();
	void SetIndex(int _Index);
	bool AddProduct( COnePacket* _OnePacket);

	virtual void Run();

private:
	bool DoPacketProcess(COnePacket* _OnePacket);
	CUbiParser* MakeParser(COnePacket* _OnePacket);
}

//---------------------------------------------------------------------------------------------------------------------
CDataThread::CDataThread()
{
}
//---------------------------------------------------------------------------------------------------------------------
CDataThread::~CDataThread()
{
}
//---------------------------------------------------------------------------------------------------------------------
bool CDataThread::init()
{
	return PCQueue.Init();
}
//---------------------------------------------------------------------------------------------------------------------
void CDataThread::SetIndex(int _Index)
{
	ThreadIndex = _Index;
}
//---------------------------------------------------------------------------------------------------------------------
bool CDataThread::AddProduct( COnePacket* _OnePacket )
{
	return PCQueue.AddProduct(_OnePacket);
}
//---------------------------------------------------------------------------------------------------------------------
void CDataThread::Run()
{
	int _Count = 0;
	int _JobCount = 0;
	int _SleepMicro;
	COnePacket* _OnePacket = NULL;

	Terminated = false;
	while(!Terminated)
	{
		try
		{
			// Customer Queue에 Producer Queue의 내용을 가져오게 한다
			if (!PCQueue.MoveProducerToConsumer(_Count))
				formatlog( LOG_FATAL, "CDataThread::%s(%d) %10d Thread Error in Move PCQueue data", __func__, __LINE__, ThreadIndex); //, typeid(T).name());
			else
				formatlog( LOG_INFO, "CDataThread::%s(%d) %10d Thread %2d %3d remain.", __func__, __LINE__, ThreadIndex, _Count, PCQueue.GetConsumerSize());
		}
		catch(exception &_innerException)
		{
			formatlog( LOG_CRIT, "Exception on CDataThread::%s(%d). %2d Thread what(%s)", __func__, __LINE__, ThreadIndex, _innerException.what());
		}

		// Customer Queue를 비울때 까지 작업을 처리 한다
		_JobCount = 0;
		while(true)
		{
			_OnePacket = NULL;

			// 처리 간격에 일정 시간을 둔다
			if (!PCQueue.PopFromConsumer(&_OnePacket))
			{
				formatlog( LOG_FATAL, "CDataThread::%s(%d). %2d Thread Error in Get Data From PCQueue", __func__, __LINE__, ThreadIndex);
				break;
			}

			if ( NULL == _OnePacket)
				break;

			//1 실질적인 패킷 처리!!
			DoPacketProcess(_OnePacket);
			_JobCount++;

			delete _OnePacket;
		}

		usleep(500000 );
	}
}