(file) Return to ThreadPool.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mike  1.2 //%2006////////////////////////////////////////////////////////////////////////
  2           //
  3           // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4           // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5           // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6           // IBM Corp.; EMC Corporation, The Open Group.
  7           // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8           // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9           // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10           // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11           // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 12           // EMC Corporation; Symantec Corporation; The Open Group.
 13           //
 14           // Permission is hereby granted, free of charge, to any person obtaining a copy
 15           // of this software and associated documentation files (the "Software"), to
 16           // deal in the Software without restriction, including without limitation the
 17           // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 18           // sell copies of the Software, and to permit persons to whom the Software is
 19           // furnished to do so, subject to the following conditions:
 20           // 
 21           // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 22 mike  1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 23           // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 24           // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 25           // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 26           // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 27           // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 28           // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 29           //
 30           //==============================================================================
 31           //
 32           //%/////////////////////////////////////////////////////////////////////////////
 33           
 34           #include "ThreadPool.h"
 35           #include "Thread.h"
 36           #include <exception>
 37           #include <Pegasus/Common/Tracer.h>
 38           #include "Time.h"
 39           
 40           PEGASUS_USING_STD;
 41           
 42           PEGASUS_NAMESPACE_BEGIN
 43 mike  1.2 
 44           ///////////////////////////////////////////////////////////////////////////////
 45           //
 46           // ThreadPool
 47           //
 48           ///////////////////////////////////////////////////////////////////////////////
 49           
 50 kumpf 1.4 ThreadPool::ThreadPool(
 51               Sint16 initialSize,
 52               const char* key,
 53               Sint16 minThreads,
 54               Sint16 maxThreads,
 55               struct timeval
 56               &deallocateWait)
 57               : _maxThreads(maxThreads),
 58                 _minThreads(minThreads),
 59                 _currentThreads(0),
 60                 _idleThreads(),
 61                 _runningThreads(),
 62                 _dying(0)
 63 mike  1.2 {
 64               _deallocateWait.tv_sec = deallocateWait.tv_sec;
 65               _deallocateWait.tv_usec = deallocateWait.tv_usec;
 66           
 67               memset(_key, 0x00, 17);
 68               if (key != 0)
 69               {
 70                   strncpy(_key, key, 16);
 71               }
 72           
 73               if ((_maxThreads > 0) && (_maxThreads < initialSize))
 74               {
 75                   _maxThreads = initialSize;
 76               }
 77           
 78               if (_minThreads > initialSize)
 79               {
 80                   _minThreads = initialSize;
 81               }
 82           
 83               for (int i = 0; i < initialSize; i++)
 84 mike  1.2     {
 85                   _addToIdleThreadsQueue(_initializeThread());
 86               }
 87           }
 88           
 89           ThreadPool::~ThreadPool()
 90           {
 91               PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::~ThreadPool");
 92           
 93               try
 94               {
 95                   // Set the dying flag so all thread know the destructor has been
 96                   // entered
 97                   _dying++;
 98 mike  1.14.12.3.2.1         PEG_TRACE((TRC_THREAD, Tracer::LEVEL3,
 99 marek 1.9                       "Cleaning up %d idle threads.", _currentThreads.get()));
100 mike  1.2           
101                             while (_currentThreads.get() > 0)
102                             {
103 kumpf 1.4                       Thread* thread = _idleThreads.remove_front();
104 mike  1.2                       if (thread != 0)
105                                 {
106                                     _cleanupThread(thread);
107                                     _currentThreads--;
108                                 }
109                                 else
110                                 {
111                                     Threads::yield();
112                                 }
113                             }
114                         }
115 kumpf 1.4               catch (...)
116 mike  1.2               {
117                         }
118                     }
119                     
120 kumpf 1.4           ThreadReturnType PEGASUS_THREAD_CDECL ThreadPool::_loop(void* parm)
121 mike  1.2           {
122                         PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
123                     
124                         try
125                         {
126                             Thread *myself = (Thread *) parm;
127                             PEGASUS_ASSERT(myself != 0);
128                     
129                             // Set myself into thread specific storage
130                             // This will allow code to get its own Thread
131                             Thread::setCurrent(myself);
132                     
133                             ThreadPool *pool = (ThreadPool *) myself->get_parm();
134                             PEGASUS_ASSERT(pool != 0);
135                     
136                             Semaphore *sleep_sem = 0;
137                             struct timeval *lastActivityTime = 0;
138                     
139                             try
140                             {
141                                 sleep_sem = (Semaphore *) myself->reference_tsd("sleep sem");
142 mike  1.2                       myself->dereference_tsd();
143                                 PEGASUS_ASSERT(sleep_sem != 0);
144                     
145                                 lastActivityTime =
146                                     (struct timeval *) myself->
147                                     reference_tsd("last activity time");
148                                 myself->dereference_tsd();
149                                 PEGASUS_ASSERT(lastActivityTime != 0);
150                             }
151 kumpf 1.4                   catch (...)
152 mike  1.2                   {
153 mike  1.14.12.3.2.1             PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
154 kumpf 1.4                           "ThreadPool::_loop: Failure getting sleep_sem or "
155                                         "lastActivityTime.");
156 mike  1.2                       PEGASUS_ASSERT(false);
157                                 pool->_idleThreads.remove(myself);
158                                 pool->_currentThreads--;
159                                 PEG_METHOD_EXIT();
160 kumpf 1.4                       return (ThreadReturnType) 1;
161 mike  1.2                   }
162                     
163                             while (1)
164                             {
165                                 try
166                                 {
167                                     sleep_sem->wait();
168                                 }
169 kumpf 1.4                       catch (...)
170 mike  1.2                       {
171 mike  1.14.12.3.2.1                 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
172 kumpf 1.4                               "ThreadPool::_loop: failure on sleep_sem->wait().");
173 mike  1.2                           PEGASUS_ASSERT(false);
174                                     pool->_idleThreads.remove(myself);
175                                     pool->_currentThreads--;
176                                     PEG_METHOD_EXIT();
177 kumpf 1.4                           return (ThreadReturnType) 1;
178 mike  1.2                       }
179                     
180                                 // When we awaken we reside on the _runningThreads queue, not the
181                                 // _idleThreads queue.
182                     
183                                 ThreadReturnType(PEGASUS_THREAD_CDECL * work) (void *) = 0;
184                                 void *parm = 0;
185                                 Semaphore *blocking_sem = 0;
186                     
187                                 try
188                                 {
189                                     work = (ThreadReturnType(PEGASUS_THREAD_CDECL *) (void *))
190                                         myself->reference_tsd("work func");
191                                     myself->dereference_tsd();
192                                     parm = myself->reference_tsd("work parm");
193                                     myself->dereference_tsd();
194                                     blocking_sem =
195                                         (Semaphore *) myself->reference_tsd("blocking sem");
196                                     myself->dereference_tsd();
197                                 }
198 kumpf 1.4                       catch (...)
199 mike  1.2                       {
200 mike  1.14.12.3.2.1                 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
201 kumpf 1.4                               "ThreadPool::_loop: Failure accessing work func, work "
202                                             "parm, or blocking sem.");
203 mike  1.2                           PEGASUS_ASSERT(false);
204                                     pool->_idleThreads.remove(myself);
205                                     pool->_currentThreads--;
206                                     PEG_METHOD_EXIT();
207 kumpf 1.4                           return (ThreadReturnType) 1;
208 mike  1.2                       }
209                     
210                                 if (work == 0)
211                                 {
212 marek 1.9                           PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
213 kumpf 1.4                               "ThreadPool::_loop: work func is 0, meaning we should "
214                                             "exit.");
215 mike  1.2                           break;
216                                 }
217                     
218                                 Time::gettimeofday(lastActivityTime);
219                     
220                                 try
221                                 {
222 marek 1.9                           PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
223 mike  1.2                                            "Work starting.");
224                                     work(parm);
225 marek 1.9                           PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
226 mike  1.2                                            "Work finished.");
227                                 }
228 kumpf 1.4                       catch (Exception& e)
229 mike  1.2                       {
230 mike  1.14.12.3.2.1                 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
231 kumpf 1.4                               String("Exception from work in ThreadPool::_loop: ") +
232                                             e.getMessage());
233 mike  1.2                       }
234 kumpf 1.4                       catch (const exception& e)
235 mike  1.2                       {
236 mike  1.14.12.3.2.1                 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
237 kumpf 1.4                               String("Exception from work in ThreadPool::_loop: ") +
238                                             e.what());
239 mike  1.2                       }
240 kumpf 1.4                       catch (...)
241 mike  1.2                       {
242 mike  1.14.12.3.2.1                 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
243 kumpf 1.4                               "Unknown exception from work in ThreadPool::_loop.");
244 mike  1.2                       }
245                     
246                                 // put myself back onto the available list
247                                 try
248                                 {
249                                     Time::gettimeofday(lastActivityTime);
250                                     if (blocking_sem != 0)
251                                     {
252                                         blocking_sem->signal();
253                                     }
254                     
255                                     pool->_runningThreads.remove(myself);
256                                     pool->_idleThreads.insert_front(myself);
257                                 }
258 kumpf 1.4                       catch (...)
259 mike  1.2                       {
260 mike  1.14.12.3.2.1                 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
261 kumpf 1.4                               "ThreadPool::_loop: Adding thread to idle pool failed.");
262 mike  1.2                           PEGASUS_ASSERT(false);
263                                     pool->_currentThreads--;
264                                     PEG_METHOD_EXIT();
265 kumpf 1.4                           return (ThreadReturnType) 1;
266 mike  1.2                       }
267                             }
268                         }
269 kumpf 1.4               catch (const Exception & e)
270 mike  1.2               {
271 mike  1.14.12.3.2.1         PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
272 kumpf 1.4                       "Caught exception: \"" + e.getMessage() + "\".  Exiting _loop.");
273 mike  1.2               }
274 kumpf 1.4               catch (...)
275 mike  1.2               {
276 mike  1.14.12.3.2.1         PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
277 kumpf 1.4                       "Caught unrecognized exception.  Exiting _loop.");
278 mike  1.2               }
279                     
280                         PEG_METHOD_EXIT();
281 kumpf 1.4               return (ThreadReturnType) 0;
282 mike  1.2           }
283                     
284 kumpf 1.4           ThreadStatus ThreadPool::allocate_and_awaken(
285                         void* parm,
286                         ThreadReturnType (PEGASUS_THREAD_CDECL* work) (void*),
287                         Semaphore* blocking)
288 mike  1.2           {
289                         PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
290                     
291                         // Allocate_and_awaken will not run if the _dying flag is set.
292                         // Once the lock is acquired, ~ThreadPool will not change
293                         // the value of _dying until the lock is released.
294                     
295                         try
296                         {
297                             if (_dying.get())
298                             {
299 mike  1.14.12.3.2.1             PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL3,
300 kumpf 1.4                           "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
301 mike  1.2                       return PEGASUS_THREAD_UNAVAILABLE;
302                             }
303                             struct timeval start;
304                             Time::gettimeofday(&start);
305                             Thread *th = 0;
306                     
307                             th = _idleThreads.remove_front();
308                     
309                             if (th == 0)
310                             {
311                                 if ((_maxThreads == 0) ||
312                                     (_currentThreads.get() < Uint32(_maxThreads)))
313                                 {
314                                     th = _initializeThread();
315                                 }
316                             }
317                     
318                             if (th == 0)
319                             {
320 mike  1.14.12.3.2.1             PEG_TRACE((TRC_THREAD, Tracer::LEVEL1,
321 kumpf 1.3                           "ThreadPool::allocate_and_awaken: Insufficient resources: "
322                                         " pool = %s, running threads = %d, idle threads = %d",
323 marek 1.9                           _key, _runningThreads.size(), _idleThreads.size()));
324 mike  1.2                       return PEGASUS_THREAD_INSUFFICIENT_RESOURCES;
325                             }
326                     
327                             // initialize the thread data with the work function and parameters
328 marek 1.9                   PEG_TRACE((TRC_THREAD, Tracer::LEVEL4,
329 marek 1.14                      "Initializing thread(%s)"
330                                     " with work function and parameters: parm = %p",
331                                 Threads::id(th->getThreadHandle().thid).buffer,
332 marek 1.9                       parm));
333 mike  1.2           
334                             th->delete_tsd("work func");
335                             th->put_tsd("work func", NULL,
336                                         sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
337                                                 (void *)), (void *) work);
338                             th->delete_tsd("work parm");
339                             th->put_tsd("work parm", NULL, sizeof (void *), parm);
340                             th->delete_tsd("blocking sem");
341                             if (blocking != 0)
342                                 th->put_tsd("blocking sem", NULL, sizeof (Semaphore *), blocking);
343                     
344                             // put the thread on the running list
345                             _runningThreads.insert_front(th);
346                     
347                             // signal the thread's sleep semaphore to awaken it
348                             Semaphore *sleep_sem = (Semaphore *) th->reference_tsd("sleep sem");
349                             PEGASUS_ASSERT(sleep_sem != 0);
350                     
351 kumpf 1.10                  PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
352                                 "Signal thread to awaken");
353 mike  1.2                   sleep_sem->signal();
354                             th->dereference_tsd();
355                         }
356 kumpf 1.4               catch (...)
357 mike  1.2               {
358 mike  1.14.12.3.2.1         PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
359 mike  1.2                                 "ThreadPool::allocate_and_awaken: Operation Failed.");
360                             PEG_METHOD_EXIT();
361                             // ATTN: Error result has not yet been defined
362                             return PEGASUS_THREAD_SETUP_FAILURE;
363                         }
364                         PEG_METHOD_EXIT();
365                         return PEGASUS_THREAD_OK;
366                     }
367                     
368                     // caller is responsible for only calling this routine during slack periods
369                     // but should call it at least once per _deallocateWait interval.
370                     
371                     Uint32 ThreadPool::cleanupIdleThreads()
372                     {
373                         PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads");
374                     
375                         Uint32 numThreadsCleanedUp = 0;
376                     
377 kumpf 1.7               Uint32 numIdleThreads = _idleThreads.size();
378                         for (Uint32 i = 0; i < numIdleThreads; i++)
379 mike  1.2               {
380                             // Do not dip below the minimum thread count
381                             if (_currentThreads.get() <= (Uint32) _minThreads)
382                             {
383                                 break;
384                             }
385                     
386                             Thread *thread = _idleThreads.remove_back();
387                     
388                             // If there are no more threads in the _idleThreads queue, we're
389                             // done.
390                             if (thread == 0)
391                             {
392                                 break;
393                             }
394                     
395                             struct timeval *lastActivityTime;
396                             try
397                             {
398                                 lastActivityTime =
399                                     (struct timeval *) thread->
400 mike  1.2                           try_reference_tsd("last activity time");
401                                 PEGASUS_ASSERT(lastActivityTime != 0);
402                             }
403 kumpf 1.4                   catch (...)
404 mike  1.2                   {
405                                 PEGASUS_ASSERT(false);
406                                 _idleThreads.insert_back(thread);
407                                 break;
408                             }
409                     
410                             Boolean cleanupThisThread =
411                                 _timeIntervalExpired(lastActivityTime, &_deallocateWait);
412                             thread->dereference_tsd();
413                     
414                             if (cleanupThisThread)
415                             {
416                                 _cleanupThread(thread);
417                                 _currentThreads--;
418                                 numThreadsCleanedUp++;
419                             }
420                             else
421                             {
422                                 _idleThreads.insert_front(thread);
423                             }
424                         }
425 mike  1.2           
426                         PEG_METHOD_EXIT();
427                         return numThreadsCleanedUp;
428                     }
429                     
430                     void ThreadPool::_cleanupThread(Thread * thread)
431                     {
432                         PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
433                     
434                         // Set the "work func" and "work parm" to 0 so _loop() knows to exit.
435                         thread->delete_tsd("work func");
436                         thread->put_tsd("work func", 0,
437                                         sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
438                                                 (void *)), (void *) 0);
439                         thread->delete_tsd("work parm");
440                         thread->put_tsd("work parm", 0, sizeof (void *), 0);
441                     
442                         // signal the thread's sleep semaphore to awaken it
443                         Semaphore *sleep_sem = (Semaphore *) thread->reference_tsd("sleep sem");
444                         PEGASUS_ASSERT(sleep_sem != 0);
445                         sleep_sem->signal();
446 mike  1.2               thread->dereference_tsd();
447                     
448                         thread->join();
449                         delete thread;
450                     
451                         PEG_METHOD_EXIT();
452                     }
453                     
454 kumpf 1.4           Boolean ThreadPool::_timeIntervalExpired(
455                         struct timeval* start,
456                         struct timeval* interval)
457 mike  1.2           {
458 kumpf 1.6               PEGASUS_ASSERT(interval != 0);
459                     
460 mike  1.2               // never time out if the interval is zero
461 kumpf 1.6               if ((interval->tv_sec == 0) && (interval->tv_usec == 0))
462 mike  1.2               {
463                             return false;
464                         }
465                     
466                         struct timeval now, finish, remaining;
467                         Uint32 usec;
468                         Time::gettimeofday(&now);
469 mike  1.14.12.1     
470 mike  1.14.12.3     #if defined(PEGASUS_OS_SOLARIS)
471 mike  1.14.12.1         memset(&remaining, 0, sizeof(remaining));
472                     #else
473 mike  1.2               Time::gettimeofday(&remaining);     // Avoid valgrind error
474 mike  1.14.12.1     #endif
475 mike  1.2           
476                         finish.tv_sec = start->tv_sec + interval->tv_sec;
477                         usec = start->tv_usec + interval->tv_usec;
478                         finish.tv_sec += (usec / 1000000);
479                         usec %= 1000000;
480                         finish.tv_usec = usec;
481                     
482                         return (Time::subtract(&remaining, &finish, &now) != 0);
483                     }
484                     
485                     void ThreadPool::_deleteSemaphore(void *p)
486                     {
487                         delete(Semaphore *) p;
488                     }
489                     
490                     Thread *ThreadPool::_initializeThread()
491                     {
492                         PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread");
493                     
494                         Thread *th = (Thread *) new Thread(_loop, this, false);
495                     
496 mike  1.2               // allocate a sleep semaphore and pass it in the thread context
497                         // initial count is zero, loop function will sleep until
498                         // we signal the semaphore
499                         Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
500 kumpf 1.4               th->put_tsd(
501                             "sleep sem", &_deleteSemaphore, sizeof(Semaphore), (void*) sleep_sem);
502 mike  1.2           
503 kumpf 1.4               struct timeval* lastActivityTime =
504 mike  1.2                   (struct timeval *)::operator  new(sizeof (struct timeval));
505                         Time::gettimeofday(lastActivityTime);
506                     
507 kumpf 1.4               th->put_tsd(
508                             "last activity time",
509                             thread_data::default_delete,
510                             sizeof(struct timeval),
511                             (void*) lastActivityTime);
512 mike  1.2               // thread will enter _loop() and sleep on sleep_sem until we signal it
513                     
514                         if (th->run() != PEGASUS_THREAD_OK)
515                         {
516 mike  1.14.12.3.2.1         PEG_TRACE((TRC_THREAD, Tracer::LEVEL1,
517 marek 1.9                       "Could not create thread. Error code is %d.", errno));
518 mike  1.2                   delete th;
519                             return 0;
520                         }
521                         _currentThreads++;
522                     
523                         PEG_METHOD_EXIT();
524                         return th;
525                     }
526                     
527                     void ThreadPool::_addToIdleThreadsQueue(Thread * th)
528                     {
529                         if (th == 0)
530                         {
531 mike  1.14.12.3.2.1         PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
532 kumpf 1.4                       "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");
533 mike  1.2                   throw NullPointer();
534                         }
535                     
536                         try
537                         {
538                             _idleThreads.insert_front(th);
539                         }
540 kumpf 1.4               catch (...)
541 mike  1.2               {
542 mike  1.14.12.3.2.1         PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
543 kumpf 1.4                       "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_front "
544                                     "failed.");
545 mike  1.2               }
546                     }
547                     
548                     PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2