(file) Return to ThreadPool.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mike  1.2 //%2006////////////////////////////////////////////////////////////////////////
  2           //
  3           // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4           // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5           // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6           // IBM Corp.; EMC Corporation, The Open Group.
  7           // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8           // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9           // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10           // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11           // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 12           // EMC Corporation; Symantec Corporation; The Open Group.
 13           //
 14           // Permission is hereby granted, free of charge, to any person obtaining a copy
 15           // of this software and associated documentation files (the "Software"), to
 16           // deal in the Software without restriction, including without limitation the
 17           // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 18           // sell copies of the Software, and to permit persons to whom the Software is
 19           // furnished to do so, subject to the following conditions:
 20           // 
 21           // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 22 mike  1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 23           // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 24           // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 25           // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 26           // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 27           // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 28           // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 29           //
 30           //==============================================================================
 31           //
 32           //%/////////////////////////////////////////////////////////////////////////////
 33           
 34           #include "ThreadPool.h"
 35           #include "Thread.h"
 36           #include <exception>
 37           #include <Pegasus/Common/Tracer.h>
 38           #include "Time.h"
 39           
 40           PEGASUS_USING_STD;
 41           
 42           PEGASUS_NAMESPACE_BEGIN
 43 mike  1.2 
 44           ///////////////////////////////////////////////////////////////////////////////
 45           //
 46           // ThreadPool
 47           //
 48           ///////////////////////////////////////////////////////////////////////////////
 49           
 50 kumpf 1.4 ThreadPool::ThreadPool(
 51               Sint16 initialSize,
 52               const char* key,
 53               Sint16 minThreads,
 54               Sint16 maxThreads,
 55               struct timeval
 56               &deallocateWait)
 57               : _maxThreads(maxThreads),
 58                 _minThreads(minThreads),
 59                 _currentThreads(0),
 60                 _idleThreads(),
 61                 _runningThreads(),
 62                 _dying(0)
 63 mike  1.2 {
 64               _deallocateWait.tv_sec = deallocateWait.tv_sec;
 65               _deallocateWait.tv_usec = deallocateWait.tv_usec;
 66           
 67               memset(_key, 0x00, 17);
 68               if (key != 0)
 69               {
 70                   strncpy(_key, key, 16);
 71               }
 72           
 73               if ((_maxThreads > 0) && (_maxThreads < initialSize))
 74               {
 75                   _maxThreads = initialSize;
 76               }
 77           
 78               if (_minThreads > initialSize)
 79               {
 80                   _minThreads = initialSize;
 81               }
 82           
 83               for (int i = 0; i < initialSize; i++)
 84 mike  1.2     {
 85                   _addToIdleThreadsQueue(_initializeThread());
 86               }
 87           }
 88           
 89           ThreadPool::~ThreadPool()
 90           {
 91               PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::~ThreadPool");
 92           
 93               try
 94               {
 95                   // Set the dying flag so all thread know the destructor has been
 96                   // entered
 97                   _dying++;
 98 marek 1.9         PEG_TRACE((TRC_THREAD, Tracer::LEVEL2,
 99                       "Cleaning up %d idle threads.", _currentThreads.get()));
100 mike  1.2 
101                   while (_currentThreads.get() > 0)
102                   {
103 kumpf 1.4             Thread* thread = _idleThreads.remove_front();
104 mike  1.2             if (thread != 0)
105                       {
106                           _cleanupThread(thread);
107                           _currentThreads--;
108                       }
109                       else
110                       {
111                           Threads::yield();
112                       }
113                   }
114               }
115 kumpf 1.4     catch (...)
116 mike  1.2     {
117               }
118           }
119           
120 kumpf 1.4 ThreadReturnType PEGASUS_THREAD_CDECL ThreadPool::_loop(void* parm)
121 mike  1.2 {
122               PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
123           
124               try
125               {
126                   Thread *myself = (Thread *) parm;
127                   PEGASUS_ASSERT(myself != 0);
128           
129                   // Set myself into thread specific storage
130                   // This will allow code to get its own Thread
131                   Thread::setCurrent(myself);
132           
133                   ThreadPool *pool = (ThreadPool *) myself->get_parm();
134                   PEGASUS_ASSERT(pool != 0);
135           
136                   Semaphore *sleep_sem = 0;
137                   struct timeval *lastActivityTime = 0;
138           
139                   try
140                   {
141                       sleep_sem = (Semaphore *) myself->reference_tsd("sleep sem");
142 mike  1.2             myself->dereference_tsd();
143                       PEGASUS_ASSERT(sleep_sem != 0);
144           
145                       lastActivityTime =
146                           (struct timeval *) myself->
147                           reference_tsd("last activity time");
148                       myself->dereference_tsd();
149                       PEGASUS_ASSERT(lastActivityTime != 0);
150                   }
151 kumpf 1.4         catch (...)
152 mike  1.2         {
153 marek 1.9             PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
154 kumpf 1.4                 "ThreadPool::_loop: Failure getting sleep_sem or "
155                               "lastActivityTime.");
156 mike  1.2             PEGASUS_ASSERT(false);
157                       pool->_idleThreads.remove(myself);
158                       pool->_currentThreads--;
159                       PEG_METHOD_EXIT();
160 kumpf 1.4             return (ThreadReturnType) 1;
161 mike  1.2         }
162           
163                   while (1)
164                   {
165                       try
166                       {
167                           sleep_sem->wait();
168                       }
169 kumpf 1.4             catch (...)
170 mike  1.2             {
171 marek 1.9                 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
172 kumpf 1.4                     "ThreadPool::_loop: failure on sleep_sem->wait().");
173 mike  1.2                 PEGASUS_ASSERT(false);
174                           pool->_idleThreads.remove(myself);
175                           pool->_currentThreads--;
176                           PEG_METHOD_EXIT();
177 kumpf 1.4                 return (ThreadReturnType) 1;
178 mike  1.2             }
179           
180                       // When we awaken we reside on the _runningThreads queue, not the
181                       // _idleThreads queue.
182           
183                       ThreadReturnType(PEGASUS_THREAD_CDECL * work) (void *) = 0;
184                       void *parm = 0;
185                       Semaphore *blocking_sem = 0;
186           
187                       try
188                       {
189                           work = (ThreadReturnType(PEGASUS_THREAD_CDECL *) (void *))
190                               myself->reference_tsd("work func");
191                           myself->dereference_tsd();
192                           parm = myself->reference_tsd("work parm");
193                           myself->dereference_tsd();
194                           blocking_sem =
195                               (Semaphore *) myself->reference_tsd("blocking sem");
196                           myself->dereference_tsd();
197                       }
198 kumpf 1.4             catch (...)
199 mike  1.2             {
200 marek 1.9                 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
201 kumpf 1.4                     "ThreadPool::_loop: Failure accessing work func, work "
202                                   "parm, or blocking sem.");
203 mike  1.2                 PEGASUS_ASSERT(false);
204                           pool->_idleThreads.remove(myself);
205                           pool->_currentThreads--;
206                           PEG_METHOD_EXIT();
207 kumpf 1.4                 return (ThreadReturnType) 1;
208 mike  1.2             }
209           
210                       if (work == 0)
211                       {
212 marek 1.9                 PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
213 kumpf 1.4                     "ThreadPool::_loop: work func is 0, meaning we should "
214                                   "exit.");
215 mike  1.2                 break;
216                       }
217           
218                       Time::gettimeofday(lastActivityTime);
219           
220                       try
221                       {
222 marek 1.9                 PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
223 mike  1.2                                  "Work starting.");
224                           work(parm);
225 marek 1.9                 PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
226 mike  1.2                                  "Work finished.");
227                       }
228 kumpf 1.4             catch (Exception& e)
229 mike  1.2             {
230                           PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
231 kumpf 1.4                     String("Exception from work in ThreadPool::_loop: ") +
232                                   e.getMessage());
233 mike  1.2             }
234 kumpf 1.4             catch (const exception& e)
235 mike  1.2             {
236                           PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
237 kumpf 1.4                     String("Exception from work in ThreadPool::_loop: ") +
238                                   e.what());
239 mike  1.2             }
240 kumpf 1.4             catch (...)
241 mike  1.2             {
242 marek 1.9                 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
243 kumpf 1.4                     "Unknown exception from work in ThreadPool::_loop.");
244 mike  1.2             }
245           
246                       // put myself back onto the available list
247                       try
248                       {
249                           Time::gettimeofday(lastActivityTime);
250                           if (blocking_sem != 0)
251                           {
252                               blocking_sem->signal();
253                           }
254           
255                           pool->_runningThreads.remove(myself);
256                           pool->_idleThreads.insert_front(myself);
257                       }
258 kumpf 1.4             catch (...)
259 mike  1.2             {
260 marek 1.9                 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
261 kumpf 1.4                     "ThreadPool::_loop: Adding thread to idle pool failed.");
262 mike  1.2                 PEGASUS_ASSERT(false);
263                           pool->_currentThreads--;
264                           PEG_METHOD_EXIT();
265 kumpf 1.4                 return (ThreadReturnType) 1;
266 mike  1.2             }
267                   }
268               }
269 kumpf 1.4     catch (const Exception & e)
270 mike  1.2     {
271                   PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
272 kumpf 1.4             "Caught exception: \"" + e.getMessage() + "\".  Exiting _loop.");
273 mike  1.2     }
274 kumpf 1.4     catch (...)
275 mike  1.2     {
276 marek 1.9         PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
277 kumpf 1.4             "Caught unrecognized exception.  Exiting _loop.");
278 mike  1.2     }
279           
280               PEG_METHOD_EXIT();
281 kumpf 1.4     return (ThreadReturnType) 0;
282 mike  1.2 }
283           
284 kumpf 1.4 ThreadStatus ThreadPool::allocate_and_awaken(
285               void* parm,
286               ThreadReturnType (PEGASUS_THREAD_CDECL* work) (void*),
287               Semaphore* blocking)
288 mike  1.2 {
289               PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
290           
291               // Allocate_and_awaken will not run if the _dying flag is set.
292               // Once the lock is acquired, ~ThreadPool will not change
293               // the value of _dying until the lock is released.
294           
295               try
296               {
297                   if (_dying.get())
298                   {
299 marek 1.9             PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
300 kumpf 1.4                 "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
301 mike  1.2             return PEGASUS_THREAD_UNAVAILABLE;
302                   }
303                   struct timeval start;
304                   Time::gettimeofday(&start);
305                   Thread *th = 0;
306           
307                   th = _idleThreads.remove_front();
308           
309                   if (th == 0)
310                   {
311                       if ((_maxThreads == 0) ||
312                           (_currentThreads.get() < Uint32(_maxThreads)))
313                       {
314                           th = _initializeThread();
315                       }
316                   }
317           
318                   if (th == 0)
319                   {
320 marek 1.9             PEG_TRACE((TRC_THREAD, Tracer::LEVEL2,
321 kumpf 1.3                 "ThreadPool::allocate_and_awaken: Insufficient resources: "
322                               " pool = %s, running threads = %d, idle threads = %d",
323 marek 1.9                 _key, _runningThreads.size(), _idleThreads.size()));
324 mike  1.2             return PEGASUS_THREAD_INSUFFICIENT_RESOURCES;
325                   }
326           
327                   // initialize the thread data with the work function and parameters
328 marek 1.9         PEG_TRACE((TRC_THREAD, Tracer::LEVEL4,
329 kumpf 1.4             "Initializing thread with work function and parameters: parm = %p",
330 marek 1.9             parm));
331 mike  1.2 
332                   th->delete_tsd("work func");
333                   th->put_tsd("work func", NULL,
334                               sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
335                                       (void *)), (void *) work);
336                   th->delete_tsd("work parm");
337                   th->put_tsd("work parm", NULL, sizeof (void *), parm);
338                   th->delete_tsd("blocking sem");
339                   if (blocking != 0)
340                       th->put_tsd("blocking sem", NULL, sizeof (Semaphore *), blocking);
341           
342                   // put the thread on the running list
343                   _runningThreads.insert_front(th);
344           
345                   // signal the thread's sleep semaphore to awaken it
346                   Semaphore *sleep_sem = (Semaphore *) th->reference_tsd("sleep sem");
347                   PEGASUS_ASSERT(sleep_sem != 0);
348           
349 kumpf 1.10         PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
350                        "Signal thread to awaken");
351 mike  1.2          sleep_sem->signal();
352                    th->dereference_tsd();
353                }
354 kumpf 1.4      catch (...)
355 mike  1.2      {
356 marek 1.9          PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
357 mike  1.2                        "ThreadPool::allocate_and_awaken: Operation Failed.");
358                    PEG_METHOD_EXIT();
359                    // ATTN: Error result has not yet been defined
360                    return PEGASUS_THREAD_SETUP_FAILURE;
361                }
362                PEG_METHOD_EXIT();
363                return PEGASUS_THREAD_OK;
364            }
365            
366            // caller is responsible for only calling this routine during slack periods
367            // but should call it at least once per _deallocateWait interval.
368            
369            Uint32 ThreadPool::cleanupIdleThreads()
370            {
371                PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads");
372            
373                Uint32 numThreadsCleanedUp = 0;
374            
375 kumpf 1.7      Uint32 numIdleThreads = _idleThreads.size();
376                for (Uint32 i = 0; i < numIdleThreads; i++)
377 mike  1.2      {
378                    // Do not dip below the minimum thread count
379                    if (_currentThreads.get() <= (Uint32) _minThreads)
380                    {
381                        break;
382                    }
383            
384                    Thread *thread = _idleThreads.remove_back();
385            
386                    // If there are no more threads in the _idleThreads queue, we're
387                    // done.
388                    if (thread == 0)
389                    {
390                        break;
391                    }
392            
393                    struct timeval *lastActivityTime;
394                    try
395                    {
396                        lastActivityTime =
397                            (struct timeval *) thread->
398 mike  1.2                  try_reference_tsd("last activity time");
399                        PEGASUS_ASSERT(lastActivityTime != 0);
400                    }
401 kumpf 1.4          catch (...)
402 mike  1.2          {
403                        PEGASUS_ASSERT(false);
404                        _idleThreads.insert_back(thread);
405                        break;
406                    }
407            
408                    Boolean cleanupThisThread =
409                        _timeIntervalExpired(lastActivityTime, &_deallocateWait);
410                    thread->dereference_tsd();
411            
412                    if (cleanupThisThread)
413                    {
414                        _cleanupThread(thread);
415                        _currentThreads--;
416                        numThreadsCleanedUp++;
417                    }
418                    else
419                    {
420                        _idleThreads.insert_front(thread);
421                    }
422                }
423 mike  1.2  
424                PEG_METHOD_EXIT();
425                return numThreadsCleanedUp;
426            }
427            
428            void ThreadPool::_cleanupThread(Thread * thread)
429            {
430                PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
431            
432                // Set the "work func" and "work parm" to 0 so _loop() knows to exit.
433                thread->delete_tsd("work func");
434                thread->put_tsd("work func", 0,
435                                sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
436                                        (void *)), (void *) 0);
437                thread->delete_tsd("work parm");
438                thread->put_tsd("work parm", 0, sizeof (void *), 0);
439            
440                // signal the thread's sleep semaphore to awaken it
441                Semaphore *sleep_sem = (Semaphore *) thread->reference_tsd("sleep sem");
442                PEGASUS_ASSERT(sleep_sem != 0);
443                sleep_sem->signal();
444 mike  1.2      thread->dereference_tsd();
445            
446                thread->join();
447                delete thread;
448            
449                PEG_METHOD_EXIT();
450            }
451            
452 kumpf 1.4  Boolean ThreadPool::_timeIntervalExpired(
453                struct timeval* start,
454                struct timeval* interval)
455 mike  1.2  {
456 kumpf 1.6      PEGASUS_ASSERT(interval != 0);
457            
458 mike  1.2      // never time out if the interval is zero
459 kumpf 1.6      if ((interval->tv_sec == 0) && (interval->tv_usec == 0))
460 mike  1.2      {
461                    return false;
462                }
463            
464                struct timeval now, finish, remaining;
465                Uint32 usec;
466                Time::gettimeofday(&now);
467                Time::gettimeofday(&remaining);     // Avoid valgrind error
468            
469                finish.tv_sec = start->tv_sec + interval->tv_sec;
470                usec = start->tv_usec + interval->tv_usec;
471                finish.tv_sec += (usec / 1000000);
472                usec %= 1000000;
473                finish.tv_usec = usec;
474            
475                return (Time::subtract(&remaining, &finish, &now) != 0);
476            }
477            
478            void ThreadPool::_deleteSemaphore(void *p)
479            {
480                delete(Semaphore *) p;
481 mike  1.2  }
482            
483            Thread *ThreadPool::_initializeThread()
484            {
485                PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread");
486            
487                Thread *th = (Thread *) new Thread(_loop, this, false);
488            
489                // allocate a sleep semaphore and pass it in the thread context
490                // initial count is zero, loop function will sleep until
491                // we signal the semaphore
492                Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
493 kumpf 1.4      th->put_tsd(
494                    "sleep sem", &_deleteSemaphore, sizeof(Semaphore), (void*) sleep_sem);
495 mike  1.2  
496 kumpf 1.4      struct timeval* lastActivityTime =
497 mike  1.2          (struct timeval *)::operator  new(sizeof (struct timeval));
498                Time::gettimeofday(lastActivityTime);
499            
500 kumpf 1.4      th->put_tsd(
501                    "last activity time",
502                    thread_data::default_delete,
503                    sizeof(struct timeval),
504                    (void*) lastActivityTime);
505 mike  1.2      // thread will enter _loop() and sleep on sleep_sem until we signal it
506            
507                if (th->run() != PEGASUS_THREAD_OK)
508                {
509 marek 1.9          PEG_TRACE((TRC_THREAD, Tracer::LEVEL2,
510                        "Could not create thread. Error code is %d.", errno));
511 mike  1.2          delete th;
512                    return 0;
513                }
514                _currentThreads++;
515            
516                PEG_METHOD_EXIT();
517                return th;
518            }
519            
520            void ThreadPool::_addToIdleThreadsQueue(Thread * th)
521            {
522                if (th == 0)
523                {
524 marek 1.9          PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
525 kumpf 1.4              "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");
526 mike  1.2          throw NullPointer();
527                }
528            
529                try
530                {
531                    _idleThreads.insert_front(th);
532                }
533 kumpf 1.4      catch (...)
534 mike  1.2      {
535 marek 1.9          PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
536 kumpf 1.4              "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_front "
537                            "failed.");
538 mike  1.2      }
539            }
540            
541            PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2