version 1.28.2.1, 2002/03/04 11:57:39
|
version 1.29, 2002/03/06 21:25:06
|
|
|
Base::enqueue(msg); | Base::enqueue(msg); |
| |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
|
|
// PEGASUS_ASSERT(msg != 0 ); |
|
|
|
// cout << "inside overriden enqueue" << endl; |
|
// if (!msg) |
|
// { |
|
// Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3, |
|
// "MessageQueue::enqueue failure"); |
|
// throw NullPointer(); |
|
// } |
|
|
|
// if (getenv("PEGASUS_TRACE")) |
|
// { |
|
// cout << "===== " << getQueueName() << ": "; |
|
// msg->print(cout); |
|
// } |
|
|
|
// msg->dest = _queueId; |
|
|
|
// SendForget(msg); |
|
|
|
} | } |
| |
| |
|
|
// ATTN: optimization | // ATTN: optimization |
// << Tue Feb 19 14:10:38 2002 mdd >> | // << Tue Feb 19 14:10:38 2002 mdd >> |
operation->lock(); | operation->lock(); |
if ((operation->_state & ASYNC_OPFLAGS_CALLBACK) && |
|
(operation->_state & ASYNC_OPSTATE_COMPLETE)) |
|
{ |
|
operation->unlock(); |
|
_handle_async_callback(operation); |
|
} |
|
| |
Message *rq = operation->_request.next(0); | Message *rq = operation->_request.next(0); |
PEGASUS_ASSERT(rq != 0 ); |
|
| |
// divert "plain" legacy messages to handleEnqueue |
// divert legacy messages to handleEnqueue |
if ( ! (rq->getMask() & message_mask::ha_async) ) |
if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async))) |
{ | { |
rq = operation->_request.remove_first() ; | rq = operation->_request.remove_first() ; |
operation->unlock(); | operation->unlock(); |
// delete the op node | // delete the op node |
delete operation; | delete operation; |
| |
|
|
// Attn: change to handleEnqueue(msg) when we have that method in all messagequeueservices |
|
// make handleEnqueue pure virtual !!! |
|
// << Fri Feb 22 13:39:09 2002 mdd >> |
|
|
|
handleEnqueue(rq); | handleEnqueue(rq); |
return; | return; |
} | } |
| |
|
if ( operation->_state & ASYNC_OPFLAGS_CALLBACK && |
|
(operation->_state & ASYNC_OPSTATE_COMPLETE)) |
|
{ |
|
operation->unlock(); |
|
_handle_async_callback(operation); |
|
} |
|
else |
|
{ |
|
PEGASUS_ASSERT(rq != 0 ); |
|
// ATTN: optimization |
|
// << Wed Mar 6 15:00:39 2002 mdd >> |
|
// put thread and queue into the asyncopnode structure. |
|
(static_cast<AsyncMessage *>(rq))->_myself = thread; |
|
(static_cast<AsyncMessage *>(rq))->_service = queue; |
operation->unlock(); | operation->unlock(); |
static_cast<AsyncMessage *>(rq)->_myself = thread; |
|
static_cast<AsyncMessage *>(rq)->_service = queue; |
|
_handle_async_request(static_cast<AsyncRequest *>(rq)); | _handle_async_request(static_cast<AsyncRequest *>(rq)); |
} | } |
|
} |
return; | return; |
|
|
} | } |
| |
void MessageQueueService::_handle_async_request(AsyncRequest *req) | void MessageQueueService::_handle_async_request(AsyncRequest *req) |
|
|
return true; | return true; |
} | } |
| |
|
|
|
// made pure virtual |
|
// << Wed Mar 6 15:11:31 2002 mdd >> |
// void MessageQueueService::handleEnqueue(Message *msg) | // void MessageQueueService::handleEnqueue(Message *msg) |
// { | // { |
|
|
|
|
// if ( msg ) | // if ( msg ) |
// delete msg; | // delete msg; |
// } | // } |
| |
|
// made pure virtual |
|
// << Wed Mar 6 15:11:56 2002 mdd >> |
// void MessageQueueService::handleEnqueue(void) | // void MessageQueueService::handleEnqueue(void) |
// { | // { |
// Message *msg = dequeue(); | // Message *msg = dequeue(); |
|
|
} | } |
| |
| |
void MessageQueueService::ReplyAsync(AsyncOpNode *op, |
Boolean MessageQueueService::ForwardOp(AsyncOpNode *op, |
Uint32 destination) | Uint32 destination) |
{ | { |
PEGASUS_ASSERT( op->_flags & ASYNC_OPFLAGS_CALLBACK ); |
PEGASUS_ASSERT(op != 0 ); |
|
|
|
|
// get the queue handle for the destination |
|
if ( 0 == (op->_op_dest = MessageQueue::lookup(destination))) | if ( 0 == (op->_op_dest = MessageQueue::lookup(destination))) |
{ |
return false; |
delete op; |
|
} |
|
|
|
op->_response.next(0)->dest = destination; |
|
| |
|
op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD); |
|
op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK); |
| |
op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET); |
return _meta_dispatcher->route_async(op); |
op->_state |= ASYNC_OPSTATE_COMPLETE; |
|
|
|
if ( false == _meta_dispatcher->route_async(op) ) |
|
delete op; |
|
return; |
|
} | } |
| |
| |
|
|
Uint32 destination, | Uint32 destination, |
void (*callback)(AsyncOpNode *, | void (*callback)(AsyncOpNode *, |
MessageQueue *, | MessageQueue *, |
void *), |
void *)) |
void * parm) |
|
{ | { |
PEGASUS_ASSERT( callback != 0 && op != 0 ); |
PEGASUS_ASSERT(op != 0 && callback != 0 ); |
|
|
op->_callback_ptr = parm; |
|
| |
// get the queue handle for the destination | // get the queue handle for the destination |
if ( 0 == (op->_op_dest = MessageQueue::lookup(destination))) | if ( 0 == (op->_op_dest = MessageQueue::lookup(destination))) |
return false; | return false; |
op->_request.next(0)->dest = destination; |
|
| |
op->_flags |= ASYNC_OPFLAGS_CALLBACK; | op->_flags |= ASYNC_OPFLAGS_CALLBACK; |
op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET); | op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET); |
|
|
return _meta_dispatcher->route_async(op); | return _meta_dispatcher->route_async(op); |
} | } |
| |
void MessageQueueService::ForwardRequest(AsyncOpNode *op, MessageQueue *dest) |
|
{ |
|
|
|
Message *msg = op->_request.next(0); |
|
if ( msg == 0 ) |
|
{ |
|
delete op; |
|
return; |
|
} |
|
msg->_async = 0; |
|
msg->dest = dest->getQueueId(); |
|
if(msg->getMask() & message_mask::ha_async) |
|
(static_cast<AsyncMessage *>(msg))->op = 0; |
|
|
|
op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET; |
|
op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SIMPLE_STATUS); |
|
op->_state &= ~ASYNC_OPSTATE_COMPLETE; |
|
|
|
op->_op_dest = dest; |
|
|
|
// now see if the meta dispatcher will take it |
|
if( false == _meta_dispatcher->route_async(op)) |
|
delete op; |
|
|
|
return; |
|
} |
|
|
|
|
|
void MessageQueueService::ForwardResponse(AsyncOpNode *op, MessageQueue *dest) |
|
{ |
|
|
|
Message *msg = op->_response.next(0); |
|
if ( msg == 0 || dest == 0 ) |
|
{ |
|
delete op; |
|
return; |
|
} |
|
msg->_async = 0; |
|
msg->dest = dest->getQueueId(); |
|
if(msg->getMask() & message_mask::ha_async) |
|
(static_cast<AsyncMessage *>(msg))->op = 0; |
|
|
|
op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET; |
|
op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SIMPLE_STATUS); |
|
op->_state &= ~ASYNC_OPSTATE_COMPLETE; |
|
|
|
op->_op_dest = dest; |
|
|
|
// now see if the meta dispatcher will take it |
|
if( false == _meta_dispatcher->route_async(op)) |
|
delete op; |
|
|
|
return; |
|
} |
|
| |
Boolean MessageQueueService::SendForget(Message *msg) | Boolean MessageQueueService::SendForget(Message *msg) |
{ | { |
|
|
op->_request.insert_first(msg); | op->_request.insert_first(msg); |
if (mask & message_mask::ha_async) | if (mask & message_mask::ha_async) |
(static_cast<AsyncMessage *>(msg))->op = op; | (static_cast<AsyncMessage *>(msg))->op = op; |
else |
|
msg->_async = 0; |
|
} | } |
op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET; | op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET; |
op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SIMPLE_STATUS); | op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SIMPLE_STATUS); |