(file) Return to Thread.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mike  1.2 //%/////////////////////////////////////////////////////////////////////////////
  2           //
  3 kumpf 1.17 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
  4            // The Open Group, Tivoli Systems
  5 mike  1.2  //
  6            // Permission is hereby granted, free of charge, to any person obtaining a copy
  7 chip  1.11 // of this software and associated documentation files (the "Software"), to
  8            // deal in the Software without restriction, including without limitation the
  9            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 10 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
 11            // furnished to do so, subject to the following conditions:
 12 kumpf 1.17 // 
 13 chip  1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 14 mike  1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 15            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 16 chip  1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 17            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 18            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 19 mike  1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 20            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 21            //
 22            //==============================================================================
 23            //
 24            // Author: Mike Day (mdday@us.ibm.com)
 25            //
 26            // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
 27 chip  1.11 //              added nsk platform support
 28 mike  1.2  //
 29            //%/////////////////////////////////////////////////////////////////////////////
 30            
 31            #include "Thread.h"
 32            #include <Pegasus/Common/IPC.h>
 33 kumpf 1.14 #include <Pegasus/Common/Tracer.h>
 34 mike  1.2  
 35            #if defined(PEGASUS_OS_TYPE_WINDOWS)
 36 chip  1.11 # include "ThreadWindows.cpp"
 37 mike  1.2  #elif defined(PEGASUS_OS_TYPE_UNIX)
 38            # include "ThreadUnix.cpp"
 39            #elif defined(PEGASUS_OS_TYPE_NSK)
 40            # include "ThreadNsk.cpp"
 41            #else
 42            # error "Unsupported platform"
 43            #endif
 44            
 45            PEGASUS_NAMESPACE_BEGIN
 46            
 47 chip  1.11 void thread_data::default_delete(void * data)
 48            {
 49 mike  1.2     if( data != NULL)
 50 chip  1.11       ::operator delete(data);
 51 mike  1.2  }
 52            
 53            Boolean Thread::_signals_blocked = false;
 54            
 55            // for non-native implementations
 56 chip  1.11 #ifndef PEGASUS_THREAD_CLEANUP_NATIVE
 57 mike  1.2  void Thread::cleanup_push( void (*routine)(void *), void *parm) throw(IPCException)
 58            {
 59                cleanup_handler *cu = new cleanup_handler(routine, parm);
 60 chip  1.11     try
 61                {
 62            	_cleanup.insert_first(cu);
 63                }
 64                catch(IPCException&)
 65 mike  1.2      {
 66            	delete cu;
 67 chip  1.11 	throw;
 68 mike  1.2      }
 69                return;
 70            }
 71 chip  1.11 	
 72 mike  1.2  void Thread::cleanup_pop(Boolean execute) throw(IPCException)
 73            {
 74                cleanup_handler *cu ;
 75 chip  1.11     try
 76                {
 77 mike  1.2  	cu = _cleanup.remove_first() ;
 78                }
 79 chip  1.11     catch(IPCException&)
 80 mike  1.2      {
 81 chip  1.11 	PEGASUS_ASSERT(0);
 82 mike  1.2       }
 83                if(execute == true)
 84            	cu->execute();
 85                delete cu;
 86            }
 87 chip  1.11 		
 88 mike  1.2  #endif
 89            
 90            
 91 kumpf 1.8  //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) throw(IPCException)
 92 mike  1.2  
 93            
 94 chip  1.11 #ifndef PEGASUS_THREAD_EXIT_NATIVE
 95            void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
 96            {
 97                // execute the cleanup stack and then return
 98 mike  1.2     while( _cleanup.count() )
 99               {
100 chip  1.11        try
101                   {
102            	   cleanup_pop(true);
103                   }
104                   catch(IPCException&)
105                   {
106            	  PEGASUS_ASSERT(0);
107            	  break;
108 mike  1.2         }
109               }
110               _exit_code = exit_code;
111               exit_thread(exit_code);
112 mday  1.4     _handle.thid = 0;
113 mike  1.2  }
114            
115            
116            #endif
117            
118 mday  1.20 DQueue<ThreadPool> ThreadPool::_pools(true);
119            
120            
121            void ThreadPool::kill_idle_threads(void)
122            {
123               static struct timeval now, last = {0, 0};
124               
125               pegasus_gettimeofday(&now);
126               if(now.tv_sec - last.tv_sec > 5)
127               {
128                  _pools.lock();
129                  ThreadPool *p = _pools.next(0);
130                  while(p != 0)
131                  {
132            	 try 
133            	 {
134            	    p->kill_dead_threads();
135            	 }
136            	 catch(...)
137            	 {
138            	 }
139 mday  1.20 	 p = _pools.next(p);
140                  }
141                  _pools.unlock();
142                  pegasus_gettimeofday(&last);
143               }
144            }
145            
146            
147 mike  1.2  ThreadPool::ThreadPool(Sint16 initial_size,
148 kumpf 1.8  		       const Sint8 *key,
149 mike  1.2  		       Sint16 min,
150            		       Sint16 max,
151            		       struct timeval & alloc_wait,
152 chip  1.11 		       struct timeval & dealloc_wait,
153 mike  1.2  		       struct timeval & deadlock_detect)
154               : _max_threads(max), _min_threads(min),
155 mday  1.12      _current_threads(0),
156                 _pool(true), _running(true),
157 mike  1.2       _dead(true), _dying(0)
158            {
159               _allocate_wait.tv_sec = alloc_wait.tv_sec;
160               _allocate_wait.tv_usec = alloc_wait.tv_usec;
161 chip  1.11    _deallocate_wait.tv_sec = dealloc_wait.tv_sec;
162 mike  1.2     _deallocate_wait.tv_usec = dealloc_wait.tv_usec;
163               _deadlock_detect.tv_sec = deadlock_detect.tv_sec;
164               _deadlock_detect.tv_usec = deadlock_detect.tv_usec;
165               memset(_key, 0x00, 17);
166               if(key != 0)
167                  strncpy(_key, key, 16);
168 mday  1.21    if(_max_threads > 0 && _max_threads < initial_size)
169 mike  1.2        _max_threads = initial_size;
170               if(_min_threads > initial_size)
171                  _min_threads = initial_size;
172 chip  1.11 
173 mike  1.2     int i;
174               for(i = 0; i < initial_size; i++)
175               {
176                  _link_pool(_init_thread());
177               }
178 mday  1.20    _pools.insert_last(this);
179               
180 mike  1.2  }
181            
182 chip  1.11 
183 mike  1.2  
184            ThreadPool::~ThreadPool(void)
185            {
186 mday  1.19 
187 mday  1.33   try {
188 mday  1.20    _pools.remove(this);
189 mike  1.2     _dying++;
190 mday  1.12    Thread *th = 0;
191 kumpf 1.16    th = _pool.remove_first();
192 mike  1.2     while(th != 0)
193 chip  1.11    {
194 mike  1.2        Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
195            
196                  if(sleep_sem == 0)
197                  {
198            	 th->dereference_tsd();
199            	 throw NullPointer();
200                  }
201 chip  1.11 
202 mike  1.2        sleep_sem->signal();
203                  sleep_sem->signal();
204                  th->dereference_tsd();
205                  // signal the thread's sleep semaphore
206                  th->cancel();
207                  th->join();
208                  th->empty_tsd();
209                  delete th;
210                  th = _pool.remove_first();
211               }
212            
213               th = _running.remove_first();
214               while(th != 0)
215               {
216                  // signal the thread's sleep semaphore
217                  th->cancel();
218                  th->join();
219                  th->empty_tsd();
220                  delete th;
221                  th = _running.remove_first();
222               }
223 mike  1.2  
224               th = _dead.remove_first();
225               while(th != 0)
226               {
227                  // signal the thread's sleep semaphore
228                  th->cancel();
229                  th->join();
230                  th->empty_tsd();
231                  delete th;
232                  th = _dead.remove_first();
233               }
234 mday  1.33   }
235              catch(...){}
236 mike  1.2  }
237            
238 chip  1.11 // make this static to the class
239 mike  1.2  PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
240            {
241 kumpf 1.14    PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
242            
243 mday  1.33    Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "ThreadPool::_loop entered");
244 mike  1.2     Thread *myself = (Thread *)parm;
245               if(myself == 0)
246 kumpf 1.14    {
247                  PEG_METHOD_EXIT();
248 mike  1.2        throw NullPointer();
249 kumpf 1.14    }
250 mike  1.2     ThreadPool *pool = (ThreadPool *)myself->get_parm();
251 kumpf 1.14    if(pool == 0 ) 
252               {
253                  PEG_METHOD_EXIT();
254 mike  1.2        throw NullPointer();
255 kumpf 1.14    }
256 mike  1.5     Semaphore *sleep_sem = 0;
257 mday  1.13    Semaphore *blocking_sem = 0;
258               
259 mike  1.5     struct timeval *deadlock_timer = 0;
260 chip  1.11 
261               try
262 mike  1.2     {
263                  sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
264                  myself->dereference_tsd();
265                  deadlock_timer = (struct timeval *)myself->reference_tsd("deadlock timer");
266 mday  1.22       myself->dereference_tsd(); 
267 mike  1.2     }
268 mike  1.6     catch(IPCException &)
269 mike  1.2     {
270 mday  1.32       Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4, 
271 mday  1.33 		    "IPCException Caught - EXITING");
272 kumpf 1.14       PEG_METHOD_EXIT();
273 mike  1.2        myself->exit_self(0);
274               }
275 mday  1.30    catch(...)
276               {
277 mday  1.32       Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4, 
278 mday  1.33 		    "Unknown  Exception Caught - EXITING");
279 mday  1.30       PEG_METHOD_EXIT();
280                  myself->exit_self(0);
281               }
282               
283 mike  1.2     if(sleep_sem == 0 || deadlock_timer == 0)
284 kumpf 1.14    {
285 mday  1.32       Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4, 
286 mday  1.33 		    "NULL Semaphore  - EXITING");
287 kumpf 1.14       PEG_METHOD_EXIT();
288 mike  1.2        throw NullPointer();
289 kumpf 1.14    }
290 mike  1.2  
291               while(pool->_dying < 1)
292               {
293 mday  1.32       Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4, 
294 mday  1.33 		    "ThreadPool::_loop - waiting on semaphore");
295 mike  1.2        sleep_sem->wait();
296 mday  1.32       Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4, 
297 mday  1.33 		    "ThreadPool::_loop - awakened from semaphore");
298 mike  1.2        // when we awaken we reside on the running queue, not the pool queue
299                  if(pool->_dying > 0)
300            	 break;
301 mday  1.32       
302 chip  1.11 
303 mike  1.5        PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0;
304                  void *parm = 0;
305 mike  1.2  
306 chip  1.11       try
307 mike  1.2        {
308            	 _work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) \
309            	    myself->reference_tsd("work func");
310            	 myself->dereference_tsd();
311            	 parm = myself->reference_tsd("work parm");
312            	 myself->dereference_tsd();
313 mday  1.13 	 blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
314            	 myself->dereference_tsd();
315            
316 mike  1.2        }
317 mike  1.6        catch(IPCException &)
318 mike  1.2        {
319 mday  1.32 	 Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4, 
320 mday  1.33 		       "IPCException Caught - EXITING");
321 kumpf 1.14 	 PEG_METHOD_EXIT();
322 mike  1.2  	 myself->exit_self(0);
323                  }
324 chip  1.11 
325 mike  1.2        if(_work == 0)
326 kumpf 1.14       {
327 mday  1.32 	 Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4, 
328 mday  1.33 		       "NULL work pointer - EXITING");
329 kumpf 1.14          PEG_METHOD_EXIT();
330 mike  1.2  	 throw NullPointer();
331 kumpf 1.14       }
332 kumpf 1.24 
333                  if(_work ==
334                     (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) &_undertaker)
335                  {
336 mday  1.32 	 Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4, 
337 mday  1.33 		       "Calling the Undertaker");
338 mday  1.23 	 _work(parm);
339 kumpf 1.24       }
340            
341 mike  1.2        gettimeofday(deadlock_timer, NULL);
342 mday  1.20       try 
343                  {
344 mday  1.32 	 Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4, 
345 mday  1.33 		       "ThreadPool::_loop - calling work routine");
346 mday  1.20 	 _work(parm);
347 mday  1.32 	 Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4, 
348 mday  1.33 		       "ThreadPool::_loop - returned from work routine");
349 mday  1.20       }
350                  catch(...)
351                  {
352 mday  1.32 	 Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4, 
353 mday  1.33 		       "Unknown  Exception Caught - EXITING");
354 mday  1.20 	 gettimeofday(deadlock_timer, NULL);
355                  }
356                  gettimeofday(deadlock_timer, NULL);
357 mday  1.13       if( blocking_sem != 0 )
358            	 blocking_sem->signal();
359                  
360 chip  1.11       // put myself back onto the available list
361                  try
362 mike  1.2        {
363            	 pool->_running.remove((void *)myself);
364            	 pool->_link_pool(myself);
365                  }
366 mike  1.6        catch(IPCException &)
367 mike  1.2        {
368 mday  1.32 	 Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4, 
369 mday  1.33 		       "IPCException Caught - EXITING");
370 kumpf 1.14 	 PEG_METHOD_EXIT();
371 mike  1.2  	 myself->exit_self(0);
372                  }
373               }
374               // wait to be awakend by the thread pool destructor
375               sleep_sem->wait();
376               myself->test_cancel();
377 kumpf 1.14 
378               PEG_METHOD_EXIT();
379 mike  1.2     myself->exit_self(0);
380               return((PEGASUS_THREAD_RETURN)0);
381            }
382            
383            void ThreadPool::allocate_and_awaken(void *parm,
384            				     PEGASUS_THREAD_RETURN \
385 mday  1.13 				     (PEGASUS_THREAD_CDECL *work)(void *), 
386            				     Semaphore *blocking)
387            
388 mike  1.2     throw(IPCException)
389            {
390 kumpf 1.14    PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
391 mike  1.2     struct timeval start;
392               gettimeofday(&start, NULL);
393 chip  1.11 
394 mike  1.2     Thread *th = _pool.remove_first();
395 mday  1.12    
396 mday  1.7     // wait for the right interval and try again
397 mday  1.33    while (th == 0 && _dying < 1)
398 mike  1.2     {
399 mday  1.12       _check_deadlock(&start) ;
400                  
401 mday  1.21       if(_max_threads == 0 || _current_threads < _max_threads)
402 mday  1.33 	{
403            	  th = _init_thread();
404            	  continue;
405            	}
406            
407                  pegasus_sleep(1);
408 mday  1.7        th = _pool.remove_first();
409               }
410 chip  1.11 
411            
412 mike  1.2     if(_dying < 1)
413               {
414                  // initialize the thread data with the work function and parameters
415 kumpf 1.14       Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
416                      "Initializing thread with work function and parameters: parm = %p",
417                      parm);
418            
419 kumpf 1.15       th->delete_tsd("work func");
420 chip  1.11       th->put_tsd("work func", NULL,
421 mike  1.2  		  sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
422            		  (void *)work);
423 kumpf 1.15       th->delete_tsd("work parm");
424 mike  1.2        th->put_tsd("work parm", NULL, sizeof(void *), parm);
425 kumpf 1.15       th->delete_tsd("blocking sem");
426 mday  1.13       if(blocking != 0 )
427            	 th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
428                  
429 chip  1.11       // put the thread on the running list
430 mike  1.2        _running.insert_first(th);
431            
432                  // signal the thread's sleep semaphore to awaken it
433                  Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
434            
435                  if(sleep_sem == 0)
436                  {
437            	 th->dereference_tsd();
438 kumpf 1.14          PEG_METHOD_EXIT();
439 mike  1.2  	 throw NullPointer();
440                  }
441 kumpf 1.14       Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
442 mike  1.2        sleep_sem->signal();
443                  th->dereference_tsd();
444               }
445               else
446                  _pool.insert_first(th);
447 kumpf 1.14 
448               PEG_METHOD_EXIT();
449 mike  1.2  }
450            
451            // caller is responsible for only calling this routine during slack periods
452            // but should call it at least once per _deadlock_detect with the running q
453            // and at least once per _deallocate_wait for the pool q
454            
455 mday  1.12 Uint32 ThreadPool::kill_dead_threads(void)
456 mike  1.2  	 throw(IPCException)
457            {
458               struct timeval now;
459               gettimeofday(&now, NULL);
460 mday  1.12    Uint32 bodies = 0;
461               
462 mike  1.2     // first go thread the dead q and clean it up as much as possible
463               while(_dead.count() > 0)
464               {
465 mday  1.32 
466 kumpf 1.31 #if !defined(PEGASUS_PLATFORM_HPUX_ACC) && !defined(PEGASUS_PLATFORM_LINUX_IA64_GNU)
467 mday  1.30       PEGASUS_STD(cout) << "ThreadPool:: removing and joining dead thread" << PEGASUS_STD(endl);
468 kumpf 1.31 #endif
469 mike  1.2        Thread *dead = _dead.remove_first();
470                  if(dead == 0)
471            	 throw NullPointer();
472 mday  1.30       dead->join();
473 mike  1.2        delete dead;
474               }
475 chip  1.11 
476               DQueue<Thread> * map[2] =
477 mike  1.2        {
478            	 &_pool, &_running
479                  };
480 chip  1.11 
481            
482 mike  1.2     DQueue<Thread> *q = 0;
483               int i = 0;
484               AtomicInt needed(0);
485 chip  1.11 
486 kumpf 1.31 #ifdef PEGASUS_DISABLE_KILLING_HUNG_THREADS
487               // This change prevents the thread pool from killing "hung" threads.
488               // The definition of a "hung" thread is one that has been on the run queue
489               // for longer than the time interval set when the thread pool was created.
490               // Cancelling "hung" threads has proven to be problematic.
491            
492               // With this change the thread pool will not cancel "hung" threads.  This
493               // may prevent a crash depending upon the state of the "hung" thread.  In
494               // the case that the thread is actually hung, this change causes the
495               // thread resources not to be reclaimed.
496            
497               // Idle threads, those that have not executed a routine for a time
498               // interval, continue to be destroyed.  This is normal and should not
499               // cause any problems.
500               for( ; i < 1; i++)
501            #else
502 mday  1.30    for( ; i < 2; i++)
503 kumpf 1.31 #endif
504 mday  1.21    { 
505                  q = map[i];
506 mike  1.2        if(q->count() > 0 )
507                  {
508 chip  1.11 	 try
509 mike  1.2  	 {
510            	    q->try_lock();
511            	 }
512 mday  1.18 	 catch(...)
513 mike  1.2  	 {
514 mday  1.18 	    return bodies;
515 mike  1.2  	 }
516            
517            	 struct timeval dt = { 0, 0 };
518            	 struct timeval *dtp;
519            	 Thread *th = 0;
520            	 th = q->next(th);
521            	 while (th != 0 )
522            	 {
523 chip  1.11 	    try
524 mike  1.2  	    {
525            	       dtp = (struct timeval *)th->try_reference_tsd("deadlock timer");
526            	    }
527 mday  1.18 	    catch(...)
528 mike  1.2  	    {
529 kumpf 1.25 	       q->unlock();
530 mday  1.18 	       return bodies;
531 mike  1.2  	    }
532 chip  1.11 	
533 mike  1.2  	    if(dtp != 0)
534            	    {
535            	       memcpy(&dt, dtp, sizeof(struct timeval));
536            	    }
537            	    th->dereference_tsd();
538            	    struct timeval deadlock_timeout;
539 mday  1.18 	    Boolean too_long;
540            	    if( i == 0)
541            	    {
542            	       too_long = check_time(&dt, get_deallocate_wait(&deadlock_timeout));
543            	    }
544            	    else 
545            	    {
546 mday  1.22 	       too_long = check_time(&dt, get_deadlock_detect(&deadlock_timeout));
547 mday  1.18 	    }
548            	    
549            	    if( true == too_long)
550 mike  1.2  	    {
551            	       // if we are deallocating from the pool, escape if we are
552 chip  1.11 	       // down to the minimum thread count
553 mday  1.13 	       _current_threads--;
554 mday  1.18 	       if( _current_threads.value() < (Uint32)_min_threads )
555 mike  1.2  	       {
556 mday  1.13 		  if( i == 0)
557 mike  1.2  		  {
558 mday  1.13 		     _current_threads++;
559 mike  1.2  		     th = q->next(th);
560            		     continue;
561            		  }
562 chip  1.11 		  else
563 mike  1.2  		  {
564 chip  1.11 		     // we are killing a hung thread and we will drop below the
565 mike  1.2  		     // minimum. create another thread to make up for the one
566            		     // we are about to kill
567            		     needed++;
568            		  }
569            	       }
570 chip  1.11 	
571 mike  1.2  	       th = q->remove_no_lock((void *)th);
572 chip  1.11 	
573 mike  1.2  	       if(th != 0)
574            	       {
575 mday  1.30 		  if( i == 0 )
576 mike  1.2  		  {
577 mday  1.30 		     th->delete_tsd("work func");
578            		     th->put_tsd("work func", NULL,
579            				 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
580            				 (void *)&_undertaker);
581            		     th->delete_tsd("work parm");
582            		     th->put_tsd("work parm", NULL, sizeof(void *), th);
583            		     
584            		     // signal the thread's sleep semaphore to awaken it
585            		     Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
586            		     
587            		     if(sleep_sem == 0)
588            		     {
589            			q->unlock();
590            			th->dereference_tsd();
591            			throw NullPointer();
592            		     }
593            		     
594            		     bodies++;
595 mike  1.2  		     th->dereference_tsd();
596 mday  1.30 		     _dead.insert_first(th);
597            		     sleep_sem->signal();
598            		     th = 0;
599            		  }
600            		  else 
601            		  {
602            		     // deadlocked threads
603 kumpf 1.31 #if !defined(PEGASUS_PLATFORM_HPUX_ACC) && !defined(PEGASUS_PLATFORM_LINUX_IA64_GNU)
604 mday  1.30 		     PEGASUS_STD(cout) << "Killing a deadlocked thread" << PEGASUS_STD(endl);
605 kumpf 1.31 #endif
606 mday  1.30 		     th->cancel();
607            		     delete th;
608 mike  1.2  		  }
609            	       }
610            	    }
611            	    th = q->next(th);
612 mday  1.20 	    pegasus_sleep(1);
613 mike  1.2  	 }
614            	 q->unlock();
615            	 while (needed.value() > 0)
616            	 {
617            	    _link_pool(_init_thread());
618            	    needed--;
619 mday  1.20 	    pegasus_sleep(0);
620 mike  1.2  	 }
621                  }
622               }
623 mday  1.18     return bodies; 
624 mike  1.2  }
625            
626 mday  1.12 
627 mike  1.2  Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval)
628            {
629 mday  1.22    // never time out if the interval is zero
630               if(interval && interval->tv_sec == 0 && interval->tv_usec == 0)
631                  return false;
632               
633 mday  1.13    struct timeval now, finish, remaining;
634               Uint32 usec;
635 mday  1.33    pegasus_gettimeofday(&now);
636 mday  1.13 
637               finish.tv_sec = start->tv_sec + interval->tv_sec;
638               usec = start->tv_usec + interval->tv_usec;
639               finish.tv_sec += (usec / 1000000);
640               usec %= 1000000;
641               finish.tv_usec = usec;
642                
643               if ( timeval_subtract(&remaining, &finish, &now) )
644 mike  1.2        return true;
645               else
646                  return false;
647            }
648            
649            PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm )
650            {
651 mday  1.30    exit_thread((PEGASUS_THREAD_RETURN)1);
652               return (PEGASUS_THREAD_RETURN)1;
653 mike  1.2  }
654 mday  1.19 
655            
656             void ThreadPool::_sleep_sem_del(void *p)
657            {
658               if(p != 0)
659               {
660                  delete (Semaphore *)p;
661               }
662            }
663            
664             void ThreadPool::_check_deadlock(struct timeval *start) throw(Deadlock)
665            {
666               if (true == check_time(start, &_deadlock_detect))
667                  throw Deadlock(pegasus_thread_self());
668               return;
669            }
670            
671            
672             Boolean ThreadPool::_check_deadlock_no_throw(struct timeval *start)
673            {
674               return(check_time(start, &_deadlock_detect));
675 mday  1.19 }
676            
677             Boolean ThreadPool::_check_dealloc(struct timeval *start)
678            {
679               return(check_time(start, &_deallocate_wait));
680            }
681            
682             Thread *ThreadPool::_init_thread(void) throw(IPCException)
683            {
684 mday  1.32    PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_init_thread");
685 mday  1.19    Thread *th = (Thread *) new Thread(_loop, this, false);
686               // allocate a sleep semaphore and pass it in the thread context
687               // initial count is zero, loop function will sleep until
688               // we signal the semaphore
689               Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
690               th->put_tsd("sleep sem", &_sleep_sem_del, sizeof(Semaphore), (void *)sleep_sem);
691               
692               struct timeval *dldt = (struct timeval *) ::operator new(sizeof(struct timeval));
693               th->put_tsd("deadlock timer", thread_data::default_delete, sizeof(struct timeval), (void *)dldt);
694               // thread will enter _loop(void *) and sleep on sleep_sem until we signal it
695               th->run();
696               _current_threads++;
697               pegasus_yield();
698 mday  1.32    PEG_METHOD_EXIT();
699 mday  1.19    
700               return th;
701            }
702            
703             void ThreadPool::_link_pool(Thread *th) throw(IPCException)
704            {
705               if(th == 0)
706                  throw NullPointer();
707               _pool.insert_first(th);
708            }
709            
710 mike  1.2  
711            
712            PEGASUS_NAMESPACE_END
713            

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2