version 1.13, 2002/02/06 15:15:35
|
version 1.17, 2002/02/12 23:28:58
|
|
|
| |
#include "MessageQueueService.h" | #include "MessageQueueService.h" |
| |
PEGASUS_USING_STD; |
|
|
|
PEGASUS_NAMESPACE_BEGIN | PEGASUS_NAMESPACE_BEGIN |
| |
|
|
|
cimom *MessageQueueService::_meta_dispatcher = 0; |
|
AtomicInt MessageQueueService::_service_count = 0; |
|
AtomicInt MessageQueueService::_xid(1); |
|
Mutex MessageQueueService::_meta_dispatcher_mutex = Mutex(); |
|
|
|
|
MessageQueueService::MessageQueueService(const char *name, | MessageQueueService::MessageQueueService(const char *name, |
Uint32 queueID, | Uint32 queueID, |
Uint32 capabilities, | Uint32 capabilities, |
Uint32 mask) | Uint32 mask) |
: Base(name, false, queueID), |
: Base(name, true, queueID), |
_capabilities(capabilities), | _capabilities(capabilities), |
_mask(mask), | _mask(mask), |
_die(0), | _die(0), |
|
|
{ | { |
_default_op_timeout.tv_sec = 30; | _default_op_timeout.tv_sec = 30; |
_default_op_timeout.tv_usec = 100; | _default_op_timeout.tv_usec = 100; |
_meta_dispatcher = static_cast<cimom *>(Base::lookup(CIMOM_Q_ID)); |
|
|
_meta_dispatcher_mutex.lock(pegasus_thread_self()); |
|
|
if(_meta_dispatcher == 0 ) | if(_meta_dispatcher == 0 ) |
|
{ |
|
PEGASUS_ASSERT( _service_count.value() == 0 ); |
|
_meta_dispatcher = new cimom(); |
|
if (_meta_dispatcher == NULL ) |
|
{ |
|
_meta_dispatcher_mutex.unlock(); |
|
|
throw NullPointer(); | throw NullPointer(); |
_req_thread.run(); |
} |
|
|
|
} |
|
_service_count++; |
|
|
|
|
| |
|
if( false == register_service(name, _capabilities, _mask) ) |
|
{ |
|
_meta_dispatcher_mutex.unlock(); |
|
throw BindFailed("MessageQueueService Base Unable to register with Meta Dispatcher"); |
|
} |
|
|
|
_meta_dispatcher_mutex.unlock(); |
|
|
|
_req_thread.run(); |
} | } |
| |
| |
|
|
{ | { |
_die = 1; | _die = 1; |
if (_incoming_queue_shutdown.value() == 0 ) | if (_incoming_queue_shutdown.value() == 0 ) |
|
{ |
_incoming.shutdown_queue(); | _incoming.shutdown_queue(); |
|
|
_req_thread.join(); | _req_thread.join(); |
|
} |
|
|
|
_meta_dispatcher_mutex.lock(pegasus_thread_self()); |
|
_service_count--; |
|
if (_service_count.value() == 0 ) |
|
{ |
|
_meta_dispatcher->_shutdown_routed_queue(); |
|
delete _meta_dispatcher; |
|
} |
|
_meta_dispatcher_mutex.unlock(); |
| |
} | } |
| |
AtomicInt MessageQueueService::_xid(1); |
|
| |
void MessageQueueService::_shutdown_incoming_queue(void) | void MessageQueueService::_shutdown_incoming_queue(void) |
{ | { |
|
|
| |
msg->op = 0; | msg->op = 0; |
delete msg; | delete msg; |
|
_req_thread.join(); |
|
|
} | } |
| |
| |
|
|
else if (type == async_messages::CIMSERVICE_STOP) | else if (type == async_messages::CIMSERVICE_STOP) |
handle_CimServiceStop(static_cast<CimServiceStop *>(req)); | handle_CimServiceStop(static_cast<CimServiceStop *>(req)); |
else if (type == async_messages::CIMSERVICE_PAUSE) | else if (type == async_messages::CIMSERVICE_PAUSE) |
{ |
|
handle_CimServicePause(static_cast<CimServicePause *>(req)); | handle_CimServicePause(static_cast<CimServicePause *>(req)); |
} |
|
|
|
else if (type == async_messages::CIMSERVICE_RESUME) | else if (type == async_messages::CIMSERVICE_RESUME) |
handle_CimServiceResume(static_cast<CimServiceResume *>(req)); | handle_CimServiceResume(static_cast<CimServiceResume *>(req)); |
else if ( type == async_messages::ASYNC_OP_START) | else if ( type == async_messages::ASYNC_OP_START) |
|
|
} | } |
} | } |
| |
|
|
|
Boolean MessageQueueService::_enqueueResponse( |
|
Message* request, |
|
Message* response) |
|
{ |
|
if(request->_async != 0 ) |
|
{ |
|
Uint32 mask = request->_async->getMask(); |
|
if ( mask & message_mask::ha_async) |
|
{ |
|
if ( mask & message_mask::ha_request) |
|
{ |
|
AsyncOpNode *op = (static_cast<AsyncRequest *>(request->_async)->op); |
|
|
|
AsyncLegacyOperationResult *async_result = |
|
new AsyncLegacyOperationResult( |
|
(static_cast<AsyncRequest *>(request->_async))->getKey(), |
|
(static_cast<AsyncRequest *>(request->_async))->getRouting(), |
|
op, |
|
response); |
|
_completeAsyncResponse(static_cast<AsyncRequest *>(request->_async), |
|
async_result, |
|
ASYNC_OPSTATE_COMPLETE, |
|
0); |
|
return true; |
|
} |
|
} |
|
} |
|
return false; |
|
} |
|
|
void MessageQueueService::_make_response(AsyncRequest *req, Uint32 code) | void MessageQueueService::_make_response(AsyncRequest *req, Uint32 code) |
{ | { |
AsyncReply *reply = | AsyncReply *reply = |
|
|
| |
void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req) | void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req) |
{ | { |
|
; |
|
} |
|
|
|
|
|
void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req) |
|
{ |
|
// remove the legacy message from the request and enqueue it to its destination |
|
Uint32 result = async_results::CIM_NAK; |
| |
|
Message *legacy = req->act; |
|
if ( legacy != 0 ) |
|
{ |
|
MessageQueue* queue = MessageQueue::lookup(req->legacy_destination); |
|
if( queue != 0 ) |
|
{ |
|
// Enqueue the response: |
|
queue->enqueue(legacy); |
|
result = async_results::OK; |
|
} |
|
} |
|
_make_response(req, result); |
|
} |
|
|
|
void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep) |
|
{ |
|
; |
} | } |
| |
AsyncOpNode *MessageQueueService::get_op(void) | AsyncOpNode *MessageQueueService::get_op(void) |
|
|
{ | { |
if(reply->getMask() & message_mask::ha_reply) | if(reply->getMask() & message_mask::ha_reply) |
{ | { |
if(reply->result == async_results::OK) |
if(reply->result == async_results::OK || |
|
reply->result == async_results::MODULE_ALREADY_REGISTERED ) |
registered = true; | registered = true; |
} | } |
} | } |