version 1.13, 2007/06/05 08:13:24
|
version 1.22, 2008/12/01 17:49:57
|
|
|
//%2006//////////////////////////////////////////////////////////////////////// |
//%LICENSE//////////////////////////////////////////////////////////////// |
// | // |
// Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development |
// Licensed to The Open Group (TOG) under one or more contributor license |
// Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems. |
// agreements. Refer to the OpenPegasusNOTICE.txt file distributed with |
// Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.; |
// this work for additional information regarding copyright ownership. |
// IBM Corp.; EMC Corporation, The Open Group. |
// Each contributor licenses this file to you under the OpenPegasus Open |
// Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.; |
// Source License; you may not use this file except in compliance with the |
// IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group. |
// License. |
// Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.; |
|
// EMC Corporation; VERITAS Software Corporation; The Open Group. |
|
// Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.; |
|
// EMC Corporation; Symantec Corporation; The Open Group. |
|
// | // |
// Permission is hereby granted, free of charge, to any person obtaining a copy |
// Permission is hereby granted, free of charge, to any person obtaining a |
// of this software and associated documentation files (the "Software"), to |
// copy of this software and associated documentation files (the "Software"), |
// deal in the Software without restriction, including without limitation the |
// to deal in the Software without restriction, including without limitation |
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or |
// the rights to use, copy, modify, merge, publish, distribute, sublicense, |
// sell copies of the Software, and to permit persons to whom the Software is |
// and/or sell copies of the Software, and to permit persons to whom the |
// furnished to do so, subject to the following conditions: |
// Software is furnished to do so, subject to the following conditions: |
// | // |
// THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN |
// The above copyright notice and this permission notice shall be included |
// ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED |
// in all copies or substantial portions of the Software. |
// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT |
|
// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR |
|
// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT |
|
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN |
|
// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION |
|
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
|
// | // |
//============================================================================== |
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS |
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF |
|
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. |
|
// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY |
|
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, |
|
// TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE |
|
// SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
|
// |
|
////////////////////////////////////////////////////////////////////////// |
// | // |
//%///////////////////////////////////////////////////////////////////////////// | //%///////////////////////////////////////////////////////////////////////////// |
| |
|
|
// Set the dying flag so all thread know the destructor has been | // Set the dying flag so all thread know the destructor has been |
// entered | // entered |
_dying++; | _dying++; |
PEG_TRACE((TRC_THREAD, Tracer::LEVEL2, |
PEG_TRACE((TRC_THREAD, Tracer::LEVEL3, |
"Cleaning up %d idle threads.", _currentThreads.get())); | "Cleaning up %d idle threads.", _currentThreads.get())); |
| |
while (_currentThreads.get() > 0) | while (_currentThreads.get() > 0) |
|
|
| |
try | try |
{ | { |
sleep_sem = (Semaphore *) myself->reference_tsd("sleep sem"); |
sleep_sem = (Semaphore *) myself->reference_tsd(TSD_SLEEP_SEM); |
myself->dereference_tsd(); | myself->dereference_tsd(); |
PEGASUS_ASSERT(sleep_sem != 0); | PEGASUS_ASSERT(sleep_sem != 0); |
| |
lastActivityTime = | lastActivityTime = |
(struct timeval *) myself-> | (struct timeval *) myself-> |
reference_tsd("last activity time"); |
reference_tsd(TSD_LAST_ACTIVITY_TIME); |
myself->dereference_tsd(); | myself->dereference_tsd(); |
PEGASUS_ASSERT(lastActivityTime != 0); | PEGASUS_ASSERT(lastActivityTime != 0); |
} | } |
catch (...) | catch (...) |
{ | { |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1, |
"ThreadPool::_loop: Failure getting sleep_sem or " | "ThreadPool::_loop: Failure getting sleep_sem or " |
"lastActivityTime."); | "lastActivityTime."); |
PEGASUS_ASSERT(false); | PEGASUS_ASSERT(false); |
|
|
} | } |
catch (...) | catch (...) |
{ | { |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1, |
"ThreadPool::_loop: failure on sleep_sem->wait()."); | "ThreadPool::_loop: failure on sleep_sem->wait()."); |
PEGASUS_ASSERT(false); | PEGASUS_ASSERT(false); |
pool->_idleThreads.remove(myself); | pool->_idleThreads.remove(myself); |
|
|
// _idleThreads queue. | // _idleThreads queue. |
| |
ThreadReturnType(PEGASUS_THREAD_CDECL * work) (void *) = 0; | ThreadReturnType(PEGASUS_THREAD_CDECL * work) (void *) = 0; |
void *parm = 0; |
void *workParm = 0; |
Semaphore *blocking_sem = 0; | Semaphore *blocking_sem = 0; |
| |
try | try |
{ | { |
work = (ThreadReturnType(PEGASUS_THREAD_CDECL *) (void *)) | work = (ThreadReturnType(PEGASUS_THREAD_CDECL *) (void *)) |
myself->reference_tsd("work func"); |
myself->reference_tsd(TSD_WORK_FUNC); |
myself->dereference_tsd(); | myself->dereference_tsd(); |
parm = myself->reference_tsd("work parm"); |
workParm = myself->reference_tsd(TSD_WORK_PARM); |
myself->dereference_tsd(); | myself->dereference_tsd(); |
blocking_sem = | blocking_sem = |
(Semaphore *) myself->reference_tsd("blocking sem"); |
(Semaphore *) myself->reference_tsd(TSD_BLOCKING_SEM); |
myself->dereference_tsd(); | myself->dereference_tsd(); |
} | } |
catch (...) | catch (...) |
{ | { |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1, |
"ThreadPool::_loop: Failure accessing work func, work " | "ThreadPool::_loop: Failure accessing work func, work " |
"parm, or blocking sem."); | "parm, or blocking sem."); |
PEGASUS_ASSERT(false); | PEGASUS_ASSERT(false); |
|
|
{ | { |
PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4, | PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4, |
"Work starting."); | "Work starting."); |
work(parm); |
work(workParm); |
PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4, | PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4, |
"Work finished."); | "Work finished."); |
} | } |
catch (Exception& e) | catch (Exception& e) |
{ | { |
PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1, |
String("Exception from work in ThreadPool::_loop: ") + |
"Exception from work in ThreadPool::_loop: %s", |
e.getMessage()); |
(const char*)e.getMessage().getCString())); |
} | } |
catch (const exception& e) | catch (const exception& e) |
{ | { |
PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1, |
String("Exception from work in ThreadPool::_loop: ") + |
"Exception from work in ThreadPool::_loop: %s",e.what())); |
e.what()); |
|
} | } |
catch (...) | catch (...) |
{ | { |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1, |
"Unknown exception from work in ThreadPool::_loop."); | "Unknown exception from work in ThreadPool::_loop."); |
} | } |
| |
|
|
} | } |
catch (...) | catch (...) |
{ | { |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1, |
"ThreadPool::_loop: Adding thread to idle pool failed."); | "ThreadPool::_loop: Adding thread to idle pool failed."); |
PEGASUS_ASSERT(false); | PEGASUS_ASSERT(false); |
pool->_currentThreads--; | pool->_currentThreads--; |
|
|
} | } |
catch (const Exception & e) | catch (const Exception & e) |
{ | { |
PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1, |
"Caught exception: \"" + e.getMessage() + "\". Exiting _loop."); |
"Caught exception: \"%s\". Exiting _loop.", |
|
(const char*)e.getMessage().getCString())); |
} | } |
catch (...) | catch (...) |
{ | { |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1, |
"Caught unrecognized exception. Exiting _loop."); | "Caught unrecognized exception. Exiting _loop."); |
} | } |
| |
|
|
{ | { |
if (_dying.get()) | if (_dying.get()) |
{ | { |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL3, |
"ThreadPool::allocate_and_awaken: ThreadPool is dying(1)."); | "ThreadPool::allocate_and_awaken: ThreadPool is dying(1)."); |
return PEGASUS_THREAD_UNAVAILABLE; | return PEGASUS_THREAD_UNAVAILABLE; |
} | } |
|
|
| |
if (th == 0) | if (th == 0) |
{ | { |
PEG_TRACE((TRC_THREAD, Tracer::LEVEL2, |
PEG_TRACE((TRC_THREAD, Tracer::LEVEL1, |
"ThreadPool::allocate_and_awaken: Insufficient resources: " | "ThreadPool::allocate_and_awaken: Insufficient resources: " |
" pool = %s, running threads = %d, idle threads = %d", | " pool = %s, running threads = %d, idle threads = %d", |
_key, _runningThreads.size(), _idleThreads.size())); | _key, _runningThreads.size(), _idleThreads.size())); |
|
|
| |
// initialize the thread data with the work function and parameters | // initialize the thread data with the work function and parameters |
PEG_TRACE((TRC_THREAD, Tracer::LEVEL4, | PEG_TRACE((TRC_THREAD, Tracer::LEVEL4, |
"Initializing thread with work function and parameters: parm = %p", |
"Initializing thread(%s)" |
|
" with work function and parameters: parm = %p", |
|
Threads::id(th->getThreadHandle().thid).buffer, |
parm)); | parm)); |
| |
th->delete_tsd("work func"); |
th->delete_tsd(TSD_WORK_FUNC); |
th->put_tsd("work func", NULL, |
th->put_tsd(TSD_WORK_FUNC, NULL, |
sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *) | sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *) |
(void *)), (void *) work); | (void *)), (void *) work); |
th->delete_tsd("work parm"); |
th->delete_tsd(TSD_WORK_PARM); |
th->put_tsd("work parm", NULL, sizeof (void *), parm); |
th->put_tsd(TSD_WORK_PARM, NULL, sizeof (void *), parm); |
th->delete_tsd("blocking sem"); |
th->delete_tsd(TSD_BLOCKING_SEM); |
if (blocking != 0) | if (blocking != 0) |
th->put_tsd("blocking sem", NULL, sizeof (Semaphore *), blocking); |
th->put_tsd(TSD_BLOCKING_SEM, NULL, sizeof (Semaphore *), blocking); |
| |
// put the thread on the running list | // put the thread on the running list |
_runningThreads.insert_front(th); | _runningThreads.insert_front(th); |
| |
// signal the thread's sleep semaphore to awaken it | // signal the thread's sleep semaphore to awaken it |
Semaphore *sleep_sem = (Semaphore *) th->reference_tsd("sleep sem"); |
Semaphore *sleep_sem = (Semaphore *) th->reference_tsd(TSD_SLEEP_SEM); |
PEGASUS_ASSERT(sleep_sem != 0); | PEGASUS_ASSERT(sleep_sem != 0); |
| |
PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4, | PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4, |
|
|
} | } |
catch (...) | catch (...) |
{ | { |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1, |
"ThreadPool::allocate_and_awaken: Operation Failed."); | "ThreadPool::allocate_and_awaken: Operation Failed."); |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
// ATTN: Error result has not yet been defined | // ATTN: Error result has not yet been defined |
|
|
break; | break; |
} | } |
| |
struct timeval *lastActivityTime; |
void* tsd = thread->reference_tsd(TSD_LAST_ACTIVITY_TIME); |
try |
struct timeval *lastActivityTime = |
{ |
reinterpret_cast<struct timeval*>(tsd); |
lastActivityTime = |
|
(struct timeval *) thread-> |
|
try_reference_tsd("last activity time"); |
|
PEGASUS_ASSERT(lastActivityTime != 0); | PEGASUS_ASSERT(lastActivityTime != 0); |
} |
|
catch (...) |
|
{ |
|
PEGASUS_ASSERT(false); |
|
_idleThreads.insert_back(thread); |
|
break; |
|
} |
|
| |
Boolean cleanupThisThread = | Boolean cleanupThisThread = |
_timeIntervalExpired(lastActivityTime, &_deallocateWait); | _timeIntervalExpired(lastActivityTime, &_deallocateWait); |
|
|
{ | { |
PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread"); | PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread"); |
| |
// Set the "work func" and "work parm" to 0 so _loop() knows to exit. |
// Set the TSD_WORK_FUNC and TSD_WORK_PARM to 0 so _loop() knows to exit. |
thread->delete_tsd("work func"); |
thread->delete_tsd(TSD_WORK_FUNC); |
thread->put_tsd("work func", 0, |
thread->put_tsd(TSD_WORK_FUNC, 0, |
sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *) | sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *) |
(void *)), (void *) 0); | (void *)), (void *) 0); |
thread->delete_tsd("work parm"); |
thread->delete_tsd(TSD_WORK_PARM); |
thread->put_tsd("work parm", 0, sizeof (void *), 0); |
thread->put_tsd(TSD_WORK_PARM, 0, sizeof (void *), 0); |
| |
// signal the thread's sleep semaphore to awaken it | // signal the thread's sleep semaphore to awaken it |
Semaphore *sleep_sem = (Semaphore *) thread->reference_tsd("sleep sem"); |
Semaphore *sleep_sem = (Semaphore *) thread->reference_tsd(TSD_SLEEP_SEM); |
PEGASUS_ASSERT(sleep_sem != 0); | PEGASUS_ASSERT(sleep_sem != 0); |
sleep_sem->signal(); | sleep_sem->signal(); |
thread->dereference_tsd(); | thread->dereference_tsd(); |
|
|
struct timeval now, finish, remaining; | struct timeval now, finish, remaining; |
Uint32 usec; | Uint32 usec; |
Time::gettimeofday(&now); | Time::gettimeofday(&now); |
Time::gettimeofday(&remaining); // Avoid valgrind error |
|
|
memset(&remaining, 0, sizeof(remaining)); |
| |
finish.tv_sec = start->tv_sec + interval->tv_sec; | finish.tv_sec = start->tv_sec + interval->tv_sec; |
usec = start->tv_usec + interval->tv_usec; | usec = start->tv_usec + interval->tv_usec; |
|
|
// we signal the semaphore | // we signal the semaphore |
Semaphore *sleep_sem = (Semaphore *) new Semaphore(0); | Semaphore *sleep_sem = (Semaphore *) new Semaphore(0); |
th->put_tsd( | th->put_tsd( |
"sleep sem", &_deleteSemaphore, sizeof(Semaphore), (void*) sleep_sem); |
TSD_SLEEP_SEM, &_deleteSemaphore, sizeof(Semaphore), (void*) sleep_sem); |
| |
struct timeval* lastActivityTime = | struct timeval* lastActivityTime = |
(struct timeval *)::operator new(sizeof (struct timeval)); | (struct timeval *)::operator new(sizeof (struct timeval)); |
Time::gettimeofday(lastActivityTime); | Time::gettimeofday(lastActivityTime); |
| |
th->put_tsd( | th->put_tsd( |
"last activity time", |
TSD_LAST_ACTIVITY_TIME, |
thread_data::default_delete, | thread_data::default_delete, |
sizeof(struct timeval), | sizeof(struct timeval), |
(void*) lastActivityTime); | (void*) lastActivityTime); |
|
|
| |
if (th->run() != PEGASUS_THREAD_OK) | if (th->run() != PEGASUS_THREAD_OK) |
{ | { |
PEG_TRACE((TRC_THREAD, Tracer::LEVEL2, |
PEG_TRACE((TRC_THREAD, Tracer::LEVEL1, |
"Could not create thread. Error code is %d.", errno)); | "Could not create thread. Error code is %d.", errno)); |
delete th; | delete th; |
return 0; | return 0; |
|
|
{ | { |
if (th == 0) | if (th == 0) |
{ | { |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1, |
"ThreadPool::_addToIdleThreadsQueue: Thread pointer is null."); | "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null."); |
throw NullPointer(); | throw NullPointer(); |
} | } |
|
|
} | } |
catch (...) | catch (...) |
{ | { |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1, |
"ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_front " | "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_front " |
"failed."); | "failed."); |
} | } |