(file) Return to Thread.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.75 //%2005////////////////////////////////////////////////////////////////////////
  2 mike  1.2  //
  3 karl  1.71 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4            // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5            // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6 karl  1.56 // IBM Corp.; EMC Corporation, The Open Group.
  7 karl  1.71 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8            // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9 karl  1.75 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10            // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11 mike  1.2  //
 12            // Permission is hereby granted, free of charge, to any person obtaining a copy
 13 chip  1.11 // of this software and associated documentation files (the "Software"), to
 14            // deal in the Software without restriction, including without limitation the
 15            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 16 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
 17            // furnished to do so, subject to the following conditions:
 18 kumpf 1.17 // 
 19 chip  1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 20 mike  1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 21            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 22 chip  1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 23            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 24            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 25 mike  1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 26            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 27            //
 28            //==============================================================================
 29            //
 30            // Author: Mike Day (mdday@us.ibm.com)
 31            //
 32            // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
 33 chip  1.11 //              added nsk platform support
 34 kumpf 1.59 //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
 35 a.arora 1.64 //              Amit K Arora, IBM (amita@in.ibm.com) for PEP#101
 36 gs.keenan 1.76 //              Sean Keenan, Hewlett-Packard Company (sean.keenan@hp.com)
 37 mike      1.2  //
 38                //%/////////////////////////////////////////////////////////////////////////////
 39                
 40                #include "Thread.h"
 41 kumpf     1.68 #include <exception>
 42 mike      1.2  #include <Pegasus/Common/IPC.h>
 43 kumpf     1.14 #include <Pegasus/Common/Tracer.h>
 44 mike      1.2  
 45                #if defined(PEGASUS_OS_TYPE_WINDOWS)
 46 chip      1.11 # include "ThreadWindows.cpp"
 47 mike      1.2  #elif defined(PEGASUS_OS_TYPE_UNIX)
 48                # include "ThreadUnix.cpp"
 49                #elif defined(PEGASUS_OS_TYPE_NSK)
 50                # include "ThreadNsk.cpp"
 51 gs.keenan 1.76 #elif defined(PEGASUS_OS_VMS)
 52                # include "ThreadVms.cpp"
 53 mike      1.2  #else
 54                # error "Unsupported platform"
 55                #endif
 56                
 57 kumpf     1.69 PEGASUS_USING_STD;
 58 mike      1.2  PEGASUS_NAMESPACE_BEGIN
 59                
 60 mday      1.42 
 61 chip      1.11 void thread_data::default_delete(void * data)
 62                {
 63 mike      1.2     if( data != NULL)
 64 chip      1.11       ::operator delete(data);
 65 mike      1.2  }
 66                
 67 chuck     1.43 // l10n start
 68                void language_delete(void * data)
 69                {
 70                   if( data != NULL)
 71                   {
 72 a.arora   1.64       AutoPtr<AcceptLanguages> al(static_cast<AcceptLanguages *>(data));
 73 chuck     1.43    }
 74                }
 75                // l10n end
 76                
  77 mike      1.2  Boolean Thread::_signals_blocked = false;
  78 chuck     1.37 // l10n: key of the thread-specific slot in which setCurrent() stores the
                    // calling thread's Thread* and from which getCurrent() reads it back.
                    // Except on z/OS the key starts out as -1 converted to the key type;
                    // on z/OS it is defined without an explicit initializer.
  79 marek     1.63 #ifndef PEGASUS_OS_ZOS
  80 w.otsuka  1.74 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key = PEGASUS_THREAD_KEY_TYPE(-1);
  81 marek     1.63 #else
  82                PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key;
  83                #endif
                    // Set to true by initializeKey() once the key has been created.
  84 chuck     1.37 Boolean Thread::_key_initialized = false;
                    // Set to true by initializeKey() when key creation fails; every later
                    // call to initializeKey() then returns -1 without retrying.
  85 chuck     1.41 Boolean Thread::_key_error = false;
 86 chuck     1.37 
 87 mike      1.2  
 88                // for non-native implementations
 89 chip      1.11 #ifndef PEGASUS_THREAD_CLEANUP_NATIVE
 90 mike      1.2  void Thread::cleanup_push( void (*routine)(void *), void *parm) throw(IPCException)
 91                {
 92 a.arora   1.64     AutoPtr<cleanup_handler> cu(new cleanup_handler(routine, parm));
 93 a.arora   1.65     _cleanup.insert_first(cu.get());
 94 a.arora   1.64     cu.release();
 95 mike      1.2      return;
 96                }
 97 kumpf     1.81 
 98 mike      1.2  void Thread::cleanup_pop(Boolean execute) throw(IPCException)
 99                {
100 a.arora   1.64     AutoPtr<cleanup_handler> cu ;
101 chip      1.11     try
102                    {
103 kumpf     1.81         cu.reset(_cleanup.remove_first());
104 mike      1.2      }
105 chip      1.11     catch(IPCException&)
106 mike      1.2      {
107 kumpf     1.81         PEGASUS_ASSERT(0);
108 mike      1.2       }
109                    if(execute == true)
110 kumpf     1.81         cu->execute();
111 mike      1.2  }
112 kumpf     1.81 
113 mike      1.2  #endif
114                
115                
116 kumpf     1.8  //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) throw(IPCException)
117 mike      1.2  
118                
119 chip      1.11 #ifndef PEGASUS_THREAD_EXIT_NATIVE
120                void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
121                {
122                    // execute the cleanup stack and then return
123 mike      1.2     while( _cleanup.count() )
124                   {
125 chip      1.11        try
126                       {
127 kumpf     1.81            cleanup_pop(true);
128 chip      1.11        }
129                       catch(IPCException&)
130                       {
131 kumpf     1.81           PEGASUS_ASSERT(0);
132                          break;
133 mike      1.2         }
134                   }
135                   _exit_code = exit_code;
136                   exit_thread(exit_code);
137 mday      1.4     _handle.thid = 0;
138 mike      1.2  }
139                
140                
141                #endif
142                
143 chuck     1.37 // l10n start
144 chuck     1.39 Sint8 Thread::initializeKey()
145                {
146 kumpf     1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::initializeKey");
147                    if (!Thread::_key_initialized)
148                    {
149                        if (Thread::_key_error)
150                        {
151                            Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
152                                "Thread: ERROR - thread key error");
153                            return -1;
154                        }
155                
156                        if (pegasus_key_create(&Thread::_platform_thread_key) == 0)
157                        {
158                            Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
159                                "Thread: able to create a thread key");
160                            Thread::_key_initialized = true;
161                        }
162                        else
163                        {
164                            Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
165                                "Thread: ERROR - unable to create a thread key");
166                            Thread::_key_error = true;
167 kumpf     1.81             return -1;
168                        }
169                    }
170 chuck     1.39 
171 kumpf     1.81     PEG_METHOD_EXIT();
172                    return 0;
173 chuck     1.39 }
174                
175 chuck     1.37 Thread * Thread::getCurrent()
176                {
177 kumpf     1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::getCurrent");
178 chuck     1.40     if (Thread::initializeKey() != 0)
179 chuck     1.39     {
180 kumpf     1.81         return NULL;
181 chuck     1.39     }
182 kumpf     1.81     PEG_METHOD_EXIT();
183                    return (Thread *)pegasus_get_thread_specific(_platform_thread_key);
184 chuck     1.39 }
185                
186                void Thread::setCurrent(Thread * thrd)
187                {
188 kumpf     1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::setCurrent");
189                    if (Thread::initializeKey() == 0)
190                    {
191                        if (pegasus_set_thread_specific(
192                               Thread::_platform_thread_key, (void *) thrd) == 0)
193 chuck     1.39         {
194 kumpf     1.81             Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
195                                "Successful set Thread * into thread specific storage");
196 chuck     1.39         }
197                        else
198                        {
199 kumpf     1.81             Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
200                                "ERROR: error setting Thread * into thread specific storage");
201 chuck     1.39         }
202 kumpf     1.81     }
203                    PEG_METHOD_EXIT();
204 chuck     1.37 }
205                
206                AcceptLanguages * Thread::getLanguages()
207                {
208 kumpf     1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::getLanguages");
209                
210                    Thread * curThrd = Thread::getCurrent();
211                    if (curThrd == NULL)
212                        return NULL;
213                    AcceptLanguages * acceptLangs =
214                        (AcceptLanguages *)curThrd->reference_tsd("acceptLanguages");
215                    curThrd->dereference_tsd();
216                    PEG_METHOD_EXIT();
217                    return acceptLangs;
218 chuck     1.37 }
219                
220                void Thread::setLanguages(AcceptLanguages *langs) //l10n
221                {
222 kumpf     1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::setLanguages");
223                
224                    Thread* currentThrd = Thread::getCurrent();
225                    if (currentThrd != NULL)
226                    {
227                        // deletes the old tsd and creates a new one
228                        currentThrd->put_tsd("acceptLanguages",
229                            language_delete,
230                            sizeof(AcceptLanguages *),
231                            langs);
232                    }
233                
234                    PEG_METHOD_EXIT();
235 chuck     1.37 }
236                
237                void Thread::clearLanguages() //l10n
238                {
239 kumpf     1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::clearLanguages");
240                
241                    Thread * currentThrd = Thread::getCurrent();
242                    if (currentThrd != NULL)
243                    {
244                        // deletes the old tsd
245                        currentThrd->delete_tsd("acceptLanguages");
246                    }
247                
248                    PEG_METHOD_EXIT();
249 chuck     1.37 }
250 kumpf     1.81 // l10n end
251 chuck     1.37 
252 mday      1.52 
253 kumpf     1.81 ///////////////////////////////////////////////////////////////////////////////
254                //
255                // ThreadPool
256                //
257                ///////////////////////////////////////////////////////////////////////////////
258                
259                ThreadPool::ThreadPool(
260                    Sint16 initialSize,
261                    const char* key,
262                    Sint16 minThreads,
263                    Sint16 maxThreads,
264                    struct timeval& deallocateWait)
265                    : _maxThreads(maxThreads),
266                      _minThreads(minThreads),
267                      _currentThreads(0),
268                      _idleThreads(true),
269                      _runningThreads(true),
270                      _dying(0)
271 mday      1.58 {
272 kumpf     1.81     _deallocateWait.tv_sec = deallocateWait.tv_sec;
273                    _deallocateWait.tv_usec = deallocateWait.tv_usec;
274 mday      1.58 
275 kumpf     1.81     memset(_key, 0x00, 17);
276                    if (key != 0)
277                    {
278                        strncpy(_key, key, 16);
279                    }
280                
281                    if ((_maxThreads > 0) && (_maxThreads < initialSize))
282                    {
283                        _maxThreads = initialSize;
284                    }
285 mday      1.58 
286 kumpf     1.81     if (_minThreads > initialSize)
287                    {
288                        _minThreads = initialSize;
289                    }
290 mday      1.52 
291 kumpf     1.81     for (int i = 0; i < initialSize; i++)
292                    {
293                        _addToIdleThreadsQueue(_initializeThread());
294                    }
295                }
296 mday      1.20 
297 kumpf     1.81 ThreadPool::~ThreadPool()
298 mday      1.20 {
299 kumpf     1.81     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::~ThreadPool");
300                    try
301                    {
302                        // Set the dying flag so all thread know the destructor has been entered
303                        _dying++;
304                
305                        while (_currentThreads.value() > 0)
306                        {
307                            Thread* thread = _idleThreads.remove_first();
308                            if (thread != 0)
309                            {
310                                _cleanupThread(thread);
311                                _currentThreads--;
312                            }
313                            else
314                            {
315                                pegasus_yield();
316                            }
317                        }
318                    }
319                    catch (...)
320 kumpf     1.81     {
321                    }
322 mday      1.20 }
323                
324 kumpf     1.81 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void* parm)
325                {
326                    PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
327                
328                    Thread* myself = (Thread *)parm;
329                    PEGASUS_ASSERT(myself != 0);
330 mday      1.20 
331 kumpf     1.81     // Set myself into thread specific storage
332                    // This will allow code to get its own Thread
333                    Thread::setCurrent(myself);
334 chip      1.11 
335 kumpf     1.81     ThreadPool* pool = (ThreadPool *)myself->get_parm();
336                    PEGASUS_ASSERT(pool != 0);
337 mike      1.2  
338 kumpf     1.81     Semaphore* sleep_sem = 0;
339                    struct timeval* lastActivityTime = 0;
340 chip      1.11 
341 kumpf     1.81     try
342                    {
343                        sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
344                        myself->dereference_tsd();
345                        PEGASUS_ASSERT(sleep_sem != 0);
346                
347                        lastActivityTime =
348                            (struct timeval *)myself->reference_tsd("last activity time");
349                        myself->dereference_tsd();
350                        PEGASUS_ASSERT(lastActivityTime != 0);
351                    }
352                    catch (...)
353                    {
354                        Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
355                            "ThreadPool::_loop: Failure getting sleep_sem or "
356                                "lastActivityTime.");
357                        PEGASUS_ASSERT(false);
358                        pool->_idleThreads.remove(myself);
359                        pool->_currentThreads--;
360                        PEG_METHOD_EXIT();
361                        return((PEGASUS_THREAD_RETURN)1);
362 kumpf     1.81     }
363                
364                    while (1)
365                    {
366                        try
367                        {
368                            sleep_sem->wait();
369                        }
370                        catch (...)
371                        {
372                            Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
373                                "ThreadPool::_loop: failure on sleep_sem->wait().");
374                            PEGASUS_ASSERT(false);
375                            pool->_idleThreads.remove(myself);
376                            pool->_currentThreads--;
377                            PEG_METHOD_EXIT();
378                            return((PEGASUS_THREAD_RETURN)1);
379                        }
380 mike      1.2  
381 kumpf     1.81         // When we awaken we reside on the _runningThreads queue, not the
382                        // _idleThreads queue.
383 kumpf     1.14 
384 kumpf     1.81         PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL* work)(void *) = 0;
385                        void* parm = 0;
386                        Semaphore* blocking_sem = 0;
387 chuck     1.39 
388 kumpf     1.81         try
389                        {
390                            work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *))
391                                myself->reference_tsd("work func");
392                            myself->dereference_tsd();
393                            parm = myself->reference_tsd("work parm");
394                            myself->dereference_tsd();
395                            blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
396                            myself->dereference_tsd();
397                        }
398                        catch (...)
399                        {
400                            Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
401                                "ThreadPool::_loop: Failure accessing work func, work parm, "
402                                    "or blocking sem.");
403                            PEGASUS_ASSERT(false);
404                            pool->_idleThreads.remove(myself);
405                            pool->_currentThreads--;
406                            PEG_METHOD_EXIT();
407                            return((PEGASUS_THREAD_RETURN)1);
408                        }
409 mday      1.52 
410 kumpf     1.81         if (work == 0)
411                        {
412                            Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
413                                "ThreadPool::_loop: work func is 0, meaning we should exit.");
414                            break;
415                        }
416 mike      1.2  
417 kumpf     1.81         gettimeofday(lastActivityTime, NULL);
418 konrad.r  1.67 
419 kumpf     1.81         try
420                        {
421                            PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4, "Work starting.");
422                            work(parm);
423                            PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4, "Work finished.");
424                        }
425                        catch (Exception & e)
426                        {
427 konrad.r  1.67             PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
428 kumpf     1.81                 String("Exception from work in ThreadPool::_loop: ") +
429                                    e.getMessage());
430                        }
431 kumpf     1.68 #if !defined(PEGASUS_OS_LSB)
432 kumpf     1.81         catch (exception& e)
433                        {
434 kumpf     1.68             PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
435 kumpf     1.81                 String("Exception from work in ThreadPool::_loop: ") +
436                                    e.what());
437                        }
438 kumpf     1.68 #endif
439 kumpf     1.81         catch (...)
440                        {
441                            PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
442                                "Unknown exception from work in ThreadPool::_loop.");
443                        }
444                
445                        // put myself back onto the available list
446                        try
447                        {
448                            gettimeofday(lastActivityTime, NULL);
449                            if (blocking_sem != 0)
450 kumpf     1.57             {
451 kumpf     1.81                 blocking_sem->signal();
452 kumpf     1.57             }
453 s.hills   1.49 
454 kumpf     1.81             Boolean removed = pool->_runningThreads.remove((void *)myself);
455                            PEGASUS_ASSERT(removed);
456 s.hills   1.49 
457 kumpf     1.81             pool->_idleThreads.insert_first(myself);
458                        }
459                        catch (...)
460                        {
461                            Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
462                                "ThreadPool::_loop: Adding thread to idle pool failed.");
463                            PEGASUS_ASSERT(false);
464                            pool->_currentThreads--;
465                            PEG_METHOD_EXIT();
466                            return((PEGASUS_THREAD_RETURN)1);
467                        }
468                    }
469 kumpf     1.14 
470 kumpf     1.81     PEG_METHOD_EXIT();
471                    return((PEGASUS_THREAD_RETURN)0);
472 mike      1.2  }
473                
474 kumpf     1.81 Boolean ThreadPool::allocate_and_awaken(
475                    void* parm,
476                    PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL* work)(void *),
477                    Semaphore* blocking)
478 mike      1.2  {
479 kumpf     1.81     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
480                
481                    // Allocate_and_awaken will not run if the _dying flag is set.
482                    // Once the lock is acquired, ~ThreadPool will not change
483                    // the value of _dying until the lock is released.
484                
485                    try
486                    {
487                        if (_dying.value())
488                        {
489                            Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
490                                "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
491                            // ATTN: Error result has not yet been defined
492                            return true;
493                        }
494                        struct timeval start;
495                        gettimeofday(&start, NULL);
496                        Thread* th = 0;
497                
498                        th = _idleThreads.remove_first();
499 kumpf     1.57 
500 kumpf     1.81         if (th == 0)
501                        {
502                            if ((_maxThreads == 0) || (_currentThreads < _maxThreads))
503                            {
504                                th = _initializeThread();
505                            }
506                        }
507                
508                        if (th == 0)
509                        {
510                            // ATTN-DME-P3-20031103: This trace message should not be
511                            // be labeled TRC_DISCARDED_DATA, because it does not
512                            // necessarily imply that a failure has occurred.  However,
513                            // this label is being used temporarily to help isolate
514                            // the cause of client timeout problems.
515 kumpf     1.60 
516 kumpf     1.81             Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
517                                "ThreadPool::allocate_and_awaken: Insufficient resources: "
518                                    " pool = %s, running threads = %d, idle threads = %d",
519                                _key, _runningThreads.count(), _idleThreads.count());
520                            return false;
521                        }
522 mike      1.2  
523 kumpf     1.81         // initialize the thread data with the work function and parameters
524                        Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
525                            "Initializing thread with work function and parameters: parm = %p",
526                            parm);
527 mike      1.2  
528 kumpf     1.81         th->delete_tsd("work func");
529                        th->put_tsd("work func", NULL,
530 kumpf     1.70             sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
531 kumpf     1.81             (void *)work);
532                        th->delete_tsd("work parm");
533                        th->put_tsd("work parm", NULL, sizeof(void *), parm);
534                        th->delete_tsd("blocking sem");
535                        if (blocking != 0)
536                            th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
537                
538                        // put the thread on the running list
539                        _runningThreads.insert_first(th);
540                
541                        // signal the thread's sleep semaphore to awaken it
542                        Semaphore* sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
543                        PEGASUS_ASSERT(sleep_sem != 0);
544                
545                        Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
546                        sleep_sem->signal();
547                        th->dereference_tsd();
548 kumpf     1.57     }
549 mday      1.58     catch (...)
550 kumpf     1.57     {
551 kumpf     1.81         Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
552                            "ThreadPool::allocate_and_awaken: Operation Failed.");
553                        PEG_METHOD_EXIT();
554                        // ATTN: Error result has not yet been defined
555                        return true;
556 kumpf     1.57     }
557 kumpf     1.81     PEG_METHOD_EXIT();
558                    return true;
559 mike      1.2  }
560                
561 kumpf     1.81 // caller is responsible for only calling this routine during slack periods
562                // but should call it at least once per _deallocateWait interval.
563 mday      1.12 
564 kumpf     1.81 Uint32 ThreadPool::cleanupIdleThreads()
565 mike      1.2  {
566 kumpf     1.81     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads");
567                
568                    Uint32 numThreadsCleanedUp = 0;
569                
570                    Uint32 numIdleThreads = _idleThreads.count();
571                    for (Uint32 i = 0; i < numIdleThreads; i++)
572                    {
573                        // Do not dip below the minimum thread count
574                        if (_currentThreads.value() <= (Uint32)_minThreads)
575                        {
576                            break;
577                        }
578                
579                        Thread* thread = _idleThreads.remove_last();
580                
581                        // If there are no more threads in the _idleThreads queue, we're done.
582                        if (thread == 0)
583                        {
584                            break;
585                        }
586                
587 kumpf     1.81         struct timeval* lastActivityTime;
588                        try
589                        {
590                            lastActivityTime = (struct timeval *)thread->try_reference_tsd(
591                                "last activity time");
592                            PEGASUS_ASSERT(lastActivityTime != 0);
593                        }
594                        catch (...)
595                        {
596                            PEGASUS_ASSERT(false);
597                            _idleThreads.insert_last(thread);
598                            break;
599                        }
600                
601                        Boolean cleanupThisThread =
602                            _timeIntervalExpired(lastActivityTime, &_deallocateWait);
603                        thread->dereference_tsd();
604                
605                        if (cleanupThisThread)
606                        {
607                            _cleanupThread(thread);
608 kumpf     1.81             _currentThreads--;
609                            numThreadsCleanedUp++;
610                        }
611                        else
612                        {
613                            _idleThreads.insert_first(thread);
614                        }
615 konrad.r  1.67     }
616 kumpf     1.81 
617                    PEG_METHOD_EXIT();
618                    return numThreadsCleanedUp;
619 konrad.r  1.67 }
620 mday      1.19 
621 kumpf     1.81 void ThreadPool::_cleanupThread(Thread* thread)
622 mday      1.19 {
623 kumpf     1.81     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
624                
625                    // Set the "work func" and "work parm" to 0 so _loop() knows to exit.
626                    thread->delete_tsd("work func");
627                    thread->put_tsd(
628                        "work func", 0,
629                        sizeof(PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
630                        (void *) 0);
631                    thread->delete_tsd("work parm");
632                    thread->put_tsd("work parm", 0, sizeof(void *), 0);
633                
634                    // signal the thread's sleep semaphore to awaken it
635                    Semaphore* sleep_sem = (Semaphore *)thread->reference_tsd("sleep sem");
636                    PEGASUS_ASSERT(sleep_sem != 0);
637                    sleep_sem->signal();
638                    thread->dereference_tsd();
639                
640                    thread->join();
641                    delete thread;
642                
643                    PEG_METHOD_EXIT();
644 mday      1.19 }
645                
646 kumpf     1.81 Boolean ThreadPool::_timeIntervalExpired(
647                    struct timeval* start,
648                    struct timeval* interval)
649 mday      1.19 {
650 kumpf     1.81     // never time out if the interval is zero
651                    if (interval && (interval->tv_sec == 0) && (interval->tv_usec == 0))
652                    {
653                        return false;
654                    }
655                
656                    struct timeval now, finish, remaining;
657                    Uint32 usec;
658                    pegasus_gettimeofday(&now);
659                    pegasus_gettimeofday(&remaining);    // Avoid valgrind error
660                
661                    finish.tv_sec = start->tv_sec + interval->tv_sec;
662                    usec = start->tv_usec + interval->tv_usec;
663                    finish.tv_sec += (usec / 1000000);
664                    usec %= 1000000;
665                    finish.tv_usec = usec;
666                
667                    return (timeval_subtract(&remaining, &finish, &now) != 0);
668 mday      1.19 }
669                
670 kumpf     1.81 void ThreadPool::_deleteSemaphore(void *p)
671 mday      1.19 {
672 kumpf     1.81     delete (Semaphore *)p;
673 mday      1.19 }
674                
675 kumpf     1.81 Thread* ThreadPool::_initializeThread()
676 mday      1.19 {
677 kumpf     1.81     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread");
678                
679                    Thread* th = (Thread *) new Thread(_loop, this, false);
680                
681                    // allocate a sleep semaphore and pass it in the thread context
682                    // initial count is zero, loop function will sleep until
683                    // we signal the semaphore
684                    Semaphore* sleep_sem = (Semaphore *) new Semaphore(0);
685                    th->put_tsd(
686                        "sleep sem", &_deleteSemaphore, sizeof(Semaphore), (void *)sleep_sem);
687                
688                    struct timeval* lastActivityTime =
689                        (struct timeval *) ::operator new(sizeof(struct timeval));
690                    pegasus_gettimeofday(lastActivityTime);
691                
692                    th->put_tsd("last activity time", thread_data::default_delete,
693                        sizeof(struct timeval), (void *)lastActivityTime);
694                    // thread will enter _loop() and sleep on sleep_sem until we signal it
695                
696                    if (!th->run())
697                    {
698 kumpf     1.81         delete th;
699                        return 0;
700                    }
701                    _currentThreads++;
702                    pegasus_yield();
703                
704                    PEG_METHOD_EXIT();
705                    return th;
706 mday      1.19 }
707 mike      1.2  
708 kumpf     1.81 void ThreadPool::_addToIdleThreadsQueue(Thread* th)
709                {
710                    if (th == 0)
711                    {
712                        Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
713                            "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");
714                        throw NullPointer();
715                    }
716                
717                    try
718                    {
719                        _idleThreads.insert_first(th);
720                    }
721                    catch (...)
722                    {
723                        Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
724                            "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_first "
725                                "failed.");
726                    }
727                }
728 mike      1.2  
729                PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2