(file) Return to Thread.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/Thread.cpp between version 1.5 and 1.32

version 1.5, 2001/12/25 08:26:09 version 1.32, 2003/03/18 22:35:27
Line 1 
Line 1 
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
 // //
 // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM,  // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
 // Compaq Computer Corporation  // The Open Group, Tivoli Systems
 // //
 // Permission is hereby granted, free of charge, to any person obtaining a copy // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to // of this software and associated documentation files (the "Software"), to
Line 30 
Line 30 
  
 #include "Thread.h" #include "Thread.h"
 #include <Pegasus/Common/IPC.h> #include <Pegasus/Common/IPC.h>
   #include <Pegasus/Common/Tracer.h>
  
 #if defined(PEGASUS_OS_TYPE_WINDOWS) #if defined(PEGASUS_OS_TYPE_WINDOWS)
 # include "ThreadWindows.cpp" # include "ThreadWindows.cpp"
Line 60 
Line 61 
     {     {
         _cleanup.insert_first(cu);         _cleanup.insert_first(cu);
     }     }
     catch(IPCException& e)      catch(IPCException&)
     {     {
         delete cu;         delete cu;
         throw;         throw;
Line 75 
Line 76 
     {     {
         cu = _cleanup.remove_first() ;         cu = _cleanup.remove_first() ;
     }     }
     catch(IPCException& e)      catch(IPCException&)
     {     {
         PEGASUS_ASSERT(0);         PEGASUS_ASSERT(0);
      }      }
Line 87 
Line 88 
 #endif #endif
  
  
 //thread_data *Thread::put_tsd(Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) throw(IPCException)  //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) throw(IPCException)
  
  
 #ifndef PEGASUS_THREAD_EXIT_NATIVE #ifndef PEGASUS_THREAD_EXIT_NATIVE
Line 100 
Line 101 
        {        {
            cleanup_pop(true);            cleanup_pop(true);
        }        }
        catch(IPCException& e)         catch(IPCException&)
        {        {
           PEGASUS_ASSERT(0);           PEGASUS_ASSERT(0);
           break;           break;
Line 114 
Line 115 
  
 #endif #endif
  
   DQueue<ThreadPool> ThreadPool::_pools(true);
   
   
   void ThreadPool::kill_idle_threads(void)
   {
      static struct timeval now, last = {0, 0};
   
      pegasus_gettimeofday(&now);
      if(now.tv_sec - last.tv_sec > 5)
      {
         _pools.lock();
         ThreadPool *p = _pools.next(0);
         while(p != 0)
         {
            try
            {
               p->kill_dead_threads();
            }
            catch(...)
            {
            }
            p = _pools.next(p);
         }
         _pools.unlock();
         pegasus_gettimeofday(&last);
      }
   }
   
   
 ThreadPool::ThreadPool(Sint16 initial_size, ThreadPool::ThreadPool(Sint16 initial_size,
                        Sint8 *key,                         const Sint8 *key,
                        Sint16 min,                        Sint16 min,
                        Sint16 max,                        Sint16 max,
                        struct timeval & alloc_wait,                        struct timeval & alloc_wait,
                        struct timeval & dealloc_wait,                        struct timeval & dealloc_wait,
                        struct timeval & deadlock_detect)                        struct timeval & deadlock_detect)
    : _max_threads(max), _min_threads(min),    : _max_threads(max), _min_threads(min),
      _current_threads(0), _waiters(initial_size),       _current_threads(0),
      _pool_sem(0), _pool(true), _running(true),       _pool(true), _running(true),
      _dead(true), _dying(0)      _dead(true), _dying(0)
 { {
    _allocate_wait.tv_sec = alloc_wait.tv_sec;    _allocate_wait.tv_sec = alloc_wait.tv_sec;
Line 135 
Line 165 
    memset(_key, 0x00, 17);    memset(_key, 0x00, 17);
    if(key != 0)    if(key != 0)
       strncpy(_key, key, 16);       strncpy(_key, key, 16);
    if(_max_threads < initial_size)     if(_max_threads > 0 && _max_threads < initial_size)
       _max_threads = initial_size;       _max_threads = initial_size;
    if(_min_threads > initial_size)    if(_min_threads > initial_size)
       _min_threads = initial_size;       _min_threads = initial_size;
Line 145 
Line 175 
    {    {
       _link_pool(_init_thread());       _link_pool(_init_thread());
    }    }
      _pools.insert_last(this);
   
 } }
  
  
  
 ThreadPool::~ThreadPool(void) ThreadPool::~ThreadPool(void)
 { {
   
      _pools.remove(this);
    _dying++;    _dying++;
    Thread *th = _pool.remove_first();     Thread *th = 0;
      th = _pool.remove_first();
    while(th != 0)    while(th != 0)
    {    {
       Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");       Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
Line 195 
Line 230 
       delete th;       delete th;
       th = _dead.remove_first();       th = _dead.remove_first();
    }    }
   
 } }
  
 // make this static to the class // make this static to the class
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm) PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
 { {
      PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
   
   #if defined(PEGASUS_DEBUG)
      char trace_buf[24];
      snprintf(trace_buf, 23, "%d", (Uint32)pegasus_thread_self());
      Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "ThreadPool::_loop entered by %s", trace_buf);
   #endif
   
   
    Thread *myself = (Thread *)parm;    Thread *myself = (Thread *)parm;
    if(myself == 0)    if(myself == 0)
      {
         PEG_METHOD_EXIT();
       throw NullPointer();       throw NullPointer();
      }
    ThreadPool *pool = (ThreadPool *)myself->get_parm();    ThreadPool *pool = (ThreadPool *)myself->get_parm();
    if(pool == 0 )    if(pool == 0 )
      {
         PEG_METHOD_EXIT();
       throw NullPointer();       throw NullPointer();
      }
    Semaphore *sleep_sem = 0;    Semaphore *sleep_sem = 0;
      Semaphore *blocking_sem = 0;
   
    struct timeval *deadlock_timer = 0;    struct timeval *deadlock_timer = 0;
  
    try    try
Line 216 
Line 269 
       deadlock_timer = (struct timeval *)myself->reference_tsd("deadlock timer");       deadlock_timer = (struct timeval *)myself->reference_tsd("deadlock timer");
       myself->dereference_tsd();       myself->dereference_tsd();
    }    }
    catch(IPCException & e)     catch(IPCException &)
    {    {
   #if defined(PEGASUS_DEBUG)
         Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4,
                       "%s: IPCException Caught - EXITING", trace_buf);
   #endif
         PEG_METHOD_EXIT();
       myself->exit_self(0);       myself->exit_self(0);
    }    }
      catch(...)
      {
   #if defined(PEGASUS_DEBUG)
         Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4,
                       "%s: Unknown  Exception Caught - EXITING", trace_buf);
   #endif
         PEG_METHOD_EXIT();
         myself->exit_self(0);
      }
   
    if(sleep_sem == 0 || deadlock_timer == 0)    if(sleep_sem == 0 || deadlock_timer == 0)
      {
   #if defined(PEGASUS_DEBUG)
         Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4,
                       "%s: NULL Semaphore  - EXITING", trace_buf);
   #endif
         PEG_METHOD_EXIT();
       throw NullPointer();       throw NullPointer();
      }
  
    while(pool->_dying < 1)    while(pool->_dying < 1)
    {    {
   #if defined(PEGASUS_DEBUG)
         Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4,
                       "%s: ThreadPool::_loop - waiting on semaphore", trace_buf);
   #endif
       sleep_sem->wait();       sleep_sem->wait();
   #if defined(PEGASUS_DEBUG)
         Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4,
                       "%s: ThreadPool::_loop - awakened from semaphore", trace_buf);
   #endif
       // when we awaken we reside on the running queue, not the pool queue       // when we awaken we reside on the running queue, not the pool queue
       if(pool->_dying > 0)       if(pool->_dying > 0)
          break;          break;
Line 241 
Line 324 
          myself->dereference_tsd();          myself->dereference_tsd();
          parm = myself->reference_tsd("work parm");          parm = myself->reference_tsd("work parm");
          myself->dereference_tsd();          myself->dereference_tsd();
            blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
            myself->dereference_tsd();
   
       }       }
       catch(IPCException & e)        catch(IPCException &)
       {       {
   #if defined(PEGASUS_DEBUG)
            Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4,
                          "%s: IPCException Caught - EXITING", trace_buf);
   #endif
            PEG_METHOD_EXIT();
          myself->exit_self(0);          myself->exit_self(0);
       }       }
  
       if(_work == 0)       if(_work == 0)
         {
   #if defined(PEGASUS_DEBUG)
            Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4,
                          "%s: NULL work pointer - EXITING", trace_buf);
   #endif
            PEG_METHOD_EXIT();
          throw NullPointer();          throw NullPointer();
         }
   
         if(_work ==
            (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) &_undertaker)
         {
   #if defined(PEGASUS_DEBUG)
            Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4,
                          "%s: Calling the Undertaker", trace_buf);
   #endif
            _work(parm);
         }
   
       gettimeofday(deadlock_timer, NULL);       gettimeofday(deadlock_timer, NULL);
         try
         {
   #if defined(PEGASUS_DEBUG)
            Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4,
                          "%s: ThreadPool::_loop - calling work routine", trace_buf);
   #endif
       _work(parm);       _work(parm);
   #if defined(PEGASUS_DEBUG)
            Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4,
                          "%s: ThreadPool::_loop - returned from work routine", trace_buf);
   #endif
         }
         catch(...)
         {
   #if defined(PEGASUS_DEBUG)
            Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4,
                          "%s: Unknown  Exception Caught - EXITING");
   #endif
            gettimeofday(deadlock_timer, NULL);
         }
         gettimeofday(deadlock_timer, NULL);
         if( blocking_sem != 0 )
            blocking_sem->signal();
  
       // put myself back onto the available list       // put myself back onto the available list
       try       try
Line 258 
Line 389 
          pool->_running.remove((void *)myself);          pool->_running.remove((void *)myself);
          pool->_link_pool(myself);          pool->_link_pool(myself);
       }       }
       catch(IPCException & e)        catch(IPCException &)
       {       {
   #if defined(PEGASUS_DEBUG)
            Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4,
                          "%s: IPCException Caught - EXITING", trace_buf);
   #endif
            PEG_METHOD_EXIT();
          myself->exit_self(0);          myself->exit_self(0);
       }       }
    }    }
    // wait to be awakend by the thread pool destructor    // wait to be awakend by the thread pool destructor
    sleep_sem->wait();    sleep_sem->wait();
    myself->test_cancel();    myself->test_cancel();
   
      PEG_METHOD_EXIT();
    myself->exit_self(0);    myself->exit_self(0);
    return((PEGASUS_THREAD_RETURN)0);    return((PEGASUS_THREAD_RETURN)0);
 } }
  
   
 void ThreadPool::allocate_and_awaken(void *parm, void ThreadPool::allocate_and_awaken(void *parm,
                                      PEGASUS_THREAD_RETURN \                                      PEGASUS_THREAD_RETURN \
                                      (PEGASUS_THREAD_CDECL *work)(void *))                                       (PEGASUS_THREAD_CDECL *work)(void *),
                                        Semaphore *blocking)
   
    throw(IPCException)    throw(IPCException)
 { {
      PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
    struct timeval start;    struct timeval start;
    gettimeofday(&start, NULL);    gettimeofday(&start, NULL);
  
    Thread *th = _pool.remove_first();    Thread *th = _pool.remove_first();
  
    while (th == 0 && _dying < 1)  
    {  
       try  // we couldn't get a free thread from the pool  
       {  
          // wait for the right interval and try again          // wait for the right interval and try again
          while(th == 0 && _dying < 1)          while(th == 0 && _dying < 1)
          {          {
             _check_deadlock(&start);             _check_deadlock(&start);
             Uint32 interval = _allocate_wait.tv_sec * 1000;  
             if(_allocate_wait.tv_usec > 0)        if(_max_threads == 0 || _current_threads < _max_threads)
                interval += (_deallocate_wait.tv_usec / 1000);  
             // will throw a timeout if no thread comes free  
             _pool_sem.time_wait(interval);  
             th = _pool.remove_first();  
          }  
       }  
       catch(TimeOut & to)  
       {  
          if(_current_threads < _max_threads)  
          {          {
             th = _init_thread();             th = _init_thread();
             break;           continue;
          }          }
         pegasus_yield();
         th = _pool.remove_first();
       }       }
       // will throw a Deadlock Exception before falling out of the loop  
  
       _check_deadlock(&start);  
   
    } // while th == null  
  
    if(_dying < 1)    if(_dying < 1)
    {    {
       // initialize the thread data with the work function and parameters       // initialize the thread data with the work function and parameters
       th->remove_tsd("work func");        Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
             "Initializing thread with work function and parameters: parm = %p",
             parm);
   
         th->delete_tsd("work func");
       th->put_tsd("work func", NULL,       th->put_tsd("work func", NULL,
                   sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),                   sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
                   (void *)work);                   (void *)work);
       th->remove_tsd("work parm");        th->delete_tsd("work parm");
       th->put_tsd("work parm", NULL, sizeof(void *), parm);       th->put_tsd("work parm", NULL, sizeof(void *), parm);
         th->delete_tsd("blocking sem");
         if(blocking != 0 )
            th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
  
       // put the thread on the running list       // put the thread on the running list
       _running.insert_first(th);       _running.insert_first(th);
Line 330 
Line 462 
       if(sleep_sem == 0)       if(sleep_sem == 0)
       {       {
          th->dereference_tsd();          th->dereference_tsd();
            PEG_METHOD_EXIT();
          throw NullPointer();          throw NullPointer();
       }       }
         Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
       sleep_sem->signal();       sleep_sem->signal();
       th->dereference_tsd();       th->dereference_tsd();
    }    }
    else    else
       _pool.insert_first(th);       _pool.insert_first(th);
   
      PEG_METHOD_EXIT();
 } }
  
 // caller is responsible for only calling this routine during slack periods // caller is responsible for only calling this routine during slack periods
 // but should call it at least once per _deadlock_detect with the running q // but should call it at least once per _deadlock_detect with the running q
 // and at least once per _deallocate_wait for the pool q // and at least once per _deallocate_wait for the pool q
  
 void ThreadPool::kill_dead_threads(void)  Uint32 ThreadPool::kill_dead_threads(void)
          throw(IPCException)          throw(IPCException)
 { {
    struct timeval now;    struct timeval now;
    gettimeofday(&now, NULL);    gettimeofday(&now, NULL);
      Uint32 bodies = 0;
  
    // first go thread the dead q and clean it up as much as possible    // first go thread the dead q and clean it up as much as possible
    while(_dead.count() > 0)    while(_dead.count() > 0)
    {    {
   
   #if !defined(PEGASUS_PLATFORM_HPUX_ACC) && !defined(PEGASUS_PLATFORM_LINUX_IA64_GNU)
         PEGASUS_STD(cout) << "ThreadPool:: removing and joining dead thread" << PEGASUS_STD(endl);
   #endif
       Thread *dead = _dead.remove_first();       Thread *dead = _dead.remove_first();
       if(dead == 0)       if(dead == 0)
          throw NullPointer();          throw NullPointer();
       if(dead->_handle.thid != 0)        dead->join();
       {  
          dead->detach();  
          destroy_thread(dead->_handle.thid, 0);  
          dead->_handle.thid = 0;  
          while(dead->_cleanup.count() )  
          {  
             // this may throw a permission exception,  
             // which I will remove from the code prior to stabilizing  
             dead->cleanup_pop(true);  
          }  
       }  
       delete dead;       delete dead;
    }    }
  
Line 382 
Line 510 
    int i = 0;    int i = 0;
    AtomicInt needed(0);    AtomicInt needed(0);
  
    for( q = map[i] ; i < 2; i++, q = map[i])  #ifdef PEGASUS_DISABLE_KILLING_HUNG_THREADS
      // This change prevents the thread pool from killing "hung" threads.
      // The definition of a "hung" thread is one that has been on the run queue
      // for longer than the time interval set when the thread pool was created.
      // Cancelling "hung" threads has proven to be problematic.
   
      // With this change the thread pool will not cancel "hung" threads.  This
      // may prevent a crash depending upon the state of the "hung" thread.  In
      // the case that the thread is actually hung, this change causes the
      // thread resources not to be reclaimed.
   
      // Idle threads, those that have not executed a routine for a time
      // interval, continue to be destroyed.  This is normal and should not
      // cause any problems.
      for( ; i < 1; i++)
   #else
      for( ; i < 2; i++)
   #endif
    {    {
         q = map[i];
       if(q->count() > 0 )       if(q->count() > 0 )
       {       {
          try          try
          {          {
             q->try_lock();             q->try_lock();
          }          }
          catch(AlreadyLocked & a)           catch(...)
          {          {
             q++;              return bodies;
             continue;  
          }          }
  
          struct timeval dt = { 0, 0 };          struct timeval dt = { 0, 0 };
Line 406 
Line 551 
             {             {
                dtp = (struct timeval *)th->try_reference_tsd("deadlock timer");                dtp = (struct timeval *)th->try_reference_tsd("deadlock timer");
             }             }
             catch(AlreadyLocked & a)              catch(...)
             {             {
                th = q->next(th);                 q->unlock();
                continue;                 return bodies;
             }             }
  
             if(dtp != 0)             if(dtp != 0)
             {             {
                memcpy(&dt, dtp, sizeof(struct timeval));                memcpy(&dt, dtp, sizeof(struct timeval));
   
             }             }
             th->dereference_tsd();             th->dereference_tsd();
             struct timeval deadlock_timeout;             struct timeval deadlock_timeout;
             if( true == check_time(&dt, get_deadlock_detect(&deadlock_timeout) ))              Boolean too_long;
               if( i == 0)
               {
                  too_long = check_time(&dt, get_deallocate_wait(&deadlock_timeout));
               }
               else
               {
                  too_long = check_time(&dt, get_deadlock_detect(&deadlock_timeout));
               }
   
               if( true == too_long)
             {             {
                // if we are deallocating from the pool, escape if we are                // if we are deallocating from the pool, escape if we are
                // down to the minimum thread count                // down to the minimum thread count
                if( _current_threads.value() <= (Uint32)_min_threads )                 _current_threads--;
                  if( _current_threads.value() < (Uint32)_min_threads )
                {                {
                   if( i == 1)                    if( i == 0)
                   {                   {
                        _current_threads++;
                      th = q->next(th);                      th = q->next(th);
                      continue;                      continue;
                   }                   }
Line 443 
Line 599 
  
                if(th != 0)                if(th != 0)
                {                {
                   th->remove_tsd("work func");                    if( i == 0 )
                     {
                        th->delete_tsd("work func");
                   th->put_tsd("work func", NULL,                   th->put_tsd("work func", NULL,
                               sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),                               sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
                               (void *)&_undertaker);                               (void *)&_undertaker);
                   th->remove_tsd("work parm");                       th->delete_tsd("work parm");
                   th->put_tsd("work parm", NULL, sizeof(void *), th);                   th->put_tsd("work parm", NULL, sizeof(void *), th);
  
                   // signal the thread's sleep semaphore to awaken it                   // signal the thread's sleep semaphore to awaken it
Line 455 
Line 613 
  
                   if(sleep_sem == 0)                   if(sleep_sem == 0)
                   {                   {
                           q->unlock();
                      th->dereference_tsd();                      th->dereference_tsd();
                      throw NullPointer();                      throw NullPointer();
                   }                   }
                   // put the thread on the dead  list  
                        bodies++;
                        th->dereference_tsd();
                   _dead.insert_first(th);                   _dead.insert_first(th);
                   sleep_sem->signal();                   sleep_sem->signal();
                   th->dereference_tsd();  
                   th = 0;                   th = 0;
                }                }
                     else
                     {
                        // deadlocked threads
   #if !defined(PEGASUS_PLATFORM_HPUX_ACC) && !defined(PEGASUS_PLATFORM_LINUX_IA64_GNU)
                        PEGASUS_STD(cout) << "Killing a deadlocked thread" << PEGASUS_STD(endl);
   #endif
                        th->cancel();
                        delete th;
                     }
                  }
             }             }
             th = q->next(th);             th = q->next(th);
               pegasus_sleep(1);
          }          }
          q->unlock();          q->unlock();
          while (needed.value() > 0)          while (needed.value() > 0)
          {          {
             _link_pool(_init_thread());             _link_pool(_init_thread());
             needed--;             needed--;
               pegasus_sleep(0);
          }          }
       }       }
    }    }
       return bodies;
   
    return;  
 } }
  
   
 Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval) Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval)
 { {
    struct timeval now;     // never time out if the interval is zero
      if(interval && interval->tv_sec == 0 && interval->tv_usec == 0)
         return false;
   
      struct timeval now, finish, remaining;
      Uint32 usec;
    gettimeofday(&now, NULL);    gettimeofday(&now, NULL);
    if( (now.tv_sec - start->tv_sec) > interval->tv_sec ||  
        (((now.tv_sec - start->tv_sec) == interval->tv_sec) &&     finish.tv_sec = start->tv_sec + interval->tv_sec;
         ((now.tv_usec - start->tv_usec) >= interval->tv_usec ) ) )     usec = start->tv_usec + interval->tv_usec;
      finish.tv_sec += (usec / 1000000);
      usec %= 1000000;
      finish.tv_usec = usec;
   
      if ( timeval_subtract(&remaining, &finish, &now) )
       return true;       return true;
    else    else
       return false;       return false;
 } }
  
   
 PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm ) PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm )
 { {
    Thread *myself = reinterpret_cast<Thread *>(parm);     exit_thread((PEGASUS_THREAD_RETURN)1);
    if(myself != 0)     return (PEGASUS_THREAD_RETURN)1;
   }
   
   
    void ThreadPool::_sleep_sem_del(void *p)
    {    {
       myself->detach();     if(p != 0)
       myself->_handle.thid = 0;     {
       myself->cancel();        delete (Semaphore *)p;
       myself->test_cancel();  
       myself->exit_self(0);  
    }    }
    return((PEGASUS_THREAD_RETURN)0);  
 } }
  
    void ThreadPool::_check_deadlock(struct timeval *start) throw(Deadlock)
   {
      if (true == check_time(start, &_deadlock_detect))
         throw Deadlock(pegasus_thread_self());
      return;
   }
   
   
    Boolean ThreadPool::_check_deadlock_no_throw(struct timeval *start)
   {
      return(check_time(start, &_deadlock_detect));
   }
   
    Boolean ThreadPool::_check_dealloc(struct timeval *start)
   {
      return(check_time(start, &_deallocate_wait));
   }
   
    Thread *ThreadPool::_init_thread(void) throw(IPCException)
   {
      PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_init_thread");
      Thread *th = (Thread *) new Thread(_loop, this, false);
      // allocate a sleep semaphore and pass it in the thread context
      // initial count is zero, loop function will sleep until
      // we signal the semaphore
      Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
      th->put_tsd("sleep sem", &_sleep_sem_del, sizeof(Semaphore), (void *)sleep_sem);
   
      struct timeval *dldt = (struct timeval *) ::operator new(sizeof(struct timeval));
      th->put_tsd("deadlock timer", thread_data::default_delete, sizeof(struct timeval), (void *)dldt);
      // thread will enter _loop(void *) and sleep on sleep_sem until we signal it
      th->run();
      _current_threads++;
      pegasus_yield();
      PEG_METHOD_EXIT();
   
      return th;
   }
   
    void ThreadPool::_link_pool(Thread *th) throw(IPCException)
   {
      if(th == 0)
         throw NullPointer();
      _pool.insert_first(th);
   }
   
   
  
 PEGASUS_NAMESPACE_END PEGASUS_NAMESPACE_END
  


Legend:
Removed from v.1.5  
changed lines
  Added in v.1.32

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2