version 1.122, 2006/07/23 18:02:33
|
version 1.123, 2006/07/26 20:34:00
|
|
|
msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE; | msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE; |
| |
msg->op->_op_dest = this; | msg->op->_op_dest = this; |
msg->op->_request.insert_front(msg); |
msg->op->_request.reset(msg); |
try { | try { |
_incoming.enqueue_wait(msg->op); | _incoming.enqueue_wait(msg->op); |
_polling_sem.signal(); | _polling_sem.signal(); |
|
|
if (op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) | if (op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) |
{ | { |
| |
Message *msg = op->get_request(); |
Message *msg = op->removeRequest(); |
if (msg && (msg->getMask() & message_mask::ha_async)) | if (msg && (msg->getMask() & message_mask::ha_async)) |
{ | { |
if (msg->getType() == async_messages::ASYNC_LEGACY_OP_START) | if (msg->getType() == async_messages::ASYNC_LEGACY_OP_START) |
|
|
delete msg; | delete msg; |
} | } |
| |
msg = op->get_response(); |
msg = op->removeResponse(); |
if (msg && (msg->getMask() & message_mask::ha_async)) | if (msg && (msg->getMask() & message_mask::ha_async)) |
{ | { |
if (msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT) | if (msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT) |
|
|
// << Tue Feb 19 14:10:38 2002 mdd >> | // << Tue Feb 19 14:10:38 2002 mdd >> |
operation->lock(); | operation->lock(); |
| |
Message *rq = operation->_request.front(); |
Message *rq = operation->_request.get(); |
| |
// optimization <<< Thu Mar 7 21:04:05 2002 mdd >>> | // optimization <<< Thu Mar 7 21:04:05 2002 mdd >>> |
// move this to the bottom of the loop when the majority of | // move this to the bottom of the loop when the majority of |
|
|
// divert legacy messages to handleEnqueue | // divert legacy messages to handleEnqueue |
if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async))) | if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async))) |
{ | { |
rq = operation->_request.remove_front() ; |
operation->_request.release(); |
operation->unlock(); | operation->unlock(); |
// delete the op node | // delete the op node |
operation->release(); | operation->release(); |
|
|
// ATTN optimization remove the message checking altogether in the base | // ATTN optimization remove the message checking altogether in the base |
// << Mon Feb 18 14:02:20 2002 mdd >> | // << Mon Feb 18 14:02:20 2002 mdd >> |
op->lock(); | op->lock(); |
Message *rq = op->_request.front(); |
Message *rq = op->_request.get(); |
Message *rp = op->_response.front(); |
Message *rp = op->_response.get(); |
op->unlock(); | op->unlock(); |
| |
if ((rq != 0 && (true == messageOK(rq))) || | if ((rq != 0 && (true == messageOK(rq))) || |
|
|
| |
void MessageQueueService::return_op(AsyncOpNode *op) | void MessageQueueService::return_op(AsyncOpNode *op) |
{ | { |
PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED); |
PEGASUS_ASSERT(op->_state & ASYNC_OPSTATE_RELEASED); |
delete op; | delete op; |
} | } |
| |
|
|
} | } |
else | else |
{ | { |
op->_request.insert_front(msg); |
op->_request.reset(msg); |
(static_cast<AsyncMessage *>(msg))->op = op; | (static_cast<AsyncMessage *>(msg))->op = op; |
} | } |
return _meta_dispatcher->route_async(op); | return _meta_dispatcher->route_async(op); |
|
|
if (op == 0) | if (op == 0) |
{ | { |
op = get_op(); | op = get_op(); |
op->_request.insert_front(msg); |
op->_request.reset(msg); |
if (mask & message_mask::ha_async) | if (mask & message_mask::ha_async) |
{ | { |
(static_cast<AsyncMessage *>(msg))->op = op; | (static_cast<AsyncMessage *>(msg))->op = op; |
|
|
if (request->op == 0) | if (request->op == 0) |
{ | { |
request->op = get_op(); | request->op = get_op(); |
request->op->_request.insert_front(request); |
request->op->_request.reset(request); |
destroy_op = true; | destroy_op = true; |
} | } |
| |
|
|
| |
request->op->_client_sem.wait(); | request->op->_client_sem.wait(); |
| |
request->op->lock(); |
AsyncReply* rpl = static_cast<AsyncReply *>(request->op->removeResponse()); |
AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_front()); |
|
rpl->op = 0; | rpl->op = 0; |
request->op->unlock(); |
|
| |
if (destroy_op == true) | if (destroy_op == true) |
{ | { |
request->op->lock(); | request->op->lock(); |
request->op->_request.remove(request); |
request->op->_request.release(); |
request->op->_state |= ASYNC_OPSTATE_RELEASED; | request->op->_state |= ASYNC_OPSTATE_RELEASED; |
request->op->unlock(); | request->op->unlock(); |
return_op(request->op); | return_op(request->op); |