version 1.133.2.3, 2008/10/08 21:30:02
|
version 1.138, 2008/06/19 17:57:01
|
|
|
AtomicInt MessageQueueService::_service_count(0); | AtomicInt MessageQueueService::_service_count(0); |
Mutex MessageQueueService::_meta_dispatcher_mutex; | Mutex MessageQueueService::_meta_dispatcher_mutex; |
| |
static struct timeval deallocateWait = {30, 0}; |
static struct timeval deallocateWait = {300, 0}; |
|
|
// set max threads in a single thread pool. Since there is only |
|
// a single threadpool, this sets the maximum threads that can |
|
// be used minus permanent threads and provider added threads. |
|
const Uint32 maximumThreadsInPool = 20; |
|
| |
ThreadPool *MessageQueueService::_thread_pool = 0; | ThreadPool *MessageQueueService::_thread_pool = 0; |
| |
|
|
if (rtn != PEGASUS_THREAD_OK ) | if (rtn != PEGASUS_THREAD_OK ) |
{ | { |
service->_threads--; | service->_threads--; |
Logger::put( |
PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL1, |
Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE, |
|
"Not enough threads to process this request. " |
|
"Skipping."); |
|
|
|
PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2, |
|
"Could not allocate thread for %s. Queue has %d " | "Could not allocate thread for %s. Queue has %d " |
"messages waiting and %d threads servicing." | "messages waiting and %d threads servicing." |
"Skipping the service for right now. ", | "Skipping the service for right now. ", |
|
|
} | } |
list->unlock(); | list->unlock(); |
} | } |
myself->exit_self( (ThreadReturnType) 1 ); |
return ThreadReturnType(0); |
return 0; |
|
} | } |
| |
| |
|
|
max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT; | max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT; |
} | } |
| |
PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2, |
PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3, |
"max_threads_per_svc_queue set to %u.", max_threads_per_svc_queue)); | "max_threads_per_svc_queue set to %u.", max_threads_per_svc_queue)); |
| |
AutoMutex autoMut(_meta_dispatcher_mutex); | AutoMutex autoMut(_meta_dispatcher_mutex); |
|
|
// _thread_pool = new ThreadPool(initial_cnt, "MessageQueueService", | // _thread_pool = new ThreadPool(initial_cnt, "MessageQueueService", |
// minimum_cnt, maximum_cnt, deallocateWait); | // minimum_cnt, maximum_cnt, deallocateWait); |
// | // |
|
_thread_pool = |
_thread_pool = new ThreadPool(0, "MessageQueueService", 0, |
new ThreadPool(0, "MessageQueueService", 0, 0, deallocateWait); |
maximumThreadsInPool, deallocateWait); |
|
} | } |
_service_count++; | _service_count++; |
| |
|
|
} | } |
catch (const Exception& e) | catch (const Exception& e) |
{ | { |
PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL1, |
String("Caught exception: \"") + e.getMessage() + | String("Caught exception: \"") + e.getMessage() + |
"\". Exiting _req_proc."); | "\". Exiting _req_proc."); |
} | } |
catch (...) | catch (...) |
{ | { |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1, |
"Caught unrecognized exception. Exiting _req_proc."); | "Caught unrecognized exception. Exiting _req_proc."); |
} | } |
service->_threads--; | service->_threads--; |
|
|
} | } |
} | } |
| |
if (request->_async != 0) |
AsyncRequest* asyncRequest = |
|
static_cast<AsyncRequest*>(request->get_async()); |
|
|
|
if (asyncRequest != 0) |
{ | { |
Uint32 mask = request->_async->getMask(); |
PEGASUS_ASSERT(asyncRequest->getMask() & |
PEGASUS_ASSERT(mask & |
|
(MessageMask::ha_async | MessageMask::ha_request)); | (MessageMask::ha_async | MessageMask::ha_request)); |
| |
AsyncRequest *async = static_cast<AsyncRequest *>(request->_async); |
AsyncOpNode* op = asyncRequest->op; |
AsyncOpNode *op = async->op; |
|
request->_async = 0; |
|
// the legacy request is going to be deleted by its handler | // the legacy request is going to be deleted by its handler |
// remove it from the op node | // remove it from the op node |
| |
static_cast<AsyncLegacyOperationStart *>(async)->get_action(); |
static_cast<AsyncLegacyOperationStart *>(asyncRequest)->get_action(); |
| |
AsyncLegacyOperationResult *async_result = | AsyncLegacyOperationResult *async_result = |
new AsyncLegacyOperationResult( | new AsyncLegacyOperationResult( |
op, | op, |
response); | response); |
_completeAsyncResponse( | _completeAsyncResponse( |
async, |
asyncRequest, |
async_result, | async_result, |
ASYNC_OPSTATE_COMPLETE, | ASYNC_OPSTATE_COMPLETE, |
0); | 0); |
|
|
return false; | return false; |
if (_polling_thread == NULL) | if (_polling_thread == NULL) |
{ | { |
printf("===== NEW3\n"); |
|
_polling_thread = new Thread( | _polling_thread = new Thread( |
polling_routine, | polling_routine, |
reinterpret_cast<void *>(_get_polling_list()), | reinterpret_cast<void *>(_get_polling_list()), |