version 1.1.2.2, 2001/07/30 14:37:56
|
version 1.27, 2003/06/14 19:25:38
|
|
|
//%///////////////////////////////////////////////////////////////////////////// |
//%/-*-c++-*-//////////////////////////////////////////////////////////////////////////// |
// | // |
// Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM |
// Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM, |
|
// The Open Group, Tivoli Systems |
// | // |
// Permission is hereby granted, free of charge, to any person obtaining a copy | // Permission is hereby granted, free of charge, to any person obtaining a copy |
// of this software and associated documentation files (the "Software"), to | // of this software and associated documentation files (the "Software"), to |
|
|
// | // |
//%///////////////////////////////////////////////////////////////////////////// | //%///////////////////////////////////////////////////////////////////////////// |
| |
|
|
#ifndef Pegasus_Thread_h | #ifndef Pegasus_Thread_h |
#define Pegasus_Thread_h | #define Pegasus_Thread_h |
| |
#include <Pegasus/Common/IPC.h> |
#include <cstring> |
#include <Pegasus/Common/Config.h> | #include <Pegasus/Common/Config.h> |
#include <Pegasus/Common/Exception.h> |
#include <Pegasus/Common/IPC.h> |
|
#include <Pegasus/Common/InternalException.h> |
#include <Pegasus/Common/DQueue.h> | #include <Pegasus/Common/DQueue.h> |
|
#include <Pegasus/Common/Linkage.h> |
| |
PEGASUS_NAMESPACE_BEGIN | PEGASUS_NAMESPACE_BEGIN |
| |
class PEGASUS_EXPORT cleanup_handler |
class PEGASUS_COMMON_LINKAGE cleanup_handler |
{ | { |
| |
public: | public: |
cleanup_handler( void (*routine)(void *), void *arg ) : _routine(routine), _arg(arg) {} | cleanup_handler( void (*routine)(void *), void *arg ) : _routine(routine), _arg(arg) {} |
~cleanup_handler() ; |
~cleanup_handler() {; } |
|
inline Boolean operator==(const void *key) const |
|
{ |
|
if(key == (void *)_routine) |
|
return true; |
|
return false; |
|
} |
|
inline Boolean operator ==(const cleanup_handler & b) const |
|
{ |
|
return(operator==((const void *)b._routine)); |
|
} |
private: | private: |
void execute(void) { _routine(_arg); } | void execute(void) { _routine(_arg); } |
cleanup_handler(); | cleanup_handler(); |
void (*_routine)(void *); | void (*_routine)(void *); |
inline Boolean operator==(void *key) { if(key == (void *)_routine) return true; return false; } |
|
void *_arg; | void *_arg; |
PEGASUS_CLEANUP_HANDLE _cleanup_buffer; | PEGASUS_CLEANUP_HANDLE _cleanup_buffer; |
friend class Dqueue; |
friend class DQueue<class cleanup_handler>; |
friend class Thread; | friend class Thread; |
}; | }; |
| |
/////////////////////////////////////////////////////////////////////////////// | /////////////////////////////////////////////////////////////////////////////// |
| |
class PEGASUS_EXPORT SimpleThread |
|
|
class PEGASUS_COMMON_LINKAGE thread_data |
{ | { |
| |
public: | public: |
SimpleThread( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *start )(void *), |
static void default_delete(void *data); |
void *parameter, Boolean detached ); |
|
|
|
~SimpleThread(); |
|
|
|
void run(void); |
|
|
|
Uint32 threadId(void); |
|
|
|
// get the user parameter |
|
void *get_parm(void); |
|
|
|
// cancellation must be deferred (not asynchronous) |
|
// for user-level threads the thread itself can decide |
|
// when it should die. |
|
void cancel(void); |
|
|
|
void kill(int signum); |
|
|
|
// cancel if there is a pending cancellation request |
|
void test_cancel(void); |
|
|
|
// for user-level threads - put the calling thread |
|
// to sleep and jump to the thread scheduler. |
|
// platforms with preemptive scheduling and native threads |
|
// can define this to be a no-op. |
|
// platforms without preemptive scheduling like NetWare |
|
// or gnu portable threads will have an existing |
|
// routine that can be mapped to this method |
|
|
|
void thread_switch(void); |
|
|
|
// suspend this thread |
|
void suspend(void) ; |
|
|
|
// resume this thread |
|
void resume(void) ; |
|
|
|
void sleep(Uint32 msec) ; |
|
|
|
// block the calling thread until this thread terminates |
|
void join( PEGASUS_THREAD_RETURN *ret_val); |
|
|
|
| |
// stack of functions to be called when thread terminates |
thread_data( const Sint8 *key ) : _delete_func(NULL) , _data(NULL), _size(0) |
// will be called last in first out (LIFO) |
{ |
void cleanup_push( void (*routine) (void *), void *parm ); |
PEGASUS_ASSERT(key != NULL); |
void cleanup_pop(Boolean execute) ; |
size_t keysize = strlen(key); |
|
_key = new Sint8 [keysize + 1]; |
PEGASUS_THREAD_TYPE self(void) ; |
memcpy(_key, key, keysize); |
|
_key[keysize] = 0x00; |
| |
private: |
} |
SimpleThread(); |
|
| |
PEGASUS_THREAD_HANDLE _handle; |
thread_data(const Sint8 *key, size_t size) : _delete_func(default_delete), _size(size) |
Boolean _is_detached; |
{ |
Boolean _cancel_enabled; |
PEGASUS_ASSERT(key != NULL); |
Boolean _cancelled; |
size_t keysize = strlen(key); |
|
_key = new Sint8 [keysize + 1]; |
|
memcpy(_key, key, keysize); |
|
_key[keysize] = 0x00; |
|
_data = ::operator new(_size) ; |
| |
//PEGASUS_SEM_HANDLE _suspend_count; |
} |
Semaphore _suspend; |
|
| |
// always pass this * as the void * parameter to the thread |
thread_data(const Sint8 *key, size_t size, void *data) : _delete_func(default_delete), _size(size) |
// store the user parameter in _thread_parm |
{ |
|
PEGASUS_ASSERT(key != NULL); |
|
PEGASUS_ASSERT(data != NULL); |
|
size_t keysize = strlen(key); |
|
|
|
_key = new Sint8[keysize + 1]; |
|
memcpy(_key, key, keysize); |
|
_key[keysize] = 0x00; |
|
_data = ::operator new(_size); |
|
memcpy(_data, data, size); |
|
} |
| |
PEGASUS_THREAD_RETURN ( PEGASUS_THREAD_CDECL *_start)(void *) ; |
~thread_data() |
|
{ |
|
if( _data != NULL) |
|
if(_delete_func != NULL) |
|
{ |
|
_delete_func( _data ); |
|
} |
|
if( _key != NULL ) |
|
delete [] _key; |
|
} |
| |
void *_thread_parm; |
void put_data(void (*del)(void *), size_t size, void *data ) throw(NullPointer) |
} ; |
{ |
|
if(_data != NULL) |
|
if(_delete_func != NULL) |
|
_delete_func(_data); |
| |
/////////////////////////////////////////////////////////////////////////////// |
_delete_func = del; |
|
_data = data; |
|
_size = size; |
|
return ; |
|
} |
| |
static void default_delete(void * data) { delete [] (char *) data; } |
size_t get_size(void) { return _size; } |
| |
class PEGASUS_EXPORT thread_data |
void get_data(void **data, size_t *size) |
{ | { |
|
if(data == NULL || size == NULL) |
|
throw NullPointer(); |
|
|
|
*data = _data; |
|
*size = _size; |
|
return; |
| |
public: |
|
thread_data( Sint8 *key ) : _delete_func(NULL) , _data(NULL), _size(0) |
|
{ |
|
_key = strdup(key) ; |
|
} | } |
| |
thread_data(Sint8 *key, int size) |
void copy_data(void **buf, size_t *size) throw(NullPointer) |
{ | { |
_delete_func = default_delete; |
if((buf == NULL) || (size == NULL)) |
_data = new char [size]; |
throw NullPointer() ; |
|
*buf = ::operator new(_size); |
|
*size = _size; |
|
memcpy(*buf, _data, _size); |
|
return; |
} | } |
| |
thread_data(Sint8 *key, int size, void *data) |
inline Boolean operator==(const void *key) const |
{ | { |
_delete_func = default_delete; |
if ( ! strcmp(_key, (Sint8 *)key)) |
_data = new char [size]; |
return(true); |
memcpy(_data, data, size); |
return(false); |
} | } |
| |
~thread_data() { if( _data != NULL) _delete_func( _data ); } |
inline Boolean operator==(const thread_data& b) const |
|
|
void *get_data(void ); |
|
Uint32 get_data_size(void); |
|
void *put_data(Sint8 *key, void (*delete_func) (void *), Uint32 size, void *data ) |
|
{ | { |
void *old_data = data; |
return(operator==((const void *)b._key)); |
_delete_func = delete_func; |
|
_data = data; |
|
return(old_data); |
|
} | } |
|
|
private: | private: |
inline Boolean operator ==(void *key) ; |
|
void (*_delete_func) (void *data) ; | void (*_delete_func) (void *data) ; |
thread_data(); | thread_data(); |
void *_data; | void *_data; |
Uint32 _size; |
size_t _size; |
Sint8 *_key; | Sint8 *_key; |
friend class Dqueue; |
|
|
friend class DQueue<thread_data>; |
friend class Thread; | friend class Thread; |
}; | }; |
| |
| |
/////////////////////////////////////////////////////////////////////////// | /////////////////////////////////////////////////////////////////////////// |
| |
class PEGASUS_EXPORT Thread |
class PEGASUS_COMMON_LINKAGE ThreadPool; |
|
|
|
class PEGASUS_COMMON_LINKAGE Thread |
{ | { |
| |
public: | public: |
|
|
void run(void); | void run(void); |
| |
// get the user parameter | // get the user parameter |
void *get_parm(void); |
inline void *get_parm(void) { return _thread_parm; } |
| |
// send the thread a signal -- may not be appropriate due to Windows | // send the thread a signal -- may not be appropriate due to Windows |
// void kill(int signum); | // void kill(int signum); |
|
|
// cancel if there is a pending cancellation request | // cancel if there is a pending cancellation request |
void test_cancel(void); | void test_cancel(void); |
| |
|
Boolean is_cancelled(void); |
|
|
// for user-level threads - put the calling thread | // for user-level threads - put the calling thread |
// to sleep and jump to the thread scheduler. | // to sleep and jump to the thread scheduler. |
// platforms with preemptive scheduling and native threads | // platforms with preemptive scheduling and native threads |
|
|
| |
void thread_switch(void); | void thread_switch(void); |
| |
|
#if defined(PEGASUS_PLATFORM_LINUX_GENERIC_GNU) |
// suspend this thread | // suspend this thread |
// void suspend(void) ; |
void suspend(void) ; |
| |
// resume this thread | // resume this thread |
// void resume(void) ; |
void resume(void) ; |
|
#endif |
| |
void sleep(Uint32 msec) ; |
static void sleep(Uint32 msec) ; |
| |
// block the calling thread until this thread terminates | // block the calling thread until this thread terminates |
void join(void ); | void join(void ); |
|
void thread_init(void); |
| |
|
// thread routine needs to call this function when |
|
// it is ready to exit |
|
void exit_self(PEGASUS_THREAD_RETURN return_code) ; |
| |
// stack of functions to be called when thread terminates | // stack of functions to be called when thread terminates |
// will be called last in first out (LIFO) | // will be called last in first out (LIFO) |
void cleanup_push( void (*routine) (void *), void *parm ) throw(IPCException); | void cleanup_push( void (*routine) (void *), void *parm ) throw(IPCException); |
void cleanup_pop(Boolean execute = true) throw(IPCException); | void cleanup_pop(Boolean execute = true) throw(IPCException); |
| |
// thread specific data (thread_data object methods) |
// create and initialize a tsd |
|
inline void create_tsd(const Sint8 *key, int size, void *buffer) throw(IPCException) |
|
{ |
|
thread_data *tsd = new thread_data(key, size, buffer); |
|
try { _tsd.insert_first(tsd); } |
|
catch(IPCException& e) { e = e; delete tsd; throw; } |
|
} |
| |
// create an empty tsd and index it according to <key> |
// get the buffer associated with the key |
void create_tsd(void *key ); |
// NOTE: this call leaves the tsd LOCKED !!!! |
|
inline void *reference_tsd(const Sint8 *key) throw(IPCException) |
|
{ |
|
_tsd.lock(); |
|
thread_data *tsd = _tsd.reference((const void *)key); |
|
if(tsd != NULL) |
|
return( (void *)(tsd->_data) ); |
|
else |
|
return(NULL); |
|
} |
| |
// create an empty tsd with a pre-allocated buffer of <size> |
inline void *try_reference_tsd(const Sint8 *key) throw(IPCException) |
void create_tsd(void *key, int size) ; |
{ |
|
_tsd.try_lock(); |
|
thread_data *tsd = _tsd.reference((const void *)key); |
|
if(tsd != NULL) |
|
return((void *)(tsd->_data) ); |
|
else |
|
return(NULL); |
|
} |
| |
// create and initialize a tsd |
|
void create_tsd(void *key, int size, void *buffer); |
|
| |
// get the buffer associated with the key |
// release the lock held on the tsd |
void *get_tsd(void *key); |
// NOTE: assumes a corresponding and prior call to reference_tsd() !!! |
|
inline void dereference_tsd(void) throw(IPCException) |
|
{ |
|
_tsd.unlock(); |
|
} |
| |
// delete the tsd associated with the key | // delete the tsd associated with the key |
void delete_tsd(void *key); |
inline void delete_tsd(const Sint8 *key) throw(IPCException) |
|
{ |
|
thread_data *tsd = _tsd.remove((const void *)key); |
|
if(tsd != NULL) |
|
delete tsd; |
|
} |
|
|
|
// Note: Caller must delete the thread_data object returned (if not null) |
|
inline void *remove_tsd(const Sint8 *key) throw(IPCException) |
|
{ |
|
return(_tsd.remove((const void *)key)); |
|
} |
|
|
|
inline void empty_tsd(void) throw(IPCException) |
|
{ |
|
thread_data* tsd; |
|
while (0 != (tsd = _tsd.remove_first())) |
|
{ |
|
delete tsd; |
|
} |
|
//_tsd.empty_list(); |
|
} |
| |
// create or re-initialize tsd associated with the key | // create or re-initialize tsd associated with the key |
// if the tsd already exists, return the existing buffer |
// if the tsd already exists, delete the existing buffer |
void * put_tsd(void *key, void (*delete_func)(void *), Uint32 size, void *value); |
void put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) |
|
throw(IPCException) |
| |
|
{ |
|
PEGASUS_ASSERT(key != NULL); |
|
thread_data *tsd ; |
|
tsd = _tsd.remove((const void *)key); // may throw an IPC exception |
|
delete tsd; |
|
thread_data *ntsd = new thread_data(key); |
|
ntsd->put_data(delete_func, size, value); |
|
try { _tsd.insert_first(ntsd); } |
|
catch(IPCException& e) { e = e; delete ntsd; throw; } |
|
} |
inline PEGASUS_THREAD_RETURN get_exit(void) { return _exit_code; } | inline PEGASUS_THREAD_RETURN get_exit(void) { return _exit_code; } |
inline PEGASUS_THREAD_TYPE self(void) {return _handle.thid; } |
inline PEGASUS_THREAD_TYPE self(void) {return pegasus_thread_self(); } |
|
|
|
PEGASUS_THREAD_HANDLE getThreadHandle() {return _handle;} |
|
|
|
inline Boolean operator==(const void *key) const |
|
{ |
|
if ( (void *)this == key) |
|
return(true); |
|
return(false); |
|
} |
|
inline Boolean operator==(const Thread & b) const |
|
{ |
|
return(operator==((const void *)&b )); |
|
} |
|
|
|
void detach(void); |
| |
private: | private: |
Thread(); | Thread(); |
|
inline void create_tsd(const Sint8 *key ) throw(IPCException) |
|
{ |
|
thread_data *tsd = new thread_data(key); |
|
try { _tsd.insert_first(tsd); } |
|
catch(IPCException& e) { e = e; delete tsd; throw; } |
|
} |
PEGASUS_THREAD_HANDLE _handle; | PEGASUS_THREAD_HANDLE _handle; |
Boolean _is_detached; | Boolean _is_detached; |
Boolean _cancel_enabled; | Boolean _cancel_enabled; |
|
|
// store the user parameter in _thread_parm | // store the user parameter in _thread_parm |
| |
PEGASUS_THREAD_RETURN ( PEGASUS_THREAD_CDECL *_start)(void *) ; | PEGASUS_THREAD_RETURN ( PEGASUS_THREAD_CDECL *_start)(void *) ; |
|
DQueue<class cleanup_handler> _cleanup; |
|
DQueue<class thread_data> _tsd; |
| |
DQueue<cleanup_handler> _cleanup; |
|
DQueue<thread_data> _tsd; |
|
void *_thread_parm; | void *_thread_parm; |
|
|
PEGASUS_THREAD_RETURN _exit_code; | PEGASUS_THREAD_RETURN _exit_code; |
|
static Boolean _signals_blocked; |
|
friend class ThreadPool; |
} ; | } ; |
| |
| |
#if 0 |
class PEGASUS_COMMON_LINKAGE ThreadPool |
class PEGASUS_EXPORT Aggregator { |
{ |
|
|
public: | public: |
| |
Aggregator(); |
ThreadPool(Sint16 initial_size, |
~Aggregator(); |
const Sint8 *key, |
|
Sint16 min, |
|
Sint16 max, |
|
struct timeval & alloc_wait, |
|
struct timeval & dealloc_wait, |
|
struct timeval & deadlock_detect); |
| |
void started(void); |
~ThreadPool(void); |
void completed(void); |
|
void remaining(int operations); |
|
void put_result(CIMReference *ref); |
|
| |
private: |
void allocate_and_awaken(void *parm, |
int _reference_count; |
PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *work)(void *), |
|
Semaphore *blocking = 0) |
|
throw(IPCException); |
|
|
|
|
|
Uint32 kill_dead_threads( void ) |
|
throw(IPCException); |
|
|
|
void get_key(Sint8 *buf, int bufsize); |
|
|
|
inline Boolean operator==(const void *key) const |
|
{ |
|
if ( ! strncmp( reinterpret_cast<Sint8 *>(const_cast<void *>(key)), _key, 16 )) |
|
return(true); |
|
return(false); |
|
} |
|
inline Boolean operator==(const ThreadPool & b) const |
|
{ |
|
return(operator==((const void *) b._key )); |
|
} |
|
|
|
inline void set_min_threads(Sint16 min) |
|
{ |
|
_min_threads = min; |
|
} |
|
|
|
inline Sint16 get_min_threads(void) const |
|
{ |
|
return _min_threads; |
|
} |
| |
// keep track of the thread running this operation so we can kill |
inline void set_max_threads(Sint16 max) |
// it if necessary |
{ |
Thread _owner; |
_max_threads = max; |
|
} |
// this is a phased aggregate. when it is complete is will |
|
// be streamed to the client regardless of the state of |
inline Sint16 get_max_threads(void) const |
// siblings |
{ |
Boolean _is_phased; |
return _max_threads; |
|
} |
int _total_values; |
|
int _completed_values; |
inline void set_allocate_wait(const struct timeval & alloc_wait) |
int _total_child_values; |
{ |
int _completed_child_values; |
_allocate_wait.tv_sec = alloc_wait.tv_sec; |
int _completion_state; |
_allocate_wait.tv_usec = alloc_wait.tv_usec; |
struct timeval _last_update; |
} |
time_t lifetime; |
|
Aggregator *_parent; |
inline struct timeval *get_allocate_wait(struct timeval *buffer) const |
// children may be phased or not phased |
{ |
DQueue _children; |
if(buffer == 0) |
// empty results that are filled by provider |
throw NullPointer(); |
DQueue _results; |
buffer->tv_sec = _allocate_wait.tv_sec; |
// array of predicates for events and |
buffer->tv_usec = _allocate_wait.tv_usec; |
// stored queries (cursors) |
return buffer; |
Array _filter; |
} |
|
|
|
inline void set_deallocate_wait(const struct timeval & dealloc_wait) |
|
{ |
|
_deallocate_wait.tv_sec = dealloc_wait.tv_sec; |
|
_deallocate_wait.tv_usec = dealloc_wait.tv_usec; |
|
} |
|
|
|
inline struct timeval *get_deallocate_wait(struct timeval *buffer) const |
|
{ |
|
if(buffer == 0) |
|
throw NullPointer(); |
|
buffer->tv_sec = _deallocate_wait.tv_sec; |
|
buffer->tv_usec = _deallocate_wait.tv_usec; |
|
return buffer; |
|
} |
|
|
|
inline void set_deadlock_detect(const struct timeval & deadlock) |
|
{ |
|
_deadlock_detect.tv_sec = deadlock.tv_sec; |
|
_deadlock_detect.tv_usec = deadlock.tv_usec; |
|
} |
|
|
|
inline struct timeval * get_deadlock_detect(struct timeval *buffer) const |
|
{ |
|
if(buffer == 0) |
|
throw NullPointer(); |
|
buffer->tv_sec = _deadlock_detect.tv_sec; |
|
buffer->tv_usec = _deadlock_detect.tv_usec; |
|
return buffer; |
|
} |
|
|
|
inline Uint32 running_count(void) |
|
{ |
|
return _running.count(); |
|
} |
|
|
|
inline Uint32 pool_count(void) |
|
{ |
|
return _pool.count(); |
|
} |
|
inline Uint32 dead_count(void) |
|
{ |
|
return _dead.count(); |
|
} |
|
|
|
|
|
static Boolean check_time(struct timeval *start, struct timeval *interval); |
|
|
|
Boolean operator ==(const ThreadPool & p) |
|
{ |
|
return operator==((const void *)&p); |
|
} |
|
|
|
Boolean operator ==(const void *p) |
|
{ |
|
if((void *)this == p) |
|
return true; |
|
return false; |
|
} |
|
|
|
static void kill_idle_threads(void); |
|
|
|
private: |
|
ThreadPool(void); |
|
Sint16 _max_threads; |
|
Sint16 _min_threads; |
|
AtomicInt _current_threads; |
|
struct timeval _allocate_wait; |
|
struct timeval _deallocate_wait; |
|
struct timeval _deadlock_detect; |
|
static PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL _loop(void *); |
|
Sint8 _key[17]; |
|
DQueue<Thread> _pool; |
|
DQueue<Thread> _running; |
|
DQueue<Thread> _dead; |
|
AtomicInt _dying; |
|
static void _sleep_sem_del(void *p); |
|
|
|
void _check_deadlock(struct timeval *start) throw(Deadlock); |
|
Boolean _check_deadlock_no_throw(struct timeval *start); |
|
Boolean _check_dealloc(struct timeval *start); |
|
Thread *_init_thread(void) throw(IPCException); |
|
void _link_pool(Thread *th) throw(IPCException); |
|
static PEGASUS_THREAD_RETURN _undertaker(void *); |
|
static DQueue<ThreadPool> _pools; |
} ; | } ; |
|
|
|
|
|
|
|
|
|
#if defined(PEGASUS_OS_TYPE_WINDOWS) |
|
# include "ThreadWindows_inline.h" |
|
#elif defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM) |
|
# include "ThreadzOS_inline.h" |
|
#elif defined(PEGASUS_OS_TYPE_UNIX) |
|
# include "ThreadUnix_inline.h" |
#endif | #endif |
| |
PEGASUS_NAMESPACE_END | PEGASUS_NAMESPACE_END |