version 1.5, 2007/01/11 16:21:54
|
version 1.16.2.1, 2008/08/20 23:05:50
|
|
|
// Set the dying flag so all thread know the destructor has been | // Set the dying flag so all thread know the destructor has been |
// entered | // entered |
_dying++; | _dying++; |
Tracer::trace(TRC_THREAD, Tracer::LEVEL2, |
PEG_TRACE((TRC_THREAD, Tracer::LEVEL3, |
"Cleaning up %d idle threads.", _currentThreads.get()); |
"Cleaning up %d idle threads.", _currentThreads.get())); |
| |
while (_currentThreads.get() > 0) | while (_currentThreads.get() > 0) |
{ | { |
|
|
} | } |
catch (...) | catch (...) |
{ | { |
Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1, |
"ThreadPool::_loop: Failure getting sleep_sem or " | "ThreadPool::_loop: Failure getting sleep_sem or " |
"lastActivityTime."); | "lastActivityTime."); |
PEGASUS_ASSERT(false); | PEGASUS_ASSERT(false); |
|
|
} | } |
catch (...) | catch (...) |
{ | { |
Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1, |
"ThreadPool::_loop: failure on sleep_sem->wait()."); | "ThreadPool::_loop: failure on sleep_sem->wait()."); |
PEGASUS_ASSERT(false); | PEGASUS_ASSERT(false); |
pool->_idleThreads.remove(myself); | pool->_idleThreads.remove(myself); |
|
|
// _idleThreads queue. | // _idleThreads queue. |
| |
ThreadReturnType(PEGASUS_THREAD_CDECL * work) (void *) = 0; | ThreadReturnType(PEGASUS_THREAD_CDECL * work) (void *) = 0; |
void *parm = 0; |
void *workParm = 0; |
Semaphore *blocking_sem = 0; | Semaphore *blocking_sem = 0; |
| |
try | try |
|
|
work = (ThreadReturnType(PEGASUS_THREAD_CDECL *) (void *)) | work = (ThreadReturnType(PEGASUS_THREAD_CDECL *) (void *)) |
myself->reference_tsd("work func"); | myself->reference_tsd("work func"); |
myself->dereference_tsd(); | myself->dereference_tsd(); |
parm = myself->reference_tsd("work parm"); |
workParm = myself->reference_tsd("work parm"); |
myself->dereference_tsd(); | myself->dereference_tsd(); |
blocking_sem = | blocking_sem = |
(Semaphore *) myself->reference_tsd("blocking sem"); | (Semaphore *) myself->reference_tsd("blocking sem"); |
|
|
} | } |
catch (...) | catch (...) |
{ | { |
Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1, |
"ThreadPool::_loop: Failure accessing work func, work " | "ThreadPool::_loop: Failure accessing work func, work " |
"parm, or blocking sem."); | "parm, or blocking sem."); |
PEGASUS_ASSERT(false); | PEGASUS_ASSERT(false); |
|
|
| |
if (work == 0) | if (work == 0) |
{ | { |
Tracer::trace(TRC_THREAD, Tracer::LEVEL4, |
PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4, |
"ThreadPool::_loop: work func is 0, meaning we should " | "ThreadPool::_loop: work func is 0, meaning we should " |
"exit."); | "exit."); |
break; | break; |
|
|
| |
try | try |
{ | { |
PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4, |
PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4, |
"Work starting."); | "Work starting."); |
work(parm); |
work(workParm); |
PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4, |
PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4, |
"Work finished."); | "Work finished."); |
} | } |
catch (Exception& e) | catch (Exception& e) |
{ | { |
PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL1, |
String("Exception from work in ThreadPool::_loop: ") + | String("Exception from work in ThreadPool::_loop: ") + |
e.getMessage()); | e.getMessage()); |
} | } |
#if !defined(PEGASUS_OS_LSB) |
|
catch (const exception& e) | catch (const exception& e) |
{ | { |
PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL1, |
String("Exception from work in ThreadPool::_loop: ") + | String("Exception from work in ThreadPool::_loop: ") + |
e.what()); | e.what()); |
} | } |
#endif |
|
catch (...) | catch (...) |
{ | { |
PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1, |
"Unknown exception from work in ThreadPool::_loop."); | "Unknown exception from work in ThreadPool::_loop."); |
} | } |
| |
|
|
} | } |
catch (...) | catch (...) |
{ | { |
Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1, |
"ThreadPool::_loop: Adding thread to idle pool failed."); | "ThreadPool::_loop: Adding thread to idle pool failed."); |
PEGASUS_ASSERT(false); | PEGASUS_ASSERT(false); |
pool->_currentThreads--; | pool->_currentThreads--; |
|
|
} | } |
catch (const Exception & e) | catch (const Exception & e) |
{ | { |
PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL1, |
"Caught exception: \"" + e.getMessage() + "\". Exiting _loop."); | "Caught exception: \"" + e.getMessage() + "\". Exiting _loop."); |
} | } |
catch (...) | catch (...) |
{ | { |
PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1, |
"Caught unrecognized exception. Exiting _loop."); | "Caught unrecognized exception. Exiting _loop."); |
} | } |
| |
|
|
{ | { |
if (_dying.get()) | if (_dying.get()) |
{ | { |
Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL3, |
"ThreadPool::allocate_and_awaken: ThreadPool is dying(1)."); | "ThreadPool::allocate_and_awaken: ThreadPool is dying(1)."); |
return PEGASUS_THREAD_UNAVAILABLE; | return PEGASUS_THREAD_UNAVAILABLE; |
} | } |
|
|
| |
if (th == 0) | if (th == 0) |
{ | { |
Tracer::trace(TRC_THREAD, Tracer::LEVEL2, |
PEG_TRACE((TRC_THREAD, Tracer::LEVEL1, |
"ThreadPool::allocate_and_awaken: Insufficient resources: " | "ThreadPool::allocate_and_awaken: Insufficient resources: " |
" pool = %s, running threads = %d, idle threads = %d", | " pool = %s, running threads = %d, idle threads = %d", |
_key, _runningThreads.size(), _idleThreads.size()); |
_key, _runningThreads.size(), _idleThreads.size())); |
return PEGASUS_THREAD_INSUFFICIENT_RESOURCES; | return PEGASUS_THREAD_INSUFFICIENT_RESOURCES; |
} | } |
| |
// initialize the thread data with the work function and parameters | // initialize the thread data with the work function and parameters |
Tracer::trace(TRC_THREAD, Tracer::LEVEL4, |
PEG_TRACE((TRC_THREAD, Tracer::LEVEL4, |
"Initializing thread with work function and parameters: parm = %p", |
"Initializing thread(%s)" |
parm); |
" with work function and parameters: parm = %p", |
|
Threads::id(th->getThreadHandle().thid).buffer, |
|
parm)); |
| |
th->delete_tsd("work func"); | th->delete_tsd("work func"); |
th->put_tsd("work func", NULL, | th->put_tsd("work func", NULL, |
|
|
Semaphore *sleep_sem = (Semaphore *) th->reference_tsd("sleep sem"); | Semaphore *sleep_sem = (Semaphore *) th->reference_tsd("sleep sem"); |
PEGASUS_ASSERT(sleep_sem != 0); | PEGASUS_ASSERT(sleep_sem != 0); |
| |
Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken"); |
PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4, |
|
"Signal thread to awaken"); |
sleep_sem->signal(); | sleep_sem->signal(); |
th->dereference_tsd(); | th->dereference_tsd(); |
} | } |
catch (...) | catch (...) |
{ | { |
Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1, |
"ThreadPool::allocate_and_awaken: Operation Failed."); | "ThreadPool::allocate_and_awaken: Operation Failed."); |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
// ATTN: Error result has not yet been defined | // ATTN: Error result has not yet been defined |
|
|
| |
Uint32 numThreadsCleanedUp = 0; | Uint32 numThreadsCleanedUp = 0; |
| |
size_t numIdleThreads = _idleThreads.size(); |
Uint32 numIdleThreads = _idleThreads.size(); |
for (size_t i = 0; i < numIdleThreads; i++) |
for (Uint32 i = 0; i < numIdleThreads; i++) |
{ | { |
// Do not dip below the minimum thread count | // Do not dip below the minimum thread count |
if (_currentThreads.get() <= (Uint32) _minThreads) | if (_currentThreads.get() <= (Uint32) _minThreads) |
|
|
} | } |
| |
struct timeval *lastActivityTime; | struct timeval *lastActivityTime; |
try |
if (!thread->try_reference_tsd( |
{ |
"last activity time", (void**)&lastActivityTime)) |
lastActivityTime = |
|
(struct timeval *) thread-> |
|
try_reference_tsd("last activity time"); |
|
PEGASUS_ASSERT(lastActivityTime != 0); |
|
} |
|
catch (...) |
|
{ | { |
PEGASUS_ASSERT(false); | PEGASUS_ASSERT(false); |
_idleThreads.insert_back(thread); | _idleThreads.insert_back(thread); |
break; | break; |
} | } |
|
PEGASUS_ASSERT(lastActivityTime != 0); |
| |
Boolean cleanupThisThread = | Boolean cleanupThisThread = |
_timeIntervalExpired(lastActivityTime, &_deallocateWait); | _timeIntervalExpired(lastActivityTime, &_deallocateWait); |
|
|
struct timeval* start, | struct timeval* start, |
struct timeval* interval) | struct timeval* interval) |
{ | { |
|
PEGASUS_ASSERT(interval != 0); |
|
|
// never time out if the interval is zero | // never time out if the interval is zero |
if (interval && (interval->tv_sec == 0) && (interval->tv_usec == 0)) |
if ((interval->tv_sec == 0) && (interval->tv_usec == 0)) |
{ | { |
return false; | return false; |
} | } |
|
|
struct timeval now, finish, remaining; | struct timeval now, finish, remaining; |
Uint32 usec; | Uint32 usec; |
Time::gettimeofday(&now); | Time::gettimeofday(&now); |
|
|
|
#if defined(PEGASUS_OS_SOLARIS) |
|
memset(&remaining, 0, sizeof(remaining)); |
|
#else |
Time::gettimeofday(&remaining); // Avoid valgrind error | Time::gettimeofday(&remaining); // Avoid valgrind error |
|
#endif |
| |
finish.tv_sec = start->tv_sec + interval->tv_sec; | finish.tv_sec = start->tv_sec + interval->tv_sec; |
usec = start->tv_usec + interval->tv_usec; | usec = start->tv_usec + interval->tv_usec; |
|
|
| |
if (th->run() != PEGASUS_THREAD_OK) | if (th->run() != PEGASUS_THREAD_OK) |
{ | { |
Tracer::trace(TRC_THREAD, Tracer::LEVEL2, |
PEG_TRACE((TRC_THREAD, Tracer::LEVEL1, |
"Could not create thread. Error code is %d.", errno); |
"Could not create thread. Error code is %d.", errno)); |
delete th; | delete th; |
return 0; | return 0; |
} | } |
_currentThreads++; | _currentThreads++; |
Threads::yield(); |
|
| |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
return th; | return th; |
|
|
{ | { |
if (th == 0) | if (th == 0) |
{ | { |
Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1, |
"ThreadPool::_addToIdleThreadsQueue: Thread pointer is null."); | "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null."); |
throw NullPointer(); | throw NullPointer(); |
} | } |
|
|
} | } |
catch (...) | catch (...) |
{ | { |
Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1, |
"ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_front " | "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_front " |
"failed."); | "failed."); |
} | } |