version 1.107, 2005/05/27 02:34:02
|
version 1.112, 2005/06/14 17:10:37
|
|
|
// | // |
//%///////////////////////////////////////////////////////////////////////////// | //%///////////////////////////////////////////////////////////////////////////// |
| |
|
// #include <iostream.h> |
#include "MessageQueueService.h" | #include "MessageQueueService.h" |
#include <Pegasus/Common/Tracer.h> | #include <Pegasus/Common/Tracer.h> |
#include <Pegasus/Common/MessageLoader.h> //l10n | #include <Pegasus/Common/MessageLoader.h> //l10n |
|
|
{ | { |
return _thread_pool; | return _thread_pool; |
} | } |
|
// |
|
// MAX_THREADS_PER_SVC_QUEUE_LIMIT |
|
// |
|
// 5000 is seriously too high a number for the limit but since |
|
// previously there was no limit at all this is intended to approximate |
|
// that behavior. In my testing on a unit processor system the system |
|
// behaved best with a low number 2 to 5 for the MAX_THREADS_PER_SVC_QUEUE. |
|
// When set to 1000 the system deadlocked with indications that were |
|
// not delivered and apparently left sitting within the server in a queue. |
|
// |
|
// JR Wunderlich Jun 6, 2005 |
|
// |
|
|
|
#define MAX_THREADS_PER_SVC_QUEUE_LIMIT 5000 |
|
|
|
Uint32 max_threads_per_svc_queue; |
| |
PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL | PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL |
MessageQueueService::kill_idle_threads(void *parm) | MessageQueueService::kill_idle_threads(void *parm) |
|
|
} | } |
| |
list->lock(); | list->lock(); |
|
int list_index = 0; |
MessageQueueService *service = list->next(0); | MessageQueueService *service = list->next(0); |
while(service != NULL) | while(service != NULL) |
{ | { |
if (service->_incoming.count() > 0 && service->_die.value() == 0) |
int rtn; |
|
rtn = true; |
|
if (service->_incoming.count() > 0 |
|
&& service->_die.value() == 0 |
|
&& service->_threads <= max_threads_per_svc_queue) |
|
rtn = _thread_pool->allocate_and_awaken(service, _req_proc, |
|
&_polling_sem); |
|
|
|
// if no more threads available, break from processing loop |
|
if (rtn == false) |
{ | { |
_thread_pool->allocate_and_awaken(service, _req_proc); |
service = NULL; |
} | } |
|
else |
|
{ |
service = list->next(service); | service = list->next(service); |
} | } |
|
} |
list->unlock(); | list->unlock(); |
|
|
if (_check_idle_flag.value() != 0) | if (_check_idle_flag.value() != 0) |
{ | { |
_check_idle_flag = 0; | _check_idle_flag = 0; |
|
// try to do idle thread clean up processing when system is not busy |
|
// if system is busy there may not be a thread available to allocate |
|
// so nothing will be done and that is OK. |
|
|
|
_thread_pool->allocate_and_awaken(service, kill_idle_threads, &_polling_sem); |
| |
// If there are insufficent resources to run |
|
// kill_idle_threads, then just return. |
|
_thread_pool->allocate_and_awaken(service, kill_idle_threads); |
|
} | } |
} | } |
myself->exit_self( (PEGASUS_THREAD_RETURN) 1 ); | myself->exit_self( (PEGASUS_THREAD_RETURN) 1 ); |
|
|
_incoming(true, 0), | _incoming(true, 0), |
_incoming_queue_shutdown(0) | _incoming_queue_shutdown(0) |
{ | { |
|
|
_capabilities = (capabilities | module_capabilities::async); | _capabilities = (capabilities | module_capabilities::async); |
| |
_default_op_timeout.tv_sec = 30; | _default_op_timeout.tv_sec = 30; |
_default_op_timeout.tv_usec = 100; | _default_op_timeout.tv_usec = 100; |
| |
|
max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE; |
|
|
|
// if requested threads gt MAX_THREADS_PER_SVC_QUEUE_LIMIT |
|
// then set to MAX_THREADS_PER_SVC_QUEUE_LIMIT |
|
|
|
if (max_threads_per_svc_queue > MAX_THREADS_PER_SVC_QUEUE_LIMIT) |
|
{ |
|
max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT; |
|
} |
|
|
|
// if requested threads eq 0 (unlimited) |
|
// then set to MAX_THREADS_PER_SVC_QUEUE_LIMIT |
|
|
|
if (max_threads_per_svc_queue == 0) |
|
{ |
|
max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT; |
|
} |
|
|
|
// cout << "MAX_THREADS_PER_SVC_QUEUE = " << MAX_THREADS_PER_SVC_QUEUE << endl; |
|
// cout << "max_threads_per_svc_queue set to = " << max_threads_per_svc_queue << endl; |
|
|
|
|
AutoMutex autoMut(_meta_dispatcher_mutex); | AutoMutex autoMut(_meta_dispatcher_mutex); |
| |
if (_meta_dispatcher == 0) | if (_meta_dispatcher == 0) |
|
|
{ | { |
throw NullPointer(); | throw NullPointer(); |
} | } |
|
// _thread_pool = new ThreadPool(initial_cnt, "MessageQueueService", |
|
// minimum_cnt, maximum_cnt, deallocateWait); |
|
// |
_thread_pool = | _thread_pool = |
new ThreadPool(0, "MessageQueueService", 0, 0, deallocateWait); | new ThreadPool(0, "MessageQueueService", 0, 0, deallocateWait); |
} | } |
|
|
// Clean up in case there are extra stuff on the queue. | // Clean up in case there are extra stuff on the queue. |
while (_incoming.count()) | while (_incoming.count()) |
{ | { |
|
try { |
delete _incoming.remove_first(); | delete _incoming.remove_first(); |
|
} catch (const ListClosed &e) |
|
{ |
|
// If the list is closed, there is nothing we can do. |
|
break; |
|
} |
} | } |
} | } |
| |
|
|
| |
msg->op->_op_dest = this; | msg->op->_op_dest = this; |
msg->op->_request.insert_first(msg); | msg->op->_request.insert_first(msg); |
|
try { |
_incoming.insert_last_wait(msg->op); | _incoming.insert_last_wait(msg->op); |
_polling_sem.signal(); | _polling_sem.signal(); |
|
} catch (const ListClosed &) |
|
{ |
|
// This means the queue has already been shut-down (happens when there |
|
// are two AsyncIoctrl::IO_CLOSE messages generated and one got first |
|
// processed. |
|
delete msg; |
|
} |
|
catch (const Permission &) |
|
{ |
|
delete msg; |
|
} |
} | } |
| |
| |