//%2006//////////////////////////////////////////////////////////////////////// // // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems. // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.; // IBM Corp.; EMC Corporation, The Open Group. // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.; // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group. // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.; // EMC Corporation; VERITAS Software Corporation; The Open Group. // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.; // EMC Corporation; Symantec Corporation; The Open Group. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to // deal in the Software without restriction, including without limitation the // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or // sell copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // //============================================================================== // // Author: Mike Day (mdday@us.ibm.com) // // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01 // added nsk platform support // Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com) // Amit K Arora, IBM (amita@in.ibm.com) for PEP#101 // Sean Keenan, Hewlett-Packard Company (sean.keenan@hp.com) // David Dillard, VERITAS Software Corp. // (david.dillard@veritas.com) // //%///////////////////////////////////////////////////////////////////////////// #include "Thread.h" #include #include #include "Time.h" PEGASUS_USING_STD; PEGASUS_NAMESPACE_BEGIN //============================================================================== // // POSIX Threads Implementation: // //============================================================================== #if defined(PEGASUS_HAVE_PTHREADS) struct StartWrapperArg { void *(PEGASUS_THREAD_CDECL * start) (void *); void *arg; }; extern "C" void *_start_wrapper(void *arg_) { StartWrapperArg *arg = (StartWrapperArg *) arg_; void *return_value = (*arg->start) (arg->arg); delete arg; return return_value; } void Thread::cancel() { _cancelled = true; pthread_cancel(_handle.thid.tt_handle()); } void Thread::test_cancel() { #if defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM) pthread_testintr(); #else pthread_testcancel(); #endif } Boolean Thread::is_cancelled(void) { return _cancelled; } void Thread::thread_switch() { #if defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM) pthread_yield(NULL); #else sched_yield(); #endif } /* ATTN: why are these missing on other platforms? */ #if defined(PEGASUS_PLATFORM_LINUX_GENERIC_GNU) void Thread::suspend() { pthread_kill(_handle.thid.tt_handle(), SIGSTOP); } void Thread::resume() { pthread_kill(_handle.thid.tt_handle(), SIGCONT); } #endif void Thread::sleep(Uint32 msec) { Threads::sleep(msec); } void Thread::join(void) { if (!_is_detached && Threads::id(_handle.thid) != 0) pthread_join(_handle.thid.tt_handle(), &_exit_code); Threads::clear(_handle.thid); } void Thread::thread_init(void) { #if defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM) pthread_setintr(PTHREAD_INTR_ENABLE); pthread_setintrtype(PTHREAD_INTR_ASYNCHRONOUS); #else pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); #endif _cancel_enabled = true; } void Thread::detach(void) { _is_detached = true; pthread_detach(_handle.thid.tt_handle()); } ThreadStatus Thread::run() { StartWrapperArg *arg = new StartWrapperArg; arg->start = _start; arg->arg = this; Threads::Type type = _is_detached ? Threads::DETACHED : Threads::JOINABLE; int rc = Threads::create(_handle.thid, type, _start_wrapper, arg); // On Linux distributions released prior 2005, the implementation of // Native POSIX Thread Library returns ENOMEM instead of EAGAIN when // there // are no insufficient memory. Hence we are checking for both. See bug // 386. if ((rc == EAGAIN) || (rc == ENOMEM)) { Threads::clear(_handle.thid); delete arg; return PEGASUS_THREAD_INSUFFICIENT_RESOURCES; } else if (rc != 0) { Threads::clear(_handle.thid); delete arg; return PEGASUS_THREAD_SETUP_FAILURE; } return PEGASUS_THREAD_OK; } static sigset_t *block_signal_mask(sigset_t * sig) { sigemptyset(sig); // should not be used for main() sigaddset(sig, SIGHUP); sigaddset(sig, SIGINT); // maybe useless, since KILL can't be blocked according to POSIX sigaddset(sig, SIGKILL); sigaddset(sig, SIGABRT); sigaddset(sig, SIGALRM); sigaddset(sig, SIGPIPE); // Note: older versions of the linux pthreads library use SIGUSR1 and SIGUSR2 // internally to stop and start threads that are blocking, the newer ones // implement this through the kernel's real time signals // since SIGSTOP/CONT can handle suspend()/resume() on Linux // block them // #if defined(PEGASUS_PLATFORM_LINUX_IX86_GNU) // sigaddset(sig, SIGUSR1); // sigaddset(sig, SIGUSR2); // #endif #ifndef PEGASUS_PLATFORM_ZOS_ZSERIES_IBM pthread_sigmask(SIG_BLOCK, sig, NULL); #else sigprocmask(SIG_BLOCK, sig, NULL); #endif return sig; } Thread::Thread(ThreadReturnType(PEGASUS_THREAD_CDECL * start) (void *), void *parameter, Boolean detached):_is_detached(detached), _cancel_enabled(true), _cancelled(false), _start(start), _cleanup(), _tsd(), _thread_parm(parameter), _exit_code(0) { Threads::clear(_handle.thid); } Thread::~Thread() { try { join(); empty_tsd(); } catch(...) { // Do not allow the destructor to throw an exception } } #endif /* PEGASUS_HAVE_PTHREADS */ //============================================================================== // // Windows Threads Implementation: // //============================================================================== #if defined(PEGASUS_HAVE_WINDOWS_THREADS) ThreadStatus Thread::run(void) { // Note: A Win32 thread ID is not the same thing as a pthread ID. // Win32 threads have both a thread ID and a handle. The handle // is used in the wait functions, etc. // So _handle.thid is actually the thread handle. unsigned threadid = 0; ThreadType tt; tt.handle = (HANDLE) _beginthreadex(NULL, 0, _start, this, 0, &threadid); _handle.thid = tt; if (Threads::id(_handle.thid) == 0) { if (errno == EAGAIN) { return PEGASUS_THREAD_INSUFFICIENT_RESOURCES; } else { return PEGASUS_THREAD_SETUP_FAILURE; } } return PEGASUS_THREAD_OK; } void Thread::cancel(void) { _cancelled = true; } void Thread::test_cancel(void) { if (_cancel_enabled && _cancelled) { exit_self(0); } } Boolean Thread::is_cancelled(void) { return _cancelled; } void Thread::thread_switch(void) { Sleep(0); } void Thread::sleep(Uint32 milliseconds) { Sleep(milliseconds); } void Thread::join(void) { if (Threads::id(_handle.thid) != 0) { if (!_is_detached) { if (!_cancelled) { // Emulate the unix join api. Caller sleeps until thread is // done. WaitForSingleObject(_handle.thid.handle, INFINITE); } else { // Currently this is the only way to ensure this code does // not // hang forever. if (WaitForSingleObject(_handle.thid.handle, 10000) == WAIT_TIMEOUT) { TerminateThread(_handle.thid.handle, 0); } } DWORD exit_code = 0; GetExitCodeThread(_handle.thid.handle, &exit_code); _exit_code = (ThreadReturnType) exit_code; } CloseHandle(_handle.thid.handle); Threads::clear(_handle.thid); } } void Thread::thread_init(void) { _cancel_enabled = true; } void Thread::detach(void) { _is_detached = true; } Thread::Thread(ThreadReturnType(PEGASUS_THREAD_CDECL * start) (void *), void *parameter, Boolean detached):_is_detached(detached), _cancel_enabled(true), _cancelled(false), _start(start), _cleanup(), _tsd(), _thread_parm(parameter), _exit_code(0) { Threads::clear(_handle.thid); } Thread::~Thread() { try { join(); empty_tsd(); } catch(...) { } } #endif /* PEGASUS_HAVE_WINDOWS_THREADS */ //============================================================================== // // Common implementation: // //============================================================================== void thread_data::default_delete(void *data) { if (data != NULL) ::operator delete(data); } void language_delete(void *data) { if (data != NULL) { AutoPtr < AcceptLanguageList > al(static_cast < AcceptLanguageList * >(data)); } } Boolean Thread::_signals_blocked = false; #ifndef PEGASUS_OS_ZOS TSDKeyType Thread::_platform_thread_key = TSDKeyType(-1); #else TSDKeyType Thread::_platform_thread_key; #endif Boolean Thread::_key_initialized = false; Boolean Thread::_key_error = false; void Thread::cleanup_push(void (*routine) (void *), void *parm) { AutoPtr < cleanup_handler > cu(new cleanup_handler(routine, parm)); _cleanup.insert_front(cu.get()); cu.release(); return; } void Thread::cleanup_pop(Boolean execute) { AutoPtr < cleanup_handler > cu; try { cu.reset(_cleanup.remove_front()); } catch(IPCException &) { PEGASUS_ASSERT(0); } if (execute == true) cu->execute(); } //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) void Thread::exit_self(ThreadReturnType exit_code) { #if defined(PEGASUS_PLATFORM_HPUX_ACC) || \ defined(PEGASUS_PLATFORM_LINUX_GENERIC_GNU) // NOTE: pthread_exit exhibits unusual behavior on RHEL 3 U2, as // documented in Bugzilla 3836. Where feasible, it may be advantageous // to avoid using this function. pthread_exit(exit_code); #else // execute the cleanup stack and then return while (_cleanup.size()) { try { cleanup_pop(true); } catch(IPCException &) { PEGASUS_ASSERT(0); break; } } _exit_code = exit_code; Threads::exit(exit_code); Threads::clear(_handle.thid); #endif } Sint8 Thread::initializeKey() { PEG_METHOD_ENTER(TRC_THREAD, "Thread::initializeKey"); if (!Thread::_key_initialized) { if (Thread::_key_error) { Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Thread: ERROR - thread key error"); return -1; } if (TSDKey::create(&Thread::_platform_thread_key) == 0) { Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Thread: able to create a thread key"); Thread::_key_initialized = true; } else { Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Thread: ERROR - unable to create a thread key"); Thread::_key_error = true; return -1; } } PEG_METHOD_EXIT(); return 0; } Thread *Thread::getCurrent() { PEG_METHOD_ENTER(TRC_THREAD, "Thread::getCurrent"); if (Thread::initializeKey() != 0) { return NULL; } PEG_METHOD_EXIT(); return (Thread *) TSDKey::get_thread_specific(_platform_thread_key); } void Thread::setCurrent(Thread * thrd) { PEG_METHOD_ENTER(TRC_THREAD, "Thread::setCurrent"); if (Thread::initializeKey() == 0) { if (TSDKey:: set_thread_specific(Thread::_platform_thread_key, (void *) thrd) == 0) { Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Successful set Thread * into thread specific storage"); } else { Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "ERROR: error setting Thread * into thread specific storage"); } } PEG_METHOD_EXIT(); } AcceptLanguageList *Thread::getLanguages() { PEG_METHOD_ENTER(TRC_THREAD, "Thread::getLanguages"); Thread *curThrd = Thread::getCurrent(); if (curThrd == NULL) return NULL; AcceptLanguageList *acceptLangs = (AcceptLanguageList *) curThrd->reference_tsd("acceptLanguages"); curThrd->dereference_tsd(); PEG_METHOD_EXIT(); return acceptLangs; } void Thread::setLanguages(AcceptLanguageList * langs) // l10n { PEG_METHOD_ENTER(TRC_THREAD, "Thread::setLanguages"); Thread *currentThrd = Thread::getCurrent(); if (currentThrd != NULL) { // deletes the old tsd and creates a new one currentThrd->put_tsd("acceptLanguages", language_delete, sizeof (AcceptLanguageList *), langs); } PEG_METHOD_EXIT(); } void Thread::clearLanguages() // l10n { PEG_METHOD_ENTER(TRC_THREAD, "Thread::clearLanguages"); Thread *currentThrd = Thread::getCurrent(); if (currentThrd != NULL) { // deletes the old tsd currentThrd->delete_tsd("acceptLanguages"); } PEG_METHOD_EXIT(); } /////////////////////////////////////////////////////////////////////////////// // // ThreadPool // /////////////////////////////////////////////////////////////////////////////// ThreadPool::ThreadPool(Sint16 initialSize, const char *key, Sint16 minThreads, Sint16 maxThreads, struct timeval &deallocateWait):_maxThreads(maxThreads), _minThreads(minThreads), _currentThreads(0), _idleThreads(), _runningThreads(), _dying(0) { _deallocateWait.tv_sec = deallocateWait.tv_sec; _deallocateWait.tv_usec = deallocateWait.tv_usec; memset(_key, 0x00, 17); if (key != 0) { strncpy(_key, key, 16); } if ((_maxThreads > 0) && (_maxThreads < initialSize)) { _maxThreads = initialSize; } if (_minThreads > initialSize) { _minThreads = initialSize; } for (int i = 0; i < initialSize; i++) { _addToIdleThreadsQueue(_initializeThread()); } } ThreadPool::~ThreadPool() { PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::~ThreadPool"); try { // Set the dying flag so all thread know the destructor has been // entered _dying++; Tracer::trace(TRC_THREAD, Tracer::LEVEL2, "Cleaning up %d idle threads. ", _currentThreads.get()); while (_currentThreads.get() > 0) { Thread *thread = _idleThreads.remove_front(); if (thread != 0) { _cleanupThread(thread); _currentThreads--; } else { Threads::yield(); } } } catch(...) { } } ThreadReturnType PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm) { PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop"); try { Thread *myself = (Thread *) parm; PEGASUS_ASSERT(myself != 0); // Set myself into thread specific storage // This will allow code to get its own Thread Thread::setCurrent(myself); ThreadPool *pool = (ThreadPool *) myself->get_parm(); PEGASUS_ASSERT(pool != 0); Semaphore *sleep_sem = 0; struct timeval *lastActivityTime = 0; try { sleep_sem = (Semaphore *) myself->reference_tsd("sleep sem"); myself->dereference_tsd(); PEGASUS_ASSERT(sleep_sem != 0); lastActivityTime = (struct timeval *) myself-> reference_tsd("last activity time"); myself->dereference_tsd(); PEGASUS_ASSERT(lastActivityTime != 0); } catch(...) { Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, "ThreadPool::_loop: Failure getting sleep_sem or " "lastActivityTime."); PEGASUS_ASSERT(false); pool->_idleThreads.remove(myself); pool->_currentThreads--; PEG_METHOD_EXIT(); return ((ThreadReturnType) 1); } while (1) { try { sleep_sem->wait(); } catch(...) { Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, "ThreadPool::_loop: failure on sleep_sem->wait()."); PEGASUS_ASSERT(false); pool->_idleThreads.remove(myself); pool->_currentThreads--; PEG_METHOD_EXIT(); return ((ThreadReturnType) 1); } // When we awaken we reside on the _runningThreads queue, not the // _idleThreads queue. ThreadReturnType(PEGASUS_THREAD_CDECL * work) (void *) = 0; void *parm = 0; Semaphore *blocking_sem = 0; try { work = (ThreadReturnType(PEGASUS_THREAD_CDECL *) (void *)) myself->reference_tsd("work func"); myself->dereference_tsd(); parm = myself->reference_tsd("work parm"); myself->dereference_tsd(); blocking_sem = (Semaphore *) myself->reference_tsd("blocking sem"); myself->dereference_tsd(); } catch(...) { Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, "ThreadPool::_loop: Failure accessing work func, work parm, " "or blocking sem."); PEGASUS_ASSERT(false); pool->_idleThreads.remove(myself); pool->_currentThreads--; PEG_METHOD_EXIT(); return ((ThreadReturnType) 1); } if (work == 0) { Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "ThreadPool::_loop: work func is 0, meaning we should exit."); break; } Time::gettimeofday(lastActivityTime); try { PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4, "Work starting."); work(parm); PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4, "Work finished."); } catch(Exception & e) { PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, String ("Exception from work in ThreadPool::_loop: ") + e.getMessage()); } #if !defined(PEGASUS_OS_LSB) catch(const exception & e) { PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, String ("Exception from work in ThreadPool::_loop: ") + e.what()); } #endif catch(...) { PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, "Unknown exception from work in ThreadPool::_loop."); } // put myself back onto the available list try { Time::gettimeofday(lastActivityTime); if (blocking_sem != 0) { blocking_sem->signal(); } pool->_runningThreads.remove(myself); pool->_idleThreads.insert_front(myself); } catch(...) { Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, "ThreadPool::_loop: Adding thread to idle pool failed."); PEGASUS_ASSERT(false); pool->_currentThreads--; PEG_METHOD_EXIT(); return ((ThreadReturnType) 1); } } } catch(const Exception & e) { PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, "Caught exception: \"" + e.getMessage() + "\". Exiting _loop."); } catch(...) { PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, "Caught unrecognized exception. Exiting _loop."); } PEG_METHOD_EXIT(); return ((ThreadReturnType) 0); } ThreadStatus ThreadPool::allocate_and_awaken(void *parm, ThreadReturnType (PEGASUS_THREAD_CDECL * work) (void *), Semaphore * blocking) { PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken"); // Allocate_and_awaken will not run if the _dying flag is set. // Once the lock is acquired, ~ThreadPool will not change // the value of _dying until the lock is released. try { if (_dying.get()) { Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, "ThreadPool::allocate_and_awaken: ThreadPool is dying(1)."); return PEGASUS_THREAD_UNAVAILABLE; } struct timeval start; Time::gettimeofday(&start); Thread *th = 0; th = _idleThreads.remove_front(); if (th == 0) { if ((_maxThreads == 0) || (_currentThreads.get() < Uint32(_maxThreads))) { th = _initializeThread(); } } if (th == 0) { // ATTN-DME-P3-20031103: This trace message should not be // be labeled TRC_DISCARDED_DATA, because it does not // necessarily imply that a failure has occurred. However, // this label is being used temporarily to help isolate // the cause of client timeout problems. Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, "ThreadPool::allocate_and_awaken: Insufficient resources: " " pool = %s, running threads = %d, idle threads = %d", _key, _runningThreads.size(), _idleThreads.size()); return PEGASUS_THREAD_INSUFFICIENT_RESOURCES; } // initialize the thread data with the work function and parameters Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Initializing thread with work function and parameters: parm = %p", parm); th->delete_tsd("work func"); th->put_tsd("work func", NULL, sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *) (void *)), (void *) work); th->delete_tsd("work parm"); th->put_tsd("work parm", NULL, sizeof (void *), parm); th->delete_tsd("blocking sem"); if (blocking != 0) th->put_tsd("blocking sem", NULL, sizeof (Semaphore *), blocking); // put the thread on the running list _runningThreads.insert_front(th); // signal the thread's sleep semaphore to awaken it Semaphore *sleep_sem = (Semaphore *) th->reference_tsd("sleep sem"); PEGASUS_ASSERT(sleep_sem != 0); Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken"); sleep_sem->signal(); th->dereference_tsd(); } catch(...) { Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, "ThreadPool::allocate_and_awaken: Operation Failed."); PEG_METHOD_EXIT(); // ATTN: Error result has not yet been defined return PEGASUS_THREAD_SETUP_FAILURE; } PEG_METHOD_EXIT(); return PEGASUS_THREAD_OK; } // caller is responsible for only calling this routine during slack periods // but should call it at least once per _deallocateWait interval. Uint32 ThreadPool::cleanupIdleThreads() { PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads"); Uint32 numThreadsCleanedUp = 0; Uint32 numIdleThreads = _idleThreads.size(); for (Uint32 i = 0; i < numIdleThreads; i++) { // Do not dip below the minimum thread count if (_currentThreads.get() <= (Uint32) _minThreads) { break; } Thread *thread = _idleThreads.remove_back(); // If there are no more threads in the _idleThreads queue, we're // done. if (thread == 0) { break; } struct timeval *lastActivityTime; try { lastActivityTime = (struct timeval *) thread-> try_reference_tsd("last activity time"); PEGASUS_ASSERT(lastActivityTime != 0); } catch(...) { PEGASUS_ASSERT(false); _idleThreads.insert_back(thread); break; } Boolean cleanupThisThread = _timeIntervalExpired(lastActivityTime, &_deallocateWait); thread->dereference_tsd(); if (cleanupThisThread) { _cleanupThread(thread); _currentThreads--; numThreadsCleanedUp++; } else { _idleThreads.insert_front(thread); } } PEG_METHOD_EXIT(); return numThreadsCleanedUp; } void ThreadPool::_cleanupThread(Thread * thread) { PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread"); // Set the "work func" and "work parm" to 0 so _loop() knows to exit. thread->delete_tsd("work func"); thread->put_tsd("work func", 0, sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *) (void *)), (void *) 0); thread->delete_tsd("work parm"); thread->put_tsd("work parm", 0, sizeof (void *), 0); // signal the thread's sleep semaphore to awaken it Semaphore *sleep_sem = (Semaphore *) thread->reference_tsd("sleep sem"); PEGASUS_ASSERT(sleep_sem != 0); sleep_sem->signal(); thread->dereference_tsd(); thread->join(); delete thread; PEG_METHOD_EXIT(); } Boolean ThreadPool::_timeIntervalExpired(struct timeval *start, struct timeval *interval) { // never time out if the interval is zero if (interval && (interval->tv_sec == 0) && (interval->tv_usec == 0)) { return false; } struct timeval now, finish, remaining; Uint32 usec; Time::gettimeofday(&now); Time::gettimeofday(&remaining); // Avoid valgrind error finish.tv_sec = start->tv_sec + interval->tv_sec; usec = start->tv_usec + interval->tv_usec; finish.tv_sec += (usec / 1000000); usec %= 1000000; finish.tv_usec = usec; return (Time::subtract(&remaining, &finish, &now) != 0); } void ThreadPool::_deleteSemaphore(void *p) { delete(Semaphore *) p; } Thread *ThreadPool::_initializeThread() { PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread"); Thread *th = (Thread *) new Thread(_loop, this, false); // allocate a sleep semaphore and pass it in the thread context // initial count is zero, loop function will sleep until // we signal the semaphore Semaphore *sleep_sem = (Semaphore *) new Semaphore(0); th->put_tsd("sleep sem", &_deleteSemaphore, sizeof (Semaphore), (void *) sleep_sem); struct timeval *lastActivityTime = (struct timeval *)::operator new(sizeof (struct timeval)); Time::gettimeofday(lastActivityTime); th->put_tsd("last activity time", thread_data::default_delete, sizeof (struct timeval), (void *) lastActivityTime); // thread will enter _loop() and sleep on sleep_sem until we signal it if (th->run() != PEGASUS_THREAD_OK) { Tracer::trace(TRC_THREAD, Tracer::LEVEL2, "Could not create thread. Error code is %d.", errno); delete th; return 0; } _currentThreads++; Threads::yield(); PEG_METHOD_EXIT(); return th; } void ThreadPool::_addToIdleThreadsQueue(Thread * th) { if (th == 0) { Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null."); throw NullPointer(); } try { _idleThreads.insert_front(th); } catch(...) { Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_front " "failed."); } } // ATTN: not sure where to put this! #ifdef PEGASUS_ZOS_SECURITY bool isEnhancedSecurity = 99; #endif PEGASUS_NAMESPACE_END