(file) Return to Thread.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mike  1.2 //%/////////////////////////////////////////////////////////////////////////////
  2           //
  3 kumpf 1.17 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
  4            // The Open Group, Tivoli Systems
  5 mike  1.2  //
  6            // Permission is hereby granted, free of charge, to any person obtaining a copy
  7 chip  1.11 // of this software and associated documentation files (the "Software"), to
  8            // deal in the Software without restriction, including without limitation the
  9            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 10 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
 11            // furnished to do so, subject to the following conditions:
 12 kumpf 1.17 // 
 13 chip  1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 14 mike  1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 15            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 16 chip  1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 17            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 18            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 19 mike  1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 20            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 21            //
 22            //==============================================================================
 23            //
 24            // Author: Mike Day (mdday@us.ibm.com)
 25            //
 26            // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
 27 chip  1.11 //              added nsk platform support
 28 mike  1.2  //
 29            //%/////////////////////////////////////////////////////////////////////////////
 30            
 31            #include "Thread.h"
 32            #include <Pegasus/Common/IPC.h>
 33 kumpf 1.14 #include <Pegasus/Common/Tracer.h>
 34 mike  1.2  
 35            #if defined(PEGASUS_OS_TYPE_WINDOWS)
 36 chip  1.11 # include "ThreadWindows.cpp"
 37 mike  1.2  #elif defined(PEGASUS_OS_TYPE_UNIX)
 38            # include "ThreadUnix.cpp"
 39            #elif defined(PEGASUS_OS_TYPE_NSK)
 40            # include "ThreadNsk.cpp"
 41            #else
 42            # error "Unsupported platform"
 43            #endif
 44            
 45            PEGASUS_NAMESPACE_BEGIN
 46            
 47 mday  1.42 
 48 chip  1.11 void thread_data::default_delete(void * data)
 49            {
 50 mike  1.2     if( data != NULL)
 51 chip  1.11       ::operator delete(data);
 52 mike  1.2  }
 53            
 54 chuck 1.43 // l10n start
 55            void language_delete(void * data)
 56            {
 57               if( data != NULL)
 58               {
 59                  AcceptLanguages * al = static_cast<AcceptLanguages *>(data);
 60 chuck 1.44       delete al;
 61 chuck 1.43    }
 62            }
 63            // l10n end
 64            
 65 mike  1.2  Boolean Thread::_signals_blocked = false;
 66 chuck 1.37 // l10n
 67 mday  1.48 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key = -1;
 68 chuck 1.37 Boolean Thread::_key_initialized = false;
 69 chuck 1.41 Boolean Thread::_key_error = false;
 70 chuck 1.37 
 71 mike  1.2  
 72            // for non-native implementations
 73 chip  1.11 #ifndef PEGASUS_THREAD_CLEANUP_NATIVE
 74 mike  1.2  void Thread::cleanup_push( void (*routine)(void *), void *parm) throw(IPCException)
 75            {
 76                cleanup_handler *cu = new cleanup_handler(routine, parm);
 77 chip  1.11     try
 78                {
 79            	_cleanup.insert_first(cu);
 80                }
 81                catch(IPCException&)
 82 mike  1.2      {
 83            	delete cu;
 84 chip  1.11 	throw;
 85 mike  1.2      }
 86                return;
 87            }
 88 chip  1.11 	
 89 mike  1.2  void Thread::cleanup_pop(Boolean execute) throw(IPCException)
 90            {
 91                cleanup_handler *cu ;
 92 chip  1.11     try
 93                {
 94 mike  1.2  	cu = _cleanup.remove_first() ;
 95                }
 96 chip  1.11     catch(IPCException&)
 97 mike  1.2      {
 98 chip  1.11 	PEGASUS_ASSERT(0);
 99 mike  1.2       }
100                if(execute == true)
101            	cu->execute();
102                delete cu;
103            }
104 chip  1.11 		
105 mike  1.2  #endif
106            
107            
108 kumpf 1.8  //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) throw(IPCException)
109 mike  1.2  
110            
111 chip  1.11 #ifndef PEGASUS_THREAD_EXIT_NATIVE
112            void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
113            {
114                // execute the cleanup stack and then return
115 mike  1.2     while( _cleanup.count() )
116               {
117 chip  1.11        try
118                   {
119            	   cleanup_pop(true);
120                   }
121                   catch(IPCException&)
122                   {
123            	  PEGASUS_ASSERT(0);
124            	  break;
125 mike  1.2         }
126               }
127               _exit_code = exit_code;
128               exit_thread(exit_code);
129 mday  1.4     _handle.thid = 0;
130 mike  1.2  }
131            
132            
133            #endif
134            
135 chuck 1.37 // l10n start
136 chuck 1.39 Sint8 Thread::initializeKey()
137            {
138               PEG_METHOD_ENTER(TRC_THREAD, "Thread::initializeKey");
139               if (!Thread::_key_initialized)
140               {
141 chuck 1.41 	if (Thread::_key_error)
142            	{
143                   		Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
144            	        	  "Thread: ERROR - thread key error"); 
145            		return -1;
146            	}
147            
148 chuck 1.39 	if (pegasus_key_create(&Thread::_platform_thread_key) == 0)
149            	{
150                    	Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
151            	        	  "Thread: able to create a thread key");   
152            	   	Thread::_key_initialized = true;	
153            	}
154            	else
155            	{
156                   		Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
157            	        	  "Thread: ERROR - unable to create a thread key"); 
158 chuck 1.41 	   	Thread::_key_error = true;
159 chuck 1.39 		return -1;
160            	}
161               }
162            
163               PEG_METHOD_EXIT();
164               return 0;  
165            }
166            
167 chuck 1.37 Thread * Thread::getCurrent()
168            {
169 chuck 1.39     PEG_METHOD_ENTER(TRC_THREAD, "Thread::getCurrent");	
170 chuck 1.40     if (Thread::initializeKey() != 0)
171 chuck 1.39     {
172            	return NULL;  
173                }
174 chuck 1.38     PEG_METHOD_EXIT();  
175 chuck 1.39     return (Thread *)pegasus_get_thread_specific(_platform_thread_key); 
176            }
177            
178            void Thread::setCurrent(Thread * thrd)
179            {
180               PEG_METHOD_ENTER(TRC_THREAD, "Thread::setCurrent");
181               if (Thread::initializeKey() == 0)
182               {
183               	if (pegasus_set_thread_specific(Thread::_platform_thread_key,
184            								 (void *) thrd) == 0)
185                    {
186                    	Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
187            	        	  "Successful set Thread * into thread specific storage");   
188                    }
189                    else
190                    {
191                    	Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
192            	        	  "ERROR: got error setting Thread * into thread specific storage");   
193                    }
194               }
195               PEG_METHOD_EXIT();  
196 chuck 1.37 }
197            
198            AcceptLanguages * Thread::getLanguages()
199            {
200 chuck 1.39     PEG_METHOD_ENTER(TRC_THREAD, "Thread::getLanguages");		
201 chuck 1.37     
202            	Thread * curThrd = Thread::getCurrent();
203            	if (curThrd == NULL)
204            		return NULL;
205               	AcceptLanguages * acceptLangs =
206               		 (AcceptLanguages *)curThrd->reference_tsd("acceptLanguages");
207            	curThrd->dereference_tsd();
208                PEG_METHOD_EXIT(); 	
209            	return acceptLangs;
210            }
211            
212            void Thread::setLanguages(AcceptLanguages *langs) //l10n
213            {
214 chuck 1.39    PEG_METHOD_ENTER(TRC_THREAD, "Thread::setLanguages");
215 chuck 1.37    		
216               Thread * currentThrd = Thread::getCurrent();
217               if (currentThrd != NULL)
218               {
219               		// deletes the old tsd and creates a new one
220            		currentThrd->put_tsd("acceptLanguages",
221 chuck 1.43 			language_delete, 
222 chuck 1.37 			sizeof(AcceptLanguages *),
223            			langs);   		
224               }
225               
226               PEG_METHOD_EXIT();    		
227            }
228            
229            void Thread::clearLanguages() //l10n
230            {
231 chuck 1.39    PEG_METHOD_ENTER(TRC_THREAD, "Thread::clearLanguages");
232 chuck 1.37    	
233               Thread * currentThrd = Thread::getCurrent();
234               if (currentThrd != NULL)
235               {
236               		// deletes the old tsd
237            		currentThrd->delete_tsd("acceptLanguages");   		
238               }
239               
240               PEG_METHOD_EXIT();   		
241            }
242            // l10n end      
243            
244 mday  1.20 DQueue<ThreadPool> ThreadPool::_pools(true);
245            
246            
247            void ThreadPool::kill_idle_threads(void)
248            {
249               static struct timeval now, last = {0, 0};
250               
251               pegasus_gettimeofday(&now);
252               if(now.tv_sec - last.tv_sec > 5)
253               {
254                  _pools.lock();
255                  ThreadPool *p = _pools.next(0);
256                  while(p != 0)
257                  {
258            	 try 
259            	 {
260            	    p->kill_dead_threads();
261            	 }
262            	 catch(...)
263            	 {
264            	 }
265 mday  1.20 	 p = _pools.next(p);
266                  }
267                  _pools.unlock();
268                  pegasus_gettimeofday(&last);
269               }
270            }
271            
272            
273 mike  1.2  ThreadPool::ThreadPool(Sint16 initial_size,
274 kumpf 1.8  		       const Sint8 *key,
275 mike  1.2  		       Sint16 min,
276            		       Sint16 max,
277            		       struct timeval & alloc_wait,
278 chip  1.11 		       struct timeval & dealloc_wait,
279 mike  1.2  		       struct timeval & deadlock_detect)
280               : _max_threads(max), _min_threads(min),
281 mday  1.12      _current_threads(0),
282                 _pool(true), _running(true),
283 mike  1.2       _dead(true), _dying(0)
284            {
285               _allocate_wait.tv_sec = alloc_wait.tv_sec;
286               _allocate_wait.tv_usec = alloc_wait.tv_usec;
287 chip  1.11    _deallocate_wait.tv_sec = dealloc_wait.tv_sec;
288 mike  1.2     _deallocate_wait.tv_usec = dealloc_wait.tv_usec;
289               _deadlock_detect.tv_sec = deadlock_detect.tv_sec;
290               _deadlock_detect.tv_usec = deadlock_detect.tv_usec;
291               memset(_key, 0x00, 17);
292               if(key != 0)
293                  strncpy(_key, key, 16);
294 mday  1.21    if(_max_threads > 0 && _max_threads < initial_size)
295 mike  1.2        _max_threads = initial_size;
296               if(_min_threads > initial_size)
297                  _min_threads = initial_size;
298 chip  1.11 
299 mike  1.2     int i;
300               for(i = 0; i < initial_size; i++)
301               {
302                  _link_pool(_init_thread());
303               }
304 mday  1.20    _pools.insert_last(this);
305 mike  1.2  }
306            
307 chip  1.11 
308 mike  1.2  
309            ThreadPool::~ThreadPool(void)
310            {
311 mday  1.47    
312 mday  1.35    try 
313 mday  1.47    {      
314                  {
315            	 auto_mutex(&(this->_monitor));
316            	 _dying++;
317                  }
318                  
319 mday  1.35       _pools.remove(this);
320                  Thread *th = 0;
321                  th = _pool.remove_first();
322                  while(th != 0)
323 mike  1.2        {
324 mday  1.35 	 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
325            	 
326            	 if(sleep_sem == 0)
327            	 {
328            	    th->dereference_tsd();
329            	    throw NullPointer();
330            	 }
331 mday  1.47 
332 s.hills 1.49 	 // Signal to get the thread out of the work loop.
333              	 sleep_sem->signal();
334              	 // Signal to get the thread past the end. See the comment
335              	 // "wait to be awakend by the thread pool destructor"
336              	 // Note: the current implementation of Thread for Windows
337              	 // does not implement "pthread" cancelation points so this
338              	 // is needed.
339 mday    1.35 	 sleep_sem->signal();
340 s.hills 1.49 
341 mike    1.2  	 th->dereference_tsd();
342 mday    1.35 	 // signal the thread's sleep semaphore
343              	 th->cancel();
344              	 th->join();
345              	 th->empty_tsd();
346              	 delete th;
347              	 th = _pool.remove_first();
348 mike    1.2        }
349 mday    1.47 
350                    th = _dead.remove_first();
351 mday    1.35       while(th != 0)
352                    {
353 mday    1.47 	 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
354              	 
355              	 if(sleep_sem == 0)
356              	 {
357              	    th->dereference_tsd();
358              	    throw NullPointer();
359              	 }
360              	 
361              
362              	 sleep_sem->signal();
363              	 th->dereference_tsd();
364              	 
365 mday    1.35 	 // signal the thread's sleep semaphore
366              	 th->cancel();
367              	 th->join();
368              	 th->empty_tsd();
369              	 delete th;
370 mday    1.47 	 th = _dead.remove_first();
371 mday    1.35       }
372 mday    1.47       {
373              	 
374              	 auto_mutex(&(this->_monitor));
375                    th = _running.remove_first();
376 mday    1.35       while(th != 0)
377 mday    1.47       {	 
378 mday    1.35 	 // signal the thread's sleep semaphore
379 mday    1.47 	 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
380              	 if(sleep_sem == 0 )
381              	 {
382              	    th->dereference_tsd();
383              	    throw NullPointer();
384              	 }
385              	 
386              	 sleep_sem->signal();
387              	 th->dereference_tsd();
388              	 
389 mday    1.35 	 th->cancel();
390 mday    1.47 
391              	 // ensure that th->run() has a chance to execute so that the join will not
392              	 // block
393 mday    1.35 	 th->join();
394              	 th->empty_tsd();
395              	 delete th;
396 mday    1.47 	 th = _running.remove_first();
397                    }
398 mday    1.35       }
399 mday    1.47       
400 mike    1.2     }
401 mday    1.47 
402 mday    1.35    catch(...)
403 mike    1.2     {
404                 }
405 mday    1.47 
406 mike    1.2  }
407              
408 chip    1.11 // make this static to the class
409 mike    1.2  PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
410              {
411 kumpf   1.14    PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
412              
413 mike    1.2     Thread *myself = (Thread *)parm;
414                 if(myself == 0)
415 kumpf   1.14    {
416                    PEG_METHOD_EXIT();
417 mike    1.2        throw NullPointer();
418 kumpf   1.14    }
419 chuck   1.37    
420              // l10n
421                 // Set myself into thread specific storage
422 chuck   1.38    // This will allow code to get its own Thread
423 chuck   1.39    Thread::setCurrent(myself);	
424              
425 mike    1.2     ThreadPool *pool = (ThreadPool *)myself->get_parm();
426 kumpf   1.14    if(pool == 0 ) 
427                 {
428                    PEG_METHOD_EXIT();
429 mike    1.2        throw NullPointer();
430 kumpf   1.14    }
431 mday    1.47 
432 mike    1.5     Semaphore *sleep_sem = 0;
433 mday    1.13    Semaphore *blocking_sem = 0;
434                 
435 mike    1.5     struct timeval *deadlock_timer = 0;
436 mday    1.47    
437 chip    1.11    try
438 mike    1.2     {
439                    sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
440                    myself->dereference_tsd();
441                    deadlock_timer = (struct timeval *)myself->reference_tsd("deadlock timer");
442 mday    1.22       myself->dereference_tsd(); 
443 mike    1.2     }
444 mike    1.6     catch(IPCException &)
445 mike    1.2     {
446 kumpf   1.14       PEG_METHOD_EXIT();
447 mday    1.47       return(0);
448 mike    1.2     }
449 mday    1.30    catch(...)
450                 {
451                    PEG_METHOD_EXIT();
452 mday    1.47       return(0);
453 mday    1.30    }
454                 
455 mike    1.2     if(sleep_sem == 0 || deadlock_timer == 0)
456 kumpf   1.14    {
457                    PEG_METHOD_EXIT();
458 mike    1.2        throw NullPointer();
459 kumpf   1.14    }
460 mike    1.2  
461 mday    1.47    while(pool->_dying.value() < 1)
462 mike    1.2     {
463                    sleep_sem->wait();
464 mday    1.35         
465 mike    1.2        // when we awaken we reside on the running queue, not the pool queue
466 chip    1.11 
467 mday    1.47       
468 mike    1.5        PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0;
469                    void *parm = 0;
470 mike    1.2  
471 chip    1.11       try
472 mike    1.2        {
473              	 _work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) \
474              	    myself->reference_tsd("work func");
475              	 myself->dereference_tsd();
476              	 parm = myself->reference_tsd("work parm");
477              	 myself->dereference_tsd();
478 mday    1.13 	 blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
479              	 myself->dereference_tsd();
480              
481 mike    1.2        }
482 mike    1.6        catch(IPCException &)
483 mike    1.2        {
484 kumpf   1.14 	 PEG_METHOD_EXIT();
485 mday    1.47 	 return(0);
486 mike    1.2        }
487 chip    1.11 
488 mike    1.2        if(_work == 0)
489 kumpf   1.14       {
490                       PEG_METHOD_EXIT();
491 mike    1.2  	 throw NullPointer();
492 kumpf   1.14       }
493 kumpf   1.24 
494                    if(_work ==
495                       (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) &_undertaker)
496                    {
497 mday    1.23 	 _work(parm);
498 kumpf   1.24       }
499              
500 mike    1.2        gettimeofday(deadlock_timer, NULL);
501 mday    1.20       try 
502                    {
503 mday    1.47 	 {
504              	    auto_mutex(&(pool->_monitor));
505              	    if(pool->_dying.value())
506              	    {
507              	       break;
508              	    }
509              	 }
510 mday    1.20 	 _work(parm);
511                    }
512                    catch(...)
513                    {
514 mday    1.47 	 return((PEGASUS_THREAD_RETURN)0);
515 mday    1.20       }
516 chuck   1.37       
517 mday    1.47 
518 mday    1.13       
519 chip    1.11       // put myself back onto the available list
520                    try
521 mike    1.2        {
522 mday    1.47 	 auto_mutex(&(pool->_monitor));
523              	 if(pool->_dying.value() == 0)
524              	 {
525              	    gettimeofday(deadlock_timer, NULL);
526              	    if( blocking_sem != 0 )
527              	       blocking_sem->signal();
528                    
529              	    pool->_running.remove((void *)myself);
530              	    pool->_pool.insert_first(myself);
531              	 }
532              	 else
533              	 {
534              	    PEG_METHOD_EXIT();
535              	    return((PEGASUS_THREAD_RETURN)0);
536              	 }
537 mike    1.2        }
538 mike    1.6        catch(IPCException &)
539 mike    1.2        {
540 kumpf   1.14 	 PEG_METHOD_EXIT();
541 mday    1.47 	 return((PEGASUS_THREAD_RETURN)0);
542 mike    1.2        }
543 mday    1.51       catch(...)
544                    {
545              	 return((PEGASUS_THREAD_RETURN)0);
546                    }
547                    
548 mike    1.2     }
549 s.hills 1.49 
550                 // TODO: Why is this needed? Why not just continue?
551 mike    1.2     // wait to be awakend by the thread pool destructor
552 mday    1.50    //sleep_sem->wait();
553 s.hills 1.49 
554 mike    1.2     myself->test_cancel();
555 kumpf   1.14 
556                 PEG_METHOD_EXIT();
557 mike    1.2     myself->exit_self(0);
558                 return((PEGASUS_THREAD_RETURN)0);
559              }
560              
561              void ThreadPool::allocate_and_awaken(void *parm,
562              				     PEGASUS_THREAD_RETURN \
563 mday    1.13 				     (PEGASUS_THREAD_CDECL *work)(void *), 
564              				     Semaphore *blocking)
565              
566 mike    1.2     throw(IPCException)
567              {
568 kumpf   1.14    PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
569 mike    1.2     struct timeval start;
570                 gettimeofday(&start, NULL);
571 mday    1.47    Thread *th = 0;
572                 
573                 try 
574                 {
575                    auto_mutex(&(this->_monitor));
576                    if(_dying.value())
577                    {
578              	 return;
579                    }
580                    th = _pool.remove_first();
581                 }
582                 catch(...)
583                 {
584                    return;
585                    
586                 }
587                 
588 mday    1.12    
589 mday    1.7     // wait for the right interval and try again
590 mday    1.47    while (th == 0 && _dying.value() < 1)
591 mike    1.2     {
592 mday    1.47       // will throw an IPCException& 
593 mday    1.12       _check_deadlock(&start) ;
594                    
595 mday    1.21       if(_max_threads == 0 || _current_threads < _max_threads)
596 mday    1.35       {
597              	 th = _init_thread();
598              	 continue;
599                    }
600                    pegasus_yield();
601 mday    1.47       try
602                    {
603              	 auto_mutex(&(this->_monitor));
604              	 if(_dying.value())
605              	 {
606              	    return;
607              	 }
608              	 th = _pool.remove_first();
609                    }
610                    catch(...)
611                    {
612              	 return ;
613                    }
614 mday    1.7     }
615 chip    1.11 
616 mday    1.47    if(_dying.value() < 1)
617 mike    1.2     {
618                    // initialize the thread data with the work function and parameters
619 kumpf   1.14       Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
620                        "Initializing thread with work function and parameters: parm = %p",
621                        parm);
622              
623 kumpf   1.15       th->delete_tsd("work func");
624 chip    1.11       th->put_tsd("work func", NULL,
625 mike    1.2  		  sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
626              		  (void *)work);
627 kumpf   1.15       th->delete_tsd("work parm");
628 mike    1.2        th->put_tsd("work parm", NULL, sizeof(void *), parm);
629 kumpf   1.15       th->delete_tsd("blocking sem");
630 mday    1.13       if(blocking != 0 )
631              	 th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
632 mday    1.47       try 
633                    {
634              	 auto_mutex(&(this->_monitor));
635              	 if(_dying.value())
636              	 {
637              	    th->cancel();
638              	    th->join();
639              	    delete th;
640              	    return;
641              	 }
642              	 
643              	 // put the thread on the running list
644              
645 mike    1.2  
646 mday    1.47 	 _running.insert_first(th);
647 mike    1.2        // signal the thread's sleep semaphore to awaken it
648 mday    1.47 	 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
649              	 
650              	 if(sleep_sem == 0)
651              	 {
652              	    th->dereference_tsd();
653              	    PEG_METHOD_EXIT();
654              	    throw NullPointer();
655              	 }
656              	 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
657              	 sleep_sem->signal();
658              	 th->dereference_tsd();
659                    }
660                    catch(...)
661 mike    1.2        {
662 mday    1.47 	 PEG_METHOD_EXIT();
663              	 return;
664 mike    1.2        }
665 mday    1.47       
666 mike    1.2     }
667                 else
668 mday    1.47    {
669                    th->cancel();
670                    th->join();
671                    delete th;
672                 }
673                 
674 kumpf   1.14    PEG_METHOD_EXIT();
675 mike    1.2  }
676              
677              // caller is responsible for only calling this routine during slack periods
678              // but should call it at least once per _deadlock_detect with the running q
679              // and at least once per _deallocate_wait for the pool q
680              
681 mday    1.12 Uint32 ThreadPool::kill_dead_threads(void)
682 mike    1.2  	 throw(IPCException)
683              {
684                 struct timeval now;
685                 gettimeofday(&now, NULL);
686 mday    1.12    Uint32 bodies = 0;
687                 
688 mike    1.2     // first go thread the dead q and clean it up as much as possible
689 mday    1.47    try 
690                 {
691                    auto_mutex(&(this->_monitor));
692                    if(_dying.value() )
693                    {
694              	 return 0;
695                    }
696                    
697                    while(_dead.count() > 0 && _dying.value() == 0 )
698                    {
699              	 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "ThreadPool:: removing and joining dead thread");
700              	 Thread *dead = _dead.remove_first();
701              	 
702              	 if(dead == 0)
703              	    throw NullPointer();
704              	 dead->join();
705              	 delete dead;
706                    }
707                 }
708                 catch(...)
709 mike    1.2     {
710                 }
711 mday    1.47    
712                 
713 chip    1.11    DQueue<Thread> * map[2] =
714 mike    1.2        {
715              	 &_pool, &_running
716                    };
717 chip    1.11 
718              
719 mike    1.2     DQueue<Thread> *q = 0;
720                 int i = 0;
721                 AtomicInt needed(0);
722 chip    1.11 
723 kumpf   1.31 #ifdef PEGASUS_DISABLE_KILLING_HUNG_THREADS
724                 // This change prevents the thread pool from killing "hung" threads.
725                 // The definition of a "hung" thread is one that has been on the run queue
726                 // for longer than the time interval set when the thread pool was created.
727                 // Cancelling "hung" threads has proven to be problematic.
728              
729                 // With this change the thread pool will not cancel "hung" threads.  This
730                 // may prevent a crash depending upon the state of the "hung" thread.  In
731                 // the case that the thread is actually hung, this change causes the
732                 // thread resources not to be reclaimed.
733              
734                 // Idle threads, those that have not executed a routine for a time
735                 // interval, continue to be destroyed.  This is normal and should not
736                 // cause any problems.
737                 for( ; i < 1; i++)
738              #else
739 mday    1.30    for( ; i < 2; i++)
740 kumpf   1.31 #endif
741 mday    1.47    {
742                    auto_mutex(&(this->_monitor)); 
743 mday    1.21       q = map[i];
744 mike    1.2        if(q->count() > 0 )
745                    {
746 chip    1.11 	 try
747 mike    1.2  	 {
748 mday    1.47 	    if(_dying.value())
749              	    {
750              	       return bodies;
751              	    }
752              	    
753 mike    1.2  	    q->try_lock();
754              	 }
755 mday    1.18 	 catch(...)
756 mike    1.2  	 {
757 mday    1.18 	    return bodies;
758 mike    1.2  	 }
759              
760              	 struct timeval dt = { 0, 0 };
761              	 struct timeval *dtp;
762              	 Thread *th = 0;
763              	 th = q->next(th);
764              	 while (th != 0 )
765              	 {
766 chip    1.11 	    try
767 mike    1.2  	    {
768              	       dtp = (struct timeval *)th->try_reference_tsd("deadlock timer");
769              	    }
770 mday    1.18 	    catch(...)
771 mike    1.2  	    {
772 kumpf   1.25 	       q->unlock();
773 mday    1.18 	       return bodies;
774 mike    1.2  	    }
775 chip    1.11 	
776 mike    1.2  	    if(dtp != 0)
777              	    {
778              	       memcpy(&dt, dtp, sizeof(struct timeval));
779              	    }
780              	    th->dereference_tsd();
781              	    struct timeval deadlock_timeout;
782 mday    1.18 	    Boolean too_long;
783              	    if( i == 0)
784              	    {
785              	       too_long = check_time(&dt, get_deallocate_wait(&deadlock_timeout));
786              	    }
787              	    else 
788              	    {
789 mday    1.22 	       too_long = check_time(&dt, get_deadlock_detect(&deadlock_timeout));
790 mday    1.18 	    }
791              	    
792              	    if( true == too_long)
793 mike    1.2  	    {
794              	       // if we are deallocating from the pool, escape if we are
795 chip    1.11 	       // down to the minimum thread count
796 mday    1.13 	       _current_threads--;
797 mday    1.18 	       if( _current_threads.value() < (Uint32)_min_threads )
798 mike    1.2  	       {
799 mday    1.13 		  if( i == 0)
800 mike    1.2  		  {
801 mday    1.13 		     _current_threads++;
802 mike    1.2  		     th = q->next(th);
803              		     continue;
804              		  }
805 chip    1.11 		  else
806 mike    1.2  		  {
807 chip    1.11 		     // we are killing a hung thread and we will drop below the
808 mike    1.2  		     // minimum. create another thread to make up for the one
809              		     // we are about to kill
810              		     needed++;
811              		  }
812              	       }
813 chip    1.11 	
814 mike    1.2  	       th = q->remove_no_lock((void *)th);
815 chip    1.11 	
816 mike    1.2  	       if(th != 0)
817              	       {
818 mday    1.30 		  if( i == 0 )
819 mike    1.2  		  {
820 mday    1.30 		     th->delete_tsd("work func");
821              		     th->put_tsd("work func", NULL,
822              				 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
823              				 (void *)&_undertaker);
824              		     th->delete_tsd("work parm");
825              		     th->put_tsd("work parm", NULL, sizeof(void *), th);
826              		     
827              		     // signal the thread's sleep semaphore to awaken it
828              		     Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
829              		     
830              		     if(sleep_sem == 0)
831              		     {
832              			q->unlock();
833              			th->dereference_tsd();
834              			throw NullPointer();
835              		     }
836              		     
837              		     bodies++;
838 mike    1.2  		     th->dereference_tsd();
839 mday    1.30 		     _dead.insert_first(th);
840              		     sleep_sem->signal();
841              		     th = 0;
842              		  }
843              		  else 
844              		  {
845              		     // deadlocked threads
846 mday    1.34 		     Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Killing a deadlocked thread");
847 mday    1.30 		     th->cancel();
848              		     delete th;
849 mike    1.2  		  }
850              	       }
851              	    }
852              	    th = q->next(th);
853 mday    1.20 	    pegasus_sleep(1);
854 mike    1.2  	 }
855              	 q->unlock();
856                    }
857                 }
858 mday    1.47    if(_dying.value() )
859                    return bodies;
860                 
861                 while (needed.value() > 0)   {
862                    _link_pool(_init_thread());
863                    needed--;
864                    pegasus_sleep(0);
865                 }
866 mday    1.18     return bodies; 
867 mike    1.2  }
868              
869 mday    1.12 
870 mike    1.2  Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval)
871              {
872 mday    1.22    // never time out if the interval is zero
873                 if(interval && interval->tv_sec == 0 && interval->tv_usec == 0)
874                    return false;
875                 
876 mday    1.36    struct timeval now, finish, remaining ;
877 mday    1.13    Uint32 usec;
878 mday    1.33    pegasus_gettimeofday(&now);
879 mday    1.36    /* remove valgrind error */
880                 pegasus_gettimeofday(&remaining);
881                 
882 mday    1.13 
883                 finish.tv_sec = start->tv_sec + interval->tv_sec;
884                 usec = start->tv_usec + interval->tv_usec;
885                 finish.tv_sec += (usec / 1000000);
886                 usec %= 1000000;
887                 finish.tv_usec = usec;
888                  
889                 if ( timeval_subtract(&remaining, &finish, &now) )
890 mike    1.2        return true;
891                 else
892                    return false;
893              }
894              
895              PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm )
896              {
897 mday    1.30    exit_thread((PEGASUS_THREAD_RETURN)1);
898                 return (PEGASUS_THREAD_RETURN)1;
899 mike    1.2  }
900 mday    1.19 
901              
902               void ThreadPool::_sleep_sem_del(void *p)
903              {
904                 if(p != 0)
905                 {
906                    delete (Semaphore *)p;
907                 }
908              }
909              
910               void ThreadPool::_check_deadlock(struct timeval *start) throw(Deadlock)
911              {
912                 if (true == check_time(start, &_deadlock_detect))
913                    throw Deadlock(pegasus_thread_self());
914                 return;
915              }
916              
917              
918               Boolean ThreadPool::_check_deadlock_no_throw(struct timeval *start)
919              {
920                 return(check_time(start, &_deadlock_detect));
921 mday    1.19 }
922              
923               Boolean ThreadPool::_check_dealloc(struct timeval *start)
924              {
925                 return(check_time(start, &_deallocate_wait));
926              }
927              
928               Thread *ThreadPool::_init_thread(void) throw(IPCException)
929              {
930                 Thread *th = (Thread *) new Thread(_loop, this, false);
931                 // allocate a sleep semaphore and pass it in the thread context
932                 // initial count is zero, loop function will sleep until
933                 // we signal the semaphore
934                 Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
935                 th->put_tsd("sleep sem", &_sleep_sem_del, sizeof(Semaphore), (void *)sleep_sem);
936                 
937                 struct timeval *dldt = (struct timeval *) ::operator new(sizeof(struct timeval));
938 mday    1.35    pegasus_gettimeofday(dldt);
939                 
940 mday    1.19    th->put_tsd("deadlock timer", thread_data::default_delete, sizeof(struct timeval), (void *)dldt);
941                 // thread will enter _loop(void *) and sleep on sleep_sem until we signal it
942 chuck   1.37   
943 mday    1.19    th->run();
944                 _current_threads++;
945                 pegasus_yield();
946                 
947                 return th;
948              }
949              
950               void ThreadPool::_link_pool(Thread *th) throw(IPCException)
951              {
952                 if(th == 0)
953                    throw NullPointer();
954 mday    1.47    try 
955                 {
956                    
957                    auto_mutex(&(this->_monitor));
958                    if(_dying.value())
959                    {
960              	 th->cancel();
961              	 th->join();
962              	 delete th;
963                    }
964                    
965                    _pool.insert_first(th);
966                    
967                 }
968                 catch(...)
969                 {
970                 }
971 mday    1.19 }
972 mike    1.2  
973              
974              PEGASUS_NAMESPACE_END
975              

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2