version 1.153, 2008/12/01 17:49:52
|
version 1.159, 2008/12/16 18:56:00
|
|
|
#include <Pegasus/Common/Tracer.h> | #include <Pegasus/Common/Tracer.h> |
#include <Pegasus/Common/MessageLoader.h> | #include <Pegasus/Common/MessageLoader.h> |
| |
|
PEGASUS_USING_STD; |
|
|
PEGASUS_NAMESPACE_BEGIN | PEGASUS_NAMESPACE_BEGIN |
| |
cimom *MessageQueueService::_meta_dispatcher = 0; | cimom *MessageQueueService::_meta_dispatcher = 0; |
|
|
void* parm) | void* parm) |
{ | { |
Thread *myself = reinterpret_cast<Thread *>(parm); | Thread *myself = reinterpret_cast<Thread *>(parm); |
List<MessageQueueService, Mutex> *list = |
MessageQueueService::PollingList *list = |
reinterpret_cast<List<MessageQueueService, Mutex>*>(myself->get_parm()); |
reinterpret_cast<MessageQueueService::PollingList*>(myself->get_parm()); |
| |
|
try |
|
{ |
while (_stop_polling.get() == 0) | while (_stop_polling.get() == 0) |
{ | { |
_polling_sem.wait(); | _polling_sem.wait(); |
|
|
// processing the _polling_list | // processing the _polling_list |
// (e.g., MessageQueueServer::~MessageQueueService). | // (e.g., MessageQueueServer::~MessageQueueService). |
| |
list->lock(); |
_polling_list_mutex.lock(); |
MessageQueueService *service = list->front(); | MessageQueueService *service = list->front(); |
ThreadStatus rtn = PEGASUS_THREAD_OK; | ThreadStatus rtn = PEGASUS_THREAD_OK; |
while (service != NULL) | while (service != NULL) |
|
|
// lock and has ownership of the service object. | // lock and has ownership of the service object. |
| |
service->_threads++; | service->_threads++; |
try |
|
{ |
|
rtn = _thread_pool->allocate_and_awaken( | rtn = _thread_pool->allocate_and_awaken( |
service, _req_proc, &_polling_sem); | service, _req_proc, &_polling_sem); |
} |
|
catch (...) |
|
{ |
|
service->_threads--; |
|
|
|
// allocate_and_awaken should never generate an exception. |
|
PEGASUS_ASSERT(0); |
|
} |
|
// if no more threads available, break from processing loop | // if no more threads available, break from processing loop |
if (rtn != PEGASUS_THREAD_OK ) | if (rtn != PEGASUS_THREAD_OK ) |
{ | { |
|
|
service->_threads.get())); | service->_threads.get())); |
| |
Threads::yield(); | Threads::yield(); |
service = NULL; |
break; |
} | } |
} | } |
if (service != NULL) |
|
{ |
|
service = list->next_of(service); | service = list->next_of(service); |
} | } |
|
_polling_list_mutex.unlock(); |
} | } |
list->unlock(); |
|
} | } |
|
catch(const Exception &e) |
|
{ |
|
PEG_TRACE((TRC_MESSAGEQUEUESERVICE,Tracer::LEVEL1, |
|
"Exception caught in MessageQueueService::polling_routine : %s", |
|
(const char*)e.getMessage().getCString())); |
|
} |
|
catch(const exception &e) |
|
{ |
|
PEG_TRACE((TRC_MESSAGEQUEUESERVICE,Tracer::LEVEL1, |
|
"Exception caught in MessageQueueService::polling_routine : %s", |
|
e.what())); |
|
} |
|
catch(...) |
|
{ |
|
PEG_TRACE_CSTRING(TRC_MESSAGEQUEUESERVICE,Tracer::LEVEL1, |
|
"Unknown Exception caught in MessageQueueService::polling_routine"); |
|
} |
|
|
|
PEGASUS_ASSERT(_stop_polling.get()); |
|
|
return ThreadReturnType(0); | return ThreadReturnType(0); |
} | } |
| |
|
|
} | } |
_service_count++; | _service_count++; |
| |
_get_polling_list()->insert_back(this); |
// Add to the polling list |
|
if (!_polling_list) |
|
{ |
|
_polling_list = new PollingList; |
|
} |
|
_polling_list->insert_back(this); |
|
_meta_dispatcher->registerCIMService(this); |
} | } |
| |
| |
MessageQueueService::~MessageQueueService() | MessageQueueService::~MessageQueueService() |
{ | { |
|
|
// Close incoming queue. | // Close incoming queue. |
if (_incoming_queue_shutdown.get() == 0) | if (_incoming_queue_shutdown.get() == 0) |
{ | { |
|
|
// die now. | // die now. |
_die = 1; | _die = 1; |
| |
|
_meta_dispatcher->deregisterCIMService(this); |
|
|
// Wait until all threads processing the messages | // Wait until all threads processing the messages |
// for this service have completed. | // for this service have completed. |
while (_threads.get() > 0) | while (_threads.get() > 0) |
|
|
Threads::yield(); | Threads::yield(); |
} | } |
| |
|
|
// The polling_routine locks the _polling_list while | // The polling_routine locks the _polling_list while |
// processing the incoming messages for services on the | // processing the incoming messages for services on the |
// list. Deleting the service from the _polling_list | // list. Deleting the service from the _polling_list |
|
|
| |
{ | { |
AutoMutex autoMut(_meta_dispatcher_mutex); | AutoMutex autoMut(_meta_dispatcher_mutex); |
|
|
_service_count--; | _service_count--; |
// If we are last service to die, delete metadispatcher. | // If we are last service to die, delete metadispatcher. |
if (_service_count.get() == 0) | if (_service_count.get() == 0) |
|
|
delete _thread_pool; | delete _thread_pool; |
_thread_pool = 0; | _thread_pool = 0; |
} | } |
} // mutex unlocks here |
} |
| |
// Clean up any extra stuff on the queue. | // Clean up any extra stuff on the queue. |
AsyncOpNode* op = 0; | AsyncOpNode* op = 0; |
|
|
| |
Boolean MessageQueueService::accept_async(AsyncOpNode* op) | Boolean MessageQueueService::accept_async(AsyncOpNode* op) |
{ | { |
|
if (!_isRunning) |
|
{ |
|
// Don't accept any messages other than start. |
|
if (op->_request.get()->getType() != ASYNC_CIMSERVICE_START) |
|
{ |
|
return false; |
|
} |
|
} |
|
|
if (_incoming_queue_shutdown.get() > 0) | if (_incoming_queue_shutdown.get() > 0) |
return false; | return false; |
|
|
if (_polling_thread == NULL) | if (_polling_thread == NULL) |
{ | { |
|
PEGASUS_ASSERT(_polling_list); |
_polling_thread = new Thread( | _polling_thread = new Thread( |
polling_routine, | polling_routine, |
reinterpret_cast<void *>(_get_polling_list()), |
reinterpret_cast<void *>(_polling_list), |
false); | false); |
ThreadStatus tr = PEGASUS_THREAD_OK; | ThreadStatus tr = PEGASUS_THREAD_OK; |
while ( (tr =_polling_thread->run()) != PEGASUS_THREAD_OK) | while ( (tr =_polling_thread->run()) != PEGASUS_THREAD_OK) |
|
|
return queue->getQueueId(); | return queue->getQueueId(); |
} | } |
| |
MessageQueueService::PollingList* MessageQueueService::_get_polling_list() |
|
{ |
|
_polling_list_mutex.lock(); |
|
|
|
if (!_polling_list) |
|
_polling_list = new PollingList; |
|
|
|
_polling_list_mutex.unlock(); |
|
|
|
return _polling_list; |
|
} |
|
|
|
void MessageQueueService::_removeFromPollingList(MessageQueueService *service) | void MessageQueueService::_removeFromPollingList(MessageQueueService *service) |
{ | { |
_polling_list_mutex.lock(); | _polling_list_mutex.lock(); |