(file) Return to Thread.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/Thread.cpp between version 1.57 and 1.58

version 1.57, 2003/10/23 05:59:53 version 1.58, 2003/10/23 19:30:23
Line 279 
Line 279 
       Mutex* _mut;       Mutex* _mut;
 }; };
  
   class auto_int
   {
      public:
         auto_int(AtomicInt* num)
            : _int(num)
         {
            _int->operator++();
         }
         ~auto_int(void)
         {
            _int->operator--();
         }
         AtomicInt *_int;
   };
  
 DQueue<ThreadPool> ThreadPool::_pools(true);  
  
   AtomicInt _idle_control;
   
   DQueue<ThreadPool> ThreadPool::_pools(true);
  
 void ThreadPool::kill_idle_threads(void) void ThreadPool::kill_idle_threads(void)
 { {
Line 355 
Line 371 
    try    try
    {    {
       // Set the dying flag so all thread know the destructor has been entered       // Set the dying flag so all thread know the destructor has been entered
       {  
          // ThreadPool::~ThreadPool will wait to set the _dying flag until  
          // it can acquire the lock. Once this lock is acquired, the thread  
          // acquiring the lock must be able to assume that the value of  
          // _dying will not change until the lock is released.  
   
          // Once the _dying flag is set, functions (e.g., kill_dead_threads,  
          // and allocate_and_awaken) that manipulate the ThreadPool  
          // queues (_pool, _dead, and _running), should not be allowed  
          // to run.  
   
          Tracer::trace(TRC_THREAD, Tracer::LEVEL4,  
              "Attempting to aquire _monitor lock");  
          auto_mutex dying_lock(&(this->_monitor));  
          Tracer::trace(TRC_THREAD, Tracer::LEVEL4,  
              "Acquired _monitor lock");  
          _dying++;          _dying++;
       }  
       // remove from the global pools list       // remove from the global pools list
       _pools.remove(this);       _pools.remove(this);
  
Line 409 
Line 409 
          th = _pool.remove_first();          th = _pool.remove_first();
       }       }
  
         while(_idle_control.value())
            pegasus_yield();
   
       th = _dead.remove_first();       th = _dead.remove_first();
       while(th != 0)       while(th != 0)
       {       {
Line 536 
Line 539 
  
    while(1)    while(1)
    {    {
         if(pool->_dying.value())
            break;
   
       try       try
       {       {
          sleep_sem->wait();          sleep_sem->wait();
Line 666 
Line 672 
  
    try    try
    {    {
       auto_mutex dying_lock(&(this->_monitor));  
       if (_dying.value())       if (_dying.value())
       {       {
          Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,          Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
Line 749 
Line 754 
    // manipulate the threads on the ThreadPool queues, they should never    // manipulate the threads on the ThreadPool queues, they should never
    // be allowed to run at the same time.    // be allowed to run at the same time.
  
    // kill_dead_threads will "hold" the _monitor lock until it has     // << Thu Oct 23 14:41:02 2003 mdd >>
    // completed executing. ~ThreadPool will not set the dying flag until     // not true, the queues are thread safe. they are syncrhonized.
    // it can get access to the lock.  
      auto_int do_not_destruct(&_idle_control);
  
    try    try
    {    {
       try_mutex dying_lock(&(this->_monitor));  
       if (_dying.value())       if (_dying.value())
       {       {
          return 0;          return 0;
Line 768 
Line 773 
       // first go thread the dead q and clean it up as much as possible       // first go thread the dead q and clean it up as much as possible
       try       try
       {       {
          while(_dead.count() > 0)           while(_dying.value() == 0 && _dead.count() > 0)
          {          {
             Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "ThreadPool:: removing and joining dead thread");             Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "ThreadPool:: removing and joining dead thread");
             Thread *dead = _dead.remove_first();             Thread *dead = _dead.remove_first();
  
             if(dead == 0)              if(dead )
                throw NullPointer();              {
             dead->join();             dead->join();
             delete dead;             delete dead;
          }          }
       }       }
         }
       catch(...)       catch(...)
       {       {
       }       }
  
         if (_dying.value())
         {
            return 0;
         }
  
       DQueue<Thread> * map[2] =       DQueue<Thread> * map[2] =
          {          {
Line 793 
Line 803 
       DQueue<Thread> *q = 0;       DQueue<Thread> *q = 0;
       int i = 0;       int i = 0;
       AtomicInt needed(0);       AtomicInt needed(0);
         Thread *th = 0;
         internal_dq idq;
  
 #ifdef PEGASUS_DISABLE_KILLING_HUNG_THREADS #ifdef PEGASUS_DISABLE_KILLING_HUNG_THREADS
       // This change prevents the thread pool from killing "hung" threads.       // This change prevents the thread pool from killing "hung" threads.
Line 827 
Line 839 
  
             struct timeval dt = { 0, 0 };             struct timeval dt = { 0, 0 };
             struct timeval *dtp;             struct timeval *dtp;
             Thread *th = 0;  
             th = q->next(th);             th = q->next(th);
             while (th != 0 )             while (th != 0 )
             {             {
Line 880 
Line 892 
                   }                   }
  
                   th = q->remove_no_lock((void *)th);                   th = q->remove_no_lock((void *)th);
                     idq.insert_first((void*)th);
                  }
                  th = q->next(th);
               }
               q->unlock();
            }
  
                   if(th != 0)           th = (Thread*)idq.remove_last();
            while(th != 0)
                   {                   {
                      if( i == 0 )                      if( i == 0 )
                      {                      {
Line 915 
Line 934 
                         th->cancel();                         th->cancel();
                         delete th;                         delete th;
                      }                      }
                   }              th = (Thread*)idq.remove_last();
                }  
                th = q->next(th);  
                pegasus_yield();  
             }  
             q->unlock();  
          }          }
       }       }
  
Line 931 
Line 945 
       }       }
        return bodies;        return bodies;
     }     }
     catch (AlreadyLocked &)      catch (...)
     {     {
        // kill_dead_threads was not able to obtain the  
        // _monitor lock that controls access to the  
        // _dying flag and the ThreadPool queues.  
        // This means that one of the queue manipulating  
        // functions (e.g., ~ThreadPool or allocate_and_awaken)  
        // is running.  Since cleanup is opportunistic, this  
        // function just returns.  
     }     }
     return 0;     return 0;
 } }


Legend:
Removed from v.1.57  
changed lines
  Added in v.1.58

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2