(file) Return to Thread.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/Thread.cpp between version 1.65 and 1.72

version 1.65, 2004/06/04 05:54:51 version 1.72, 2004/10/25 18:26:02
Line 1 
Line 1 
 //%2003////////////////////////////////////////////////////////////////////////  //%2004////////////////////////////////////////////////////////////////////////
 // //
 // Copyright (c) 2000, 2001, 2002  BMC Software, Hewlett-Packard Development  // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
 // Company, L. P., IBM Corp., The Open Group, Tivoli Systems.  // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L. P.; // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L. P.;
 // IBM Corp.; EMC Corporation, The Open Group. // IBM Corp.; EMC Corporation, The Open Group.
   // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
   // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
 // //
 // Permission is hereby granted, free of charge, to any person obtaining a copy // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to // of this software and associated documentation files (the "Software"), to
Line 33 
Line 35 
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
  
 #include "Thread.h" #include "Thread.h"
   #include <exception>
 #include <Pegasus/Common/IPC.h> #include <Pegasus/Common/IPC.h>
 #include <Pegasus/Common/Tracer.h> #include <Pegasus/Common/Tracer.h>
  
Line 46 
Line 49 
 # error "Unsupported platform" # error "Unsupported platform"
 #endif #endif
  
   PEGASUS_USING_STD;
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
  
Line 399 
Line 403 
             // is needed.             // is needed.
             sleep_sem->signal();             sleep_sem->signal();
             th->dereference_tsd();             th->dereference_tsd();
             th->cancel();  
             th->join();             th->join();
             delete th;             delete th;
          }          }
Line 429 
Line 432 
             sleep_sem->signal();             sleep_sem->signal();
             sleep_sem->signal();             sleep_sem->signal();
             th->dereference_tsd();             th->dereference_tsd();
             th->cancel();  
             th->join();             th->join();
             delete th;             delete th;
          }          }
Line 454 
Line 456 
                sleep_sem->signal();                sleep_sem->signal();
                sleep_sem->signal();                sleep_sem->signal();
                th->dereference_tsd();                th->dereference_tsd();
                th->cancel();                 //th->cancel();
                pegasus_yield();                pegasus_yield();
  
                th->join();                th->join();
Line 521 
Line 523 
    catch(...)    catch(...)
    {    {
       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
           "ThreadPool::_loop: Failure getting sleep_sem or deadlock_timer");                      "ThreadPool::_loop: Failure getting sleep_sem or deadlock_timer.");
         _graveyard(myself);
       PEG_METHOD_EXIT();       PEG_METHOD_EXIT();
       return((PEGASUS_THREAD_RETURN)0);       return((PEGASUS_THREAD_RETURN)0);
    }    }
Line 530 
Line 533 
    {    {
       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
           "ThreadPool::_loop: sleep_sem or deadlock_timer are null.");           "ThreadPool::_loop: sleep_sem or deadlock_timer are null.");
         _graveyard(myself);
       PEG_METHOD_EXIT();       PEG_METHOD_EXIT();
       return((PEGASUS_THREAD_RETURN)0);       return((PEGASUS_THREAD_RETURN)0);
    }    }
Line 541 
Line 545 
  
       try       try
       {       {
          sleep_sem->wait();                                  Boolean ignoreInterrupt = false;
                                   sleep_sem->wait(ignoreInterrupt);
         }
         catch (WaitInterrupted &e)
         {
           /* From the sem_wait manpage:
    The sem_trywait() and sem_wait() functions may fail if:
   
          EINTR  A signal interrupted this function.
           */
               PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
                   "Sleep semaphore wait failed. Doing a continue");
               continue;
       }       }
       catch(IPCException& )       catch(IPCException& )
       {       {
          Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,          Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
           "ThreadPool::_loop: failure on sleep_sem->wait().");           "ThreadPool::_loop: failure on sleep_sem->wait().");
            _graveyard(myself);
          PEG_METHOD_EXIT();          PEG_METHOD_EXIT();
          return((PEGASUS_THREAD_RETURN)0);          return((PEGASUS_THREAD_RETURN)0);
       }       }
  
       // when we awaken we reside on the running queue, not the pool queue       // when we awaken we reside on the running queue, not the pool queue
         /* Hence no need to move the thread to the _dead queue, as the _running
          * queue is only dused by kill_dead_threads which makes sure that the
          * the threads are cleaned up (unlocking any locked lists in the TSD, etc)
          * before killing it.
          */
  
       PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0;       PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0;
       void *parm = 0;       void *parm = 0;
Line 571 
Line 593 
       {       {
          Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,          Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
            "ThreadPool::_loop: Failure accessing work func, work parm, or blocking sem.");            "ThreadPool::_loop: Failure accessing work func, work parm, or blocking sem.");
           /*
            * We cannot move ourselves to the dead queue b/c the TSD might be still
            * locked and _graveyard is not equipped to de-lock (dereference_tsd) the TSD.
            * Only the kill_dead_threads has enough logic to handle such situations.
            _graveyard( myself);
           */
          PEG_METHOD_EXIT();          PEG_METHOD_EXIT();
          return((PEGASUS_THREAD_RETURN)0);          return((PEGASUS_THREAD_RETURN)0);
       }       }
Line 586 
Line 614 
       if(_work ==       if(_work ==
          (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) &_undertaker)          (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) &_undertaker)
       {       {
           /*
           * The undertaker is set by  ThreadPool::kill_dead_threads which awakens this thread,
           *  joins it and then removes it from the queue. Hence no reason to go to the
           _graveyard( myself);
           */
          PEG_METHOD_EXIT();          PEG_METHOD_EXIT();
          _work(parm);          _work(parm);
       }       }
Line 596 
Line 629 
       {       {
          try          try
          {          {
               PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,
                   "Worker started");
             _work(parm);             _work(parm);
               PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,
                   "Worker finished");
          }          }
          catch(Exception & e)          catch(Exception & e)
          {          {
Line 606 
Line 643 
             PEG_METHOD_EXIT();             PEG_METHOD_EXIT();
             return((PEGASUS_THREAD_RETURN)0);             return((PEGASUS_THREAD_RETURN)0);
          }          }
   #if !defined(PEGASUS_OS_LSB)
            catch (exception& e)
            {
               PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
                  String("Exception from _work in ThreadPool::_loop: ") +
                     e.what());
               PEG_METHOD_EXIT();
               return((PEGASUS_THREAD_RETURN)0);
            }
   #endif
          catch(...)          catch(...)
          {          {
             Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,             Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
Line 685 
Line 732 
          // ATTN: Error result has not yet been defined          // ATTN: Error result has not yet been defined
          return true;          return true;
       }       }
       struct timeval now;  
       struct timeval start;       struct timeval start;
       gettimeofday(&start, NULL);       gettimeofday(&start, NULL);
       Thread *th = 0;       Thread *th = 0;
Line 774 
Line 820 
    // manipulate the threads on the ThreadPool queues, they should never    // manipulate the threads on the ThreadPool queues, they should never
    // be allowed to run at the same time.    // be allowed to run at the same time.
  
      PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::kill_dead_threads");
    // << Thu Oct 23 14:41:02 2003 mdd >>    // << Thu Oct 23 14:41:02 2003 mdd >>
    // not true, the queues are thread safe. they are syncrhonized.    // not true, the queues are thread safe. they are syncrhonized.
  
Line 789 
Line 836 
       struct timeval now;       struct timeval now;
       gettimeofday(&now, NULL);       gettimeofday(&now, NULL);
       Uint32 bodies = 0;       Uint32 bodies = 0;
         AtomicInt needed(0);
  
       // first go thread the dead q and clean it up as much as possible       // first go thread the dead q and clean it up as much as possible
       try       try
Line 807 
Line 855 
       }       }
       catch(...)       catch(...)
       {       {
               Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Exception when deleting dead");
       }       }
  
       if (_dying.value())       if (_dying.value())
Line 814 
Line 863 
          return 0;          return 0;
       }       }
  
       DQueue<Thread> * map[2] =  
          {  
             &_pool, &_running  
          };  
   
   
       DQueue<Thread> *q = 0;  
       int i = 0;  
       AtomicInt needed(0);  
       Thread *th = 0;       Thread *th = 0;
       internal_dq idq;       internal_dq idq;
  
 #ifdef PEGASUS_KILL_LONG_RUNNING_THREADS        if(_pool.count() > 0 )
       // Defining PEGASUS_KILL_LONG_RUNNING_THREADS causes the thread pool  
       // to kill threads that are on the _running queue longer than the  
       // _deadlock_detect time interval specified for the thread pool.  
       // Cancelling long-running threads has proven to be problematic and  
       // may cause a crash depending on the state of the thread when it is  
       // killed.  Use this option with care.  
       for( ; i < 2; i++)  
 #else  
       for( ; i < 1; i++)  
 #endif  
       {  
          q = map[i];  
          if(q->count() > 0 )  
          {          {
             try             try
             {             {
                q->try_lock();              _pool.try_lock();
             }             }
             catch(...)             catch(...)
             {             {
Line 853 
Line 880 
             struct timeval dt = { 0, 0 };             struct timeval dt = { 0, 0 };
             struct timeval *dtp;             struct timeval *dtp;
  
             th = q->next(th);           th = _pool.next(th);
             while (th != 0 )             while (th != 0 )
             {             {
                try                try
Line 862 
Line 889 
                }                }
                catch(...)                catch(...)
                {                {
                   q->unlock();                 _pool.unlock();
                   return bodies;                   return bodies;
                }                }
  
Line 873 
Line 900 
                th->dereference_tsd();                th->dereference_tsd();
                struct timeval deadlock_timeout;                struct timeval deadlock_timeout;
                Boolean too_long;                Boolean too_long;
                if( i == 0)  
                {  
                   too_long = check_time(&dt, get_deallocate_wait(&deadlock_timeout));                   too_long = check_time(&dt, get_deallocate_wait(&deadlock_timeout));
                }  
                else  
                {  
                   too_long = check_time(&dt, get_deadlock_detect(&deadlock_timeout));  
                }  
  
                if( true == too_long)                if( true == too_long)
                {                {
                   // if we are deallocating from the pool, escape if we are                 // escape if we are down to the minimum thread count
                   // down to the minimum thread count  
                   _current_threads--;                   _current_threads--;
                   if( _current_threads.value() < (Uint32)_min_threads )                   if( _current_threads.value() < (Uint32)_min_threads )
                   {                   {
                      if( i == 0)  
                      {  
                         _current_threads++;                         _current_threads++;
                         th = q->next(th);                    th = _pool.next(th);
                         continue;                         continue;
                      }                      }
                      else  
                      {  
                         // we are killing a hung thread and we will drop below the  
                         // minimum. create another thread to make up for the one  
                         // we are about to kill  
                         needed++;  
                      }  
                   }  
  
                   th = q->remove_no_lock((void *)th);                 th = _pool.remove_no_lock((void *)th);
                   idq.insert_first((void*)th);                   idq.insert_first((void*)th);
                }                }
                th = q->next(th);              th = _pool.next(th);
             }             }
             q->unlock();           _pool.unlock();
          }          }
  
          th = (Thread*)idq.remove_last();          th = (Thread*)idq.remove_last();
          while(th != 0)          while(th != 0)
          {          {
             if( i == 0 )  
             {  
                th->delete_tsd("work func");                th->delete_tsd("work func");
                th->put_tsd("work func", NULL,                th->put_tsd("work func", NULL,
                            sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),                            sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
Line 930 
Line 937 
  
                bodies++;                bodies++;
                th->dereference_tsd();                th->dereference_tsd();
                // Putting thread on _dead queue delays availability to others  
                //_dead.insert_first(th);  
                sleep_sem->signal();                sleep_sem->signal();
                th->join();  // Note: Clean up the thread here rather than                th->join();  // Note: Clean up the thread here rather than
                delete th;   // leave it sitting unused on the _dead queue                delete th;   // leave it sitting unused on the _dead queue
                th = 0;  
             }  
             else  
             {  
                // deadlocked threads  
                struct timeval deadlock_timeout;  
                Tracer::trace(TRC_THREAD, Tracer::LEVEL2,  
                              "A thread has run longer than %u seconds and "  
                                  "will be cancelled.",  
                              Uint32(_deadlock_detect.tv_sec));  
                Logger::put_l(Logger::ERROR_LOG, System::CIMSERVER,  
                              Logger::SEVERE,  
                              "Common.Thread.CANCEL_LONG_RUNNING_THREAD",  
                              "A thread has run longer than {0} seconds and "  
                                  "will be cancelled.",  
                              Uint32(_deadlock_detect.tv_sec));  
                th->cancel();  
                delete th;  
             }  
             th = (Thread*)idq.remove_last();             th = (Thread*)idq.remove_last();
          }          }
       }  
  
        Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
                   "We need %u new threads", needed.value());
       while (needed.value() > 0)   {       while (needed.value() > 0)   {
          _link_pool(_init_thread());          _link_pool(_init_thread());
          needed--;          needed--;
Line 968 
Line 955 
     catch (...)     catch (...)
     {     {
     }     }
      PEG_METHOD_EXIT();
     return 0;     return 0;
 } }
  
Line 999 
Line 987 
  
 PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm ) PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm )
 { {
   
      PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_undertaker");
    exit_thread((PEGASUS_THREAD_RETURN)1);    exit_thread((PEGASUS_THREAD_RETURN)1);
      PEG_METHOD_EXIT();
      return (PEGASUS_THREAD_RETURN)1;
   }
   
   PEGASUS_THREAD_RETURN ThreadPool::_graveyard(Thread *t)
   {
     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_graveyard");
     ThreadPool *pool = (ThreadPool *)t->get_parm();
     if(pool == 0 ) {
       Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
                     "Could not obtain the pool information from the Thread.", t);
   
         return (PEGASUS_THREAD_RETURN)1;
     }
     if (pool->_pool.exists(t))
       {
         if (pool->_pool.remove( (void *) t) != 0)
           {
           Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
                   "Moving thread %p", t);
           /* We are moving the thread to the _running queue b/c
           _only_ kill_dead_threads has enough logic to take care
           of cleaning up the threads.*/
   
             pool->_running.insert_first( t );
           }
         else
           {
             Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
                           "Could not move Thread %p from _pool to _runing queue.", t);
    return (PEGASUS_THREAD_RETURN)1;    return (PEGASUS_THREAD_RETURN)1;
 } }
       }
  
     else if (pool->_running.exists(t))
       {
            Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
                           "Thread %p is on _running queue. Letting kill_dead_threads take care of the problem.", t);
             return (PEGASUS_THREAD_RETURN)1;
       }
     if (!pool->_dead.exists(t))
       {
         Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
                       "Thread is not on any queue! Moving it to the running queue.");
         pool->_running.insert_first( t );
       }
     PEG_METHOD_EXIT();
     return (PEGASUS_THREAD_RETURN)0;
   }
  
  void ThreadPool::_sleep_sem_del(void *p)  void ThreadPool::_sleep_sem_del(void *p)
 { {
Line 1032 
Line 1068 
  
  Thread *ThreadPool::_init_thread(void) throw(IPCException)  Thread *ThreadPool::_init_thread(void) throw(IPCException)
 { {
     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_init_thread");
    Thread *th = (Thread *) new Thread(_loop, this, false);    Thread *th = (Thread *) new Thread(_loop, this, false);
    // allocate a sleep semaphore and pass it in the thread context    // allocate a sleep semaphore and pass it in the thread context
    // initial count is zero, loop function will sleep until    // initial count is zero, loop function will sleep until
Line 1052 
Line 1089 
    }    }
    _current_threads++;    _current_threads++;
    pegasus_yield();    pegasus_yield();
     PEG_METHOD_EXIT();
  
    return th;    return th;
 } }


Legend:
Removed from v.1.65  
changed lines
  Added in v.1.72

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2