version 1.56, 2003/10/22 14:26:04
|
version 1.57, 2003/10/23 05:59:53
|
|
|
} | } |
// l10n end | // l10n end |
| |
|
#if 0 |
|
|
// two special synchronization classes for ThreadPool | // two special synchronization classes for ThreadPool |
// | // |
| |
|
|
} | } |
Mutex* _mut; | Mutex* _mut; |
}; | }; |
|
#endif |
| |
class try_mutex | class try_mutex |
{ | { |
|
|
| |
ThreadPool::~ThreadPool(void) | ThreadPool::~ThreadPool(void) |
{ | { |
|
PEG_METHOD_ENTER(TRC_THREAD, "Thread::~ThreadPool"); |
try | try |
{ | { |
// set the dying flag so all thread know the destructor has been entered |
// Set the dying flag so all thread know the destructor has been entered |
{ | { |
auto_mutex(&(this->_monitor)); |
// ThreadPool::~ThreadPool will wait to set the _dying flag until |
|
// it can acquire the lock. Once this lock is acquired, the thread |
|
// acquiring the lock must be able to assume that the value of |
|
// _dying will not change until the lock is released. |
|
|
|
// Once the _dying flag is set, functions (e.g., kill_dead_threads, |
|
// and allocate_and_awaken) that manipulate the ThreadPool |
|
// queues (_pool, _dead, and _running), should not be allowed |
|
// to run. |
|
|
|
Tracer::trace(TRC_THREAD, Tracer::LEVEL4, |
|
"Attempting to aquire _monitor lock"); |
|
auto_mutex dying_lock(&(this->_monitor)); |
|
Tracer::trace(TRC_THREAD, Tracer::LEVEL4, |
|
"Acquired _monitor lock"); |
_dying++; | _dying++; |
} | } |
// remove from the global pools list | // remove from the global pools list |
|
|
while(th != 0) | while(th != 0) |
{ | { |
sleep_sem = (Semaphore *)th->reference_tsd("sleep sem"); | sleep_sem = (Semaphore *)th->reference_tsd("sleep sem"); |
|
PEGASUS_ASSERT(sleep_sem != 0); |
|
|
if(sleep_sem == 0) | if(sleep_sem == 0) |
{ | { |
th->dereference_tsd(); | th->dereference_tsd(); |
throw NullPointer(); |
|
} | } |
|
else |
|
{ |
// Signal to get the thread out of the work loop. | // Signal to get the thread out of the work loop. |
sleep_sem->signal(); | sleep_sem->signal(); |
| |
|
|
th->cancel(); | th->cancel(); |
th->join(); | th->join(); |
delete th; | delete th; |
|
} |
th = _pool.remove_first(); | th = _pool.remove_first(); |
} | } |
|
|
th = _dead.remove_first(); | th = _dead.remove_first(); |
while(th != 0) | while(th != 0) |
{ | { |
sleep_sem = (Semaphore *)th->reference_tsd("sleep sem"); | sleep_sem = (Semaphore *)th->reference_tsd("sleep sem"); |
|
PEGASUS_ASSERT(sleep_sem != 0); |
| |
if(sleep_sem == 0) | if(sleep_sem == 0) |
{ | { |
th->dereference_tsd(); | th->dereference_tsd(); |
throw NullPointer(); |
|
} | } |
|
else |
|
{ |
|
//ATTN-DME-P3-20030322: _dead queue processing in |
|
//ThreadPool::~ThreadPool is inconsistent with the |
|
//processing in kill_dead_threads. Is this correct? |
| |
// signal the thread's sleep semaphore | // signal the thread's sleep semaphore |
sleep_sem->signal(); | sleep_sem->signal(); |
|
|
th->cancel(); | th->cancel(); |
th->join(); | th->join(); |
delete th; | delete th; |
|
} |
th = _dead.remove_first(); | th = _dead.remove_first(); |
} | } |
| |
|
|
// signal the thread's sleep semaphore | // signal the thread's sleep semaphore |
| |
sleep_sem = (Semaphore *)th->reference_tsd("sleep sem"); | sleep_sem = (Semaphore *)th->reference_tsd("sleep sem"); |
|
PEGASUS_ASSERT(sleep_sem != 0); |
|
|
if(sleep_sem == 0 ) | if(sleep_sem == 0 ) |
{ | { |
th->dereference_tsd(); | th->dereference_tsd(); |
throw NullPointer(); |
|
} | } |
|
else |
|
{ |
sleep_sem->signal(); | sleep_sem->signal(); |
sleep_sem->signal(); | sleep_sem->signal(); |
th->dereference_tsd(); | th->dereference_tsd(); |
|
|
| |
th->join(); | th->join(); |
delete th; | delete th; |
|
} |
th = _running.remove_first(); | th = _running.remove_first(); |
} | } |
} | } |
|
|
} | } |
| |
catch(...) | catch(...) |
|
|
Thread *myself = (Thread *)parm; | Thread *myself = (Thread *)parm; |
if(myself == 0) | if(myself == 0) |
{ | { |
|
Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
|
"ThreadPool::_loop: Thread pointer is null"); |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
throw NullPointer(); | throw NullPointer(); |
} | } |
|
|
ThreadPool *pool = (ThreadPool *)myself->get_parm(); | ThreadPool *pool = (ThreadPool *)myself->get_parm(); |
if(pool == 0 ) | if(pool == 0 ) |
{ | { |
|
Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
|
"ThreadPool::_loop: ThreadPool pointer is null"); |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
throw NullPointer(); | throw NullPointer(); |
} | } |
if(pool->_dying.value()) | if(pool->_dying.value()) |
{ | { |
|
Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
|
"ThreadPool::_loop: ThreadPool is dying(1)"); |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
return((PEGASUS_THREAD_RETURN)0); | return((PEGASUS_THREAD_RETURN)0); |
} | } |
|
|
| |
catch(...) | catch(...) |
{ | { |
|
Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
|
"ThreadPool::_loop: Failure getting sleep_sem or deadlock_timer"); |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
return((PEGASUS_THREAD_RETURN)0); | return((PEGASUS_THREAD_RETURN)0); |
} | } |
| |
if(sleep_sem == 0 || deadlock_timer == 0) | if(sleep_sem == 0 || deadlock_timer == 0) |
{ | { |
|
Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
|
"ThreadPool::_loop: sleep_sem or deadlock_timer are null."); |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
return((PEGASUS_THREAD_RETURN)0); | return((PEGASUS_THREAD_RETURN)0); |
} | } |
|
|
{ | { |
try | try |
{ | { |
if(pool->_dying.value()) |
|
break; |
|
} |
|
catch(...) |
|
{ |
|
return((PEGASUS_THREAD_RETURN)0); |
|
} |
|
|
|
try |
|
{ |
|
sleep_sem->wait(); | sleep_sem->wait(); |
} | } |
catch(IPCException& ) | catch(IPCException& ) |
{ | { |
|
Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
|
"ThreadPool::_loop: failure on sleep_sem->wait()."); |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
return((PEGASUS_THREAD_RETURN)0); | return((PEGASUS_THREAD_RETURN)0); |
} | } |
| |
// when we awaken we reside on the running queue, not the pool queue | // when we awaken we reside on the running queue, not the pool queue |
if(pool->_dying.value()) |
|
{ |
|
PEG_METHOD_EXIT(); |
|
return((PEGASUS_THREAD_RETURN)0); |
|
} |
|
|
|
| |
PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0; | PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0; |
void *parm = 0; | void *parm = 0; |
|
|
} | } |
catch(IPCException &) | catch(IPCException &) |
{ | { |
|
Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
|
"ThreadPool::_loop: Failure accessing work func, work parm, or blocking sem."); |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
return((PEGASUS_THREAD_RETURN)0); | return((PEGASUS_THREAD_RETURN)0); |
} | } |
| |
if(_work == 0) | if(_work == 0) |
{ | { |
|
Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
|
"ThreadPool::_loop: work func is null."); |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
throw NullPointer(); |
return((PEGASUS_THREAD_RETURN)0); |
} | } |
| |
if(_work == | if(_work == |
(PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) &_undertaker) | (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) &_undertaker) |
{ | { |
|
PEG_METHOD_EXIT(); |
_work(parm); | _work(parm); |
} | } |
| |
gettimeofday(deadlock_timer, NULL); | gettimeofday(deadlock_timer, NULL); |
try |
|
{ |
if (pool->_dying.value() == 0) |
{ | { |
timed_mutex(&(pool->_monitor), 1000); |
try |
if(pool->_dying.value()) |
|
{ | { |
_undertaker(parm); |
|
} |
|
} |
|
_work(parm); | _work(parm); |
} | } |
catch(...) | catch(...) |
{ | { |
|
Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
|
"ThreadPool::_loop: execution of _work failed."); |
|
PEG_METHOD_EXIT(); |
return((PEGASUS_THREAD_RETURN)0); | return((PEGASUS_THREAD_RETURN)0); |
} | } |
|
} |
| |
// put myself back onto the available list | // put myself back onto the available list |
try | try |
{ | { |
timed_mutex(&(pool->_monitor), 1000); |
|
if(pool->_dying.value() == 0) | if(pool->_dying.value() == 0) |
{ | { |
gettimeofday(deadlock_timer, NULL); | gettimeofday(deadlock_timer, NULL); |
|
|
// If we are not on _running then ~ThreadPool has removed | // If we are not on _running then ~ThreadPool has removed |
// us and now "owns" our pointer. | // us and now "owns" our pointer. |
if( pool->_running.remove((void *)myself) != 0 ) | if( pool->_running.remove((void *)myself) != 0 ) |
|
{ |
pool->_pool.insert_first(myself); | pool->_pool.insert_first(myself); |
|
} |
else | else |
{ | { |
|
PEG_METHOD_EXIT(); |
return((PEGASUS_THREAD_RETURN)0); | return((PEGASUS_THREAD_RETURN)0); |
} | } |
|
|
} | } |
else | else |
{ | { |
|
|
} | } |
catch(...) | catch(...) |
{ | { |
|
Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
|
"ThreadPool::_loop: Adding thread to idle pool failed."); |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
return((PEGASUS_THREAD_RETURN)0); | return((PEGASUS_THREAD_RETURN)0); |
} | } |
|
|
throw(IPCException) | throw(IPCException) |
{ | { |
PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken"); | PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken"); |
struct timeval start; |
|
gettimeofday(&start, NULL); |
// Allocate_and_awaken will not run if the _dying flag is set. |
Thread *th = 0; |
// Once the lock is acquired, ~ThreadPool will not change |
|
// the value of _dying until the lock is released. |
| |
try | try |
{ | { |
timed_mutex(&(this->_monitor), 1000); |
auto_mutex dying_lock(&(this->_monitor)); |
if(_dying.value()) | if(_dying.value()) |
{ | { |
|
Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
|
"ThreadPool::allocate_and_awaken: ThreadPool is dying(1)."); |
return; | return; |
} | } |
th = _pool.remove_first(); |
struct timeval now; |
} |
struct timeval start; |
catch(...) |
gettimeofday(&start, NULL); |
{ |
Thread *th = 0; |
return; |
|
|
|
} |
|
| |
|
th = _pool.remove_first(); |
| |
// wait for the right interval and try again | // wait for the right interval and try again |
while (th == 0 && _dying.value() < 1) |
while (th == 0) |
{ | { |
// will throw an IPCException& | // will throw an IPCException& |
_check_deadlock(&start) ; | _check_deadlock(&start) ; |
|
|
continue; | continue; |
} | } |
pegasus_yield(); | pegasus_yield(); |
try |
|
{ |
|
timed_mutex(&(this->_monitor), 1000); |
|
if(_dying.value()) |
|
{ |
|
return; |
|
} |
|
th = _pool.remove_first(); | th = _pool.remove_first(); |
} | } |
catch(...) |
|
{ |
|
return ; |
|
} |
|
} |
|
| |
if(_dying.value() < 1) |
|
{ |
|
// initialize the thread data with the work function and parameters | // initialize the thread data with the work function and parameters |
Tracer::trace(TRC_THREAD, Tracer::LEVEL4, | Tracer::trace(TRC_THREAD, Tracer::LEVEL4, |
"Initializing thread with work function and parameters: parm = %p", | "Initializing thread with work function and parameters: parm = %p", |
|
|
th->delete_tsd("blocking sem"); | th->delete_tsd("blocking sem"); |
if(blocking != 0 ) | if(blocking != 0 ) |
th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking); | th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking); |
try |
|
{ |
|
timed_mutex(&(this->_monitor), 1000); |
|
if(_dying.value()) |
|
{ |
|
th->cancel(); |
|
th->join(); |
|
delete th; |
|
return; |
|
} |
|
| |
// put the thread on the running list | // put the thread on the running list |
|
|
|
|
_running.insert_first(th); | _running.insert_first(th); |
|
|
// signal the thread's sleep semaphore to awaken it | // signal the thread's sleep semaphore to awaken it |
Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem"); | Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem"); |
| |
if(sleep_sem == 0) | if(sleep_sem == 0) |
{ | { |
th->dereference_tsd(); | th->dereference_tsd(); |
|
Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
|
"ThreadPool::allocate_and_awaken: thread data is corrupted."); |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
throw NullPointer(); | throw NullPointer(); |
} | } |
|
|
} | } |
catch(...) | catch(...) |
{ | { |
|
Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
|
"ThreadPool::allocate_and_awaken: Operation Failed."); |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
return; | return; |
} | } |
|
|
} |
|
else |
|
{ |
|
th->cancel(); |
|
th->join(); |
|
delete th; |
|
} |
|
|
|
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
} | } |
| |
|
|
Uint32 ThreadPool::kill_dead_threads(void) | Uint32 ThreadPool::kill_dead_threads(void) |
throw(IPCException) | throw(IPCException) |
{ | { |
struct timeval now; |
// Since the kill_dead_threads, ThreadPool or allocate_and_awaken |
gettimeofday(&now, NULL); |
// manipulate the threads on the ThreadPool queues, they should never |
Uint32 bodies = 0; |
// be allowed to run at the same time. |
|
|
|
// kill_dead_threads will "hold" the _monitor lock until it has |
|
// completed executing. ~ThreadPool will not set the dying flag until |
|
// it can get access to the lock. |
| |
// first go thread the dead q and clean it up as much as possible |
|
try | try |
{ | { |
timed_mutex(&(this->_monitor), 1000); |
try_mutex dying_lock(&(this->_monitor)); |
if(_dying.value() ) | if(_dying.value() ) |
{ | { |
return 0; | return 0; |
} | } |
| |
while(_dead.count() > 0 && _dying.value() == 0 ) |
struct timeval now; |
|
gettimeofday(&now, NULL); |
|
Uint32 bodies = 0; |
|
|
|
// first go thread the dead q and clean it up as much as possible |
|
try |
|
{ |
|
while(_dead.count() > 0) |
{ | { |
Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "ThreadPool:: removing and joining dead thread"); | Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "ThreadPool:: removing and joining dead thread"); |
Thread *dead = _dead.remove_first(); | Thread *dead = _dead.remove_first(); |
|
|
for( ; i < 2; i++) | for( ; i < 2; i++) |
#endif | #endif |
{ | { |
try |
|
{ |
|
try_mutex(&(this->_monitor)); |
|
} |
|
catch(IPCException&) |
|
{ |
|
return bodies; |
|
} |
|
|
|
q = map[i]; | q = map[i]; |
if(q->count() > 0 ) | if(q->count() > 0 ) |
{ | { |
try | try |
{ | { |
if(_dying.value()) |
|
{ |
|
return bodies; |
|
} |
|
|
|
q->try_lock(); | q->try_lock(); |
} | } |
catch(...) | catch(...) |
|
|
q->unlock(); | q->unlock(); |
} | } |
} | } |
if(_dying.value() ) |
|
return bodies; |
|
| |
while (needed.value() > 0) { | while (needed.value() > 0) { |
_link_pool(_init_thread()); | _link_pool(_init_thread()); |
|
|
} | } |
return bodies; | return bodies; |
} | } |
|
catch (AlreadyLocked &) |
|
{ |
|
// kill_dead_threads was not able to obtain the |
|
// _monitor lock that controls access to the |
|
// _dying flag and the ThreadPool queues. |
|
// This means that one of the queue manipulating |
|
// functions (e.g., ~ThreadPool or allocate_and_awaken) |
|
// is running. Since cleanup is opportunistic, this |
|
// function just returns. |
|
} |
|
return 0; |
|
} |
| |
| |
Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval) | Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval) |
|
|
void ThreadPool::_link_pool(Thread *th) throw(IPCException) | void ThreadPool::_link_pool(Thread *th) throw(IPCException) |
{ | { |
if(th == 0) | if(th == 0) |
|
{ |
|
Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
|
"ThreadPool::_link_pool: Thread pointer is null."); |
throw NullPointer(); | throw NullPointer(); |
|
} |
try | try |
{ | { |
|
|
timed_mutex(&(this->_monitor), 1000); |
|
if(_dying.value()) |
|
{ |
|
th->cancel(); |
|
th->join(); |
|
delete th; |
|
} |
|
|
|
_pool.insert_first(th); | _pool.insert_first(th); |
|
|
} | } |
catch(...) | catch(...) |
{ | { |
|
Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
|
"ThreadPool::_link_pool: _pool.insert_first failed."); |
} | } |
} | } |
| |