version 1.57, 2003/10/23 05:59:53
|
version 1.58, 2003/10/23 19:30:23
|
|
|
Mutex* _mut; | Mutex* _mut; |
}; | }; |
| |
|
class auto_int |
|
{ |
|
public: |
|
auto_int(AtomicInt* num) |
|
: _int(num) |
|
{ |
|
_int->operator++(); |
|
} |
|
~auto_int(void) |
|
{ |
|
_int->operator--(); |
|
} |
|
AtomicInt *_int; |
|
}; |
| |
DQueue<ThreadPool> ThreadPool::_pools(true); |
|
| |
|
AtomicInt _idle_control; |
|
|
|
DQueue<ThreadPool> ThreadPool::_pools(true); |
| |
void ThreadPool::kill_idle_threads(void) | void ThreadPool::kill_idle_threads(void) |
{ | { |
|
|
try | try |
{ | { |
// Set the dying flag so all thread know the destructor has been entered | // Set the dying flag so all thread know the destructor has been entered |
{ |
|
// ThreadPool::~ThreadPool will wait to set the _dying flag until |
|
// it can acquire the lock. Once this lock is acquired, the thread |
|
// acquiring the lock must be able to assume that the value of |
|
// _dying will not change until the lock is released. |
|
|
|
// Once the _dying flag is set, functions (e.g., kill_dead_threads, |
|
// and allocate_and_awaken) that manipulate the ThreadPool |
|
// queues (_pool, _dead, and _running), should not be allowed |
|
// to run. |
|
|
|
Tracer::trace(TRC_THREAD, Tracer::LEVEL4, |
|
"Attempting to aquire _monitor lock"); |
|
auto_mutex dying_lock(&(this->_monitor)); |
|
Tracer::trace(TRC_THREAD, Tracer::LEVEL4, |
|
"Acquired _monitor lock"); |
|
_dying++; | _dying++; |
} |
|
// remove from the global pools list | // remove from the global pools list |
_pools.remove(this); | _pools.remove(this); |
| |
|
|
th = _pool.remove_first(); | th = _pool.remove_first(); |
} | } |
| |
|
while(_idle_control.value()) |
|
pegasus_yield(); |
|
|
th = _dead.remove_first(); | th = _dead.remove_first(); |
while(th != 0) | while(th != 0) |
{ | { |
|
|
| |
while(1) | while(1) |
{ | { |
|
if(pool->_dying.value()) |
|
break; |
|
|
try | try |
{ | { |
sleep_sem->wait(); | sleep_sem->wait(); |
|
|
| |
try | try |
{ | { |
auto_mutex dying_lock(&(this->_monitor)); |
|
if (_dying.value()) | if (_dying.value()) |
{ | { |
Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, | Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
|
|
// manipulate the threads on the ThreadPool queues, they should never | // manipulate the threads on the ThreadPool queues, they should never |
// be allowed to run at the same time. | // be allowed to run at the same time. |
| |
// kill_dead_threads will "hold" the _monitor lock until it has |
// << Thu Oct 23 14:41:02 2003 mdd >> |
// completed executing. ~ThreadPool will not set the dying flag until |
// not true, the queues are thread safe. they are syncrhonized. |
// it can get access to the lock. |
|
|
auto_int do_not_destruct(&_idle_control); |
| |
try | try |
{ | { |
try_mutex dying_lock(&(this->_monitor)); |
|
if (_dying.value()) | if (_dying.value()) |
{ | { |
return 0; | return 0; |
|
|
// first go thread the dead q and clean it up as much as possible | // first go thread the dead q and clean it up as much as possible |
try | try |
{ | { |
while(_dead.count() > 0) |
while(_dying.value() == 0 && _dead.count() > 0) |
{ | { |
Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "ThreadPool:: removing and joining dead thread"); | Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "ThreadPool:: removing and joining dead thread"); |
Thread *dead = _dead.remove_first(); | Thread *dead = _dead.remove_first(); |
| |
if(dead == 0) |
if(dead ) |
throw NullPointer(); |
{ |
dead->join(); | dead->join(); |
delete dead; | delete dead; |
} | } |
} | } |
|
} |
catch(...) | catch(...) |
{ | { |
} | } |
| |
|
if (_dying.value()) |
|
{ |
|
return 0; |
|
} |
| |
DQueue<Thread> * map[2] = | DQueue<Thread> * map[2] = |
{ | { |
|
|
DQueue<Thread> *q = 0; | DQueue<Thread> *q = 0; |
int i = 0; | int i = 0; |
AtomicInt needed(0); | AtomicInt needed(0); |
|
Thread *th = 0; |
|
internal_dq idq; |
| |
#ifdef PEGASUS_DISABLE_KILLING_HUNG_THREADS | #ifdef PEGASUS_DISABLE_KILLING_HUNG_THREADS |
// This change prevents the thread pool from killing "hung" threads. | // This change prevents the thread pool from killing "hung" threads. |
|
|
| |
struct timeval dt = { 0, 0 }; | struct timeval dt = { 0, 0 }; |
struct timeval *dtp; | struct timeval *dtp; |
Thread *th = 0; |
|
th = q->next(th); | th = q->next(th); |
while (th != 0 ) | while (th != 0 ) |
{ | { |
|
|
} | } |
| |
th = q->remove_no_lock((void *)th); | th = q->remove_no_lock((void *)th); |
|
idq.insert_first((void*)th); |
|
} |
|
th = q->next(th); |
|
} |
|
q->unlock(); |
|
} |
| |
if(th != 0) |
th = (Thread*)idq.remove_last(); |
|
while(th != 0) |
{ | { |
if( i == 0 ) | if( i == 0 ) |
{ | { |
|
|
th->cancel(); | th->cancel(); |
delete th; | delete th; |
} | } |
} |
th = (Thread*)idq.remove_last(); |
} |
|
th = q->next(th); |
|
pegasus_yield(); |
|
} |
|
q->unlock(); |
|
} | } |
} | } |
| |
|
|
} | } |
return bodies; | return bodies; |
} | } |
catch (AlreadyLocked &) |
catch (...) |
{ | { |
// kill_dead_threads was not able to obtain the |
|
// _monitor lock that controls access to the |
|
// _dying flag and the ThreadPool queues. |
|
// This means that one of the queue manipulating |
|
// functions (e.g., ~ThreadPool or allocate_and_awaken) |
|
// is running. Since cleanup is opportunistic, this |
|
// function just returns. |
|
} | } |
return 0; | return 0; |
} | } |