(file) Return to Thread.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/Thread.cpp between version 1.61 and 1.68

version 1.61, 2003/11/04 23:59:56 version 1.68, 2004/07/29 06:40:20
Line 28 
Line 28 
 // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01 // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
 //              added nsk platform support //              added nsk platform support
 //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com) //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
   //              Amit K Arora, IBM (amita@in.ibm.com) for PEP#101
 // //
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
  
 #include "Thread.h" #include "Thread.h"
   #include <exception>
 #include <Pegasus/Common/IPC.h> #include <Pegasus/Common/IPC.h>
 #include <Pegasus/Common/Tracer.h> #include <Pegasus/Common/Tracer.h>
  
Line 59 
Line 61 
 { {
    if( data != NULL)    if( data != NULL)
    {    {
       AcceptLanguages * al = static_cast<AcceptLanguages *>(data);        AutoPtr<AcceptLanguages> al(static_cast<AcceptLanguages *>(data));
       delete al;  
    }    }
 } }
 // l10n end // l10n end
  
 Boolean Thread::_signals_blocked = false; Boolean Thread::_signals_blocked = false;
 // l10n // l10n
   #ifndef PEGASUS_OS_ZOS
 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key = -1; PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key = -1;
   #else
   PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key;
   #endif
 Boolean Thread::_key_initialized = false; Boolean Thread::_key_initialized = false;
 Boolean Thread::_key_error = false; Boolean Thread::_key_error = false;
  
Line 76 
Line 81 
 #ifndef PEGASUS_THREAD_CLEANUP_NATIVE #ifndef PEGASUS_THREAD_CLEANUP_NATIVE
 void Thread::cleanup_push( void (*routine)(void *), void *parm) throw(IPCException) void Thread::cleanup_push( void (*routine)(void *), void *parm) throw(IPCException)
 { {
     cleanup_handler *cu = new cleanup_handler(routine, parm);      AutoPtr<cleanup_handler> cu(new cleanup_handler(routine, parm));
     try      _cleanup.insert_first(cu.get());
     {      cu.release();
         _cleanup.insert_first(cu);  
     }  
     catch(IPCException&)  
     {  
         delete cu;  
         throw;  
     }  
     return;     return;
 } }
  
 void Thread::cleanup_pop(Boolean execute) throw(IPCException) void Thread::cleanup_pop(Boolean execute) throw(IPCException)
 { {
     cleanup_handler *cu ;      AutoPtr<cleanup_handler> cu ;
     try     try
     {     {
         cu = _cleanup.remove_first() ;          cu.reset(_cleanup.remove_first());
     }     }
     catch(IPCException&)     catch(IPCException&)
     {     {
Line 102 
Line 100 
      }      }
     if(execute == true)     if(execute == true)
         cu->execute();         cu->execute();
     delete cu;  
 } }
  
 #endif #endif
Line 525 
Line 522 
    catch(...)    catch(...)
    {    {
       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
           "ThreadPool::_loop: Failure getting sleep_sem or deadlock_timer");                      "ThreadPool::_loop: Failure getting sleep_sem or deadlock_timer.");
         _graveyard(myself);
       PEG_METHOD_EXIT();       PEG_METHOD_EXIT();
       return((PEGASUS_THREAD_RETURN)0);       return((PEGASUS_THREAD_RETURN)0);
    }    }
Line 534 
Line 532 
    {    {
       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
           "ThreadPool::_loop: sleep_sem or deadlock_timer are null.");           "ThreadPool::_loop: sleep_sem or deadlock_timer are null.");
         _graveyard(myself);
       PEG_METHOD_EXIT();       PEG_METHOD_EXIT();
       return((PEGASUS_THREAD_RETURN)0);       return((PEGASUS_THREAD_RETURN)0);
    }    }
Line 547 
Line 546 
       {       {
          sleep_sem->wait();          sleep_sem->wait();
       }       }
         catch (WaitInterrupted &e)
         {
           /* From the sem_wait manpage:
    The sem_trywait() and sem_wait() functions may fail if:
   
          EINTR  A signal interrupted this function.
           */
               PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
                   "Sleep semaphore wait failed. Doing a continue");
               continue;
         }
       catch(IPCException& )       catch(IPCException& )
       {       {
          Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,          Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
           "ThreadPool::_loop: failure on sleep_sem->wait().");           "ThreadPool::_loop: failure on sleep_sem->wait().");
            _graveyard(myself);
          PEG_METHOD_EXIT();          PEG_METHOD_EXIT();
          return((PEGASUS_THREAD_RETURN)0);          return((PEGASUS_THREAD_RETURN)0);
       }       }
  
       // when we awaken we reside on the running queue, not the pool queue       // when we awaken we reside on the running queue, not the pool queue
         /* Hence no need to move the thread to the _dead queue, as the _running
          * queue is only dused by kill_dead_threads which makes sure that the
          * the threads are cleaned up (unlocking any locked lists in the TSD, etc)
          * before killing it.
          */
  
       PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0;       PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0;
       void *parm = 0;       void *parm = 0;
Line 575 
Line 591 
       {       {
          Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,          Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
            "ThreadPool::_loop: Failure accessing work func, work parm, or blocking sem.");            "ThreadPool::_loop: Failure accessing work func, work parm, or blocking sem.");
           /*
            * We cannot move ourselves to the dead queue b/c the TSD might be still
            * locked and _graveyard is not equipped to de-lock (dereference_tsd) the TSD.
            * Only the kill_dead_threads has enough logic to handle such situations.
            _graveyard( myself);
           */
          PEG_METHOD_EXIT();          PEG_METHOD_EXIT();
          return((PEGASUS_THREAD_RETURN)0);          return((PEGASUS_THREAD_RETURN)0);
       }       }
Line 590 
Line 612 
       if(_work ==       if(_work ==
          (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) &_undertaker)          (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) &_undertaker)
       {       {
           /*
           * The undertaker is set by  ThreadPool::kill_dead_threads which awakens this thread,
           *  joins it and then removes it from the queue. Hence no reason to go to the
           _graveyard( myself);
           */
          PEG_METHOD_EXIT();          PEG_METHOD_EXIT();
          _work(parm);          _work(parm);
       }       }
Line 600 
Line 627 
       {       {
          try          try
          {          {
               PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,
                   "Worker started");
             _work(parm);             _work(parm);
               PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,
                   "Worker finished");
          }          }
          catch(Exception & e)          catch(Exception & e)
          {          {
Line 610 
Line 641 
             PEG_METHOD_EXIT();             PEG_METHOD_EXIT();
             return((PEGASUS_THREAD_RETURN)0);             return((PEGASUS_THREAD_RETURN)0);
          }          }
   #if !defined(PEGASUS_OS_LSB)
            catch (exception& e)
            {
               PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
                  String("Exception from _work in ThreadPool::_loop: ") +
                     e.what());
               PEG_METHOD_EXIT();
               return((PEGASUS_THREAD_RETURN)0);
            }
   #endif
          catch(...)          catch(...)
          {          {
             Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,             Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
Line 689 
Line 730 
          // ATTN: Error result has not yet been defined          // ATTN: Error result has not yet been defined
          return true;          return true;
       }       }
       struct timeval now;  
       struct timeval start;       struct timeval start;
       gettimeofday(&start, NULL);       gettimeofday(&start, NULL);
       Thread *th = 0;       Thread *th = 0;
Line 778 
Line 818 
    // manipulate the threads on the ThreadPool queues, they should never    // manipulate the threads on the ThreadPool queues, they should never
    // be allowed to run at the same time.    // be allowed to run at the same time.
  
      PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::kill_dead_threads");
    // << Thu Oct 23 14:41:02 2003 mdd >>    // << Thu Oct 23 14:41:02 2003 mdd >>
    // not true, the queues are thread safe. they are syncrhonized.    // not true, the queues are thread safe. they are syncrhonized.
  
Line 793 
Line 834 
       struct timeval now;       struct timeval now;
       gettimeofday(&now, NULL);       gettimeofday(&now, NULL);
       Uint32 bodies = 0;       Uint32 bodies = 0;
         AtomicInt needed(0);
  
       // first go thread the dead q and clean it up as much as possible       // first go thread the dead q and clean it up as much as possible
       try       try
Line 811 
Line 853 
       }       }
       catch(...)       catch(...)
       {       {
               Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Exception when deleting dead");
       }       }
  
       if (_dying.value())       if (_dying.value())
Line 826 
Line 869 
  
       DQueue<Thread> *q = 0;       DQueue<Thread> *q = 0;
       int i = 0;       int i = 0;
       AtomicInt needed(0);  
       Thread *th = 0;       Thread *th = 0;
       internal_dq idq;       internal_dq idq;
  
 #ifdef PEGASUS_DISABLE_KILLING_HUNG_THREADS  #ifdef PEGASUS_KILL_LONG_RUNNING_THREADS
       // This change prevents the thread pool from killing "hung" threads.        // Defining PEGASUS_KILL_LONG_RUNNING_THREADS causes the thread pool
       // The definition of a "hung" thread is one that has been on the run queue        // to kill threads that are on the _running queue longer than the
       // for longer than the time interval set when the thread pool was created.        // _deadlock_detect time interval specified for the thread pool.
       // Cancelling "hung" threads has proven to be problematic.        // Cancelling long-running threads has proven to be problematic and
         // may cause a crash depending on the state of the thread when it is
       // With this change the thread pool will not cancel "hung" threads.  This        // killed.  Use this option with care.
       // may prevent a crash depending upon the state of the "hung" thread.  In  
       // the case that the thread is actually hung, this change causes the  
       // thread resources not to be reclaimed.  
   
       // Idle threads, those that have not executed a routine for a time  
       // interval, continue to be destroyed.  This is normal and should not  
       // cause any problems.  
       for( ; i < 1; i++)  
 #else  
       for( ; i < 2; i++)       for( ; i < 2; i++)
   #else
         for( ; i < 1; i++)
 #endif #endif
       {       {
          q = map[i];          q = map[i];
Line 951 
Line 986 
             else             else
             {             {
                // deadlocked threads                // deadlocked threads
                Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Killing a deadlocked thread");                 Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
                                "A thread has run longer than %u seconds and "
                                    "will be cancelled.",
                                Uint32(_deadlock_detect.tv_sec));
                  Logger::put_l(Logger::ERROR_LOG, System::CIMSERVER,
                                Logger::SEVERE,
                                "Common.Thread.CANCEL_LONG_RUNNING_THREAD",
                                "A thread has run longer than {0} seconds and "
                                    "will be cancelled.",
                                Uint32(_deadlock_detect.tv_sec));
                th->cancel();                th->cancel();
                delete th;                delete th;
             }             }
Line 959 
Line 1003 
          }          }
       }       }
  
        Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
                   "We need %u new threads", needed.value());
       while (needed.value() > 0)   {       while (needed.value() > 0)   {
          _link_pool(_init_thread());          _link_pool(_init_thread());
          needed--;          needed--;
Line 969 
Line 1015 
     catch (...)     catch (...)
     {     {
     }     }
      PEG_METHOD_EXIT();
     return 0;     return 0;
 } }
  
Line 1000 
Line 1047 
  
 PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm ) PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm )
 { {
   
      PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_undertaker");
    exit_thread((PEGASUS_THREAD_RETURN)1);    exit_thread((PEGASUS_THREAD_RETURN)1);
      PEG_METHOD_EXIT();
    return (PEGASUS_THREAD_RETURN)1;    return (PEGASUS_THREAD_RETURN)1;
 } }
  
   PEGASUS_THREAD_RETURN ThreadPool::_graveyard(Thread *t)
   {
     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_graveyard");
     ThreadPool *pool = (ThreadPool *)t->get_parm();
     if(pool == 0 ) {
       Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
                     "Could not obtain the pool information from the Thread.", t);
   
         return (PEGASUS_THREAD_RETURN)1;
     }
     if (pool->_pool.exists(t))
       {
         if (pool->_pool.remove( (void *) t) != 0)
           {
           Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
                   "Moving thread %p", t);
           /* We are moving the thread to the _running queue b/c
           _only_ kill_dead_threads has enough logic to take care
           of cleaning up the threads.*/
   
             pool->_running.insert_first( t );
           }
         else
           {
             Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
                           "Could not move Thread %p from _pool to _runing queue.", t);
             return (PEGASUS_THREAD_RETURN)1;
           }
       }
   
     else if (pool->_running.exists(t))
       {
            Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
                           "Thread %p is on _running queue. Letting kill_dead_threads take care of the problem.", t);
             return (PEGASUS_THREAD_RETURN)1;
       }
     if (!pool->_dead.exists(t))
       {
         Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
                       "Thread is not on any queue! Moving it to the running queue.");
         pool->_running.insert_first( t );
       }
     PEG_METHOD_EXIT();
     return (PEGASUS_THREAD_RETURN)0;
   }
  
  void ThreadPool::_sleep_sem_del(void *p)  void ThreadPool::_sleep_sem_del(void *p)
 { {
Line 1033 
Line 1128 
  
  Thread *ThreadPool::_init_thread(void) throw(IPCException)  Thread *ThreadPool::_init_thread(void) throw(IPCException)
 { {
     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_init_thread");
    Thread *th = (Thread *) new Thread(_loop, this, false);    Thread *th = (Thread *) new Thread(_loop, this, false);
    // allocate a sleep semaphore and pass it in the thread context    // allocate a sleep semaphore and pass it in the thread context
    // initial count is zero, loop function will sleep until    // initial count is zero, loop function will sleep until
Line 1053 
Line 1149 
    }    }
    _current_threads++;    _current_threads++;
    pegasus_yield();    pegasus_yield();
     PEG_METHOD_EXIT();
  
    return th;    return th;
 } }


Legend:
Removed from v.1.61  
changed lines
  Added in v.1.68

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2