(file) Return to ThreadPool.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 martin 1.22 //%LICENSE////////////////////////////////////////////////////////////////
  2 martin 1.23 //
  3 martin 1.22 // Licensed to The Open Group (TOG) under one or more contributor license
  4             // agreements.  Refer to the OpenPegasusNOTICE.txt file distributed with
  5             // this work for additional information regarding copyright ownership.
  6             // Each contributor licenses this file to you under the OpenPegasus Open
  7             // Source License; you may not use this file except in compliance with the
  8             // License.
  9 martin 1.23 //
 10 martin 1.22 // Permission is hereby granted, free of charge, to any person obtaining a
 11             // copy of this software and associated documentation files (the "Software"),
 12             // to deal in the Software without restriction, including without limitation
 13             // the rights to use, copy, modify, merge, publish, distribute, sublicense,
 14             // and/or sell copies of the Software, and to permit persons to whom the
 15             // Software is furnished to do so, subject to the following conditions:
 16 martin 1.23 //
 17 martin 1.22 // The above copyright notice and this permission notice shall be included
 18             // in all copies or substantial portions of the Software.
 19 martin 1.23 //
 20 martin 1.22 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 21 martin 1.23 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 22 martin 1.22 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
 23             // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
 24             // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
 25             // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
 26             // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 27 martin 1.23 //
 28 martin 1.22 //////////////////////////////////////////////////////////////////////////
 29 mike   1.2  //
 30             //%/////////////////////////////////////////////////////////////////////////////
 31             
 32             #include "ThreadPool.h"
 33             #include "Thread.h"
 34             #include <exception>
 35             #include <Pegasus/Common/Tracer.h>
 36             #include "Time.h"
 37             
 38             PEGASUS_USING_STD;
 39             
 40             PEGASUS_NAMESPACE_BEGIN
 41             
 42             ///////////////////////////////////////////////////////////////////////////////
 43             //
 44             // ThreadPool
 45             //
 46             ///////////////////////////////////////////////////////////////////////////////
 47             
 48 kumpf  1.4  ThreadPool::ThreadPool(
 49                 Sint16 initialSize,
 50                 const char* key,
 51                 Sint16 minThreads,
 52                 Sint16 maxThreads,
 53                 struct timeval
 54                 &deallocateWait)
 55                 : _maxThreads(maxThreads),
 56                   _minThreads(minThreads),
 57                   _currentThreads(0),
 58                   _idleThreads(),
 59                   _runningThreads(),
 60                   _dying(0)
 61 mike   1.2  {
 62                 _deallocateWait.tv_sec = deallocateWait.tv_sec;
 63                 _deallocateWait.tv_usec = deallocateWait.tv_usec;
 64             
 65                 memset(_key, 0x00, 17);
 66                 if (key != 0)
 67                 {
 68                     strncpy(_key, key, 16);
 69                 }
 70             
 71                 if ((_maxThreads > 0) && (_maxThreads < initialSize))
 72                 {
 73                     _maxThreads = initialSize;
 74                 }
 75             
 76                 if (_minThreads > initialSize)
 77                 {
 78                     _minThreads = initialSize;
 79                 }
 80             
 81                 for (int i = 0; i < initialSize; i++)
 82 mike   1.2      {
 83                     _addToIdleThreadsQueue(_initializeThread());
 84                 }
 85             }
 86             
 87             ThreadPool::~ThreadPool()
 88             {
 89                 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::~ThreadPool");
 90             
 91                 try
 92                 {
 93                     // Set the dying flag so all thread know the destructor has been
 94                     // entered
 95                     _dying++;
 96 marek  1.15         PEG_TRACE((TRC_THREAD, Tracer::LEVEL3,
 97 marek  1.9              "Cleaning up %d idle threads.", _currentThreads.get()));
 98 mike   1.2  
 99                     while (_currentThreads.get() > 0)
100                     {
101 kumpf  1.4              Thread* thread = _idleThreads.remove_front();
102 mike   1.2              if (thread != 0)
103                         {
104                             _cleanupThread(thread);
105                             _currentThreads--;
106                         }
107                         else
108                         {
109                             Threads::yield();
110                         }
111                     }
112                 }
113 kumpf  1.4      catch (...)
114 mike   1.2      {
115                 }
116             }
117             
118 kumpf  1.4  ThreadReturnType PEGASUS_THREAD_CDECL ThreadPool::_loop(void* parm)
119 mike   1.2  {
120                 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
121             
122                 try
123                 {
124                     Thread *myself = (Thread *) parm;
125                     PEGASUS_ASSERT(myself != 0);
126             
127                     // Set myself into thread specific storage
128                     // This will allow code to get its own Thread
129                     Thread::setCurrent(myself);
130             
131                     ThreadPool *pool = (ThreadPool *) myself->get_parm();
132                     PEGASUS_ASSERT(pool != 0);
133             
134                     Semaphore *sleep_sem = 0;
135                     struct timeval *lastActivityTime = 0;
136             
137                     try
138                     {
139 mike   1.21             sleep_sem = (Semaphore *) myself->reference_tsd(TSD_SLEEP_SEM);
140 mike   1.2              myself->dereference_tsd();
141                         PEGASUS_ASSERT(sleep_sem != 0);
142             
143                         lastActivityTime =
144                             (struct timeval *) myself->
145 mike   1.21                 reference_tsd(TSD_LAST_ACTIVITY_TIME);
146 mike   1.2              myself->dereference_tsd();
147                         PEGASUS_ASSERT(lastActivityTime != 0);
148                     }
149 kumpf  1.4          catch (...)
150 mike   1.2          {
151 marek  1.15             PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
152 kumpf  1.4                  "ThreadPool::_loop: Failure getting sleep_sem or "
153                                 "lastActivityTime.");
154 mike   1.2              pool->_idleThreads.remove(myself);
155                         pool->_currentThreads--;
156                         PEG_METHOD_EXIT();
157 kumpf  1.4              return (ThreadReturnType) 1;
158 mike   1.2          }
159             
160                     while (1)
161                     {
162                         try
163                         {
164                             sleep_sem->wait();
165                         }
166 kumpf  1.4              catch (...)
167 mike   1.2              {
168 marek  1.15                 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
169 kumpf  1.4                      "ThreadPool::_loop: failure on sleep_sem->wait().");
170 mike   1.2                  pool->_idleThreads.remove(myself);
171                             pool->_currentThreads--;
172                             PEG_METHOD_EXIT();
173 kumpf  1.4                  return (ThreadReturnType) 1;
174 mike   1.2              }
175             
176                         // When we awaken we reside on the _runningThreads queue, not the
177                         // _idleThreads queue.
178             
179                         ThreadReturnType(PEGASUS_THREAD_CDECL * work) (void *) = 0;
180 kumpf  1.16             void *workParm = 0;
181 mike   1.2              Semaphore *blocking_sem = 0;
182             
183                         try
184                         {
185                             work = (ThreadReturnType(PEGASUS_THREAD_CDECL *) (void *))
186 mike   1.21                     myself->reference_tsd(TSD_WORK_FUNC);
187 mike   1.2                  myself->dereference_tsd();
188 mike   1.21                 workParm = myself->reference_tsd(TSD_WORK_PARM);
189 mike   1.2                  myself->dereference_tsd();
190                             blocking_sem =
191 mike   1.21                     (Semaphore *) myself->reference_tsd(TSD_BLOCKING_SEM);
192 mike   1.2                  myself->dereference_tsd();
193                         }
194 kumpf  1.4              catch (...)
195 mike   1.2              {
196 marek  1.15                 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
197 kumpf  1.4                      "ThreadPool::_loop: Failure accessing work func, work "
198                                     "parm, or blocking sem.");
199 mike   1.2                  pool->_idleThreads.remove(myself);
200                             pool->_currentThreads--;
201                             PEG_METHOD_EXIT();
202 kumpf  1.4                  return (ThreadReturnType) 1;
203 mike   1.2              }
204             
205                         if (work == 0)
206                         {
207 marek  1.9                  PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
208 kumpf  1.4                      "ThreadPool::_loop: work func is 0, meaning we should "
209                                     "exit.");
210 mike   1.2                  break;
211                         }
212             
213                         Time::gettimeofday(lastActivityTime);
214             
215                         try
216                         {
217 marek  1.9                  PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
218 mike   1.2                                   "Work starting.");
219 kumpf  1.16                 work(workParm);
220 marek  1.9                  PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
221 mike   1.2                                   "Work finished.");
222                         }
223 kumpf  1.4              catch (Exception& e)
224 mike   1.2              {
225 thilo.boehm 1.18                 PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
226                                      "Exception from work in ThreadPool::_loop: %s",
227                                      (const char*)e.getMessage().getCString()));
228 mike        1.2              }
229 kumpf       1.4              catch (const exception& e)
230 mike        1.2              {
231 thilo.boehm 1.18                 PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
232                                      "Exception from work in ThreadPool::_loop: %s",e.what()));
233 mike        1.2              }
234 kumpf       1.4              catch (...)
235 mike        1.2              {
236 marek       1.15                 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
237 kumpf       1.4                      "Unknown exception from work in ThreadPool::_loop.");
238 mike        1.2              }
239                  
240                              // put myself back onto the available list
241                              try
242                              {
243                                  Time::gettimeofday(lastActivityTime);
244                                  if (blocking_sem != 0)
245                                  {
246                                      blocking_sem->signal();
247                                  }
248                  
249                                  pool->_runningThreads.remove(myself);
250                                  pool->_idleThreads.insert_front(myself);
251                              }
252 kumpf       1.4              catch (...)
253 mike        1.2              {
254 marek       1.15                 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
255 kumpf       1.4                      "ThreadPool::_loop: Adding thread to idle pool failed.");
256 mike        1.2                  pool->_currentThreads--;
257                                  PEG_METHOD_EXIT();
258 kumpf       1.4                  return (ThreadReturnType) 1;
259 mike        1.2              }
260                          }
261                      }
262 kumpf       1.4      catch (const Exception & e)
263 mike        1.2      {
264 thilo.boehm 1.18         PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
265                              "Caught exception: \"%s\".  Exiting _loop.",
266                              (const char*)e.getMessage().getCString()));
267 mike        1.2      }
268 kumpf       1.4      catch (...)
269 mike        1.2      {
270 marek       1.15         PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
271 kumpf       1.4              "Caught unrecognized exception.  Exiting _loop.");
272 mike        1.2      }
273                  
274                      PEG_METHOD_EXIT();
275 kumpf       1.4      return (ThreadReturnType) 0;
276 mike        1.2  }
277                  
278 kumpf       1.4  ThreadStatus ThreadPool::allocate_and_awaken(
279                      void* parm,
280                      ThreadReturnType (PEGASUS_THREAD_CDECL* work) (void*),
281                      Semaphore* blocking)
282 mike        1.2  {
283                      PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
284                  
285                      // Allocate_and_awaken will not run if the _dying flag is set.
286                      // Once the lock is acquired, ~ThreadPool will not change
287                      // the value of _dying until the lock is released.
288                  
289                      try
290                      {
291                          if (_dying.get())
292                          {
293 marek       1.15             PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL3,
294 kumpf       1.4                  "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
295 mike        1.2              return PEGASUS_THREAD_UNAVAILABLE;
296                          }
297                          struct timeval start;
298                          Time::gettimeofday(&start);
299                          Thread *th = 0;
300                  
301                          th = _idleThreads.remove_front();
302                  
303                          if (th == 0)
304                          {
305                              if ((_maxThreads == 0) ||
306                                  (_currentThreads.get() < Uint32(_maxThreads)))
307                              {
308                                  th = _initializeThread();
309                              }
310                          }
311                  
312                          if (th == 0)
313                          {
314 marek       1.15             PEG_TRACE((TRC_THREAD, Tracer::LEVEL1,
315 kumpf       1.3                  "ThreadPool::allocate_and_awaken: Insufficient resources: "
316                                      " pool = %s, running threads = %d, idle threads = %d",
317 marek       1.9                  _key, _runningThreads.size(), _idleThreads.size()));
318 mike        1.2              return PEGASUS_THREAD_INSUFFICIENT_RESOURCES;
319                          }
320                  
321                          // initialize the thread data with the work function and parameters
322 marek       1.9          PEG_TRACE((TRC_THREAD, Tracer::LEVEL4,
323 marek       1.14             "Initializing thread(%s)"
324                                  " with work function and parameters: parm = %p",
325                              Threads::id(th->getThreadHandle().thid).buffer,
326 marek       1.9              parm));
327 mike        1.2  
328 mike        1.21         th->delete_tsd(TSD_WORK_FUNC);
329                          th->put_tsd(TSD_WORK_FUNC, NULL,
330 mike        1.2                      sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
331                                              (void *)), (void *) work);
332 mike        1.21         th->delete_tsd(TSD_WORK_PARM);
333                          th->put_tsd(TSD_WORK_PARM, NULL, sizeof (void *), parm);
334                          th->delete_tsd(TSD_BLOCKING_SEM);
335 mike        1.2          if (blocking != 0)
336 mike        1.21             th->put_tsd(TSD_BLOCKING_SEM, NULL, sizeof (Semaphore *), blocking);
337 mike        1.2  
338                          // put the thread on the running list
339                          _runningThreads.insert_front(th);
340                  
341                          // signal the thread's sleep semaphore to awaken it
342 mike        1.21         Semaphore *sleep_sem = (Semaphore *) th->reference_tsd(TSD_SLEEP_SEM);
343 mike        1.2          PEGASUS_ASSERT(sleep_sem != 0);
344                  
345 kumpf       1.10         PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
346                              "Signal thread to awaken");
347 mike        1.2          sleep_sem->signal();
348                          th->dereference_tsd();
349                      }
350 kumpf       1.4      catch (...)
351 mike        1.2      {
352 marek       1.15         PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
353 mike        1.2                        "ThreadPool::allocate_and_awaken: Operation Failed.");
354                          PEG_METHOD_EXIT();
355                          // ATTN: Error result has not yet been defined
356                          return PEGASUS_THREAD_SETUP_FAILURE;
357                      }
358                      PEG_METHOD_EXIT();
359                      return PEGASUS_THREAD_OK;
360                  }
361                  
362                  // caller is responsible for only calling this routine during slack periods
363                  // but should call it at least once per _deallocateWait interval.
364                  
365                  Uint32 ThreadPool::cleanupIdleThreads()
366                  {
367                      PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads");
368                  
369                      Uint32 numThreadsCleanedUp = 0;
370                  
371 kumpf       1.7      Uint32 numIdleThreads = _idleThreads.size();
372                      for (Uint32 i = 0; i < numIdleThreads; i++)
373 mike        1.2      {
374                          // Do not dip below the minimum thread count
375                          if (_currentThreads.get() <= (Uint32) _minThreads)
376                          {
377                              break;
378                          }
379                  
380                          Thread *thread = _idleThreads.remove_back();
381                  
382                          // If there are no more threads in the _idleThreads queue, we're
383                          // done.
384                          if (thread == 0)
385                          {
386                              break;
387                          }
388                  
389 mike        1.21         void* tsd = thread->reference_tsd(TSD_LAST_ACTIVITY_TIME);
390 kumpf       1.20         struct timeval *lastActivityTime =
391                              reinterpret_cast<struct timeval*>(tsd);
392 kumpf       1.17         PEGASUS_ASSERT(lastActivityTime != 0);
393 mike        1.2  
394                          Boolean cleanupThisThread =
395                              _timeIntervalExpired(lastActivityTime, &_deallocateWait);
396                          thread->dereference_tsd();
397                  
398                          if (cleanupThisThread)
399                          {
400                              _cleanupThread(thread);
401                              _currentThreads--;
402                              numThreadsCleanedUp++;
403                          }
404                          else
405                          {
406                              _idleThreads.insert_front(thread);
407                          }
408                      }
409                  
410                      PEG_METHOD_EXIT();
411                      return numThreadsCleanedUp;
412                  }
413                  
414 mike        1.2  void ThreadPool::_cleanupThread(Thread * thread)
415                  {
416                      PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
417                  
418 mike        1.21     // Set the TSD_WORK_FUNC and TSD_WORK_PARM to 0 so _loop() knows to exit.
419                      thread->delete_tsd(TSD_WORK_FUNC);
420                      thread->put_tsd(TSD_WORK_FUNC, 0,
421 mike        1.2                      sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
422                                              (void *)), (void *) 0);
423 mike        1.21     thread->delete_tsd(TSD_WORK_PARM);
424                      thread->put_tsd(TSD_WORK_PARM, 0, sizeof (void *), 0);
425 mike        1.2  
426                      // signal the thread's sleep semaphore to awaken it
427 mike        1.21     Semaphore *sleep_sem = (Semaphore *) thread->reference_tsd(TSD_SLEEP_SEM);
428 mike        1.2      PEGASUS_ASSERT(sleep_sem != 0);
429                      sleep_sem->signal();
430                      thread->dereference_tsd();
431                  
432                      thread->join();
433                      delete thread;
434                  
435                      PEG_METHOD_EXIT();
436                  }
437                  
438 kumpf       1.4  Boolean ThreadPool::_timeIntervalExpired(
439                      struct timeval* start,
440                      struct timeval* interval)
441 mike        1.2  {
442 kumpf       1.6      PEGASUS_ASSERT(interval != 0);
443                  
444 mike        1.2      // never time out if the interval is zero
445 kumpf       1.6      if ((interval->tv_sec == 0) && (interval->tv_usec == 0))
446 mike        1.2      {
447                          return false;
448                      }
449                  
450                      struct timeval now, finish, remaining;
451                      Uint32 usec;
452                      Time::gettimeofday(&now);
453 karl        1.19 
454                      memset(&remaining, 0, sizeof(remaining));
455 mike        1.2  
456                      finish.tv_sec = start->tv_sec + interval->tv_sec;
457                      usec = start->tv_usec + interval->tv_usec;
458                      finish.tv_sec += (usec / 1000000);
459                      usec %= 1000000;
460                      finish.tv_usec = usec;
461                  
462                      return (Time::subtract(&remaining, &finish, &now) != 0);
463                  }
464                  
465                  void ThreadPool::_deleteSemaphore(void *p)
466                  {
467                      delete(Semaphore *) p;
468                  }
469                  
470                  Thread *ThreadPool::_initializeThread()
471                  {
472                      PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread");
473                  
474                      Thread *th = (Thread *) new Thread(_loop, this, false);
475                  
476 mike        1.2      // allocate a sleep semaphore and pass it in the thread context
477                      // initial count is zero, loop function will sleep until
478                      // we signal the semaphore
479                      Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
480 kumpf       1.4      th->put_tsd(
481 mike        1.21         TSD_SLEEP_SEM, &_deleteSemaphore, sizeof(Semaphore), (void*) sleep_sem);
482 mike        1.2  
483 kumpf       1.4      struct timeval* lastActivityTime =
484 mike        1.2          (struct timeval *)::operator  new(sizeof (struct timeval));
485                      Time::gettimeofday(lastActivityTime);
486                  
487 kumpf       1.4      th->put_tsd(
488 mike        1.21         TSD_LAST_ACTIVITY_TIME,
489 kumpf       1.4          thread_data::default_delete,
490                          sizeof(struct timeval),
491                          (void*) lastActivityTime);
492 mike        1.2      // thread will enter _loop() and sleep on sleep_sem until we signal it
493                  
494                      if (th->run() != PEGASUS_THREAD_OK)
495                      {
496 marek       1.15         PEG_TRACE((TRC_THREAD, Tracer::LEVEL1,
497 marek       1.9              "Could not create thread. Error code is %d.", errno));
498 mike        1.2          delete th;
499                          return 0;
500                      }
501                      _currentThreads++;
502                  
503                      PEG_METHOD_EXIT();
504                      return th;
505                  }
506                  
507                  void ThreadPool::_addToIdleThreadsQueue(Thread * th)
508                  {
509                      if (th == 0)
510                      {
511 marek       1.15         PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
512 kumpf       1.4              "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");
513 mike        1.2          throw NullPointer();
514                      }
515                  
516                      try
517                      {
518                          _idleThreads.insert_front(th);
519                      }
520 kumpf       1.4      catch (...)
521 mike        1.2      {
522 marek       1.15         PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
523 kumpf       1.4              "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_front "
524                                  "failed.");
525 mike        1.2      }
526                  }
527                  
528                  PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2