version 1.131, 2006/11/10 18:14:58
|
version 1.140, 2008/08/19 17:20:21
|
|
|
if (rtn != PEGASUS_THREAD_OK ) | if (rtn != PEGASUS_THREAD_OK ) |
{ | { |
service->_threads--; | service->_threads--; |
Logger::put( |
PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL1, |
Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE, |
|
"Not enough threads to process this request. " |
|
"Skipping."); |
|
|
|
Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2, |
|
"Could not allocate thread for %s. Queue has %d " | "Could not allocate thread for %s. Queue has %d " |
"messages waiting and %d threads servicing." | "messages waiting and %d threads servicing." |
"Skipping the service for right now. ", | "Skipping the service for right now. ", |
service->getQueueName(), | service->getQueueName(), |
service->_incoming.count(), | service->_incoming.count(), |
service->_threads.get()); |
service->_threads.get())); |
| |
Threads::yield(); | Threads::yield(); |
service = NULL; | service = NULL; |
|
|
} | } |
list->unlock(); | list->unlock(); |
} | } |
myself->exit_self( (ThreadReturnType) 1 ); |
return ThreadReturnType(0); |
return 0; |
|
} | } |
| |
| |
|
|
max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT; | max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT; |
} | } |
| |
Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2, |
PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3, |
"max_threads_per_svc_queue set to %u.", max_threads_per_svc_queue); |
"max_threads_per_svc_queue set to %u.", max_threads_per_svc_queue)); |
| |
AutoMutex autoMut(_meta_dispatcher_mutex); | AutoMutex autoMut(_meta_dispatcher_mutex); |
| |
|
|
"Common.MessageQueueService.UNABLE_TO_REGISTER", | "Common.MessageQueueService.UNABLE_TO_REGISTER", |
"CIM base message queue service is unable to register with the " | "CIM base message queue service is unable to register with the " |
"CIMOM dispatcher."); | "CIMOM dispatcher."); |
throw BindFailedException(parms); |
throw Exception(parms); |
} | } |
| |
_get_polling_list()->insert_back(this); | _get_polling_list()->insert_back(this); |
|
|
msg->op->_request.reset(msg); | msg->op->_request.reset(msg); |
try | try |
{ | { |
_incoming.enqueue_wait(msg->op); |
_incoming.enqueue(msg->op); |
_polling_sem.signal(); | _polling_sem.signal(); |
} | } |
catch (const ListClosed&) | catch (const ListClosed&) |
|
|
catch (ListClosed&) | catch (ListClosed&) |
{ | { |
// ATTN: This appears to be a common loop exit path. | // ATTN: This appears to be a common loop exit path. |
//PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
//PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
// "Caught ListClosed exception. Exiting _req_proc."); | // "Caught ListClosed exception. Exiting _req_proc."); |
break; | break; |
} | } |
|
|
} | } |
catch (const Exception& e) | catch (const Exception& e) |
{ | { |
PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL1, |
String("Caught exception: \"") + e.getMessage() + | String("Caught exception: \"") + e.getMessage() + |
"\". Exiting _req_proc."); | "\". Exiting _req_proc."); |
} | } |
catch (...) | catch (...) |
{ | { |
PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1, |
"Caught unrecognized exception. Exiting _req_proc."); | "Caught unrecognized exception. Exiting _req_proc."); |
} | } |
service->_threads--; | service->_threads--; |
|
|
Message *msg = op->removeRequest(); | Message *msg = op->removeRequest(); |
if (msg && (msg->getMask() & MessageMask::ha_async)) | if (msg && (msg->getMask() & MessageMask::ha_async)) |
{ | { |
if (msg->getType() == async_messages::ASYNC_LEGACY_OP_START) |
if (msg->getType() == ASYNC_ASYNC_LEGACY_OP_START) |
{ | { |
AsyncLegacyOperationStart *wrapper = | AsyncLegacyOperationStart *wrapper = |
static_cast<AsyncLegacyOperationStart *>(msg); | static_cast<AsyncLegacyOperationStart *>(msg); |
msg = wrapper->get_action(); | msg = wrapper->get_action(); |
delete wrapper; | delete wrapper; |
} | } |
else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START) |
else if (msg->getType() == ASYNC_ASYNC_MODULE_OP_START) |
{ | { |
AsyncModuleOperationStart *wrapper = | AsyncModuleOperationStart *wrapper = |
static_cast<AsyncModuleOperationStart *>(msg); | static_cast<AsyncModuleOperationStart *>(msg); |
msg = wrapper->get_action(); | msg = wrapper->get_action(); |
delete wrapper; | delete wrapper; |
} | } |
else if (msg->getType() == async_messages::ASYNC_OP_START) |
else if (msg->getType() == ASYNC_ASYNC_OP_START) |
{ | { |
AsyncOperationStart *wrapper = | AsyncOperationStart *wrapper = |
static_cast<AsyncOperationStart *>(msg); | static_cast<AsyncOperationStart *>(msg); |
|
|
msg = op->removeResponse(); | msg = op->removeResponse(); |
if (msg && (msg->getMask() & MessageMask::ha_async)) | if (msg && (msg->getMask() & MessageMask::ha_async)) |
{ | { |
if (msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT) |
if (msg->getType() == ASYNC_ASYNC_LEGACY_OP_RESULT) |
{ | { |
AsyncLegacyOperationResult *wrapper = | AsyncLegacyOperationResult *wrapper = |
static_cast<AsyncLegacyOperationResult *>(msg); | static_cast<AsyncLegacyOperationResult *>(msg); |
msg = wrapper->get_result(); | msg = wrapper->get_result(); |
delete wrapper; | delete wrapper; |
} | } |
else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT) |
else if (msg->getType() == ASYNC_ASYNC_MODULE_OP_RESULT) |
{ | { |
AsyncModuleOperationResult *wrapper = | AsyncModuleOperationResult *wrapper = |
static_cast<AsyncModuleOperationResult *>(msg); | static_cast<AsyncModuleOperationResult *>(msg); |
|
|
{ | { |
req->op->processing(); | req->op->processing(); |
| |
Uint32 type = req->getType(); |
MessageType type = req->getType(); |
if (type == async_messages::HEARTBEAT) |
if (type == ASYNC_HEARTBEAT) |
handle_heartbeat_request(req); | handle_heartbeat_request(req); |
else if (type == async_messages::IOCTL) |
else if (type == ASYNC_IOCTL) |
handle_AsyncIoctl(static_cast<AsyncIoctl *>(req)); | handle_AsyncIoctl(static_cast<AsyncIoctl *>(req)); |
else if (type == async_messages::CIMSERVICE_START) |
else if (type == ASYNC_CIMSERVICE_START) |
handle_CimServiceStart(static_cast<CimServiceStart *>(req)); | handle_CimServiceStart(static_cast<CimServiceStart *>(req)); |
else if (type == async_messages::CIMSERVICE_STOP) |
else if (type == ASYNC_CIMSERVICE_STOP) |
handle_CimServiceStop(static_cast<CimServiceStop *>(req)); | handle_CimServiceStop(static_cast<CimServiceStop *>(req)); |
else if (type == async_messages::CIMSERVICE_PAUSE) |
else if (type == ASYNC_CIMSERVICE_PAUSE) |
handle_CimServicePause(static_cast<CimServicePause *>(req)); | handle_CimServicePause(static_cast<CimServicePause *>(req)); |
else if (type == async_messages::CIMSERVICE_RESUME) |
else if (type == ASYNC_CIMSERVICE_RESUME) |
handle_CimServiceResume(static_cast<CimServiceResume *>(req)); | handle_CimServiceResume(static_cast<CimServiceResume *>(req)); |
else if (type == async_messages::ASYNC_OP_START) |
else if (type == ASYNC_ASYNC_OP_START) |
handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req)); | handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req)); |
else | else |
{ | { |
|
|
} | } |
} | } |
| |
if (request->_async != 0) |
AsyncRequest* asyncRequest = |
|
static_cast<AsyncRequest*>(request->get_async()); |
|
|
|
if (asyncRequest != 0) |
{ | { |
Uint32 mask = request->_async->getMask(); |
PEGASUS_ASSERT(asyncRequest->getMask() & |
PEGASUS_ASSERT(mask & |
|
(MessageMask::ha_async | MessageMask::ha_request)); | (MessageMask::ha_async | MessageMask::ha_request)); |
| |
AsyncRequest *async = static_cast<AsyncRequest *>(request->_async); |
AsyncOpNode* op = asyncRequest->op; |
AsyncOpNode *op = async->op; |
|
request->_async = 0; |
|
// the legacy request is going to be deleted by its handler | // the legacy request is going to be deleted by its handler |
// remove it from the op node | // remove it from the op node |
| |
static_cast<AsyncLegacyOperationStart *>(async)->get_action(); |
static_cast<AsyncLegacyOperationStart *>(asyncRequest)->get_action(); |
| |
AsyncLegacyOperationResult *async_result = | AsyncLegacyOperationResult *async_result = |
new AsyncLegacyOperationResult( | new AsyncLegacyOperationResult( |
op, | op, |
response); | response); |
_completeAsyncResponse( | _completeAsyncResponse( |
async, |
asyncRequest, |
async_result, | async_result, |
ASYNC_OPSTATE_COMPLETE, | ASYNC_OPSTATE_COMPLETE, |
0); | 0); |
|
|
if ((rq != 0 && (true == messageOK(rq))) || | if ((rq != 0 && (true == messageOK(rq))) || |
(rp != 0 && (true == messageOK(rp))) && _die.get() == 0) | (rp != 0 && (true == messageOK(rp))) && _die.get() == 0) |
{ | { |
_incoming.enqueue_wait(op); |
_incoming.enqueue(op); |
_polling_sem.signal(); | _polling_sem.signal(); |
return true; | return true; |
} | } |
|
|
// default action is to echo a heartbeat response | // default action is to echo a heartbeat response |
| |
AsyncReply *reply = new AsyncReply( | AsyncReply *reply = new AsyncReply( |
async_messages::HEARTBEAT, |
ASYNC_HEARTBEAT, |
0, | 0, |
req->op, | req->op, |
async_results::OK, | async_results::OK, |
|
|
{ | { |
if (reply->getMask() & MessageMask::ha_reply) | if (reply->getMask() & MessageMask::ha_reply) |
{ | { |
if (reply->getType() == async_messages::FIND_SERVICE_Q_RESULT) |
if (reply->getType() == ASYNC_FIND_SERVICE_Q_RESULT) |
{ | { |
if ((static_cast<FindServiceQueueResult*>(reply))->result == | if ((static_cast<FindServiceQueueResult*>(reply))->result == |
async_results::OK) | async_results::OK) |
|
|
{ | { |
if (reply->getMask() & MessageMask::ha_reply) | if (reply->getMask() & MessageMask::ha_reply) |
{ | { |
if (reply->getType() == |
if (reply->getType() == ASYNC_ENUMERATE_SERVICE_RESULT) |
async_messages::ENUMERATE_SERVICE_RESULT) |
|
{ | { |
if ((static_cast<EnumerateServiceResponse*>(reply))-> | if ((static_cast<EnumerateServiceResponse*>(reply))-> |
result == async_results::OK) | result == async_results::OK) |