(file) Return to Thread.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.89 //%2006////////////////////////////////////////////////////////////////////////
  2 mike  1.2  //
  3 karl  1.71 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4            // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5            // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6 karl  1.56 // IBM Corp.; EMC Corporation, The Open Group.
  7 karl  1.71 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8            // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9 karl  1.75 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10            // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11 karl  1.89 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 12            // EMC Corporation; Symantec Corporation; The Open Group.
 13 mike  1.2  //
 14            // Permission is hereby granted, free of charge, to any person obtaining a copy
 15 chip  1.11 // of this software and associated documentation files (the "Software"), to
 16            // deal in the Software without restriction, including without limitation the
 17            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 18 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
 19            // furnished to do so, subject to the following conditions:
 20 karl  1.89 // 
 21 chip  1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 22 mike  1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 23            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 24 chip  1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 25            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 26            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 27 mike  1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 28            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 29            //
 30            //==============================================================================
 31            //
 32            // Author: Mike Day (mdday@us.ibm.com)
 33            //
 34            // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
 35 chip  1.11 //              added nsk platform support
 36 kumpf 1.59 //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
 37 a.arora 1.64 //              Amit K Arora, IBM (amita@in.ibm.com) for PEP#101
 38 gs.keenan 1.76 //              Sean Keenan, Hewlett-Packard Company (sean.keenan@hp.com)
 39 david.dillard 1.83 //              David Dillard, VERITAS Software Corp.
 40                    //                  (david.dillard@veritas.com)
 41 mike          1.2  //
 42                    //%/////////////////////////////////////////////////////////////////////////////
 43                    
 44                    #include "Thread.h"
 45 kumpf         1.68 #include <exception>
 46 mike          1.2  #include <Pegasus/Common/IPC.h>
 47 kumpf         1.14 #include <Pegasus/Common/Tracer.h>
 48 mike          1.2  
 49                    #if defined(PEGASUS_OS_TYPE_WINDOWS)
 50 chip          1.11 # include "ThreadWindows.cpp"
 51 mike          1.2  #elif defined(PEGASUS_OS_TYPE_UNIX)
 52                    # include "ThreadUnix.cpp"
 53                    #elif defined(PEGASUS_OS_TYPE_NSK)
 54                    # include "ThreadNsk.cpp"
 55 gs.keenan     1.76 #elif defined(PEGASUS_OS_VMS)
 56                    # include "ThreadVms.cpp"
 57 mike          1.2  #else
 58                    # error "Unsupported platform"
 59                    #endif
 60                    
 61 kumpf         1.69 PEGASUS_USING_STD;
 62 mike          1.2  PEGASUS_NAMESPACE_BEGIN
 63                    
 64 mday          1.42 
 65 chip          1.11 void thread_data::default_delete(void * data)
 66                    {
 67 mike          1.2     if( data != NULL)
 68 chip          1.11       ::operator delete(data);
 69 mike          1.2  }
 70                    
 71 chuck         1.43 // l10n start
 72                    void language_delete(void * data)
 73                    {
 74                       if( data != NULL)
 75                       {
 76 kumpf         1.88       AutoPtr<AcceptLanguageList> al(static_cast<AcceptLanguageList *>(data));
 77 chuck         1.43    }
 78                    }
 79                    // l10n end
 80                    
 81 mike          1.2  Boolean Thread::_signals_blocked = false;
 82 chuck         1.37 // l10n
 83 marek         1.63 #ifndef PEGASUS_OS_ZOS
 84 w.otsuka      1.74 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key = PEGASUS_THREAD_KEY_TYPE(-1);
 85 marek         1.63 #else
 86                    PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key;
 87                    #endif
 88 chuck         1.37 Boolean Thread::_key_initialized = false;
 89 chuck         1.41 Boolean Thread::_key_error = false;
 90 chuck         1.37 
 91 mike          1.2  
 92 david.dillard 1.83 void Thread::cleanup_push( void (*routine)(void *), void *parm)
 93 mike          1.2  {
 94 a.arora       1.64     AutoPtr<cleanup_handler> cu(new cleanup_handler(routine, parm));
 95 a.arora       1.65     _cleanup.insert_first(cu.get());
 96 a.arora       1.64     cu.release();
 97 mike          1.2      return;
 98                    }
 99 kumpf         1.81 
100 david.dillard 1.83 void Thread::cleanup_pop(Boolean execute)
101 mike          1.2  {
102 david.dillard 1.83     AutoPtr<cleanup_handler> cu;
103 chip          1.11     try
104                        {
105 kumpf         1.81         cu.reset(_cleanup.remove_first());
106 mike          1.2      }
107 chip          1.11     catch(IPCException&)
108 mike          1.2      {
109 kumpf         1.81         PEGASUS_ASSERT(0);
110 mike          1.2       }
111                        if(execute == true)
112 kumpf         1.81         cu->execute();
113 mike          1.2  }
114 kumpf         1.81 
115 mike          1.2  
116 david.dillard 1.83 //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value)
117 mike          1.2  
118                    
119 chip          1.11 #ifndef PEGASUS_THREAD_EXIT_NATIVE
120                    void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
121                    {
122                        // execute the cleanup stack and then return
123 mike          1.2     while( _cleanup.count() )
124                       {
125 chip          1.11        try
126                           {
127 kumpf         1.81            cleanup_pop(true);
128 chip          1.11        }
129                           catch(IPCException&)
130                           {
131 kumpf         1.81           PEGASUS_ASSERT(0);
132                              break;
133 mike          1.2         }
134                       }
135                       _exit_code = exit_code;
136                       exit_thread(exit_code);
137 mday          1.4     _handle.thid = 0;
138 mike          1.2  }
139                    
140                    
141                    #endif
142                    
143 chuck         1.37 // l10n start
144 chuck         1.39 Sint8 Thread::initializeKey()
145                    {
146 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::initializeKey");
147                        if (!Thread::_key_initialized)
148                        {
149                            if (Thread::_key_error)
150                            {
151                                Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
152                                    "Thread: ERROR - thread key error");
153                                return -1;
154                            }
155                    
156                            if (pegasus_key_create(&Thread::_platform_thread_key) == 0)
157                            {
158                                Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
159                                    "Thread: able to create a thread key");
160                                Thread::_key_initialized = true;
161                            }
162                            else
163                            {
164                                Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
165                                    "Thread: ERROR - unable to create a thread key");
166                                Thread::_key_error = true;
167 kumpf         1.81             return -1;
168                            }
169                        }
170 chuck         1.39 
171 kumpf         1.81     PEG_METHOD_EXIT();
172                        return 0;
173 chuck         1.39 }
174                    
175 chuck         1.37 Thread * Thread::getCurrent()
176                    {
177 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::getCurrent");
178 chuck         1.40     if (Thread::initializeKey() != 0)
179 chuck         1.39     {
180 kumpf         1.81         return NULL;
181 chuck         1.39     }
182 kumpf         1.81     PEG_METHOD_EXIT();
183                        return (Thread *)pegasus_get_thread_specific(_platform_thread_key);
184 chuck         1.39 }
185                    
186                    void Thread::setCurrent(Thread * thrd)
187                    {
188 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::setCurrent");
189                        if (Thread::initializeKey() == 0)
190                        {
191                            if (pegasus_set_thread_specific(
192                                   Thread::_platform_thread_key, (void *) thrd) == 0)
193 chuck         1.39         {
194 kumpf         1.81             Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
195                                    "Successful set Thread * into thread specific storage");
196 chuck         1.39         }
197                            else
198                            {
199 kumpf         1.81             Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
200                                    "ERROR: error setting Thread * into thread specific storage");
201 chuck         1.39         }
202 kumpf         1.81     }
203                        PEG_METHOD_EXIT();
204 chuck         1.37 }
205                    
206 kumpf         1.88 AcceptLanguageList * Thread::getLanguages()
207 chuck         1.37 {
208 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::getLanguages");
209                    
210                        Thread * curThrd = Thread::getCurrent();
211                        if (curThrd == NULL)
212                            return NULL;
213 kumpf         1.88     AcceptLanguageList * acceptLangs =
214                            (AcceptLanguageList *)curThrd->reference_tsd("acceptLanguages");
215 kumpf         1.81     curThrd->dereference_tsd();
216                        PEG_METHOD_EXIT();
217                        return acceptLangs;
218 chuck         1.37 }
219                    
220 kumpf         1.88 void Thread::setLanguages(AcceptLanguageList *langs) //l10n
221 chuck         1.37 {
222 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::setLanguages");
223                    
224                        Thread* currentThrd = Thread::getCurrent();
225                        if (currentThrd != NULL)
226                        {
227                            // deletes the old tsd and creates a new one
228                            currentThrd->put_tsd("acceptLanguages",
229                                language_delete,
230 kumpf         1.88             sizeof(AcceptLanguageList *),
231 kumpf         1.81             langs);
232                        }
233                    
234                        PEG_METHOD_EXIT();
235 chuck         1.37 }
236                    
237                    void Thread::clearLanguages() //l10n
238                    {
239 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::clearLanguages");
240                    
241                        Thread * currentThrd = Thread::getCurrent();
242                        if (currentThrd != NULL)
243                        {
244                            // deletes the old tsd
245                            currentThrd->delete_tsd("acceptLanguages");
246                        }
247                    
248                        PEG_METHOD_EXIT();
249 chuck         1.37 }
250 kumpf         1.81 // l10n end
251 chuck         1.37 
252 mday          1.52 
253 kumpf         1.81 ///////////////////////////////////////////////////////////////////////////////
254                    //
255                    // ThreadPool
256                    //
257                    ///////////////////////////////////////////////////////////////////////////////
258                    
259                    ThreadPool::ThreadPool(
260                        Sint16 initialSize,
261                        const char* key,
262                        Sint16 minThreads,
263                        Sint16 maxThreads,
264                        struct timeval& deallocateWait)
265                        : _maxThreads(maxThreads),
266                          _minThreads(minThreads),
267                          _currentThreads(0),
268                          _idleThreads(true),
269                          _runningThreads(true),
270                          _dying(0)
271 mday          1.58 {
272 kumpf         1.81     _deallocateWait.tv_sec = deallocateWait.tv_sec;
273                        _deallocateWait.tv_usec = deallocateWait.tv_usec;
274 mday          1.58 
275 kumpf         1.81     memset(_key, 0x00, 17);
276                        if (key != 0)
277                        {
278                            strncpy(_key, key, 16);
279                        }
280                    
281                        if ((_maxThreads > 0) && (_maxThreads < initialSize))
282                        {
283                            _maxThreads = initialSize;
284                        }
285 mday          1.58 
286 kumpf         1.81     if (_minThreads > initialSize)
287                        {
288                            _minThreads = initialSize;
289                        }
290 mday          1.52 
291 kumpf         1.81     for (int i = 0; i < initialSize; i++)
292                        {
293                            _addToIdleThreadsQueue(_initializeThread());
294                        }
295                    }
296 mday          1.20 
297 kumpf         1.81 ThreadPool::~ThreadPool()
298 mday          1.20 {
299 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::~ThreadPool");
300 konrad.r      1.86 
301 kumpf         1.81     try
302                        {
303                            // Set the dying flag so all thread know the destructor has been entered
304                            _dying++;
305 konrad.r      1.86        Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
306 mike          1.87 		"Cleaning up %d idle threads. ", _currentThreads.get());
307                            while (_currentThreads.get() > 0)
308 kumpf         1.81         {
309                                Thread* thread = _idleThreads.remove_first();
310                                if (thread != 0)
311                                {
312                                    _cleanupThread(thread);
313                                    _currentThreads--;
314                                }
315                                else
316                                {
317                                    pegasus_yield();
318                                }
319                            }
320                        }
321                        catch (...)
322                        {
323                        }
324 mday          1.20 }
325                    
326 kumpf         1.81 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void* parm)
327                    {
328                        PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
329                    
330                        try
331                        {
332 kumpf         1.82         Thread* myself = (Thread *)parm;
333                            PEGASUS_ASSERT(myself != 0);
334 kumpf         1.81 
335 kumpf         1.82         // Set myself into thread specific storage
336                            // This will allow code to get its own Thread
337                            Thread::setCurrent(myself);
338 kumpf         1.81 
339 kumpf         1.82         ThreadPool* pool = (ThreadPool *)myself->get_parm();
340                            PEGASUS_ASSERT(pool != 0);
341 mike          1.2  
342 kumpf         1.82         Semaphore* sleep_sem = 0;
343                            struct timeval* lastActivityTime = 0;
344 chuck         1.39 
345 kumpf         1.81         try
346                            {
347 kumpf         1.82             sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
348 kumpf         1.81             myself->dereference_tsd();
349 kumpf         1.82             PEGASUS_ASSERT(sleep_sem != 0);
350                    
351                                lastActivityTime =
352                                    (struct timeval *)myself->reference_tsd("last activity time");
353 kumpf         1.81             myself->dereference_tsd();
354 kumpf         1.82             PEGASUS_ASSERT(lastActivityTime != 0);
355 kumpf         1.81         }
356                            catch (...)
357                            {
358                                Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
359 kumpf         1.82                 "ThreadPool::_loop: Failure getting sleep_sem or "
360                                        "lastActivityTime.");
361 kumpf         1.81             PEGASUS_ASSERT(false);
362                                pool->_idleThreads.remove(myself);
363                                pool->_currentThreads--;
364                                PEG_METHOD_EXIT();
365                                return((PEGASUS_THREAD_RETURN)1);
366                            }
367 mday          1.52 
368 kumpf         1.82         while (1)
369 kumpf         1.81         {
370 kumpf         1.82             try
371                                {
372                                    sleep_sem->wait();
373                                }
374                                catch (...)
375                                {
376                                    Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
377                                        "ThreadPool::_loop: failure on sleep_sem->wait().");
378                                    PEGASUS_ASSERT(false);
379                                    pool->_idleThreads.remove(myself);
380                                    pool->_currentThreads--;
381                                    PEG_METHOD_EXIT();
382                                    return((PEGASUS_THREAD_RETURN)1);
383                                }
384                    
385                                // When we awaken we reside on the _runningThreads queue, not the
386                                // _idleThreads queue.
387                    
388                                PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL* work)(void *) = 0;
389                                void* parm = 0;
390                                Semaphore* blocking_sem = 0;
391 kumpf         1.82 
392                                try
393                                {
394                                    work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *))
395                                        myself->reference_tsd("work func");
396                                    myself->dereference_tsd();
397                                    parm = myself->reference_tsd("work parm");
398                                    myself->dereference_tsd();
399                                    blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
400                                    myself->dereference_tsd();
401                                }
402                                catch (...)
403                                {
404                                    Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
405                                        "ThreadPool::_loop: Failure accessing work func, work parm, "
406                                            "or blocking sem.");
407                                    PEGASUS_ASSERT(false);
408                                    pool->_idleThreads.remove(myself);
409                                    pool->_currentThreads--;
410                                    PEG_METHOD_EXIT();
411                                    return((PEGASUS_THREAD_RETURN)1);
412 kumpf         1.82             }
413                    
414                                if (work == 0)
415                                {
416 carolann.graves 1.84                 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
417 kumpf           1.82                     "ThreadPool::_loop: work func is 0, meaning we should exit.");
418                                      break;
419                                  }
420 mike            1.2  
421 kumpf           1.82             gettimeofday(lastActivityTime, NULL);
422 konrad.r        1.67 
423 kumpf           1.82             try
424                                  {
425                                      PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4, "Work starting.");
426                                      work(parm);
427                                      PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4, "Work finished.");
428                                  }
429                                  catch (Exception & e)
430                                  {
431                                      PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
432                                          String("Exception from work in ThreadPool::_loop: ") +
433                                              e.getMessage());
434                                  }
435 kumpf           1.68 #if !defined(PEGASUS_OS_LSB)
436 konrad.r        1.86             catch (const exception& e)
437 kumpf           1.82             {
438                                      PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
439                                          String("Exception from work in ThreadPool::_loop: ") +
440                                              e.what());
441                                  }
442 kumpf           1.68 #endif
443 kumpf           1.82             catch (...)
444                                  {
445                                      PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
446                                          "Unknown exception from work in ThreadPool::_loop.");
447                                  }
448 kumpf           1.81 
449 kumpf           1.82             // put myself back onto the available list
450                                  try
451 kumpf           1.57             {
452 kumpf           1.82                 gettimeofday(lastActivityTime, NULL);
453                                      if (blocking_sem != 0)
454                                      {
455                                          blocking_sem->signal();
456                                      }
457 s.hills         1.49 
458 kumpf           1.82                 Boolean removed = pool->_runningThreads.remove((void *)myself);
459                                      PEGASUS_ASSERT(removed);
460 s.hills         1.49 
461 kumpf           1.82                 pool->_idleThreads.insert_first(myself);
462                                  }
463                                  catch (...)
464                                  {
465                                      Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
466                                          "ThreadPool::_loop: Adding thread to idle pool failed.");
467                                      PEGASUS_ASSERT(false);
468                                      pool->_currentThreads--;
469                                      PEG_METHOD_EXIT();
470                                      return((PEGASUS_THREAD_RETURN)1);
471                                  }
472 kumpf           1.81         }
473 kumpf           1.82     }
474                          catch (const Exception& e)
475                          {
476                              PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
477                                  "Caught exception: \"" + e.getMessage() + "\".  Exiting _loop.");
478                          }
479                          catch (...)
480                          {
481                              PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
482                                  "Caught unrecognized exception.  Exiting _loop.");
483 kumpf           1.81     }
484 kumpf           1.14 
485 kumpf           1.81     PEG_METHOD_EXIT();
486                          return((PEGASUS_THREAD_RETURN)0);
487 mike            1.2  }
488                      
489 konrad.r        1.86 ThreadStatus ThreadPool::allocate_and_awaken(
490 kumpf           1.81     void* parm,
491                          PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL* work)(void *),
492                          Semaphore* blocking)
493 mike            1.2  {
494 kumpf           1.81     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
495                      
496                          // Allocate_and_awaken will not run if the _dying flag is set.
497                          // Once the lock is acquired, ~ThreadPool will not change
498                          // the value of _dying until the lock is released.
499                      
500                          try
501                          {
502 mike            1.87         if (_dying.get())
503 kumpf           1.81         {
504                                  Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
505                                      "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
506 konrad.r        1.86             return PEGASUS_THREAD_UNAVAILABLE;
507 kumpf           1.81         }
508                              struct timeval start;
509                              gettimeofday(&start, NULL);
510                              Thread* th = 0;
511                      
512                              th = _idleThreads.remove_first();
513 kumpf           1.57 
514 kumpf           1.81         if (th == 0)
515                              {
516 mike            1.87             if ((_maxThreads == 0) || 
517                      		(_currentThreads.get() < Uint32(_maxThreads)))
518 kumpf           1.81             {
519                                      th = _initializeThread();
520                                  }
521                              }
522                      
523                              if (th == 0)
524                              {
525 kumpf           1.89.2.1             Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
526 kumpf           1.81                     "ThreadPool::allocate_and_awaken: Insufficient resources: "
527                                              " pool = %s, running threads = %d, idle threads = %d",
528                                          _key, _runningThreads.count(), _idleThreads.count());
529 konrad.r        1.86                 return PEGASUS_THREAD_INSUFFICIENT_RESOURCES;
530 kumpf           1.81             }
531 mike            1.2      
532 kumpf           1.81             // initialize the thread data with the work function and parameters
533                                  Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
534                                      "Initializing thread with work function and parameters: parm = %p",
535                                      parm);
536 mike            1.2      
537 kumpf           1.81             th->delete_tsd("work func");
538                                  th->put_tsd("work func", NULL,
539 kumpf           1.70                 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
540 kumpf           1.81                 (void *)work);
541                                  th->delete_tsd("work parm");
542                                  th->put_tsd("work parm", NULL, sizeof(void *), parm);
543                                  th->delete_tsd("blocking sem");
544                                  if (blocking != 0)
545                                      th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
546                          
547                                  // put the thread on the running list
548                                  _runningThreads.insert_first(th);
549                          
550                                  // signal the thread's sleep semaphore to awaken it
551                                  Semaphore* sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
552                                  PEGASUS_ASSERT(sleep_sem != 0);
553                          
554                                  Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
555                                  sleep_sem->signal();
556                                  th->dereference_tsd();
557 kumpf           1.57         }
558 mday            1.58         catch (...)
559 kumpf           1.57         {
560 kumpf           1.81             Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
561                                      "ThreadPool::allocate_and_awaken: Operation Failed.");
562                                  PEG_METHOD_EXIT();
563                                  // ATTN: Error result has not yet been defined
564 konrad.r        1.86             return PEGASUS_THREAD_SETUP_FAILURE;
565 kumpf           1.57         }
566 kumpf           1.81         PEG_METHOD_EXIT();
567 konrad.r        1.86         return PEGASUS_THREAD_OK;
568 mike            1.2      }
569                          
570 kumpf           1.81     // caller is responsible for only calling this routine during slack periods
571                          // but should call it at least once per _deallocateWait interval.
572 mday            1.12     
573 kumpf           1.81     Uint32 ThreadPool::cleanupIdleThreads()
574 mike            1.2      {
575 kumpf           1.81         PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads");
576                          
577                              Uint32 numThreadsCleanedUp = 0;
578                          
579                              Uint32 numIdleThreads = _idleThreads.count();
580                              for (Uint32 i = 0; i < numIdleThreads; i++)
581                              {
582                                  // Do not dip below the minimum thread count
583 mike            1.87             if (_currentThreads.get() <= (Uint32)_minThreads)
584 kumpf           1.81             {
585                                      break;
586                                  }
587                          
588                                  Thread* thread = _idleThreads.remove_last();
589                          
590                                  // If there are no more threads in the _idleThreads queue, we're done.
591                                  if (thread == 0)
592                                  {
593                                      break;
594                                  }
595                          
596                                  struct timeval* lastActivityTime;
597                                  try
598                                  {
599                                      lastActivityTime = (struct timeval *)thread->try_reference_tsd(
600                                          "last activity time");
601                                      PEGASUS_ASSERT(lastActivityTime != 0);
602                                  }
603                                  catch (...)
604                                  {
605 kumpf           1.81                 PEGASUS_ASSERT(false);
606                                      _idleThreads.insert_last(thread);
607                                      break;
608                                  }
609                          
610                                  Boolean cleanupThisThread =
611                                      _timeIntervalExpired(lastActivityTime, &_deallocateWait);
612                                  thread->dereference_tsd();
613                          
614                                  if (cleanupThisThread)
615                                  {
616                                      _cleanupThread(thread);
617                                      _currentThreads--;
618                                      numThreadsCleanedUp++;
619                                  }
620                                  else
621                                  {
622                                      _idleThreads.insert_first(thread);
623                                  }
624 konrad.r        1.67         }
625 kumpf           1.81     
626                              PEG_METHOD_EXIT();
627                              return numThreadsCleanedUp;
628 konrad.r        1.67     }
629 mday            1.19     
630 kumpf           1.81     void ThreadPool::_cleanupThread(Thread* thread)
631 mday            1.19     {
632 kumpf           1.81         PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
633                          
634                              // Set the "work func" and "work parm" to 0 so _loop() knows to exit.
635                              thread->delete_tsd("work func");
636                              thread->put_tsd(
637                                  "work func", 0,
638                                  sizeof(PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
639                                  (void *) 0);
640                              thread->delete_tsd("work parm");
641                              thread->put_tsd("work parm", 0, sizeof(void *), 0);
642                          
643                              // signal the thread's sleep semaphore to awaken it
644                              Semaphore* sleep_sem = (Semaphore *)thread->reference_tsd("sleep sem");
645                              PEGASUS_ASSERT(sleep_sem != 0);
646                              sleep_sem->signal();
647                              thread->dereference_tsd();
648                          
649                              thread->join();
650                              delete thread;
651                          
652                              PEG_METHOD_EXIT();
653 mday            1.19     }
654                          
655 kumpf           1.81     Boolean ThreadPool::_timeIntervalExpired(
656                              struct timeval* start,
657                              struct timeval* interval)
658 mday            1.19     {
659 kumpf           1.81         // never time out if the interval is zero
660                              if (interval && (interval->tv_sec == 0) && (interval->tv_usec == 0))
661                              {
662                                  return false;
663                              }
664                          
665                              struct timeval now, finish, remaining;
666                              Uint32 usec;
667                              pegasus_gettimeofday(&now);
668                              pegasus_gettimeofday(&remaining);    // Avoid valgrind error
669                          
670                              finish.tv_sec = start->tv_sec + interval->tv_sec;
671                              usec = start->tv_usec + interval->tv_usec;
672                              finish.tv_sec += (usec / 1000000);
673                              usec %= 1000000;
674                              finish.tv_usec = usec;
675                          
676                              return (timeval_subtract(&remaining, &finish, &now) != 0);
677 mday            1.19     }
678                          
679 kumpf           1.81     void ThreadPool::_deleteSemaphore(void *p)
680 mday            1.19     {
681 kumpf           1.81         delete (Semaphore *)p;
682 mday            1.19     }
683                          
684 kumpf           1.81     Thread* ThreadPool::_initializeThread()
685 mday            1.19     {
686 kumpf           1.81         PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread");
687                          
688                              Thread* th = (Thread *) new Thread(_loop, this, false);
689                          
690                              // allocate a sleep semaphore and pass it in the thread context
691                              // initial count is zero, loop function will sleep until
692                              // we signal the semaphore
693                              Semaphore* sleep_sem = (Semaphore *) new Semaphore(0);
694                              th->put_tsd(
695                                  "sleep sem", &_deleteSemaphore, sizeof(Semaphore), (void *)sleep_sem);
696                          
697                              struct timeval* lastActivityTime =
698                                  (struct timeval *) ::operator new(sizeof(struct timeval));
699                              pegasus_gettimeofday(lastActivityTime);
700                          
701                              th->put_tsd("last activity time", thread_data::default_delete,
702                                  sizeof(struct timeval), (void *)lastActivityTime);
703                              // thread will enter _loop() and sleep on sleep_sem until we signal it
704                          
705 konrad.r        1.86         if (th->run() != PEGASUS_THREAD_OK)
706 kumpf           1.81         {
707 konrad.r        1.86     		Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
708                          			"Could not create thread. Error code is %d.", errno);
709 kumpf           1.81             delete th;
710                                  return 0;
711                              }
712                              _currentThreads++;
713                              pegasus_yield();
714                          
715                              PEG_METHOD_EXIT();
716                              return th;
717 mday            1.19     }
718 mike            1.2      
719 kumpf           1.81     void ThreadPool::_addToIdleThreadsQueue(Thread* th)
720                          {
721                              if (th == 0)
722                              {
723                                  Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
724                                      "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");
725                                  throw NullPointer();
726                              }
727                          
728                              try
729                              {
730                                  _idleThreads.insert_first(th);
731                              }
732                              catch (...)
733                              {
734                                  Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
735                                      "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_first "
736                                          "failed.");
737                              }
738                          }
739 mike            1.2      
740                          PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2