(file) Return to ThreadPool.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mike  1.2 //%2006////////////////////////////////////////////////////////////////////////
  2           //
  3           // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4           // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5           // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6           // IBM Corp.; EMC Corporation, The Open Group.
  7           // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8           // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9           // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10           // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11           // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 12           // EMC Corporation; Symantec Corporation; The Open Group.
 13           //
 14           // Permission is hereby granted, free of charge, to any person obtaining a copy
 15           // of this software and associated documentation files (the "Software"), to
 16           // deal in the Software without restriction, including without limitation the
 17           // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 18           // sell copies of the Software, and to permit persons to whom the Software is
 19           // furnished to do so, subject to the following conditions:
 20           // 
 21           // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 22 mike  1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 23           // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 24           // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 25           // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 26           // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 27           // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 28           // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 29           //
 30           //==============================================================================
 31           //
 32           //%/////////////////////////////////////////////////////////////////////////////
 33           
 34           #include "ThreadPool.h"
 35           #include "Thread.h"
 36           #include <exception>
 37           #include <Pegasus/Common/Tracer.h>
 38           #include "Time.h"
 39           
 40           PEGASUS_USING_STD;
 41           
 42           PEGASUS_NAMESPACE_BEGIN
 43 mike  1.2 
 44           ///////////////////////////////////////////////////////////////////////////////
 45           //
 46           // ThreadPool
 47           //
 48           ///////////////////////////////////////////////////////////////////////////////
 49           
 50 kumpf 1.4 ThreadPool::ThreadPool(
 51               Sint16 initialSize,
 52               const char* key,
 53               Sint16 minThreads,
 54               Sint16 maxThreads,
 55               struct timeval
 56               &deallocateWait)
 57               : _maxThreads(maxThreads),
 58                 _minThreads(minThreads),
 59                 _currentThreads(0),
 60                 _idleThreads(),
 61                 _runningThreads(),
 62                 _dying(0)
 63 mike  1.2 {
 64               _deallocateWait.tv_sec = deallocateWait.tv_sec;
 65               _deallocateWait.tv_usec = deallocateWait.tv_usec;
 66           
 67               memset(_key, 0x00, 17);
 68               if (key != 0)
 69               {
 70                   strncpy(_key, key, 16);
 71               }
 72           
 73               if ((_maxThreads > 0) && (_maxThreads < initialSize))
 74               {
 75                   _maxThreads = initialSize;
 76               }
 77           
 78               if (_minThreads > initialSize)
 79               {
 80                   _minThreads = initialSize;
 81               }
 82           
 83               for (int i = 0; i < initialSize; i++)
 84 mike  1.2     {
 85                   _addToIdleThreadsQueue(_initializeThread());
 86               }
 87           }
 88           
 89           ThreadPool::~ThreadPool()
 90           {
 91               PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::~ThreadPool");
 92           
 93               try
 94               {
 95                   // Set the dying flag so all thread know the destructor has been
 96                   // entered
 97                   _dying++;
 98 marek 1.15         PEG_TRACE((TRC_THREAD, Tracer::LEVEL3,
 99 marek 1.9              "Cleaning up %d idle threads.", _currentThreads.get()));
100 mike  1.2  
101                    while (_currentThreads.get() > 0)
102                    {
103 kumpf 1.4              Thread* thread = _idleThreads.remove_front();
104 mike  1.2              if (thread != 0)
105                        {
106                            _cleanupThread(thread);
107                            _currentThreads--;
108                        }
109                        else
110                        {
111                            Threads::yield();
112                        }
113                    }
114                }
115 kumpf 1.4      catch (...)
116 mike  1.2      {
117                }
118            }
119            
120 kumpf 1.4  ThreadReturnType PEGASUS_THREAD_CDECL ThreadPool::_loop(void* parm)
121 mike  1.2  {
122                PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
123            
124                try
125                {
126                    Thread *myself = (Thread *) parm;
127                    PEGASUS_ASSERT(myself != 0);
128            
129                    // Set myself into thread specific storage
130                    // This will allow code to get its own Thread
131                    Thread::setCurrent(myself);
132            
133                    ThreadPool *pool = (ThreadPool *) myself->get_parm();
134                    PEGASUS_ASSERT(pool != 0);
135            
136                    Semaphore *sleep_sem = 0;
137                    struct timeval *lastActivityTime = 0;
138            
139                    try
140                    {
141                        sleep_sem = (Semaphore *) myself->reference_tsd("sleep sem");
142 mike  1.2              myself->dereference_tsd();
143                        PEGASUS_ASSERT(sleep_sem != 0);
144            
145                        lastActivityTime =
146                            (struct timeval *) myself->
147                            reference_tsd("last activity time");
148                        myself->dereference_tsd();
149                        PEGASUS_ASSERT(lastActivityTime != 0);
150                    }
151 kumpf 1.4          catch (...)
152 mike  1.2          {
153 marek 1.15             PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
154 kumpf 1.4                  "ThreadPool::_loop: Failure getting sleep_sem or "
155                                "lastActivityTime.");
156 mike  1.2              PEGASUS_ASSERT(false);
157                        pool->_idleThreads.remove(myself);
158                        pool->_currentThreads--;
159                        PEG_METHOD_EXIT();
160 kumpf 1.4              return (ThreadReturnType) 1;
161 mike  1.2          }
162            
163                    while (1)
164                    {
165                        try
166                        {
167                            sleep_sem->wait();
168                        }
169 kumpf 1.4              catch (...)
170 mike  1.2              {
171 marek 1.15                 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
172 kumpf 1.4                      "ThreadPool::_loop: failure on sleep_sem->wait().");
173 mike  1.2                  PEGASUS_ASSERT(false);
174                            pool->_idleThreads.remove(myself);
175                            pool->_currentThreads--;
176                            PEG_METHOD_EXIT();
177 kumpf 1.4                  return (ThreadReturnType) 1;
178 mike  1.2              }
179            
180                        // When we awaken we reside on the _runningThreads queue, not the
181                        // _idleThreads queue.
182            
183                        ThreadReturnType(PEGASUS_THREAD_CDECL * work) (void *) = 0;
184 kumpf 1.16             void *workParm = 0;
185 mike  1.2              Semaphore *blocking_sem = 0;
186            
187                        try
188                        {
189                            work = (ThreadReturnType(PEGASUS_THREAD_CDECL *) (void *))
190                                myself->reference_tsd("work func");
191                            myself->dereference_tsd();
192 kumpf 1.16                 workParm = myself->reference_tsd("work parm");
193 mike  1.2                  myself->dereference_tsd();
194                            blocking_sem =
195                                (Semaphore *) myself->reference_tsd("blocking sem");
196                            myself->dereference_tsd();
197                        }
198 kumpf 1.4              catch (...)
199 mike  1.2              {
200 marek 1.15                 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
201 kumpf 1.4                      "ThreadPool::_loop: Failure accessing work func, work "
202                                    "parm, or blocking sem.");
203 mike  1.2                  PEGASUS_ASSERT(false);
204                            pool->_idleThreads.remove(myself);
205                            pool->_currentThreads--;
206                            PEG_METHOD_EXIT();
207 kumpf 1.4                  return (ThreadReturnType) 1;
208 mike  1.2              }
209            
210                        if (work == 0)
211                        {
212 marek 1.9                  PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
213 kumpf 1.4                      "ThreadPool::_loop: work func is 0, meaning we should "
214                                    "exit.");
215 mike  1.2                  break;
216                        }
217            
218                        Time::gettimeofday(lastActivityTime);
219            
220                        try
221                        {
222 marek 1.9                  PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
223 mike  1.2                                   "Work starting.");
224 kumpf 1.16                 work(workParm);
225 marek 1.9                  PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
226 mike  1.2                                   "Work finished.");
227                        }
228 kumpf 1.4              catch (Exception& e)
229 mike  1.2              {
230 thilo.boehm 1.18                 PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
231                                      "Exception from work in ThreadPool::_loop: %s",
232                                      (const char*)e.getMessage().getCString()));
233 mike        1.2              }
234 kumpf       1.4              catch (const exception& e)
235 mike        1.2              {
236 thilo.boehm 1.18                 PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
237                                      "Exception from work in ThreadPool::_loop: %s",e.what()));
238 mike        1.2              }
239 kumpf       1.4              catch (...)
240 mike        1.2              {
241 marek       1.15                 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
242 kumpf       1.4                      "Unknown exception from work in ThreadPool::_loop.");
243 mike        1.2              }
244                  
245                              // put myself back onto the available list
246                              try
247                              {
248                                  Time::gettimeofday(lastActivityTime);
249                                  if (blocking_sem != 0)
250                                  {
251                                      blocking_sem->signal();
252                                  }
253                  
254                                  pool->_runningThreads.remove(myself);
255                                  pool->_idleThreads.insert_front(myself);
256                              }
257 kumpf       1.4              catch (...)
258 mike        1.2              {
259 marek       1.15                 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
260 kumpf       1.4                      "ThreadPool::_loop: Adding thread to idle pool failed.");
261 mike        1.2                  PEGASUS_ASSERT(false);
262                                  pool->_currentThreads--;
263                                  PEG_METHOD_EXIT();
264 kumpf       1.4                  return (ThreadReturnType) 1;
265 mike        1.2              }
266                          }
267                      }
268 kumpf       1.4      catch (const Exception & e)
269 mike        1.2      {
270 thilo.boehm 1.18         PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
271                              "Caught exception: \"%s\".  Exiting _loop.",
272                              (const char*)e.getMessage().getCString()));
273 mike        1.2      }
274 kumpf       1.4      catch (...)
275 mike        1.2      {
276 marek       1.15         PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
277 kumpf       1.4              "Caught unrecognized exception.  Exiting _loop.");
278 mike        1.2      }
279                  
280                      PEG_METHOD_EXIT();
281 kumpf       1.4      return (ThreadReturnType) 0;
282 mike        1.2  }
283                  
284 kumpf       1.4  ThreadStatus ThreadPool::allocate_and_awaken(
285                      void* parm,
286                      ThreadReturnType (PEGASUS_THREAD_CDECL* work) (void*),
287                      Semaphore* blocking)
288 mike        1.2  {
289                      PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
290                  
291                      // Allocate_and_awaken will not run if the _dying flag is set.
292                      // Once the lock is acquired, ~ThreadPool will not change
293                      // the value of _dying until the lock is released.
294                  
295                      try
296                      {
297                          if (_dying.get())
298                          {
299 marek       1.15             PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL3,
300 kumpf       1.4                  "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
301 mike        1.2              return PEGASUS_THREAD_UNAVAILABLE;
302                          }
303                          struct timeval start;
304                          Time::gettimeofday(&start);
305                          Thread *th = 0;
306                  
307                          th = _idleThreads.remove_front();
308                  
309                          if (th == 0)
310                          {
311                              if ((_maxThreads == 0) ||
312                                  (_currentThreads.get() < Uint32(_maxThreads)))
313                              {
314                                  th = _initializeThread();
315                              }
316                          }
317                  
318                          if (th == 0)
319                          {
320 marek       1.15             PEG_TRACE((TRC_THREAD, Tracer::LEVEL1,
321 kumpf       1.3                  "ThreadPool::allocate_and_awaken: Insufficient resources: "
322                                      " pool = %s, running threads = %d, idle threads = %d",
323 marek       1.9                  _key, _runningThreads.size(), _idleThreads.size()));
324 mike        1.2              return PEGASUS_THREAD_INSUFFICIENT_RESOURCES;
325                          }
326                  
327                          // initialize the thread data with the work function and parameters
328 marek       1.9          PEG_TRACE((TRC_THREAD, Tracer::LEVEL4,
329 marek       1.14             "Initializing thread(%s)"
330                                  " with work function and parameters: parm = %p",
331                              Threads::id(th->getThreadHandle().thid).buffer,
332 marek       1.9              parm));
333 mike        1.2  
334                          th->delete_tsd("work func");
335                          th->put_tsd("work func", NULL,
336                                      sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
337                                              (void *)), (void *) work);
338                          th->delete_tsd("work parm");
339                          th->put_tsd("work parm", NULL, sizeof (void *), parm);
340                          th->delete_tsd("blocking sem");
341                          if (blocking != 0)
342                              th->put_tsd("blocking sem", NULL, sizeof (Semaphore *), blocking);
343                  
344                          // put the thread on the running list
345                          _runningThreads.insert_front(th);
346                  
347                          // signal the thread's sleep semaphore to awaken it
348                          Semaphore *sleep_sem = (Semaphore *) th->reference_tsd("sleep sem");
349                          PEGASUS_ASSERT(sleep_sem != 0);
350                  
351 kumpf       1.10         PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
352                              "Signal thread to awaken");
353 mike        1.2          sleep_sem->signal();
354                          th->dereference_tsd();
355                      }
356 kumpf       1.4      catch (...)
357 mike        1.2      {
358 marek       1.15         PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
359 mike        1.2                        "ThreadPool::allocate_and_awaken: Operation Failed.");
360                          PEG_METHOD_EXIT();
361                          // ATTN: Error result has not yet been defined
362                          return PEGASUS_THREAD_SETUP_FAILURE;
363                      }
364                      PEG_METHOD_EXIT();
365                      return PEGASUS_THREAD_OK;
366                  }
367                  
368                  // caller is responsible for only calling this routine during slack periods
369                  // but should call it at least once per _deallocateWait interval.
370                  
371                  Uint32 ThreadPool::cleanupIdleThreads()
372                  {
373                      PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads");
374                  
375                      Uint32 numThreadsCleanedUp = 0;
376                  
377 kumpf       1.7      Uint32 numIdleThreads = _idleThreads.size();
378                      for (Uint32 i = 0; i < numIdleThreads; i++)
379 mike        1.2      {
380                          // Do not dip below the minimum thread count
381                          if (_currentThreads.get() <= (Uint32) _minThreads)
382                          {
383                              break;
384                          }
385                  
386                          Thread *thread = _idleThreads.remove_back();
387                  
388                          // If there are no more threads in the _idleThreads queue, we're
389                          // done.
390                          if (thread == 0)
391                          {
392                              break;
393                          }
394                  
395 kumpf       1.20         void* tsd = 0;
396                          if (!thread->try_reference_tsd("last activity time", &tsd))
397 mike        1.2          {
398                              PEGASUS_ASSERT(false);
399                              _idleThreads.insert_back(thread);
400                              break;
401                          }
402 kumpf       1.20         struct timeval *lastActivityTime =
403                              reinterpret_cast<struct timeval*>(tsd);
404 kumpf       1.17         PEGASUS_ASSERT(lastActivityTime != 0);
405 mike        1.2  
406                          Boolean cleanupThisThread =
407                              _timeIntervalExpired(lastActivityTime, &_deallocateWait);
408                          thread->dereference_tsd();
409                  
410                          if (cleanupThisThread)
411                          {
412                              _cleanupThread(thread);
413                              _currentThreads--;
414                              numThreadsCleanedUp++;
415                          }
416                          else
417                          {
418                              _idleThreads.insert_front(thread);
419                          }
420                      }
421                  
422                      PEG_METHOD_EXIT();
423                      return numThreadsCleanedUp;
424                  }
425                  
426 mike        1.2  void ThreadPool::_cleanupThread(Thread * thread)
427                  {
428                      PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
429                  
430                      // Set the "work func" and "work parm" to 0 so _loop() knows to exit.
431                      thread->delete_tsd("work func");
432                      thread->put_tsd("work func", 0,
433                                      sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
434                                              (void *)), (void *) 0);
435                      thread->delete_tsd("work parm");
436                      thread->put_tsd("work parm", 0, sizeof (void *), 0);
437                  
438                      // signal the thread's sleep semaphore to awaken it
439                      Semaphore *sleep_sem = (Semaphore *) thread->reference_tsd("sleep sem");
440                      PEGASUS_ASSERT(sleep_sem != 0);
441                      sleep_sem->signal();
442                      thread->dereference_tsd();
443                  
444                      thread->join();
445                      delete thread;
446                  
447 mike        1.2      PEG_METHOD_EXIT();
448                  }
449                  
450 kumpf       1.4  Boolean ThreadPool::_timeIntervalExpired(
451                      struct timeval* start,
452                      struct timeval* interval)
453 mike        1.2  {
454 kumpf       1.6      PEGASUS_ASSERT(interval != 0);
455                  
456 mike        1.2      // never time out if the interval is zero
457 kumpf       1.6      if ((interval->tv_sec == 0) && (interval->tv_usec == 0))
458 mike        1.2      {
459                          return false;
460                      }
461                  
462                      struct timeval now, finish, remaining;
463                      Uint32 usec;
464                      Time::gettimeofday(&now);
465 karl        1.19 
466                      memset(&remaining, 0, sizeof(remaining));
467 mike        1.2  
468                      finish.tv_sec = start->tv_sec + interval->tv_sec;
469                      usec = start->tv_usec + interval->tv_usec;
470                      finish.tv_sec += (usec / 1000000);
471                      usec %= 1000000;
472                      finish.tv_usec = usec;
473                  
474                      return (Time::subtract(&remaining, &finish, &now) != 0);
475                  }
476                  
477                  void ThreadPool::_deleteSemaphore(void *p)
478                  {
479                      delete(Semaphore *) p;
480                  }
481                  
482                  Thread *ThreadPool::_initializeThread()
483                  {
484                      PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread");
485                  
486                      Thread *th = (Thread *) new Thread(_loop, this, false);
487                  
488 mike        1.2      // allocate a sleep semaphore and pass it in the thread context
489                      // initial count is zero, loop function will sleep until
490                      // we signal the semaphore
491                      Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
492 kumpf       1.4      th->put_tsd(
493                          "sleep sem", &_deleteSemaphore, sizeof(Semaphore), (void*) sleep_sem);
494 mike        1.2  
495 kumpf       1.4      struct timeval* lastActivityTime =
496 mike        1.2          (struct timeval *)::operator  new(sizeof (struct timeval));
497                      Time::gettimeofday(lastActivityTime);
498                  
499 kumpf       1.4      th->put_tsd(
500                          "last activity time",
501                          thread_data::default_delete,
502                          sizeof(struct timeval),
503                          (void*) lastActivityTime);
504 mike        1.2      // thread will enter _loop() and sleep on sleep_sem until we signal it
505                  
506                      if (th->run() != PEGASUS_THREAD_OK)
507                      {
508 marek       1.15         PEG_TRACE((TRC_THREAD, Tracer::LEVEL1,
509 marek       1.9              "Could not create thread. Error code is %d.", errno));
510 mike        1.2          delete th;
511                          return 0;
512                      }
513                      _currentThreads++;
514                  
515                      PEG_METHOD_EXIT();
516                      return th;
517                  }
518                  
519                  void ThreadPool::_addToIdleThreadsQueue(Thread * th)
520                  {
521                      if (th == 0)
522                      {
523 marek       1.15         PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
524 kumpf       1.4              "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");
525 mike        1.2          throw NullPointer();
526                      }
527                  
528                      try
529                      {
530                          _idleThreads.insert_front(th);
531                      }
532 kumpf       1.4      catch (...)
533 mike        1.2      {
534 marek       1.15         PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
535 kumpf       1.4              "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_front "
536                                  "failed.");
537 mike        1.2      }
538                  }
539                  
540                  PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2