version 1.138, 2008/06/19 17:57:01
|
version 1.145, 2008/09/17 18:47:22
|
|
|
"Common.MessageQueueService.UNABLE_TO_REGISTER", | "Common.MessageQueueService.UNABLE_TO_REGISTER", |
"CIM base message queue service is unable to register with the " | "CIM base message queue service is unable to register with the " |
"CIMOM dispatcher."); | "CIMOM dispatcher."); |
throw BindFailedException(parms); |
throw Exception(parms); |
} | } |
| |
_get_polling_list()->insert_back(this); | _get_polling_list()->insert_back(this); |
|
|
_thread_pool = 0; | _thread_pool = 0; |
} | } |
} // mutex unlocks here | } // mutex unlocks here |
// Clean up in case there are extra stuff on the queue. |
|
while (_incoming.count()) |
// Clean up any extra stuff on the queue. |
{ |
AsyncOpNode* op = 0; |
try |
while ((op = _incoming.dequeue())) |
{ |
|
delete _incoming.dequeue(); |
|
} |
|
catch (const ListClosed&) |
|
{ | { |
// If the list is closed, there is nothing we can do. |
delete op; |
break; |
|
} |
|
} | } |
} | } |
| |
|
|
| |
msg->op->_op_dest = this; | msg->op->_op_dest = this; |
msg->op->_request.reset(msg); | msg->op->_request.reset(msg); |
try |
if (_incoming.enqueue(msg->op)) |
{ | { |
_incoming.enqueue_wait(msg->op); |
|
_polling_sem.signal(); | _polling_sem.signal(); |
} | } |
catch (const ListClosed&) |
else |
{ | { |
// This means the queue has already been shut-down (happens when there | // This means the queue has already been shut-down (happens when there |
// are two AsyncIoctrl::IO_CLOSE messages generated and one got first | // are two AsyncIoctrl::IO_CLOSE messages generated and one got first |
// processed. | // processed. |
delete msg; | delete msg; |
} | } |
catch (const Permission&) |
|
{ |
|
delete msg; |
|
} |
|
} | } |
| |
| |
|
|
void MessageQueueService::enqueue(Message* msg) | void MessageQueueService::enqueue(Message* msg) |
{ | { |
PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()"); | PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()"); |
|
|
// many operations may have been queued. | // many operations may have been queued. |
do | do |
{ | { |
try |
|
{ |
|
operation = service->_incoming.dequeue(); | operation = service->_incoming.dequeue(); |
} |
|
catch (ListClosed&) |
|
{ |
|
// ATTN: This appears to be a common loop exit path. |
|
//PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
|
// "Caught ListClosed exception. Exiting _req_proc."); |
|
break; |
|
} |
|
| |
if (operation) | if (operation) |
{ | { |
|
|
} | } |
catch (const Exception& e) | catch (const Exception& e) |
{ | { |
PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL1, |
PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1, |
String("Caught exception: \"") + e.getMessage() + |
"Caught exception: \"%s\". Exiting _req_proc.", |
"\". Exiting _req_proc."); |
(const char*)e.getMessage().getCString())); |
} | } |
catch (...) | catch (...) |
{ | { |
|
|
if ((rq != 0 && (true == messageOK(rq))) || | if ((rq != 0 && (true == messageOK(rq))) || |
(rp != 0 && (true == messageOK(rp))) && _die.get() == 0) | (rp != 0 && (true == messageOK(rp))) && _die.get() == 0) |
{ | { |
_incoming.enqueue_wait(op); |
if (_incoming.enqueue(op)) |
|
{ |
_polling_sem.signal(); | _polling_sem.signal(); |
return true; | return true; |
} | } |
|
} |
return false; | return false; |
} | } |
| |
|
|
// empty out the queue | // empty out the queue |
while (1) | while (1) |
{ | { |
AsyncOpNode *operation; |
AsyncOpNode* operation = 0; |
try | try |
{ | { |
operation = service->_incoming.dequeue(); | operation = service->_incoming.dequeue(); |
} | } |
catch (IPCException&) |
catch (...) |
{ | { |
break; | break; |
} | } |
|
|
{ | { |
} | } |
| |
|
|
void MessageQueueService::handle_AsyncLegacyOperationStart( |
|
AsyncLegacyOperationStart* req) |
|
{ |
|
// remove the legacy message from the request and enqueue it to its |
|
// destination |
|
Uint32 result = async_results::CIM_NAK; |
|
|
|
Message* legacy = req->_act; |
|
if (legacy != 0) |
|
{ |
|
MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination); |
|
if (queue != 0) |
|
{ |
|
if (queue->isAsync() == true) |
|
{ |
|
(static_cast<MessageQueueService *>(queue))->handleEnqueue( |
|
legacy); |
|
} |
|
else |
|
{ |
|
// Enqueue the response: |
|
queue->enqueue(req->get_action()); |
|
} |
|
|
|
result = async_results::OK; |
|
} |
|
} |
|
_make_response(req, result); |
|
} |
|
|
|
void MessageQueueService::handle_AsyncLegacyOperationResult( |
|
AsyncLegacyOperationResult* rep) |
|
{ |
|
} |
|
|
|
AsyncOpNode* MessageQueueService::get_op() | AsyncOpNode* MessageQueueService::get_op() |
{ | { |
AsyncOpNode* op = new AsyncOpNode(); | AsyncOpNode* op = new AsyncOpNode(); |
|
|
AsyncLegacyOperationStart *wrapper = new AsyncLegacyOperationStart( | AsyncLegacyOperationStart *wrapper = new AsyncLegacyOperationStart( |
op, | op, |
destination, | destination, |
msg, |
msg); |
destination); |
|
} | } |
else | else |
{ | { |