(file) Return to Thread.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.75 //%2005////////////////////////////////////////////////////////////////////////
  2 mike  1.2  //
  3 karl  1.71 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4            // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5            // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6 karl  1.56 // IBM Corp.; EMC Corporation, The Open Group.
  7 karl  1.71 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8            // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9 karl  1.75 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10            // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11 mike  1.2  //
 12            // Permission is hereby granted, free of charge, to any person obtaining a copy
 13 chip  1.11 // of this software and associated documentation files (the "Software"), to
 14            // deal in the Software without restriction, including without limitation the
 15            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 16 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
 17            // furnished to do so, subject to the following conditions:
 18 david.dillard 1.83 //
 19 chip          1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 20 mike          1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 21                    // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 22 chip          1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 23                    // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 24                    // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 25 mike          1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 26                    // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 27                    //
 28                    //==============================================================================
 29                    //
 30                    // Author: Mike Day (mdday@us.ibm.com)
 31                    //
 32                    // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
 33 chip          1.11 //              added nsk platform support
 34 kumpf         1.59 //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
 35 a.arora       1.64 //              Amit K Arora, IBM (amita@in.ibm.com) for PEP#101
 36 gs.keenan     1.76 //              Sean Keenan, Hewlett-Packard Company (sean.keenan@hp.com)
 37 david.dillard 1.83 //              David Dillard, VERITAS Software Corp.
 38                    //                  (david.dillard@veritas.com)
 39 mike          1.2  //
 40                    //%/////////////////////////////////////////////////////////////////////////////
 41                    
 42                    #include "Thread.h"
 43 kumpf         1.68 #include <exception>
 44 mike          1.2  #include <Pegasus/Common/IPC.h>
 45 kumpf         1.14 #include <Pegasus/Common/Tracer.h>
 46 mike          1.2  
 47                    #if defined(PEGASUS_OS_TYPE_WINDOWS)
 48 chip          1.11 # include "ThreadWindows.cpp"
 49 mike          1.2  #elif defined(PEGASUS_OS_TYPE_UNIX)
 50                    # include "ThreadUnix.cpp"
 51                    #elif defined(PEGASUS_OS_TYPE_NSK)
 52                    # include "ThreadNsk.cpp"
 53 gs.keenan     1.76 #elif defined(PEGASUS_OS_VMS)
 54                    # include "ThreadVms.cpp"
 55 mike          1.2  #else
 56                    # error "Unsupported platform"
 57                    #endif
 58                    
 59 kumpf         1.69 PEGASUS_USING_STD;
 60 mike          1.2  PEGASUS_NAMESPACE_BEGIN
 61                    
 62 mday          1.42 
 63 chip          1.11 void thread_data::default_delete(void * data)
 64                    {
 65 mike          1.2     if( data != NULL)
 66 chip          1.11       ::operator delete(data);
 67 mike          1.2  }
 68                    
 69 chuck         1.43 // l10n start
 70                    void language_delete(void * data)
 71                    {
 72                       if( data != NULL)
 73                       {
 74 a.arora       1.64       AutoPtr<AcceptLanguages> al(static_cast<AcceptLanguages *>(data));
 75 chuck         1.43    }
 76                    }
 77                    // l10n end
 78                    
 79 mike          1.2  Boolean Thread::_signals_blocked = false;
 80 chuck         1.37 // l10n
 81 marek         1.63 #ifndef PEGASUS_OS_ZOS
 82 w.otsuka      1.74 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key = PEGASUS_THREAD_KEY_TYPE(-1);
 83 marek         1.63 #else
 84                    PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key;
 85                    #endif
 86 chuck         1.37 Boolean Thread::_key_initialized = false;
 87 chuck         1.41 Boolean Thread::_key_error = false;
 88 chuck         1.37 
 89 mike          1.2  
 90 david.dillard 1.83 void Thread::cleanup_push( void (*routine)(void *), void *parm)
 91 mike          1.2  {
 92 a.arora       1.64     AutoPtr<cleanup_handler> cu(new cleanup_handler(routine, parm));
 93 a.arora       1.65     _cleanup.insert_first(cu.get());
 94 a.arora       1.64     cu.release();
 95 mike          1.2      return;
 96                    }
 97 kumpf         1.81 
 98 david.dillard 1.83 void Thread::cleanup_pop(Boolean execute)
 99 mike          1.2  {
100 david.dillard 1.83     AutoPtr<cleanup_handler> cu;
101 chip          1.11     try
102                        {
103 kumpf         1.81         cu.reset(_cleanup.remove_first());
104 mike          1.2      }
105 chip          1.11     catch(IPCException&)
106 mike          1.2      {
107 kumpf         1.81         PEGASUS_ASSERT(0);
108 mike          1.2       }
109                        if(execute == true)
110 kumpf         1.81         cu->execute();
111 mike          1.2  }
112 kumpf         1.81 
113 mike          1.2  
114 david.dillard 1.83 //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value)
115 mike          1.2  
116                    
117 chip          1.11 #ifndef PEGASUS_THREAD_EXIT_NATIVE
118                    void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
119                    {
120                        // execute the cleanup stack and then return
121 mike          1.2     while( _cleanup.count() )
122                       {
123 chip          1.11        try
124                           {
125 kumpf         1.81            cleanup_pop(true);
126 chip          1.11        }
127                           catch(IPCException&)
128                           {
129 kumpf         1.81           PEGASUS_ASSERT(0);
130                              break;
131 mike          1.2         }
132                       }
133                       _exit_code = exit_code;
134                       exit_thread(exit_code);
135 mday          1.4     _handle.thid = 0;
136 mike          1.2  }
137                    
138                    
139                    #endif
140                    
141 chuck         1.37 // l10n start
142 chuck         1.39 Sint8 Thread::initializeKey()
143                    {
144 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::initializeKey");
145                        if (!Thread::_key_initialized)
146                        {
147                            if (Thread::_key_error)
148                            {
149                                Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
150                                    "Thread: ERROR - thread key error");
151                                return -1;
152                            }
153                    
154                            if (pegasus_key_create(&Thread::_platform_thread_key) == 0)
155                            {
156                                Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
157                                    "Thread: able to create a thread key");
158                                Thread::_key_initialized = true;
159                            }
160                            else
161                            {
162                                Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
163                                    "Thread: ERROR - unable to create a thread key");
164                                Thread::_key_error = true;
165 kumpf         1.81             return -1;
166                            }
167                        }
168 chuck         1.39 
169 kumpf         1.81     PEG_METHOD_EXIT();
170                        return 0;
171 chuck         1.39 }
172                    
173 chuck         1.37 Thread * Thread::getCurrent()
174                    {
175 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::getCurrent");
176 chuck         1.40     if (Thread::initializeKey() != 0)
177 chuck         1.39     {
178 kumpf         1.81         return NULL;
179 chuck         1.39     }
180 kumpf         1.81     PEG_METHOD_EXIT();
181                        return (Thread *)pegasus_get_thread_specific(_platform_thread_key);
182 chuck         1.39 }
183                    
184                    void Thread::setCurrent(Thread * thrd)
185                    {
186 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::setCurrent");
187                        if (Thread::initializeKey() == 0)
188                        {
189                            if (pegasus_set_thread_specific(
190                                   Thread::_platform_thread_key, (void *) thrd) == 0)
191 chuck         1.39         {
192 kumpf         1.81             Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
193                                    "Successful set Thread * into thread specific storage");
194 chuck         1.39         }
195                            else
196                            {
197 kumpf         1.81             Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
198                                    "ERROR: error setting Thread * into thread specific storage");
199 chuck         1.39         }
200 kumpf         1.81     }
201                        PEG_METHOD_EXIT();
202 chuck         1.37 }
203                    
204                    AcceptLanguages * Thread::getLanguages()
205                    {
206 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::getLanguages");
207                    
208                        Thread * curThrd = Thread::getCurrent();
209                        if (curThrd == NULL)
210                            return NULL;
211                        AcceptLanguages * acceptLangs =
212                            (AcceptLanguages *)curThrd->reference_tsd("acceptLanguages");
213                        curThrd->dereference_tsd();
214                        PEG_METHOD_EXIT();
215                        return acceptLangs;
216 chuck         1.37 }
217                    
218                    void Thread::setLanguages(AcceptLanguages *langs) //l10n
219                    {
220 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::setLanguages");
221                    
222                        Thread* currentThrd = Thread::getCurrent();
223                        if (currentThrd != NULL)
224                        {
225                            // deletes the old tsd and creates a new one
226                            currentThrd->put_tsd("acceptLanguages",
227                                language_delete,
228                                sizeof(AcceptLanguages *),
229                                langs);
230                        }
231                    
232                        PEG_METHOD_EXIT();
233 chuck         1.37 }
234                    
235                    void Thread::clearLanguages() //l10n
236                    {
237 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::clearLanguages");
238                    
239                        Thread * currentThrd = Thread::getCurrent();
240                        if (currentThrd != NULL)
241                        {
242                            // deletes the old tsd
243                            currentThrd->delete_tsd("acceptLanguages");
244                        }
245                    
246                        PEG_METHOD_EXIT();
247 chuck         1.37 }
248 kumpf         1.81 // l10n end
249 chuck         1.37 
250 mday          1.52 
251 kumpf         1.81 ///////////////////////////////////////////////////////////////////////////////
252                    //
253                    // ThreadPool
254                    //
255                    ///////////////////////////////////////////////////////////////////////////////
256                    
257                    ThreadPool::ThreadPool(
258                        Sint16 initialSize,
259                        const char* key,
260                        Sint16 minThreads,
261                        Sint16 maxThreads,
262                        struct timeval& deallocateWait)
263                        : _maxThreads(maxThreads),
264                          _minThreads(minThreads),
265                          _currentThreads(0),
266                          _idleThreads(true),
267                          _runningThreads(true),
268                          _dying(0)
269 mday          1.58 {
270 kumpf         1.81     _deallocateWait.tv_sec = deallocateWait.tv_sec;
271                        _deallocateWait.tv_usec = deallocateWait.tv_usec;
272 mday          1.58 
273 kumpf         1.81     memset(_key, 0x00, 17);
274                        if (key != 0)
275                        {
276                            strncpy(_key, key, 16);
277                        }
278                    
279                        if ((_maxThreads > 0) && (_maxThreads < initialSize))
280                        {
281                            _maxThreads = initialSize;
282                        }
283 mday          1.58 
284 kumpf         1.81     if (_minThreads > initialSize)
285                        {
286                            _minThreads = initialSize;
287                        }
288 mday          1.52 
289 kumpf         1.81     for (int i = 0; i < initialSize; i++)
290                        {
291                            _addToIdleThreadsQueue(_initializeThread());
292                        }
293                    }
294 mday          1.20 
295 kumpf         1.81 ThreadPool::~ThreadPool()
296 mday          1.20 {
297 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::~ThreadPool");
298 konrad.r      1.86 
299 kumpf         1.81     try
300                        {
301                            // Set the dying flag so all thread know the destructor has been entered
302                            _dying++;
303 konrad.r      1.86        Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
304                    		"Cleaning up %d idle threads. ", _currentThreads.value());
305 kumpf         1.81         while (_currentThreads.value() > 0)
306                            {
307                                Thread* thread = _idleThreads.remove_first();
308                                if (thread != 0)
309                                {
310                                    _cleanupThread(thread);
311                                    _currentThreads--;
312                                }
313                                else
314                                {
315                                    pegasus_yield();
316                                }
317                            }
318                        }
319                        catch (...)
320                        {
321                        }
322 mday          1.20 }
323                    
324 kumpf         1.81 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void* parm)
325                    {
326                        PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
327                    
328                        try
329                        {
330 kumpf         1.82         Thread* myself = (Thread *)parm;
331                            PEGASUS_ASSERT(myself != 0);
332 kumpf         1.81 
333 kumpf         1.82         // Set myself into thread specific storage
334                            // This will allow code to get its own Thread
335                            Thread::setCurrent(myself);
336 kumpf         1.81 
337 kumpf         1.82         ThreadPool* pool = (ThreadPool *)myself->get_parm();
338                            PEGASUS_ASSERT(pool != 0);
339 mike          1.2  
340 kumpf         1.82         Semaphore* sleep_sem = 0;
341                            struct timeval* lastActivityTime = 0;
342 chuck         1.39 
343 kumpf         1.81         try
344                            {
345 kumpf         1.82             sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
346 kumpf         1.81             myself->dereference_tsd();
347 kumpf         1.82             PEGASUS_ASSERT(sleep_sem != 0);
348                    
349                                lastActivityTime =
350                                    (struct timeval *)myself->reference_tsd("last activity time");
351 kumpf         1.81             myself->dereference_tsd();
352 kumpf         1.82             PEGASUS_ASSERT(lastActivityTime != 0);
353 kumpf         1.81         }
354                            catch (...)
355                            {
356                                Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
357 kumpf         1.82                 "ThreadPool::_loop: Failure getting sleep_sem or "
358                                        "lastActivityTime.");
359 kumpf         1.81             PEGASUS_ASSERT(false);
360                                pool->_idleThreads.remove(myself);
361                                pool->_currentThreads--;
362                                PEG_METHOD_EXIT();
363                                return((PEGASUS_THREAD_RETURN)1);
364                            }
365 mday          1.52 
366 kumpf         1.82         while (1)
367 kumpf         1.81         {
368 kumpf         1.82             try
369                                {
370                                    sleep_sem->wait();
371                                }
372                                catch (...)
373                                {
374                                    Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
375                                        "ThreadPool::_loop: failure on sleep_sem->wait().");
376                                    PEGASUS_ASSERT(false);
377                                    pool->_idleThreads.remove(myself);
378                                    pool->_currentThreads--;
379                                    PEG_METHOD_EXIT();
380                                    return((PEGASUS_THREAD_RETURN)1);
381                                }
382                    
383                                // When we awaken we reside on the _runningThreads queue, not the
384                                // _idleThreads queue.
385                    
386                                PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL* work)(void *) = 0;
387                                void* parm = 0;
388                                Semaphore* blocking_sem = 0;
389 kumpf         1.82 
390                                try
391                                {
392                                    work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *))
393                                        myself->reference_tsd("work func");
394                                    myself->dereference_tsd();
395                                    parm = myself->reference_tsd("work parm");
396                                    myself->dereference_tsd();
397                                    blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
398                                    myself->dereference_tsd();
399                                }
400                                catch (...)
401                                {
402                                    Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
403                                        "ThreadPool::_loop: Failure accessing work func, work parm, "
404                                            "or blocking sem.");
405                                    PEGASUS_ASSERT(false);
406                                    pool->_idleThreads.remove(myself);
407                                    pool->_currentThreads--;
408                                    PEG_METHOD_EXIT();
409                                    return((PEGASUS_THREAD_RETURN)1);
410 kumpf         1.82             }
411                    
412                                if (work == 0)
413                                {
414 carolann.graves 1.84                 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
415 kumpf           1.82                     "ThreadPool::_loop: work func is 0, meaning we should exit.");
416                                      break;
417                                  }
418 mike            1.2  
419 kumpf           1.82             gettimeofday(lastActivityTime, NULL);
420 konrad.r        1.67 
421 kumpf           1.82             try
422                                  {
423                                      PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4, "Work starting.");
424                                      work(parm);
425                                      PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4, "Work finished.");
426                                  }
427                                  catch (Exception & e)
428                                  {
429                                      PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
430                                          String("Exception from work in ThreadPool::_loop: ") +
431                                              e.getMessage());
432                                  }
433 kumpf           1.68 #if !defined(PEGASUS_OS_LSB)
434 konrad.r        1.86             catch (const exception& e)
435 kumpf           1.82             {
436                                      PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
437                                          String("Exception from work in ThreadPool::_loop: ") +
438                                              e.what());
439                                  }
440 kumpf           1.68 #endif
441 kumpf           1.82             catch (...)
442                                  {
443                                      PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
444                                          "Unknown exception from work in ThreadPool::_loop.");
445                                  }
446 kumpf           1.81 
447 kumpf           1.82             // put myself back onto the available list
448                                  try
449 kumpf           1.57             {
450 kumpf           1.82                 gettimeofday(lastActivityTime, NULL);
451                                      if (blocking_sem != 0)
452                                      {
453                                          blocking_sem->signal();
454                                      }
455 s.hills         1.49 
456 kumpf           1.82                 Boolean removed = pool->_runningThreads.remove((void *)myself);
457                                      PEGASUS_ASSERT(removed);
458 s.hills         1.49 
459 kumpf           1.82                 pool->_idleThreads.insert_first(myself);
460                                  }
461                                  catch (...)
462                                  {
463                                      Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
464                                          "ThreadPool::_loop: Adding thread to idle pool failed.");
465                                      PEGASUS_ASSERT(false);
466                                      pool->_currentThreads--;
467                                      PEG_METHOD_EXIT();
468                                      return((PEGASUS_THREAD_RETURN)1);
469                                  }
470 kumpf           1.81         }
471 kumpf           1.82     }
472                          catch (const Exception& e)
473                          {
474                              PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
475                                  "Caught exception: \"" + e.getMessage() + "\".  Exiting _loop.");
476                          }
477                          catch (...)
478                          {
479                              PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
480                                  "Caught unrecognized exception.  Exiting _loop.");
481 kumpf           1.81     }
482 kumpf           1.14 
483 kumpf           1.81     PEG_METHOD_EXIT();
484                          return((PEGASUS_THREAD_RETURN)0);
485 mike            1.2  }
486                      
487 konrad.r        1.86 ThreadStatus ThreadPool::allocate_and_awaken(
488 kumpf           1.81     void* parm,
489                          PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL* work)(void *),
490                          Semaphore* blocking)
491 mike            1.2  {
492 kumpf           1.81     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
493                      
494                          // Allocate_and_awaken will not run if the _dying flag is set.
495                          // Once the lock is acquired, ~ThreadPool will not change
496                          // the value of _dying until the lock is released.
497                      
498                          try
499                          {
500                              if (_dying.value())
501                              {
502                                  Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
503                                      "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
504 konrad.r        1.86             return PEGASUS_THREAD_UNAVAILABLE;
505 kumpf           1.81         }
506                              struct timeval start;
507                              gettimeofday(&start, NULL);
508                              Thread* th = 0;
509                      
510                              th = _idleThreads.remove_first();
511 kumpf           1.57 
512 kumpf           1.81         if (th == 0)
513                              {
514 mike            1.86.8.1             if ((_maxThreads == 0) || 
515                          		(_currentThreads.value() < Uint32(_maxThreads)))
516 kumpf           1.81                 {
517                                          th = _initializeThread();
518                                      }
519                                  }
520                          
521                                  if (th == 0)
522                                  {
523                                      // ATTN-DME-P3-20031103: This trace message should not be
524                                      // be labeled TRC_DISCARDED_DATA, because it does not
525                                      // necessarily imply that a failure has occurred.  However,
526                                      // this label is being used temporarily to help isolate
527                                      // the cause of client timeout problems.
528                                      Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
529                                          "ThreadPool::allocate_and_awaken: Insufficient resources: "
530                                              " pool = %s, running threads = %d, idle threads = %d",
531                                          _key, _runningThreads.count(), _idleThreads.count());
532 konrad.r        1.86                 return PEGASUS_THREAD_INSUFFICIENT_RESOURCES;
533 kumpf           1.81             }
534 mike            1.2      
535 kumpf           1.81             // initialize the thread data with the work function and parameters
536                                  Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
537                                      "Initializing thread with work function and parameters: parm = %p",
538                                      parm);
539 mike            1.2      
540 kumpf           1.81             th->delete_tsd("work func");
541                                  th->put_tsd("work func", NULL,
542 kumpf           1.70                 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
543 kumpf           1.81                 (void *)work);
544                                  th->delete_tsd("work parm");
545                                  th->put_tsd("work parm", NULL, sizeof(void *), parm);
546                                  th->delete_tsd("blocking sem");
547                                  if (blocking != 0)
548                                      th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
549                          
550                                  // put the thread on the running list
551                                  _runningThreads.insert_first(th);
552                          
553                                  // signal the thread's sleep semaphore to awaken it
554                                  Semaphore* sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
555                                  PEGASUS_ASSERT(sleep_sem != 0);
556                          
557                                  Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
558                                  sleep_sem->signal();
559                                  th->dereference_tsd();
560 kumpf           1.57         }
561 mday            1.58         catch (...)
562 kumpf           1.57         {
563 kumpf           1.81             Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
564                                      "ThreadPool::allocate_and_awaken: Operation Failed.");
565                                  PEG_METHOD_EXIT();
566                                  // ATTN: Error result has not yet been defined
567 konrad.r        1.86             return PEGASUS_THREAD_SETUP_FAILURE;
568 kumpf           1.57         }
569 kumpf           1.81         PEG_METHOD_EXIT();
570 konrad.r        1.86         return PEGASUS_THREAD_OK;
571 mike            1.2      }
572                          
573 kumpf           1.81     // caller is responsible for only calling this routine during slack periods
574                          // but should call it at least once per _deallocateWait interval.
575 mday            1.12     
576 kumpf           1.81     Uint32 ThreadPool::cleanupIdleThreads()
577 mike            1.2      {
578 kumpf           1.81         PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads");
579                          
580                              Uint32 numThreadsCleanedUp = 0;
581                          
582                              Uint32 numIdleThreads = _idleThreads.count();
583                              for (Uint32 i = 0; i < numIdleThreads; i++)
584                              {
585                                  // Do not dip below the minimum thread count
586                                  if (_currentThreads.value() <= (Uint32)_minThreads)
587                                  {
588                                      break;
589                                  }
590                          
591                                  Thread* thread = _idleThreads.remove_last();
592                          
593                                  // If there are no more threads in the _idleThreads queue, we're done.
594                                  if (thread == 0)
595                                  {
596                                      break;
597                                  }
598                          
599 kumpf           1.81             struct timeval* lastActivityTime;
600                                  try
601                                  {
602                                      lastActivityTime = (struct timeval *)thread->try_reference_tsd(
603                                          "last activity time");
604                                      PEGASUS_ASSERT(lastActivityTime != 0);
605                                  }
606                                  catch (...)
607                                  {
608                                      PEGASUS_ASSERT(false);
609                                      _idleThreads.insert_last(thread);
610                                      break;
611                                  }
612                          
613                                  Boolean cleanupThisThread =
614                                      _timeIntervalExpired(lastActivityTime, &_deallocateWait);
615                                  thread->dereference_tsd();
616                          
617                                  if (cleanupThisThread)
618                                  {
619                                      _cleanupThread(thread);
620 kumpf           1.81                 _currentThreads--;
621                                      numThreadsCleanedUp++;
622                                  }
623                                  else
624                                  {
625                                      _idleThreads.insert_first(thread);
626                                  }
627 konrad.r        1.67         }
628 kumpf           1.81     
629                              PEG_METHOD_EXIT();
630                              return numThreadsCleanedUp;
631 konrad.r        1.67     }
632 mday            1.19     
633 kumpf           1.81     void ThreadPool::_cleanupThread(Thread* thread)
634 mday            1.19     {
635 kumpf           1.81         PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
636                          
637                              // Set the "work func" and "work parm" to 0 so _loop() knows to exit.
638                              thread->delete_tsd("work func");
639                              thread->put_tsd(
640                                  "work func", 0,
641                                  sizeof(PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
642                                  (void *) 0);
643                              thread->delete_tsd("work parm");
644                              thread->put_tsd("work parm", 0, sizeof(void *), 0);
645                          
646                              // signal the thread's sleep semaphore to awaken it
647                              Semaphore* sleep_sem = (Semaphore *)thread->reference_tsd("sleep sem");
648                              PEGASUS_ASSERT(sleep_sem != 0);
649                              sleep_sem->signal();
650                              thread->dereference_tsd();
651                          
652                              thread->join();
653                              delete thread;
654                          
655                              PEG_METHOD_EXIT();
656 mday            1.19     }
657                          
658 kumpf           1.81     Boolean ThreadPool::_timeIntervalExpired(
659                              struct timeval* start,
660                              struct timeval* interval)
661 mday            1.19     {
662 kumpf           1.81         // never time out if the interval is zero
663                              if (interval && (interval->tv_sec == 0) && (interval->tv_usec == 0))
664                              {
665                                  return false;
666                              }
667                          
668                              struct timeval now, finish, remaining;
669                              Uint32 usec;
670                              pegasus_gettimeofday(&now);
671                              pegasus_gettimeofday(&remaining);    // Avoid valgrind error
672                          
673                              finish.tv_sec = start->tv_sec + interval->tv_sec;
674                              usec = start->tv_usec + interval->tv_usec;
675                              finish.tv_sec += (usec / 1000000);
676                              usec %= 1000000;
677                              finish.tv_usec = usec;
678                          
679                              return (timeval_subtract(&remaining, &finish, &now) != 0);
680 mday            1.19     }
681                          
682 kumpf           1.81     void ThreadPool::_deleteSemaphore(void *p)
683 mday            1.19     {
684 kumpf           1.81         delete (Semaphore *)p;
685 mday            1.19     }
686                          
687 kumpf           1.81     Thread* ThreadPool::_initializeThread()
688 mday            1.19     {
689 kumpf           1.81         PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread");
690                          
691                              Thread* th = (Thread *) new Thread(_loop, this, false);
692                          
693                              // allocate a sleep semaphore and pass it in the thread context
694                              // initial count is zero, loop function will sleep until
695                              // we signal the semaphore
696                              Semaphore* sleep_sem = (Semaphore *) new Semaphore(0);
697                              th->put_tsd(
698                                  "sleep sem", &_deleteSemaphore, sizeof(Semaphore), (void *)sleep_sem);
699                          
700                              struct timeval* lastActivityTime =
701                                  (struct timeval *) ::operator new(sizeof(struct timeval));
702                              pegasus_gettimeofday(lastActivityTime);
703                          
704                              th->put_tsd("last activity time", thread_data::default_delete,
705                                  sizeof(struct timeval), (void *)lastActivityTime);
706                              // thread will enter _loop() and sleep on sleep_sem until we signal it
707                          
708 konrad.r        1.86         if (th->run() != PEGASUS_THREAD_OK)
709 kumpf           1.81         {
710 konrad.r        1.86     		Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
711                          			"Could not create thread. Error code is %d.", errno);
712 kumpf           1.81             delete th;
713                                  return 0;
714                              }
715                              _currentThreads++;
716                              pegasus_yield();
717                          
718                              PEG_METHOD_EXIT();
719                              return th;
720 mday            1.19     }
721 mike            1.2      
722 kumpf           1.81     void ThreadPool::_addToIdleThreadsQueue(Thread* th)
723                          {
724                              if (th == 0)
725                              {
726                                  Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
727                                      "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");
728                                  throw NullPointer();
729                              }
730                          
731                              try
732                              {
733                                  _idleThreads.insert_first(th);
734                              }
735                              catch (...)
736                              {
737                                  Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
738                                      "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_first "
739                                          "failed.");
740                              }
741                          }
742 mike            1.2      
743                          PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2