(file) Return to ThreadPool.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mike  1.2 //%2006////////////////////////////////////////////////////////////////////////
  2           //
  3           // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4           // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5           // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6           // IBM Corp.; EMC Corporation, The Open Group.
  7           // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8           // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9           // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10           // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11           // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 12           // EMC Corporation; Symantec Corporation; The Open Group.
 13           //
 14           // Permission is hereby granted, free of charge, to any person obtaining a copy
 15           // of this software and associated documentation files (the "Software"), to
 16           // deal in the Software without restriction, including without limitation the
 17           // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 18           // sell copies of the Software, and to permit persons to whom the Software is
 19           // furnished to do so, subject to the following conditions:
 20           // 
 21           // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 22 mike  1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 23           // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 24           // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 25           // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 26           // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 27           // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 28           // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 29           //
 30           //==============================================================================
 31           //
 32           //%/////////////////////////////////////////////////////////////////////////////
 33           
 34           #include "ThreadPool.h"
 35           #include "Thread.h"
 36           #include <exception>
 37           #include <Pegasus/Common/Tracer.h>
 38           #include "Time.h"
 39           
 40           PEGASUS_USING_STD;
 41           
 42           PEGASUS_NAMESPACE_BEGIN
 43 mike  1.2 
 44           ///////////////////////////////////////////////////////////////////////////////
 45           //
 46           // ThreadPool
 47           //
 48           ///////////////////////////////////////////////////////////////////////////////
 49           
 50 kumpf 1.4 ThreadPool::ThreadPool(
 51               Sint16 initialSize,
 52               const char* key,
 53               Sint16 minThreads,
 54               Sint16 maxThreads,
 55               struct timeval
 56               &deallocateWait)
 57               : _maxThreads(maxThreads),
 58                 _minThreads(minThreads),
 59                 _currentThreads(0),
 60                 _idleThreads(),
 61                 _runningThreads(),
 62                 _dying(0)
 63 mike  1.2 {
 64               _deallocateWait.tv_sec = deallocateWait.tv_sec;
 65               _deallocateWait.tv_usec = deallocateWait.tv_usec;
 66           
 67               memset(_key, 0x00, 17);
 68               if (key != 0)
 69               {
 70                   strncpy(_key, key, 16);
 71               }
 72           
 73               if ((_maxThreads > 0) && (_maxThreads < initialSize))
 74               {
 75                   _maxThreads = initialSize;
 76               }
 77           
 78               if (_minThreads > initialSize)
 79               {
 80                   _minThreads = initialSize;
 81               }
 82           
 83               for (int i = 0; i < initialSize; i++)
 84 mike  1.2     {
 85                   _addToIdleThreadsQueue(_initializeThread());
 86               }
 87           }
 88           
 89           ThreadPool::~ThreadPool()
 90           {
 91               PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::~ThreadPool");
 92           
 93               try
 94               {
 95                   // Set the dying flag so all thread know the destructor has been
 96                   // entered
 97                   _dying++;
 98                   Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
 99 kumpf 1.4             "Cleaning up %d idle threads.", _currentThreads.get());
100 mike  1.2 
101                   while (_currentThreads.get() > 0)
102                   {
103 kumpf 1.4             Thread* thread = _idleThreads.remove_front();
104 mike  1.2             if (thread != 0)
105                       {
106                           _cleanupThread(thread);
107                           _currentThreads--;
108                       }
109                       else
110                       {
111                           Threads::yield();
112                       }
113                   }
114               }
115 kumpf 1.4     catch (...)
116 mike  1.2     {
117               }
118           }
119           
120 kumpf 1.4 ThreadReturnType PEGASUS_THREAD_CDECL ThreadPool::_loop(void* parm)
121 mike  1.2 {
122               PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
123           
124               try
125               {
126                   Thread *myself = (Thread *) parm;
127                   PEGASUS_ASSERT(myself != 0);
128           
129                   // Set myself into thread specific storage
130                   // This will allow code to get its own Thread
131                   Thread::setCurrent(myself);
132           
133                   ThreadPool *pool = (ThreadPool *) myself->get_parm();
134                   PEGASUS_ASSERT(pool != 0);
135           
136                   Semaphore *sleep_sem = 0;
137                   struct timeval *lastActivityTime = 0;
138           
139                   try
140                   {
141                       sleep_sem = (Semaphore *) myself->reference_tsd("sleep sem");
142 mike  1.2             myself->dereference_tsd();
143                       PEGASUS_ASSERT(sleep_sem != 0);
144           
145                       lastActivityTime =
146                           (struct timeval *) myself->
147                           reference_tsd("last activity time");
148                       myself->dereference_tsd();
149                       PEGASUS_ASSERT(lastActivityTime != 0);
150                   }
151 kumpf 1.4         catch (...)
152 mike  1.2         {
153                       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
154 kumpf 1.4                 "ThreadPool::_loop: Failure getting sleep_sem or "
155                               "lastActivityTime.");
156 mike  1.2             PEGASUS_ASSERT(false);
157                       pool->_idleThreads.remove(myself);
158                       pool->_currentThreads--;
159                       PEG_METHOD_EXIT();
160 kumpf 1.4             return (ThreadReturnType) 1;
161 mike  1.2         }
162           
163                   while (1)
164                   {
165                       try
166                       {
167                           sleep_sem->wait();
168                       }
169 kumpf 1.4             catch (...)
170 mike  1.2             {
171                           Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
172 kumpf 1.4                     "ThreadPool::_loop: failure on sleep_sem->wait().");
173 mike  1.2                 PEGASUS_ASSERT(false);
174                           pool->_idleThreads.remove(myself);
175                           pool->_currentThreads--;
176                           PEG_METHOD_EXIT();
177 kumpf 1.4                 return (ThreadReturnType) 1;
178 mike  1.2             }
179           
180                       // When we awaken we reside on the _runningThreads queue, not the
181                       // _idleThreads queue.
182           
183                       ThreadReturnType(PEGASUS_THREAD_CDECL * work) (void *) = 0;
184                       void *parm = 0;
185                       Semaphore *blocking_sem = 0;
186           
187                       try
188                       {
189                           work = (ThreadReturnType(PEGASUS_THREAD_CDECL *) (void *))
190                               myself->reference_tsd("work func");
191                           myself->dereference_tsd();
192                           parm = myself->reference_tsd("work parm");
193                           myself->dereference_tsd();
194                           blocking_sem =
195                               (Semaphore *) myself->reference_tsd("blocking sem");
196                           myself->dereference_tsd();
197                       }
198 kumpf 1.4             catch (...)
199 mike  1.2             {
200                           Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
201 kumpf 1.4                     "ThreadPool::_loop: Failure accessing work func, work "
202                                   "parm, or blocking sem.");
203 mike  1.2                 PEGASUS_ASSERT(false);
204                           pool->_idleThreads.remove(myself);
205                           pool->_currentThreads--;
206                           PEG_METHOD_EXIT();
207 kumpf 1.4                 return (ThreadReturnType) 1;
208 mike  1.2             }
209           
210                       if (work == 0)
211                       {
212                           Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
213 kumpf 1.4                     "ThreadPool::_loop: work func is 0, meaning we should "
214                                   "exit.");
215 mike  1.2                 break;
216                       }
217           
218                       Time::gettimeofday(lastActivityTime);
219           
220                       try
221                       {
222                           PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,
223                                            "Work starting.");
224                           work(parm);
225                           PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,
226                                            "Work finished.");
227                       }
228 kumpf 1.4             catch (Exception& e)
229 mike  1.2             {
230                           PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
231 kumpf 1.4                     String("Exception from work in ThreadPool::_loop: ") +
232                                   e.getMessage());
233 mike  1.2             }
234           #if !defined(PEGASUS_OS_LSB)
235 kumpf 1.4             catch (const exception& e)
236 mike  1.2             {
237                           PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
238 kumpf 1.4                     String("Exception from work in ThreadPool::_loop: ") +
239                                   e.what());
240 mike  1.2             }
241           #endif
242 kumpf 1.4             catch (...)
243 mike  1.2             {
244                           PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
245 kumpf 1.4                     "Unknown exception from work in ThreadPool::_loop.");
246 mike  1.2             }
247           
248                       // put myself back onto the available list
249                       try
250                       {
251                           Time::gettimeofday(lastActivityTime);
252                           if (blocking_sem != 0)
253                           {
254                               blocking_sem->signal();
255                           }
256           
257                           pool->_runningThreads.remove(myself);
258                           pool->_idleThreads.insert_front(myself);
259                       }
260 kumpf 1.4             catch (...)
261 mike  1.2             {
262                           Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
263 kumpf 1.4                     "ThreadPool::_loop: Adding thread to idle pool failed.");
264 mike  1.2                 PEGASUS_ASSERT(false);
265                           pool->_currentThreads--;
266                           PEG_METHOD_EXIT();
267 kumpf 1.4                 return (ThreadReturnType) 1;
268 mike  1.2             }
269                   }
270               }
271 kumpf 1.4     catch (const Exception & e)
272 mike  1.2     {
273                   PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
274 kumpf 1.4             "Caught exception: \"" + e.getMessage() + "\".  Exiting _loop.");
275 mike  1.2     }
276 kumpf 1.4     catch (...)
277 mike  1.2     {
278                   PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
279 kumpf 1.4             "Caught unrecognized exception.  Exiting _loop.");
280 mike  1.2     }
281           
282               PEG_METHOD_EXIT();
283 kumpf 1.4     return (ThreadReturnType) 0;
284 mike  1.2 }
285           
286 kumpf 1.4 ThreadStatus ThreadPool::allocate_and_awaken(
287               void* parm,
288               ThreadReturnType (PEGASUS_THREAD_CDECL* work) (void*),
289               Semaphore* blocking)
290 mike  1.2 {
291               PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
292           
293               // Allocate_and_awaken will not run if the _dying flag is set.
294               // Once the lock is acquired, ~ThreadPool will not change
295               // the value of _dying until the lock is released.
296           
297               try
298               {
299                   if (_dying.get())
300                   {
301                       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
302 kumpf 1.4                 "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
303 mike  1.2             return PEGASUS_THREAD_UNAVAILABLE;
304                   }
305                   struct timeval start;
306                   Time::gettimeofday(&start);
307                   Thread *th = 0;
308           
309                   th = _idleThreads.remove_front();
310           
311                   if (th == 0)
312                   {
313                       if ((_maxThreads == 0) ||
314                           (_currentThreads.get() < Uint32(_maxThreads)))
315                       {
316                           th = _initializeThread();
317                       }
318                   }
319           
320                   if (th == 0)
321                   {
322 kumpf 1.3             Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
323                           "ThreadPool::allocate_and_awaken: Insufficient resources: "
324                               " pool = %s, running threads = %d, idle threads = %d",
325                           _key, _runningThreads.size(), _idleThreads.size());
326 mike  1.2             return PEGASUS_THREAD_INSUFFICIENT_RESOURCES;
327                   }
328           
329                   // initialize the thread data with the work function and parameters
330                   Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
331 kumpf 1.4             "Initializing thread with work function and parameters: parm = %p",
332                       parm);
333 mike  1.2 
334                   th->delete_tsd("work func");
335                   th->put_tsd("work func", NULL,
336                               sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
337                                       (void *)), (void *) work);
338                   th->delete_tsd("work parm");
339                   th->put_tsd("work parm", NULL, sizeof (void *), parm);
340                   th->delete_tsd("blocking sem");
341                   if (blocking != 0)
342                       th->put_tsd("blocking sem", NULL, sizeof (Semaphore *), blocking);
343           
344                   // put the thread on the running list
345                   _runningThreads.insert_front(th);
346           
347                   // signal the thread's sleep semaphore to awaken it
348                   Semaphore *sleep_sem = (Semaphore *) th->reference_tsd("sleep sem");
349                   PEGASUS_ASSERT(sleep_sem != 0);
350           
351                   Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
352                   sleep_sem->signal();
353                   th->dereference_tsd();
354 mike  1.2     }
355 kumpf 1.4     catch (...)
356 mike  1.2     {
357                   Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
358                                 "ThreadPool::allocate_and_awaken: Operation Failed.");
359                   PEG_METHOD_EXIT();
360                   // ATTN: Error result has not yet been defined
361                   return PEGASUS_THREAD_SETUP_FAILURE;
362               }
363               PEG_METHOD_EXIT();
364               return PEGASUS_THREAD_OK;
365           }
366           
367           // caller is responsible for only calling this routine during slack periods
368           // but should call it at least once per _deallocateWait interval.
369           
370           Uint32 ThreadPool::cleanupIdleThreads()
371           {
372               PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads");
373           
374               Uint32 numThreadsCleanedUp = 0;
375           
376               Uint32 numIdleThreads = _idleThreads.size();
377 mike  1.2     for (Uint32 i = 0; i < numIdleThreads; i++)
378               {
379                   // Do not dip below the minimum thread count
380                   if (_currentThreads.get() <= (Uint32) _minThreads)
381                   {
382                       break;
383                   }
384           
385                   Thread *thread = _idleThreads.remove_back();
386           
387                   // If there are no more threads in the _idleThreads queue, we're
388                   // done.
389                   if (thread == 0)
390                   {
391                       break;
392                   }
393           
394                   struct timeval *lastActivityTime;
395                   try
396                   {
397                       lastActivityTime =
398 mike  1.2                 (struct timeval *) thread->
399                           try_reference_tsd("last activity time");
400                       PEGASUS_ASSERT(lastActivityTime != 0);
401                   }
402 kumpf 1.4         catch (...)
403 mike  1.2         {
404                       PEGASUS_ASSERT(false);
405                       _idleThreads.insert_back(thread);
406                       break;
407                   }
408           
409                   Boolean cleanupThisThread =
410                       _timeIntervalExpired(lastActivityTime, &_deallocateWait);
411                   thread->dereference_tsd();
412           
413                   if (cleanupThisThread)
414                   {
415                       _cleanupThread(thread);
416                       _currentThreads--;
417                       numThreadsCleanedUp++;
418                   }
419                   else
420                   {
421                       _idleThreads.insert_front(thread);
422                   }
423               }
424 mike  1.2 
425               PEG_METHOD_EXIT();
426               return numThreadsCleanedUp;
427           }
428           
429           void ThreadPool::_cleanupThread(Thread * thread)
430           {
431               PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
432           
433               // Set the "work func" and "work parm" to 0 so _loop() knows to exit.
434               thread->delete_tsd("work func");
435               thread->put_tsd("work func", 0,
436                               sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
437                                       (void *)), (void *) 0);
438               thread->delete_tsd("work parm");
439               thread->put_tsd("work parm", 0, sizeof (void *), 0);
440           
441               // signal the thread's sleep semaphore to awaken it
442               Semaphore *sleep_sem = (Semaphore *) thread->reference_tsd("sleep sem");
443               PEGASUS_ASSERT(sleep_sem != 0);
444               sleep_sem->signal();
445 mike  1.2     thread->dereference_tsd();
446           
447               thread->join();
448               delete thread;
449           
450               PEG_METHOD_EXIT();
451           }
452           
453 kumpf 1.4 Boolean ThreadPool::_timeIntervalExpired(
454               struct timeval* start,
455               struct timeval* interval)
456 mike  1.2 {
457               // never time out if the interval is zero
458               if (interval && (interval->tv_sec == 0) && (interval->tv_usec == 0))
459               {
460                   return false;
461               }
462           
463               struct timeval now, finish, remaining;
464               Uint32 usec;
465               Time::gettimeofday(&now);
466               Time::gettimeofday(&remaining);     // Avoid valgrind error
467           
468               finish.tv_sec = start->tv_sec + interval->tv_sec;
469               usec = start->tv_usec + interval->tv_usec;
470               finish.tv_sec += (usec / 1000000);
471               usec %= 1000000;
472               finish.tv_usec = usec;
473           
474               return (Time::subtract(&remaining, &finish, &now) != 0);
475           }
476           
477 mike  1.2 void ThreadPool::_deleteSemaphore(void *p)
478           {
479               delete(Semaphore *) p;
480           }
481           
482           Thread *ThreadPool::_initializeThread()
483           {
484               PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread");
485           
486               Thread *th = (Thread *) new Thread(_loop, this, false);
487           
488               // allocate a sleep semaphore and pass it in the thread context
489               // initial count is zero, loop function will sleep until
490               // we signal the semaphore
491               Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
492 kumpf 1.4     th->put_tsd(
493                   "sleep sem", &_deleteSemaphore, sizeof(Semaphore), (void*) sleep_sem);
494 mike  1.2 
495 kumpf 1.4     struct timeval* lastActivityTime =
496 mike  1.2         (struct timeval *)::operator  new(sizeof (struct timeval));
497               Time::gettimeofday(lastActivityTime);
498           
499 kumpf 1.4     th->put_tsd(
500                   "last activity time",
501                   thread_data::default_delete,
502                   sizeof(struct timeval),
503                   (void*) lastActivityTime);
504 mike  1.2     // thread will enter _loop() and sleep on sleep_sem until we signal it
505           
506               if (th->run() != PEGASUS_THREAD_OK)
507               {
508                   Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
509 kumpf 1.4             "Could not create thread. Error code is %d.", errno);
510 mike  1.2         delete th;
511                   return 0;
512               }
513               _currentThreads++;
514               Threads::yield();
515           
516               PEG_METHOD_EXIT();
517               return th;
518           }
519           
520           void ThreadPool::_addToIdleThreadsQueue(Thread * th)
521           {
522               if (th == 0)
523               {
524                   Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
525 kumpf 1.4             "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");
526 mike  1.2         throw NullPointer();
527               }
528           
529               try
530               {
531                   _idleThreads.insert_front(th);
532               }
533 kumpf 1.4     catch (...)
534 mike  1.2     {
535                   Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
536 kumpf 1.4             "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_front "
537                           "failed.");
538 mike  1.2     }
539           }
540           
541           PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2