(file) Return to ThreadPool.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mike  1.2 //%2006////////////////////////////////////////////////////////////////////////
  2           //
  3           // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4           // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5           // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6           // IBM Corp.; EMC Corporation, The Open Group.
  7           // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8           // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9           // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10           // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11           // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 12           // EMC Corporation; Symantec Corporation; The Open Group.
 13           //
 14           // Permission is hereby granted, free of charge, to any person obtaining a copy
 15           // of this software and associated documentation files (the "Software"), to
 16           // deal in the Software without restriction, including without limitation the
 17           // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 18           // sell copies of the Software, and to permit persons to whom the Software is
 19           // furnished to do so, subject to the following conditions:
 20           // 
 21           // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 22 mike  1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 23           // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 24           // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 25           // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 26           // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 27           // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 28           // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 29           //
 30           //==============================================================================
 31           //
 32           // Author: Mike Day (mdday@us.ibm.com)
 33           //
 34           // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
 35           //              added nsk platform support
 36           //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
 37           //              Amit K Arora, IBM (amita@in.ibm.com) for PEP#101
 38           //              Sean Keenan, Hewlett-Packard Company (sean.keenan@hp.com)
 39           //              David Dillard, VERITAS Software Corp.
 40           //                  (david.dillard@veritas.com)
 41           //
 42           //%/////////////////////////////////////////////////////////////////////////////
 43 mike  1.2 
 44           #include "ThreadPool.h"
 45           #include "Thread.h"
 46           #include <exception>
 47           #include <Pegasus/Common/Tracer.h>
 48           #include "Time.h"
 49           
 50           PEGASUS_USING_STD;
 51           
 52           PEGASUS_NAMESPACE_BEGIN
 53           
 54           ///////////////////////////////////////////////////////////////////////////////
 55           //
 56           // ThreadPool
 57           //
 58           ///////////////////////////////////////////////////////////////////////////////
 59           
 60           ThreadPool::ThreadPool(Sint16 initialSize,
 61                                  const char *key,
 62                                  Sint16 minThreads,
 63                                  Sint16 maxThreads,
 64 mike  1.2                        struct timeval
 65                                  &deallocateWait):_maxThreads(maxThreads),
 66           _minThreads(minThreads), _currentThreads(0), _idleThreads(),
 67           _runningThreads(), _dying(0)
 68           {
 69               _deallocateWait.tv_sec = deallocateWait.tv_sec;
 70               _deallocateWait.tv_usec = deallocateWait.tv_usec;
 71           
 72               memset(_key, 0x00, 17);
 73               if (key != 0)
 74               {
 75                   strncpy(_key, key, 16);
 76               }
 77           
 78               if ((_maxThreads > 0) && (_maxThreads < initialSize))
 79               {
 80                   _maxThreads = initialSize;
 81               }
 82           
 83               if (_minThreads > initialSize)
 84               {
 85 mike  1.2         _minThreads = initialSize;
 86               }
 87           
 88               for (int i = 0; i < initialSize; i++)
 89               {
 90                   _addToIdleThreadsQueue(_initializeThread());
 91               }
 92           }
 93           
 94           ThreadPool::~ThreadPool()
 95           {
 96               PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::~ThreadPool");
 97           
 98               try
 99               {
100                   // Set the dying flag so all thread know the destructor has been
101                   // entered
102                   _dying++;
103                   Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
104                                 "Cleaning up %d idle threads. ", _currentThreads.get());
105           
106 mike  1.2         while (_currentThreads.get() > 0)
107                   {
108                       Thread *thread = _idleThreads.remove_front();
109                       if (thread != 0)
110                       {
111                           _cleanupThread(thread);
112                           _currentThreads--;
113                       }
114                       else
115                       {
116                           Threads::yield();
117                       }
118                   }
119               }
120               catch(...)
121               {
122               }
123           }
124           
125           ThreadReturnType PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
126           {
127 mike  1.2     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
128           
129               try
130               {
131                   Thread *myself = (Thread *) parm;
132                   PEGASUS_ASSERT(myself != 0);
133           
134                   // Set myself into thread specific storage
135                   // This will allow code to get its own Thread
136                   Thread::setCurrent(myself);
137           
138                   ThreadPool *pool = (ThreadPool *) myself->get_parm();
139                   PEGASUS_ASSERT(pool != 0);
140           
141                   Semaphore *sleep_sem = 0;
142                   struct timeval *lastActivityTime = 0;
143           
144                   try
145                   {
146                       sleep_sem = (Semaphore *) myself->reference_tsd("sleep sem");
147                       myself->dereference_tsd();
148 mike  1.2             PEGASUS_ASSERT(sleep_sem != 0);
149           
150                       lastActivityTime =
151                           (struct timeval *) myself->
152                           reference_tsd("last activity time");
153                       myself->dereference_tsd();
154                       PEGASUS_ASSERT(lastActivityTime != 0);
155                   }
156                   catch(...)
157                   {
158                       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
159                                     "ThreadPool::_loop: Failure getting sleep_sem or "
160                                     "lastActivityTime.");
161                       PEGASUS_ASSERT(false);
162                       pool->_idleThreads.remove(myself);
163                       pool->_currentThreads--;
164                       PEG_METHOD_EXIT();
165                       return ((ThreadReturnType) 1);
166                   }
167           
168                   while (1)
169 mike  1.2         {
170                       try
171                       {
172                           sleep_sem->wait();
173                       }
174                       catch(...)
175                       {
176                           Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
177                                         "ThreadPool::_loop: failure on sleep_sem->wait().");
178                           PEGASUS_ASSERT(false);
179                           pool->_idleThreads.remove(myself);
180                           pool->_currentThreads--;
181                           PEG_METHOD_EXIT();
182                           return ((ThreadReturnType) 1);
183                       }
184           
185                       // When we awaken we reside on the _runningThreads queue, not the
186                       // _idleThreads queue.
187           
188                       ThreadReturnType(PEGASUS_THREAD_CDECL * work) (void *) = 0;
189                       void *parm = 0;
190 mike  1.2             Semaphore *blocking_sem = 0;
191           
192                       try
193                       {
194                           work = (ThreadReturnType(PEGASUS_THREAD_CDECL *) (void *))
195                               myself->reference_tsd("work func");
196                           myself->dereference_tsd();
197                           parm = myself->reference_tsd("work parm");
198                           myself->dereference_tsd();
199                           blocking_sem =
200                               (Semaphore *) myself->reference_tsd("blocking sem");
201                           myself->dereference_tsd();
202                       }
203                       catch(...)
204                       {
205                           Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
206                                         "ThreadPool::_loop: Failure accessing work func, work parm, "
207                                         "or blocking sem.");
208                           PEGASUS_ASSERT(false);
209                           pool->_idleThreads.remove(myself);
210                           pool->_currentThreads--;
211 mike  1.2                 PEG_METHOD_EXIT();
212                           return ((ThreadReturnType) 1);
213                       }
214           
215                       if (work == 0)
216                       {
217                           Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
218                                         "ThreadPool::_loop: work func is 0, meaning we should exit.");
219                           break;
220                       }
221           
222                       Time::gettimeofday(lastActivityTime);
223           
224                       try
225                       {
226                           PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,
227                                            "Work starting.");
228                           work(parm);
229                           PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,
230                                            "Work finished.");
231                       }
232 mike  1.2             catch(Exception & e)
233                       {
234                           PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
235                                            String
236                                            ("Exception from work in ThreadPool::_loop: ")
237                                            + e.getMessage());
238                       }
239           #if !defined(PEGASUS_OS_LSB)
240                       catch(const exception & e)
241                       {
242                           PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
243                                            String
244                                            ("Exception from work in ThreadPool::_loop: ")
245                                            + e.what());
246                       }
247           #endif
248                       catch(...)
249                       {
250                           PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
251                                            "Unknown exception from work in ThreadPool::_loop.");
252                       }
253 mike  1.2 
254                       // put myself back onto the available list
255                       try
256                       {
257                           Time::gettimeofday(lastActivityTime);
258                           if (blocking_sem != 0)
259                           {
260                               blocking_sem->signal();
261                           }
262           
263                           pool->_runningThreads.remove(myself);
264                           pool->_idleThreads.insert_front(myself);
265                       }
266                       catch(...)
267                       {
268                           Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
269                                         "ThreadPool::_loop: Adding thread to idle pool failed.");
270                           PEGASUS_ASSERT(false);
271                           pool->_currentThreads--;
272                           PEG_METHOD_EXIT();
273                           return ((ThreadReturnType) 1);
274 mike  1.2             }
275                   }
276               }
277               catch(const Exception & e)
278               {
279                   PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
280                                    "Caught exception: \"" + e.getMessage() +
281                                    "\".  Exiting _loop.");
282               }
283               catch(...)
284               {
285                   PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
286                                    "Caught unrecognized exception.  Exiting _loop.");
287               }
288           
289               PEG_METHOD_EXIT();
290               return ((ThreadReturnType) 0);
291           }
292           
293           ThreadStatus ThreadPool::allocate_and_awaken(void *parm,
294                                                        ThreadReturnType
295 mike  1.2                                              (PEGASUS_THREAD_CDECL *
296                                                         work) (void *),
297                                                        Semaphore * blocking)
298           {
299               PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
300           
301               // Allocate_and_awaken will not run if the _dying flag is set.
302               // Once the lock is acquired, ~ThreadPool will not change
303               // the value of _dying until the lock is released.
304           
305               try
306               {
307                   if (_dying.get())
308                   {
309                       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
310                                     "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
311                       return PEGASUS_THREAD_UNAVAILABLE;
312                   }
313                   struct timeval start;
314                   Time::gettimeofday(&start);
315                   Thread *th = 0;
316 mike  1.2 
317                   th = _idleThreads.remove_front();
318           
319                   if (th == 0)
320                   {
321                       if ((_maxThreads == 0) ||
322                           (_currentThreads.get() < Uint32(_maxThreads)))
323                       {
324                           th = _initializeThread();
325                       }
326                   }
327           
328                   if (th == 0)
329                   {
330 kumpf 1.3             Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
331                           "ThreadPool::allocate_and_awaken: Insufficient resources: "
332                               " pool = %s, running threads = %d, idle threads = %d",
333                           _key, _runningThreads.size(), _idleThreads.size());
334 mike  1.2             return PEGASUS_THREAD_INSUFFICIENT_RESOURCES;
335                   }
336           
337                   // initialize the thread data with the work function and parameters
338                   Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
339                                 "Initializing thread with work function and parameters: parm = %p",
340                                 parm);
341           
342                   th->delete_tsd("work func");
343                   th->put_tsd("work func", NULL,
344                               sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
345                                       (void *)), (void *) work);
346                   th->delete_tsd("work parm");
347                   th->put_tsd("work parm", NULL, sizeof (void *), parm);
348                   th->delete_tsd("blocking sem");
349                   if (blocking != 0)
350                       th->put_tsd("blocking sem", NULL, sizeof (Semaphore *), blocking);
351           
352                   // put the thread on the running list
353                   _runningThreads.insert_front(th);
354           
355 mike  1.2         // signal the thread's sleep semaphore to awaken it
356                   Semaphore *sleep_sem = (Semaphore *) th->reference_tsd("sleep sem");
357                   PEGASUS_ASSERT(sleep_sem != 0);
358           
359                   Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
360                   sleep_sem->signal();
361                   th->dereference_tsd();
362               }
363               catch(...)
364               {
365                   Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
366                                 "ThreadPool::allocate_and_awaken: Operation Failed.");
367                   PEG_METHOD_EXIT();
368                   // ATTN: Error result has not yet been defined
369                   return PEGASUS_THREAD_SETUP_FAILURE;
370               }
371               PEG_METHOD_EXIT();
372               return PEGASUS_THREAD_OK;
373           }
374           
375           // caller is responsible for only calling this routine during slack periods
376 mike  1.2 // but should call it at least once per _deallocateWait interval.
377           
378           Uint32 ThreadPool::cleanupIdleThreads()
379           {
380               PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads");
381           
382               Uint32 numThreadsCleanedUp = 0;
383           
384               Uint32 numIdleThreads = _idleThreads.size();
385               for (Uint32 i = 0; i < numIdleThreads; i++)
386               {
387                   // Do not dip below the minimum thread count
388                   if (_currentThreads.get() <= (Uint32) _minThreads)
389                   {
390                       break;
391                   }
392           
393                   Thread *thread = _idleThreads.remove_back();
394           
395                   // If there are no more threads in the _idleThreads queue, we're
396                   // done.
397 mike  1.2         if (thread == 0)
398                   {
399                       break;
400                   }
401           
402                   struct timeval *lastActivityTime;
403                   try
404                   {
405                       lastActivityTime =
406                           (struct timeval *) thread->
407                           try_reference_tsd("last activity time");
408                       PEGASUS_ASSERT(lastActivityTime != 0);
409                   }
410                   catch(...)
411                   {
412                       PEGASUS_ASSERT(false);
413                       _idleThreads.insert_back(thread);
414                       break;
415                   }
416           
417                   Boolean cleanupThisThread =
418 mike  1.2             _timeIntervalExpired(lastActivityTime, &_deallocateWait);
419                   thread->dereference_tsd();
420           
421                   if (cleanupThisThread)
422                   {
423                       _cleanupThread(thread);
424                       _currentThreads--;
425                       numThreadsCleanedUp++;
426                   }
427                   else
428                   {
429                       _idleThreads.insert_front(thread);
430                   }
431               }
432           
433               PEG_METHOD_EXIT();
434               return numThreadsCleanedUp;
435           }
436           
437           void ThreadPool::_cleanupThread(Thread * thread)
438           {
439 mike  1.2     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
440           
441               // Set the "work func" and "work parm" to 0 so _loop() knows to exit.
442               thread->delete_tsd("work func");
443               thread->put_tsd("work func", 0,
444                               sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
445                                       (void *)), (void *) 0);
446               thread->delete_tsd("work parm");
447               thread->put_tsd("work parm", 0, sizeof (void *), 0);
448           
449               // signal the thread's sleep semaphore to awaken it
450               Semaphore *sleep_sem = (Semaphore *) thread->reference_tsd("sleep sem");
451               PEGASUS_ASSERT(sleep_sem != 0);
452               sleep_sem->signal();
453               thread->dereference_tsd();
454           
455               thread->join();
456               delete thread;
457           
458               PEG_METHOD_EXIT();
459           }
460 mike  1.2 
461           Boolean ThreadPool::_timeIntervalExpired(struct timeval *start,
462                                                    struct timeval *interval)
463           {
464               // never time out if the interval is zero
465               if (interval && (interval->tv_sec == 0) && (interval->tv_usec == 0))
466               {
467                   return false;
468               }
469           
470               struct timeval now, finish, remaining;
471               Uint32 usec;
472               Time::gettimeofday(&now);
473               Time::gettimeofday(&remaining);     // Avoid valgrind error
474           
475               finish.tv_sec = start->tv_sec + interval->tv_sec;
476               usec = start->tv_usec + interval->tv_usec;
477               finish.tv_sec += (usec / 1000000);
478               usec %= 1000000;
479               finish.tv_usec = usec;
480           
481 mike  1.2     return (Time::subtract(&remaining, &finish, &now) != 0);
482           }
483           
484           void ThreadPool::_deleteSemaphore(void *p)
485           {
486               delete(Semaphore *) p;
487           }
488           
489           Thread *ThreadPool::_initializeThread()
490           {
491               PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread");
492           
493               Thread *th = (Thread *) new Thread(_loop, this, false);
494           
495               // allocate a sleep semaphore and pass it in the thread context
496               // initial count is zero, loop function will sleep until
497               // we signal the semaphore
498               Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
499               th->put_tsd("sleep sem", &_deleteSemaphore, sizeof (Semaphore),
500                           (void *) sleep_sem);
501           
502 mike  1.2     struct timeval *lastActivityTime =
503                   (struct timeval *)::operator  new(sizeof (struct timeval));
504               Time::gettimeofday(lastActivityTime);
505           
506               th->put_tsd("last activity time", thread_data::default_delete,
507                           sizeof (struct timeval), (void *) lastActivityTime);
508               // thread will enter _loop() and sleep on sleep_sem until we signal it
509           
510               if (th->run() != PEGASUS_THREAD_OK)
511               {
512                   Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
513                                 "Could not create thread. Error code is %d.", errno);
514                   delete th;
515                   return 0;
516               }
517               _currentThreads++;
518               Threads::yield();
519           
520               PEG_METHOD_EXIT();
521               return th;
522           }
523 mike  1.2 
524           void ThreadPool::_addToIdleThreadsQueue(Thread * th)
525           {
526               if (th == 0)
527               {
528                   Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
529                                 "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");
530                   throw NullPointer();
531               }
532           
533               try
534               {
535                   _idleThreads.insert_front(th);
536               }
537               catch(...)
538               {
539                   Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
540                                 "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_front "
541                                 "failed.");
542               }
543           }
544 mike  1.2 
545           PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2