(file) Return to Thread.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.89 //%2006////////////////////////////////////////////////////////////////////////
  2 mike  1.2  //
  3 karl  1.71 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4            // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5            // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6 karl  1.56 // IBM Corp.; EMC Corporation, The Open Group.
  7 karl  1.71 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8            // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9 karl  1.75 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10            // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11 karl  1.89 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 12            // EMC Corporation; Symantec Corporation; The Open Group.
 13 mike  1.2  //
 14            // Permission is hereby granted, free of charge, to any person obtaining a copy
 15 chip  1.11 // of this software and associated documentation files (the "Software"), to
 16            // deal in the Software without restriction, including without limitation the
 17            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 18 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
 19            // furnished to do so, subject to the following conditions:
 20 karl  1.89 // 
 21 chip  1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 22 mike  1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 23            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 24 chip  1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 25            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 26            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 27 mike  1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 28            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 29            //
 30            //==============================================================================
 31            //
 32            // Author: Mike Day (mdday@us.ibm.com)
 33            //
 34            // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
 35 chip  1.11 //              added nsk platform support
 36 kumpf 1.59 //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
 37 a.arora 1.64 //              Amit K Arora, IBM (amita@in.ibm.com) for PEP#101
 38 gs.keenan 1.76 //              Sean Keenan, Hewlett-Packard Company (sean.keenan@hp.com)
 39 david.dillard 1.83 //              David Dillard, VERITAS Software Corp.
 40                    //                  (david.dillard@veritas.com)
 41 mike          1.2  //
 42                    //%/////////////////////////////////////////////////////////////////////////////
 43                    
 44                    #include "Thread.h"
 45 kumpf         1.68 #include <exception>
 46 kumpf         1.14 #include <Pegasus/Common/Tracer.h>
 47 mike          1.90.2.1 #include "Time.h"
 48 mike          1.2      
 49 mike          1.90.2.6 PEGASUS_USING_STD;
 50                        
 51                        PEGASUS_NAMESPACE_BEGIN
 52                        
 53                        //==============================================================================
 54                        //
 55                        // POSIX Threads Implementation:
 56                        //
 57                        //==============================================================================
 58                        
 59                        #if defined(PEGASUS_HAVE_PTHREADS)
 60                        
 61 mike          1.90.2.7 extern "C" void *_start_wrapper(void *arg_)
 62 mike          1.90.2.6 {
 63 mike          1.90.2.7     StartWrapperArg *arg = (StartWrapperArg *) arg_;
 64 mike          1.90.2.6 
 65 mike          1.90.2.7     void *return_value = (*arg->start) (arg->arg);
 66 mike          1.90.2.6     delete arg;
 67                        
 68                            return return_value;
 69                        }
 70                        
 71 mike          1.90.2.7 static sigset_t *block_signal_mask(sigset_t * sig)
 72 mike          1.90.2.6 {
 73                            sigemptyset(sig);
 74                            // should not be used for main()
 75                            sigaddset(sig, SIGHUP);
 76                            sigaddset(sig, SIGINT);
 77                            // maybe useless, since KILL can't be blocked according to POSIX
 78                            sigaddset(sig, SIGKILL);
 79                        
 80                            sigaddset(sig, SIGABRT);
 81                            sigaddset(sig, SIGALRM);
 82                            sigaddset(sig, SIGPIPE);
 83                        
 84                        
 85                        // Note: older versions of the linux pthreads library use SIGUSR1 and SIGUSR2
 86                        // internally to stop and start threads that are blocking, the newer ones
 87                        // implement this through the kernel's real time signals
 88                        // since SIGSTOP/CONT can handle suspend()/resume() on Linux
 89                        // block them
 90                        // #if defined(PEGASUS_PLATFORM_LINUX_IX86_GNU)
 91                        //     sigaddset(sig, SIGUSR1);
 92                        //     sigaddset(sig, SIGUSR2);
 93 mike          1.90.2.6 // #endif
 94                        #ifndef PEGASUS_PLATFORM_ZOS_ZSERIES_IBM
 95                            pthread_sigmask(SIG_BLOCK, sig, NULL);
 96 mike          1.2      #else
 97 mike          1.90.2.6     sigprocmask(SIG_BLOCK, sig, NULL);
 98 mike          1.2      #endif
 99 mike          1.90.2.6     return sig;
100                        }
101 mike          1.2      
102 mike          1.90.2.7 Thread::Thread(ThreadReturnType(PEGASUS_THREAD_CDECL * start) (void *), void *parameter, Boolean detached):_is_detached(detached),
103                        _cancel_enabled(true),
104                        _cancelled(false),
105                        _start(start), _cleanup(), _tsd(), _thread_parm(parameter), _exit_code(0)
106 mike          1.90.2.6 {
107                            Threads::clear(_handle.thid);
108                        }
109 mike          1.2      
110 mike          1.90.2.6 Thread::~Thread()
111                        {
112                            try
113                            {
114                                join();
115                                empty_tsd();
116                            }
117 mike          1.90.2.7     catch(...)
118 mike          1.90.2.6     {
119                                // Do not allow the destructor to throw an exception
120                            }
121                        }
122                        
123                        #endif /* PEGASUS_HAVE_PTHREADS */
124                        
125                        //==============================================================================
126                        //
127                        // Windows Threads Implementation:
128                        //
129                        //==============================================================================
130                        
131                        #if defined(PEGASUS_HAVE_WINDOWS_THREADS)
132                        
133 mike          1.90.2.7 Thread::Thread(ThreadReturnType(PEGASUS_THREAD_CDECL * start) (void *),
134                                       void *parameter,
135                                       Boolean detached):_is_detached(detached),
136                        _cancel_enabled(true),
137                        _cancelled(false),
138                        _start(start), _cleanup(), _tsd(), _thread_parm(parameter), _exit_code(0)
139 mike          1.90.2.6 {
140 mike          1.90.2.7     Threads::clear(_handle.thid);
141 mike          1.90.2.6 }
142                        
143                        Thread::~Thread()
144                        {
145 mike          1.90.2.7     try
146                            {
147                                join();
148                                empty_tsd();
149                            }
150                            catch(...)
151                            {
152                            }
153 mike          1.90.2.6 }
154                        
155                        #endif /* PEGASUS_HAVE_WINDOWS_THREADS */
156                        
157                        //==============================================================================
158                        //
159                        // Common implementation:
160                        //
161                        //==============================================================================
162 mday          1.42     
163 mike          1.90.2.7 void thread_data::default_delete(void *data)
164 chip          1.11     {
165 mike          1.90.2.7     if (data != NULL)
166                                ::operator  delete(data);
167 mike          1.2      }
168                        
169 mike          1.90.2.7 void language_delete(void *data)
170 chuck         1.43     {
171 mike          1.90.2.7     if (data != NULL)
172                            {
173                                AutoPtr < AcceptLanguageList > al(static_cast <
174                                                                  AcceptLanguageList * >(data));
175                            }
176 chuck         1.43     }
177                        
178 mike          1.2      Boolean Thread::_signals_blocked = false;
179 marek         1.63     #ifndef PEGASUS_OS_ZOS
180 mike          1.90.2.1 TSDKeyType Thread::_platform_thread_key = TSDKeyType(-1);
181 marek         1.63     #else
182 mike          1.90.2.1 TSDKeyType Thread::_platform_thread_key;
183 marek         1.63     #endif
184 chuck         1.37     Boolean Thread::_key_initialized = false;
185 chuck         1.41     Boolean Thread::_key_error = false;
186 chuck         1.37     
187 mike          1.90.2.7 void Thread::cleanup_push(void (*routine) (void *), void *parm)
188 mike          1.2      {
189 mike          1.90.2.7     AutoPtr < cleanup_handler > cu(new cleanup_handler(routine, parm));
190 mike          1.90         _cleanup.insert_front(cu.get());
191 a.arora       1.64         cu.release();
192 mike          1.2          return;
193                        }
194 kumpf         1.81     
195 david.dillard 1.83     void Thread::cleanup_pop(Boolean execute)
196 mike          1.2      {
197 mike          1.90.2.7     AutoPtr < cleanup_handler > cu;
198 chip          1.11         try
199                            {
200 mike          1.90             cu.reset(_cleanup.remove_front());
201 mike          1.2          }
202 mike          1.90.2.7     catch(IPCException &)
203 mike          1.2          {
204 kumpf         1.81             PEGASUS_ASSERT(0);
205 mike          1.90.2.7     }
206                            if (execute == true)
207 kumpf         1.81             cu->execute();
208 mike          1.2      }
209 kumpf         1.81     
210 mike          1.2      
211 david.dillard 1.83     //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value)
212 mike          1.2      
213                        
214 mike          1.90.2.1 void Thread::exit_self(ThreadReturnType exit_code)
215 chip          1.11     {
216 mike          1.90.2.6 #if defined(PEGASUS_PLATFORM_HPUX_ACC) || \
217                            defined(PEGASUS_PLATFORM_LINUX_GENERIC_GNU)
218                            // NOTE: pthread_exit exhibits unusual behavior on RHEL 3 U2, as
219                            // documented in Bugzilla 3836.  Where feasible, it may be advantageous
220                            // to avoid using this function.
221                            pthread_exit(exit_code);
222                        #else
223 chip          1.11         // execute the cleanup stack and then return
224 mike          1.90.2.7     while (_cleanup.size())
225                            {
226                                try
227                                {
228                                    cleanup_pop(true);
229                                }
230                                catch(IPCException &)
231                                {
232                                    PEGASUS_ASSERT(0);
233                                    break;
234                                }
235                            }
236                            _exit_code = exit_code;
237                            Threads::exit(exit_code);
238                            Threads::clear(_handle.thid);
239 mike          1.2      #endif
240 mike          1.90.2.6 }
241 mike          1.2      
242 chuck         1.39     Sint8 Thread::initializeKey()
243                        {
244 kumpf         1.81         PEG_METHOD_ENTER(TRC_THREAD, "Thread::initializeKey");
245                            if (!Thread::_key_initialized)
246                            {
247                                if (Thread::_key_error)
248                                {
249                                    Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
250 mike          1.90.2.7                           "Thread: ERROR - thread key error");
251 kumpf         1.81                 return -1;
252                                }
253                        
254 mike          1.90.2.1         if (TSDKey::create(&Thread::_platform_thread_key) == 0)
255 kumpf         1.81             {
256                                    Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
257 mike          1.90.2.7                           "Thread: able to create a thread key");
258 kumpf         1.81                 Thread::_key_initialized = true;
259                                }
260                                else
261                                {
262                                    Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
263 mike          1.90.2.7                           "Thread: ERROR - unable to create a thread key");
264 kumpf         1.81                 Thread::_key_error = true;
265                                    return -1;
266                                }
267                            }
268 chuck         1.39     
269 kumpf         1.81         PEG_METHOD_EXIT();
270                            return 0;
271 chuck         1.39     }
272                        
273 mike          1.90.2.7 Thread *Thread::getCurrent()
274 chuck         1.37     {
275 kumpf         1.81         PEG_METHOD_ENTER(TRC_THREAD, "Thread::getCurrent");
276 chuck         1.40         if (Thread::initializeKey() != 0)
277 chuck         1.39         {
278 kumpf         1.81             return NULL;
279 chuck         1.39         }
280 kumpf         1.81         PEG_METHOD_EXIT();
281 mike          1.90.2.7     return (Thread *) TSDKey::get_thread_specific(_platform_thread_key);
282 chuck         1.39     }
283                        
284                        void Thread::setCurrent(Thread * thrd)
285                        {
286 kumpf         1.81         PEG_METHOD_ENTER(TRC_THREAD, "Thread::setCurrent");
287                            if (Thread::initializeKey() == 0)
288                            {
289 mike          1.90.2.7         if (TSDKey::
290                                    set_thread_specific(Thread::_platform_thread_key,
291                                                        (void *) thrd) == 0)
292 chuck         1.39             {
293 kumpf         1.81                 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
294                                        "Successful set Thread * into thread specific storage");
295 chuck         1.39             }
296                                else
297                                {
298 kumpf         1.81                 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
299                                        "ERROR: error setting Thread * into thread specific storage");
300 chuck         1.39             }
301 kumpf         1.81         }
302                            PEG_METHOD_EXIT();
303 chuck         1.37     }
304                        
305 mike          1.90.2.7 AcceptLanguageList *Thread::getLanguages()
306 chuck         1.37     {
307 kumpf         1.81         PEG_METHOD_ENTER(TRC_THREAD, "Thread::getLanguages");
308                        
309 mike          1.90.2.7     Thread *curThrd = Thread::getCurrent();
310 kumpf         1.81         if (curThrd == NULL)
311                                return NULL;
312 mike          1.90.2.7     AcceptLanguageList *acceptLangs =
313                                (AcceptLanguageList *) curThrd->reference_tsd("acceptLanguages");
314 kumpf         1.81         curThrd->dereference_tsd();
315                            PEG_METHOD_EXIT();
316                            return acceptLangs;
317 chuck         1.37     }
318                        
319 mike          1.90.2.7 void Thread::setLanguages(AcceptLanguageList * langs)   // l10n
320 chuck         1.37     {
321 kumpf         1.81         PEG_METHOD_ENTER(TRC_THREAD, "Thread::setLanguages");
322                        
323 mike          1.90.2.7     Thread *currentThrd = Thread::getCurrent();
324 kumpf         1.81         if (currentThrd != NULL)
325                            {
326                                // deletes the old tsd and creates a new one
327                                currentThrd->put_tsd("acceptLanguages",
328 mike          1.90.2.7                              language_delete,
329                                                     sizeof (AcceptLanguageList *), langs);
330 kumpf         1.81         }
331                        
332                            PEG_METHOD_EXIT();
333 chuck         1.37     }
334                        
335 mike          1.90.2.7 void Thread::clearLanguages()   // l10n
336 chuck         1.37     {
337 kumpf         1.81         PEG_METHOD_ENTER(TRC_THREAD, "Thread::clearLanguages");
338                        
339 mike          1.90.2.7     Thread *currentThrd = Thread::getCurrent();
340 kumpf         1.81         if (currentThrd != NULL)
341                            {
342                                // deletes the old tsd
343                                currentThrd->delete_tsd("acceptLanguages");
344                            }
345                        
346                            PEG_METHOD_EXIT();
347 chuck         1.37     }
348 mday          1.52     
349 kumpf         1.81     ///////////////////////////////////////////////////////////////////////////////
350                        //
351                        // ThreadPool
352                        //
353                        ///////////////////////////////////////////////////////////////////////////////
354                        
355 mike          1.90.2.7 ThreadPool::ThreadPool(Sint16 initialSize,
356                                               const char *key,
357                                               Sint16 minThreads,
358                                               Sint16 maxThreads,
359                                               struct timeval
360                                               &deallocateWait):_maxThreads(maxThreads),
361                        _minThreads(minThreads), _currentThreads(0), _idleThreads(),
362                        _runningThreads(), _dying(0)
363 mday          1.58     {
364 kumpf         1.81         _deallocateWait.tv_sec = deallocateWait.tv_sec;
365                            _deallocateWait.tv_usec = deallocateWait.tv_usec;
366 mday          1.58     
367 kumpf         1.81         memset(_key, 0x00, 17);
368                            if (key != 0)
369                            {
370                                strncpy(_key, key, 16);
371                            }
372                        
373                            if ((_maxThreads > 0) && (_maxThreads < initialSize))
374                            {
375                                _maxThreads = initialSize;
376                            }
377 mday          1.58     
378 kumpf         1.81         if (_minThreads > initialSize)
379                            {
380                                _minThreads = initialSize;
381                            }
382 mday          1.52     
383 kumpf         1.81         for (int i = 0; i < initialSize; i++)
384                            {
385                                _addToIdleThreadsQueue(_initializeThread());
386                            }
387                        }
388 mday          1.20     
389 kumpf         1.81     ThreadPool::~ThreadPool()
390 mday          1.20     {
391 kumpf         1.81         PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::~ThreadPool");
392 konrad.r      1.86     
393 kumpf         1.81         try
394                            {
395 mike          1.90.2.7         // Set the dying flag so all thread know the destructor has been
396                                // entered
397 kumpf         1.81             _dying++;
398 mike          1.90.2.7         Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
399                                              "Cleaning up %d idle threads. ", _currentThreads.get());
400 mike          1.90.2.4 
401 mike          1.87             while (_currentThreads.get() > 0)
402 kumpf         1.81             {
403 mike          1.90.2.7             Thread *thread = _idleThreads.remove_front();
404 kumpf         1.81                 if (thread != 0)
405                                    {
406                                        _cleanupThread(thread);
407                                        _currentThreads--;
408                                    }
409                                    else
410                                    {
411 mike          1.90.2.1                 Threads::yield();
412 kumpf         1.81                 }
413                                }
414                            }
415 mike          1.90.2.7     catch(...)
416 kumpf         1.81         {
417                            }
418 mday          1.20     }
419                        
420 mike          1.90.2.7 ThreadReturnType PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
421 kumpf         1.81     {
422                            PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
423                        
424                            try
425                            {
426 mike          1.90.2.7         Thread *myself = (Thread *) parm;
427 kumpf         1.82             PEGASUS_ASSERT(myself != 0);
428 kumpf         1.81     
429 kumpf         1.82             // Set myself into thread specific storage
430                                // This will allow code to get its own Thread
431                                Thread::setCurrent(myself);
432 kumpf         1.81     
433 mike          1.90.2.7         ThreadPool *pool = (ThreadPool *) myself->get_parm();
434 kumpf         1.82             PEGASUS_ASSERT(pool != 0);
435 mike          1.2      
436 mike          1.90.2.7         Semaphore *sleep_sem = 0;
437                                struct timeval *lastActivityTime = 0;
438 chuck         1.39     
439 kumpf         1.81             try
440                                {
441 mike          1.90.2.7             sleep_sem = (Semaphore *) myself->reference_tsd("sleep sem");
442 kumpf         1.81                 myself->dereference_tsd();
443 kumpf         1.82                 PEGASUS_ASSERT(sleep_sem != 0);
444                        
445                                    lastActivityTime =
446 mike          1.90.2.7                 (struct timeval *) myself->
447                                        reference_tsd("last activity time");
448 kumpf         1.81                 myself->dereference_tsd();
449 kumpf         1.82                 PEGASUS_ASSERT(lastActivityTime != 0);
450 kumpf         1.81             }
451 mike          1.90.2.7         catch(...)
452 kumpf         1.81             {
453                                    Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
454 mike          1.90.2.7                           "ThreadPool::_loop: Failure getting sleep_sem or "
455                                                  "lastActivityTime.");
456 kumpf         1.81                 PEGASUS_ASSERT(false);
457                                    pool->_idleThreads.remove(myself);
458                                    pool->_currentThreads--;
459                                    PEG_METHOD_EXIT();
460 mike          1.90.2.7             return ((ThreadReturnType) 1);
461 kumpf         1.81             }
462 mday          1.52     
463 kumpf         1.82             while (1)
464 kumpf         1.81             {
465 kumpf         1.82                 try
466                                    {
467                                        sleep_sem->wait();
468                                    }
469 mike          1.90.2.7             catch(...)
470 kumpf         1.82                 {
471                                        Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
472 mike          1.90.2.7                               "ThreadPool::_loop: failure on sleep_sem->wait().");
473 kumpf         1.82                     PEGASUS_ASSERT(false);
474                                        pool->_idleThreads.remove(myself);
475                                        pool->_currentThreads--;
476                                        PEG_METHOD_EXIT();
477 mike          1.90.2.7                 return ((ThreadReturnType) 1);
478 kumpf         1.82                 }
479                        
480                                    // When we awaken we reside on the _runningThreads queue, not the
481                                    // _idleThreads queue.
482                        
483 mike          1.90.2.7             ThreadReturnType(PEGASUS_THREAD_CDECL * work) (void *) = 0;
484                                    void *parm = 0;
485                                    Semaphore *blocking_sem = 0;
486 kumpf         1.82     
487                                    try
488                                    {
489 mike          1.90.2.7                 work = (ThreadReturnType(PEGASUS_THREAD_CDECL *) (void *))
490 kumpf         1.82                         myself->reference_tsd("work func");
491                                        myself->dereference_tsd();
492                                        parm = myself->reference_tsd("work parm");
493                                        myself->dereference_tsd();
494 mike          1.90.2.7                 blocking_sem =
495                                            (Semaphore *) myself->reference_tsd("blocking sem");
496 kumpf         1.82                     myself->dereference_tsd();
497                                    }
498 mike          1.90.2.7             catch(...)
499 kumpf         1.82                 {
500                                        Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
501 mike          1.90.2.7                               "ThreadPool::_loop: Failure accessing work func, work parm, "
502                                                      "or blocking sem.");
503 kumpf         1.82                     PEGASUS_ASSERT(false);
504                                        pool->_idleThreads.remove(myself);
505                                        pool->_currentThreads--;
506                                        PEG_METHOD_EXIT();
507 mike          1.90.2.7                 return ((ThreadReturnType) 1);
508 kumpf         1.82                 }
509                        
510                                    if (work == 0)
511                                    {
512 carolann.graves 1.84                     Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
513 mike            1.90.2.7                               "ThreadPool::_loop: work func is 0, meaning we should exit.");
514 kumpf           1.82                     break;
515                                      }
516 mike            1.2      
517 mike            1.90.2.1             Time::gettimeofday(lastActivityTime);
518 konrad.r        1.67     
519 kumpf           1.82                 try
520                                      {
521 mike            1.90.2.7                 PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,
522                                                           "Work starting.");
523 kumpf           1.82                     work(parm);
524 mike            1.90.2.7                 PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,
525                                                           "Work finished.");
526 kumpf           1.82                 }
527 mike            1.90.2.7             catch(Exception & e)
528 kumpf           1.82                 {
529                                          PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
530 mike            1.90.2.7                                  String
531                                                           ("Exception from work in ThreadPool::_loop: ")
532                                                           + e.getMessage());
533 kumpf           1.82                 }
534 kumpf           1.68     #if !defined(PEGASUS_OS_LSB)
535 mike            1.90.2.7             catch(const exception & e)
536 kumpf           1.82                 {
537                                          PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
538 mike            1.90.2.7                                  String
539                                                           ("Exception from work in ThreadPool::_loop: ")
540                                                           + e.what());
541 kumpf           1.82                 }
542 kumpf           1.68     #endif
543 mike            1.90.2.7             catch(...)
544 kumpf           1.82                 {
545                                          PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
546 mike            1.90.2.7                                  "Unknown exception from work in ThreadPool::_loop.");
547 kumpf           1.82                 }
548 kumpf           1.81     
549 kumpf           1.82                 // put myself back onto the available list
550                                      try
551 kumpf           1.57                 {
552 mike            1.90.2.1                 Time::gettimeofday(lastActivityTime);
553 kumpf           1.82                     if (blocking_sem != 0)
554                                          {
555                                              blocking_sem->signal();
556                                          }
557 s.hills         1.49     
558 mike            1.90                     pool->_runningThreads.remove(myself);
559                                          pool->_idleThreads.insert_front(myself);
560 kumpf           1.82                 }
561 mike            1.90.2.7             catch(...)
562 kumpf           1.82                 {
563                                          Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
564 mike            1.90.2.7                               "ThreadPool::_loop: Adding thread to idle pool failed.");
565 kumpf           1.82                     PEGASUS_ASSERT(false);
566                                          pool->_currentThreads--;
567                                          PEG_METHOD_EXIT();
568 mike            1.90.2.7                 return ((ThreadReturnType) 1);
569 kumpf           1.82                 }
570 kumpf           1.81             }
571 kumpf           1.82         }
572 mike            1.90.2.7     catch(const Exception & e)
573 kumpf           1.82         {
574                                  PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
575 mike            1.90.2.7                          "Caught exception: \"" + e.getMessage() +
576                                                   "\".  Exiting _loop.");
577 kumpf           1.82         }
578 mike            1.90.2.7     catch(...)
579 kumpf           1.82         {
580                                  PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
581 mike            1.90.2.7                          "Caught unrecognized exception.  Exiting _loop.");
582 kumpf           1.81         }
583 kumpf           1.14     
584 kumpf           1.81         PEG_METHOD_EXIT();
585 mike            1.90.2.7     return ((ThreadReturnType) 0);
586 mike            1.2      }
587                          
588 mike            1.90.2.7 ThreadStatus ThreadPool::allocate_and_awaken(void *parm,
589                                                                       ThreadReturnType
590                                                                       (PEGASUS_THREAD_CDECL *
591                                                                        work) (void *),
592                                                                       Semaphore * blocking)
593 mike            1.2      {
594 kumpf           1.81         PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
595                          
596                              // Allocate_and_awaken will not run if the _dying flag is set.
597                              // Once the lock is acquired, ~ThreadPool will not change
598                              // the value of _dying until the lock is released.
599                          
600                              try
601                              {
602 mike            1.87             if (_dying.get())
603 kumpf           1.81             {
604                                      Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
605 mike            1.90.2.7                           "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
606 konrad.r        1.86                 return PEGASUS_THREAD_UNAVAILABLE;
607 kumpf           1.81             }
608                                  struct timeval start;
609 mike            1.90.2.1         Time::gettimeofday(&start);
610 mike            1.90.2.7         Thread *th = 0;
611 kumpf           1.81     
612 mike            1.90             th = _idleThreads.remove_front();
613 kumpf           1.57     
614 kumpf           1.81             if (th == 0)
615                                  {
616 mike            1.90.2.7             if ((_maxThreads == 0) ||
617                                          (_currentThreads.get() < Uint32(_maxThreads)))
618 kumpf           1.81                 {
619                                          th = _initializeThread();
620                                      }
621                                  }
622                          
623                                  if (th == 0)
624                                  {
625                                      // ATTN-DME-P3-20031103: This trace message should not be
626                                      // be labeled TRC_DISCARDED_DATA, because it does not
627                                      // necessarily imply that a failure has occurred.  However,
628                                      // this label is being used temporarily to help isolate
629                                      // the cause of client timeout problems.
630                                      Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
631 mike            1.90.2.7                           "ThreadPool::allocate_and_awaken: Insufficient resources: "
632                                                    " pool = %s, running threads = %d, idle threads = %d",
633                                                    _key, _runningThreads.size(), _idleThreads.size());
634 konrad.r        1.86                 return PEGASUS_THREAD_INSUFFICIENT_RESOURCES;
635 kumpf           1.81             }
636 mike            1.2      
637 kumpf           1.81             // initialize the thread data with the work function and parameters
638                                  Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
639 mike            1.90.2.7                       "Initializing thread with work function and parameters: parm = %p",
640                                                parm);
641 mike            1.2      
642 kumpf           1.81             th->delete_tsd("work func");
643                                  th->put_tsd("work func", NULL,
644 mike            1.90.2.7                     sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
645                                                      (void *)), (void *) work);
646 kumpf           1.81             th->delete_tsd("work parm");
647 mike            1.90.2.7         th->put_tsd("work parm", NULL, sizeof (void *), parm);
648 kumpf           1.81             th->delete_tsd("blocking sem");
649                                  if (blocking != 0)
650 mike            1.90.2.7             th->put_tsd("blocking sem", NULL, sizeof (Semaphore *), blocking);
651 kumpf           1.81     
652                                  // put the thread on the running list
653 mike            1.90             _runningThreads.insert_front(th);
654 kumpf           1.81     
655                                  // signal the thread's sleep semaphore to awaken it
656 mike            1.90.2.7         Semaphore *sleep_sem = (Semaphore *) th->reference_tsd("sleep sem");
657 kumpf           1.81             PEGASUS_ASSERT(sleep_sem != 0);
658                          
659                                  Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
660                                  sleep_sem->signal();
661                                  th->dereference_tsd();
662 kumpf           1.57         }
663 mike            1.90.2.7     catch(...)
664 kumpf           1.57         {
665 kumpf           1.81             Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
666 mike            1.90.2.7                       "ThreadPool::allocate_and_awaken: Operation Failed.");
667 kumpf           1.81             PEG_METHOD_EXIT();
668                                  // ATTN: Error result has not yet been defined
669 konrad.r        1.86             return PEGASUS_THREAD_SETUP_FAILURE;
670 kumpf           1.57         }
671 kumpf           1.81         PEG_METHOD_EXIT();
672 konrad.r        1.86         return PEGASUS_THREAD_OK;
673 mike            1.2      }
674                          
675 kumpf           1.81     // caller is responsible for only calling this routine during slack periods
676                          // but should call it at least once per _deallocateWait interval.
677 mday            1.12     
678 kumpf           1.81     Uint32 ThreadPool::cleanupIdleThreads()
679 mike            1.2      {
680 kumpf           1.81         PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads");
681                          
682                              Uint32 numThreadsCleanedUp = 0;
683                          
684 mike            1.90         Uint32 numIdleThreads = _idleThreads.size();
685 kumpf           1.81         for (Uint32 i = 0; i < numIdleThreads; i++)
686                              {
687                                  // Do not dip below the minimum thread count
688 mike            1.90.2.7         if (_currentThreads.get() <= (Uint32) _minThreads)
689 kumpf           1.81             {
690                                      break;
691                                  }
692                          
693 mike            1.90.2.7         Thread *thread = _idleThreads.remove_back();
694 kumpf           1.81     
695 mike            1.90.2.7         // If there are no more threads in the _idleThreads queue, we're
696                                  // done.
697 kumpf           1.81             if (thread == 0)
698                                  {
699                                      break;
700                                  }
701                          
702 mike            1.90.2.7         struct timeval *lastActivityTime;
703 kumpf           1.81             try
704                                  {
705 mike            1.90.2.7             lastActivityTime =
706                                          (struct timeval *) thread->
707                                          try_reference_tsd("last activity time");
708 kumpf           1.81                 PEGASUS_ASSERT(lastActivityTime != 0);
709                                  }
710 mike            1.90.2.7         catch(...)
711 kumpf           1.81             {
712                                      PEGASUS_ASSERT(false);
713 mike            1.90                 _idleThreads.insert_back(thread);
714 kumpf           1.81                 break;
715                                  }
716                          
717                                  Boolean cleanupThisThread =
718                                      _timeIntervalExpired(lastActivityTime, &_deallocateWait);
719                                  thread->dereference_tsd();
720                          
721                                  if (cleanupThisThread)
722                                  {
723                                      _cleanupThread(thread);
724                                      _currentThreads--;
725                                      numThreadsCleanedUp++;
726                                  }
727                                  else
728                                  {
729 mike            1.90                 _idleThreads.insert_front(thread);
730 kumpf           1.81             }
731 konrad.r        1.67         }
732 kumpf           1.81     
733                              PEG_METHOD_EXIT();
734                              return numThreadsCleanedUp;
735 konrad.r        1.67     }
736 mday            1.19     
737 mike            1.90.2.7 void ThreadPool::_cleanupThread(Thread * thread)
738 mday            1.19     {
739 kumpf           1.81         PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
740                          
741                              // Set the "work func" and "work parm" to 0 so _loop() knows to exit.
742                              thread->delete_tsd("work func");
743 mike            1.90.2.7     thread->put_tsd("work func", 0,
744                                              sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
745                                                      (void *)), (void *) 0);
746 kumpf           1.81         thread->delete_tsd("work parm");
747 mike            1.90.2.7     thread->put_tsd("work parm", 0, sizeof (void *), 0);
748 kumpf           1.81     
749                              // signal the thread's sleep semaphore to awaken it
750 mike            1.90.2.7     Semaphore *sleep_sem = (Semaphore *) thread->reference_tsd("sleep sem");
751 kumpf           1.81         PEGASUS_ASSERT(sleep_sem != 0);
752                              sleep_sem->signal();
753                              thread->dereference_tsd();
754                          
755                              thread->join();
756                              delete thread;
757                          
758                              PEG_METHOD_EXIT();
759 mday            1.19     }
760                          
761 mike            1.90.2.7 Boolean ThreadPool::_timeIntervalExpired(struct timeval *start,
762                                                                   struct timeval *interval)
763 mday            1.19     {
764 kumpf           1.81         // never time out if the interval is zero
765                              if (interval && (interval->tv_sec == 0) && (interval->tv_usec == 0))
766                              {
767                                  return false;
768                              }
769                          
770                              struct timeval now, finish, remaining;
771                              Uint32 usec;
772 mike            1.90.2.1     Time::gettimeofday(&now);
773 mike            1.90.2.7     Time::gettimeofday(&remaining);     // Avoid valgrind error
774 kumpf           1.81     
775                              finish.tv_sec = start->tv_sec + interval->tv_sec;
776                              usec = start->tv_usec + interval->tv_usec;
777                              finish.tv_sec += (usec / 1000000);
778                              usec %= 1000000;
779                              finish.tv_usec = usec;
780                          
781 mike            1.90.2.1     return (Time::subtract(&remaining, &finish, &now) != 0);
782 mday            1.19     }
783                          
784 kumpf           1.81     void ThreadPool::_deleteSemaphore(void *p)
785 mday            1.19     {
786 mike            1.90.2.7     delete(Semaphore *) p;
787 mday            1.19     }
788                          
789 mike            1.90.2.7 Thread *ThreadPool::_initializeThread()
790 mday            1.19     {
791 kumpf           1.81         PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread");
792                          
793 mike            1.90.2.7     Thread *th = (Thread *) new Thread(_loop, this, false);
794 kumpf           1.81     
795                              // allocate a sleep semaphore and pass it in the thread context
796                              // initial count is zero, loop function will sleep until
797                              // we signal the semaphore
798 mike            1.90.2.7     Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
799                              th->put_tsd("sleep sem", &_deleteSemaphore, sizeof (Semaphore),
800                                          (void *) sleep_sem);
801 kumpf           1.81     
802 mike            1.90.2.7     struct timeval *lastActivityTime =
803                                  (struct timeval *)::operator  new(sizeof (struct timeval));
804 mike            1.90.2.1     Time::gettimeofday(lastActivityTime);
805 kumpf           1.81     
806                              th->put_tsd("last activity time", thread_data::default_delete,
807 mike            1.90.2.7                 sizeof (struct timeval), (void *) lastActivityTime);
808 kumpf           1.81         // thread will enter _loop() and sleep on sleep_sem until we signal it
809                          
810 konrad.r        1.86         if (th->run() != PEGASUS_THREAD_OK)
811 kumpf           1.81         {
812 mike            1.90.2.7         Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
813                                                "Could not create thread. Error code is %d.", errno);
814 kumpf           1.81             delete th;
815                                  return 0;
816                              }
817                              _currentThreads++;
818 mike            1.90.2.1     Threads::yield();
819 kumpf           1.81     
820                              PEG_METHOD_EXIT();
821                              return th;
822 mday            1.19     }
823 mike            1.2      
824 mike            1.90.2.7 void ThreadPool::_addToIdleThreadsQueue(Thread * th)
825 kumpf           1.81     {
826                              if (th == 0)
827                              {
828                                  Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
829 mike            1.90.2.7                       "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");
830 kumpf           1.81             throw NullPointer();
831                              }
832                          
833                              try
834                              {
835 mike            1.90             _idleThreads.insert_front(th);
836 kumpf           1.81         }
837 mike            1.90.2.7     catch(...)
838 kumpf           1.81         {
839                                  Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
840 mike            1.90.2.7                       "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_front "
841                                                "failed.");
842 kumpf           1.81         }
843                          }
844 mike            1.2      
// ATTN: not sure where to put this!
#ifdef PEGASUS_ZOS_SECURITY
// This flag was previously initialized from the integer literal 99, which a
// bool cannot hold: the conversion yields true.  The stored value is spelled
// out here so the code says what it actually does.
// NOTE(review): if 99 was meant as a distinct "not yet determined" marker,
// a bool cannot represent it -- confirm with the code that reads this flag.
bool isEnhancedSecurity = true;
#endif
849                          
850 mike            1.2      PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2