version 1.21, 2002/02/19 18:21:14
|
version 1.22, 2002/02/20 22:00:51
|
|
|
//%///////////////////////////////////////////////////////////////////////////// | //%///////////////////////////////////////////////////////////////////////////// |
| |
#include "MessageQueueService.h" | #include "MessageQueueService.h" |
|
#include <Pegasus/Common/Tracer.h> |
| |
PEGASUS_NAMESPACE_BEGIN | PEGASUS_NAMESPACE_BEGIN |
| |
|
|
Uint32 capabilities, | Uint32 capabilities, |
Uint32 mask) | Uint32 mask) |
: Base(name, true, queueID), | : Base(name, true, queueID), |
_capabilities(capabilities), |
|
_mask(mask), | _mask(mask), |
_die(0), | _die(0), |
_pending(true), | _pending(true), |
|
|
_incoming_queue_shutdown(0), | _incoming_queue_shutdown(0), |
_req_thread(_req_proc, this, false) | _req_thread(_req_proc, this, false) |
{ | { |
|
_capabilities = (capabilities | module_capabilities::async); |
|
|
_default_op_timeout.tv_sec = 30; | _default_op_timeout.tv_sec = 30; |
_default_op_timeout.tv_usec = 100; | _default_op_timeout.tv_usec = 100; |
| |
|
|
_service_count++; | _service_count++; |
| |
| |
|
|
if( false == register_service(name, _capabilities, _mask) ) | if( false == register_service(name, _capabilities, _mask) ) |
{ | { |
_meta_dispatcher_mutex.unlock(); | _meta_dispatcher_mutex.unlock(); |
|
|
| |
} | } |
| |
|
|
|
|
|
void MessageQueueService::enqueue(Message *msg) throw(IPCException) |
|
{ |
|
Base::enqueue(msg); |
|
|
|
// PEGASUS_ASSERT(msg != 0 ); |
|
|
|
// cout << "inside overriden enqueue" << endl; |
|
// if (!msg) |
|
// { |
|
// Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3, |
|
// "MessageQueue::enqueue failure"); |
|
// throw NullPointer(); |
|
// } |
|
|
|
// if (getenv("PEGASUS_TRACE")) |
|
// { |
|
// cout << "===== " << getQueueName() << ": "; |
|
// msg->print(cout); |
|
// } |
|
|
|
// msg->dest = _queueId; |
|
// SendForget(msg); |
|
|
|
} |
|
|
|
|
|
|
PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm) | PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm) |
{ | { |
Thread *myself = reinterpret_cast<Thread *>(parm); | Thread *myself = reinterpret_cast<Thread *>(parm); |
|
|
return(0); | return(0); |
} | } |
| |
|
void MessageQueueService::_handle_async_callback(AsyncOpNode *op) |
|
{ |
|
return_op(op); |
|
} |
|
|
| |
void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation, | void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation, |
Thread *thread, | Thread *thread, |
|
|
{ | { |
if ( operation != 0 ) | if ( operation != 0 ) |
{ | { |
|
|
|
// ATTN: optimization |
|
// << Tue Feb 19 14:10:38 2002 mdd >> |
operation->lock(); | operation->lock(); |
|
if ((operation->_state & ASYNC_OPFLAGS_CALLBACK) && |
|
(operation->_state & ASYNC_OPSTATE_COMPLETE)) |
|
{ |
|
operation->unlock(); |
|
_handle_async_callback(operation); |
|
} |
|
|
Message *rq = operation->_request.next(0); | Message *rq = operation->_request.next(0); |
PEGASUS_ASSERT(rq != 0 ); | PEGASUS_ASSERT(rq != 0 ); |
| |
|
|
| |
} | } |
| |
|
|
|
|
|
|
void MessageQueueService::_handle_async_request(AsyncRequest *req) | void MessageQueueService::_handle_async_request(AsyncRequest *req) |
{ | { |
if ( req != 0 ) | if ( req != 0 ) |
|
|
Message *rp = op->_response.next(0); | Message *rp = op->_response.next(0); |
op->unlock(); | op->unlock(); |
| |
if( ((true == messageOK(rq)) || ( true == messageOK(rp) )) && _die.value() == 0 ) |
if( (rq != 0 && (true == messageOK(rq))) || (rp != 0 && ( true == messageOK(rp) )) && |
|
_die.value() == 0 ) |
{ | { |
_incoming.insert_last_wait(op); | _incoming.insert_last_wait(op); |
return true; | return true; |
|
|
| |
void MessageQueueService::handleEnqueue(Message *msg) | void MessageQueueService::handleEnqueue(Message *msg) |
{ | { |
|
|
if ( msg ) | if ( msg ) |
delete msg; | delete msg; |
| |
|
|
| |
void MessageQueueService::handleEnqueue(void) | void MessageQueueService::handleEnqueue(void) |
{ | { |
|
|
Message *msg = dequeue(); | Message *msg = dequeue(); |
handleEnqueue(msg); | handleEnqueue(msg); |
} | } |
|
|
if ( 0 == (op->_op_dest = MessageQueue::lookup(destination))) | if ( 0 == (op->_op_dest = MessageQueue::lookup(destination))) |
return false; | return false; |
| |
op->_flags &= ASYNC_OPFLAGS_CALLBACK; |
op->_flags |= ASYNC_OPFLAGS_CALLBACK; |
op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPSTATE_COMPLETE); |
op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET); |
|
op->_state &= ~ASYNC_OPSTATE_COMPLETE; |
| |
| |
return _meta_dispatcher->route_async(op); | return _meta_dispatcher->route_async(op); |
|
|
{ | { |
| |
AsyncOpNode *op = 0; | AsyncOpNode *op = 0; |
|
Uint32 mask = msg->getMask(); |
| |
if (msg->getMask() & message_mask::ha_async) |
if (mask & message_mask::ha_async) |
{ | { |
op = (static_cast<AsyncMessage *>(msg))->op ; | op = (static_cast<AsyncMessage *>(msg))->op ; |
} | } |
|
|
if( op == 0 ) | if( op == 0 ) |
{ | { |
op = get_op(); | op = get_op(); |
op->_request.insert_first(msg); | op->_request.insert_first(msg); |
|
if (mask & message_mask::ha_async) |
|
(static_cast<AsyncMessage *>(msg))->op = op; |
} | } |
| |
op->_flags &= ASYNC_OPFLAGS_FIRE_AND_FORGET; |
op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET; |
op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SIMPLE_STATUS | ASYNC_OPSTATE_COMPLETE); |
op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SIMPLE_STATUS); |
op->put_response(0); |
op->_state &= ~ASYNC_OPSTATE_COMPLETE; |
| |
// get the queue handle for the destination | // get the queue handle for the destination |
if ( 0 == (op->_op_dest = MessageQueue::lookup(msg->dest))) | if ( 0 == (op->_op_dest = MessageQueue::lookup(msg->dest))) |
|
|
} | } |
| |
request->block = true; | request->block = true; |
request->op->_state &= ~(ASYNC_OPSTATE_COMPLETE | ASYNC_OPFLAGS_CALLBACK); |
request->op->_state &= ~ASYNC_OPSTATE_COMPLETE; |
request->op->put_response(0); |
request->op->_flags &= ~ASYNC_OPFLAGS_CALLBACK; |
| |
// get the queue handle for the destination | // get the queue handle for the destination |
if ( 0 == (request->op->_op_dest = MessageQueue::lookup(request->dest))) | if ( 0 == (request->op->_op_dest = MessageQueue::lookup(request->dest))) |