version 1.129, 2006/09/13 21:05:37
|
version 1.130, 2006/10/03 18:16:03
|
|
|
// | // |
//%///////////////////////////////////////////////////////////////////////////// | //%///////////////////////////////////////////////////////////////////////////// |
| |
// #include <iostream.h> |
|
#include "MessageQueueService.h" | #include "MessageQueueService.h" |
#include <Pegasus/Common/Tracer.h> | #include <Pegasus/Common/Tracer.h> |
#include <Pegasus/Common/MessageLoader.h> | #include <Pegasus/Common/MessageLoader.h> |
#include <Pegasus/Common/StatisticalData.h> |
|
| |
PEGASUS_NAMESPACE_BEGIN | PEGASUS_NAMESPACE_BEGIN |
| |
|
|
return _thread_pool; | return _thread_pool; |
} | } |
| |
void MessageQueueService::cleanupThreadPool() |
|
{ |
|
_check_idle_flag = 1; |
|
_polling_sem.signal(); |
|
} |
|
|
|
// | // |
// MAX_THREADS_PER_SVC_QUEUE | // MAX_THREADS_PER_SVC_QUEUE |
// | // |
|
|
| |
Uint32 max_threads_per_svc_queue; | Uint32 max_threads_per_svc_queue; |
| |
ThreadReturnType PEGASUS_THREAD_CDECL |
|
MessageQueueService::kill_idle_threads(void *parm) |
|
{ |
|
|
|
static struct timeval now, last = {0,0}; |
|
Time::gettimeofday(&now); |
|
int dead_threads = 0; |
|
|
|
if (now.tv_sec - last.tv_sec > 120) |
|
{ |
|
Time::gettimeofday(&last); |
|
try |
|
{ |
|
dead_threads = MessageQueueService::_thread_pool->cleanupIdleThreads(); |
|
} |
|
catch(...) |
|
{ |
|
|
|
} |
|
} |
|
|
|
#ifdef PEGASUS_POINTER_64BIT |
|
return (ThreadReturnType)(Uint64)dead_threads; |
|
#elif PEGASUS_PLATFORM_AIX_RS_IBMCXX |
|
return (ThreadReturnType)(unsigned long)dead_threads; |
|
#else |
|
return (ThreadReturnType)(Uint32)dead_threads; |
|
#endif |
|
} |
|
|
|
ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm) | ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm) |
{ | { |
Thread *myself = reinterpret_cast<Thread *>(parm); | Thread *myself = reinterpret_cast<Thread *>(parm); |
|
|
} | } |
| |
// The polling_routine thread must hold the lock on the | // The polling_routine thread must hold the lock on the |
// _polling_thread list while processing incoming messages. |
// _polling_list while processing incoming messages. |
// This lock is used to give this thread ownership of | // This lock is used to give this thread ownership of |
// services on the _polling_routine list. | // services on the _polling_routine list. |
| |
|
|
} | } |
} | } |
list->unlock(); | list->unlock(); |
|
|
if (_check_idle_flag.get() != 0) |
|
{ |
|
_check_idle_flag = 0; |
|
// try to do idle thread clean up processing when system is not busy |
|
// if system is busy there may not be a thread available to allocate |
|
// so nothing will be done and that is OK. |
|
|
|
if ( _thread_pool->allocate_and_awaken(service, kill_idle_threads, &_polling_sem) != PEGASUS_THREAD_OK) |
|
{ |
|
Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE, |
|
"Not enough threads to kill idle threads. What an irony."); |
|
|
|
Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2, |
|
"Could not allocate thread to kill idle threads." \ |
|
"Skipping. "); |
|
} |
|
|
|
|
|
} |
|
} | } |
myself->exit_self( (ThreadReturnType) 1 ); | myself->exit_self( (ThreadReturnType) 1 ); |
return(0); | return(0); |
|
|
| |
Semaphore MessageQueueService::_polling_sem(0); | Semaphore MessageQueueService::_polling_sem(0); |
AtomicInt MessageQueueService::_stop_polling(0); | AtomicInt MessageQueueService::_stop_polling(0); |
AtomicInt MessageQueueService::_check_idle_flag(0); |
|
| |
| |
MessageQueueService::MessageQueueService( | MessageQueueService::MessageQueueService( |
|
|
_stop_polling = 0; | _stop_polling = 0; |
PEGASUS_ASSERT(_service_count.get() == 0); | PEGASUS_ASSERT(_service_count.get() == 0); |
_meta_dispatcher = new cimom(); | _meta_dispatcher = new cimom(); |
if (_meta_dispatcher == NULL) |
|
{ |
|
throw NullPointer(); |
|
} |
|
// _thread_pool = new ThreadPool(initial_cnt, "MessageQueueService", | // _thread_pool = new ThreadPool(initial_cnt, "MessageQueueService", |
// minimum_cnt, maximum_cnt, deallocateWait); | // minimum_cnt, maximum_cnt, deallocateWait); |
// | // |
|
|
| |
if (false == register_service(name, _capabilities, _mask)) | if (false == register_service(name, _capabilities, _mask)) |
{ | { |
//l10n |
|
//throw BindFailedException("MessageQueueService Base Unable to register with Meta Dispatcher"); |
|
MessageLoaderParms parms("Common.MessageQueueService.UNABLE_TO_REGISTER", | MessageLoaderParms parms("Common.MessageQueueService.UNABLE_TO_REGISTER", |
"MessageQueueService Base Unable to register with Meta Dispatcher"); |
"CIM base message queue service is unable to register with the CIMOM " |
|
"dispatcher."); |
throw BindFailedException(parms); | throw BindFailedException(parms); |
} | } |
| |
_get_polling_list()->insert_back(this); | _get_polling_list()->insert_back(this); |
|
|
} | } |
| |
| |
|
|
} | } |
| |
| |
Boolean MessageQueueService::ForwardOp( |
|
AsyncOpNode *op, |
|
Uint32 destination) |
|
{ |
|
PEGASUS_ASSERT(op != 0); |
|
op->lock(); |
|
op->_op_dest = MessageQueue::lookup(destination); |
|
op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD); |
|
op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK); |
|
op->unlock(); |
|
if (op->_op_dest == 0) |
|
return false; |
|
|
|
return _meta_dispatcher->route_async(op); |
|
} |
|
|
|
|
|
Boolean MessageQueueService::SendAsync( | Boolean MessageQueueService::SendAsync( |
AsyncOpNode *op, | AsyncOpNode *op, |
Uint32 destination, | Uint32 destination, |