version 1.38, 2002/04/11 13:15:59
|
version 1.39, 2002/04/23 17:17:22
|
|
|
| |
_mask(mask), | _mask(mask), |
_die(0), | _die(0), |
_pending(true), |
|
_incoming(true, 1000), | _incoming(true, 1000), |
|
_callback(true), |
_incoming_queue_shutdown(0), | _incoming_queue_shutdown(0), |
_req_thread(_req_proc, this, false) |
_callback_ready(0), |
|
_req_thread(_req_proc, this, false), |
|
_callback_thread(_callback_proc, this, false) |
{ | { |
_capabilities = (capabilities | module_capabilities::async); | _capabilities = (capabilities | module_capabilities::async); |
| |
|
|
} | } |
| |
_meta_dispatcher_mutex.unlock(); | _meta_dispatcher_mutex.unlock(); |
|
// _callback_thread.run(); |
| |
_req_thread.run(); | _req_thread.run(); |
} | } |
|
|
_req_thread.join(); | _req_thread.join(); |
} | } |
| |
|
_callback_ready.signal(); |
|
_callback_thread.join(); |
|
|
_meta_dispatcher_mutex.lock(pegasus_thread_self()); | _meta_dispatcher_mutex.lock(pegasus_thread_self()); |
_service_count--; | _service_count--; |
if (_service_count.value() == 0 ) | if (_service_count.value() == 0 ) |
|
|
} | } |
| |
| |
|
PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_callback_proc(void *parm) |
|
{ |
|
Thread *myself = reinterpret_cast<Thread *>(parm); |
|
MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm()); |
|
AsyncOpNode *operation = 0; |
|
|
|
while ( service->_die.value() == 0 ) |
|
{ |
|
service->_callback_ready.wait(); |
|
|
|
service->_callback.lock(); |
|
operation = service->_callback.next(0); |
|
while( operation != NULL) |
|
{ |
|
if( ASYNC_OPSTATE_COMPLETE & operation->read_state()) |
|
{ |
|
operation = service->_callback.remove_no_lock(operation); |
|
PEGASUS_ASSERT(operation != NULL); |
|
operation->_thread_ptr = myself; |
|
operation->_service_ptr = service; |
|
service->_handle_async_callback(operation); |
|
break; |
|
} |
|
operation = service->_callback.next(operation); |
|
} |
|
service->_callback.unlock(); |
|
} |
|
myself->exit_self( (PEGASUS_THREAD_RETURN) 1 ); |
|
return(0); |
|
} |
|
|
| |
PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm) | PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm) |
{ | { |
|
|
{ | { |
break; | break; |
} | } |
|
catch(Deadlock & ) |
|
{ |
|
cout << "Deadlock Exception in MessageQueueService::_req_proc" << endl; |
|
abort(); |
|
} |
|
catch(AlreadyLocked & ) |
|
{ |
|
cout << "Already Locked Exception in MessageQueueService::_req_proc" << endl; |
|
abort(); |
|
} |
|
|
|
catch(TimeOut & ) |
|
{ |
|
cout << "TimeOut Exception in MessageQueueService::_req_proc" << endl; |
|
abort(); |
|
} |
|
|
|
catch(Permission & ) |
|
{ |
|
cout << "Permission Exception in MessageQueueService::_req_proc" << endl; |
|
abort(); |
|
} |
|
|
|
catch(WaitFailed & ) |
|
{ |
|
cout << "WaitFailed Exception in MessageQueueService::_req_proc" << endl; |
|
abort(); |
|
} |
|
|
|
catch(TooManyReaders & ) |
|
{ |
|
cout << "TooManyReaders Exception in MessageQueueService::_req_proc" << endl; |
|
abort(); |
|
} |
|
|
|
catch(ListFull & ) |
|
{ |
|
cout << "ListFull Exception in MessageQueueService::_req_proc" << endl; |
|
abort(); |
|
} |
|
|
|
catch(ListClosed & ) |
|
{ |
|
cout << "ListClosed Exception in MessageQueueService::_req_proc" << endl; |
|
abort(); |
|
} |
|
|
|
catch(ModuleClosed & ) |
|
{ |
|
cout << "ModuleClosed Exception in MessageQueueService::_req_proc" << endl; |
|
abort(); |
|
} |
|
|
|
catch(IPCException & ) |
|
{ |
|
// << Tue Apr 23 10:21:17 2002 mdd >> |
|
cout << "IPC Exception in MessageQueueService::_req_proc" << endl; |
|
abort(); |
|
} |
|
|
if( operation ) | if( operation ) |
{ | { |
operation->_thread_ptr = myself; | operation->_thread_ptr = myself; |
|
|
// including op, op->_callback_node, and op->_callback_ptr | // including op, op->_callback_node, and op->_callback_ptr |
void MessageQueueService::_handle_async_callback(AsyncOpNode *op) | void MessageQueueService::_handle_async_callback(AsyncOpNode *op) |
{ | { |
|
if( op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK ) |
|
{ |
|
|
|
Message *msg = op->get_request(); |
|
if( msg && ( msg->getMask() & message_mask::ha_async)) |
|
{ |
|
if(msg->getType() == async_messages::ASYNC_LEGACY_OP_START ) |
|
{ |
|
AsyncLegacyOperationStart *wrapper = |
|
static_cast<AsyncLegacyOperationStart *>(msg); |
|
msg = wrapper->get_action(); |
|
delete wrapper; |
|
} |
|
else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START) |
|
{ |
|
AsyncModuleOperationStart *wrapper = |
|
static_cast<AsyncModuleOperationStart *>(msg); |
|
msg = wrapper->get_action(); |
|
delete wrapper; |
|
} |
|
else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START) |
|
{ |
|
AsyncModuleOperationStart *wrapper = |
|
static_cast<AsyncModuleOperationStart *>(msg); |
|
msg = wrapper->get_action(); |
|
delete wrapper; |
|
} |
|
else if (msg->getType() == async_messages::ASYNC_OP_START) |
|
{ |
|
AsyncOperationStart *wrapper = |
|
static_cast<AsyncOperationStart *>(msg); |
|
msg = wrapper->get_action(); |
|
delete wrapper; |
|
} |
|
delete msg; |
|
} |
|
|
|
msg = op->get_response(); |
|
if( msg && ( msg->getMask() & message_mask::ha_async)) |
|
{ |
|
if(msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT ) |
|
{ |
|
AsyncLegacyOperationResult *wrapper = |
|
static_cast<AsyncLegacyOperationResult *>(msg); |
|
msg = wrapper->get_result(); |
|
delete wrapper; |
|
} |
|
else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT) |
|
{ |
|
AsyncModuleOperationResult *wrapper = |
|
static_cast<AsyncModuleOperationResult *>(msg); |
|
msg = wrapper->get_result(); |
|
delete wrapper; |
|
} |
|
} |
|
void (*callback)(Message *, void *, void *) = op->__async_callback; |
|
void *handle = op->_callback_handle; |
|
void *parm = op->_callback_parameter; |
|
op->release(); |
|
return_op(op); |
|
callback(msg, handle, parm); |
|
} |
|
else if( op->_flags & ASYNC_OPFLAGS_CALLBACK ) |
|
{ |
// note that _callback_node may be different from op | // note that _callback_node may be different from op |
// op->_callback_response_q is a "this" pointer we can use for | // op->_callback_response_q is a "this" pointer we can use for |
// static callback methods | // static callback methods |
op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr); | op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr); |
} | } |
|
} |
| |
| |
void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation) | void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation) |
|
|
rq = operation->_request.remove_first() ; | rq = operation->_request.remove_first() ; |
operation->unlock(); | operation->unlock(); |
// delete the op node | // delete the op node |
delete operation; |
operation->release(); |
|
return_op( operation); |
| |
handleEnqueue(rq); | handleEnqueue(rq); |
return; | return; |
} | } |
| |
if ( operation->_flags & ASYNC_OPFLAGS_CALLBACK && |
if ( (operation->_flags & ASYNC_OPFLAGS_CALLBACK || |
|
operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) && |
(operation->_state & ASYNC_OPSTATE_COMPLETE)) | (operation->_state & ASYNC_OPSTATE_COMPLETE)) |
{ | { |
operation->unlock(); | operation->unlock(); |
|
|
} | } |
| |
| |
|
Boolean MessageQueueService::SendAsync(Message *msg, |
|
Uint32 destination, |
|
void (*callback)(Message *response, |
|
void *handle, |
|
void *parameter), |
|
void *handle, |
|
void *parameter) |
|
{ |
|
if(msg == NULL) |
|
return false; |
|
if(callback == NULL) |
|
return SendForget(msg); |
|
AsyncOpNode *op = get_op(); |
|
if( NULL == (op->_op_dest = MessageQueue::lookup(msg->dest))) |
|
{ |
|
op->release(); |
|
return_op(op); |
|
return false; |
|
} |
|
op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK; |
|
op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET); |
|
op->_state &= ~ASYNC_OPSTATE_COMPLETE; |
|
op->__async_callback = callback; |
|
op->_callback_node = op; |
|
op->_callback_handle = handle; |
|
op->_callback_parameter = parameter; |
|
op->_callback_response_q = this; |
|
|
|
|
|
if( ! (msg->getMask() & message_mask::ha_async) ) |
|
{ |
|
AsyncLegacyOperationStart *wrapper = |
|
new AsyncLegacyOperationStart(get_next_xid(), |
|
op, |
|
destination, |
|
msg, |
|
destination); |
|
msg = static_cast<Message *>(wrapper); |
|
} |
|
else |
|
{ |
|
op->_request.insert_first(msg); |
|
(static_cast<AsyncMessage *>(msg))->op = op; |
|
} |
|
|
|
op->_callback_notify = &_callback_ready; |
|
_callback.insert_last(op); |
|
|
|
return _meta_dispatcher->route_async(op); |
|
} |
|
|
|
|
Boolean MessageQueueService::SendForget(Message *msg) | Boolean MessageQueueService::SendForget(Message *msg) |
{ | { |
| |
|
|
if (mask & message_mask::ha_async) | if (mask & message_mask::ha_async) |
(static_cast<AsyncMessage *>(msg))->op = op; | (static_cast<AsyncMessage *>(msg))->op = op; |
} | } |
op->lock(); |
|
op->_op_dest = MessageQueue::lookup(msg->dest); | op->_op_dest = MessageQueue::lookup(msg->dest); |
op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET; | op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET; |
op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SIMPLE_STATUS); | op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SIMPLE_STATUS); |
op->_state &= ~ASYNC_OPSTATE_COMPLETE; | op->_state &= ~ASYNC_OPSTATE_COMPLETE; |
op->unlock(); |
|
if ( op->_op_dest == 0 ) | if ( op->_op_dest == 0 ) |
|
{ |
|
op->release(); |
|
return_op(op); |
return false; | return false; |
|
} |
| |
// now see if the meta dispatcher will take it | // now see if the meta dispatcher will take it |
return _meta_dispatcher->route_async(op); | return _meta_dispatcher->route_async(op); |
|
|
return_op(request->op); | return_op(request->op); |
request->op = 0; | request->op = 0; |
} | } |
|
|
return rpl; | return rpl; |
} | } |
| |