(file) Return to Thread.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.75 //%2005////////////////////////////////////////////////////////////////////////
  2 mike  1.2  //
  3 karl  1.71 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4            // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5            // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6 karl  1.56 // IBM Corp.; EMC Corporation, The Open Group.
  7 karl  1.71 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8            // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9 karl  1.75 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10            // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11 mike  1.2  //
 12            // Permission is hereby granted, free of charge, to any person obtaining a copy
 13 chip  1.11 // of this software and associated documentation files (the "Software"), to
 14            // deal in the Software without restriction, including without limitation the
 15            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 16 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
 17            // furnished to do so, subject to the following conditions:
 18 david.dillard 1.83 //
 19 chip          1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 20 mike          1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 21                    // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 22 chip          1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 23                    // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 24                    // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 25 mike          1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 26                    // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 27                    //
 28                    //==============================================================================
 29                    //
 30                    // Author: Mike Day (mdday@us.ibm.com)
 31                    //
 32                    // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
 33 chip          1.11 //              added nsk platform support
 34 kumpf         1.59 //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
 35 a.arora       1.64 //              Amit K Arora, IBM (amita@in.ibm.com) for PEP#101
 36 gs.keenan     1.76 //              Sean Keenan, Hewlett-Packard Company (sean.keenan@hp.com)
 37 david.dillard 1.83 //              David Dillard, VERITAS Software Corp.
 38                    //                  (david.dillard@veritas.com)
 39 mike          1.2  //
 40                    //%/////////////////////////////////////////////////////////////////////////////
 41                    
 42                    #include "Thread.h"
 43 kumpf         1.68 #include <exception>
 44 mike          1.2  #include <Pegasus/Common/IPC.h>
 45 kumpf         1.14 #include <Pegasus/Common/Tracer.h>
 46 mike          1.2  
 47                    #if defined(PEGASUS_OS_TYPE_WINDOWS)
 48 chip          1.11 # include "ThreadWindows.cpp"
 49 mike          1.2  #elif defined(PEGASUS_OS_TYPE_UNIX)
 50                    # include "ThreadUnix.cpp"
 51                    #elif defined(PEGASUS_OS_TYPE_NSK)
 52                    # include "ThreadNsk.cpp"
 53 gs.keenan     1.76 #elif defined(PEGASUS_OS_VMS)
 54                    # include "ThreadVms.cpp"
 55 mike          1.2  #else
 56                    # error "Unsupported platform"
 57                    #endif
 58                    
 59 kumpf         1.69 PEGASUS_USING_STD;
 60 mike          1.2  PEGASUS_NAMESPACE_BEGIN
 61                    
 62 mday          1.42 
 63 chip          1.11 void thread_data::default_delete(void * data)
 64                    {
 65 mike          1.2     if( data != NULL)
 66 chip          1.11       ::operator delete(data);
 67 mike          1.2  }
 68                    
 69 chuck         1.43 // l10n start
 70                    void language_delete(void * data)
 71                    {
 72                       if( data != NULL)
 73                       {
 74 a.arora       1.64       AutoPtr<AcceptLanguages> al(static_cast<AcceptLanguages *>(data));
 75 chuck         1.43    }
 76                    }
 77                    // l10n end
 78                    
 79 mike          1.2  Boolean Thread::_signals_blocked = false;
 80 chuck         1.37 // l10n
 81 marek         1.63 #ifndef PEGASUS_OS_ZOS
 82 w.otsuka      1.74 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key = PEGASUS_THREAD_KEY_TYPE(-1);
 83 marek         1.63 #else
 84                    PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key;
 85                    #endif
 86 chuck         1.37 Boolean Thread::_key_initialized = false;
 87 chuck         1.41 Boolean Thread::_key_error = false;
 88 chuck         1.37 
 89 mike          1.2  
 90                    // for non-native implementations
 91 chip          1.11 #ifndef PEGASUS_THREAD_CLEANUP_NATIVE
 92 david.dillard 1.83 void Thread::cleanup_push( void (*routine)(void *), void *parm)
 93 mike          1.2  {
 94 a.arora       1.64     AutoPtr<cleanup_handler> cu(new cleanup_handler(routine, parm));
 95 a.arora       1.65     _cleanup.insert_first(cu.get());
 96 a.arora       1.64     cu.release();
 97 mike          1.2      return;
 98                    }
 99 kumpf         1.81 
100 david.dillard 1.83 void Thread::cleanup_pop(Boolean execute)
101 mike          1.2  {
102 david.dillard 1.83     AutoPtr<cleanup_handler> cu;
103 chip          1.11     try
104                        {
105 kumpf         1.81         cu.reset(_cleanup.remove_first());
106 mike          1.2      }
107 chip          1.11     catch(IPCException&)
108 mike          1.2      {
109 kumpf         1.81         PEGASUS_ASSERT(0);
110 mike          1.2       }
111                        if(execute == true)
112 kumpf         1.81         cu->execute();
113 mike          1.2  }
114 kumpf         1.81 
115 mike          1.2  #endif
116                    
117                    
118 david.dillard 1.83 //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value)
119 mike          1.2  
120                    
121 chip          1.11 #ifndef PEGASUS_THREAD_EXIT_NATIVE
122                    void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
123                    {
124                        // execute the cleanup stack and then return
125 mike          1.2     while( _cleanup.count() )
126                       {
127 chip          1.11        try
128                           {
129 kumpf         1.81            cleanup_pop(true);
130 chip          1.11        }
131                           catch(IPCException&)
132                           {
133 kumpf         1.81           PEGASUS_ASSERT(0);
134                              break;
135 mike          1.2         }
136                       }
137                       _exit_code = exit_code;
138                       exit_thread(exit_code);
139 mday          1.4     _handle.thid = 0;
140 mike          1.2  }
141                    
142                    
143                    #endif
144                    
145 chuck         1.37 // l10n start
146 chuck         1.39 Sint8 Thread::initializeKey()
147                    {
148 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::initializeKey");
149                        if (!Thread::_key_initialized)
150                        {
151                            if (Thread::_key_error)
152                            {
153                                Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
154                                    "Thread: ERROR - thread key error");
155                                return -1;
156                            }
157                    
158                            if (pegasus_key_create(&Thread::_platform_thread_key) == 0)
159                            {
160                                Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
161                                    "Thread: able to create a thread key");
162                                Thread::_key_initialized = true;
163                            }
164                            else
165                            {
166                                Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
167                                    "Thread: ERROR - unable to create a thread key");
168                                Thread::_key_error = true;
169 kumpf         1.81             return -1;
170                            }
171                        }
172 chuck         1.39 
173 kumpf         1.81     PEG_METHOD_EXIT();
174                        return 0;
175 chuck         1.39 }
176                    
177 chuck         1.37 Thread * Thread::getCurrent()
178                    {
179 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::getCurrent");
180 chuck         1.40     if (Thread::initializeKey() != 0)
181 chuck         1.39     {
182 kumpf         1.81         return NULL;
183 chuck         1.39     }
184 kumpf         1.81     PEG_METHOD_EXIT();
185                        return (Thread *)pegasus_get_thread_specific(_platform_thread_key);
186 chuck         1.39 }
187                    
188                    void Thread::setCurrent(Thread * thrd)
189                    {
190 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::setCurrent");
191                        if (Thread::initializeKey() == 0)
192                        {
193                            if (pegasus_set_thread_specific(
194                                   Thread::_platform_thread_key, (void *) thrd) == 0)
195 chuck         1.39         {
196 kumpf         1.81             Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
197                                    "Successful set Thread * into thread specific storage");
198 chuck         1.39         }
199                            else
200                            {
201 kumpf         1.81             Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
202                                    "ERROR: error setting Thread * into thread specific storage");
203 chuck         1.39         }
204 kumpf         1.81     }
205                        PEG_METHOD_EXIT();
206 chuck         1.37 }
207                    
208                    AcceptLanguages * Thread::getLanguages()
209                    {
210 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::getLanguages");
211                    
212                        Thread * curThrd = Thread::getCurrent();
213                        if (curThrd == NULL)
214                            return NULL;
215                        AcceptLanguages * acceptLangs =
216                            (AcceptLanguages *)curThrd->reference_tsd("acceptLanguages");
217                        curThrd->dereference_tsd();
218                        PEG_METHOD_EXIT();
219                        return acceptLangs;
220 chuck         1.37 }
221                    
222                    void Thread::setLanguages(AcceptLanguages *langs) //l10n
223                    {
224 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::setLanguages");
225                    
226                        Thread* currentThrd = Thread::getCurrent();
227                        if (currentThrd != NULL)
228                        {
229                            // deletes the old tsd and creates a new one
230                            currentThrd->put_tsd("acceptLanguages",
231                                language_delete,
232                                sizeof(AcceptLanguages *),
233                                langs);
234                        }
235                    
236                        PEG_METHOD_EXIT();
237 chuck         1.37 }
238                    
239                    void Thread::clearLanguages() //l10n
240                    {
241 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "Thread::clearLanguages");
242                    
243                        Thread * currentThrd = Thread::getCurrent();
244                        if (currentThrd != NULL)
245                        {
246                            // deletes the old tsd
247                            currentThrd->delete_tsd("acceptLanguages");
248                        }
249                    
250                        PEG_METHOD_EXIT();
251 chuck         1.37 }
252 kumpf         1.81 // l10n end
253 chuck         1.37 
254 mday          1.52 
255 kumpf         1.81 ///////////////////////////////////////////////////////////////////////////////
256                    //
257                    // ThreadPool
258                    //
259                    ///////////////////////////////////////////////////////////////////////////////
260                    
261                    ThreadPool::ThreadPool(
262                        Sint16 initialSize,
263                        const char* key,
264                        Sint16 minThreads,
265                        Sint16 maxThreads,
266                        struct timeval& deallocateWait)
267                        : _maxThreads(maxThreads),
268                          _minThreads(minThreads),
269                          _currentThreads(0),
270                          _idleThreads(true),
271                          _runningThreads(true),
272                          _dying(0)
273 mday          1.58 {
274 kumpf         1.81     _deallocateWait.tv_sec = deallocateWait.tv_sec;
275                        _deallocateWait.tv_usec = deallocateWait.tv_usec;
276 mday          1.58 
277 kumpf         1.81     memset(_key, 0x00, 17);
278                        if (key != 0)
279                        {
280                            strncpy(_key, key, 16);
281                        }
282                    
283                        if ((_maxThreads > 0) && (_maxThreads < initialSize))
284                        {
285                            _maxThreads = initialSize;
286                        }
287 mday          1.58 
288 kumpf         1.81     if (_minThreads > initialSize)
289                        {
290                            _minThreads = initialSize;
291                        }
292 mday          1.52 
293 kumpf         1.81     for (int i = 0; i < initialSize; i++)
294                        {
295                            _addToIdleThreadsQueue(_initializeThread());
296                        }
297                    }
298 mday          1.20 
299 kumpf         1.81 ThreadPool::~ThreadPool()
300 mday          1.20 {
301 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::~ThreadPool");
302                        try
303                        {
304                            // Set the dying flag so all thread know the destructor has been entered
305                            _dying++;
306                    
307                            while (_currentThreads.value() > 0)
308                            {
309                                Thread* thread = _idleThreads.remove_first();
310                                if (thread != 0)
311                                {
312                                    _cleanupThread(thread);
313                                    _currentThreads--;
314                                }
315                                else
316                                {
317                                    pegasus_yield();
318                                }
319                            }
320                        }
321                        catch (...)
322 kumpf         1.81     {
323                        }
324 mday          1.20 }
325                    
326 kumpf         1.81 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void* parm)
327                    {
328                        PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
329                    
330                        try
331                        {
332 kumpf         1.82         Thread* myself = (Thread *)parm;
333                            PEGASUS_ASSERT(myself != 0);
334 kumpf         1.81 
335 kumpf         1.82         // Set myself into thread specific storage
336                            // This will allow code to get its own Thread
337                            Thread::setCurrent(myself);
338 kumpf         1.81 
339 kumpf         1.82         ThreadPool* pool = (ThreadPool *)myself->get_parm();
340                            PEGASUS_ASSERT(pool != 0);
341 mike          1.2  
342 kumpf         1.82         Semaphore* sleep_sem = 0;
343                            struct timeval* lastActivityTime = 0;
344 chuck         1.39 
345 kumpf         1.81         try
346                            {
347 kumpf         1.82             sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
348 kumpf         1.81             myself->dereference_tsd();
349 kumpf         1.82             PEGASUS_ASSERT(sleep_sem != 0);
350                    
351                                lastActivityTime =
352                                    (struct timeval *)myself->reference_tsd("last activity time");
353 kumpf         1.81             myself->dereference_tsd();
354 kumpf         1.82             PEGASUS_ASSERT(lastActivityTime != 0);
355 kumpf         1.81         }
356                            catch (...)
357                            {
358                                Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
359 kumpf         1.82                 "ThreadPool::_loop: Failure getting sleep_sem or "
360                                        "lastActivityTime.");
361 kumpf         1.81             PEGASUS_ASSERT(false);
362                                pool->_idleThreads.remove(myself);
363                                pool->_currentThreads--;
364                                PEG_METHOD_EXIT();
365                                return((PEGASUS_THREAD_RETURN)1);
366                            }
367 mday          1.52 
368 kumpf         1.82         while (1)
369 kumpf         1.81         {
370 kumpf         1.82             try
371                                {
372                                    sleep_sem->wait();
373                                }
374                                catch (...)
375                                {
376                                    Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
377                                        "ThreadPool::_loop: failure on sleep_sem->wait().");
378                                    PEGASUS_ASSERT(false);
379                                    pool->_idleThreads.remove(myself);
380                                    pool->_currentThreads--;
381                                    PEG_METHOD_EXIT();
382                                    return((PEGASUS_THREAD_RETURN)1);
383                                }
384                    
385                                // When we awaken we reside on the _runningThreads queue, not the
386                                // _idleThreads queue.
387                    
388                                PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL* work)(void *) = 0;
389                                void* parm = 0;
390                                Semaphore* blocking_sem = 0;
391 kumpf         1.82 
392                                try
393                                {
394                                    work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *))
395                                        myself->reference_tsd("work func");
396                                    myself->dereference_tsd();
397                                    parm = myself->reference_tsd("work parm");
398                                    myself->dereference_tsd();
399                                    blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
400                                    myself->dereference_tsd();
401                                }
402                                catch (...)
403                                {
404                                    Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
405                                        "ThreadPool::_loop: Failure accessing work func, work parm, "
406                                            "or blocking sem.");
407                                    PEGASUS_ASSERT(false);
408                                    pool->_idleThreads.remove(myself);
409                                    pool->_currentThreads--;
410                                    PEG_METHOD_EXIT();
411                                    return((PEGASUS_THREAD_RETURN)1);
412 kumpf         1.82             }
413                    
414                                if (work == 0)
415                                {
416                                    Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
417                                        "ThreadPool::_loop: work func is 0, meaning we should exit.");
418                                    break;
419                                }
420 mike          1.2  
421 kumpf         1.82             gettimeofday(lastActivityTime, NULL);
422 konrad.r      1.67 
423 kumpf         1.82             try
424                                {
425                                    PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4, "Work starting.");
426                                    work(parm);
427                                    PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4, "Work finished.");
428                                }
429                                catch (Exception & e)
430                                {
431                                    PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
432                                        String("Exception from work in ThreadPool::_loop: ") +
433                                            e.getMessage());
434                                }
435 kumpf         1.68 #if !defined(PEGASUS_OS_LSB)
436 kumpf         1.82             catch (exception& e)
437                                {
438                                    PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
439                                        String("Exception from work in ThreadPool::_loop: ") +
440                                            e.what());
441                                }
442 kumpf         1.68 #endif
443 kumpf         1.82             catch (...)
444                                {
445                                    PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
446                                        "Unknown exception from work in ThreadPool::_loop.");
447                                }
448 kumpf         1.81 
449 kumpf         1.82             // put myself back onto the available list
450                                try
451 kumpf         1.57             {
452 kumpf         1.82                 gettimeofday(lastActivityTime, NULL);
453                                    if (blocking_sem != 0)
454                                    {
455                                        blocking_sem->signal();
456                                    }
457 s.hills       1.49 
458 kumpf         1.82                 Boolean removed = pool->_runningThreads.remove((void *)myself);
459                                    PEGASUS_ASSERT(removed);
460 s.hills       1.49 
461 kumpf         1.82                 pool->_idleThreads.insert_first(myself);
462                                }
463                                catch (...)
464                                {
465                                    Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
466                                        "ThreadPool::_loop: Adding thread to idle pool failed.");
467                                    PEGASUS_ASSERT(false);
468                                    pool->_currentThreads--;
469                                    PEG_METHOD_EXIT();
470                                    return((PEGASUS_THREAD_RETURN)1);
471                                }
472 kumpf         1.81         }
473 kumpf         1.82     }
474                        catch (const Exception& e)
475                        {
476                            PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
477                                "Caught exception: \"" + e.getMessage() + "\".  Exiting _loop.");
478                        }
479                        catch (...)
480                        {
481                            PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
482                                "Caught unrecognized exception.  Exiting _loop.");
483 kumpf         1.81     }
484 kumpf         1.14 
485 kumpf         1.81     PEG_METHOD_EXIT();
486                        return((PEGASUS_THREAD_RETURN)0);
487 mike          1.2  }
488                    
489 kumpf         1.81 Boolean ThreadPool::allocate_and_awaken(
490                        void* parm,
491                        PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL* work)(void *),
492                        Semaphore* blocking)
493 mike          1.2  {
494 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
495                    
496                        // Allocate_and_awaken will not run if the _dying flag is set.
497                        // Once the lock is acquired, ~ThreadPool will not change
498                        // the value of _dying until the lock is released.
499                    
500                        try
501                        {
502                            if (_dying.value())
503                            {
504                                Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
505                                    "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
506                                // ATTN: Error result has not yet been defined
507                                return true;
508                            }
509                            struct timeval start;
510                            gettimeofday(&start, NULL);
511                            Thread* th = 0;
512                    
513                            th = _idleThreads.remove_first();
514 kumpf         1.57 
515 kumpf         1.81         if (th == 0)
516                            {
517                                if ((_maxThreads == 0) || (_currentThreads < _maxThreads))
518                                {
519                                    th = _initializeThread();
520                                }
521                            }
522                    
523                            if (th == 0)
524                            {
525                                // ATTN-DME-P3-20031103: This trace message should not be
526                                // be labeled TRC_DISCARDED_DATA, because it does not
527                                // necessarily imply that a failure has occurred.  However,
528                                // this label is being used temporarily to help isolate
529                                // the cause of client timeout problems.
530 kumpf         1.60 
531 kumpf         1.81             Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
532                                    "ThreadPool::allocate_and_awaken: Insufficient resources: "
533                                        " pool = %s, running threads = %d, idle threads = %d",
534                                    _key, _runningThreads.count(), _idleThreads.count());
535                                return false;
536                            }
537 mike          1.2  
538 kumpf         1.81         // initialize the thread data with the work function and parameters
539                            Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
540                                "Initializing thread with work function and parameters: parm = %p",
541                                parm);
542 mike          1.2  
543 kumpf         1.81         th->delete_tsd("work func");
544                            th->put_tsd("work func", NULL,
545 kumpf         1.70             sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
546 kumpf         1.81             (void *)work);
547                            th->delete_tsd("work parm");
548                            th->put_tsd("work parm", NULL, sizeof(void *), parm);
549                            th->delete_tsd("blocking sem");
550                            if (blocking != 0)
551                                th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
552                    
553                            // put the thread on the running list
554                            _runningThreads.insert_first(th);
555                    
556                            // signal the thread's sleep semaphore to awaken it
557                            Semaphore* sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
558                            PEGASUS_ASSERT(sleep_sem != 0);
559                    
560                            Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
561                            sleep_sem->signal();
562                            th->dereference_tsd();
563 kumpf         1.57     }
564 mday          1.58     catch (...)
565 kumpf         1.57     {
566 kumpf         1.81         Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
567                                "ThreadPool::allocate_and_awaken: Operation Failed.");
568                            PEG_METHOD_EXIT();
569                            // ATTN: Error result has not yet been defined
570                            return true;
571 kumpf         1.57     }
572 kumpf         1.81     PEG_METHOD_EXIT();
573                        return true;
574 mike          1.2  }
575                    
576 kumpf         1.81 // caller is responsible for only calling this routine during slack periods
577                    // but should call it at least once per _deallocateWait interval.
578 mday          1.12 
579 kumpf         1.81 Uint32 ThreadPool::cleanupIdleThreads()
580 mike          1.2  {
581 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads");
582                    
583                        Uint32 numThreadsCleanedUp = 0;
584                    
585                        Uint32 numIdleThreads = _idleThreads.count();
586                        for (Uint32 i = 0; i < numIdleThreads; i++)
587                        {
588                            // Do not dip below the minimum thread count
589                            if (_currentThreads.value() <= (Uint32)_minThreads)
590                            {
591                                break;
592                            }
593                    
594                            Thread* thread = _idleThreads.remove_last();
595                    
596                            // If there are no more threads in the _idleThreads queue, we're done.
597                            if (thread == 0)
598                            {
599                                break;
600                            }
601                    
602 kumpf         1.81         struct timeval* lastActivityTime;
603                            try
604                            {
605                                lastActivityTime = (struct timeval *)thread->try_reference_tsd(
606                                    "last activity time");
607                                PEGASUS_ASSERT(lastActivityTime != 0);
608                            }
609                            catch (...)
610                            {
611                                PEGASUS_ASSERT(false);
612                                _idleThreads.insert_last(thread);
613                                break;
614                            }
615                    
616                            Boolean cleanupThisThread =
617                                _timeIntervalExpired(lastActivityTime, &_deallocateWait);
618                            thread->dereference_tsd();
619                    
620                            if (cleanupThisThread)
621                            {
622                                _cleanupThread(thread);
623 kumpf         1.81             _currentThreads--;
624                                numThreadsCleanedUp++;
625                            }
626                            else
627                            {
628                                _idleThreads.insert_first(thread);
629                            }
630 konrad.r      1.67     }
631 kumpf         1.81 
632                        PEG_METHOD_EXIT();
633                        return numThreadsCleanedUp;
634 konrad.r      1.67 }
635 mday          1.19 
636 kumpf         1.81 void ThreadPool::_cleanupThread(Thread* thread)
637 mday          1.19 {
638 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
639                    
640                        // Set the "work func" and "work parm" to 0 so _loop() knows to exit.
641                        thread->delete_tsd("work func");
642                        thread->put_tsd(
643                            "work func", 0,
644                            sizeof(PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
645                            (void *) 0);
646                        thread->delete_tsd("work parm");
647                        thread->put_tsd("work parm", 0, sizeof(void *), 0);
648                    
649                        // signal the thread's sleep semaphore to awaken it
650                        Semaphore* sleep_sem = (Semaphore *)thread->reference_tsd("sleep sem");
651                        PEGASUS_ASSERT(sleep_sem != 0);
652                        sleep_sem->signal();
653                        thread->dereference_tsd();
654                    
655                        thread->join();
656                        delete thread;
657                    
658                        PEG_METHOD_EXIT();
659 mday          1.19 }
660                    
661 kumpf         1.81 Boolean ThreadPool::_timeIntervalExpired(
662                        struct timeval* start,
663                        struct timeval* interval)
664 mday          1.19 {
665 kumpf         1.81     // never time out if the interval is zero
666                        if (interval && (interval->tv_sec == 0) && (interval->tv_usec == 0))
667                        {
668                            return false;
669                        }
670                    
671                        struct timeval now, finish, remaining;
672                        Uint32 usec;
673                        pegasus_gettimeofday(&now);
674                        pegasus_gettimeofday(&remaining);    // Avoid valgrind error
675                    
676                        finish.tv_sec = start->tv_sec + interval->tv_sec;
677                        usec = start->tv_usec + interval->tv_usec;
678                        finish.tv_sec += (usec / 1000000);
679                        usec %= 1000000;
680                        finish.tv_usec = usec;
681                    
682                        return (timeval_subtract(&remaining, &finish, &now) != 0);
683 mday          1.19 }
684                    
685 kumpf         1.81 void ThreadPool::_deleteSemaphore(void *p)
686 mday          1.19 {
687 kumpf         1.81     delete (Semaphore *)p;
688 mday          1.19 }
689                    
690 kumpf         1.81 Thread* ThreadPool::_initializeThread()
691 mday          1.19 {
692 kumpf         1.81     PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread");
693                    
694                        Thread* th = (Thread *) new Thread(_loop, this, false);
695                    
696                        // allocate a sleep semaphore and pass it in the thread context
697                        // initial count is zero, loop function will sleep until
698                        // we signal the semaphore
699                        Semaphore* sleep_sem = (Semaphore *) new Semaphore(0);
700                        th->put_tsd(
701                            "sleep sem", &_deleteSemaphore, sizeof(Semaphore), (void *)sleep_sem);
702                    
703                        struct timeval* lastActivityTime =
704                            (struct timeval *) ::operator new(sizeof(struct timeval));
705                        pegasus_gettimeofday(lastActivityTime);
706                    
707                        th->put_tsd("last activity time", thread_data::default_delete,
708                            sizeof(struct timeval), (void *)lastActivityTime);
709                        // thread will enter _loop() and sleep on sleep_sem until we signal it
710                    
711                        if (!th->run())
712                        {
713 kumpf         1.81         delete th;
714                            return 0;
715                        }
716                        _currentThreads++;
717                        pegasus_yield();
718                    
719                        PEG_METHOD_EXIT();
720                        return th;
721 mday          1.19 }
722 mike          1.2  
723 kumpf         1.81 void ThreadPool::_addToIdleThreadsQueue(Thread* th)
724                    {
725                        if (th == 0)
726                        {
727                            Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
728                                "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");
729                            throw NullPointer();
730                        }
731                    
732                        try
733                        {
734                            _idleThreads.insert_first(th);
735                        }
736                        catch (...)
737                        {
738                            Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
739                                "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_first "
740                                    "failed.");
741                        }
742                    }
743 mike          1.2  
744                    PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by ViewCVS 0.9.2