(file) Return to Thread.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.75 //%2005////////////////////////////////////////////////////////////////////////
  2 mike  1.2  //
  3 karl  1.71 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4            // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5            // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6 karl  1.56 // IBM Corp.; EMC Corporation, The Open Group.
  7 karl  1.71 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8            // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9 karl  1.75 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10            // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11 mike  1.2  //
 12            // Permission is hereby granted, free of charge, to any person obtaining a copy
 13 chip  1.11 // of this software and associated documentation files (the "Software"), to
 14            // deal in the Software without restriction, including without limitation the
 15            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 16 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
 17            // furnished to do so, subject to the following conditions:
 18 david.dillard 1.83 //
 19 chip          1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 20 mike          1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 21                    // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 22 chip          1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 23                    // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 24                    // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 25 mike          1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 26                    // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 27                    //
 28                    //==============================================================================
 29                    //
 30                    // Author: Mike Day (mdday@us.ibm.com)
 31                    //
 32                    // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
 33 chip          1.11 //              added nsk platform support
 34 kumpf         1.59 //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
 35 a.arora       1.64 //              Amit K Arora, IBM (amita@in.ibm.com) for PEP#101
 36 gs.keenan     1.76 //              Sean Keenan, Hewlett-Packard Company (sean.keenan@hp.com)
 37 david.dillard 1.83 //              David Dillard, VERITAS Software Corp.
 38                    //                  (david.dillard@veritas.com)
 39 mike          1.2  //
 40                    //%/////////////////////////////////////////////////////////////////////////////
 41                    
 42                    #include "Thread.h"
 43 kumpf         1.68 #include <exception>
 44 mike          1.2  #include <Pegasus/Common/IPC.h>
 45 kumpf         1.14 #include <Pegasus/Common/Tracer.h>
 46 mike          1.2  
 47                    #if defined(PEGASUS_OS_TYPE_WINDOWS)
 48 chip          1.11 # include "ThreadWindows.cpp"
 49 mike          1.2  #elif defined(PEGASUS_OS_TYPE_UNIX)
 50                    # include "ThreadUnix.cpp"
 51                    #elif defined(PEGASUS_OS_TYPE_NSK)
 52                    # include "ThreadNsk.cpp"
 53 gs.keenan     1.76 #elif defined(PEGASUS_OS_VMS)
 54                    # include "ThreadVms.cpp"
 55 mike          1.2  #else
 56                    # error "Unsupported platform"
 57                    #endif
 58                    
 59 kumpf         1.69 PEGASUS_USING_STD;
 60 mike          1.2  PEGASUS_NAMESPACE_BEGIN
 61                    
 62 mday          1.42 
 63 chip          1.11 void thread_data::default_delete(void * data)
 64                    {
 65 mike          1.2     if( data != NULL)
 66 chip          1.11       ::operator delete(data);
 67 mike          1.2  }
 68                    
 69 chuck         1.43 // l10n start
 70                    void language_delete(void * data)
 71                    {
 72                       if( data != NULL)
 73                       {
 74 a.arora       1.64       AutoPtr<AcceptLanguages> al(static_cast<AcceptLanguages *>(data));
 75 chuck         1.43    }
 76                    }
 77                    // l10n end
 78                    
 79 mike          1.2  Boolean Thread::_signals_blocked = false;
 80 chuck         1.37 // l10n
 81 marek         1.63 #ifndef PEGASUS_OS_ZOS
 82 w.otsuka      1.74 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key = PEGASUS_THREAD_KEY_TYPE(-1);
 83 marek         1.63 #else
 84                    PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key;
 85                    #endif
 86 chuck         1.37 Boolean Thread::_key_initialized = false;
 87 chuck         1.41 Boolean Thread::_key_error = false;
 88 chuck         1.37 
 89 mike          1.2  
 90 david.dillard 1.83 void Thread::cleanup_push( void (*routine)(void *), void *parm)
 91 mike          1.2  {
 92 a.arora       1.64     AutoPtr<cleanup_handler> cu(new cleanup_handler(routine, parm));
 93 a.arora       1.65     _cleanup.insert_first(cu.get());
 94 a.arora       1.64     cu.release();
 95 mike          1.2      return;
 96                    }
 97 kumpf         1.81 
 98 david.dillard 1.83 void Thread::cleanup_pop(Boolean execute)
 99 mike          1.2  {
100 david.dillard 1.83     AutoPtr<cleanup_handler> cu;
101 chip          1.11     try
102                        {
103 kumpf         1.81         cu.reset(_cleanup.remove_first());
104 mike          1.2      }
105 chip          1.11     catch(IPCException&)
106 mike          1.2      {
107 kumpf         1.81         PEGASUS_ASSERT(0);
108 mike          1.2       }
109                        if(execute == true)
110 kumpf         1.81         cu->execute();
111 mike          1.2  }
112 kumpf         1.81 
113 mike          1.2  
114 david.dillard 1.83 //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value)
115 mike          1.2  
116                    
117 chip          1.11 #ifndef PEGASUS_THREAD_EXIT_NATIVE
118                    void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
119                    {
120                        // execute the cleanup stack and then return
121 mike          1.2     while( _cleanup.count() )
122                       {
123 chip          1.11        try
124                           {
125 kumpf         1.81            cleanup_pop(true);
126 chip          1.11        }
127                           catch(IPCException&)
128                           {
129 kumpf         1.81           PEGASUS_ASSERT(0);
130                              break;
131 mike          1.2         }
132                       }
133                       _exit_code = exit_code;
134                       exit_thread(exit_code);
135 mday          1.4     _handle.thid = 0;
136 mike          1.2  }
137                    
138                    
139                    #endif
140                    
141 chuck         1.37 // l10n start
142 chuck         1.39 Sint8 Thread::initializeKey()
143                    {
144 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::initializeKey");
145                        if (!Thread::_key_initialized)
146                        {
147                            if (Thread::_key_error)
148                            {
149                                Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
150                                    "Thread: ERROR - thread key error");
151                                return -1;
152                            }
153                    
154                            if (pegasus_key_create(&Thread::_platform_thread_key) == 0)
155                            {
156                                Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
157                                    "Thread: able to create a thread key");
158                                Thread::_key_initialized = true;
159                            }
160                            else
161                            {
162                                Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
163                                    "Thread: ERROR - unable to create a thread key");
164                                Thread::_key_error = true;
165 kumpf         1.81             return -1;
166                            }
167                        }
168 chuck         1.39 
169 kumpf         1.81     PEG_METHOD_EXIT();
170                        return 0;
171 chuck         1.39 }
172                    
173 chuck         1.37 Thread * Thread::getCurrent()
174                    {
175 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::getCurrent");
176 chuck         1.40     if (Thread::initializeKey() != 0)
177 chuck         1.39     {
178 kumpf         1.81         return NULL;
179 chuck         1.39     }
180 kumpf         1.81     PEG_METHOD_EXIT();
181                        return (Thread *)pegasus_get_thread_specific(_platform_thread_key);
182 chuck         1.39 }
183                    
184                    void Thread::setCurrent(Thread * thrd)
185                    {
186 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::setCurrent");
187                        if (Thread::initializeKey() == 0)
188                        {
189                            if (pegasus_set_thread_specific(
190                                   Thread::_platform_thread_key, (void *) thrd) == 0)
191 chuck         1.39         {
192 kumpf         1.81             Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
193                                    "Successful set Thread * into thread specific storage");
194 chuck         1.39         }
195                            else
196                            {
197 kumpf         1.81             Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
198                                    "ERROR: error setting Thread * into thread specific storage");
199 chuck         1.39         }
200 kumpf         1.81     }
201                        PEG_METHOD_EXIT();
202 chuck         1.37 }
203                    
204                    AcceptLanguages * Thread::getLanguages()
205                    {
206 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::getLanguages");
207                    
208                        Thread * curThrd = Thread::getCurrent();
209                        if (curThrd == NULL)
210                            return NULL;
211                        AcceptLanguages * acceptLangs =
212                            (AcceptLanguages *)curThrd->reference_tsd("acceptLanguages");
213                        curThrd->dereference_tsd();
214                        PEG_METHOD_EXIT();
215                        return acceptLangs;
216 chuck         1.37 }
217                    
218                    void Thread::setLanguages(AcceptLanguages *langs) //l10n
219                    {
220 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::setLanguages");
221                    
222                        Thread* currentThrd = Thread::getCurrent();
223                        if (currentThrd != NULL)
224                        {
225                            // deletes the old tsd and creates a new one
226                            currentThrd->put_tsd("acceptLanguages",
227                                language_delete,
228                                sizeof(AcceptLanguages *),
229                                langs);
230                        }
231                    
232                        PEG_METHOD_EXIT();
233 chuck         1.37 }
234                    
235                    void Thread::clearLanguages() //l10n
236                    {
237 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::clearLanguages");
238                    
239                        Thread * currentThrd = Thread::getCurrent();
240                        if (currentThrd != NULL)
241                        {
242                            // deletes the old tsd
243                            currentThrd->delete_tsd("acceptLanguages");
244                        }
245                    
246                        PEG_METHOD_EXIT();
247 chuck         1.37 }
248 kumpf         1.81 // l10n end
249 chuck         1.37 
250 mday          1.52 
251 kumpf         1.81 ///////////////////////////////////////////////////////////////////////////////
252                    //
253                    // ThreadPool
254                    //
255                    ///////////////////////////////////////////////////////////////////////////////
256                    
257                    ThreadPool::ThreadPool(
258                        Sint16 initialSize,
259                        const char* key,
260                        Sint16 minThreads,
261                        Sint16 maxThreads,
262                        struct timeval& deallocateWait)
263                        : _maxThreads(maxThreads),
264                          _minThreads(minThreads),
265                          _currentThreads(0),
266                          _idleThreads(true),
267                          _runningThreads(true),
268                          _dying(0)
269 mday          1.58 {
270 kumpf         1.81     _deallocateWait.tv_sec = deallocateWait.tv_sec;
271                        _deallocateWait.tv_usec = deallocateWait.tv_usec;
272 mday          1.58 
273 kumpf         1.81     memset(_key, 0x00, 17);
274                        if (key != 0)
275                        {
276                            strncpy(_key, key, 16);
277                        }
278                    
279                        if ((_maxThreads > 0) && (_maxThreads < initialSize))
280                        {
281                            _maxThreads = initialSize;
282                        }
283 mday          1.58 
284 kumpf         1.81     if (_minThreads > initialSize)
285                        {
286                            _minThreads = initialSize;
287                        }
288 mday          1.52 
289 kumpf         1.81     for (int i = 0; i < initialSize; i++)
290                        {
291                            _addToIdleThreadsQueue(_initializeThread());
292                        }
293                    }
294 mday          1.20 
295 kumpf         1.81 ThreadPool::~ThreadPool()
296 mday          1.20 {
297 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::~ThreadPool");
298                        try
299                        {
300                            // Set the dying flag so all thread know the destructor has been entered
301                            _dying++;
302                    
303                            while (_currentThreads.value() > 0)
304                            {
305                                Thread* thread = _idleThreads.remove_first();
306                                if (thread != 0)
307                                {
308                                    _cleanupThread(thread);
309                                    _currentThreads--;
310                                }
311                                else
312                                {
313                                    pegasus_yield();
314                                }
315                            }
316                        }
317                        catch (...)
318 kumpf         1.81     {
319                        }
320 mday          1.20 }
321                    
322 kumpf         1.81 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void* parm)
323                    {
324                        PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
325                    
326                        try
327                        {
328 kumpf         1.82         Thread* myself = (Thread *)parm;
329                            PEGASUS_ASSERT(myself != 0);
330 kumpf         1.81 
331 kumpf         1.82         // Set myself into thread specific storage
332                            // This will allow code to get its own Thread
333                            Thread::setCurrent(myself);
334 kumpf         1.81 
335 kumpf         1.82         ThreadPool* pool = (ThreadPool *)myself->get_parm();
336                            PEGASUS_ASSERT(pool != 0);
337 mike          1.2  
338 kumpf         1.82         Semaphore* sleep_sem = 0;
339                            struct timeval* lastActivityTime = 0;
340 chuck         1.39 
341 kumpf         1.81         try
342                            {
343 kumpf         1.82             sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
344 kumpf         1.81             myself->dereference_tsd();
345 kumpf         1.82             PEGASUS_ASSERT(sleep_sem != 0);
346                    
347                                lastActivityTime =
348                                    (struct timeval *)myself->reference_tsd("last activity time");
349 kumpf         1.81             myself->dereference_tsd();
350 kumpf         1.82             PEGASUS_ASSERT(lastActivityTime != 0);
351 kumpf         1.81         }
352                            catch (...)
353                            {
354                                Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
355 kumpf         1.82                 "ThreadPool::_loop: Failure getting sleep_sem or "
356                                        "lastActivityTime.");
357 kumpf         1.81             PEGASUS_ASSERT(false);
358                                pool->_idleThreads.remove(myself);
359                                pool->_currentThreads--;
360                                PEG_METHOD_EXIT();
361                                return((PEGASUS_THREAD_RETURN)1);
362                            }
363 mday          1.52 
364 kumpf         1.82         while (1)
365 kumpf         1.81         {
366 kumpf         1.82             try
367                                {
368                                    sleep_sem->wait();
369                                }
370                                catch (...)
371                                {
372                                    Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
373                                        "ThreadPool::_loop: failure on sleep_sem->wait().");
374                                    PEGASUS_ASSERT(false);
375                                    pool->_idleThreads.remove(myself);
376                                    pool->_currentThreads--;
377                                    PEG_METHOD_EXIT();
378                                    return((PEGASUS_THREAD_RETURN)1);
379                                }
380                    
381                                // When we awaken we reside on the _runningThreads queue, not the
382                                // _idleThreads queue.
383                    
384                                PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL* work)(void *) = 0;
385                                void* parm = 0;
386                                Semaphore* blocking_sem = 0;
387 kumpf         1.82 
388                                try
389                                {
390                                    work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *))
391                                        myself->reference_tsd("work func");
392                                    myself->dereference_tsd();
393                                    parm = myself->reference_tsd("work parm");
394                                    myself->dereference_tsd();
395                                    blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
396                                    myself->dereference_tsd();
397                                }
398                                catch (...)
399                                {
400                                    Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
401                                        "ThreadPool::_loop: Failure accessing work func, work parm, "
402                                            "or blocking sem.");
403                                    PEGASUS_ASSERT(false);
404                                    pool->_idleThreads.remove(myself);
405                                    pool->_currentThreads--;
406                                    PEG_METHOD_EXIT();
407                                    return((PEGASUS_THREAD_RETURN)1);
408 kumpf         1.82             }
409                    
410                                if (work == 0)
411                                {
412 carolann.graves 1.84                 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
413 kumpf           1.82                     "ThreadPool::_loop: work func is 0, meaning we should exit.");
414                                      break;
415                                  }
416 mike            1.2  
417 kumpf           1.82             gettimeofday(lastActivityTime, NULL);
418 konrad.r        1.67 
419 kumpf           1.82             try
420                                  {
421                                      PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4, "Work starting.");
422                                      work(parm);
423                                      PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4, "Work finished.");
424                                  }
425                                  catch (Exception & e)
426                                  {
427                                      PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
428                                          String("Exception from work in ThreadPool::_loop: ") +
429                                              e.getMessage());
430                                  }
431 kumpf           1.68 #if !defined(PEGASUS_OS_LSB)
432 kumpf           1.82             catch (exception& e)
433                                  {
434                                      PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
435                                          String("Exception from work in ThreadPool::_loop: ") +
436                                              e.what());
437                                  }
438 kumpf           1.68 #endif
439 kumpf           1.82             catch (...)
440                                  {
441                                      PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
442                                          "Unknown exception from work in ThreadPool::_loop.");
443                                  }
444 kumpf           1.81 
445 kumpf           1.82             // put myself back onto the available list
446                                  try
447 kumpf           1.57             {
448 kumpf           1.82                 gettimeofday(lastActivityTime, NULL);
449                                      if (blocking_sem != 0)
450                                      {
451                                          blocking_sem->signal();
452                                      }
453 s.hills         1.49 
454 kumpf           1.82                 Boolean removed = pool->_runningThreads.remove((void *)myself);
455                                      PEGASUS_ASSERT(removed);
456 s.hills         1.49 
457 kumpf           1.82                 pool->_idleThreads.insert_first(myself);
458                                  }
459                                  catch (...)
460                                  {
461                                      Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
462                                          "ThreadPool::_loop: Adding thread to idle pool failed.");
463                                      PEGASUS_ASSERT(false);
464                                      pool->_currentThreads--;
465                                      PEG_METHOD_EXIT();
466                                      return((PEGASUS_THREAD_RETURN)1);
467                                  }
468 kumpf           1.81         }
469 kumpf           1.82     }
470                          catch (const Exception& e)
471                          {
472                              PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
473                                  "Caught exception: \"" + e.getMessage() + "\".  Exiting _loop.");
474                          }
475                          catch (...)
476                          {
477                              PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
478                                  "Caught unrecognized exception.  Exiting _loop.");
479 kumpf           1.81     }
480 kumpf           1.14 
481 kumpf           1.81     PEG_METHOD_EXIT();
482                          return((PEGASUS_THREAD_RETURN)0);
483 mike            1.2  }
484                      
485 kumpf           1.81 Boolean ThreadPool::allocate_and_awaken(
486                          void* parm,
487                          PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL* work)(void *),
488                          Semaphore* blocking)
489 mike            1.2  {
490 kumpf           1.81     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
491                      
492                          // Allocate_and_awaken will not run if the _dying flag is set.
493                          // Once the lock is acquired, ~ThreadPool will not change
494                          // the value of _dying until the lock is released.
495                      
496                          try
497                          {
498                              if (_dying.value())
499                              {
500                                  Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
501                                      "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
502                                  // ATTN: Error result has not yet been defined
503                                  return true;
504                              }
505                              struct timeval start;
506                              gettimeofday(&start, NULL);
507                              Thread* th = 0;
508                      
509                              th = _idleThreads.remove_first();
510 kumpf           1.57 
511 kumpf           1.81         if (th == 0)
512                              {
513                                  if ((_maxThreads == 0) || (_currentThreads < _maxThreads))
514                                  {
515                                      th = _initializeThread();
516                                  }
517                              }
518                      
519                              if (th == 0)
520                              {
521                                  // ATTN-DME-P3-20031103: This trace message should not be
522                                  // be labeled TRC_DISCARDED_DATA, because it does not
523                                  // necessarily imply that a failure has occurred.  However,
524                                  // this label is being used temporarily to help isolate
525                                  // the cause of client timeout problems.
526 kumpf           1.60 
527 kumpf           1.81             Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
528                                      "ThreadPool::allocate_and_awaken: Insufficient resources: "
529                                          " pool = %s, running threads = %d, idle threads = %d",
530                                      _key, _runningThreads.count(), _idleThreads.count());
531                                  return false;
532                              }
533 mike            1.2  
534 kumpf           1.81         // initialize the thread data with the work function and parameters
535                              Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
536                                  "Initializing thread with work function and parameters: parm = %p",
537                                  parm);
538 mike            1.2  
539 kumpf           1.81         th->delete_tsd("work func");
540                              th->put_tsd("work func", NULL,
541 kumpf           1.70             sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
542 kumpf           1.81             (void *)work);
543                              th->delete_tsd("work parm");
544                              th->put_tsd("work parm", NULL, sizeof(void *), parm);
545                              th->delete_tsd("blocking sem");
546                              if (blocking != 0)
547                                  th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
548                      
549                              // put the thread on the running list
550                              _runningThreads.insert_first(th);
551                      
552                              // signal the thread's sleep semaphore to awaken it
553                              Semaphore* sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
554                              PEGASUS_ASSERT(sleep_sem != 0);
555                      
556                              Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
557                              sleep_sem->signal();
558                              th->dereference_tsd();
559 kumpf           1.57     }
560 mday            1.58     catch (...)
561 kumpf           1.57     {
562 kumpf           1.81         Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
563                                  "ThreadPool::allocate_and_awaken: Operation Failed.");
564                              PEG_METHOD_EXIT();
565                              // ATTN: Error result has not yet been defined
566                              return true;
567 kumpf           1.57     }
568 kumpf           1.81     PEG_METHOD_EXIT();
569                          return true;
570 mike            1.2  }
571                      
572 kumpf           1.81 // caller is responsible for only calling this routine during slack periods
573                      // but should call it at least once per _deallocateWait interval.
574 mday            1.12 
575 kumpf           1.81 Uint32 ThreadPool::cleanupIdleThreads()
576 mike            1.2  {
577 kumpf           1.81     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads");
578                      
579                          Uint32 numThreadsCleanedUp = 0;
580                      
581                          Uint32 numIdleThreads = _idleThreads.count();
582                          for (Uint32 i = 0; i < numIdleThreads; i++)
583                          {
584                              // Do not dip below the minimum thread count
585                              if (_currentThreads.value() <= (Uint32)_minThreads)
586                              {
587                                  break;
588                              }
589                      
590                              Thread* thread = _idleThreads.remove_last();
591                      
592                              // If there are no more threads in the _idleThreads queue, we're done.
593                              if (thread == 0)
594                              {
595                                  break;
596                              }
597                      
598 kumpf           1.81         struct timeval* lastActivityTime;
599                              try
600                              {
601                                  lastActivityTime = (struct timeval *)thread->try_reference_tsd(
602                                      "last activity time");
603                                  PEGASUS_ASSERT(lastActivityTime != 0);
604                              }
605                              catch (...)
606                              {
607                                  PEGASUS_ASSERT(false);
608                                  _idleThreads.insert_last(thread);
609                                  break;
610                              }
611                      
612                              Boolean cleanupThisThread =
613                                  _timeIntervalExpired(lastActivityTime, &_deallocateWait);
614                              thread->dereference_tsd();
615                      
616                              if (cleanupThisThread)
617                              {
618                                  _cleanupThread(thread);
619 kumpf           1.81             _currentThreads--;
620                                  numThreadsCleanedUp++;
621                              }
622                              else
623                              {
624                                  _idleThreads.insert_first(thread);
625                              }
626 konrad.r        1.67     }
627 kumpf           1.81 
628                          PEG_METHOD_EXIT();
629                          return numThreadsCleanedUp;
630 konrad.r        1.67 }
631 mday            1.19 
632 kumpf           1.81 void ThreadPool::_cleanupThread(Thread* thread)
633 mday            1.19 {
634 kumpf           1.81     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
635                      
636                          // Set the "work func" and "work parm" to 0 so _loop() knows to exit.
637                          thread->delete_tsd("work func");
638                          thread->put_tsd(
639                              "work func", 0,
640                              sizeof(PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
641                              (void *) 0);
642                          thread->delete_tsd("work parm");
643                          thread->put_tsd("work parm", 0, sizeof(void *), 0);
644                      
645                          // signal the thread's sleep semaphore to awaken it
646                          Semaphore* sleep_sem = (Semaphore *)thread->reference_tsd("sleep sem");
647                          PEGASUS_ASSERT(sleep_sem != 0);
648                          sleep_sem->signal();
649                          thread->dereference_tsd();
650                      
651                          thread->join();
652                          delete thread;
653                      
654                          PEG_METHOD_EXIT();
655 mday            1.19 }
656                      
657 kumpf           1.81 Boolean ThreadPool::_timeIntervalExpired(
658                          struct timeval* start,
659                          struct timeval* interval)
660 mday            1.19 {
661 kumpf           1.81     // never time out if the interval is zero
662                          if (interval && (interval->tv_sec == 0) && (interval->tv_usec == 0))
663                          {
664                              return false;
665                          }
666                      
667                          struct timeval now, finish, remaining;
668                          Uint32 usec;
669                          pegasus_gettimeofday(&now);
670                          pegasus_gettimeofday(&remaining);    // Avoid valgrind error
671                      
672                          finish.tv_sec = start->tv_sec + interval->tv_sec;
673                          usec = start->tv_usec + interval->tv_usec;
674                          finish.tv_sec += (usec / 1000000);
675                          usec %= 1000000;
676                          finish.tv_usec = usec;
677                      
678                          return (timeval_subtract(&remaining, &finish, &now) != 0);
679 mday            1.19 }
680                      
681 kumpf           1.81 void ThreadPool::_deleteSemaphore(void *p)
682 mday            1.19 {
683 kumpf           1.81     delete (Semaphore *)p;
684 mday            1.19 }
685                      
686 kumpf           1.81 Thread* ThreadPool::_initializeThread()
687 mday            1.19 {
688 kumpf           1.81     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread");
689                      
690                          Thread* th = (Thread *) new Thread(_loop, this, false);
691                      
692                          // allocate a sleep semaphore and pass it in the thread context
693                          // initial count is zero, loop function will sleep until
694                          // we signal the semaphore
695                          Semaphore* sleep_sem = (Semaphore *) new Semaphore(0);
696                          th->put_tsd(
697                              "sleep sem", &_deleteSemaphore, sizeof(Semaphore), (void *)sleep_sem);
698                      
699                          struct timeval* lastActivityTime =
700                              (struct timeval *) ::operator new(sizeof(struct timeval));
701                          pegasus_gettimeofday(lastActivityTime);
702                      
703                          th->put_tsd("last activity time", thread_data::default_delete,
704                              sizeof(struct timeval), (void *)lastActivityTime);
705                          // thread will enter _loop() and sleep on sleep_sem until we signal it
706                      
707                          if (!th->run())
708                          {
709 kumpf           1.81         delete th;
710                              return 0;
711                          }
712                          _currentThreads++;
713                          pegasus_yield();
714                      
715                          PEG_METHOD_EXIT();
716                          return th;
717 mday            1.19 }
718 mike            1.2  
719 kumpf           1.81 void ThreadPool::_addToIdleThreadsQueue(Thread* th)
720                      {
721                          if (th == 0)
722                          {
723                              Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
724                                  "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");
725                              throw NullPointer();
726                          }
727                      
728                          try
729                          {
730                              _idleThreads.insert_first(th);
731                          }
732                          catch (...)
733                          {
734                              Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
735                                  "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_first "
736                                      "failed.");
737                          }
738                      }
739 mike            1.2  
740                      PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2