version 1.5, 2001/12/25 08:26:09
|
version 1.15, 2002/05/21 04:07:12
|
|
|
public: | public: |
static void default_delete(void *data); | static void default_delete(void *data); |
| |
thread_data( Sint8 *key ) : _delete_func(NULL) , _data(NULL), _size(0) |
thread_data( const Sint8 *key ) : _delete_func(NULL) , _data(NULL), _size(0) |
{ | { |
PEGASUS_ASSERT(key != NULL); | PEGASUS_ASSERT(key != NULL); |
size_t keysize = strlen(key); | size_t keysize = strlen(key); |
|
|
| |
} | } |
| |
thread_data(Sint8 *key, size_t size) : _delete_func(default_delete), _size(size) |
thread_data(const Sint8 *key, size_t size) : _delete_func(default_delete), _size(size) |
{ | { |
PEGASUS_ASSERT(key != NULL); | PEGASUS_ASSERT(key != NULL); |
size_t keysize = strlen(key); | size_t keysize = strlen(key); |
|
|
| |
} | } |
| |
thread_data(Sint8 *key, size_t size, void *data) : _delete_func(default_delete), _size(size) |
thread_data(const Sint8 *key, size_t size, void *data) : _delete_func(default_delete), _size(size) |
{ | { |
PEGASUS_ASSERT(key != NULL); | PEGASUS_ASSERT(key != NULL); |
PEGASUS_ASSERT(data != NULL); | PEGASUS_ASSERT(data != NULL); |
|
|
{ | { |
if( _data != NULL) | if( _data != NULL) |
if(_delete_func != NULL) | if(_delete_func != NULL) |
|
{ |
_delete_func( _data ); | _delete_func( _data ); |
|
} |
if( _key != NULL ) | if( _key != NULL ) |
delete [] _key; | delete [] _key; |
} | } |
|
|
return(true); | return(true); |
return(false); | return(false); |
} | } |
|
|
inline Boolean operator==(const thread_data& b) const | inline Boolean operator==(const thread_data& b) const |
{ | { |
return(operator==((const void *)b._key)); | return(operator==((const void *)b._key)); |
|
|
| |
void thread_switch(void); | void thread_switch(void); |
| |
#ifdef PEGASUS_PLATFORM_LINUX_IX86_GNU |
#if defined(PEGASUS_PLATFORM_LINUX_IX86_GNU) || defined(PEGASUS_PLATFORM_LINUX_GENERIC_GNU) |
// suspend this thread | // suspend this thread |
void suspend(void) ; | void suspend(void) ; |
| |
|
|
void cleanup_pop(Boolean execute = true) throw(IPCException); | void cleanup_pop(Boolean execute = true) throw(IPCException); |
| |
// create and initialize a tsd | // create and initialize a tsd |
inline void create_tsd(Sint8 *key, int size, void *buffer) throw(IPCException) |
inline void create_tsd(const Sint8 *key, int size, void *buffer) throw(IPCException) |
{ | { |
thread_data *tsd = new thread_data(key, size, buffer); | thread_data *tsd = new thread_data(key, size, buffer); |
try { _tsd.insert_first(tsd); } | try { _tsd.insert_first(tsd); } |
|
|
| |
// get the buffer associated with the key | // get the buffer associated with the key |
// NOTE: this call leaves the tsd LOCKED !!!! | // NOTE: this call leaves the tsd LOCKED !!!! |
inline void *reference_tsd(Sint8 *key) throw(IPCException) |
inline void *reference_tsd(const Sint8 *key) throw(IPCException) |
{ | { |
_tsd.lock(); | _tsd.lock(); |
thread_data *tsd = _tsd.reference((void *)key); |
thread_data *tsd = _tsd.reference((const void *)key); |
if(tsd != NULL) | if(tsd != NULL) |
return( (void *)(tsd->_data) ); | return( (void *)(tsd->_data) ); |
else | else |
return(NULL); | return(NULL); |
} | } |
| |
inline void *try_reference_tsd(Sint8 *key) throw(IPCException) |
inline void *try_reference_tsd(const Sint8 *key) throw(IPCException) |
{ | { |
_tsd.try_lock(); | _tsd.try_lock(); |
thread_data *tsd = _tsd.reference((void *)key); |
thread_data *tsd = _tsd.reference((const void *)key); |
if(tsd != NULL) | if(tsd != NULL) |
return((void *)(tsd->_data) ); | return((void *)(tsd->_data) ); |
else | else |
|
|
} | } |
| |
// delete the tsd associated with the key | // delete the tsd associated with the key |
inline void delete_tsd(Sint8 *key) throw(IPCException) |
inline void delete_tsd(const Sint8 *key) throw(IPCException) |
{ | { |
thread_data *tsd = _tsd.remove((void *)key); |
thread_data *tsd = _tsd.remove((const void *)key); |
if(tsd != NULL) | if(tsd != NULL) |
delete tsd; | delete tsd; |
} | } |
| |
inline void *remove_tsd(Sint8 *key) throw(IPCException) |
// Note: Caller must delete the thread_data object returned (if not null) |
|
inline void *remove_tsd(const Sint8 *key) throw(IPCException) |
{ | { |
return(_tsd.remove((void *)key)); |
return(_tsd.remove((const void *)key)); |
} | } |
| |
inline void empty_tsd(void) throw(IPCException) | inline void empty_tsd(void) throw(IPCException) |
{ | { |
_tsd.empty_list(); |
thread_data* tsd; |
|
while (tsd = _tsd.remove_first()) |
|
{ |
|
delete tsd; |
|
} |
|
//_tsd.empty_list(); |
} | } |
| |
// create or re-initialize tsd associated with the key | // create or re-initialize tsd associated with the key |
// if the tsd already exists, return the existing buffer |
// if the tsd already exists, delete the existing buffer |
thread_data *put_tsd(Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) |
void put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) |
throw(IPCException) | throw(IPCException) |
| |
{ | { |
PEGASUS_ASSERT(key != NULL); | PEGASUS_ASSERT(key != NULL); |
thread_data *tsd ; | thread_data *tsd ; |
tsd = _tsd.remove((void *)key); // may throw an IPC exception |
tsd = _tsd.remove((const void *)key); // may throw an IPC exception |
|
delete tsd; |
thread_data *ntsd = new thread_data(key); | thread_data *ntsd = new thread_data(key); |
ntsd->put_data(delete_func, size, value); | ntsd->put_data(delete_func, size, value); |
try { _tsd.insert_first(ntsd); } | try { _tsd.insert_first(ntsd); } |
catch(IPCException& e) { e = e; delete ntsd; throw; } | catch(IPCException& e) { e = e; delete ntsd; throw; } |
return(tsd); |
|
} | } |
inline PEGASUS_THREAD_RETURN get_exit(void) { return _exit_code; } | inline PEGASUS_THREAD_RETURN get_exit(void) { return _exit_code; } |
inline PEGASUS_THREAD_TYPE self(void) {return pegasus_thread_self(); } | inline PEGASUS_THREAD_TYPE self(void) {return pegasus_thread_self(); } |
|
|
| |
private: | private: |
Thread(); | Thread(); |
inline void create_tsd(Sint8 *key ) throw(IPCException) |
inline void create_tsd(const Sint8 *key ) throw(IPCException) |
{ | { |
thread_data *tsd = new thread_data(key); | thread_data *tsd = new thread_data(key); |
try { _tsd.insert_first(tsd); } | try { _tsd.insert_first(tsd); } |
|
|
public: | public: |
| |
ThreadPool(Sint16 initial_size, | ThreadPool(Sint16 initial_size, |
Sint8 *key, |
const Sint8 *key, |
Sint16 min, | Sint16 min, |
Sint16 max, | Sint16 max, |
struct timeval & alloc_wait, | struct timeval & alloc_wait, |
|
|
struct timeval & deadlock_detect); | struct timeval & deadlock_detect); |
| |
~ThreadPool(void); | ~ThreadPool(void); |
|
|
void allocate_and_awaken(void *parm, | void allocate_and_awaken(void *parm, |
PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *work)(void *)) |
PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *work)(void *), |
|
Semaphore *blocking = 0) |
throw(IPCException); | throw(IPCException); |
| |
void kill_dead_threads( void ) |
|
|
Uint32 kill_dead_threads( void ) |
throw(IPCException); | throw(IPCException); |
| |
void get_key(Sint8 *buf, int bufsize); | void get_key(Sint8 *buf, int bufsize); |
|
|
struct timeval _deallocate_wait; | struct timeval _deallocate_wait; |
struct timeval _deadlock_detect; | struct timeval _deadlock_detect; |
static PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL _loop(void *); | static PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL _loop(void *); |
Semaphore _waiters; |
|
Sint8 _key[17]; | Sint8 _key[17]; |
Semaphore _pool_sem; |
|
DQueue<Thread> _pool; | DQueue<Thread> _pool; |
DQueue<Thread> _running; | DQueue<Thread> _running; |
DQueue<Thread> _dead; | DQueue<Thread> _dead; |
|
|
| |
inline Thread *ThreadPool::_init_thread(void) throw(IPCException) | inline Thread *ThreadPool::_init_thread(void) throw(IPCException) |
{ | { |
Thread *th = (Thread *) new Thread(&_loop, this, false); |
Thread *th = (Thread *) new Thread(_loop, this, false); |
// allocate a sleep semaphore and pass it in the thread context | // allocate a sleep semaphore and pass it in the thread context |
// initial count is zero, loop function will sleep until | // initial count is zero, loop function will sleep until |
// we signal the semaphore | // we signal the semaphore |
Semaphore *sleep_sem = (Semaphore *) new Semaphore(0); | Semaphore *sleep_sem = (Semaphore *) new Semaphore(0); |
th->put_tsd("sleep sem", &_sleep_sem_del, sizeof(Semaphore), (void *)sleep_sem); | th->put_tsd("sleep sem", &_sleep_sem_del, sizeof(Semaphore), (void *)sleep_sem); |
|
|
struct timeval *dldt = (struct timeval *) ::operator new(sizeof(struct timeval)); | struct timeval *dldt = (struct timeval *) ::operator new(sizeof(struct timeval)); |
th->put_tsd("deadlock timer", thread_data::default_delete, sizeof(struct timeval), (void *)dldt); | th->put_tsd("deadlock timer", thread_data::default_delete, sizeof(struct timeval), (void *)dldt); |
// thread will enter _loop(void *) and sleep on sleep_sem until we signal it | // thread will enter _loop(void *) and sleep on sleep_sem until we signal it |
th->run(); | th->run(); |
_current_threads++; | _current_threads++; |
|
pegasus_yield(); |
|
|
return th; | return th; |
} | } |
| |
|
|
if(th == 0) | if(th == 0) |
throw NullPointer(); | throw NullPointer(); |
_pool.insert_first(th); | _pool.insert_first(th); |
_pool_sem.signal(); |
|
} | } |
| |
| |
#if defined(PEGASUS_OS_TYPE_WINDOWS) | #if defined(PEGASUS_OS_TYPE_WINDOWS) |
# include "ThreadWindows_inline.h" | # include "ThreadWindows_inline.h" |
|
#elif defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM) |
|
# include "ThreadzOS_inline.h" |
#elif defined(PEGASUS_OS_TYPE_UNIX) | #elif defined(PEGASUS_OS_TYPE_UNIX) |
# include "ThreadUnix_inline.h" | # include "ThreadUnix_inline.h" |
#endif | #endif |