version 1.106, 2005/05/18 15:56:51
|
version 1.107, 2005/05/27 02:34:02
|
|
|
| |
PEGASUS_NAMESPACE_BEGIN | PEGASUS_NAMESPACE_BEGIN |
| |
|
|
cimom *MessageQueueService::_meta_dispatcher = 0; | cimom *MessageQueueService::_meta_dispatcher = 0; |
AtomicInt MessageQueueService::_service_count = 0; | AtomicInt MessageQueueService::_service_count = 0; |
AtomicInt MessageQueueService::_xid(1); | AtomicInt MessageQueueService::_xid(1); |
|
|
MessageQueueService *service = list->next(0); | MessageQueueService *service = list->next(0); |
while(service != NULL) | while(service != NULL) |
{ | { |
if (service->_incoming.count() > 0) |
if (service->_incoming.count() > 0 && service->_die.value() == 0) |
{ | { |
_thread_pool->allocate_and_awaken(service, _req_proc); | _thread_pool->allocate_and_awaken(service, _req_proc); |
} | } |
|
|
: Base(name, true, queueID), | : Base(name, true, queueID), |
_mask(mask), | _mask(mask), |
_die(0), | _die(0), |
|
_threads(0), |
_incoming(true, 0), | _incoming(true, 0), |
_incoming_queue_shutdown(0) | _incoming_queue_shutdown(0) |
{ | { |
|
|
if (_incoming_queue_shutdown.value() == 0) | if (_incoming_queue_shutdown.value() == 0) |
{ | { |
_shutdown_incoming_queue(); | _shutdown_incoming_queue(); |
|
|
} | } |
| |
|
while (_threads.value() > 0) |
|
{ |
|
pegasus_yield(); |
|
} |
|
_polling_list.remove(this); |
{ | { |
AutoMutex autoMut(_meta_dispatcher_mutex); | AutoMutex autoMut(_meta_dispatcher_mutex); |
_service_count--; | _service_count--; |
|
|
_thread_pool = 0; | _thread_pool = 0; |
} | } |
} // mutex unlocks here | } // mutex unlocks here |
_polling_list.remove(this); |
|
// Clean up in case there are extra stuff on the queue. | // Clean up in case there are extra stuff on the queue. |
while (_incoming.count()) | while (_incoming.count()) |
{ | { |
|
|
msg->op->_request.insert_first(msg); | msg->op->_request.insert_first(msg); |
| |
_incoming.insert_last_wait(msg->op); | _incoming.insert_last_wait(msg->op); |
|
_polling_sem.signal(); |
} | } |
| |
| |
|
|
PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc( | PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc( |
void * parm) | void * parm) |
{ | { |
try |
|
{ |
|
MessageQueueService* service = | MessageQueueService* service = |
reinterpret_cast<MessageQueueService*>(parm); | reinterpret_cast<MessageQueueService*>(parm); |
PEGASUS_ASSERT(service != 0); | PEGASUS_ASSERT(service != 0); |
|
try |
|
{ |
| |
if (service->_die.value() != 0) | if (service->_die.value() != 0) |
{ | { |
return (0); | return (0); |
} | } |
|
service->_threads++; |
// pull messages off the incoming queue and dispatch them. then | // pull messages off the incoming queue and dispatch them. then |
// check pending messages that are non-blocking | // check pending messages that are non-blocking |
AsyncOpNode *operation = 0; | AsyncOpNode *operation = 0; |
|
|
PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, | PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
"Caught unrecognized exception. Exiting _req_proc."); | "Caught unrecognized exception. Exiting _req_proc."); |
} | } |
|
service->_threads--; |
return(0); | return(0); |
} | } |
| |