(file) Return to Thread.h CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

File: [Pegasus] / pegasus / src / Pegasus / Common / Thread.h (download)
Revision: 1.57.2.3, Sat Jul 29 00:03:42 2006 UTC (17 years, 11 months ago) by mike
Branch: TASK_BUG_5314_IPC_REFACTORING_BRANCH
Changes since 1.57.2.2: +230 -8 lines
BUG#: 5314
TITLE: IPC Refactoring

DESCRIPTION: IPC Refactoring Branch Work

//%2006////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
// Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
// Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
// IBM Corp.; EMC Corporation, The Open Group.
// Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
// IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
// Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
// EMC Corporation; VERITAS Software Corporation; The Open Group.
// Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
// EMC Corporation; Symantec Corporation; The Open Group.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
// 
// THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
// ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
//
//==============================================================================
//
// Author: Mike Day (mdday@us.ibm.com)
//
// Modified By: Markus Mueller
//              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
//              Amit K Arora, IBM (amita@in.ibm.com) for PEP#101
//              David Dillard, VERITAS Software Corp.
//                  (david.dillard@veritas.com)
//              Sean Keenan, Hewlett-Packard Company (sean.keenan@hp.com)
//              Josephine Eskaline Joyce, IBM (jojustin@in.ibm.com) for Bug#2393
//
//%/////////////////////////////////////////////////////////////////////////////

#ifndef Pegasus_Thread_h
#define Pegasus_Thread_h

#include <cstring>
#include <Pegasus/Common/Config.h>
#include <Pegasus/Common/AtomicInt.h>
#include <Pegasus/Common/InternalException.h>
#include <Pegasus/Common/AcceptLanguageList.h>
#include <Pegasus/Common/Linkage.h>
#include <Pegasus/Common/AutoPtr.h>
#include <Pegasus/Common/List.h>
#include <Pegasus/Common/Mutex.h>
#include <Pegasus/Common/Semaphore.h>
#include <Pegasus/Common/TSDKey.h>
#include <Pegasus/Common/Threads.h>

#if defined(PEGASUS_HAVE_PTHREADS)
# include <signal.h>
#endif

PEGASUS_NAMESPACE_BEGIN

/**
 * One entry of a Thread's cleanup list: a routine paired with the argument
 * it is to be called with.  Everything except construction and destruction
 * is private, so only the friend class Thread can run the routine.
 */
class PEGASUS_COMMON_LINKAGE cleanup_handler : public Linkable
{
   public:
      // Remember the routine and the argument to hand to it later.
      cleanup_handler(void (*routine)(void *), void *arg)
         : _routine(routine),
           _arg(arg)
      {
      }

      ~cleanup_handler()
      {
      }

   private:
      // Call the stored routine with the stored argument.
      void execute()
      {
         _routine(_arg);
      }

      // Declared without a body here: a handler is always built from a
      // routine and an argument.
      cleanup_handler();

      void (*_routine)(void *);
      void *_arg;

      friend class Thread;
};

///////////////////////////////////////////////////////////////////////////////


/**
 * One keyed slot of thread specific data (TSD).
 *
 * A slot keeps a private copy of its key string and refers to a data buffer
 * of _size bytes.  When a delete function is set, it is called on the buffer
 * as the slot is destroyed or as put_data() replaces the buffer.
 *
 * Nothing in this class serializes access to the data; callers that share a
 * slot have to provide their own locking.
 */
class  PEGASUS_COMMON_LINKAGE thread_data : public Linkable
{

   public:
      // Delete function installed by the two size-taking constructors.
      // Only declared here; the definition lives outside this header.
      static void default_delete(void *data);

      /**
       * Creates an empty slot: no data, no delete function, size 0.
       *
       * @param key NUL-terminated name of the slot; must not be NULL.
       */
      thread_data( const char *key ) : _delete_func(NULL) , _data(NULL), _size(0)
      {
         PEGASUS_ASSERT(key != NULL);
         _store_key(key);
      }

      /**
       * Creates a slot that owns a freshly allocated, uninitialized buffer.
       *
       * @param key  NUL-terminated name of the slot; must not be NULL.
       * @param size Number of bytes to allocate for the buffer.
       */
      thread_data(const char *key, size_t size)
         : _delete_func(default_delete), _data(NULL), _size(size)
      {
         PEGASUS_ASSERT(key != NULL);
         _store_key(key);
         _data = ::operator new(_size);
      }

      /**
       * Creates a slot that owns a private copy of the caller's buffer.
       *
       * @param key  NUL-terminated name of the slot; must not be NULL.
       * @param size Number of bytes to copy from data.
       * @param data Source buffer of at least size bytes; must not be NULL.
       */
      thread_data(const char *key, size_t size, void *data)
         : _delete_func(default_delete), _data(NULL), _size(size)
      {
         PEGASUS_ASSERT(key != NULL);
         PEGASUS_ASSERT(data != NULL);
         _store_key(key);
         _data = ::operator new(_size);
         memcpy(_data, data, size);
      }

      // Hands the buffer to the delete function, when both are set.
      ~thread_data()
      {
         if (_data != NULL && _delete_func != NULL)
         {
            _delete_func(_data);
         }
      }

      /**
       * This function is used to put data in thread space.
       *
       * Any buffer currently held is first passed to the current delete
       * function (when both are set); the slot then takes over the given
       * delete function, size and data pointer as they are.  The data is
       * not copied, and a NULL data pointer is accepted.
       *
       * Be aware that there is NOTHING in place to stop
       * other users of the thread from removing or changing this data.
       *
       * You, the developer, have to make sure that there are
       * no situations in which this can arise (i.e., have a
       * lock for the function which manipulates the TSD).
       *
       * @param del  Function to call on data when it is released; may be
       *             NULL, in which case data is never released by the slot.
       * @param size Size in bytes recorded for the buffer.
       * @param data The buffer to store.
       */
      void put_data(void (*del)(void *), size_t size, void *data )
      {
         if (_data != NULL && _delete_func != NULL)
            _delete_func(_data);

         _delete_func = del;
         _data = data;
         _size = size;
      }

      // Size in bytes recorded for the buffer.
      size_t get_size() { return _size; }

      /**
       * This function is used to retrieve data from the
       * TSD, the thread specific data.
       *
       * The slot's own pointer is handed out, not a copy.
       *
       * Be aware that there is NOTHING in place to stop
       * other users of the thread from changing the data you
       * get from this function.
       *
       * You, the developer, have to make sure that there are
       * no situations in which this can arise (i.e., have a
       * lock for the function which manipulates the TSD).
       *
       * @param data Receives the slot's data pointer.
       * @param size Receives the recorded size.
       * @exception NullPointer if data or size is NULL.
       */
      void get_data(void **data, size_t *size)
      {
         if (data == NULL || size == NULL)
            throw NullPointer();

         *data = _data;
         *size = _size;
      }

      /**
       * Hands out a copy of the buffer, allocated with ::operator new.
       * The slot does not track the copy; the caller receives the only
       * pointer to it.
       *
       * A slot that holds no data yields a zero-length allocation and a
       * reported size of 0, rather than copying from a NULL pointer.
       *
       * @param buf  Receives the newly allocated copy.
       * @param size Receives the number of bytes copied.
       * @exception NullPointer if buf or size is NULL.
       */
      void copy_data(void **buf, size_t *size)
      {
         if ((buf == NULL) || (size == NULL))
            throw NullPointer();

         // With no buffer there is nothing to copy, whatever _size says.
         const size_t count = (_data != NULL) ? _size : 0;

         *buf = ::operator new(count);
         *size = count;

         // memcpy must not be given a NULL source, not even for 0 bytes.
         if (count != 0)
            memcpy(*buf, _data, count);
      }

      // True when key, read as a NUL-terminated string, matches this
      // slot's key.
      inline Boolean operator==(const void *key) const
      {
         return strcmp(_key.get(), reinterpret_cast<const char *>(key)) == 0;
      }

      // True when both slots carry the same key string.
      inline Boolean operator==(const thread_data& b) const
      {
         return operator==(b._key.get());
      }

      // Predicate form of operator==(const void *), in the shape Thread
      // passes to its TSD list's find() and remove().
      static bool equal(const thread_data* node, const void* key)
      {
         return node->operator==(key);
      }

   private:
      // Stores a private, NUL-terminated copy of key in _key.
      void _store_key(const char *key)
      {
         const size_t keysize = strlen(key);
         _key.reset(new char[keysize + 1]);
         memcpy(_key.get(), key, keysize);
         _key.get()[keysize] = 0x00;
      }

      void (*_delete_func) (void *data);
      thread_data();
      void *_data;
      size_t _size;
      AutoArrayPtr<char> _key;

      friend class Thread;
};


/**
 * Status codes returned by Thread::run() and
 * ThreadPool::allocate_and_awaken().
 */
enum ThreadStatus
{
    // No problems.
    PEGASUS_THREAD_OK = 1,

    // Can't allocate a thread: not enough memory.  Try again later.
    PEGASUS_THREAD_INSUFFICIENT_RESOURCES,

    // Could not allocate into the thread specific data storage.
    PEGASUS_THREAD_SETUP_FAILURE,

    // Service is being destroyed and no new threads can be provided.
    PEGASUS_THREAD_UNAVAILABLE
};

///////////////////////////////////////////////////////////////////////////

/**
 * Wraps one platform thread: its handle, the start routine and user
 * parameter, a list of cleanup handlers, and a list of thread specific data
 * (TSD) slots looked up by key.
 */
class PEGASUS_COMMON_LINKAGE Thread : public Linkable
{
   public:

      Thread( ThreadReturnType (PEGASUS_THREAD_CDECL *start )(void *),
              void *parameter, Boolean detached );

      ~Thread();

      /**
          Start the thread.
          @return PEGASUS_THREAD_OK if the thread is started successfully,
                  PEGASUS_THREAD_INSUFFICIENT_RESOURCES if the resources
                  necessary to start the thread are not currently available,
                  PEGASUS_THREAD_SETUP_FAILURE if the thread could not
                  be created properly - check the 'errno' value for the
                  specific operating system return code.
       */
      ThreadStatus run();

      // get the user parameter
      inline void *get_parm() { return _thread_parm; }

      // cancellation must be deferred (not asynchronous)
      // for user-level threads the thread itself can decide
      // when it should die.
      void cancel();

      // cancel if there is a pending cancellation request
      void test_cancel();

      Boolean is_cancelled();

      // for user-level threads  - put the calling thread
      // to sleep and jump to the thread scheduler.
      // platforms with preemptive scheduling and native threads
      // can define this to be a no-op.
      // platforms without preemptive scheduling like NetWare
      // or gnu portable threads will have an existing
      // routine that can be mapped to this method

      void thread_switch();

#if defined(PEGASUS_PLATFORM_LINUX_GENERIC_GNU)
      // suspend this thread
      void suspend();

      // resume this thread
      void resume();
#endif

      static void sleep(Uint32 msec);

      // block the calling thread until this thread terminates
      void join();
      void thread_init();

      // thread routine needs to call this function when
      // it is ready to exit
      void exit_self(ThreadReturnType return_code);

      // stack of functions to be called when thread terminates
      // will be called last in first out (LIFO)
      // @exception IPCException
      void cleanup_push(void (*routine) (void *), void *parm);

      // @exception IPCException
      void cleanup_pop(Boolean execute = true);

      // create and initialize a tsd: a new slot holding a copy of the
      // first size bytes of buffer is put at the front of the tsd list.
      // The AutoPtr keeps the slot from leaking if the insert throws.
      // @exception IPCException
      inline void create_tsd(const char *key, int size, void *buffer)
      {
        AutoPtr<thread_data> tsd(new thread_data(key, size, buffer));
        _tsd.insert_front(tsd.get());
        tsd.release();
      }

      // get the buffer associated with the key, or NULL if there is no
      // tsd with that key
      // NOTE: this call leaves the tsd LOCKED (found or not) !!!!
      // @exception IPCException
      inline void *reference_tsd(const char *key)
      {
         _tsd.lock();
         thread_data *tsd = _tsd.find(thread_data::equal, key);
         if (tsd == NULL)
            return NULL;
         return tsd->_data;
      }

      // same as reference_tsd(), but takes the lock with try_lock().
      // NOTE(review): the outcome of try_lock() is not inspected here;
      // presumably it throws when the lock cannot be taken - confirm
      // against the List/Mutex implementation.
      // @exception IPCException
      inline void *try_reference_tsd(const char *key)
      {
         _tsd.try_lock();
         thread_data *tsd = _tsd.find(thread_data::equal, key);
         if (tsd == NULL)
            return NULL;
         return tsd->_data;
      }


      // release the lock held on the tsd
      // NOTE: assumes a corresponding and prior call to reference_tsd() !!!
      // @exception IPCException
      inline void dereference_tsd()
      {
         _tsd.unlock();
      }

      // delete the tsd associated with the key; the AutoPtr destroys
      // whatever remove() hands back as it goes out of scope
      // @exception IPCException
      inline void delete_tsd(const char *key)
      {
         AutoPtr<thread_data> tsd(_tsd.remove(thread_data::equal, key));
      }

      // @exception IPCException
      inline void empty_tsd()
      {
         _tsd.clear();
      }

      // create or re-initialize tsd associated with the key
      // if the tsd already exists, delete the existing buffer
      //
      // The new slot stores value itself (no copy) together with
      // delete_func.  If the insert throws, the AutoPtr destroys the new
      // slot, which hands value to delete_func (when both are non-NULL),
      // and the exception propagates unchanged to the caller.
      // @exception IPCException
      void put_tsd(const char *key, void (*delete_func)(void *), Uint32 size, void *value)
      {
         PEGASUS_ASSERT(key != NULL);

         // Remove and destroy any slot already stored under this key;
         // remove() may throw an IPC exception.
         {
            AutoPtr<thread_data> old(_tsd.remove(thread_data::equal, key));
         }

         AutoPtr<thread_data> ntsd(new thread_data(key));
         ntsd->put_data(delete_func, size, value);
         _tsd.insert_front(ntsd.get());
         ntsd.release();
      }
      inline ThreadReturnType get_exit() { return _exit_code; }
      inline ThreadType self() {return Threads::self(); }

      ThreadHandle getThreadHandle() {return _handle;}

      void detach();

      //
      //  Gets the Thread object associated with the caller's thread.
      //  Note: this may return NULL if no Thread object is associated
      //  with the caller's thread.
      //
      static Thread * getCurrent();  // l10n

      //
      //  Sets the Thread object associated with the caller's thread.
      //  Note: the Thread object must be placed on the heap.
      //
      static void setCurrent(Thread * thrd); // l10n

      //
      //  Gets the AcceptLanguageList object associated with the caller's
      //  Thread.
      //  Note: this may return NULL if no Thread object, or no
      //  AcceptLanguageList object, is associated with the caller's thread.
      //
      static AcceptLanguageList * getLanguages(); //l10n

      //
      //  Sets the AcceptLanguageList object associated with the caller's
      //  Thread.
      //  Note: a Thread object must have been previously associated with
      //  the caller's thread.
      //  Note: the AcceptLanguageList object must be placed on the heap.
      //
      static void setLanguages(AcceptLanguageList *langs); //l10n

      //
      //  Removes the AcceptLanguageList object associated with the caller's
      //  Thread.
      //
      static void clearLanguages(); //l10n

   private:
      Thread();

      static Sint8 initializeKey();  // l10n

      // create an empty tsd (no data, no delete function) for the key
      // @exception IPCException
      inline void create_tsd(const char *key )
      {
         AutoPtr<thread_data> tsd(new thread_data(key));
         _tsd.insert_front(tsd.get());
         tsd.release();
      }
      ThreadHandle _handle;
      Boolean _is_detached;
      Boolean _cancel_enabled;
      Boolean _cancelled;

      // always pass this * as the void * parameter to the thread
      // store the user parameter in _thread_parm

      ThreadReturnType  ( PEGASUS_THREAD_CDECL *_start)(void *);
      List<cleanup_handler, Mutex> _cleanup;
      List<thread_data, Mutex> _tsd;

      void *_thread_parm;
      ThreadReturnType _exit_code;
      static Boolean _signals_blocked;
      static TSDKeyType _platform_thread_key;  //l10n
      static Boolean _key_initialized; // l10n
      static Boolean _key_error; // l10n
};


/**
 * A keyed pool of Thread objects, kept on an idle list and a running list,
 * with configurable minimum and maximum thread counts.
 */
class PEGASUS_COMMON_LINKAGE ThreadPool
{
public:

    /**
        Builds a thread pool.
        @param initialSize How many threads the pool starts out with.
        @param key Name of the pool, usable to tell two pools apart.  At
            most the first 16 characters are used.
        @param minThreads Lower bound on the number of threads the pool
            should contain at any given time.
        @param maxThreads Upper bound on the number of threads the pool
            should contain at any given time.
        @param deallocateWait Minimum time a thread has to be idle before
            it is removed from the pool and cleaned up.
     */
    ThreadPool(
        Sint16 initialSize,
        const char* key,
        Sint16 minThreads,
        Sint16 maxThreads,
        struct timeval& deallocateWait);

    /**
        Tears the pool down.
     */
    ~ThreadPool();

    /**
        Allocates a thread and starts it on one unit of work.
        @param parm Generic parameter handed to the thread.
        @param work The function the thread is to execute.
        @param blocking Optional semaphore; when given, it is signaled
            once the thread has finished executing the work function.
        @return PEGASUS_THREAD_OK if the thread was started,
            PEGASUS_THREAD_INSUFFICIENT_RESOURCES if the resources needed
            to start it are not currently available,
            PEGASUS_THREAD_SETUP_FAILURE if the thread could not be set up
            properly, PEGASUS_THREAD_UNAVAILABLE if this service is
            shutting down and no more threads can be allocated.
        @exception IPCException
     */
    ThreadStatus allocate_and_awaken(
        void* parm,
        ThreadReturnType (PEGASUS_THREAD_CDECL* work)(void *),
        Semaphore* blocking = 0);

    /**
        Cleans up idle threads that have been running longer than the
        deallocate_wait configuration, as long as more than the configured
        minimum number of threads is running.
        @return How many threads were cleaned up.
        @exception IPCException
     */
    Uint32 cleanupIdleThreads();

    // NOTE(review): presumably copies the pool's key into buf, limited by
    // bufsize - only the declaration is visible here; confirm in the .cpp.
    void get_key(Sint8* buf, int bufsize);

    // Accessors for the configured thread-count bounds.
    void setMinThreads(Sint16 min) { _minThreads = min; }
    Sint16 getMinThreads() const { return _minThreads; }

    void setMaxThreads(Sint16 max) { _maxThreads = max; }
    Sint16 getMaxThreads() const { return _maxThreads; }

    // Current length of the running-thread list.
    Uint32 runningCount() { return _runningThreads.size(); }

    // Current length of the idle-thread list.
    Uint32 idleCount() { return _idleThreads.size(); }

private:

    // Default construction, copying and assignment are declared but left
    // unimplemented.
    ThreadPool();
    ThreadPool(const ThreadPool&);
    ThreadPool& operator=(const ThreadPool&);

    static ThreadReturnType PEGASUS_THREAD_CDECL _loop(void *);

    static Boolean _timeIntervalExpired(
        struct timeval* start,
        struct timeval* interval);

    static void _deleteSemaphore(void* p);

    void _cleanupThread(Thread* thread);
    Thread* _initializeThread();
    void _addToIdleThreadsQueue(Thread* th);

    Sint16 _maxThreads;
    Sint16 _minThreads;
    AtomicInt _currentThreads;
    struct timeval _deallocateWait;
    char _key[17];    // room for 16 key characters plus one more byte
    List<Thread, Mutex> _idleThreads;
    List<Thread, Mutex> _runningThreads;
    AtomicInt _dying;
};

//==============================================================================
//
// POSIX Threads Implementation:
//
//==============================================================================

#if defined(PEGASUS_HAVE_PTHREADS)

struct StartWrapperArg 
{
    void* (PEGASUS_THREAD_CDECL* start)(void*);
    void* arg;
};

// C-linkage entry point that Thread::run() passes to Threads::create(),
// together with a heap-allocated StartWrapperArg.  Only declared here.
// NOTE(review): presumably it calls arg->start(arg->arg) and frees the
// StartWrapperArg - confirm against the definition in the .cpp.
extern "C" void* _start_wrapper(void* arg);

// POSIX: create the platform thread, detached or joinable according to
// _is_detached, with _start_wrapper as its entry point.
//
// Returns PEGASUS_THREAD_OK on success.  On failure the handle is cleared,
// the wrapper argument is freed, and the result is
// PEGASUS_THREAD_INSUFFICIENT_RESOURCES for EAGAIN/ENOMEM or
// PEGASUS_THREAD_SETUP_FAILURE for any other error code.
inline ThreadStatus Thread::run()
{
    // Heap block handed to _start_wrapper: the start routine plus this
    // Thread object as its argument.
    StartWrapperArg* wrapperArg = new StartWrapperArg;
    wrapperArg->start = _start;
    wrapperArg->arg = this;

    Threads::Type mode = Threads::JOINABLE;
    if (_is_detached)
        mode = Threads::DETACHED;

    const int status =
        Threads::create(_handle.thid, mode, _start_wrapper, wrapperArg);

    if (status == 0)
        return PEGASUS_THREAD_OK;

    // Creation failed: reset the handle and free the wrapper argument
    // here, whatever the error code.
    Threads::clear(_handle.thid);
    delete wrapperArg;

    // On Linux distributions released prior to 2005, the Native POSIX
    // Thread Library returns ENOMEM instead of EAGAIN when there is
    // insufficient memory.  Hence both are checked.  See bug 386.
    if (status == EAGAIN || status == ENOMEM)
        return PEGASUS_THREAD_INSUFFICIENT_RESOURCES;

    return PEGASUS_THREAD_SETUP_FAILURE;
}

// POSIX: record the cancellation request in _cancelled, then request
// cancellation of the platform thread with pthread_cancel().
inline void Thread::cancel()
{
    _cancelled = true;
    pthread_cancel(_handle.thid.tt_handle());
}

// POSIX: call the platform's cancellation test for the calling thread;
// z/OS uses pthread_testintr(), every other platform pthread_testcancel().
inline void Thread::test_cancel()
{
#if defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM)
    pthread_testintr();
#else
    pthread_testcancel();
#endif
}

// POSIX: report the _cancelled flag, which cancel() sets.
inline Boolean Thread::is_cancelled(void)
{
   return _cancelled;
}

// POSIX: yield the processor; z/OS uses pthread_yield(NULL), every other
// platform sched_yield().
inline void Thread::thread_switch()
{
#if defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM)
    pthread_yield(NULL);
#else
    sched_yield();
#endif
}

/*
ATTN: why are these missing on other platforms?
*/
#if defined(PEGASUS_PLATFORM_LINUX_GENERIC_GNU)
// Linux only: send SIGSTOP to this thread's handle with pthread_kill().
// NOTE(review): POSIX stop signals usually act on the whole process, not a
// single thread - confirm this suspends only the intended thread.
inline void Thread::suspend()
{
    pthread_kill(_handle.thid.tt_handle(),SIGSTOP);
}

// Linux only: send SIGCONT to this thread's handle with pthread_kill(),
// the counterpart of suspend().
inline void Thread::resume()
{
    pthread_kill(_handle.thid.tt_handle(),SIGCONT);
}
#endif

// POSIX: forward to Threads::sleep() with the given number of milliseconds.
inline void Thread::sleep(Uint32 msec)
{
    Threads::sleep(msec);
}

// POSIX: wait for the thread to end, storing its result in _exit_code, but
// only when the thread is joinable and its handle has a non-zero id.  The
// handle is cleared in every case.
inline void Thread::join(void)
{
    if (!_is_detached)
    {
        if (Threads::id(_handle.thid) != 0)
        {
            pthread_join(_handle.thid.tt_handle(), &_exit_code);
        }
    }

    Threads::clear(_handle.thid);
}

// POSIX: enable cancellation for the calling thread and record that in
// _cancel_enabled.  z/OS enables interrupts with the asynchronous type;
// every other platform enables cancellation with the deferred type.
inline void Thread::thread_init(void)
{
#if defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM)
    pthread_setintr(PTHREAD_INTR_ENABLE);
    pthread_setintrtype(PTHREAD_INTR_ASYNCHRONOUS);
#else
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
#endif
    _cancel_enabled = true;
}

// POSIX: mark the Thread object as detached, then detach the platform
// thread with pthread_detach().  join() no longer calls pthread_join() once
// _is_detached is set.
inline void Thread::detach(void)
{
   _is_detached = true;
   pthread_detach(_handle.thid.tt_handle());
}

#endif /* PEGASUS_HAVE_PTHREADS */

//==============================================================================
//
// Windows Threads Implementation:
//
//==============================================================================

#if defined(PEGASUS_HAVE_WINDOWS_THREADS)

// Windows: start the thread with _beginthreadex(), passing this Thread
// object as the start routine's parameter.
//
// Note: a Win32 thread has both a thread ID and a handle, and neither is
// the same thing as a pthread ID.  The wait functions take the handle, so
// _handle.thid holds the thread handle, not the ID.
//
// Returns PEGASUS_THREAD_OK when the stored handle has a non-zero id;
// otherwise PEGASUS_THREAD_INSUFFICIENT_RESOURCES if errno is EAGAIN and
// PEGASUS_THREAD_SETUP_FAILURE for anything else.
inline ThreadStatus Thread::run(void)
{
    unsigned win32ThreadId = 0;

    ThreadType created;
    created.handle = reinterpret_cast<HANDLE>(
        _beginthreadex(NULL, 0, _start, this, 0, &win32ThreadId));
    _handle.thid = created;

    if (Threads::id(_handle.thid) != 0)
        return PEGASUS_THREAD_OK;

    if (errno == EAGAIN)
        return PEGASUS_THREAD_INSUFFICIENT_RESOURCES;

    return PEGASUS_THREAD_SETUP_FAILURE;
}

// Windows: only record the cancellation request.  The flag takes effect
// when test_cancel() is called on this Thread object, and it shortens the
// wait in join().
inline void Thread::cancel(void)
{
	_cancelled = true;
}

// Windows: call exit_self(0) when cancellation is enabled and has been
// requested; otherwise do nothing.
inline void Thread::test_cancel(void)
{
    if (!_cancel_enabled || !_cancelled)
        return;

    exit_self(0);
}

// Windows: report the _cancelled flag, which cancel() sets.
inline Boolean Thread::is_cancelled(void)
{
	return _cancelled;
}

// Windows: give up the rest of the calling thread's time slice by sleeping
// for 0 milliseconds.
inline void Thread::thread_switch(void)
{
	Sleep( 0 );
}

// Windows: forward to the Win32 Sleep() call with the given number of
// milliseconds.
inline void Thread::sleep( Uint32 milliseconds )
{
	Sleep( milliseconds );
}

inline void Thread::join(void)
{
	if( Threads::id(_handle.thid) != 0 )
	{
		if( !_is_detached )
		{
			if( !_cancelled )
			{
				// Emulate the unix join api. Caller sleeps until thread is done.
				WaitForSingleObject( _handle.thid.handle, INFINITE );
			}
			else
			{
				// Currently this is the only way to ensure this code does not 
				// hang forever.
				if( WaitForSingleObject( _handle.thid.handle, 10000 ) == WAIT_TIMEOUT )
				{
					TerminateThread( _handle.thid.handle, 0 );
				}
			}

			DWORD exit_code = 0;
			GetExitCodeThread( _handle.thid.handle, &exit_code );
			_exit_code = (ThreadReturnType)exit_code;
		}

		CloseHandle( _handle.thid.handle );
		Threads::clear(_handle.thid);
	}
}

// Windows: mark cancellation as enabled, so that test_cancel() acts on a
// pending cancel().
inline void Thread::thread_init(void)
{
	_cancel_enabled = true;
}

// Windows: only set the _is_detached flag.  join() then skips the wait and
// the exit-code retrieval but still closes the handle.
inline void Thread::detach(void)
{
	_is_detached = true;
}

#endif /* PEGASUS_HAVE_WINDOWS_THREADS */ 

PEGASUS_NAMESPACE_END

#endif // Pegasus_Thread_h

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2