version 1.5, 2001/12/25 08:26:09
|
version 1.53, 2005/06/24 19:34:23
|
|
|
//%///////////-*-c++-*-////////////////////////////////////////////////////// |
//%2005//////////////////////////////////////////////////////////////////////// |
// | // |
// Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM |
// Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development |
|
// Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems. |
|
// Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.; |
|
// IBM Corp.; EMC Corporation, The Open Group. |
|
// Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.; |
|
// IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group. |
|
// Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.; |
|
// EMC Corporation; VERITAS Software Corporation; The Open Group. |
// | // |
// Permission is hereby granted, free of charge, to any person obtaining a copy | // Permission is hereby granted, free of charge, to any person obtaining a copy |
// of this software and associated documentation files (the "Software"), to | // of this software and associated documentation files (the "Software"), to |
|
|
// Author: Mike Day (mdday@us.ibm.com) | // Author: Mike Day (mdday@us.ibm.com) |
// | // |
// Modified By: Markus Mueller | // Modified By: Markus Mueller |
|
// Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com) |
|
// Amit K Arora, IBM (amita@in.ibm.com) for PEP#101 |
|
// David Dillard, VERITAS Software Corp. |
|
// (david.dillard@veritas.com) |
|
// Sean Keenan, Hewlett-Packard Company (sean.keenan@hp.com) |
|
// Josephine Eskaline Joyce, IBM (jojustin@in.ibm.com) for Bug#2393 |
// | // |
//%///////////////////////////////////////////////////////////////////////////// | //%///////////////////////////////////////////////////////////////////////////// |
| |
|
|
#ifndef Pegasus_Thread_h | #ifndef Pegasus_Thread_h |
#define Pegasus_Thread_h | #define Pegasus_Thread_h |
|
|
|
#include <cstring> |
#include <Pegasus/Common/Config.h> | #include <Pegasus/Common/Config.h> |
#include <Pegasus/Common/IPC.h> | #include <Pegasus/Common/IPC.h> |
#include <Pegasus/Common/Exception.h> |
#include <Pegasus/Common/InternalException.h> |
#include <Pegasus/Common/DQueue.h> | #include <Pegasus/Common/DQueue.h> |
|
#include <Pegasus/Common/AcceptLanguages.h> // l10n |
// REVIEW: Spend time getting to know this. |
#include <Pegasus/Common/Linkage.h> |
|
#include <Pegasus/Common/AutoPtr.h> |
| |
PEGASUS_NAMESPACE_BEGIN | PEGASUS_NAMESPACE_BEGIN |
| |
|
|
return(operator==((const void *)b._routine)); | return(operator==((const void *)b._routine)); |
} | } |
private: | private: |
void execute(void) { _routine(_arg); } |
void execute() { _routine(_arg); } |
cleanup_handler(); | cleanup_handler(); |
void (*_routine)(void *); | void (*_routine)(void *); |
| |
|
|
public: | public: |
static void default_delete(void *data); | static void default_delete(void *data); |
| |
thread_data( Sint8 *key ) : _delete_func(NULL) , _data(NULL), _size(0) |
thread_data( const char *key ) : _delete_func(NULL) , _data(NULL), _size(0) |
{ | { |
PEGASUS_ASSERT(key != NULL); | PEGASUS_ASSERT(key != NULL); |
size_t keysize = strlen(key); | size_t keysize = strlen(key); |
_key = new Sint8 [keysize + 1]; |
_key.reset(new char[keysize + 1]); |
memcpy(_key, key, keysize); |
memcpy(_key.get(), key, keysize); |
_key[keysize] = 0x00; |
_key.get()[keysize] = 0x00; |
| |
} | } |
| |
thread_data(Sint8 *key, size_t size) : _delete_func(default_delete), _size(size) |
thread_data(const char *key, size_t size) : _delete_func(default_delete), _size(size) |
{ | { |
PEGASUS_ASSERT(key != NULL); | PEGASUS_ASSERT(key != NULL); |
size_t keysize = strlen(key); | size_t keysize = strlen(key); |
_key = new Sint8 [keysize + 1]; |
_key.reset(new char[keysize + 1]); |
memcpy(_key, key, keysize); |
memcpy(_key.get(), key, keysize); |
_key[keysize] = 0x00; |
_key.get()[keysize] = 0x00; |
_data = ::operator new(_size) ; | _data = ::operator new(_size) ; |
| |
} | } |
| |
thread_data(Sint8 *key, size_t size, void *data) : _delete_func(default_delete), _size(size) |
thread_data(const char *key, size_t size, void *data) : _delete_func(default_delete), _size(size) |
{ | { |
PEGASUS_ASSERT(key != NULL); | PEGASUS_ASSERT(key != NULL); |
PEGASUS_ASSERT(data != NULL); | PEGASUS_ASSERT(data != NULL); |
size_t keysize = strlen(key); | size_t keysize = strlen(key); |
| |
_key = new Sint8[keysize + 1]; |
_key.reset(new char[keysize + 1]); |
memcpy(_key, key, keysize); |
memcpy(_key.get(), key, keysize); |
_key[keysize] = 0x00; |
_key.get()[keysize] = 0x00; |
_data = ::operator new(_size); | _data = ::operator new(_size); |
memcpy(_data, data, size); | memcpy(_data, data, size); |
} | } |
|
|
{ | { |
if( _data != NULL) | if( _data != NULL) |
if(_delete_func != NULL) | if(_delete_func != NULL) |
|
{ |
_delete_func( _data ); | _delete_func( _data ); |
if( _key != NULL ) |
} |
delete [] _key; |
|
} | } |
| |
void put_data(void (*del)(void *), size_t size, void *data ) throw(NullPointer) |
/** |
|
* This function is used to put data in thread space. |
|
* |
|
* Be aware that there is NOTHING in place to stop |
|
* other users of the thread to remove this data. |
|
* Or change the data. |
|
* |
|
* You, the developer has to make sure that there are |
|
* no situations in which this can arise (ie, have a |
|
* lock for the function which manipulates the TSD. |
|
* |
|
* @exception NullPointer |
|
*/ |
|
void put_data(void (*del)(void *), size_t size, void *data ) |
{ | { |
if(_data != NULL) | if(_data != NULL) |
if(_delete_func != NULL) | if(_delete_func != NULL) |
|
|
return ; | return ; |
} | } |
| |
size_t get_size(void) { return _size; } |
size_t get_size() { return _size; } |
| |
|
/** |
|
* This function is used to retrieve data from the |
|
* TSD, the thread specific data. |
|
* |
|
* Be aware that there is NOTHING in place to stop |
|
* other users of the thread to change the data you |
|
* get from this function. |
|
* |
|
* You, the developer has to make sure that there are |
|
* no situations in which this can arise (ie, have a |
|
* lock for the function which manipulates the TSD. |
|
*/ |
void get_data(void **data, size_t *size) | void get_data(void **data, size_t *size) |
{ | { |
if(data == NULL || size == NULL) | if(data == NULL || size == NULL) |
|
|
| |
} | } |
| |
void copy_data(void **buf, size_t *size) throw(BufferTooSmall, NullPointer) |
// @exception NullPointer |
|
void copy_data(void **buf, size_t *size) |
{ | { |
if((buf == NULL) || (size == NULL)) | if((buf == NULL) || (size == NULL)) |
throw NullPointer() ; | throw NullPointer() ; |
|
|
| |
inline Boolean operator==(const void *key) const | inline Boolean operator==(const void *key) const |
{ | { |
if ( ! strcmp(_key, (Sint8 *)key)) |
if ( ! strcmp(_key.get(), reinterpret_cast<const char *>(key))) |
return(true); | return(true); |
return(false); | return(false); |
} | } |
|
|
inline Boolean operator==(const thread_data& b) const | inline Boolean operator==(const thread_data& b) const |
{ | { |
return(operator==((const void *)b._key)); |
return(operator==(b._key.get())); |
} | } |
| |
private: | private: |
|
|
thread_data(); | thread_data(); |
void *_data; | void *_data; |
size_t _size; | size_t _size; |
Sint8 *_key; |
AutoArrayPtr<char> _key; |
| |
friend class DQueue<thread_data>; | friend class DQueue<thread_data>; |
friend class Thread; | friend class Thread; |
}; | }; |
| |
| |
/////////////////////////////////////////////////////////////////////////// |
enum ThreadStatus { |
|
PEGASUS_THREAD_OK = 1, /* No problems */ |
|
PEGASUS_THREAD_INSUFFICIENT_RESOURCES, /* Can't allocate a thread. Not enough |
|
memory. Try again later */ |
|
PEGASUS_THREAD_SETUP_FAILURE, /* Could not allocate into the thread specific |
|
data storage. */ |
|
PEGASUS_THREAD_UNAVAILABLE /* Service is being destroyed and no new threads can |
|
be provided. */ |
|
}; |
| |
class PEGASUS_COMMON_LINKAGE ThreadPool; |
/////////////////////////////////////////////////////////////////////////// |
| |
class PEGASUS_COMMON_LINKAGE Thread | class PEGASUS_COMMON_LINKAGE Thread |
{ | { |
|
|
public: | public: |
|
|
Thread( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *start )(void *), | Thread( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *start )(void *), |
void *parameter, Boolean detached ); | void *parameter, Boolean detached ); |
| |
~Thread(); | ~Thread(); |
| |
void run(void); |
/** |
|
Start the thread. |
|
@return PEGASUS_THREAD_OK if the thread is started successfully, |
|
PEGASUS_THREAD_INSUFFICIENT_RESOURCES if the resources necessary |
|
to start the thread are not currently available. |
|
PEGASUS_THREAD_SETUP_FAILURE if the thread could not |
|
be create properly - check the 'errno' value for specific operating |
|
system return code. |
|
*/ |
|
ThreadStatus run(); |
| |
// get the user parameter | // get the user parameter |
inline void *get_parm(void) { return _thread_parm; } |
inline void *get_parm() { return _thread_parm; } |
|
|
// send the thread a signal -- may not be appropriate due to Windows |
|
// void kill(int signum); |
|
| |
// cancellation must be deferred (not asynchronous) | // cancellation must be deferred (not asynchronous) |
// for user-level threads the thread itself can decide | // for user-level threads the thread itself can decide |
// when it should die. | // when it should die. |
void cancel(void); |
void cancel(); |
| |
// cancel if there is a pending cancellation request | // cancel if there is a pending cancellation request |
void test_cancel(void); |
void test_cancel(); |
| |
Boolean is_cancelled(void); |
Boolean is_cancelled(); |
| |
// for user-level threads - put the calling thread | // for user-level threads - put the calling thread |
// to sleep and jump to the thread scheduler. | // to sleep and jump to the thread scheduler. |
|
|
// or gnu portable threads will have an existing | // or gnu portable threads will have an existing |
// routine that can be mapped to this method | // routine that can be mapped to this method |
| |
void thread_switch(void); |
void thread_switch(); |
| |
#ifdef PEGASUS_PLATFORM_LINUX_IX86_GNU |
#if defined(PEGASUS_PLATFORM_LINUX_GENERIC_GNU) |
// suspend this thread | // suspend this thread |
void suspend(void) ; |
void suspend(); |
| |
// resume this thread | // resume this thread |
void resume(void) ; |
void resume(); |
#endif | #endif |
| |
static void sleep(Uint32 msec) ; | static void sleep(Uint32 msec) ; |
| |
// block the calling thread until this thread terminates | // block the calling thread until this thread terminates |
void join( void ); |
void join(); |
void thread_init(void); |
void thread_init(); |
| |
// thread routine needs to call this function when | // thread routine needs to call this function when |
// it is ready to exit | // it is ready to exit |
|
|
| |
// stack of functions to be called when thread terminates | // stack of functions to be called when thread terminates |
// will be called last in first out (LIFO) | // will be called last in first out (LIFO) |
void cleanup_push( void (*routine) (void *), void *parm ) throw(IPCException); |
// @exception IPCException |
void cleanup_pop(Boolean execute = true) throw(IPCException); |
void cleanup_push(void (*routine) (void *), void *parm); |
|
|
|
// @exception IPCException |
|
void cleanup_pop(Boolean execute = true); |
| |
// create and initialize a tsd | // create and initialize a tsd |
inline void create_tsd(Sint8 *key, int size, void *buffer) throw(IPCException) |
// @exception IPCException |
|
inline void create_tsd(const char *key, int size, void *buffer) |
{ | { |
thread_data *tsd = new thread_data(key, size, buffer); |
AutoPtr<thread_data> tsd(new thread_data(key, size, buffer)); |
try { _tsd.insert_first(tsd); } |
_tsd.insert_first(tsd.get()); |
catch(IPCException& e) { e = e; delete tsd; throw; } |
tsd.release(); |
} | } |
| |
// get the buffer associated with the key | // get the buffer associated with the key |
// NOTE: this call leaves the tsd LOCKED !!!! | // NOTE: this call leaves the tsd LOCKED !!!! |
inline void *reference_tsd(Sint8 *key) throw(IPCException) |
// @exception IPCException |
|
inline void *reference_tsd(const char *key) |
{ | { |
_tsd.lock(); | _tsd.lock(); |
thread_data *tsd = _tsd.reference((void *)key); |
thread_data *tsd = _tsd.reference(key); |
if(tsd != NULL) | if(tsd != NULL) |
return( (void *)(tsd->_data) ); | return( (void *)(tsd->_data) ); |
else | else |
return(NULL); | return(NULL); |
} | } |
| |
inline void *try_reference_tsd(Sint8 *key) throw(IPCException) |
// @exception IPCException |
|
inline void *try_reference_tsd(const char *key) |
{ | { |
_tsd.try_lock(); | _tsd.try_lock(); |
thread_data *tsd = _tsd.reference((void *)key); |
thread_data *tsd = _tsd.reference(key); |
if(tsd != NULL) | if(tsd != NULL) |
return((void *)(tsd->_data) ); | return((void *)(tsd->_data) ); |
else | else |
|
|
| |
// release the lock held on the tsd | // release the lock held on the tsd |
// NOTE: assumes a corresponding and prior call to reference_tsd() !!! | // NOTE: assumes a corresponding and prior call to reference_tsd() !!! |
inline void dereference_tsd(void) throw(IPCException) |
// @exception IPCException |
|
inline void dereference_tsd() |
{ | { |
_tsd.unlock(); | _tsd.unlock(); |
} | } |
| |
// delete the tsd associated with the key | // delete the tsd associated with the key |
inline void delete_tsd(Sint8 *key) throw(IPCException) |
// @exception IPCException |
|
inline void delete_tsd(const char *key) |
{ | { |
thread_data *tsd = _tsd.remove((void *)key); |
AutoPtr<thread_data> tsd(_tsd.remove(key)); |
if(tsd != NULL) |
|
delete tsd; |
|
} | } |
| |
inline void *remove_tsd(Sint8 *key) throw(IPCException) |
// Note: Caller must delete the thread_data object returned (if not null) |
|
// @exception IPCException |
|
inline void *remove_tsd(const char *key) |
{ | { |
return(_tsd.remove((void *)key)); |
return(_tsd.remove((const void *)key)); |
} | } |
| |
inline void empty_tsd(void) throw(IPCException) |
// @exception IPCException |
|
inline void empty_tsd() |
|
{ |
|
try |
{ | { |
_tsd.empty_list(); |
_tsd.try_lock(); |
|
} |
|
catch(IPCException&) |
|
{ |
|
return; |
} | } |
| |
// create or re-initialize tsd associated with the key |
AutoPtr<thread_data> tsd(_tsd.next(0)); |
// if the tsd already exists, return the existing buffer |
while(tsd.get()) |
thread_data *put_tsd(Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) |
{ |
throw(IPCException) |
_tsd.remove_no_lock(tsd.get()); |
|
tsd.reset(_tsd.next(0)); |
|
} |
|
_tsd.unlock(); |
|
} |
| |
|
// create or re-initialize tsd associated with the key |
|
// if the tsd already exists, delete the existing buffer |
|
// @exception IPCException |
|
void put_tsd(const char *key, void (*delete_func)(void *), Uint32 size, void *value) |
{ | { |
PEGASUS_ASSERT(key != NULL); | PEGASUS_ASSERT(key != NULL); |
thread_data *tsd ; |
AutoPtr<thread_data> tsd; |
tsd = _tsd.remove((void *)key); // may throw an IPC exception |
tsd.reset(_tsd.remove((const void *)key)); // may throw an IPC exception |
thread_data *ntsd = new thread_data(key); |
tsd.reset(); |
|
AutoPtr<thread_data> ntsd(new thread_data(key)); |
ntsd->put_data(delete_func, size, value); | ntsd->put_data(delete_func, size, value); |
try { _tsd.insert_first(ntsd); } |
try { _tsd.insert_first(ntsd.get()); } |
catch(IPCException& e) { e = e; delete ntsd; throw; } |
catch(IPCException& e) { e = e; throw; } |
return(tsd); |
ntsd.release(); |
} | } |
inline PEGASUS_THREAD_RETURN get_exit(void) { return _exit_code; } |
inline PEGASUS_THREAD_RETURN get_exit() { return _exit_code; } |
inline PEGASUS_THREAD_TYPE self(void) {return pegasus_thread_self(); } |
inline PEGASUS_THREAD_TYPE self() {return pegasus_thread_self(); } |
| |
PEGASUS_THREAD_HANDLE getThreadHandle() {return _handle;} | PEGASUS_THREAD_HANDLE getThreadHandle() {return _handle;} |
| |
|
|
return(operator==((const void *)&b )); | return(operator==((const void *)&b )); |
} | } |
| |
void detach(void); |
void detach(); |
|
|
|
// |
|
// Gets the Thread object associated with the caller's thread. |
|
// Note: this may return NULL if no Thread object is associated |
|
// with the caller's thread. |
|
// |
|
static Thread * getCurrent(); // l10n |
|
|
|
// |
|
// Sets the Thread object associated with the caller's thread. |
|
// Note: the Thread object must be placed on the heap. |
|
// |
|
static void setCurrent(Thread * thrd); // l10n |
|
|
|
// |
|
// Gets the AcceptLanguages object associated with the caller's |
|
// Thread. |
|
// Note: this may return NULL if no Thread object, or no |
|
// AcceptLanguages object, is associated with the caller's thread. |
|
// |
|
static AcceptLanguages * getLanguages(); //l10n |
|
|
|
// |
|
// Sets the AcceptLanguages object associated with the caller's |
|
// Thread. |
|
// Note: a Thread object must have been previously associated with |
|
// the caller's thread. |
|
// Note: the AcceptLanguages object must be placed on the heap. |
|
// |
|
static void setLanguages(AcceptLanguages *langs); //l10n |
|
|
|
// |
|
// Removes the AcceptLanguages object associated with the caller's |
|
// Thread. |
|
// |
|
static void clearLanguages(); //l10n |
| |
private: | private: |
Thread(); | Thread(); |
inline void create_tsd(Sint8 *key ) throw(IPCException) |
|
|
static Sint8 initializeKey(); // l10n |
|
|
|
// @exception IPCException |
|
inline void create_tsd(const char *key ) |
{ | { |
thread_data *tsd = new thread_data(key); |
AutoPtr<thread_data> tsd(new thread_data(key)); |
try { _tsd.insert_first(tsd); } |
_tsd.insert_first(tsd.get()); |
catch(IPCException& e) { e = e; delete tsd; throw; } |
tsd.release(); |
} | } |
PEGASUS_THREAD_HANDLE _handle; | PEGASUS_THREAD_HANDLE _handle; |
Boolean _is_detached; | Boolean _is_detached; |
|
|
void *_thread_parm; | void *_thread_parm; |
PEGASUS_THREAD_RETURN _exit_code; | PEGASUS_THREAD_RETURN _exit_code; |
static Boolean _signals_blocked; | static Boolean _signals_blocked; |
friend class ThreadPool; |
static PEGASUS_THREAD_KEY_TYPE _platform_thread_key; //l10n |
|
static Boolean _key_initialized; // l10n |
|
static Boolean _key_error; // l10n |
} ; | } ; |
| |
| |
|
|
{ | { |
public: | public: |
| |
ThreadPool(Sint16 initial_size, |
/** |
Sint8 *key, |
Constructs a new ThreadPool object. |
Sint16 min, |
@param initialSize The number of threads that are initially added to |
Sint16 max, |
the thread pool. |
struct timeval & alloc_wait, |
@param key A name for this thread pool that can be used to determine |
struct timeval & dealloc_wait, |
equality of two thread pool objects. Only the first 16 characters |
struct timeval & deadlock_detect); |
of this value are used. |
|
@param minThreads The minimum number of threads that should be |
~ThreadPool(void); |
contained in this thread pool at any given time. |
void allocate_and_awaken(void *parm, |
@param maxThreads The maximum number of threads that should be |
PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *work)(void *)) |
contained in this thread pool at any given time. |
throw(IPCException); |
@param deallocateWait The minimum time that a thread should be idle |
|
before it is removed from the pool and cleaned up. |
void kill_dead_threads( void ) |
*/ |
throw(IPCException); |
ThreadPool( |
|
Sint16 initialSize, |
|
const char* key, |
|
Sint16 minThreads, |
|
Sint16 maxThreads, |
|
struct timeval& deallocateWait); |
|
|
|
/** |
|
Destructs the ThreadPool object. |
|
*/ |
|
~ThreadPool(); |
|
|
|
/** |
|
Allocate and start a thread to do a unit of work. |
|
@param parm A generic parameter to pass to the thread |
|
@param work A pointer to the function that is to be executed by |
|
the thread |
|
@param blocking A pointer to an optional semaphore which, if |
|
specified, is signaled after the thread finishes |
|
executing the work function |
|
@return PEGASUS_THREAD_OK if the thread is started successfully, |
|
PEGASUS_THREAD_INSUFFICIENT_RESOURCES if the |
|
resources necessary to start the thread are not currently |
|
available. PEGASUS_THREAD_SETUP_FAILURE if the thread |
|
could not be setup properly. PEGASUS_THREAD_UNAVAILABLE |
|
if this service is shutting down and no more threads can |
|
be allocated. |
|
@exception IPCException |
|
*/ |
|
ThreadStatus allocate_and_awaken( |
|
void* parm, |
|
PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL* work)(void *), |
|
Semaphore* blocking = 0); |
|
|
|
/** |
|
Cleans up idle threads if they have been running longer than the |
|
deallocate_wait configuration and more than the configured |
|
minimum number of threads is running. |
|
@return The number of threads that were cleaned up. |
|
@exception IPCException |
|
*/ |
|
Uint32 cleanupIdleThreads(); |
| |
void get_key(Sint8 *buf, int bufsize); | void get_key(Sint8 *buf, int bufsize); |
| |
inline Boolean operator==(const void *key) const |
inline Boolean operator==(const char* key) const |
{ |
|
if ( ! strncmp( reinterpret_cast<Sint8 *>(const_cast<void *>(key)), _key, 16 )) |
|
return(true); |
|
return(false); |
|
} |
|
inline Boolean operator==(const ThreadPool & b) const |
|
{ |
|
return(operator==((const void *) b._key )); |
|
} |
|
|
|
inline void set_min_threads(Sint16 min) |
|
{ | { |
_min_threads = min; |
return (!strncmp(key, _key, 16)); |
} | } |
| |
inline Sint16 get_min_threads(void) const |
Boolean operator==(const void* p) const |
{ | { |
return _min_threads; |
return ((void *)this == p); |
} | } |
| |
inline void set_max_threads(Sint16 max) |
Boolean operator==(const ThreadPool & p) const |
{ | { |
_max_threads = max; |
return operator==((const void *)&p); |
} | } |
| |
inline Sint16 get_max_threads(void) const |
inline void setMinThreads(Sint16 min) |
{ | { |
return _max_threads; |
_minThreads = min; |
} | } |
| |
inline void set_allocate_wait(const struct timeval & alloc_wait) |
inline Sint16 getMinThreads() const |
{ | { |
_allocate_wait.tv_sec = alloc_wait.tv_sec; |
return _minThreads; |
_allocate_wait.tv_usec = alloc_wait.tv_usec; |
|
} | } |
| |
inline struct timeval *get_allocate_wait(struct timeval *buffer) const |
inline void setMaxThreads(Sint16 max) |
{ | { |
if(buffer == 0) |
_maxThreads = max; |
throw NullPointer(); |
|
buffer->tv_sec = _allocate_wait.tv_sec; |
|
buffer->tv_usec = _allocate_wait.tv_usec; |
|
return buffer; |
|
} |
|
|
|
inline void set_deallocate_wait(const struct timeval & dealloc_wait) |
|
{ |
|
_deallocate_wait.tv_sec = dealloc_wait.tv_sec; |
|
_deallocate_wait.tv_usec = dealloc_wait.tv_usec; |
|
} | } |
| |
inline struct timeval *get_deallocate_wait(struct timeval *buffer) const |
inline Sint16 getMaxThreads() const |
{ | { |
if(buffer == 0) |
return _maxThreads; |
throw NullPointer(); |
|
buffer->tv_sec = _deallocate_wait.tv_sec; |
|
buffer->tv_usec = _deallocate_wait.tv_usec; |
|
return buffer; |
|
} | } |
| |
inline void set_deadlock_detect(const struct timeval & deadlock) |
inline Uint32 runningCount() |
{ | { |
_deadlock_detect.tv_sec = deadlock.tv_sec; |
return _runningThreads.count(); |
_deadlock_detect.tv_usec = deadlock.tv_usec; |
|
} | } |
| |
inline struct timeval * get_deadlock_detect(struct timeval *buffer) const |
inline Uint32 idleCount() |
{ | { |
if(buffer == 0) |
return _idleThreads.count(); |
throw NullPointer(); |
|
buffer->tv_sec = _deadlock_detect.tv_sec; |
|
buffer->tv_usec = _deadlock_detect.tv_usec; |
|
return buffer; |
|
} | } |
| |
inline Uint32 running_count(void) |
|
{ |
|
return _running.count(); |
|
} |
|
|
|
static Boolean check_time(struct timeval *start, struct timeval *interval); |
|
|
|
private: | private: |
ThreadPool(void); |
|
Sint16 _max_threads; |
|
Sint16 _min_threads; |
|
AtomicInt _current_threads; |
|
struct timeval _allocate_wait; |
|
struct timeval _deallocate_wait; |
|
struct timeval _deadlock_detect; |
|
static PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL _loop(void *); |
|
Semaphore _waiters; |
|
Sint8 _key[17]; |
|
Semaphore _pool_sem; |
|
DQueue<Thread> _pool; |
|
DQueue<Thread> _running; |
|
DQueue<Thread> _dead; |
|
AtomicInt _dying; |
|
| |
|
ThreadPool(); // Unimplemented |
|
ThreadPool(const ThreadPool&); // Unimplemented |
|
ThreadPool& operator=(const ThreadPool&); // Unimplemented |
| |
static void _sleep_sem_del(void *p); |
static PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL _loop(void *); |
|
|
void _check_deadlock(struct timeval *start) throw(Deadlock); |
|
Boolean _check_deadlock_no_throw(struct timeval *start); |
|
Boolean _check_dealloc(struct timeval *start); |
|
Thread *_init_thread(void) throw(IPCException); |
|
void _link_pool(Thread *th) throw(IPCException); |
|
static PEGASUS_THREAD_RETURN _undertaker(void *); |
|
| |
|
static Boolean _timeIntervalExpired( |
|
struct timeval* start, |
|
struct timeval* interval); |
|
|
|
static void _deleteSemaphore(void* p); |
|
|
|
void _cleanupThread(Thread* thread); |
|
Thread* _initializeThread(); |
|
void _addToIdleThreadsQueue(Thread* th); |
|
|
|
Sint16 _maxThreads; |
|
Sint16 _minThreads; |
|
AtomicInt _currentThreads; |
|
struct timeval _deallocateWait; |
|
char _key[17]; |
|
DQueue<Thread> _idleThreads; |
|
DQueue<Thread> _runningThreads; |
|
AtomicInt _dying; |
}; | }; |
| |
| |
inline void ThreadPool::_sleep_sem_del(void *p) |
|
{ |
|
if(p != 0) |
|
{ |
|
delete (Semaphore *)p; |
|
} |
|
} |
|
|
|
inline void ThreadPool::_check_deadlock(struct timeval *start) throw(Deadlock) |
|
{ |
|
if (true == check_time(start, &_deadlock_detect)) |
|
throw Deadlock(pegasus_thread_self()); |
|
return; |
|
} |
|
|
|
|
|
inline Boolean ThreadPool::_check_deadlock_no_throw(struct timeval *start) |
|
{ |
|
return(check_time(start, &_deadlock_detect)); |
|
} |
|
|
|
inline Boolean ThreadPool::_check_dealloc(struct timeval *start) |
|
{ |
|
return(check_time(start, &_deallocate_wait)); |
|
} |
|
|
|
inline Thread *ThreadPool::_init_thread(void) throw(IPCException) |
|
{ |
|
Thread *th = (Thread *) new Thread(&_loop, this, false); |
|
// allocate a sleep semaphore and pass it in the thread context |
|
// initial count is zero, loop function will sleep until |
|
// we signal the semaphore |
|
Semaphore *sleep_sem = (Semaphore *) new Semaphore(0); |
|
th->put_tsd("sleep sem", &_sleep_sem_del, sizeof(Semaphore), (void *)sleep_sem); |
|
struct timeval *dldt = (struct timeval *) ::operator new(sizeof(struct timeval)); |
|
th->put_tsd("deadlock timer", thread_data::default_delete, sizeof(struct timeval), (void *)dldt); |
|
// thread will enter _loop(void *) and sleep on sleep_sem until we signal it |
|
th->run(); |
|
_current_threads++; |
|
return th; |
|
} |
|
|
|
inline void ThreadPool::_link_pool(Thread *th) throw(IPCException) |
|
{ |
|
if(th == 0) |
|
throw NullPointer(); |
|
_pool.insert_first(th); |
|
_pool_sem.signal(); |
|
} |
|
|
|
|
|
#if defined(PEGASUS_OS_TYPE_WINDOWS) | #if defined(PEGASUS_OS_TYPE_WINDOWS) |
# include "ThreadWindows_inline.h" | # include "ThreadWindows_inline.h" |
|
#elif defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM) |
|
# include "ThreadzOS_inline.h" |
#elif defined(PEGASUS_OS_TYPE_UNIX) | #elif defined(PEGASUS_OS_TYPE_UNIX) |
# include "ThreadUnix_inline.h" | # include "ThreadUnix_inline.h" |
|
#elif defined(PEGASUS_OS_VMS) |
|
# include "ThreadVms_inline.h" |
#endif | #endif |
| |
PEGASUS_NAMESPACE_END | PEGASUS_NAMESPACE_END |