(file) Return to Thread.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/Thread.cpp between version 1.1.2.10 and 1.1.2.14

version 1.1.2.10, 2001/10/29 11:25:22 version 1.1.2.14, 2001/11/13 18:38:06
Line 1 
Line 1 
   
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
 // //
 // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM  // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM,
   // Compaq Computer Corporation
 // //
 // Permission is hereby granted, free of charge, to any person obtaining a copy // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to // of this software and associated documentation files (the "Software"), to
Line 22 
Line 24 
 // //
 // Author: Mike Day (mdday@us.ibm.com) // Author: Mike Day (mdday@us.ibm.com)
 // //
 // Modified By:  // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
   //              added nsk platform support
 // //
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
  
Line 33 
Line 36 
 # include "ThreadWindows.cpp" # include "ThreadWindows.cpp"
 #elif defined(PEGASUS_OS_TYPE_UNIX) #elif defined(PEGASUS_OS_TYPE_UNIX)
 # include "ThreadUnix.cpp" # include "ThreadUnix.cpp"
   #elif defined(PEGASUS_OS_TYPE_NSK)
   # include "ThreadNsk.cpp"
 #else #else
 # error "Unsupported platform" # error "Unsupported platform"
 #endif #endif
Line 111 
Line 116 
  
  
 ThreadPool::ThreadPool(Sint16 initial_size, ThreadPool::ThreadPool(Sint16 initial_size,
                        Sint16 max,                         Sint8 *key,
                        Sint16 min,                        Sint16 min,
                        Sint8 *key)                         Sint16 max,
                          struct timeval & alloc_wait,
                          struct timeval & dealloc_wait,
                          struct timeval & deadlock_detect)
    : _max_threads(max), _min_threads(min),    : _max_threads(max), _min_threads(min),
      _current_threads(0), _waiters(initial_size),      _current_threads(0), _waiters(initial_size),
      _pool_sem(0), _pool(true), _running(true),      _pool_sem(0), _pool(true), _running(true),
      _dying(0)       _dead(true), _dying(0)
 { {
    _allocate_wait.tv_sec = 1;     _allocate_wait.tv_sec = alloc_wait.tv_sec;
    _allocate_wait.tv_usec = 0;     _allocate_wait.tv_usec = alloc_wait.tv_usec;
    _deallocate_wait.tv_sec = 30;     _deallocate_wait.tv_sec = dealloc_wait.tv_sec;
    _deallocate_wait.tv_usec = 0;     _deallocate_wait.tv_usec = dealloc_wait.tv_usec;
    _deadlock_detect.tv_sec = 60;     _deadlock_detect.tv_sec = deadlock_detect.tv_sec;
    _deadlock_detect.tv_usec = 0;     _deadlock_detect.tv_usec = deadlock_detect.tv_usec;
    memset(_key, 0x00, 17);    memset(_key, 0x00, 17);
    if(key != 0)    if(key != 0)
       strncpy(_key, key, 16);       strncpy(_key, key, 16);
Line 140 
Line 148 
    }    }
 } }
  
   
   
 ThreadPool::~ThreadPool(void) ThreadPool::~ThreadPool(void)
 { {
    _dying++;    _dying++;
    Thread *th = _pool.remove_first();    Thread *th = _pool.remove_first();
    while(th != 0)    while(th != 0)
    {    {
         Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
   
         if(sleep_sem == 0)
         {
            th->dereference_tsd();
            throw NullPointer();
         }
   
         sleep_sem->signal();
         sleep_sem->signal();
         th->dereference_tsd();
       // signal the thread's sleep semaphore       // signal the thread's sleep semaphore
       th->cancel();       th->cancel();
       th->join();       th->join();
Line 153 
Line 174 
       delete th;       delete th;
       th = _pool.remove_first();       th = _pool.remove_first();
    }    }
   
      th = _running.remove_first();
      while(th != 0)
      {
         // signal the thread's sleep semaphore
         th->cancel();
         th->join();
         th->empty_tsd();
         delete th;
         th = _running.remove_first();
      }
   
      th = _dead.remove_first();
      while(th != 0)
      {
         // signal the thread's sleep semaphore
         th->cancel();
         th->join();
         th->empty_tsd();
         delete th;
         th = _dead.remove_first();
      }
 } }
  
 // make this static to the class // make this static to the class
Line 183 
Line 226 
  
    while(pool->_dying < 1)    while(pool->_dying < 1)
    {    {
       myself->test_cancel();  
       sleep_sem->wait();       sleep_sem->wait();
       // when we awaken we reside on the running queue, not the pool queue       // when we awaken we reside on the running queue, not the pool queue
       myself->test_cancel();        if(pool->_dying > 0)
            break;
       gettimeofday(deadlock_timer, NULL);       gettimeofday(deadlock_timer, NULL);
  
       PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *);       PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *);
Line 220 
Line 263 
          myself->exit_self(0);          myself->exit_self(0);
       }       }
    }    }
      // wait to be awakend by the thread pool destructor
      sleep_sem->wait();
      myself->test_cancel();
    myself->exit_self(0);    myself->exit_self(0);
    return((PEGASUS_THREAD_RETURN)0);    return((PEGASUS_THREAD_RETURN)0);
 } }
Line 260 
Line 306 
          }          }
       }       }
       // will throw a Deadlock Exception before falling out of the loop       // will throw a Deadlock Exception before falling out of the loop
   
       _check_deadlock(&start);       _check_deadlock(&start);
   
    } // while th == null    } // while th == null
  
    if(_dying < 1)    if(_dying < 1)
Line 278 
Line 326 
  
       // signal the thread's sleep semaphore to awaken it       // signal the thread's sleep semaphore to awaken it
       Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");       Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
   
       if(sleep_sem == 0)       if(sleep_sem == 0)
         {
            th->dereference_tsd();
          throw NullPointer();          throw NullPointer();
         }
   
       sleep_sem->signal();       sleep_sem->signal();
         th->dereference_tsd();
    }    }
    else    else
       _pool.insert_first(th);       _pool.insert_first(th);
Line 290 
Line 344 
 // but should call it at least once per _deadlock_detect with the running q // but should call it at least once per _deadlock_detect with the running q
 // and at least once per _deallocate_wait for the pool q // and at least once per _deallocate_wait for the pool q
  
 void ThreadPool::_kill_dead_threads(DQueue<Thread> *q, Boolean (*check)(struct timeval *))  void ThreadPool::kill_dead_threads(void)
    throw(IPCException)    throw(IPCException)
 { {
    struct timeval now;    struct timeval now;
    gettimeofday(&now, NULL);    gettimeofday(&now, NULL);
  
    DQueue<Thread> dead ;  
  
      // first go thread the dead q and clean it up as much as possible
      while(_dead.count() > 0)
      {
         Thread *dead = _dead.remove_first();
         if(dead == 0)
            throw NullPointer();
         if(dead->_handle.thid != 0)
         {
            destroy_thread(dead->_handle.thid, 0);
            while(dead->_cleanup.count() )
            {
               dead->cleanup_pop(true);
            }
         }
         delete dead;
      }
   
      DQueue<Thread> * map[2] =
         {
            &_pool, &_running
         };
   
   
      DQueue<Thread> *q = 0;
      int i = 0;
      AtomicInt needed(0);
   
      for( q = map[i] ; i < 2; i++, q = map[i])
      {
    if(q->count() > 0 )    if(q->count() > 0 )
    {    {
       try       try
Line 306 
Line 388 
       }       }
       catch(AlreadyLocked & a)       catch(AlreadyLocked & a)
       {       {
          return;              q++;
               continue;
       }       }
  
       Thread *context = 0;  
       struct timeval dt = { 0, 0 };       struct timeval dt = { 0, 0 };
       struct timeval *dtp;       struct timeval *dtp;
       Thread *th = q->next(context);           Thread *th = 0;
            th = q->next(th);
       while (th != 0 )       while (th != 0 )
       {       {
          try          try
Line 321 
Line 404 
          }          }
          catch(AlreadyLocked & a)          catch(AlreadyLocked & a)
          {          {
             context = th;                 th = q->next(th);
             th = q->next(context);  
             continue;             continue;
          }          }
  
Line 332 
Line 414 
  
          }          }
          th->dereference_tsd();          th->dereference_tsd();
          if( true == check(&dt))              struct timeval deadlock_timeout;
               if( true == check_time(&dt, get_deadlock_detect(&deadlock_timeout) ))
               {
                  // if we are deallocating from the pool, escape if we are
                  // down to the minimum thread count
                  if( _current_threads.value() <= (Uint32)_min_threads )
          {          {
                     if( i == 1)
                     {
                        th = q->next(th);
                        continue;
                     }
                     else
                     {
                        // we are killing a hung thread and we will drop below the
                        // minimum. create another thread to make up for the one
                        // we are about to kill
                        needed++;
                     }
                  }
   
             th = q->remove_no_lock((void *)th);             th = q->remove_no_lock((void *)th);
  
             if(th != 0)             if(th != 0)
             {             {
                dead.insert_first(th);                    th->remove_tsd("work func");
                     th->put_tsd("work func", NULL,
                                 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
                                 (void *)&_undertaker);
                     th->remove_tsd("work parm");
                     th->put_tsd("work parm", NULL, sizeof(void *), th);
   
                     // signal the thread's sleep semaphore to awaken it
                     Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
   
                     if(sleep_sem == 0)
                     {
                        th->dereference_tsd();
                        throw NullPointer();
                     }
                     // put the thread on the dead  list
                     _dead.insert_first(th);
                     sleep_sem->signal();
                     th->dereference_tsd();
                th = 0;                th = 0;
             }             }
          }          }
          context = th;              th = q->next(th);
          th = q->next(context);  
       }       }
       q->unlock();       q->unlock();
    }           while (needed.value() > 0)
   
    if(dead.count())  
    {    {
       Thread *th = dead.remove_first();              _link_pool(_init_thread());
       while(th != 0)              needed--;
       {           }
          th->cancel();  
          th->join();  
          delete th;  
          th = dead.remove_first();  
       }       }
    }    }
   
   
    return;    return;
 } }
  
 Boolean ThreadPool::_check_time(struct timeval *start, struct timeval *interval)  Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval)
 { {
    struct timeval now;    struct timeval now;
    gettimeofday(&now, NULL);    gettimeofday(&now, NULL);
Line 374 
Line 488 
       return false;       return false;
 } }
  
 void ThreadPool::_sleep_sem_del(void *p)  
 {  
    if(p != 0)  
    {  
       delete (Semaphore *)p;  
    }  
 }  
  
 inline void ThreadPool::_check_deadlock(struct timeval *start) throw(Deadlock)  PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm )
 { {
    if (true == _check_time(start, &_deadlock_detect))     Thread *myself = reinterpret_cast<Thread *>(parm);
       throw Deadlock(pegasus_thread_self());     if(myself != 0)
    return;  
 }  
   
   
 inline Boolean ThreadPool::_check_deadlock_no_throw(struct timeval *start)  
 {  
    return(_check_time(start, &_deadlock_detect));  
 }  
   
 inline Boolean ThreadPool::_check_dealloc(struct timeval *start)  
 {  
    return(_check_time(start, &_deallocate_wait));  
 }  
   
 inline Thread *ThreadPool::_init_thread(void) throw(IPCException)  
 { {
    Thread *th = (Thread *) new Thread(&_loop, this, false);        myself->detach();
    // allocate a sleep semaphore and pass it in the thread context        myself->_handle.thid = 0;
    // initial count is zero, loop function will sleep until        myself->cancel();
    // we signal the semaphore        myself->test_cancel();
    Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);        myself->exit_self(0);
    th->put_tsd("sleep sem", &_sleep_sem_del, sizeof(Semaphore), (void *)sleep_sem);  
    struct timeval *dldt = (struct timeval *) ::operator new(sizeof(struct timeval));  
    th->put_tsd("deadlock timer", thread_data::default_delete, sizeof(struct timeval), (void *)dldt);  
    // thread will enter _loop(void *) and sleep on sleep_sem until we signal it  
    th->run();  
    _current_threads++;  
    return th;  
 } }
      return((PEGASUS_THREAD_RETURN)0);
 inline void ThreadPool::_link_pool(Thread *th) throw(IPCException)  
 {  
    if(th == 0)  
       throw NullPointer();  
    _pool.insert_first(th);  
    _pool_sem.signal();  
 } }
  
  


Legend:
Removed from v.1.1.2.10  
changed lines
  Added in v.1.1.2.14

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2