기존에 제작했던 pcqueue 샘플 코드
프로그래밍???/C/C++ · 2016. 7. 13. 15:16
일전에 pcqueue를 간단히 만들었었다.
창피해서 코드를 올리지 말까 하다가 올려본다.
실제 사용중에 있다.
허접한 코드를 개선해서 사용해도 좋다.
요기 출처를 달아주고 개선된 부분이 있으면 공유해 주면 더 바랄게 없다.
아래는 해당 pcqueue의 템플릿 코드이다.
실제 호출은 템플릿 소스의 아래에 올려 놓겠다.
//--------------------------------------------------------------------------------------------------------------------- // pcqueue.hpp // // Asker's product consumer queue //--------------------------------------------------------------------------------------------------------------------- #pragma once #include <iostream> #include <vector> #include <pthread.h> #include "errno.h" #include "common.hpp" using namespace std; //--------------------------------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------------------------------- template <class T> class CPCQueue { private: pthread_mutex_t ProducerLock; pthread_mutex_t ConsumerLock; vector<T*> ProducerQueue; vector<T*> ConsumerQueue; private: bool LockProducer() { int _nError = pthread_mutex_lock( &ProducerLock ); if ( _nError ) { cout << "error in lock producer. return : " << _nError << ", err :" << strerror(errno) << endl; formatlog( LOG_CRIT, "error in lock producer. return(%d) ErrNo(%d) ErrMsg(%s)", _nError, errno, strerror(errno)); return false; } return true; } bool UnlockProducer() { int _nError = pthread_mutex_unlock( &ProducerLock ); if ( _nError ) { cout << "error in unlock producer. return : " << _nError << ", err :" << strerror(errno) << endl; formatlog( LOG_CRIT, "error in unlock producer. return(%d) ErrNo(%d) ErrMsg(%s)", _nError, errno, strerror(errno)); return false; } return true; } bool LockConsumer() { int _nError = pthread_mutex_lock( &ConsumerLock ); if ( _nError ) { cout << "error in lock consumer. return : " << _nError << ", err :" << strerror(errno) << endl; formatlog( LOG_CRIT, "error in lock consumer. 
return(%d) ErrNo(%d) ErrMsg(%s)", _nError, errno, strerror(errno)); return false; } return true; } bool UnlockConsumer() { int _nError = pthread_mutex_unlock( &ConsumerLock ); if ( _nError ) { cout << "error in unlock consumer. return : " << _nError << ", err :" << strerror(errno) << endl; formatlog( LOG_CRIT, "error in unlock consumer. return(%d) ErrNo(%d) ErrMsg(%s)", _nError, errno, strerror(errno)); return false; } return true; } public: CPCQueue(){} virtual ~CPCQueue() { for ( class vector<T*>::iterator _iter = ProducerQueue.begin(); _iter != ProducerQueue.end(); _iter++) delete *_iter; for ( class vector<T*>::iterator _iter = ConsumerQueue.begin(); _iter != ConsumerQueue.end(); _iter++) delete *_iter; ProducerQueue.clear(); ConsumerQueue.clear(); } bool Init() { int _ErrorNo = pthread_mutex_init( &ProducerLock, NULL); if (_ErrorNo) return false; _ErrorNo = pthread_mutex_init( &ConsumerLock, NULL); if (_ErrorNo) return false; return true; } // Product에 하나를 추가한다 bool AddProduct( T* _Job ) { if ( false == LockProducer()) { // lock 실패 formatlog( LOG_CRIT, "CPCQueue::%s(%d) Lock Fail. fatal issue.", __func__, __LINE__); return false; } ProducerQueue.push_back(_Job); if ( false == UnlockProducer()) { // unlock 실패 formatlog( LOG_CRIT, "CPCQueue::%s(%d) Unlock Fail. fatal issue.", __func__, __LINE__); return false; } return true; } // Consumer 쪽에서 하나씩 빼낸다. bool PopFromConsumer(T** _Value) { *_Value = NULL; if ( false == LockConsumer()) { // lock 실패 formatlog( LOG_CRIT, "CPCQueue::%s(%d) Lock Fail. fatal issue.", __func__, __LINE__); return false; } if (!ConsumerQueue.empty()) { *_Value = ConsumerQueue.front(); ConsumerQueue.erase(ConsumerQueue.begin()); } if ( false == UnlockConsumer()) { // unlock 실패 formatlog( LOG_CRIT, "CPCQueue::%s(%d) Unlock Fail. 
fatal issue.", __func__, __LINE__); return false; } return true; } // 합할 것이 하나도 없는 경우 false를 반환한다 bool MoveProducerToConsumer(int& _MoveCount) { if ( false == LockConsumer()) { // lock 실패 formatlog( LOG_CRIT, "CPCQueue::%s(%d) Lock Fail. fatal issue.", __func__, __LINE__); return false; } if ( false == LockProducer()) { // lock 실패 formatlog( LOG_CRIT, "CPCQueue::%s(%d) Lock Fail. fatal issue.", __func__, __LINE__); return false; } _MoveCount = ProducerQueue.size(); if (!ProducerQueue.empty()) { for ( class vector<T*>::const_iterator _iter = ProducerQueue.begin(); _iter != ProducerQueue.end(); _iter++) ConsumerQueue.push_back(*_iter); ProducerQueue.clear(); } if ( false == UnlockProducer()) { // unlock 실패 formatlog( LOG_CRIT, "CPCQueue::%s(%d) Unlock Fail. fatal issue.", __func__, __LINE__); return false; } if ( false == UnlockConsumer()) { // unlock 실패 formatlog( LOG_CRIT, "CPCQueue::%s(%d) Unlock Fail. fatal issue.", __func__, __LINE__); return false; } return true; } int GetConsumerSize() const { return ConsumerQueue.size(); } }; //---------------------------------------------------------------------------------------------------------------------
class CDataThread: public CUbiThread { private: CPCQueue<COnePacket> PCQueue; int ThreadIndex; CURL* ctx; public: bool Terminated; public: CDataThread(); virtual ~CDataThread(); bool init(); void SetIndex(int _Index); bool AddProduct( COnePacket* _OnePacket); virtual void Run(); private: bool DoPacketProcess(COnePacket* _OnePacket); CUbiParser* MakeParser(COnePacket* _OnePacket); } //--------------------------------------------------------------------------------------------------------------------- CDataThread::CDataThread() { } //--------------------------------------------------------------------------------------------------------------------- CDataThread::~CDataThread() { } //--------------------------------------------------------------------------------------------------------------------- bool CDataThread::init() { return PCQueue.Init(); } //--------------------------------------------------------------------------------------------------------------------- void CDataThread::SetIndex(int _Index) { ThreadIndex = _Index; } //--------------------------------------------------------------------------------------------------------------------- bool CDataThread::AddProduct( COnePacket* _OnePacket ) { return PCQueue.AddProduct(_OnePacket); } //--------------------------------------------------------------------------------------------------------------------- void CDataThread::Run() { int _Count = 0; int _JobCount = 0; int _SleepMicro; COnePacket* _OnePacket = NULL; Terminated = false; while(!Terminated) { try { // Customer Queue에 Producer Queue의 내용을 가져오게 한다 if (!PCQueue.MoveProducerToConsumer(_Count)) formatlog( LOG_FATAL, "CDataThread::%s(%d) %10d Thread Error in Move PCQueue data", __func__, __LINE__, ThreadIndex); //, typeid(T).name()); else formatlog( LOG_INFO, "CDataThread::%s(%d) %10d Thread %2d %3d remain.", __func__, __LINE__, ThreadIndex, _Count, PCQueue.GetConsumerSize()); } catch(exception &_innerException) { formatlog( LOG_CRIT, "Exception on 
CDataThread::%s(%d). %2d Thread what(%s)", __func__, __LINE__, ThreadIndex, _innerException.what()); } // Customer Queue를 비울때 까지 작업을 처리 한다 _JobCount = 0; while(true) { _OnePacket = NULL; // 처리 간격에 일정 시간을 둔다 if (!PCQueue.PopFromConsumer(&_OnePacket)) { formatlog( LOG_FATAL, "CDataThread::%s(%d). %2d Thread Error in Get Data From PCQueue", __func__, __LINE__, ThreadIndex); break; } if ( NULL == _OnePacket) break; //1 실질적인 패킷 처리!! DoPacketProcess(_OnePacket); _JobCount++; delete _OnePacket; } usleep(500000 ); } }
'프로그래밍??? > C/C++' 카테고리의 다른 글
Dual ABI 이슈 - GCC5 (0) | 2016.08.05 |
---|---|
POCO Library link 이슈 - GCC 5.4.0 업버전 후 생긴 문제 (0) | 2016.08.04 |
Bad cast exception on poco-library when I tried to cast Int64 (0) | 2015.07.30 |
Boost를 활용해 공유 메모리에 STL Container 사용하는 방법 (0) | 2015.07.10 |
Creating STL Containers in Shared Memory (0) | 2015.07.10 |