(file) Return to Thread.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/Thread.cpp between version 1.71.2.5 and 1.71.2.6

version 1.71.2.5, 2005/08/12 22:52:42 version 1.71.2.6, 2005/08/16 17:30:51
Line 357 
Line 357 
    _pools.insert_last(this);    _pools.insert_last(this);
 } }
  
   
 // Note:   <<< Fri Oct 17 09:19:03 2003 mdd >>>  
 // the pegasus_yield() calls that preceed each th->join() are to  
 // give a thread on the running list a chance to reach a cancellation  
 // point before the join  
   
 ThreadPool::~ThreadPool(void) ThreadPool::~ThreadPool(void)
 { {
    PEG_METHOD_ENTER(TRC_THREAD, "Thread::~ThreadPool");    PEG_METHOD_ENTER(TRC_THREAD, "Thread::~ThreadPool");
Line 374 
Line 368 
       // remove from the global pools list       // remove from the global pools list
       _pools.remove(this);       _pools.remove(this);
  
       // start with idle threads.        while(_current_threads.value() > 0)
       Thread *th = 0;  
       th = _pool.remove_first();  
       Semaphore* sleep_sem;  
   
       while(th != 0)  
       {  
          sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");  
          PEGASUS_ASSERT(sleep_sem != 0);  
   
          if(sleep_sem == 0)  
          {          {
             th->dereference_tsd();           Thread* thread = _pool.remove_first();
          }           if (thread != 0)
          else  
          {          {
             // Signal to get the thread out of the work loop.              _cleanupThread(thread);
             sleep_sem->signal();              _current_threads--;
   
             // Signal to get the thread past the end. See the comment  
             // "wait to be awakend by the thread pool destructor"  
             // Note: the current implementation of Thread for Windows  
             // does not implement "pthread" cancelation points so this  
             // is needed.  
             sleep_sem->signal();  
             th->dereference_tsd();  
             th->join();  
             delete th;  
          }  
          th = _pool.remove_first();  
       }       }
  
       while(_idle_control.value())  
          pegasus_yield();  
   
       th = _dead.remove_first();  
       while(th != 0)  
       {  
          sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");  
          PEGASUS_ASSERT(sleep_sem != 0);  
   
          if(sleep_sem == 0)  
          {  
             th->dereference_tsd();  
          }  
          else          else
          {          {
             //ATTN-DME-P3-20030322: _dead queue processing in  
             //ThreadPool::~ThreadPool is inconsistent with the  
             //processing in kill_dead_threads.  Is this correct?  
   
             // signal the thread's sleep semaphore  
             sleep_sem->signal();  
             sleep_sem->signal();  
             th->dereference_tsd();  
             th->join();  
             delete th;  
          }  
          th = _dead.remove_first();  
       }  
   
       {  
          th = _running.remove_first();  
          while(th != 0)  
          {  
             // signal the thread's sleep semaphore  
   
             sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");  
             PEGASUS_ASSERT(sleep_sem != 0);  
   
             if(sleep_sem == 0 )  
             {  
                th->dereference_tsd();  
             }  
             else  
             {  
                sleep_sem->signal();  
                sleep_sem->signal();  
                th->dereference_tsd();  
                //th->cancel();  
                pegasus_yield();                pegasus_yield();
   
                th->join();  
                delete th;  
             }  
             th = _running.remove_first();  
          }          }
       }       }
    }    }
   
    catch(...)    catch(...)
    {    {
    }    }
Line 496 
Line 415 
       PEG_METHOD_EXIT();       PEG_METHOD_EXIT();
       throw NullPointer();       throw NullPointer();
    }    }
    if(pool->_dying.value())  
    {  
       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,  
           "ThreadPool::_loop: ThreadPool is dying(1)");  
       PEG_METHOD_EXIT();  
       return((PEGASUS_THREAD_RETURN)0);  
    }  
  
    Semaphore *sleep_sem = 0;    Semaphore *sleep_sem = 0;
    Semaphore *blocking_sem = 0;    Semaphore *blocking_sem = 0;
Line 513 
Line 425 
    {    {
       sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");       sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
       myself->dereference_tsd();       myself->dereference_tsd();
         PEGASUS_ASSERT(sleep_sem != 0);
   
       deadlock_timer = (struct timeval *)myself->reference_tsd("deadlock timer");       deadlock_timer = (struct timeval *)myself->reference_tsd("deadlock timer");
       myself->dereference_tsd();       myself->dereference_tsd();
         PEGASUS_ASSERT(deadlock_timer != 0);
    }    }
   
    catch(...)    catch(...)
    {    {
       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
                     "ThreadPool::_loop: Failure getting sleep_sem or deadlock_timer.");                     "ThreadPool::_loop: Failure getting sleep_sem or deadlock_timer.");
       _graveyard(myself);        PEGASUS_ASSERT(false);
       PEG_METHOD_EXIT();        pool->_pool.remove(myself);
       return((PEGASUS_THREAD_RETURN)0);        pool->_current_threads--;
    }  
   
    if(sleep_sem == 0 || deadlock_timer == 0)  
    {  
       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,  
           "ThreadPool::_loop: sleep_sem or deadlock_timer are null.");  
       _graveyard(myself);  
       PEG_METHOD_EXIT();       PEG_METHOD_EXIT();
       return((PEGASUS_THREAD_RETURN)0);        return((PEGASUS_THREAD_RETURN)1);
    }    }
  
    while(1)    while(1)
    {    {
       if(pool->_dying.value())  
          break;  
   
       try       try
       {       {
          sleep_sem->wait(false);           sleep_sem->wait();
       }       }
       catch (WaitInterrupted &e)        catch(...)
       {  
         /* From the sem_wait manpage:  
  The sem_trywait() and sem_wait() functions may fail if:  
   
        EINTR  A signal interrupted this function.  
         */  
             PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,  
                 "Sleep semaphore wait failed. Doing a continue");  
             continue;  
       }  
       catch(IPCException& )  
       {       {
          Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,          Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
            "ThreadPool::_loop: failure on sleep_sem->wait().");            "ThreadPool::_loop: failure on sleep_sem->wait().");
          _graveyard(myself);           PEGASUS_ASSERT(false);
            pool->_pool.remove(myself);
            pool->_current_threads--;
          PEG_METHOD_EXIT();          PEG_METHOD_EXIT();
          return((PEGASUS_THREAD_RETURN)0);           return((PEGASUS_THREAD_RETURN)1);
       }       }
  
       // when we awaken we reside on the running queue, not the pool queue       // when we awaken we reside on the running queue, not the pool queue
Line 573 
Line 468 
  
       PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0;       PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0;
       void *parm = 0;       void *parm = 0;
         Semaphore* blocking_sem = 0;
  
       try       try
       {       {
Line 585 
Line 481 
          myself->dereference_tsd();          myself->dereference_tsd();
  
       }       }
       catch(IPCException &)        catch(...)
       {       {
          Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,          Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
            "ThreadPool::_loop: Failure accessing work func, work parm, or blocking sem.");            "ThreadPool::_loop: Failure accessing work func, work parm, or blocking sem.");
         /*           PEGASUS_ASSERT(false);
          * We cannot move ourselves to the dead queue b/c the TSD might be still           pool->_pool.remove(myself);
          * locked and _graveyard is not equipped to de-lock (dereference_tsd) the TSD.           pool->_current_threads--;
          * Only the kill_dead_threads has enough logic to handle such situations.  
          _graveyard( myself);  
         */  
          PEG_METHOD_EXIT();          PEG_METHOD_EXIT();
          return((PEGASUS_THREAD_RETURN)0);           return((PEGASUS_THREAD_RETURN)1);
       }       }
  
       if(_work == 0)       if(_work == 0)
       {       {
          Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,          Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
            "ThreadPool::_loop: work func is null.");             "ThreadPool::_loop: work func is 0, meaning we should exit.");
          PEG_METHOD_EXIT();           break;
          return((PEGASUS_THREAD_RETURN)0);  
       }  
   
       if(_work ==  
          (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) &_undertaker)  
       {  
         /*  
         * The undertaker is set by  ThreadPool::kill_dead_threads which awakens this thread,  
         *  joins it and then removes it from the queue. Hence no reason to go to the  
         _graveyard( myself);  
         */  
          PEG_METHOD_EXIT();  
          _work(parm);  
       }       }
  
       gettimeofday(deadlock_timer, NULL);       gettimeofday(deadlock_timer, NULL);
  
       if (pool->_dying.value() == 0)  
       {  
          try          try
          {          {
             PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,             PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,
Line 636 
Line 514 
             PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,             PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
                String("Exception from _work in ThreadPool::_loop: ") +                String("Exception from _work in ThreadPool::_loop: ") +
                   e.getMessage());                   e.getMessage());
             PEG_METHOD_EXIT();  
             return((PEGASUS_THREAD_RETURN)0);  
          }          }
 #if !defined(PEGASUS_OS_LSB) #if !defined(PEGASUS_OS_LSB)
          catch (exception& e)          catch (exception& e)
Line 645 
Line 521 
             PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,             PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
                String("Exception from _work in ThreadPool::_loop: ") +                String("Exception from _work in ThreadPool::_loop: ") +
                   e.what());                   e.what());
             PEG_METHOD_EXIT();  
             return((PEGASUS_THREAD_RETURN)0);  
          }          }
 #endif #endif
          catch(...)          catch(...)
          {          {
             Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,             Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
               "ThreadPool::_loop: execution of _work failed.");               "ThreadPool::_loop: execution of _work failed.");
             PEG_METHOD_EXIT();  
             return((PEGASUS_THREAD_RETURN)0);  
          }  
        }        }
  
       // put myself back onto the available list       // put myself back onto the available list
       try       try
       {       {
          if(pool->_dying.value() == 0)  
          {  
             gettimeofday(deadlock_timer, NULL);             gettimeofday(deadlock_timer, NULL);
             if( blocking_sem != 0 )             if( blocking_sem != 0 )
                blocking_sem->signal();                blocking_sem->signal();
  
             // If we are not on _running then ~ThreadPool has removed           Boolean removed = pool->_running.remove((void *)myself);
             // us and now "owns" our pointer.           PEGASUS_ASSERT(removed);
             if ( pool->_running.remove((void *)myself) != 0 )  
             {  
                pool->_pool.insert_first(myself);                pool->_pool.insert_first(myself);
             }             }
             else  
             {  
                Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,  
                   "ThreadPool::_loop: Failed to remove thread from running queue.");  
                PEG_METHOD_EXIT();  
                return((PEGASUS_THREAD_RETURN)0);  
             }  
          }  
          else  
          {  
             PEG_METHOD_EXIT();  
             return((PEGASUS_THREAD_RETURN)0);  
          }  
       }  
       catch(...)       catch(...)
       {       {
         Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,         Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
              "ThreadPool::_loop: Adding thread to idle pool failed.");              "ThreadPool::_loop: Adding thread to idle pool failed.");
            PEGASUS_ASSERT(false);
            pool->_current_threads--;
          PEG_METHOD_EXIT();          PEG_METHOD_EXIT();
          return((PEGASUS_THREAD_RETURN)0);           return((PEGASUS_THREAD_RETURN)1);
       }       }
  
    }    }
  
    // TODO: Why is this needed? Why not just continue?  
    // wait to be awakend by the thread pool destructor  
    //sleep_sem->wait();  
   
    myself->test_cancel();  
   
    PEG_METHOD_EXIT();    PEG_METHOD_EXIT();
    return((PEGASUS_THREAD_RETURN)0);    return((PEGASUS_THREAD_RETURN)0);
 } }
Line 804 
Line 654 
 Uint32 ThreadPool::kill_dead_threads(void) Uint32 ThreadPool::kill_dead_threads(void)
          throw(IPCException)          throw(IPCException)
 { {
    // Since the kill_dead_threads, ThreadPool or allocate_and_awaken  
    // manipulate the threads on the ThreadPool queues, they should never  
    // be allowed to run at the same time.  
   
    PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::kill_dead_threads");    PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::kill_dead_threads");
    // << Thu Oct 23 14:41:02 2003 mdd >>  
    // not true, the queues are thread safe. they are syncrhonized.  
  
    auto_int do_not_destruct(&_idle_control);     Uint32 numThreadsCleanedUp = 0;
  
    try      Uint32 numIdleThreads = _pool.count();
       for (Uint32 i = 0; i < numIdleThreads; i++)
    {    {
       if (_dying.value())          // Do not dip below the minimum thread count
           if (_current_threads.value() <= (Uint32)_min_threads)
       {       {
          return 0;              break;
       }       }
  
       struct timeval now;          Thread* thread = _pool.remove_last();
       gettimeofday(&now, NULL);  
       Uint32 bodies = 0;  
       AtomicInt needed(0);  
   
       // first go thread the dead q and clean it up as much as possible  
       try  
       {  
          while(_dying.value() == 0 && _dead.count() > 0)  
          {  
             Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "ThreadPool:: removing and joining dead thread");  
             Thread *dead = _dead.remove_first();  
   
             if(dead )  
             {  
                dead->join();  
                delete dead;  
             }  
          }  
       }  
       catch(...)  
       {  
             Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Exception when deleting dead");  
       }  
  
       if (_dying.value())          // If there are no more threads in the _pool queue, we're done.
           if (thread == 0)
       {       {
          return 0;              break;
       }       }
  
       Thread *th = 0;          struct timeval* lastActivityTime;
       internal_dq idq;  
   
       if(_pool.count() > 0 )  
       {  
          try          try
          {          {
             _pool.try_lock();              lastActivityTime = (struct timeval *)thread->try_reference_tsd(
                   "deadlock timer");
               PEGASUS_ASSERT(lastActivityTime != 0);
          }          }
          catch(...)          catch(...)
          {          {
             return bodies;              PEGASUS_ASSERT(false);
               _pool.insert_last(thread);
               break;
          }          }
  
          struct timeval dt = { 0, 0 };          Boolean cleanupThisThread =
          struct timeval *dtp;              check_time(lastActivityTime, &_deallocate_wait);
           thread->dereference_tsd();
  
          th = _pool.next(th);          if (cleanupThisThread)
          while (th != 0 )  
          {  
             try  
             {  
                dtp = (struct timeval *)th->try_reference_tsd("deadlock timer");  
             }  
             catch(...)  
             {             {
                _pool.unlock();              _cleanupThread(thread);
                return bodies;              _current_threads--;
               numThreadsCleanedUp++;
             }             }
           else
             if(dtp != 0)  
             {             {
                memcpy(&dt, dtp, sizeof(struct timeval));              _pool.insert_first(thread);
             }             }
             th->dereference_tsd();  
             struct timeval deadlock_timeout;  
             Boolean too_long;  
             too_long = check_time(&dt, get_deallocate_wait(&deadlock_timeout));  
   
             if( true == too_long)  
             {  
                // escape if we are down to the minimum thread count  
                _current_threads--;  
                if( _current_threads.value() < (Uint32)_min_threads )  
                {  
                   _current_threads++;  
                   th = _pool.next(th);  
                   continue;  
                }                }
  
                th = _pool.remove_no_lock((void *)th);      PEG_METHOD_EXIT();
                idq.insert_first((void*)th);      return numThreadsCleanedUp;
             }  
             th = _pool.next(th);  
          }  
          _pool.unlock();  
       }       }
  
       th = (Thread*)idq.remove_last();  void ThreadPool::_cleanupThread(Thread* th)
       while(th != 0)  
       {       {
       PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
   
       // Set the "work func" and "work parm" to 0 so _loop() knows to exit.
          th->delete_tsd("work func");          th->delete_tsd("work func");
          th->put_tsd("work func", NULL,      th->put_tsd(
           "work func", NULL,
             sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),             sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
             (void *)&_undertaker);          (void *) 0);
          th->delete_tsd("work parm");          th->delete_tsd("work parm");
          th->put_tsd("work parm", NULL, sizeof(void *), th);      th->put_tsd("work parm", NULL, sizeof(void *), 0);
  
          // signal the thread's sleep semaphore to awaken it          // signal the thread's sleep semaphore to awaken it
          Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");          Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
          PEGASUS_ASSERT(sleep_sem != 0);          PEGASUS_ASSERT(sleep_sem != 0);
   
          bodies++;  
          th->dereference_tsd();  
          sleep_sem->signal();          sleep_sem->signal();
          th->join();  // Note: Clean up the thread here rather than      th->dereference_tsd();
          delete th;   // leave it sitting unused on the _dead queue  
          th = (Thread*)idq.remove_last();      th->join();
       }      delete th;
  
      Tracer::trace(TRC_THREAD, Tracer::LEVEL2,  
                 "We need %u new threads", needed.value());  
       while (needed.value() > 0)   {  
          _link_pool(_init_thread());  
          needed--;  
          pegasus_sleep(0);  
       }  
        return bodies;  
     }  
     catch (...)  
     {  
     }  
    PEG_METHOD_EXIT();    PEG_METHOD_EXIT();
     return 0;  
 } }
  
   
 Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval) Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval)
 { {
    // never time out if the interval is zero    // never time out if the interval is zero


Legend:
Removed from v.1.71.2.5  
changed lines
  Added in v.1.71.2.6

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2