version 1.57.2.1, 2006/07/27 23:11:52
|
version 1.57.2.4, 2006/07/29 01:08:49
|
|
|
#include <Pegasus/Common/Mutex.h> | #include <Pegasus/Common/Mutex.h> |
#include <Pegasus/Common/Semaphore.h> | #include <Pegasus/Common/Semaphore.h> |
#include <Pegasus/Common/TSDKey.h> | #include <Pegasus/Common/TSDKey.h> |
|
#include <Pegasus/Common/Threads.h> |
|
|
|
#if defined(PEGASUS_HAVE_PTHREADS) |
|
# include <signal.h> |
|
#endif |
| |
PEGASUS_NAMESPACE_BEGIN | PEGASUS_NAMESPACE_BEGIN |
| |
class PEGASUS_COMMON_LINKAGE cleanup_handler : public Linkable | class PEGASUS_COMMON_LINKAGE cleanup_handler : public Linkable |
{ | { |
|
|
public: | public: |
cleanup_handler( void (*routine)(void *), void *arg ) : _routine(routine), _arg(arg) {} |
cleanup_handler(void (*routine) (void *), void *arg):_routine(routine), |
~cleanup_handler() {; } |
_arg(arg) |
|
{ |
|
} |
|
~cleanup_handler() |
|
{; |
|
} |
| |
private: | private: |
void execute() { _routine(_arg); } |
|
|
void execute() |
|
{ |
|
_routine(_arg); |
|
} |
|
|
cleanup_handler(); | cleanup_handler(); |
void (*_routine)(void *); |
|
| |
|
void (*_routine)(void*); |
void *_arg; | void *_arg; |
ThreadCleanupType _cleanup_buffer; |
|
friend class Thread; | friend class Thread; |
}; | }; |
| |
/////////////////////////////////////////////////////////////////////////////// | /////////////////////////////////////////////////////////////////////////////// |
| |
|
|
class PEGASUS_COMMON_LINKAGE thread_data : public Linkable | class PEGASUS_COMMON_LINKAGE thread_data : public Linkable |
{ | { |
|
|
public: | public: |
|
|
static void default_delete(void *data); | static void default_delete(void *data); |
| |
thread_data( const char *key ) : _delete_func(NULL) , _data(NULL), _size(0) | thread_data( const char *key ) : _delete_func(NULL) , _data(NULL), _size(0) |
|
|
_key.reset(new char[keysize + 1]); | _key.reset(new char[keysize + 1]); |
memcpy(_key.get(), key, keysize); | memcpy(_key.get(), key, keysize); |
_key.get()[keysize] = 0x00; | _key.get()[keysize] = 0x00; |
|
|
} | } |
| |
thread_data(const char *key, size_t size) : _delete_func(default_delete), _size(size) |
thread_data(const char *key, size_t size) : |
|
_delete_func(default_delete), _size(size) |
{ | { |
PEGASUS_ASSERT(key != NULL); | PEGASUS_ASSERT(key != NULL); |
size_t keysize = strlen(key); | size_t keysize = strlen(key); |
|
|
memcpy(_key.get(), key, keysize); | memcpy(_key.get(), key, keysize); |
_key.get()[keysize] = 0x00; | _key.get()[keysize] = 0x00; |
_data = ::operator new(_size); | _data = ::operator new(_size); |
|
|
} | } |
| |
thread_data(const char *key, size_t size, void *data) : _delete_func(default_delete), _size(size) |
thread_data(const char *key, size_t size, void *data) : |
|
_delete_func(default_delete), _size(size) |
{ | { |
PEGASUS_ASSERT(key != NULL); | PEGASUS_ASSERT(key != NULL); |
PEGASUS_ASSERT(data != NULL); | PEGASUS_ASSERT(data != NULL); |
|
|
return; | return; |
} | } |
| |
size_t get_size() { return _size; } |
size_t get_size() |
|
{ |
|
return _size; |
|
} |
| |
/** | /** |
* This function is used to retrieve data from the | * This function is used to retrieve data from the |
|
|
}; | }; |
| |
| |
enum ThreadStatus { |
enum ThreadStatus |
|
{ |
PEGASUS_THREAD_OK = 1, /* No problems */ | PEGASUS_THREAD_OK = 1, /* No problems */ |
PEGASUS_THREAD_INSUFFICIENT_RESOURCES, /* Can't allocate a thread. Not enough |
PEGASUS_THREAD_INSUFFICIENT_RESOURCES, /* Can't allocate a thread. |
memory. Try again later */ |
Not enough memory. Try |
PEGASUS_THREAD_SETUP_FAILURE, /* Could not allocate into the thread specific |
again later */ |
data storage. */ |
PEGASUS_THREAD_SETUP_FAILURE, /* Could not allocate into the thread |
PEGASUS_THREAD_UNAVAILABLE /* Service is being destroyed and no new threads can |
specific data storage. */ |
be provided. */ |
PEGASUS_THREAD_UNAVAILABLE /* Service is being destroyed and no new |
|
threads can be provided. */ |
}; | }; |
| |
/////////////////////////////////////////////////////////////////////////// | /////////////////////////////////////////////////////////////////////////// |
|
|
| |
~Thread(); | ~Thread(); |
| |
/** |
/** Start the thread. |
Start the thread. |
|
@return PEGASUS_THREAD_OK if the thread is started successfully, | @return PEGASUS_THREAD_OK if the thread is started successfully, |
PEGASUS_THREAD_INSUFFICIENT_RESOURCES if the resources necessary | PEGASUS_THREAD_INSUFFICIENT_RESOURCES if the resources necessary |
to start the thread are not currently available. | to start the thread are not currently available. |
|
|
ThreadStatus run(); | ThreadStatus run(); |
| |
// get the user parameter | // get the user parameter |
inline void *get_parm() { return _thread_parm; } |
inline void *get_parm() |
|
{ |
|
return _thread_parm; |
|
} |
| |
// cancellation must be deferred (not asynchronous) | // cancellation must be deferred (not asynchronous) |
// for user-level threads the thread itself can decide | // for user-level threads the thread itself can decide |
|
|
// create or re-initialize tsd associated with the key | // create or re-initialize tsd associated with the key |
// if the tsd already exists, delete the existing buffer | // if the tsd already exists, delete the existing buffer |
// @exception IPCException | // @exception IPCException |
void put_tsd(const char *key, void (*delete_func)(void *), Uint32 size, void *value) |
void put_tsd(const char *key, void (*delete_func) (void *), Uint32 size, |
|
void *value) |
{ | { |
PEGASUS_ASSERT(key != NULL); | PEGASUS_ASSERT(key != NULL); |
AutoPtr<thread_data> tsd; | AutoPtr<thread_data> tsd; |
tsd.reset(_tsd.remove(thread_data::equal, key)); // may throw an IPC exception |
tsd.reset(_tsd.remove(thread_data::equal, key)); // may throw |
|
// an IPC |
|
// exception |
tsd.reset(); | tsd.reset(); |
AutoPtr<thread_data> ntsd(new thread_data(key)); | AutoPtr<thread_data> ntsd(new thread_data(key)); |
ntsd->put_data(delete_func, size, value); | ntsd->put_data(delete_func, size, value); |
try { _tsd.insert_front(ntsd.get()); } |
try |
catch(IPCException& e) { e = e; throw; } |
{ |
|
_tsd.insert_front(ntsd.get()); |
|
} |
|
catch(IPCException & e) |
|
{ |
|
e = e; |
|
throw; |
|
} |
ntsd.release(); | ntsd.release(); |
} | } |
inline ThreadReturnType get_exit() { return _exit_code; } |
inline ThreadReturnType get_exit() |
inline ThreadType self() {return Threads::self(); } |
{ |
|
return _exit_code; |
|
} |
|
inline ThreadType self() |
|
{ |
|
return Threads::self(); |
|
} |
| |
ThreadHandle getThreadHandle() {return _handle;} |
ThreadHandle getThreadHandle() |
|
{ |
|
return _handle; |
|
} |
| |
void detach(); | void detach(); |
| |
|
|
@param deallocateWait The minimum time that a thread should be idle | @param deallocateWait The minimum time that a thread should be idle |
before it is removed from the pool and cleaned up. | before it is removed from the pool and cleaned up. |
*/ | */ |
ThreadPool( |
ThreadPool(Sint16 initialSize, |
Sint16 initialSize, |
|
const char* key, | const char* key, |
Sint16 minThreads, | Sint16 minThreads, |
Sint16 maxThreads, |
Sint16 maxThreads, struct timeval &deallocateWait); |
struct timeval& deallocateWait); |
|
| |
/** | /** |
Destructs the ThreadPool object. | Destructs the ThreadPool object. |
|
|
be allocated. | be allocated. |
@exception IPCException | @exception IPCException |
*/ | */ |
ThreadStatus allocate_and_awaken( |
ThreadStatus allocate_and_awaken(void *parm, |
void* parm, |
ThreadReturnType(PEGASUS_THREAD_CDECL * |
ThreadReturnType (PEGASUS_THREAD_CDECL* work)(void *), |
work) (void *), |
Semaphore* blocking = 0); | Semaphore* blocking = 0); |
| |
/** | /** |
|
|
| |
static ThreadReturnType PEGASUS_THREAD_CDECL _loop(void *); | static ThreadReturnType PEGASUS_THREAD_CDECL _loop(void *); |
| |
static Boolean _timeIntervalExpired( |
static Boolean _timeIntervalExpired(struct timeval *start, |
struct timeval* start, |
|
struct timeval* interval); | struct timeval* interval); |
| |
static void _deleteSemaphore(void* p); | static void _deleteSemaphore(void* p); |
|
|
AtomicInt _dying; | AtomicInt _dying; |
}; | }; |
| |
|
//============================================================================== |
|
// |
|
// POSIX Threads Implementation: |
|
// |
|
//============================================================================== |
|
|
|
#if defined(PEGASUS_HAVE_PTHREADS) |
| |
#if defined(PEGASUS_OS_TYPE_WINDOWS) |
struct StartWrapperArg |
# include "ThreadWindows_inline.h" |
{ |
#elif defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM) |
void *(PEGASUS_THREAD_CDECL * start) (void *); |
# include "ThreadzOS_inline.h" |
void *arg; |
#elif defined(PEGASUS_OS_TYPE_UNIX) |
}; |
# include "ThreadUnix_inline.h" |
|
#elif defined(PEGASUS_OS_VMS) |
extern "C" void *_start_wrapper(void *arg); |
# include "ThreadVms_inline.h" |
|
|
inline ThreadStatus Thread::run() |
|
{ |
|
StartWrapperArg *arg = new StartWrapperArg; |
|
arg->start = _start; |
|
arg->arg = this; |
|
|
|
Threads::Type type = _is_detached ? Threads::DETACHED : Threads::JOINABLE; |
|
int rc = Threads::create(_handle.thid, type, _start_wrapper, arg); |
|
|
|
// On Linux distributions released prior 2005, the implementation of |
|
// Native POSIX Thread Library returns ENOMEM instead of EAGAIN when |
|
// there |
|
// are no insufficient memory. Hence we are checking for both. See bug |
|
// 386. |
|
|
|
if ((rc == EAGAIN) || (rc == ENOMEM)) |
|
{ |
|
Threads::clear(_handle.thid); |
|
delete arg; |
|
return PEGASUS_THREAD_INSUFFICIENT_RESOURCES; |
|
} |
|
else if (rc != 0) |
|
{ |
|
Threads::clear(_handle.thid); |
|
delete arg; |
|
return PEGASUS_THREAD_SETUP_FAILURE; |
|
} |
|
return PEGASUS_THREAD_OK; |
|
} |
|
|
|
inline void Thread::cancel() |
|
{ |
|
_cancelled = true; |
|
pthread_cancel(_handle.thid.tt_handle()); |
|
} |
|
|
|
inline void Thread::test_cancel() |
|
{ |
|
#if defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM) |
|
pthread_testintr(); |
|
#else |
|
pthread_testcancel(); |
#endif | #endif |
|
} |
| |
PEGASUS_NAMESPACE_END |
inline Boolean Thread::is_cancelled(void) |
|
{ |
|
return _cancelled; |
|
} |
|
|
|
inline void Thread::thread_switch() |
|
{ |
|
#if defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM) |
|
pthread_yield(NULL); |
|
#else |
|
sched_yield(); |
|
#endif |
|
} |
|
|
|
/* |
|
ATTN: why are these missing on other platforms? |
|
*/ |
|
#if defined(PEGASUS_PLATFORM_LINUX_GENERIC_GNU) |
|
inline void Thread::suspend() |
|
{ |
|
pthread_kill(_handle.thid.tt_handle(), SIGSTOP); |
|
} |
| |
|
inline void Thread::resume() |
|
{ |
|
pthread_kill(_handle.thid.tt_handle(), SIGCONT); |
|
} |
|
#endif |
|
|
|
inline void Thread::sleep(Uint32 msec) |
|
{ |
|
Threads::sleep(msec); |
|
} |
|
|
|
inline void Thread::join(void) |
|
{ |
|
if (!_is_detached && Threads::id(_handle.thid) != 0) |
|
pthread_join(_handle.thid.tt_handle(), &_exit_code); |
|
|
|
Threads::clear(_handle.thid); |
|
} |
|
|
|
inline void Thread::thread_init(void) |
|
{ |
|
#if defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM) |
|
pthread_setintr(PTHREAD_INTR_ENABLE); |
|
pthread_setintrtype(PTHREAD_INTR_ASYNCHRONOUS); |
|
#else |
|
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); |
|
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); |
|
#endif |
|
_cancel_enabled = true; |
|
} |
|
|
|
inline void Thread::detach(void) |
|
{ |
|
_is_detached = true; |
|
pthread_detach(_handle.thid.tt_handle()); |
|
} |
|
|
|
#endif /* PEGASUS_HAVE_PTHREADS */ |
|
|
|
//============================================================================== |
|
// |
|
// Windows Threads Implementation: |
|
// |
|
//============================================================================== |
|
|
|
#if defined(PEGASUS_HAVE_WINDOWS_THREADS) |
|
|
|
inline ThreadStatus Thread::run(void) |
|
{ |
|
// Note: A Win32 thread ID is not the same thing as a pthread ID. |
|
// Win32 threads have both a thread ID and a handle. The handle |
|
// is used in the wait functions, etc. |
|
// So _handle.thid is actually the thread handle. |
|
|
|
unsigned threadid = 0; |
|
|
|
ThreadType tt; |
|
tt.handle = (HANDLE) _beginthreadex(NULL, 0, _start, this, 0, &threadid); |
|
_handle.thid = tt; |
|
|
|
if (Threads::id(_handle.thid) == 0) |
|
{ |
|
if (errno == EAGAIN) |
|
{ |
|
return PEGASUS_THREAD_INSUFFICIENT_RESOURCES; |
|
} |
|
else |
|
{ |
|
return PEGASUS_THREAD_SETUP_FAILURE; |
|
} |
|
} |
|
return PEGASUS_THREAD_OK; |
|
} |
|
|
|
inline void Thread::cancel(void) |
|
{ |
|
_cancelled = true; |
|
} |
|
|
|
inline void Thread::test_cancel(void) |
|
{ |
|
if (_cancel_enabled && _cancelled) |
|
{ |
|
exit_self(0); |
|
} |
|
} |
|
|
|
inline Boolean Thread::is_cancelled(void) |
|
{ |
|
return _cancelled; |
|
} |
|
|
|
inline void Thread::thread_switch(void) |
|
{ |
|
Sleep(0); |
|
} |
|
|
|
inline void Thread::sleep(Uint32 milliseconds) |
|
{ |
|
Sleep(milliseconds); |
|
} |
|
|
|
inline void Thread::join(void) |
|
{ |
|
if (Threads::id(_handle.thid) != 0) |
|
{ |
|
if (!_is_detached) |
|
{ |
|
if (!_cancelled) |
|
{ |
|
// Emulate the unix join api. Caller sleeps until thread is |
|
// done. |
|
WaitForSingleObject(_handle.thid.handle, INFINITE); |
|
} |
|
else |
|
{ |
|
// Currently this is the only way to ensure this code does |
|
// not |
|
// hang forever. |
|
if (WaitForSingleObject(_handle.thid.handle, 10000) == |
|
WAIT_TIMEOUT) |
|
{ |
|
TerminateThread(_handle.thid.handle, 0); |
|
} |
|
} |
|
|
|
DWORD exit_code = 0; |
|
GetExitCodeThread(_handle.thid.handle, &exit_code); |
|
_exit_code = (ThreadReturnType) exit_code; |
|
} |
|
|
|
CloseHandle(_handle.thid.handle); |
|
Threads::clear(_handle.thid); |
|
} |
|
} |
|
|
|
inline void Thread::thread_init(void) |
|
{ |
|
_cancel_enabled = true; |
|
} |
|
|
|
inline void Thread::detach(void) |
|
{ |
|
_is_detached = true; |
|
} |
|
|
|
#endif /* PEGASUS_HAVE_WINDOWS_THREADS */ |
|
|
|
PEGASUS_NAMESPACE_END |
#endif // Pegasus_Thread_h | #endif // Pegasus_Thread_h |