(file) Return to Thread.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/Thread.cpp between version 1.66 and 1.67

version 1.66, 2004/07/26 23:45:11 version 1.67, 2004/07/28 16:22:10
Line 521 
Line 521 
    catch(...)    catch(...)
    {    {
       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
           "ThreadPool::_loop: Failure getting sleep_sem or deadlock_timer");                      "ThreadPool::_loop: Failure getting sleep_sem or deadlock_timer.");
         _graveyard(myself);
       PEG_METHOD_EXIT();       PEG_METHOD_EXIT();
       return((PEGASUS_THREAD_RETURN)0);       return((PEGASUS_THREAD_RETURN)0);
    }    }
Line 530 
Line 531 
    {    {
       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
           "ThreadPool::_loop: sleep_sem or deadlock_timer are null.");           "ThreadPool::_loop: sleep_sem or deadlock_timer are null.");
         _graveyard(myself);
       PEG_METHOD_EXIT();       PEG_METHOD_EXIT();
       return((PEGASUS_THREAD_RETURN)0);       return((PEGASUS_THREAD_RETURN)0);
    }    }
Line 543 
Line 545 
       {       {
          sleep_sem->wait();          sleep_sem->wait();
       }       }
         catch (WaitInterrupted &e)
         {
           /* From the sem_wait manpage:
    The sem_trywait() and sem_wait() functions may fail if:
   
          EINTR  A signal interrupted this function.
           */
               PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
                   "Sleep semaphore wait failed. Doing a continue");
               continue;
         }
       catch(IPCException& )       catch(IPCException& )
       {       {
          Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,          Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
           "ThreadPool::_loop: failure on sleep_sem->wait().");           "ThreadPool::_loop: failure on sleep_sem->wait().");
            _graveyard(myself);
          PEG_METHOD_EXIT();          PEG_METHOD_EXIT();
          return((PEGASUS_THREAD_RETURN)0);          return((PEGASUS_THREAD_RETURN)0);
       }       }
  
       // when we awaken we reside on the running queue, not the pool queue       // when we awaken we reside on the running queue, not the pool queue
         /* Hence no need to move the thread to the _dead queue, as the _running
          * queue is only dused by kill_dead_threads which makes sure that the
          * the threads are cleaned up (unlocking any locked lists in the TSD, etc)
          * before killing it.
          */
  
       PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0;       PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0;
       void *parm = 0;       void *parm = 0;
Line 571 
Line 590 
       {       {
          Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,          Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
            "ThreadPool::_loop: Failure accessing work func, work parm, or blocking sem.");            "ThreadPool::_loop: Failure accessing work func, work parm, or blocking sem.");
           /*
            * We cannot move ourselves to the dead queue b/c the TSD might be still
            * locked and _graveyard is not equipped to de-lock (dereference_tsd) the TSD.
            * Only the kill_dead_threads has enough logic to handle such situations.
            _graveyard( myself);
           */
          PEG_METHOD_EXIT();          PEG_METHOD_EXIT();
          return((PEGASUS_THREAD_RETURN)0);          return((PEGASUS_THREAD_RETURN)0);
       }       }
Line 586 
Line 611 
       if(_work ==       if(_work ==
          (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) &_undertaker)          (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) &_undertaker)
       {       {
           /*
           * The undertaker is set by  ThreadPool::kill_dead_threads which awakens this thread,
           *  joins it and then removes it from the queue. Hence no reason to go to the
           _graveyard( myself);
           */
          PEG_METHOD_EXIT();          PEG_METHOD_EXIT();
          _work(parm);          _work(parm);
       }       }
Line 596 
Line 626 
       {       {
          try          try
          {          {
               PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,
                   "Worker started");
             _work(parm);             _work(parm);
               PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,
                   "Worker finished");
          }          }
          catch(Exception & e)          catch(Exception & e)
          {          {
Line 773 
Line 807 
    // manipulate the threads on the ThreadPool queues, they should never    // manipulate the threads on the ThreadPool queues, they should never
    // be allowed to run at the same time.    // be allowed to run at the same time.
  
      PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::kill_dead_threads");
    // << Thu Oct 23 14:41:02 2003 mdd >>    // << Thu Oct 23 14:41:02 2003 mdd >>
    // not true, the queues are thread safe. they are syncrhonized.    // not true, the queues are thread safe. they are syncrhonized.
  
Line 788 
Line 823 
       struct timeval now;       struct timeval now;
       gettimeofday(&now, NULL);       gettimeofday(&now, NULL);
       Uint32 bodies = 0;       Uint32 bodies = 0;
         AtomicInt needed(0);
  
       // first go thread the dead q and clean it up as much as possible       // first go thread the dead q and clean it up as much as possible
       try       try
Line 806 
Line 842 
       }       }
       catch(...)       catch(...)
       {       {
               Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Exception when deleting dead");
       }       }
  
       if (_dying.value())       if (_dying.value())
Line 821 
Line 858 
  
       DQueue<Thread> *q = 0;       DQueue<Thread> *q = 0;
       int i = 0;       int i = 0;
       AtomicInt needed(0);  
       Thread *th = 0;       Thread *th = 0;
       internal_dq idq;       internal_dq idq;
  
Line 956 
Line 992 
          }          }
       }       }
  
        Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
                   "We need %u new threads", needed.value());
       while (needed.value() > 0)   {       while (needed.value() > 0)   {
          _link_pool(_init_thread());          _link_pool(_init_thread());
          needed--;          needed--;
Line 966 
Line 1004 
     catch (...)     catch (...)
     {     {
     }     }
      PEG_METHOD_EXIT();
     return 0;     return 0;
 } }
  
Line 997 
Line 1036 
  
 PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm ) PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm )
 { {
   
      PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_undertaker");
    exit_thread((PEGASUS_THREAD_RETURN)1);    exit_thread((PEGASUS_THREAD_RETURN)1);
      PEG_METHOD_EXIT();
      return (PEGASUS_THREAD_RETURN)1;
   }
   
   PEGASUS_THREAD_RETURN ThreadPool::_graveyard(Thread *t)
   {
     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_graveyard");
     ThreadPool *pool = (ThreadPool *)t->get_parm();
     if(pool == 0 ) {
       Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
                     "Could not obtain the pool information from the Thread.", t);
   
         return (PEGASUS_THREAD_RETURN)1;
     }
     if (pool->_pool.exists(t))
       {
         if (pool->_pool.remove( (void *) t) != 0)
           {
           Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
                   "Moving thread %p", t);
           /* We are moving the thread to the _running queue b/c
           _only_ kill_dead_threads has enough logic to take care
           of cleaning up the threads.*/
   
             pool->_running.insert_first( t );
           }
         else
           {
             Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
                           "Could not move Thread %p from _pool to _runing queue.", t);
    return (PEGASUS_THREAD_RETURN)1;    return (PEGASUS_THREAD_RETURN)1;
 } }
       }
  
     else if (pool->_running.exists(t))
       {
            Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
                           "Thread %p is on _running queue. Letting kill_dead_threads take care of the problem.", t);
             return (PEGASUS_THREAD_RETURN)1;
       }
     if (!pool->_dead.exists(t))
       {
         Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
                       "Thread is not on any queue! Moving it to the running queue.");
         pool->_running.insert_first( t );
       }
     PEG_METHOD_EXIT();
     return (PEGASUS_THREAD_RETURN)0;
   }
  
  void ThreadPool::_sleep_sem_del(void *p)  void ThreadPool::_sleep_sem_del(void *p)
 { {
Line 1030 
Line 1117 
  
  Thread *ThreadPool::_init_thread(void) throw(IPCException)  Thread *ThreadPool::_init_thread(void) throw(IPCException)
 { {
     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_init_thread");
    Thread *th = (Thread *) new Thread(_loop, this, false);    Thread *th = (Thread *) new Thread(_loop, this, false);
    // allocate a sleep semaphore and pass it in the thread context    // allocate a sleep semaphore and pass it in the thread context
    // initial count is zero, loop function will sleep until    // initial count is zero, loop function will sleep until
Line 1050 
Line 1138 
    }    }
    _current_threads++;    _current_threads++;
    pegasus_yield();    pegasus_yield();
     PEG_METHOD_EXIT();
  
    return th;    return th;
 } }


Legend:
Removed from v.1.66  
changed lines
  Added in v.1.67

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2