version 1.32, 2002/03/11 02:36:18
|
version 1.34, 2002/03/12 22:59:35
|
|
|
_mask(mask), | _mask(mask), |
_die(0), | _die(0), |
_pending(true), | _pending(true), |
_incoming(true, 100 ), |
_incoming(true, 1000), |
_incoming_queue_shutdown(0), | _incoming_queue_shutdown(0), |
_req_thread(_req_proc, this, false) | _req_thread(_req_proc, this, false) |
{ | { |
|
|
if (_incoming_queue_shutdown.value() == 0 ) | if (_incoming_queue_shutdown.value() == 0 ) |
{ | { |
_shutdown_incoming_queue(); | _shutdown_incoming_queue(); |
|
|
//_incoming.shutdown_queue(); |
|
_req_thread.join(); | _req_thread.join(); |
} | } |
| |
|
|
} | } |
if( operation ) | if( operation ) |
{ | { |
|
operation->_thread_ptr = myself; |
service->_handle_incoming_operation(operation, myself, service); |
operation->_service_ptr = service; |
|
service->_handle_incoming_operation(operation); |
} | } |
} | } |
| |
|
|
return(0); | return(0); |
} | } |
| |
|
void MessageQueueService::_sendwait_callback(AsyncOpNode *op, |
|
MessageQueue *q, |
|
void *parm) |
|
{ |
|
op->_client_sem.signal(); |
|
} |
|
|
| |
// callback function is responsible for cleaning up all resources | // callback function is responsible for cleaning up all resources |
// including op, op->_callback_node, and op->_callback_ptr | // including op, op->_callback_node, and op->_callback_ptr |
|
|
} | } |
| |
| |
void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation, |
void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation) |
Thread *thread, |
// Thread *thread, |
MessageQueue *queue) |
// MessageQueue *queue) |
{ | { |
if ( operation != 0 ) | if ( operation != 0 ) |
{ | { |
|
|
// ATTN: optimization | // ATTN: optimization |
// << Wed Mar 6 15:00:39 2002 mdd >> | // << Wed Mar 6 15:00:39 2002 mdd >> |
// put thread and queue into the asyncopnode structure. | // put thread and queue into the asyncopnode structure. |
(static_cast<AsyncMessage *>(rq))->_myself = thread; |
// (static_cast<AsyncMessage *>(rq))->_myself = operation->_thread_ptr; |
(static_cast<AsyncMessage *>(rq))->_service = queue; |
// (static_cast<AsyncMessage *>(rq))->_service = operation->_service_ptr; |
|
// done << Tue Mar 12 14:49:07 2002 mdd >> |
operation->unlock(); | operation->unlock(); |
_handle_async_request(static_cast<AsyncRequest *>(rq)); | _handle_async_request(static_cast<AsyncRequest *>(rq)); |
} | } |
|
|
case AsyncIoctl::IO_CLOSE: | case AsyncIoctl::IO_CLOSE: |
{ | { |
// save my bearings | // save my bearings |
Thread *myself = req->_myself; |
Thread *myself = req->op->_thread_ptr; |
MessageQueueService *service = static_cast<MessageQueueService *>(req->_service); |
MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr); |
| |
// respond to this message. | // respond to this message. |
_make_response(req, async_results::OK); | _make_response(req, async_results::OK); |
|
|
} | } |
if( operation ) | if( operation ) |
{ | { |
service->_handle_incoming_operation(operation, myself, service); |
operation->_thread_ptr = myself; |
|
operation->_service_ptr = service; |
|
service->_handle_incoming_operation(operation); |
} | } |
else | else |
break; | break; |
|
|
destroy_op = true; | destroy_op = true; |
} | } |
| |
request->block = true; |
request->block = false; |
request->op->lock(); |
|
request->op->_state &= ~ASYNC_OPSTATE_COMPLETE; |
|
request->op->_flags &= ~ASYNC_OPFLAGS_CALLBACK; |
|
| |
request->op->_op_dest = MessageQueue::lookup(request->dest); |
SendAsync(request->op, |
request->op->unlock(); |
request->dest, |
|
_sendwait_callback, |
|
this, |
|
(void *)0); |
| |
if ( request->op->_op_dest == 0 ) |
|
return 0; |
|
|
|
// now see if the meta dispatcher will take it |
|
if (true == _meta_dispatcher->route_async(request->op)) |
|
{ |
|
request->op->_client_sem.wait(); | request->op->_client_sem.wait(); |
PEGASUS_ASSERT(request->op->_state & ASYNC_OPSTATE_COMPLETE); |
|
} |
|
|
|
request->op->lock(); | request->op->lock(); |
AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first()); | AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first()); |
rpl->op = 0; | rpl->op = 0; |