version 1.119, 2006/01/30 16:17:05 | version 1.119.12.2, 2006/06/29 21:02:17
 | |
cimom *MessageQueueService::_meta_dispatcher = 0; | cimom *MessageQueueService::_meta_dispatcher = 0; |
AtomicInt MessageQueueService::_service_count(0); | AtomicInt MessageQueueService::_service_count(0); |
AtomicInt MessageQueueService::_xid(1); | Uint32 MessageQueueService::_xid = 1; |
 | Mutex MessageQueueService::_xidMutex; |
Mutex MessageQueueService::_meta_dispatcher_mutex; | Mutex MessageQueueService::_meta_dispatcher_mutex; |
| |
static struct timeval deallocateWait = {300, 0}; | static struct timeval deallocateWait = {300, 0}; |
| |
ThreadPool *MessageQueueService::_thread_pool = 0; | ThreadPool *MessageQueueService::_thread_pool = 0; |
| |
DQueue<MessageQueueService> MessageQueueService::_polling_list(true); | List<MessageQueueService, RMutex> MessageQueueService::_polling_list; |
| |
Thread* MessageQueueService::_polling_thread = 0; | Thread* MessageQueueService::_polling_thread = 0; |
| |
|
|
PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm) | PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm) |
{ | { |
Thread *myself = reinterpret_cast<Thread *>(parm); | Thread *myself = reinterpret_cast<Thread *>(parm); |
DQueue<MessageQueueService> *list = reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm()); | List<MessageQueueService, RMutex> *list = reinterpret_cast<List<MessageQueueService, RMutex>*>(myself->get_parm()); |
|
|
while (_stop_polling.get() == 0) | while (_stop_polling.get() == 0) |
{ | { |
_polling_sem.wait(); | _polling_sem.wait(); |
|
|
// (e.g., MessageQueueServer::~MessageQueueService). | // (e.g., MessageQueueServer::~MessageQueueService). |
| |
list->lock(); | list->lock(); |
MessageQueueService *service = list->next(0); | MessageQueueService *service = list->front(); |
ThreadStatus rtn = PEGASUS_THREAD_OK; | ThreadStatus rtn = PEGASUS_THREAD_OK; |
while (service != NULL) | while (service != NULL) |
{ | { |
|
|
} | } |
if (service != NULL) | if (service != NULL) |
{ | { |
service = list->next(service); | service = list->next_of(service); |
} | } |
} | } |
list->unlock(); | list->unlock(); |
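
The hunks above change how polling_routine walks the polling list: the DQueue idiom next(0)/next(service) becomes the List accessors front()/next_of(service), and the traversal is still bracketed by list->lock()/list->unlock(). As a rough stand-alone sketch of that lock-then-walk pattern, with std::list and std::mutex standing in for the Pegasus List<MessageQueueService, RMutex> (all names below are illustrative, not the Pegasus API):

    // Illustrative sketch only: approximates the polling loop's lock-then-walk
    // pattern with standard C++ types instead of List<T, RMutex>.
    #include <list>
    #include <mutex>
    #include <iostream>

    struct Service { int queueId; };

    int main()
    {
        std::list<Service> pollingList = { {1}, {2}, {3} };
        std::mutex pollingListLock;   // stands in for the list's RMutex

        {
            // Hold the lock for the whole traversal, as polling_routine does
            // between list->lock() and list->unlock().
            std::lock_guard<std::mutex> guard(pollingListLock);
            for (const Service& service : pollingList)   // front()/next_of() equivalent
            {
                std::cout << "polling service " << service.queueId << '\n';
            }
        }
        return 0;
    }
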
|
|
_mask(mask), | _mask(mask), |
_die(0), | _die(0), |
_threads(0), | _threads(0), |
_incoming(true, 0), | _incoming(0), |
_incoming_queue_shutdown(0) | _incoming_queue_shutdown(0) |
{ | { |
| |
|
|
throw BindFailedException(parms); | throw BindFailedException(parms); |
} | } |
| |
_polling_list.insert_last(this); | _polling_list.insert_back(this); |
| |
} | } |
| |
|
|
// prior to processing, avoids synchronization issues | // prior to processing, avoids synchronization issues |
// with the _polling_routine. | // with the _polling_routine. |
| |
|
 | // ATTN: added to prevent assertion in List in which the list does not |
 | // contain this element. |
_polling_list.remove(this); | _polling_list.remove(this); |
| |
// ATTN: The code for closing the _incoming queue | // ATTN: The code for closing the _incoming queue |
|
|
while (_incoming.count()) | while (_incoming.count()) |
{ | { |
try { | try { |
delete _incoming.remove_first(); | delete _incoming.dequeue(); |
} catch (const ListClosed &e) | } catch (const ListClosed &e) |
{ | { |
// If the list is closed, there is nothing we can do. | // If the list is closed, there is nothing we can do. |
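
The destructor hunk above drains _incoming with dequeue() (remove_first() in 1.119) and swallows ListClosed once the queue has been shut down. A rough stand-alone analogue of that drain loop, using a std::queue guarded by a mutex and a closed flag instead of the Pegasus AsyncQueue and ListClosed exception (the class and names below are invented for the sketch):

    // Rough analogue of draining a closable queue; not the Pegasus AsyncQueue API.
    #include <queue>
    #include <mutex>
    #include <memory>
    #include <stdexcept>
    #include <cstddef>

    struct OpNode { int id; };

    class ClosableQueue
    {
    public:
        void enqueue(std::unique_ptr<OpNode> node)
        {
            std::lock_guard<std::mutex> guard(_lock);
            if (_closed)
                throw std::runtime_error("queue closed");   // plays the role of ListClosed
            _items.push(std::move(node));
        }

        std::unique_ptr<OpNode> dequeue()
        {
            std::lock_guard<std::mutex> guard(_lock);
            if (_closed || _items.empty())
                throw std::runtime_error("queue closed");
            std::unique_ptr<OpNode> node = std::move(_items.front());
            _items.pop();
            return node;
        }

        std::size_t count()
        {
            std::lock_guard<std::mutex> guard(_lock);
            return _items.size();
        }

        void shutdown() { std::lock_guard<std::mutex> guard(_lock); _closed = true; }

    private:
        std::mutex _lock;
        std::queue<std::unique_ptr<OpNode>> _items;
        bool _closed = false;
    };

    int main()
    {
        ClosableQueue incoming;
        incoming.enqueue(std::unique_ptr<OpNode>(new OpNode{1}));

        // Drain loop, mirroring: while (_incoming.count()) { delete _incoming.dequeue(); }
        while (incoming.count())
        {
            try { incoming.dequeue(); }                    // unique_ptr frees the node
            catch (const std::runtime_error&) { break; }   // if closed, nothing we can do
        }
        return 0;
    }
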
|
|
msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE; | msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE; |
| |
msg->op->_op_dest = this; | msg->op->_op_dest = this; |
msg->op->_request.insert_first(msg); | msg->op->_request.insert_front(msg); |
try { | try { |
_incoming.insert_last_wait(msg->op); | _incoming.enqueue_wait(msg->op); |
_polling_sem.signal(); | _polling_sem.signal(); |
} catch (const ListClosed &) | } catch (const ListClosed &) |
{ | { |
|
|
{ | { |
try | try |
{ | { |
operation = service->_incoming.remove_first(); | operation = service->_incoming.dequeue(); |
} | } |
catch (ListClosed &) | catch (ListClosed &) |
{ | { |
|
|
// << Tue Feb 19 14:10:38 2002 mdd >> | // << Tue Feb 19 14:10:38 2002 mdd >> |
operation->lock(); | operation->lock(); |
| |
Message *rq = operation->_request.next(0); | Message *rq = operation->_request.front(); |
| |
// optimization <<< Thu Mar 7 21:04:05 2002 mdd >>> | // optimization <<< Thu Mar 7 21:04:05 2002 mdd >>> |
// move this to the bottom of the loop when the majority of | // move this to the bottom of the loop when the majority of |
|
|
// divert legacy messages to handleEnqueue | // divert legacy messages to handleEnqueue |
if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async))) | if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async))) |
{ | { |
rq = operation->_request.remove_first() ; | rq = operation->_request.remove_front() ; |
operation->unlock(); | operation->unlock(); |
// delete the op node | // delete the op node |
operation->release(); | operation->release(); |
|
|
// ATTN optimization remove the message checking altogether in the base | // ATTN optimization remove the message checking altogether in the base |
// << Mon Feb 18 14:02:20 2002 mdd >> | // << Mon Feb 18 14:02:20 2002 mdd >> |
op->lock(); | op->lock(); |
Message *rq = op->_request.next(0); | Message *rq = op->_request.front(); |
Message *rp = op->_response.next(0); | Message *rp = op->_response.front(); |
op->unlock(); | op->unlock(); |
| |
if ((rq != 0 && (true == messageOK(rq))) || | if ((rq != 0 && (true == messageOK(rq))) || |
(rp != 0 && (true == messageOK(rp))) && _die.get() == 0) | (rp != 0 && (true == messageOK(rp))) && _die.get() == 0) |
{ | { |
_incoming.insert_last_wait(op); | _incoming.enqueue_wait(op); |
_polling_sem.signal(); | _polling_sem.signal(); |
return true; | return true; |
} | } |
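
In the hunk above, an accepted op node is handed to the service by enqueue_wait() on _incoming (insert_last_wait() in 1.119) followed by _polling_sem.signal(), which wakes polling_routine. A loose sketch of that enqueue-then-signal hand-off, using a std::condition_variable where Pegasus uses a Semaphore (the names below are placeholders, not the Pegasus API):

    // Loose sketch of the enqueue-then-signal hand-off; not the Pegasus Semaphore API.
    #include <condition_variable>
    #include <deque>
    #include <mutex>
    #include <thread>
    #include <iostream>

    std::deque<int> incoming;            // stands in for _incoming
    std::mutex incomingLock;
    std::condition_variable pollingSem;  // stands in for _polling_sem
    bool stopPolling = false;

    void acceptAsync(int op)
    {
        {
            std::lock_guard<std::mutex> guard(incomingLock);
            incoming.push_back(op);      // _incoming.enqueue_wait(op)
        }
        pollingSem.notify_one();         // _polling_sem.signal()
    }

    void pollingRoutine()
    {
        std::unique_lock<std::mutex> guard(incomingLock);
        while (!stopPolling)
        {
            pollingSem.wait(guard, [] { return !incoming.empty() || stopPolling; });
            while (!incoming.empty())
            {
                std::cout << "dispatching op " << incoming.front() << '\n';
                incoming.pop_front();
            }
        }
    }

    int main()
    {
        std::thread poller(pollingRoutine);
        acceptAsync(42);
        {
            std::lock_guard<std::mutex> guard(incomingLock);
            stopPolling = true;
        }
        pollingSem.notify_one();
        poller.join();
        return 0;
    }
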
|
|
AsyncOpNode *operation; | AsyncOpNode *operation; |
try | try |
{ | { |
operation = service->_incoming.remove_first(); | operation = service->_incoming.dequeue(); |
} | } |
catch(IPCException &) | catch(IPCException &) |
{ | { |
|
|
break; | break; |
} // message processing loop | } // message processing loop |
| |
// shutdown the AsyncDQueue | // shutdown the AsyncQueue |
service->_incoming.shutdown_queue(); | service->_incoming.shutdown_queue(); |
return; | return; |
} | } |
|
|
} | } |
else | else |
{ | { |
op->_request.insert_first(msg); | op->_request.insert_front(msg); |
(static_cast<AsyncMessage *>(msg))->op = op; | (static_cast<AsyncMessage *>(msg))->op = op; |
} | } |
return _meta_dispatcher->route_async(op); | return _meta_dispatcher->route_async(op); |
|
|
if (op == 0) | if (op == 0) |
{ | { |
op = get_op(); | op = get_op(); |
op->_request.insert_first(msg); | op->_request.insert_front(msg); |
if (mask & message_mask::ha_async) | if (mask & message_mask::ha_async) |
{ | { |
(static_cast<AsyncMessage *>(msg))->op = op; | (static_cast<AsyncMessage *>(msg))->op = op; |
|
|
if (request->op == 0) | if (request->op == 0) |
{ | { |
request->op = get_op(); | request->op = get_op(); |
request->op->_request.insert_first(request); | request->op->_request.insert_front(request); |
destroy_op = true; | destroy_op = true; |
} | } |
| |
|
|
request->op->_client_sem.wait(); | request->op->_client_sem.wait(); |
| |
request->op->lock(); | request->op->lock(); |
AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first()); | AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_front()); |
rpl->op = 0; | rpl->op = 0; |
request->op->unlock(); | request->op->unlock(); |
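
The hunk above is the synchronous-send path: the caller attaches the request to an op node, blocks on the op's _client_sem, and then removes the reply from _response (remove_front() in the branch, remove_first() in 1.119). A simplified rendezvous of the same shape can be written with std::promise/std::future; the Pegasus op node, semaphore, and response list are collapsed into a single future here, so this is an analogy rather than the actual SendWait implementation:

    // Analogy only: SendWait-style request/response rendezvous via std::future.
    #include <future>
    #include <string>
    #include <thread>
    #include <iostream>

    struct Request { std::string text; std::promise<std::string> reply; };

    // Plays the role of the service thread that completes the op node.
    void serviceThread(Request& req)
    {
        req.reply.set_value("reply to: " + req.text);  // _client_sem.signal() equivalent
    }

    int main()
    {
        Request req;
        req.text = "CIM_GetInstance";
        std::future<std::string> pending = req.reply.get_future();

        std::thread worker(serviceThread, std::ref(req));

        // SendWait blocks here until the response has been attached to the op.
        std::string reply = pending.get();
        std::cout << reply << '\n';

        worker.join();
        return 0;
    }
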
| |
|
|
| |
Uint32 MessageQueueService::get_next_xid() | Uint32 MessageQueueService::get_next_xid() |
{ | { |
static Mutex _monitor; | AutoMutex autoMut(_xidMutex); |
Uint32 value; | return ++_xid; |
AutoMutex autoMut(_monitor); | |
_xid++; | |
value = _xid.get(); | |
return value; | |
} | } |
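
Read together with the declarations at the top of the diff, the 1.119 side post-increments an AtomicInt under a function-local static Mutex and then reads the value back, while the branch guards a plain counter with the class-level _xidMutex and returns the pre-incremented value in a single statement. One plausible motivation is that constructing a function-local static Mutex on first use is not guaranteed to be thread-safe with pre-C++11 compilers, and the branch form also keeps increment and read in one step. A self-contained illustration of the branch's pattern, with std::mutex and std::uint32_t standing in for the Pegasus Mutex and Uint32 types (names are illustrative):

    // Minimal sketch of the branch's pattern: a class-level mutex guarding a
    // plain counter, returning the pre-incremented value.
    #include <mutex>
    #include <cstdint>
    #include <iostream>

    class XidSource
    {
    public:
        static std::uint32_t getNextXid()
        {
            std::lock_guard<std::mutex> guard(_xidMutex);  // AutoMutex autoMut(_xidMutex);
            return ++_xid;                                 // return ++_xid;
        }

    private:
        static std::uint32_t _xid;
        static std::mutex _xidMutex;
    };

    std::uint32_t XidSource::_xid = 1;
    std::mutex XidSource::_xidMutex;

    int main()
    {
        std::cout << XidSource::getNextXid() << '\n';  // prints 2
        std::cout << XidSource::getNextXid() << '\n';  // prints 3
        return 0;
    }

With C++11 available, a std::atomic counter using fetch_add would give the same guarantee without a lock; the mutex form shown here simply mirrors what the branch does with the facilities it had.
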
| |
PEGASUS_NAMESPACE_END | PEGASUS_NAMESPACE_END |