version 1.31, 2002/03/08 01:57:42
|
version 1.32, 2002/03/11 02:36:18
|
|
|
_mask(mask), | _mask(mask), |
_die(0), | _die(0), |
_pending(true), | _pending(true), |
_incoming(true, 1000), |
_incoming(true, 100 ), |
_incoming_queue_shutdown(0), | _incoming_queue_shutdown(0), |
_req_thread(_req_proc, this, false) | _req_thread(_req_proc, this, false) |
{ | { |
|
|
_die = 1; | _die = 1; |
if (_incoming_queue_shutdown.value() == 0 ) | if (_incoming_queue_shutdown.value() == 0 ) |
{ | { |
_incoming.shutdown_queue(); |
_shutdown_incoming_queue(); |
|
|
|
//_incoming.shutdown_queue(); |
_req_thread.join(); | _req_thread.join(); |
} | } |
| |
|
|
0); | 0); |
| |
msg->op = get_op(); | msg->op = get_op(); |
msg->op->_request.insert_first(msg); |
msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE; |
|
msg->op->_flags &= ~ASYNC_OPFLAGS_CALLBACK; |
|
|
msg->op->_op_dest = this; | msg->op->_op_dest = this; |
|
msg->op->_request.insert_first(msg); |
| |
_incoming.insert_last_wait(msg->op); | _incoming.insert_last_wait(msg->op); |
msg->op->_client_sem.wait(); | msg->op->_client_sem.wait(); |
|
|
msg->op->unlock(); | msg->op->unlock(); |
delete reply; | delete reply; |
| |
|
|
msg->op->_request.remove(msg); | msg->op->_request.remove(msg); |
msg->op->_state |= ASYNC_OPSTATE_RELEASED; | msg->op->_state |= ASYNC_OPSTATE_RELEASED; |
return_op(msg->op); | return_op(msg->op); |
|
|
void MessageQueueService::_handle_async_callback(AsyncOpNode *op) | void MessageQueueService::_handle_async_callback(AsyncOpNode *op) |
{ | { |
// note that _callback_node may be different from op | // note that _callback_node may be different from op |
// op->_callback_q is a "this" pointer we can use for static callback methods |
// op->_callback_response_q is a "this" pointer we can use for |
op->_async_callback(op->_callback_node, op->_callback_q, op->_callback_ptr); |
// static callback methods |
|
op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr); |
} | } |
| |
| |
|
|
return; | return; |
} | } |
| |
if ( operation->_state & ASYNC_OPFLAGS_CALLBACK && |
if ( operation->_flags & ASYNC_OPFLAGS_CALLBACK && |
(operation->_state & ASYNC_OPSTATE_COMPLETE)) | (operation->_state & ASYNC_OPSTATE_COMPLETE)) |
{ | { |
operation->unlock(); | operation->unlock(); |
|
|
} | } |
| |
| |
|
void MessageQueueService::_complete_op_node(AsyncOpNode *op, |
|
Uint32 state, |
|
Uint32 flag, |
|
Uint32 code) |
|
{ |
|
cimom::_complete_op_node(op, state, flag, code); |
|
} |
|
|
| |
Boolean MessageQueueService::accept_async(AsyncOpNode *op) | Boolean MessageQueueService::accept_async(AsyncOpNode *op) |
{ | { |
|
|
_incoming.insert_last_wait(op); | _incoming.insert_last_wait(op); |
return true; | return true; |
} | } |
|
// else |
|
// { |
|
// if( (rq != 0 && (true == MessageQueueService::messageOK(rq))) || |
|
// (rp != 0 && ( true == MessageQueueService::messageOK(rp) )) && |
|
// _die.value() == 0) |
|
// { |
|
// MessageQueueService::_incoming.insert_last_wait(op); |
|
// return true; |
|
// } |
|
// } |
|
|
return false; | return false; |
} | } |
| |
|
|
// get the queue handle for the destination | // get the queue handle for the destination |
| |
op->lock(); | op->lock(); |
op->_op_dest = MessageQueue::lookup(destination); |
op->_op_dest = MessageQueue::lookup(destination); // destination of this message |
op->_flags |= ASYNC_OPFLAGS_CALLBACK; | op->_flags |= ASYNC_OPFLAGS_CALLBACK; |
op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET); | op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET); |
op->_state &= ~ASYNC_OPSTATE_COMPLETE; | op->_state &= ~ASYNC_OPSTATE_COMPLETE; |
// initialize the callback data | // initialize the callback data |
op->_async_callback = callback; |
op->_async_callback = callback; // callback function to be executed by recpt. of response |
op->_callback_node = op; |
op->_callback_node = op; // the op node |
op->_callback_response_q = callback_response_q; |
op->_callback_response_q = callback_response_q; // the queue that will receive the response |
op->_callback_ptr = callback_ptr; |
op->_callback_ptr = callback_ptr; // user data for callback |
op->_callback_q = this; |
op->_callback_request_q = this; // I am the originator of this request |
| |
op->unlock(); | op->unlock(); |
if(op->_op_dest == 0) | if(op->_op_dest == 0) |