(file) Return to Thread.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.75 //%2005////////////////////////////////////////////////////////////////////////
  2 mike  1.2  //
  3 karl  1.71 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4            // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5            // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6 karl  1.56 // IBM Corp.; EMC Corporation, The Open Group.
  7 karl  1.71 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8            // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9 karl  1.75 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10            // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11 mike  1.2  //
 12            // Permission is hereby granted, free of charge, to any person obtaining a copy
 13 chip  1.11 // of this software and associated documentation files (the "Software"), to
 14            // deal in the Software without restriction, including without limitation the
 15            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 16 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
 17            // furnished to do so, subject to the following conditions:
 18 david.dillard 1.83 //
 19 chip          1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 20 mike          1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 21                    // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 22 chip          1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 23                    // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 24                    // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 25 mike          1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 26                    // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 27                    //
 28                    //==============================================================================
 29                    //
 30                    // Author: Mike Day (mdday@us.ibm.com)
 31                    //
 32                    // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
 33 chip          1.11 //              added nsk platform support
 34 kumpf         1.59 //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
 35 a.arora       1.64 //              Amit K Arora, IBM (amita@in.ibm.com) for PEP#101
 36 gs.keenan     1.76 //              Sean Keenan, Hewlett-Packard Company (sean.keenan@hp.com)
 37 david.dillard 1.83 //              David Dillard, VERITAS Software Corp.
 38                    //                  (david.dillard@veritas.com)
 39 mike          1.2  //
 40                    //%/////////////////////////////////////////////////////////////////////////////
 41                    
 42                    #include "Thread.h"
 43 kumpf         1.68 #include <exception>
 44 mike          1.2  #include <Pegasus/Common/IPC.h>
 45 kumpf         1.14 #include <Pegasus/Common/Tracer.h>
 46 mike          1.2  
 47                    #if defined(PEGASUS_OS_TYPE_WINDOWS)
 48 chip          1.11 # include "ThreadWindows.cpp"
 49 mike          1.2  #elif defined(PEGASUS_OS_TYPE_UNIX)
 50                    # include "ThreadUnix.cpp"
 51                    #elif defined(PEGASUS_OS_TYPE_NSK)
 52                    # include "ThreadNsk.cpp"
 53 gs.keenan     1.76 #elif defined(PEGASUS_OS_VMS)
 54                    # include "ThreadVms.cpp"
 55 mike          1.2  #else
 56                    # error "Unsupported platform"
 57                    #endif
 58                    
 59 kumpf         1.69 PEGASUS_USING_STD;
 60 mike          1.2  PEGASUS_NAMESPACE_BEGIN
 61                    
 62 mday          1.42 
 63 chip          1.11 void thread_data::default_delete(void * data)
 64                    {
 65 mike          1.2     if( data != NULL)
 66 chip          1.11       ::operator delete(data);
 67 mike          1.2  }
 68                    
 69 chuck         1.43 // l10n start
 70                    void language_delete(void * data)
 71                    {
 72                       if( data != NULL)
 73                       {
 74 a.arora       1.64       AutoPtr<AcceptLanguages> al(static_cast<AcceptLanguages *>(data));
 75 chuck         1.43    }
 76                    }
 77                    // l10n end
 78                    
 79 mike          1.2  Boolean Thread::_signals_blocked = false;
 80 chuck         1.37 // l10n
 81 marek         1.63 #ifndef PEGASUS_OS_ZOS
 82 w.otsuka      1.74 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key = PEGASUS_THREAD_KEY_TYPE(-1);
 83 marek         1.63 #else
 84                    PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key;
 85                    #endif
 86 chuck         1.37 Boolean Thread::_key_initialized = false;
 87 chuck         1.41 Boolean Thread::_key_error = false;
 88 chuck         1.37 
 89 mike          1.2  
 90 david.dillard 1.83 void Thread::cleanup_push( void (*routine)(void *), void *parm)
 91 mike          1.2  {
 92 a.arora       1.64     AutoPtr<cleanup_handler> cu(new cleanup_handler(routine, parm));
 93 a.arora       1.65     _cleanup.insert_first(cu.get());
 94 a.arora       1.64     cu.release();
 95 mike          1.2      return;
 96                    }
 97 kumpf         1.81 
 98 david.dillard 1.83 void Thread::cleanup_pop(Boolean execute)
 99 mike          1.2  {
100 david.dillard 1.83     AutoPtr<cleanup_handler> cu;
101 chip          1.11     try
102                        {
103 kumpf         1.81         cu.reset(_cleanup.remove_first());
104 mike          1.2      }
105 chip          1.11     catch(IPCException&)
106 mike          1.2      {
107 kumpf         1.81         PEGASUS_ASSERT(0);
108 mike          1.2       }
109                        if(execute == true)
110 kumpf         1.81         cu->execute();
111 mike          1.2  }
112 kumpf         1.81 
113 mike          1.2  
114 david.dillard 1.83 //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value)
115 mike          1.2  
116                    
117 chip          1.11 #ifndef PEGASUS_THREAD_EXIT_NATIVE
118                    void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
119                    {
120                        // execute the cleanup stack and then return
121 mike          1.2     while( _cleanup.count() )
122                       {
123 chip          1.11        try
124                           {
125 kumpf         1.81            cleanup_pop(true);
126 chip          1.11        }
127                           catch(IPCException&)
128                           {
129 kumpf         1.81           PEGASUS_ASSERT(0);
130                              break;
131 mike          1.2         }
132                       }
133                       _exit_code = exit_code;
134                       exit_thread(exit_code);
135 mday          1.4     _handle.thid = 0;
136 mike          1.2  }
137                    
138                    
139                    #endif
140                    
141 chuck         1.37 // l10n start
142 chuck         1.39 Sint8 Thread::initializeKey()
143                    {
144 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::initializeKey");
145                        if (!Thread::_key_initialized)
146                        {
147                            if (Thread::_key_error)
148                            {
149                                Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
150                                    "Thread: ERROR - thread key error");
151                                return -1;
152                            }
153                    
154                            if (pegasus_key_create(&Thread::_platform_thread_key) == 0)
155                            {
156                                Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
157                                    "Thread: able to create a thread key");
158                                Thread::_key_initialized = true;
159                            }
160                            else
161                            {
162                                Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
163                                    "Thread: ERROR - unable to create a thread key");
164                                Thread::_key_error = true;
165 kumpf         1.81             return -1;
166                            }
167                        }
168 chuck         1.39 
169 kumpf         1.81     PEG_METHOD_EXIT();
170                        return 0;
171 chuck         1.39 }
172                    
173 chuck         1.37 Thread * Thread::getCurrent()
174                    {
175 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::getCurrent");
176 chuck         1.40     if (Thread::initializeKey() != 0)
177 chuck         1.39     {
178 kumpf         1.81         return NULL;
179 chuck         1.39     }
180 kumpf         1.81     PEG_METHOD_EXIT();
181                        return (Thread *)pegasus_get_thread_specific(_platform_thread_key);
182 chuck         1.39 }
183                    
184                    void Thread::setCurrent(Thread * thrd)
185                    {
186 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::setCurrent");
187                        if (Thread::initializeKey() == 0)
188                        {
189                            if (pegasus_set_thread_specific(
190                                   Thread::_platform_thread_key, (void *) thrd) == 0)
191 chuck         1.39         {
192 kumpf         1.81             Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
193                                    "Successful set Thread * into thread specific storage");
194 chuck         1.39         }
195                            else
196                            {
197 kumpf         1.81             Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
198                                    "ERROR: error setting Thread * into thread specific storage");
199 chuck         1.39         }
200 kumpf         1.81     }
201                        PEG_METHOD_EXIT();
202 chuck         1.37 }
203                    
204                    AcceptLanguages * Thread::getLanguages()
205                    {
206 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::getLanguages");
207                    
208                        Thread * curThrd = Thread::getCurrent();
209                        if (curThrd == NULL)
210                            return NULL;
211                        AcceptLanguages * acceptLangs =
212                            (AcceptLanguages *)curThrd->reference_tsd("acceptLanguages");
213                        curThrd->dereference_tsd();
214                        PEG_METHOD_EXIT();
215                        return acceptLangs;
216 chuck         1.37 }
217                    
218                    void Thread::setLanguages(AcceptLanguages *langs) //l10n
219                    {
220 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::setLanguages");
221                    
222                        Thread* currentThrd = Thread::getCurrent();
223                        if (currentThrd != NULL)
224                        {
225                            // deletes the old tsd and creates a new one
226                            currentThrd->put_tsd("acceptLanguages",
227                                language_delete,
228                                sizeof(AcceptLanguages *),
229                                langs);
230                        }
231                    
232                        PEG_METHOD_EXIT();
233 chuck         1.37 }
234                    
235                    void Thread::clearLanguages() //l10n
236                    {
237 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::clearLanguages");
238                    
239                        Thread * currentThrd = Thread::getCurrent();
240                        if (currentThrd != NULL)
241                        {
242                            // deletes the old tsd
243                            currentThrd->delete_tsd("acceptLanguages");
244                        }
245                    
246                        PEG_METHOD_EXIT();
247 chuck         1.37 }
248 kumpf         1.81 // l10n end
249 chuck         1.37 
250 mday          1.52 
251 kumpf         1.81 ///////////////////////////////////////////////////////////////////////////////
252                    //
253                    // ThreadPool
254                    //
255                    ///////////////////////////////////////////////////////////////////////////////
256                    
257                    ThreadPool::ThreadPool(
258                        Sint16 initialSize,
259                        const char* key,
260                        Sint16 minThreads,
261                        Sint16 maxThreads,
262                        struct timeval& deallocateWait)
263                        : _maxThreads(maxThreads),
264                          _minThreads(minThreads),
265                          _currentThreads(0),
266                          _idleThreads(true),
267                          _runningThreads(true),
268                          _dying(0)
269 mday          1.58 {
270 kumpf         1.81     _deallocateWait.tv_sec = deallocateWait.tv_sec;
271                        _deallocateWait.tv_usec = deallocateWait.tv_usec;
272 mday          1.58 
273 kumpf         1.81     memset(_key, 0x00, 17);
274                        if (key != 0)
275                        {
276                            strncpy(_key, key, 16);
277                        }
278                    
279                        if ((_maxThreads > 0) && (_maxThreads < initialSize))
280                        {
281                            _maxThreads = initialSize;
282                        }
283 mday          1.58 
284 kumpf         1.81     if (_minThreads > initialSize)
285                        {
286                            _minThreads = initialSize;
287                        }
288 mday          1.52 
289 kumpf         1.81     for (int i = 0; i < initialSize; i++)
290                        {
291                            _addToIdleThreadsQueue(_initializeThread());
292                        }
293                    }
294 mday          1.20 
295 kumpf         1.81 ThreadPool::~ThreadPool()
296 mday          1.20 {
297 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::~ThreadPool");
298 konrad.r      1.86 
299 kumpf         1.81     try
300                        {
301                            // Set the dying flag so all thread know the destructor has been entered
302                            _dying++;
303 konrad.r      1.86        Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
304                    		"Cleaning up %d idle threads. ", _currentThreads.value());
305 kumpf         1.81         while (_currentThreads.value() > 0)
306                            {
307                                Thread* thread = _idleThreads.remove_first();
308                                if (thread != 0)
309                                {
310                                    _cleanupThread(thread);
311                                    _currentThreads--;
312                                }
313                                else
314                                {
315                                    pegasus_yield();
316                                }
317                            }
318                        }
319                        catch (...)
320                        {
321                        }
322 mday          1.20 }
323                    
324 kumpf         1.81 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void* parm)
325                    {
326                        PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
327                    
328                        try
329                        {
330 kumpf         1.82         Thread* myself = (Thread *)parm;
331                            PEGASUS_ASSERT(myself != 0);
332 kumpf         1.81 
333 kumpf         1.82         // Set myself into thread specific storage
334                            // This will allow code to get its own Thread
335                            Thread::setCurrent(myself);
336 kumpf         1.81 
337 kumpf         1.82         ThreadPool* pool = (ThreadPool *)myself->get_parm();
338                            PEGASUS_ASSERT(pool != 0);
339 mike          1.2  
340 kumpf         1.82         Semaphore* sleep_sem = 0;
341                            struct timeval* lastActivityTime = 0;
342 chuck         1.39 
343 kumpf         1.81         try
344                            {
345 kumpf         1.82             sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
346 kumpf         1.81             myself->dereference_tsd();
347 kumpf         1.82             PEGASUS_ASSERT(sleep_sem != 0);
348                    
349                                lastActivityTime =
350                                    (struct timeval *)myself->reference_tsd("last activity time");
351 kumpf         1.81             myself->dereference_tsd();
352 kumpf         1.82             PEGASUS_ASSERT(lastActivityTime != 0);
353 kumpf         1.81         }
354                            catch (...)
355                            {
356                                Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
357 kumpf         1.82                 "ThreadPool::_loop: Failure getting sleep_sem or "
358                                        "lastActivityTime.");
359 kumpf         1.81             PEGASUS_ASSERT(false);
360                                pool->_idleThreads.remove(myself);
361                                pool->_currentThreads--;
362                                PEG_METHOD_EXIT();
363                                return((PEGASUS_THREAD_RETURN)1);
364                            }
365 mday          1.52 
366 kumpf         1.82         while (1)
367 kumpf         1.81         {
368 kumpf         1.82             try
369                                {
370                                    sleep_sem->wait();
371                                }
372                                catch (...)
373                                {
374                                    Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
375                                        "ThreadPool::_loop: failure on sleep_sem->wait().");
376                                    PEGASUS_ASSERT(false);
377                                    pool->_idleThreads.remove(myself);
378                                    pool->_currentThreads--;
379                                    PEG_METHOD_EXIT();
380                                    return((PEGASUS_THREAD_RETURN)1);
381                                }
382                    
383                                // When we awaken we reside on the _runningThreads queue, not the
384                                // _idleThreads queue.
385                    
386                                PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL* work)(void *) = 0;
387                                void* parm = 0;
388                                Semaphore* blocking_sem = 0;
389 kumpf         1.82 
390                                try
391                                {
392                                    work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *))
393                                        myself->reference_tsd("work func");
394                                    myself->dereference_tsd();
395                                    parm = myself->reference_tsd("work parm");
396                                    myself->dereference_tsd();
397                                    blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
398                                    myself->dereference_tsd();
399                                }
400                                catch (...)
401                                {
402                                    Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
403                                        "ThreadPool::_loop: Failure accessing work func, work parm, "
404                                            "or blocking sem.");
405                                    PEGASUS_ASSERT(false);
406                                    pool->_idleThreads.remove(myself);
407                                    pool->_currentThreads--;
408                                    PEG_METHOD_EXIT();
409                                    return((PEGASUS_THREAD_RETURN)1);
410 kumpf         1.82             }
411                    
412                                if (work == 0)
413                                {
414 carolann.graves 1.84                 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
415 kumpf           1.82                     "ThreadPool::_loop: work func is 0, meaning we should exit.");
416                                      break;
417                                  }
418 mike            1.2  
419 kumpf           1.82             gettimeofday(lastActivityTime, NULL);
420 konrad.r        1.67 
421 kumpf           1.82             try
422                                  {
423                                      PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4, "Work starting.");
424                                      work(parm);
425                                      PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4, "Work finished.");
426                                  }
427                                  catch (Exception & e)
428                                  {
429                                      PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
430                                          String("Exception from work in ThreadPool::_loop: ") +
431                                              e.getMessage());
432                                  }
433 kumpf           1.68 #if !defined(PEGASUS_OS_LSB)
434 konrad.r        1.86             catch (const exception& e)
435 kumpf           1.82             {
436                                      PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
437                                          String("Exception from work in ThreadPool::_loop: ") +
438                                              e.what());
439                                  }
440 kumpf           1.68 #endif
441 kumpf           1.82             catch (...)
442                                  {
443                                      PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
444                                          "Unknown exception from work in ThreadPool::_loop.");
445                                  }
446 kumpf           1.81 
447 kumpf           1.82             // put myself back onto the available list
448                                  try
449 kumpf           1.57             {
450 kumpf           1.82                 gettimeofday(lastActivityTime, NULL);
451                                      if (blocking_sem != 0)
452                                      {
453                                          blocking_sem->signal();
454                                      }
455 s.hills         1.49 
456 kumpf           1.82                 Boolean removed = pool->_runningThreads.remove((void *)myself);
457                                      PEGASUS_ASSERT(removed);
458 s.hills         1.49 
459 kumpf           1.82                 pool->_idleThreads.insert_first(myself);
460                                  }
461                                  catch (...)
462                                  {
463                                      Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
464                                          "ThreadPool::_loop: Adding thread to idle pool failed.");
465                                      PEGASUS_ASSERT(false);
466                                      pool->_currentThreads--;
467                                      PEG_METHOD_EXIT();
468                                      return((PEGASUS_THREAD_RETURN)1);
469                                  }
470 kumpf           1.81         }
471 kumpf           1.82     }
472                          catch (const Exception& e)
473                          {
474                              PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
475                                  "Caught exception: \"" + e.getMessage() + "\".  Exiting _loop.");
476                          }
477                          catch (...)
478                          {
479                              PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
480                                  "Caught unrecognized exception.  Exiting _loop.");
481 kumpf           1.81     }
482 kumpf           1.14 
483 kumpf           1.81     PEG_METHOD_EXIT();
484                          return((PEGASUS_THREAD_RETURN)0);
485 mike            1.2  }
486                      
487 konrad.r        1.86 ThreadStatus ThreadPool::allocate_and_awaken(
488 kumpf           1.81     void* parm,
489                          PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL* work)(void *),
490                          Semaphore* blocking)
491 mike            1.2  {
492 kumpf           1.81     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
493                      
494                          // Allocate_and_awaken will not run if the _dying flag is set.
495                          // Once the lock is acquired, ~ThreadPool will not change
496                          // the value of _dying until the lock is released.
497                      
498                          try
499                          {
500                              if (_dying.value())
501                              {
502                                  Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
503                                      "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
504 konrad.r        1.86             return PEGASUS_THREAD_UNAVAILABLE;
505 kumpf           1.81         }
506                              struct timeval start;
507                              gettimeofday(&start, NULL);
508                              Thread* th = 0;
509                      
510                              th = _idleThreads.remove_first();
511 kumpf           1.57 
512 kumpf           1.81         if (th == 0)
513                              {
514                                  if ((_maxThreads == 0) || (_currentThreads < _maxThreads))
515                                  {
516                                      th = _initializeThread();
517                                  }
518                              }
519                      
520                              if (th == 0)
521                              {
522                                  // ATTN-DME-P3-20031103: This trace message should not be
523                                  // be labeled TRC_DISCARDED_DATA, because it does not
524                                  // necessarily imply that a failure has occurred.  However,
525                                  // this label is being used temporarily to help isolate
526                                  // the cause of client timeout problems.
527                                  Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
528                                      "ThreadPool::allocate_and_awaken: Insufficient resources: "
529                                          " pool = %s, running threads = %d, idle threads = %d",
530                                      _key, _runningThreads.count(), _idleThreads.count());
531 konrad.r        1.86             return PEGASUS_THREAD_INSUFFICIENT_RESOURCES;
532 kumpf           1.81         }
533 mike            1.2  
534 kumpf           1.81         // initialize the thread data with the work function and parameters
535                              Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
536                                  "Initializing thread with work function and parameters: parm = %p",
537                                  parm);
538 mike            1.2  
539 kumpf           1.81         th->delete_tsd("work func");
540                              th->put_tsd("work func", NULL,
541 kumpf           1.70             sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
542 kumpf           1.81             (void *)work);
543                              th->delete_tsd("work parm");
544                              th->put_tsd("work parm", NULL, sizeof(void *), parm);
545                              th->delete_tsd("blocking sem");
546                              if (blocking != 0)
547                                  th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
548                      
549                              // put the thread on the running list
550                              _runningThreads.insert_first(th);
551                      
552                              // signal the thread's sleep semaphore to awaken it
553                              Semaphore* sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
554                              PEGASUS_ASSERT(sleep_sem != 0);
555                      
556                              Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
557                              sleep_sem->signal();
558                              th->dereference_tsd();
559 kumpf           1.57     }
560 mday            1.58     catch (...)
561 kumpf           1.57     {
562 kumpf           1.81         Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
563                                  "ThreadPool::allocate_and_awaken: Operation Failed.");
564                              PEG_METHOD_EXIT();
565                              // ATTN: Error result has not yet been defined
566 konrad.r        1.86         return PEGASUS_THREAD_SETUP_FAILURE;
567 kumpf           1.57     }
568 kumpf           1.81     PEG_METHOD_EXIT();
569 konrad.r        1.86     return PEGASUS_THREAD_OK;
570 mike            1.2  }
571                      
572 kumpf           1.81 // caller is responsible for only calling this routine during slack periods
573                      // but should call it at least once per _deallocateWait interval.
574 mday            1.12 
575 kumpf           1.81 Uint32 ThreadPool::cleanupIdleThreads()
576 mike            1.2  {
577 kumpf           1.81     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads");
578                      
579                          Uint32 numThreadsCleanedUp = 0;
580                      
581                          Uint32 numIdleThreads = _idleThreads.count();
582                          for (Uint32 i = 0; i < numIdleThreads; i++)
583                          {
584                              // Do not dip below the minimum thread count
585                              if (_currentThreads.value() <= (Uint32)_minThreads)
586                              {
587                                  break;
588                              }
589                      
590                              Thread* thread = _idleThreads.remove_last();
591                      
592                              // If there are no more threads in the _idleThreads queue, we're done.
593                              if (thread == 0)
594                              {
595                                  break;
596                              }
597                      
598 kumpf           1.81         struct timeval* lastActivityTime;
599                              try
600                              {
601                                  lastActivityTime = (struct timeval *)thread->try_reference_tsd(
602                                      "last activity time");
603                                  PEGASUS_ASSERT(lastActivityTime != 0);
604                              }
605                              catch (...)
606                              {
607                                  PEGASUS_ASSERT(false);
608                                  _idleThreads.insert_last(thread);
609                                  break;
610                              }
611                      
612                              Boolean cleanupThisThread =
613                                  _timeIntervalExpired(lastActivityTime, &_deallocateWait);
614                              thread->dereference_tsd();
615                      
616                              if (cleanupThisThread)
617                              {
618                                  _cleanupThread(thread);
619 kumpf           1.81             _currentThreads--;
620                                  numThreadsCleanedUp++;
621                              }
622                              else
623                              {
624                                  _idleThreads.insert_first(thread);
625                              }
626 konrad.r        1.67     }
627 kumpf           1.81 
628                          PEG_METHOD_EXIT();
629                          return numThreadsCleanedUp;
630 konrad.r        1.67 }
631 mday            1.19 
632 kumpf           1.81 void ThreadPool::_cleanupThread(Thread* thread)
633 mday            1.19 {
634 kumpf           1.81     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
635                      
636                          // Set the "work func" and "work parm" to 0 so _loop() knows to exit.
637                          thread->delete_tsd("work func");
638                          thread->put_tsd(
639                              "work func", 0,
640                              sizeof(PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
641                              (void *) 0);
642                          thread->delete_tsd("work parm");
643                          thread->put_tsd("work parm", 0, sizeof(void *), 0);
644                      
645                          // signal the thread's sleep semaphore to awaken it
646                          Semaphore* sleep_sem = (Semaphore *)thread->reference_tsd("sleep sem");
647                          PEGASUS_ASSERT(sleep_sem != 0);
648                          sleep_sem->signal();
649                          thread->dereference_tsd();
650                      
651                          thread->join();
652                          delete thread;
653                      
654                          PEG_METHOD_EXIT();
655 mday            1.19 }
656                      
657 kumpf           1.81 Boolean ThreadPool::_timeIntervalExpired(
658                          struct timeval* start,
659                          struct timeval* interval)
660 mday            1.19 {
661 kumpf           1.81     // never time out if the interval is zero
662                          if (interval && (interval->tv_sec == 0) && (interval->tv_usec == 0))
663                          {
664                              return false;
665                          }
666                      
667                          struct timeval now, finish, remaining;
668                          Uint32 usec;
669                          pegasus_gettimeofday(&now);
670                          pegasus_gettimeofday(&remaining);    // Avoid valgrind error
671                      
672                          finish.tv_sec = start->tv_sec + interval->tv_sec;
673                          usec = start->tv_usec + interval->tv_usec;
674                          finish.tv_sec += (usec / 1000000);
675                          usec %= 1000000;
676                          finish.tv_usec = usec;
677                      
678                          return (timeval_subtract(&remaining, &finish, &now) != 0);
679 mday            1.19 }
680                      
681 kumpf           1.81 void ThreadPool::_deleteSemaphore(void *p)
682 mday            1.19 {
683 kumpf           1.81     delete (Semaphore *)p;
684 mday            1.19 }
685                      
686 kumpf           1.81 Thread* ThreadPool::_initializeThread()
687 mday            1.19 {
688 kumpf           1.81     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread");
689                      
690                          Thread* th = (Thread *) new Thread(_loop, this, false);
691                      
692                          // allocate a sleep semaphore and pass it in the thread context
693                          // initial count is zero, loop function will sleep until
694                          // we signal the semaphore
695                          Semaphore* sleep_sem = (Semaphore *) new Semaphore(0);
696                          th->put_tsd(
697                              "sleep sem", &_deleteSemaphore, sizeof(Semaphore), (void *)sleep_sem);
698                      
699                          struct timeval* lastActivityTime =
700                              (struct timeval *) ::operator new(sizeof(struct timeval));
701                          pegasus_gettimeofday(lastActivityTime);
702                      
703                          th->put_tsd("last activity time", thread_data::default_delete,
704                              sizeof(struct timeval), (void *)lastActivityTime);
705                          // thread will enter _loop() and sleep on sleep_sem until we signal it
706                      
707 konrad.r        1.86     if (th->run() != PEGASUS_THREAD_OK)
708 kumpf           1.81     {
709 konrad.r        1.86 		Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
710                      			"Could not create thread. Error code is %d.", errno);
711 kumpf           1.81         delete th;
712                              return 0;
713                          }
714                          _currentThreads++;
715                          pegasus_yield();
716                      
717                          PEG_METHOD_EXIT();
718                          return th;
719 mday            1.19 }
720 mike            1.2  
721 kumpf           1.81 void ThreadPool::_addToIdleThreadsQueue(Thread* th)
722                      {
723                          if (th == 0)
724                          {
725                              Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
726                                  "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");
727                              throw NullPointer();
728                          }
729                      
730                          try
731                          {
732                              _idleThreads.insert_first(th);
733                          }
734                          catch (...)
735                          {
736                              Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
737                                  "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_first "
738                                      "failed.");
739                          }
740                      }
741 mike            1.2  
742                      PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2