(file) Return to ThreadPool.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/ThreadPool.cpp between version 1.5 and 1.21

version 1.5, 2007/01/11 16:21:54 version 1.21, 2008/11/12 22:46:15
Line 95 
Line 95 
         // Set the dying flag so all thread know the destructor has been         // Set the dying flag so all thread know the destructor has been
         // entered         // entered
         _dying++;         _dying++;
         Tracer::trace(TRC_THREAD, Tracer::LEVEL2,          PEG_TRACE((TRC_THREAD, Tracer::LEVEL3,
             "Cleaning up %d idle threads.", _currentThreads.get());              "Cleaning up %d idle threads.", _currentThreads.get()));
  
         while (_currentThreads.get() > 0)         while (_currentThreads.get() > 0)
         {         {
Line 138 
Line 138 
  
         try         try
         {         {
             sleep_sem = (Semaphore *) myself->reference_tsd("sleep sem");              sleep_sem = (Semaphore *) myself->reference_tsd(TSD_SLEEP_SEM);
             myself->dereference_tsd();             myself->dereference_tsd();
             PEGASUS_ASSERT(sleep_sem != 0);             PEGASUS_ASSERT(sleep_sem != 0);
  
             lastActivityTime =             lastActivityTime =
                 (struct timeval *) myself->                 (struct timeval *) myself->
                 reference_tsd("last activity time");                  reference_tsd(TSD_LAST_ACTIVITY_TIME);
             myself->dereference_tsd();             myself->dereference_tsd();
             PEGASUS_ASSERT(lastActivityTime != 0);             PEGASUS_ASSERT(lastActivityTime != 0);
         }         }
         catch (...)         catch (...)
         {         {
             Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,              PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
                 "ThreadPool::_loop: Failure getting sleep_sem or "                 "ThreadPool::_loop: Failure getting sleep_sem or "
                     "lastActivityTime.");                     "lastActivityTime.");
             PEGASUS_ASSERT(false);             PEGASUS_ASSERT(false);
Line 168 
Line 168 
             }             }
             catch (...)             catch (...)
             {             {
                 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,                  PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
                     "ThreadPool::_loop: failure on sleep_sem->wait().");                     "ThreadPool::_loop: failure on sleep_sem->wait().");
                 PEGASUS_ASSERT(false);                 PEGASUS_ASSERT(false);
                 pool->_idleThreads.remove(myself);                 pool->_idleThreads.remove(myself);
Line 181 
Line 181 
             // _idleThreads queue.             // _idleThreads queue.
  
             ThreadReturnType(PEGASUS_THREAD_CDECL * work) (void *) = 0;             ThreadReturnType(PEGASUS_THREAD_CDECL * work) (void *) = 0;
             void *parm = 0;              void *workParm = 0;
             Semaphore *blocking_sem = 0;             Semaphore *blocking_sem = 0;
  
             try             try
             {             {
                 work = (ThreadReturnType(PEGASUS_THREAD_CDECL *) (void *))                 work = (ThreadReturnType(PEGASUS_THREAD_CDECL *) (void *))
                     myself->reference_tsd("work func");                      myself->reference_tsd(TSD_WORK_FUNC);
                 myself->dereference_tsd();                 myself->dereference_tsd();
                 parm = myself->reference_tsd("work parm");                  workParm = myself->reference_tsd(TSD_WORK_PARM);
                 myself->dereference_tsd();                 myself->dereference_tsd();
                 blocking_sem =                 blocking_sem =
                     (Semaphore *) myself->reference_tsd("blocking sem");                      (Semaphore *) myself->reference_tsd(TSD_BLOCKING_SEM);
                 myself->dereference_tsd();                 myself->dereference_tsd();
             }             }
             catch (...)             catch (...)
             {             {
                 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,                  PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
                     "ThreadPool::_loop: Failure accessing work func, work "                     "ThreadPool::_loop: Failure accessing work func, work "
                         "parm, or blocking sem.");                         "parm, or blocking sem.");
                 PEGASUS_ASSERT(false);                 PEGASUS_ASSERT(false);
Line 209 
Line 209 
  
             if (work == 0)             if (work == 0)
             {             {
                 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,                  PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
                     "ThreadPool::_loop: work func is 0, meaning we should "                     "ThreadPool::_loop: work func is 0, meaning we should "
                         "exit.");                         "exit.");
                 break;                 break;
Line 219 
Line 219 
  
             try             try
             {             {
                 PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,                  PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
                                  "Work starting.");                                  "Work starting.");
                 work(parm);                  work(workParm);
                 PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,                  PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
                                  "Work finished.");                                  "Work finished.");
             }             }
             catch (Exception& e)             catch (Exception& e)
             {             {
                 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,                  PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
                     String("Exception from work in ThreadPool::_loop: ") +                      "Exception from work in ThreadPool::_loop: %s",
                         e.getMessage());                      (const char*)e.getMessage().getCString()));
             }             }
 #if !defined(PEGASUS_OS_LSB)  
             catch (const exception& e)             catch (const exception& e)
             {             {
                 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,                  PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
                     String("Exception from work in ThreadPool::_loop: ") +                      "Exception from work in ThreadPool::_loop: %s",e.what()));
                         e.what());  
             }             }
 #endif  
             catch (...)             catch (...)
             {             {
                 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,                  PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
                     "Unknown exception from work in ThreadPool::_loop.");                     "Unknown exception from work in ThreadPool::_loop.");
             }             }
  
Line 259 
Line 256 
             }             }
             catch (...)             catch (...)
             {             {
                 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,                  PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
                     "ThreadPool::_loop: Adding thread to idle pool failed.");                     "ThreadPool::_loop: Adding thread to idle pool failed.");
                 PEGASUS_ASSERT(false);                 PEGASUS_ASSERT(false);
                 pool->_currentThreads--;                 pool->_currentThreads--;
Line 270 
Line 267 
     }     }
     catch (const Exception & e)     catch (const Exception & e)
     {     {
         PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,          PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
             "Caught exception: \"" + e.getMessage() + "\".  Exiting _loop.");              "Caught exception: \"%s\".  Exiting _loop.",
               (const char*)e.getMessage().getCString()));
     }     }
     catch (...)     catch (...)
     {     {
         PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,          PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
             "Caught unrecognized exception.  Exiting _loop.");             "Caught unrecognized exception.  Exiting _loop.");
     }     }
  
Line 298 
Line 296 
     {     {
         if (_dying.get())         if (_dying.get())
         {         {
             Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,              PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL3,
                 "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");                 "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
             return PEGASUS_THREAD_UNAVAILABLE;             return PEGASUS_THREAD_UNAVAILABLE;
         }         }
Line 319 
Line 317 
  
         if (th == 0)         if (th == 0)
         {         {
             Tracer::trace(TRC_THREAD, Tracer::LEVEL2,              PEG_TRACE((TRC_THREAD, Tracer::LEVEL1,
                 "ThreadPool::allocate_and_awaken: Insufficient resources: "                 "ThreadPool::allocate_and_awaken: Insufficient resources: "
                     " pool = %s, running threads = %d, idle threads = %d",                     " pool = %s, running threads = %d, idle threads = %d",
                 _key, _runningThreads.size(), _idleThreads.size());                  _key, _runningThreads.size(), _idleThreads.size()));
             return PEGASUS_THREAD_INSUFFICIENT_RESOURCES;             return PEGASUS_THREAD_INSUFFICIENT_RESOURCES;
         }         }
  
         // initialize the thread data with the work function and parameters         // initialize the thread data with the work function and parameters
         Tracer::trace(TRC_THREAD, Tracer::LEVEL4,          PEG_TRACE((TRC_THREAD, Tracer::LEVEL4,
             "Initializing thread with work function and parameters: parm = %p",              "Initializing thread(%s)"
             parm);                  " with work function and parameters: parm = %p",
               Threads::id(th->getThreadHandle().thid).buffer,
               parm));
  
         th->delete_tsd("work func");          th->delete_tsd(TSD_WORK_FUNC);
         th->put_tsd("work func", NULL,          th->put_tsd(TSD_WORK_FUNC, NULL,
                     sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)                     sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
                             (void *)), (void *) work);                             (void *)), (void *) work);
         th->delete_tsd("work parm");          th->delete_tsd(TSD_WORK_PARM);
         th->put_tsd("work parm", NULL, sizeof (void *), parm);          th->put_tsd(TSD_WORK_PARM, NULL, sizeof (void *), parm);
         th->delete_tsd("blocking sem");          th->delete_tsd(TSD_BLOCKING_SEM);
         if (blocking != 0)         if (blocking != 0)
             th->put_tsd("blocking sem", NULL, sizeof (Semaphore *), blocking);              th->put_tsd(TSD_BLOCKING_SEM, NULL, sizeof (Semaphore *), blocking);
  
         // put the thread on the running list         // put the thread on the running list
         _runningThreads.insert_front(th);         _runningThreads.insert_front(th);
  
         // signal the thread's sleep semaphore to awaken it         // signal the thread's sleep semaphore to awaken it
         Semaphore *sleep_sem = (Semaphore *) th->reference_tsd("sleep sem");          Semaphore *sleep_sem = (Semaphore *) th->reference_tsd(TSD_SLEEP_SEM);
         PEGASUS_ASSERT(sleep_sem != 0);         PEGASUS_ASSERT(sleep_sem != 0);
  
         Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");          PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
               "Signal thread to awaken");
         sleep_sem->signal();         sleep_sem->signal();
         th->dereference_tsd();         th->dereference_tsd();
     }     }
     catch (...)     catch (...)
     {     {
         Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,          PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
                       "ThreadPool::allocate_and_awaken: Operation Failed.");                       "ThreadPool::allocate_and_awaken: Operation Failed.");
         PEG_METHOD_EXIT();         PEG_METHOD_EXIT();
         // ATTN: Error result has not yet been defined         // ATTN: Error result has not yet been defined
Line 373 
Line 374 
  
     Uint32 numThreadsCleanedUp = 0;     Uint32 numThreadsCleanedUp = 0;
  
     size_t numIdleThreads = _idleThreads.size();      Uint32 numIdleThreads = _idleThreads.size();
     for (size_t i = 0; i < numIdleThreads; i++)      for (Uint32 i = 0; i < numIdleThreads; i++)
     {     {
         // Do not dip below the minimum thread count         // Do not dip below the minimum thread count
         if (_currentThreads.get() <= (Uint32) _minThreads)         if (_currentThreads.get() <= (Uint32) _minThreads)
Line 391 
Line 392 
             break;             break;
         }         }
  
         struct timeval *lastActivityTime;          void* tsd = thread->reference_tsd(TSD_LAST_ACTIVITY_TIME);
         try          struct timeval *lastActivityTime =
         {              reinterpret_cast<struct timeval*>(tsd);
             lastActivityTime =  
                 (struct timeval *) thread->  
                 try_reference_tsd("last activity time");  
             PEGASUS_ASSERT(lastActivityTime != 0);             PEGASUS_ASSERT(lastActivityTime != 0);
         }  
         catch (...)  
         {  
             PEGASUS_ASSERT(false);  
             _idleThreads.insert_back(thread);  
             break;  
         }  
  
         Boolean cleanupThisThread =         Boolean cleanupThisThread =
             _timeIntervalExpired(lastActivityTime, &_deallocateWait);             _timeIntervalExpired(lastActivityTime, &_deallocateWait);
Line 430 
Line 421 
 { {
     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
  
     // Set the "work func" and "work parm" to 0 so _loop() knows to exit.      // Set the TSD_WORK_FUNC and TSD_WORK_PARM to 0 so _loop() knows to exit.
     thread->delete_tsd("work func");      thread->delete_tsd(TSD_WORK_FUNC);
     thread->put_tsd("work func", 0,      thread->put_tsd(TSD_WORK_FUNC, 0,
                     sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)                     sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
                             (void *)), (void *) 0);                             (void *)), (void *) 0);
     thread->delete_tsd("work parm");      thread->delete_tsd(TSD_WORK_PARM);
     thread->put_tsd("work parm", 0, sizeof (void *), 0);      thread->put_tsd(TSD_WORK_PARM, 0, sizeof (void *), 0);
  
     // signal the thread's sleep semaphore to awaken it     // signal the thread's sleep semaphore to awaken it
     Semaphore *sleep_sem = (Semaphore *) thread->reference_tsd("sleep sem");      Semaphore *sleep_sem = (Semaphore *) thread->reference_tsd(TSD_SLEEP_SEM);
     PEGASUS_ASSERT(sleep_sem != 0);     PEGASUS_ASSERT(sleep_sem != 0);
     sleep_sem->signal();     sleep_sem->signal();
     thread->dereference_tsd();     thread->dereference_tsd();
Line 454 
Line 445 
     struct timeval* start,     struct timeval* start,
     struct timeval* interval)     struct timeval* interval)
 { {
       PEGASUS_ASSERT(interval != 0);
   
     // never time out if the interval is zero     // never time out if the interval is zero
     if (interval && (interval->tv_sec == 0) && (interval->tv_usec == 0))      if ((interval->tv_sec == 0) && (interval->tv_usec == 0))
     {     {
         return false;         return false;
     }     }
Line 463 
Line 456 
     struct timeval now, finish, remaining;     struct timeval now, finish, remaining;
     Uint32 usec;     Uint32 usec;
     Time::gettimeofday(&now);     Time::gettimeofday(&now);
     Time::gettimeofday(&remaining);     // Avoid valgrind error  
       memset(&remaining, 0, sizeof(remaining));
  
     finish.tv_sec = start->tv_sec + interval->tv_sec;     finish.tv_sec = start->tv_sec + interval->tv_sec;
     usec = start->tv_usec + interval->tv_usec;     usec = start->tv_usec + interval->tv_usec;
Line 490 
Line 484 
     // we signal the semaphore     // we signal the semaphore
     Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);     Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
     th->put_tsd(     th->put_tsd(
         "sleep sem", &_deleteSemaphore, sizeof(Semaphore), (void*) sleep_sem);          TSD_SLEEP_SEM, &_deleteSemaphore, sizeof(Semaphore), (void*) sleep_sem);
  
     struct timeval* lastActivityTime =     struct timeval* lastActivityTime =
         (struct timeval *)::operator  new(sizeof (struct timeval));         (struct timeval *)::operator  new(sizeof (struct timeval));
     Time::gettimeofday(lastActivityTime);     Time::gettimeofday(lastActivityTime);
  
     th->put_tsd(     th->put_tsd(
         "last activity time",          TSD_LAST_ACTIVITY_TIME,
         thread_data::default_delete,         thread_data::default_delete,
         sizeof(struct timeval),         sizeof(struct timeval),
         (void*) lastActivityTime);         (void*) lastActivityTime);
Line 505 
Line 499 
  
     if (th->run() != PEGASUS_THREAD_OK)     if (th->run() != PEGASUS_THREAD_OK)
     {     {
         Tracer::trace(TRC_THREAD, Tracer::LEVEL2,          PEG_TRACE((TRC_THREAD, Tracer::LEVEL1,
             "Could not create thread. Error code is %d.", errno);              "Could not create thread. Error code is %d.", errno));
         delete th;         delete th;
         return 0;         return 0;
     }     }
     _currentThreads++;     _currentThreads++;
     Threads::yield();  
  
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
     return th;     return th;
Line 521 
Line 514 
 { {
     if (th == 0)     if (th == 0)
     {     {
         Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,          PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
             "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");             "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");
         throw NullPointer();         throw NullPointer();
     }     }
Line 532 
Line 525 
     }     }
     catch (...)     catch (...)
     {     {
         Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,          PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
             "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_front "             "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_front "
                 "failed.");                 "failed.");
     }     }


Legend:
Removed from v.1.5  
changed lines
  Added in v.1.21

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2