version 1.2, 2006/08/09 21:12:42
|
version 1.5, 2007/01/11 16:21:54
|
|
|
// | // |
//============================================================================== | //============================================================================== |
// | // |
// Author: Mike Day (mdday@us.ibm.com) |
|
// |
|
// Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01 |
|
// added nsk platform support |
|
// Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com) |
|
// Amit K Arora, IBM (amita@in.ibm.com) for PEP#101 |
|
// Sean Keenan, Hewlett-Packard Company (sean.keenan@hp.com) |
|
// David Dillard, VERITAS Software Corp. |
|
// (david.dillard@veritas.com) |
|
// |
|
//%///////////////////////////////////////////////////////////////////////////// | //%///////////////////////////////////////////////////////////////////////////// |
| |
#include "ThreadPool.h" | #include "ThreadPool.h" |
|
|
// | // |
/////////////////////////////////////////////////////////////////////////////// | /////////////////////////////////////////////////////////////////////////////// |
| |
ThreadPool::ThreadPool(Sint16 initialSize, |
ThreadPool::ThreadPool( |
|
Sint16 initialSize, |
const char *key, | const char *key, |
Sint16 minThreads, | Sint16 minThreads, |
Sint16 maxThreads, | Sint16 maxThreads, |
struct timeval | struct timeval |
&deallocateWait):_maxThreads(maxThreads), |
&deallocateWait) |
_minThreads(minThreads), _currentThreads(0), _idleThreads(), |
: _maxThreads(maxThreads), |
_runningThreads(), _dying(0) |
_minThreads(minThreads), |
|
_currentThreads(0), |
|
_idleThreads(), |
|
_runningThreads(), |
|
_dying(0) |
{ | { |
_deallocateWait.tv_sec = deallocateWait.tv_sec; | _deallocateWait.tv_sec = deallocateWait.tv_sec; |
_deallocateWait.tv_usec = deallocateWait.tv_usec; | _deallocateWait.tv_usec = deallocateWait.tv_usec; |
|
|
pool->_idleThreads.remove(myself); | pool->_idleThreads.remove(myself); |
pool->_currentThreads--; | pool->_currentThreads--; |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
return ((ThreadReturnType) 1); |
return (ThreadReturnType) 1; |
} | } |
| |
while (1) | while (1) |
|
|
pool->_idleThreads.remove(myself); | pool->_idleThreads.remove(myself); |
pool->_currentThreads--; | pool->_currentThreads--; |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
return ((ThreadReturnType) 1); |
return (ThreadReturnType) 1; |
} | } |
| |
// When we awaken we reside on the _runningThreads queue, not the | // When we awaken we reside on the _runningThreads queue, not the |
|
|
catch(...) | catch(...) |
{ | { |
Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, | Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
"ThreadPool::_loop: Failure accessing work func, work parm, " |
"ThreadPool::_loop: Failure accessing work func, work " |
"or blocking sem."); |
"parm, or blocking sem."); |
PEGASUS_ASSERT(false); | PEGASUS_ASSERT(false); |
pool->_idleThreads.remove(myself); | pool->_idleThreads.remove(myself); |
pool->_currentThreads--; | pool->_currentThreads--; |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
return ((ThreadReturnType) 1); |
return (ThreadReturnType) 1; |
} | } |
| |
if (work == 0) | if (work == 0) |
{ | { |
Tracer::trace(TRC_THREAD, Tracer::LEVEL4, | Tracer::trace(TRC_THREAD, Tracer::LEVEL4, |
"ThreadPool::_loop: work func is 0, meaning we should exit."); |
"ThreadPool::_loop: work func is 0, meaning we should " |
|
"exit."); |
break; | break; |
} | } |
| |
|
|
catch(Exception & e) | catch(Exception & e) |
{ | { |
PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, | PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
String |
String("Exception from work in ThreadPool::_loop: ") + |
("Exception from work in ThreadPool::_loop: ") |
e.getMessage()); |
+ e.getMessage()); |
|
} | } |
#if !defined(PEGASUS_OS_LSB) | #if !defined(PEGASUS_OS_LSB) |
catch(const exception & e) | catch(const exception & e) |
{ | { |
PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, | PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
String |
String("Exception from work in ThreadPool::_loop: ") + |
("Exception from work in ThreadPool::_loop: ") |
e.what()); |
+ e.what()); |
|
} | } |
#endif | #endif |
catch(...) | catch(...) |
|
|
PEGASUS_ASSERT(false); | PEGASUS_ASSERT(false); |
pool->_currentThreads--; | pool->_currentThreads--; |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
return ((ThreadReturnType) 1); |
return (ThreadReturnType) 1; |
} | } |
} | } |
} | } |
catch(const Exception & e) | catch(const Exception & e) |
{ | { |
PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, | PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
"Caught exception: \"" + e.getMessage() + |
"Caught exception: \"" + e.getMessage() + "\". Exiting _loop."); |
"\". Exiting _loop."); |
|
} | } |
catch(...) | catch(...) |
{ | { |
|
|
} | } |
| |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
return ((ThreadReturnType) 0); |
return (ThreadReturnType) 0; |
} | } |
| |
ThreadStatus ThreadPool::allocate_and_awaken(void *parm, |
ThreadStatus ThreadPool::allocate_and_awaken( |
ThreadReturnType |
void* parm, |
(PEGASUS_THREAD_CDECL * |
ThreadReturnType (PEGASUS_THREAD_CDECL* work) (void*), |
work) (void *), |
|
Semaphore * blocking) | Semaphore * blocking) |
{ | { |
PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken"); | PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken"); |
|
|
| |
if (th == 0) | if (th == 0) |
{ | { |
// ATTN-DME-P3-20031103: This trace message should not be |
Tracer::trace(TRC_THREAD, Tracer::LEVEL2, |
// be labeled TRC_DISCARDED_DATA, because it does not |
|
// necessarily imply that a failure has occurred. However, |
|
// this label is being used temporarily to help isolate |
|
// the cause of client timeout problems. |
|
Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
|
"ThreadPool::allocate_and_awaken: Insufficient resources: " | "ThreadPool::allocate_and_awaken: Insufficient resources: " |
" pool = %s, running threads = %d, idle threads = %d", | " pool = %s, running threads = %d, idle threads = %d", |
_key, _runningThreads.size(), _idleThreads.size()); | _key, _runningThreads.size(), _idleThreads.size()); |
|
|
| |
Uint32 numThreadsCleanedUp = 0; | Uint32 numThreadsCleanedUp = 0; |
| |
Uint32 numIdleThreads = _idleThreads.size(); |
size_t numIdleThreads = _idleThreads.size(); |
for (Uint32 i = 0; i < numIdleThreads; i++) |
for (size_t i = 0; i < numIdleThreads; i++) |
{ | { |
// Do not dip below the minimum thread count | // Do not dip below the minimum thread count |
if (_currentThreads.get() <= (Uint32) _minThreads) | if (_currentThreads.get() <= (Uint32) _minThreads) |
|
|
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
} | } |
| |
Boolean ThreadPool::_timeIntervalExpired(struct timeval *start, |
Boolean ThreadPool::_timeIntervalExpired( |
|
struct timeval* start, |
struct timeval *interval) | struct timeval *interval) |
{ | { |
// never time out if the interval is zero | // never time out if the interval is zero |
|
|
// initial count is zero, loop function will sleep until | // initial count is zero, loop function will sleep until |
// we signal the semaphore | // we signal the semaphore |
Semaphore *sleep_sem = (Semaphore *) new Semaphore(0); | Semaphore *sleep_sem = (Semaphore *) new Semaphore(0); |
th->put_tsd("sleep sem", &_deleteSemaphore, sizeof (Semaphore), |
th->put_tsd( |
(void *) sleep_sem); |
"sleep sem", &_deleteSemaphore, sizeof(Semaphore), (void*) sleep_sem); |
| |
struct timeval *lastActivityTime = | struct timeval *lastActivityTime = |
(struct timeval *)::operator new(sizeof (struct timeval)); | (struct timeval *)::operator new(sizeof (struct timeval)); |
Time::gettimeofday(lastActivityTime); | Time::gettimeofday(lastActivityTime); |
| |
th->put_tsd("last activity time", thread_data::default_delete, |
th->put_tsd( |
sizeof (struct timeval), (void *) lastActivityTime); |
"last activity time", |
|
thread_data::default_delete, |
|
sizeof(struct timeval), |
|
(void*) lastActivityTime); |
// thread will enter _loop() and sleep on sleep_sem until we signal it | // thread will enter _loop() and sleep on sleep_sem until we signal it |
| |
if (th->run() != PEGASUS_THREAD_OK) | if (th->run() != PEGASUS_THREAD_OK) |