(file) Return to ThreadPool.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 martin 1.22 //%LICENSE////////////////////////////////////////////////////////////////
  2 martin 1.23 //
  3 martin 1.22 // Licensed to The Open Group (TOG) under one or more contributor license
  4             // agreements.  Refer to the OpenPegasusNOTICE.txt file distributed with
  5             // this work for additional information regarding copyright ownership.
  6             // Each contributor licenses this file to you under the OpenPegasus Open
  7             // Source License; you may not use this file except in compliance with the
  8             // License.
  9 martin 1.23 //
 10 martin 1.22 // Permission is hereby granted, free of charge, to any person obtaining a
 11             // copy of this software and associated documentation files (the "Software"),
 12             // to deal in the Software without restriction, including without limitation
 13             // the rights to use, copy, modify, merge, publish, distribute, sublicense,
 14             // and/or sell copies of the Software, and to permit persons to whom the
 15             // Software is furnished to do so, subject to the following conditions:
 16 martin 1.23 //
 17 martin 1.22 // The above copyright notice and this permission notice shall be included
 18             // in all copies or substantial portions of the Software.
 19 martin 1.23 //
 20 martin 1.22 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 21 martin 1.23 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 22 martin 1.22 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
 23             // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
 24             // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
 25             // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
 26             // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 27 martin 1.23 //
 28 martin 1.22 //////////////////////////////////////////////////////////////////////////
 29 mike   1.2  //
 30             //%/////////////////////////////////////////////////////////////////////////////
 31             
 32             #include "ThreadPool.h"
 33             #include "Thread.h"
 34             #include <exception>
 35             #include <Pegasus/Common/Tracer.h>
 36             #include "Time.h"
 37             
 38             PEGASUS_USING_STD;
 39             
 40             PEGASUS_NAMESPACE_BEGIN
 41             
 42             ///////////////////////////////////////////////////////////////////////////////
 43             //
 44             // ThreadPool
 45             //
 46             ///////////////////////////////////////////////////////////////////////////////
 47             
 48 kumpf  1.4  ThreadPool::ThreadPool(
 49                 Sint16 initialSize,
 50                 const char* key,
 51                 Sint16 minThreads,
 52                 Sint16 maxThreads,
 53                 struct timeval
 54                 &deallocateWait)
 55                 : _maxThreads(maxThreads),
 56                   _minThreads(minThreads),
 57                   _currentThreads(0),
 58                   _idleThreads(),
 59                   _runningThreads(),
 60                   _dying(0)
 61 mike   1.2  {
 62                 _deallocateWait.tv_sec = deallocateWait.tv_sec;
 63                 _deallocateWait.tv_usec = deallocateWait.tv_usec;
 64             
 65                 memset(_key, 0x00, 17);
 66                 if (key != 0)
 67                 {
 68                     strncpy(_key, key, 16);
 69                 }
 70             
 71                 if ((_maxThreads > 0) && (_maxThreads < initialSize))
 72                 {
 73                     _maxThreads = initialSize;
 74                 }
 75             
 76                 if (_minThreads > initialSize)
 77                 {
 78                     _minThreads = initialSize;
 79                 }
 80             
 81                 for (int i = 0; i < initialSize; i++)
 82 mike   1.2      {
 83                     _addToIdleThreadsQueue(_initializeThread());
 84                 }
 85             }
 86             
 87             ThreadPool::~ThreadPool()
 88             {
 89                 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::~ThreadPool");
 90             
 91                 try
 92                 {
 93                     // Set the dying flag so all thread know the destructor has been
 94                     // entered
 95                     _dying++;
 96 marek  1.15         PEG_TRACE((TRC_THREAD, Tracer::LEVEL3,
 97 marek  1.9              "Cleaning up %d idle threads.", _currentThreads.get()));
 98 mike   1.2  
 99                     while (_currentThreads.get() > 0)
100                     {
101 kumpf  1.4              Thread* thread = _idleThreads.remove_front();
102 mike   1.2              if (thread != 0)
103                         {
104                             _cleanupThread(thread);
105                             _currentThreads--;
106                         }
107                         else
108                         {
109                             Threads::yield();
110                         }
111                     }
112                 }
113 kumpf  1.4      catch (...)
114 mike   1.2      {
115                 }
116             }
117             
118 kumpf  1.4  ThreadReturnType PEGASUS_THREAD_CDECL ThreadPool::_loop(void* parm)
119 mike   1.2  {
120                 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
121             
122                 try
123                 {
124                     Thread *myself = (Thread *) parm;
125                     PEGASUS_ASSERT(myself != 0);
126             
127                     // Set myself into thread specific storage
128                     // This will allow code to get its own Thread
129                     Thread::setCurrent(myself);
130             
131                     ThreadPool *pool = (ThreadPool *) myself->get_parm();
132                     PEGASUS_ASSERT(pool != 0);
133             
134                     Semaphore *sleep_sem = 0;
135                     struct timeval *lastActivityTime = 0;
136             
137                     try
138                     {
139 mike   1.21             sleep_sem = (Semaphore *) myself->reference_tsd(TSD_SLEEP_SEM);
140 mike   1.2              myself->dereference_tsd();
141                         PEGASUS_ASSERT(sleep_sem != 0);
142             
143                         lastActivityTime =
144                             (struct timeval *) myself->
145 mike   1.21                 reference_tsd(TSD_LAST_ACTIVITY_TIME);
146 mike   1.2              myself->dereference_tsd();
147                         PEGASUS_ASSERT(lastActivityTime != 0);
148                     }
149 kumpf  1.4          catch (...)
150 mike   1.2          {
151 marek  1.15             PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
152 kumpf  1.4                  "ThreadPool::_loop: Failure getting sleep_sem or "
153                                 "lastActivityTime.");
154 mike   1.2              PEGASUS_ASSERT(false);
155                         pool->_idleThreads.remove(myself);
156                         pool->_currentThreads--;
157                         PEG_METHOD_EXIT();
158 kumpf  1.4              return (ThreadReturnType) 1;
159 mike   1.2          }
160             
161                     while (1)
162                     {
163                         try
164                         {
165                             sleep_sem->wait();
166                         }
167 kumpf  1.4              catch (...)
168 mike   1.2              {
169 marek  1.15                 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
170 kumpf  1.4                      "ThreadPool::_loop: failure on sleep_sem->wait().");
171 mike   1.2                  PEGASUS_ASSERT(false);
172                             pool->_idleThreads.remove(myself);
173                             pool->_currentThreads--;
174                             PEG_METHOD_EXIT();
175 kumpf  1.4                  return (ThreadReturnType) 1;
176 mike   1.2              }
177             
178                         // When we awaken we reside on the _runningThreads queue, not the
179                         // _idleThreads queue.
180             
181                         ThreadReturnType(PEGASUS_THREAD_CDECL * work) (void *) = 0;
182 kumpf  1.16             void *workParm = 0;
183 mike   1.2              Semaphore *blocking_sem = 0;
184             
185                         try
186                         {
187                             work = (ThreadReturnType(PEGASUS_THREAD_CDECL *) (void *))
188 mike   1.21                     myself->reference_tsd(TSD_WORK_FUNC);
189 mike   1.2                  myself->dereference_tsd();
190 mike   1.21                 workParm = myself->reference_tsd(TSD_WORK_PARM);
191 mike   1.2                  myself->dereference_tsd();
192                             blocking_sem =
193 mike   1.21                     (Semaphore *) myself->reference_tsd(TSD_BLOCKING_SEM);
194 mike   1.2                  myself->dereference_tsd();
195                         }
196 kumpf  1.4              catch (...)
197 mike   1.2              {
198 marek  1.15                 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
199 kumpf  1.4                      "ThreadPool::_loop: Failure accessing work func, work "
200                                     "parm, or blocking sem.");
201 mike   1.2                  PEGASUS_ASSERT(false);
202                             pool->_idleThreads.remove(myself);
203                             pool->_currentThreads--;
204                             PEG_METHOD_EXIT();
205 kumpf  1.4                  return (ThreadReturnType) 1;
206 mike   1.2              }
207             
208                         if (work == 0)
209                         {
210 marek  1.9                  PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
211 kumpf  1.4                      "ThreadPool::_loop: work func is 0, meaning we should "
212                                     "exit.");
213 mike   1.2                  break;
214                         }
215             
216                         Time::gettimeofday(lastActivityTime);
217             
218                         try
219                         {
220 marek  1.9                  PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
221 mike   1.2                                   "Work starting.");
222 kumpf  1.16                 work(workParm);
223 marek  1.9                  PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
224 mike   1.2                                   "Work finished.");
225                         }
226 kumpf  1.4              catch (Exception& e)
227 mike   1.2              {
228 thilo.boehm 1.18                 PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
229                                      "Exception from work in ThreadPool::_loop: %s",
230                                      (const char*)e.getMessage().getCString()));
231 mike        1.2              }
232 kumpf       1.4              catch (const exception& e)
233 mike        1.2              {
234 thilo.boehm 1.18                 PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
235                                      "Exception from work in ThreadPool::_loop: %s",e.what()));
236 mike        1.2              }
237 kumpf       1.4              catch (...)
238 mike        1.2              {
239 marek       1.15                 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
240 kumpf       1.4                      "Unknown exception from work in ThreadPool::_loop.");
241 mike        1.2              }
242                  
243                              // put myself back onto the available list
244                              try
245                              {
246                                  Time::gettimeofday(lastActivityTime);
247                                  if (blocking_sem != 0)
248                                  {
249                                      blocking_sem->signal();
250                                  }
251                  
252                                  pool->_runningThreads.remove(myself);
253                                  pool->_idleThreads.insert_front(myself);
254                              }
255 kumpf       1.4              catch (...)
256 mike        1.2              {
257 marek       1.15                 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
258 kumpf       1.4                      "ThreadPool::_loop: Adding thread to idle pool failed.");
259 mike        1.2                  PEGASUS_ASSERT(false);
260                                  pool->_currentThreads--;
261                                  PEG_METHOD_EXIT();
262 kumpf       1.4                  return (ThreadReturnType) 1;
263 mike        1.2              }
264                          }
265                      }
266 kumpf       1.4      catch (const Exception & e)
267 mike        1.2      {
268 thilo.boehm 1.18         PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
269                              "Caught exception: \"%s\".  Exiting _loop.",
270                              (const char*)e.getMessage().getCString()));
271 mike        1.2      }
272 kumpf       1.4      catch (...)
273 mike        1.2      {
274 marek       1.15         PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
275 kumpf       1.4              "Caught unrecognized exception.  Exiting _loop.");
276 mike        1.2      }
277                  
278                      PEG_METHOD_EXIT();
279 kumpf       1.4      return (ThreadReturnType) 0;
280 mike        1.2  }
281                  
282 kumpf       1.4  ThreadStatus ThreadPool::allocate_and_awaken(
283                      void* parm,
284                      ThreadReturnType (PEGASUS_THREAD_CDECL* work) (void*),
285                      Semaphore* blocking)
286 mike        1.2  {
287                      PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
288                  
289                      // Allocate_and_awaken will not run if the _dying flag is set.
290                      // Once the lock is acquired, ~ThreadPool will not change
291                      // the value of _dying until the lock is released.
292                  
293                      try
294                      {
295                          if (_dying.get())
296                          {
297 marek       1.15             PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL3,
298 kumpf       1.4                  "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
299 mike        1.2              return PEGASUS_THREAD_UNAVAILABLE;
300                          }
301                          struct timeval start;
302                          Time::gettimeofday(&start);
303                          Thread *th = 0;
304                  
305                          th = _idleThreads.remove_front();
306                  
307                          if (th == 0)
308                          {
309                              if ((_maxThreads == 0) ||
310                                  (_currentThreads.get() < Uint32(_maxThreads)))
311                              {
312                                  th = _initializeThread();
313                              }
314                          }
315                  
316                          if (th == 0)
317                          {
318 marek       1.15             PEG_TRACE((TRC_THREAD, Tracer::LEVEL1,
319 kumpf       1.3                  "ThreadPool::allocate_and_awaken: Insufficient resources: "
320                                      " pool = %s, running threads = %d, idle threads = %d",
321 marek       1.9                  _key, _runningThreads.size(), _idleThreads.size()));
322 mike        1.2              return PEGASUS_THREAD_INSUFFICIENT_RESOURCES;
323                          }
324                  
325                          // initialize the thread data with the work function and parameters
326 marek       1.9          PEG_TRACE((TRC_THREAD, Tracer::LEVEL4,
327 marek       1.14             "Initializing thread(%s)"
328                                  " with work function and parameters: parm = %p",
329                              Threads::id(th->getThreadHandle().thid).buffer,
330 marek       1.9              parm));
331 mike        1.2  
332 mike        1.21         th->delete_tsd(TSD_WORK_FUNC);
333                          th->put_tsd(TSD_WORK_FUNC, NULL,
334 mike        1.2                      sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
335                                              (void *)), (void *) work);
336 mike        1.21         th->delete_tsd(TSD_WORK_PARM);
337                          th->put_tsd(TSD_WORK_PARM, NULL, sizeof (void *), parm);
338                          th->delete_tsd(TSD_BLOCKING_SEM);
339 mike        1.2          if (blocking != 0)
340 mike        1.21             th->put_tsd(TSD_BLOCKING_SEM, NULL, sizeof (Semaphore *), blocking);
341 mike        1.2  
342                          // put the thread on the running list
343                          _runningThreads.insert_front(th);
344                  
345                          // signal the thread's sleep semaphore to awaken it
346 mike        1.21         Semaphore *sleep_sem = (Semaphore *) th->reference_tsd(TSD_SLEEP_SEM);
347 mike        1.2          PEGASUS_ASSERT(sleep_sem != 0);
348                  
349 kumpf       1.10         PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
350                              "Signal thread to awaken");
351 mike        1.2          sleep_sem->signal();
352                          th->dereference_tsd();
353                      }
354 kumpf       1.4      catch (...)
355 mike        1.2      {
356 marek       1.15         PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
357 mike        1.2                        "ThreadPool::allocate_and_awaken: Operation Failed.");
358                          PEG_METHOD_EXIT();
359                          // ATTN: Error result has not yet been defined
360                          return PEGASUS_THREAD_SETUP_FAILURE;
361                      }
362                      PEG_METHOD_EXIT();
363                      return PEGASUS_THREAD_OK;
364                  }
365                  
366                  // caller is responsible for only calling this routine during slack periods
367                  // but should call it at least once per _deallocateWait interval.
368                  
369                  Uint32 ThreadPool::cleanupIdleThreads()
370                  {
371                      PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads");
372                  
373                      Uint32 numThreadsCleanedUp = 0;
374                  
375 kumpf       1.7      Uint32 numIdleThreads = _idleThreads.size();
376                      for (Uint32 i = 0; i < numIdleThreads; i++)
377 mike        1.2      {
378                          // Do not dip below the minimum thread count
379                          if (_currentThreads.get() <= (Uint32) _minThreads)
380                          {
381                              break;
382                          }
383                  
384                          Thread *thread = _idleThreads.remove_back();
385                  
386                          // If there are no more threads in the _idleThreads queue, we're
387                          // done.
388                          if (thread == 0)
389                          {
390                              break;
391                          }
392                  
393 mike        1.21         void* tsd = thread->reference_tsd(TSD_LAST_ACTIVITY_TIME);
394 kumpf       1.20         struct timeval *lastActivityTime =
395                              reinterpret_cast<struct timeval*>(tsd);
396 kumpf       1.17         PEGASUS_ASSERT(lastActivityTime != 0);
397 mike        1.2  
398                          Boolean cleanupThisThread =
399                              _timeIntervalExpired(lastActivityTime, &_deallocateWait);
400                          thread->dereference_tsd();
401                  
402                          if (cleanupThisThread)
403                          {
404                              _cleanupThread(thread);
405                              _currentThreads--;
406                              numThreadsCleanedUp++;
407                          }
408                          else
409                          {
410                              _idleThreads.insert_front(thread);
411                          }
412                      }
413                  
414                      PEG_METHOD_EXIT();
415                      return numThreadsCleanedUp;
416                  }
417                  
418 mike        1.2  void ThreadPool::_cleanupThread(Thread * thread)
419                  {
420                      PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
421                  
422 mike        1.21     // Set the TSD_WORK_FUNC and TSD_WORK_PARM to 0 so _loop() knows to exit.
423                      thread->delete_tsd(TSD_WORK_FUNC);
424                      thread->put_tsd(TSD_WORK_FUNC, 0,
425 mike        1.2                      sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
426                                              (void *)), (void *) 0);
427 mike        1.21     thread->delete_tsd(TSD_WORK_PARM);
428                      thread->put_tsd(TSD_WORK_PARM, 0, sizeof (void *), 0);
429 mike        1.2  
430                      // signal the thread's sleep semaphore to awaken it
431 mike        1.21     Semaphore *sleep_sem = (Semaphore *) thread->reference_tsd(TSD_SLEEP_SEM);
432 mike        1.2      PEGASUS_ASSERT(sleep_sem != 0);
433                      sleep_sem->signal();
434                      thread->dereference_tsd();
435                  
436                      thread->join();
437                      delete thread;
438                  
439                      PEG_METHOD_EXIT();
440                  }
441                  
442 kumpf       1.4  Boolean ThreadPool::_timeIntervalExpired(
443                      struct timeval* start,
444                      struct timeval* interval)
445 mike        1.2  {
446 kumpf       1.6      PEGASUS_ASSERT(interval != 0);
447                  
448 mike        1.2      // never time out if the interval is zero
449 kumpf       1.6      if ((interval->tv_sec == 0) && (interval->tv_usec == 0))
450 mike        1.2      {
451                          return false;
452                      }
453                  
454                      struct timeval now, finish, remaining;
455                      Uint32 usec;
456                      Time::gettimeofday(&now);
457 karl        1.19 
458                      memset(&remaining, 0, sizeof(remaining));
459 mike        1.2  
460                      finish.tv_sec = start->tv_sec + interval->tv_sec;
461                      usec = start->tv_usec + interval->tv_usec;
462                      finish.tv_sec += (usec / 1000000);
463                      usec %= 1000000;
464                      finish.tv_usec = usec;
465                  
466                      return (Time::subtract(&remaining, &finish, &now) != 0);
467                  }
468                  
469                  void ThreadPool::_deleteSemaphore(void *p)
470                  {
471                      delete(Semaphore *) p;
472                  }
473                  
474                  Thread *ThreadPool::_initializeThread()
475                  {
476                      PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread");
477                  
478                      Thread *th = (Thread *) new Thread(_loop, this, false);
479                  
480 mike        1.2      // allocate a sleep semaphore and pass it in the thread context
481                      // initial count is zero, loop function will sleep until
482                      // we signal the semaphore
483                      Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
484 kumpf       1.4      th->put_tsd(
485 mike        1.21         TSD_SLEEP_SEM, &_deleteSemaphore, sizeof(Semaphore), (void*) sleep_sem);
486 mike        1.2  
487 kumpf       1.4      struct timeval* lastActivityTime =
488 mike        1.2          (struct timeval *)::operator  new(sizeof (struct timeval));
489                      Time::gettimeofday(lastActivityTime);
490                  
491 kumpf       1.4      th->put_tsd(
492 mike        1.21         TSD_LAST_ACTIVITY_TIME,
493 kumpf       1.4          thread_data::default_delete,
494                          sizeof(struct timeval),
495                          (void*) lastActivityTime);
496 mike        1.2      // thread will enter _loop() and sleep on sleep_sem until we signal it
497                  
498                      if (th->run() != PEGASUS_THREAD_OK)
499                      {
500 marek       1.15         PEG_TRACE((TRC_THREAD, Tracer::LEVEL1,
501 marek       1.9              "Could not create thread. Error code is %d.", errno));
502 mike        1.2          delete th;
503                          return 0;
504                      }
505                      _currentThreads++;
506                  
507                      PEG_METHOD_EXIT();
508                      return th;
509                  }
510                  
511                  void ThreadPool::_addToIdleThreadsQueue(Thread * th)
512                  {
513                      if (th == 0)
514                      {
515 marek       1.15         PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
516 kumpf       1.4              "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");
517 mike        1.2          throw NullPointer();
518                      }
519                  
520                      try
521                      {
522                          _idleThreads.insert_front(th);
523                      }
524 kumpf       1.4      catch (...)
525 mike        1.2      {
526 marek       1.15         PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
527 kumpf       1.4              "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_front "
528                                  "failed.");
529 mike        1.2      }
530                  }
531                  
532                  PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2