(file) Return to Thread.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mike  1.2 //%/////////////////////////////////////////////////////////////////////////////
  2           //
  3 kumpf 1.17 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
  4            // The Open Group, Tivoli Systems
  5 mike  1.2  //
  6            // Permission is hereby granted, free of charge, to any person obtaining a copy
  7 chip  1.11 // of this software and associated documentation files (the "Software"), to
  8            // deal in the Software without restriction, including without limitation the
  9            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 10 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
 11            // furnished to do so, subject to the following conditions:
 12 kumpf 1.17 // 
 13 chip  1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 14 mike  1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 15            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 16 chip  1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 17            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 18            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 19 mike  1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 20            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 21            //
 22            //==============================================================================
 23            //
 24            // Author: Mike Day (mdday@us.ibm.com)
 25            //
 26            // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
 27 chip  1.11 //              added nsk platform support
 28 mike  1.2  //
 29            //%/////////////////////////////////////////////////////////////////////////////
 30            
 31            #include "Thread.h"
 32            #include <Pegasus/Common/IPC.h>
 33 kumpf 1.14 #include <Pegasus/Common/Tracer.h>
 34 mike  1.2  
 35            #if defined(PEGASUS_OS_TYPE_WINDOWS)
 36 chip  1.11 # include "ThreadWindows.cpp"
 37 mike  1.2  #elif defined(PEGASUS_OS_TYPE_UNIX)
 38            # include "ThreadUnix.cpp"
 39            #elif defined(PEGASUS_OS_TYPE_NSK)
 40            # include "ThreadNsk.cpp"
 41            #else
 42            # error "Unsupported platform"
 43            #endif
 44            
 45            PEGASUS_NAMESPACE_BEGIN
 46            
 47 mday  1.42 
 48 chip  1.11 void thread_data::default_delete(void * data)
 49            {
 50 mike  1.2     if( data != NULL)
 51 chip  1.11       ::operator delete(data);
 52 mike  1.2  }
 53            
 54 chuck 1.43 // l10n start
 55            void language_delete(void * data)
 56            {
 57               if( data != NULL)
 58               {
 59                  AcceptLanguages * al = static_cast<AcceptLanguages *>(data);
 60 chuck 1.44       delete al;
 61 chuck 1.43    }
 62            }
 63            // l10n end
 64            
 65 mike  1.2  Boolean Thread::_signals_blocked = false;
 66 chuck 1.37 // l10n
               // Key under which getCurrent()/setCurrent() keep the calling thread's
               // Thread pointer. Holds -1 until initializeKey() has created the key.
 67 mday  1.48 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key = -1;
               // Set to true by initializeKey() once pegasus_key_create() has succeeded.
 68 chuck 1.37 Boolean Thread::_key_initialized = false;
               // Set to true by initializeKey() when key creation failed; subsequent
               // initializeKey() calls then return -1 without retrying.
 69 chuck 1.41 Boolean Thread::_key_error = false;
 70 chuck 1.37 
 71 mike  1.2  
 72            // for non-native implementations
 73 chip  1.11 #ifndef PEGASUS_THREAD_CLEANUP_NATIVE
 74 mike  1.2  void Thread::cleanup_push( void (*routine)(void *), void *parm) throw(IPCException)
 75            {
 76                cleanup_handler *cu = new cleanup_handler(routine, parm);
 77 chip  1.11     try
 78                {
 79            	_cleanup.insert_first(cu);
 80                }
 81                catch(IPCException&)
 82 mike  1.2      {
 83            	delete cu;
 84 chip  1.11 	throw;
 85 mike  1.2      }
 86                return;
 87            }
 88 chip  1.11 	
 89 mike  1.2  void Thread::cleanup_pop(Boolean execute) throw(IPCException)
 90            {
 91                cleanup_handler *cu ;
 92 chip  1.11     try
 93                {
 94 mike  1.2  	cu = _cleanup.remove_first() ;
 95                }
 96 chip  1.11     catch(IPCException&)
 97 mike  1.2      {
 98 chip  1.11 	PEGASUS_ASSERT(0);
 99 mike  1.2       }
100                if(execute == true)
101            	cu->execute();
102                delete cu;
103            }
104 chip  1.11 		
105 mike  1.2  #endif
106            
107            
108 kumpf 1.8  //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) throw(IPCException)
109 mike  1.2  
110            
111 chip  1.11 #ifndef PEGASUS_THREAD_EXIT_NATIVE
112            void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
113            {
114                // execute the cleanup stack and then return
115 mike  1.2     while( _cleanup.count() )
116               {
117 chip  1.11        try
118                   {
119            	   cleanup_pop(true);
120                   }
121                   catch(IPCException&)
122                   {
123            	  PEGASUS_ASSERT(0);
124            	  break;
125 mike  1.2         }
126               }
127               _exit_code = exit_code;
128               exit_thread(exit_code);
129 mday  1.4     _handle.thid = 0;
130 mike  1.2  }
131            
132            
133            #endif
134            
135 chuck 1.37 // l10n start
136 chuck 1.39 Sint8 Thread::initializeKey()
137            {
138               PEG_METHOD_ENTER(TRC_THREAD, "Thread::initializeKey");
139               if (!Thread::_key_initialized)
140               {
141 chuck 1.41 	if (Thread::_key_error)
142            	{
143                   		Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
144            	        	  "Thread: ERROR - thread key error"); 
145            		return -1;
146            	}
147            
148 chuck 1.39 	if (pegasus_key_create(&Thread::_platform_thread_key) == 0)
149            	{
150                    	Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
151            	        	  "Thread: able to create a thread key");   
152            	   	Thread::_key_initialized = true;	
153            	}
154            	else
155            	{
156                   		Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
157            	        	  "Thread: ERROR - unable to create a thread key"); 
158 chuck 1.41 	   	Thread::_key_error = true;
159 chuck 1.39 		return -1;
160            	}
161               }
162            
163               PEG_METHOD_EXIT();
164               return 0;  
165            }
166            
167 chuck 1.37 Thread * Thread::getCurrent()
168            {
169 chuck 1.39     PEG_METHOD_ENTER(TRC_THREAD, "Thread::getCurrent");	
170 chuck 1.40     if (Thread::initializeKey() != 0)
171 chuck 1.39     {
172            	return NULL;  
173                }
174 chuck 1.38     PEG_METHOD_EXIT();  
175 chuck 1.39     return (Thread *)pegasus_get_thread_specific(_platform_thread_key); 
176            }
177            
178            void Thread::setCurrent(Thread * thrd)
179            {
180               PEG_METHOD_ENTER(TRC_THREAD, "Thread::setCurrent");
181               if (Thread::initializeKey() == 0)
182               {
183               	if (pegasus_set_thread_specific(Thread::_platform_thread_key,
184            								 (void *) thrd) == 0)
185                    {
186                    	Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
187            	        	  "Successful set Thread * into thread specific storage");   
188                    }
189                    else
190                    {
191                    	Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
192            	        	  "ERROR: got error setting Thread * into thread specific storage");   
193                    }
194               }
195               PEG_METHOD_EXIT();  
196 chuck 1.37 }
197            
198            AcceptLanguages * Thread::getLanguages()
199            {
200 chuck 1.39     PEG_METHOD_ENTER(TRC_THREAD, "Thread::getLanguages");		
201 chuck 1.37     
202            	Thread * curThrd = Thread::getCurrent();
203            	if (curThrd == NULL)
204            		return NULL;
205               	AcceptLanguages * acceptLangs =
206               		 (AcceptLanguages *)curThrd->reference_tsd("acceptLanguages");
207            	curThrd->dereference_tsd();
208                PEG_METHOD_EXIT(); 	
209            	return acceptLangs;
210            }
211            
212            void Thread::setLanguages(AcceptLanguages *langs) //l10n
213            {
214 chuck 1.39    PEG_METHOD_ENTER(TRC_THREAD, "Thread::setLanguages");
215 chuck 1.37    		
216               Thread * currentThrd = Thread::getCurrent();
217               if (currentThrd != NULL)
218               {
219               		// deletes the old tsd and creates a new one
220            		currentThrd->put_tsd("acceptLanguages",
221 chuck 1.43 			language_delete, 
222 chuck 1.37 			sizeof(AcceptLanguages *),
223            			langs);   		
224               }
225               
226               PEG_METHOD_EXIT();    		
227            }
228            
229            void Thread::clearLanguages() //l10n
230            {
231 chuck 1.39    PEG_METHOD_ENTER(TRC_THREAD, "Thread::clearLanguages");
232 chuck 1.37    	
233               Thread * currentThrd = Thread::getCurrent();
234               if (currentThrd != NULL)
235               {
236               		// deletes the old tsd
237            		currentThrd->delete_tsd("acceptLanguages");   		
238               }
239               
240               PEG_METHOD_EXIT();   		
241            }
242            // l10n end      
243            
// List of ThreadPool instances: the constructor appends `this`, the
// destructor removes it, and kill_idle_threads() walks it.
244 mday  1.20 DQueue<ThreadPool> ThreadPool::_pools(true);
245            
246            
247            void ThreadPool::kill_idle_threads(void)
248            {
249               static struct timeval now, last = {0, 0};
250               
251               pegasus_gettimeofday(&now);
252               if(now.tv_sec - last.tv_sec > 5)
253               {
254                  _pools.lock();
255                  ThreadPool *p = _pools.next(0);
256                  while(p != 0)
257                  {
258            	 try 
259            	 {
260            	    p->kill_dead_threads();
261            	 }
262            	 catch(...)
263            	 {
264            	 }
265 mday  1.20 	 p = _pools.next(p);
266                  }
267                  _pools.unlock();
268                  pegasus_gettimeofday(&last);
269               }
270            }
271            
272            
273 mike  1.2  ThreadPool::ThreadPool(Sint16 initial_size,
274 kumpf 1.8  		       const Sint8 *key,
275 mike  1.2  		       Sint16 min,
276            		       Sint16 max,
277            		       struct timeval & alloc_wait,
278 chip  1.11 		       struct timeval & dealloc_wait,
279 mike  1.2  		       struct timeval & deadlock_detect)
280               : _max_threads(max), _min_threads(min),
281 mday  1.12      _current_threads(0),
282                 _pool(true), _running(true),
283 mike  1.2       _dead(true), _dying(0)
284            {
285               _allocate_wait.tv_sec = alloc_wait.tv_sec;
286               _allocate_wait.tv_usec = alloc_wait.tv_usec;
287 chip  1.11    _deallocate_wait.tv_sec = dealloc_wait.tv_sec;
288 mike  1.2     _deallocate_wait.tv_usec = dealloc_wait.tv_usec;
289               _deadlock_detect.tv_sec = deadlock_detect.tv_sec;
290               _deadlock_detect.tv_usec = deadlock_detect.tv_usec;
291               memset(_key, 0x00, 17);
292               if(key != 0)
293                  strncpy(_key, key, 16);
294 mday  1.21    if(_max_threads > 0 && _max_threads < initial_size)
295 mike  1.2        _max_threads = initial_size;
296               if(_min_threads > initial_size)
297                  _min_threads = initial_size;
298 chip  1.11 
299 mike  1.2     int i;
300               for(i = 0; i < initial_size; i++)
301               {
302                  _link_pool(_init_thread());
303               }
304 mday  1.20    _pools.insert_last(this);
305 mike  1.2  }
306            
307 chip  1.11 
308 mike  1.2  
// Destructor: marks the pool as dying, unregisters it from _pools and then
// drains the three thread queues (_pool, _dead, _running). Each thread taken
// off a queue has its "sleep sem" signalled, is cancelled, joined, stripped of
// its thread-specific data and deleted. Any exception raised along the way is
// swallowed by the outer catch(...).
//
// NOTE(review): `auto_mutex(&(this->_monitor));` appears to construct an
// unnamed temporary, which would be destroyed at the end of that statement, so
// the monitor is probably not held for the rest of the enclosing scope --
// confirm against the auto_mutex definition.
// NOTE(review): a `throw NullPointer()` inside any of the loops abandons the
// drain; the thread in `th` and the threads still queued are then not joined
// or deleted.
309            ThreadPool::~ThreadPool(void)
310            {
311 mday  1.47    
312 mday  1.35    try 
313 mday  1.47    {      
314                  {
315            	 auto_mutex(&(this->_monitor));
                 // From here on _loop(), allocate_and_awaken() and
                 // kill_dead_threads() see a non-zero _dying and back out.
316            	 _dying++;
317                  }
318                  
319 mday  1.35       _pools.remove(this);
320                  Thread *th = 0;
                 // Phase 1: threads sitting on the _pool queue.
321                  th = _pool.remove_first();
322                  while(th != 0)
323 mike  1.2        {
324 mday  1.35 	 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
325            	 
326            	 if(sleep_sem == 0)
327            	 {
328            	    th->dereference_tsd();
329            	    throw NullPointer();
330            	 }
331 mday  1.47 
332 s.hills 1.49 	 // Signal to get the thread out of the work loop.
333              	 sleep_sem->signal();
334              	 // Signal to get the thread past the end. See the comment
335              	 // "wait to be awakend by the thread pool destructor"
336              	 // Note: the current implementation of Thread for Windows
337              	 // does not implement "pthread" cancelation points so this
338              	 // is needed.
339 mday    1.35 	 sleep_sem->signal();
340 s.hills 1.49 
341 mike    1.2  	 th->dereference_tsd();
342 mday    1.35 	 // signal the thread's sleep semaphore
343              	 th->cancel();
344              	 th->join();
345              	 th->empty_tsd();
346              	 delete th;
347              	 th = _pool.remove_first();
348 mike    1.2        }
349 mday    1.47 
                   // Phase 2: threads on the _dead queue (signalled once only).
350                    th = _dead.remove_first();
351 mday    1.35       while(th != 0)
352                    {
353 mday    1.47 	 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
354              	 
355              	 if(sleep_sem == 0)
356              	 {
357              	    th->dereference_tsd();
358              	    throw NullPointer();
359              	 }
360              	 
361              
362              	 sleep_sem->signal();
363              	 th->dereference_tsd();
364              	 
365 mday    1.35 	 // signal the thread's sleep semaphore
366              	 th->cancel();
367              	 th->join();
368              	 th->empty_tsd();
369              	 delete th;
370 mday    1.47 	 th = _dead.remove_first();
371 mday    1.35       }
372 mday    1.47       {
373              	 
                   // Phase 3: threads on the _running queue.
374              	 auto_mutex(&(this->_monitor));
375                    th = _running.remove_first();
376 mday    1.35       while(th != 0)
377 mday    1.47       {	 
378 mday    1.35 	 // signal the thread's sleep semaphore
379 mday    1.47 	 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
380              	 if(sleep_sem == 0 )
381              	 {
382              	    th->dereference_tsd();
383              	    throw NullPointer();
384              	 }
385              	 
386              	 sleep_sem->signal();
387              	 th->dereference_tsd();
388              	 
389 mday    1.35 	 th->cancel();
390 mday    1.47 
391              	 // ensure that th->run() has a chance to execute so that the join will not
392              	 // block
393 mday    1.35 	 th->join();
394              	 th->empty_tsd();
395              	 delete th;
396 mday    1.47 	 th = _running.remove_first();
397                    }
398 mday    1.35       }
399 mday    1.47       
400 mike    1.2     }
401 mday    1.47 
                   // Best effort: a destructor must not let exceptions escape.
402 mday    1.35    catch(...)
403 mike    1.2     {
404                 }
405 mday    1.47 
406 mike    1.2  }
407              
408 chip    1.11 // make this static to the class
// Entry routine executed by every pool thread.
//
// parm - the Thread object representing this worker; its get_parm() yields
//        the owning ThreadPool.
//
// The worker repeatedly waits on its "sleep sem", reads the "work func",
// "work parm" and "blocking sem" thread-specific entries, runs the work
// function, signals the blocking semaphore (if any) and moves itself from the
// pool's _running queue back to the front of its _pool queue. The loop ends
// once the pool's _dying counter is non-zero. Returns 0 on every return path.
//
// Throws NullPointer when parm, the pool, the sleep semaphore, the deadlock
// timer or the work function is null.
//
// NOTE(review): `auto_mutex(&(pool->_monitor));` appears to construct an
// unnamed temporary, so the monitor is probably released at the end of that
// statement rather than at the end of the scope -- confirm against auto_mutex.
// NOTE(review): when the work function is _undertaker it is invoked in the
// dedicated branch and, if that call returns, a second time by the general
// `_work(parm)` call further down -- confirm _undertaker never returns.
// NOTE(review): the catch(...) around the work call returns without calling
// PEG_METHOD_EXIT().
409 mike    1.2  PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
410              {
411 kumpf   1.14    PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
412              
413 mike    1.2     Thread *myself = (Thread *)parm;
414                 if(myself == 0)
415 kumpf   1.14    {
416                    PEG_METHOD_EXIT();
417 mike    1.2        throw NullPointer();
418 kumpf   1.14    }
419 chuck   1.37    
420              // l10n
421                 // Set myself into thread specific storage
422 chuck   1.38    // This will allow code to get its own Thread
423 chuck   1.39    Thread::setCurrent(myself);	
424              
425 mike    1.2     ThreadPool *pool = (ThreadPool *)myself->get_parm();
426 kumpf   1.14    if(pool == 0 ) 
427                 {
428                    PEG_METHOD_EXIT();
429 mike    1.2        throw NullPointer();
430 kumpf   1.14    }
431 mday    1.47 
432 mike    1.5     Semaphore *sleep_sem = 0;
433 mday    1.13    Semaphore *blocking_sem = 0;
434                 
435 mike    1.5     struct timeval *deadlock_timer = 0;
436 mday    1.47    
                   // Look up the two per-thread entries used on every iteration. The
                   // tsd reference is dropped immediately; only the raw pointers are kept.
437 chip    1.11    try
438 mike    1.2     {
439                    sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
440                    myself->dereference_tsd();
441                    deadlock_timer = (struct timeval *)myself->reference_tsd("deadlock timer");
442 mday    1.22       myself->dereference_tsd(); 
443 mike    1.2     }
444 mike    1.6     catch(IPCException &)
445 mike    1.2     {
446 kumpf   1.14       PEG_METHOD_EXIT();
447 mday    1.47       return(0);
448 mike    1.2     }
449 mday    1.30    catch(...)
450                 {
451                    PEG_METHOD_EXIT();
452 mday    1.47       return(0);
453 mday    1.30    }
454                 
455 mike    1.2     if(sleep_sem == 0 || deadlock_timer == 0)
456 kumpf   1.14    {
457                    PEG_METHOD_EXIT();
458 mike    1.2        throw NullPointer();
459 kumpf   1.14    }
460 mike    1.2  
                   // Main work loop: runs until the pool is marked as dying.
461 mday    1.47    while(pool->_dying.value() < 1)
462 mike    1.2     {
                      // Block until the sleep semaphore is signalled.
463                    sleep_sem->wait();
464 mday    1.35         
465 mike    1.2        // when we awaken we reside on the running queue, not the pool queue
466 chip    1.11 
467 mday    1.47       
468 mike    1.5        PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0;
469                    void *parm = 0;
470 mike    1.2  
                      // Fetch the work item from the thread-specific entries.
471 chip    1.11       try
472 mike    1.2        {
473              	 _work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) \
474              	    myself->reference_tsd("work func");
475              	 myself->dereference_tsd();
476              	 parm = myself->reference_tsd("work parm");
477              	 myself->dereference_tsd();
478 mday    1.13 	 blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
479              	 myself->dereference_tsd();
480              
481 mike    1.2        }
482 mike    1.6        catch(IPCException &)
483 mike    1.2        {
484 kumpf   1.14 	 PEG_METHOD_EXIT();
485 mday    1.47 	 return(0);
486 mike    1.2        }
487 chip    1.11 
488 mike    1.2        if(_work == 0)
489 kumpf   1.14       {
490                       PEG_METHOD_EXIT();
491 mike    1.2  	 throw NullPointer();
492 kumpf   1.14       }
493 kumpf   1.24 
                      // The undertaker is run right away, before the dying check that
                      // guards ordinary work functions below.
494                    if(_work ==
495                       (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) &_undertaker)
496                    {
497 mday    1.23 	 _work(parm);
498 kumpf   1.24       }
499              
                      // Stamp the start of this work item in the "deadlock timer" entry.
500 mike    1.2        gettimeofday(deadlock_timer, NULL);
501 mday    1.20       try 
502                    {
503 mday    1.47 	 {
504              	    auto_mutex(&(pool->_monitor));
505              	    if(pool->_dying.value())
506              	    {
                 	       // Leaves the while loop; execution continues after it.
507              	       break;
508              	    }
509              	 }
510 mday    1.20 	 _work(parm);
511                    }
512                    catch(...)
513                    {
514 mday    1.47 	 return((PEGASUS_THREAD_RETURN)0);
515 mday    1.20       }
516 chuck   1.37       
517 mday    1.47 
518 mday    1.13       
519 chip    1.11       // put myself back onto the available list
520                    try
521 mike    1.2        {
522 mday    1.47 	 auto_mutex(&(pool->_monitor));
523              	 if(pool->_dying.value() == 0)
524              	 {
                 	    // Refresh the timer, then signal the blocking semaphore if
                 	    // one was supplied for this work item.
525              	    gettimeofday(deadlock_timer, NULL);
526              	    if( blocking_sem != 0 )
527              	       blocking_sem->signal();
528                    
529              	    pool->_running.remove((void *)myself);
530              	    pool->_pool.insert_first(myself);
531              	 }
532              	 else
533              	 {
534              	    PEG_METHOD_EXIT();
535              	    return((PEGASUS_THREAD_RETURN)0);
536              	 }
537 mike    1.2        }
538 mike    1.6        catch(IPCException &)
539 mike    1.2        {
540 kumpf   1.14 	 PEG_METHOD_EXIT();
541 mday    1.47 	 return((PEGASUS_THREAD_RETURN)0);
542 mike    1.2        }
543                 }
544 s.hills 1.49 
545                 // TODO: Why is this needed? Why not just continue?
546 mike    1.2     // wait to be awakened by the thread pool destructor
547 mday    1.50    //sleep_sem->wait();
548 s.hills 1.49 
549 mike    1.2     myself->test_cancel();
550 kumpf   1.14 
551                 PEG_METHOD_EXIT();
552 mike    1.2     myself->exit_self(0);
553                 return((PEGASUS_THREAD_RETURN)0);
554              }
555              
556              void ThreadPool::allocate_and_awaken(void *parm,
557              				     PEGASUS_THREAD_RETURN \
558 mday    1.13 				     (PEGASUS_THREAD_CDECL *work)(void *), 
559              				     Semaphore *blocking)
560              
561 mike    1.2     throw(IPCException)
562              {
563 kumpf   1.14    PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
564 mike    1.2     struct timeval start;
565                 gettimeofday(&start, NULL);
566 mday    1.47    Thread *th = 0;
567                 
568                 try 
569                 {
570                    auto_mutex(&(this->_monitor));
571                    if(_dying.value())
572                    {
573              	 return;
574                    }
575                    th = _pool.remove_first();
576                 }
577                 catch(...)
578                 {
579                    return;
580                    
581                 }
582                 
583 mday    1.12    
584 mday    1.7     // wait for the right interval and try again
585 mday    1.47    while (th == 0 && _dying.value() < 1)
586 mike    1.2     {
587 mday    1.47       // will throw an IPCException& 
588 mday    1.12       _check_deadlock(&start) ;
589                    
590 mday    1.21       if(_max_threads == 0 || _current_threads < _max_threads)
591 mday    1.35       {
592              	 th = _init_thread();
593              	 continue;
594                    }
595                    pegasus_yield();
596 mday    1.47       try
597                    {
598              	 auto_mutex(&(this->_monitor));
599              	 if(_dying.value())
600              	 {
601              	    return;
602              	 }
603              	 th = _pool.remove_first();
604                    }
605                    catch(...)
606                    {
607              	 return ;
608                    }
609 mday    1.7     }
610 chip    1.11 
611 mday    1.47    if(_dying.value() < 1)
612 mike    1.2     {
613                    // initialize the thread data with the work function and parameters
614 kumpf   1.14       Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
615                        "Initializing thread with work function and parameters: parm = %p",
616                        parm);
617              
618 kumpf   1.15       th->delete_tsd("work func");
619 chip    1.11       th->put_tsd("work func", NULL,
620 mike    1.2  		  sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
621              		  (void *)work);
622 kumpf   1.15       th->delete_tsd("work parm");
623 mike    1.2        th->put_tsd("work parm", NULL, sizeof(void *), parm);
624 kumpf   1.15       th->delete_tsd("blocking sem");
625 mday    1.13       if(blocking != 0 )
626              	 th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
627 mday    1.47       try 
628                    {
629              	 auto_mutex(&(this->_monitor));
630              	 if(_dying.value())
631              	 {
632              	    th->cancel();
633              	    th->join();
634              	    delete th;
635              	    return;
636              	 }
637              	 
638              	 // put the thread on the running list
639              
640 mike    1.2  
641 mday    1.47 	 _running.insert_first(th);
642 mike    1.2        // signal the thread's sleep semaphore to awaken it
643 mday    1.47 	 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
644              	 
645              	 if(sleep_sem == 0)
646              	 {
647              	    th->dereference_tsd();
648              	    PEG_METHOD_EXIT();
649              	    throw NullPointer();
650              	 }
651              	 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
652              	 sleep_sem->signal();
653              	 th->dereference_tsd();
654                    }
655                    catch(...)
656 mike    1.2        {
657 mday    1.47 	 PEG_METHOD_EXIT();
658              	 return;
659 mike    1.2        }
660 mday    1.47       
661 mike    1.2     }
662                 else
663 mday    1.47    {
664                    th->cancel();
665                    th->join();
666                    delete th;
667                 }
668                 
669 kumpf   1.14    PEG_METHOD_EXIT();
670 mike    1.2  }
671              
672              // caller is responsible for only calling this routine during slack periods
673              // but should call it at least once per _deadlock_detect with the running q
674              // and at least once per _deallocate_wait for the pool q
675              
// Reaps threads that are no longer needed. Returns the number of idle threads
// that were handed to the undertaker during this call ("bodies").
//
// Step 1: joins and deletes every thread on the _dead queue (errors ignored).
// Step 2: walks the _pool queue (i == 0) and, unless
//         PEGASUS_DISABLE_KILLING_HUNG_THREADS is defined, the _running queue
//         (i == 1). A thread whose "deadlock timer" fails check_time() against
//         the deallocate wait (i == 0) or the deadlock-detect interval
//         (i == 1) is removed from its queue. Pool threads are given
//         _undertaker as work function, moved to _dead and awakened; running
//         threads are cancelled and deleted.
// Step 3: creates one replacement thread for each running thread whose removal
//         took the count below _min_threads.
// The function returns early (with the current count) whenever the pool is
// found to be dying or a queue lock / tsd lookup fails.
//
// NOTE(review): `auto_mutex(&(this->_monitor));` appears to construct an
// unnamed temporary, so the monitor is probably not held for the rest of the
// scope -- confirm against auto_mutex.
// NOTE(review): in the i == 1 branch `th` is deleted and then still passed to
// q->next(th) at the bottom of the loop -- looks like a dangling pointer;
// confirm how DQueue::next() uses its argument.
// NOTE(review): the `throw NullPointer()` for a missing sleep semaphore is not
// inside a try block although the function is declared throw(IPCException) --
// confirm NullPointer is compatible with that specification.
676 mday    1.12 Uint32 ThreadPool::kill_dead_threads(void)
677 mike    1.2  	 throw(IPCException)
678              {
679                 struct timeval now;
680                 gettimeofday(&now, NULL);
681 mday    1.12    Uint32 bodies = 0;
682                 
683 mike    1.2     // first go through the dead q and clean it up as much as possible
684 mday    1.47    try 
685                 {
686                    auto_mutex(&(this->_monitor));
687                    if(_dying.value() )
688                    {
689              	 return 0;
690                    }
691                    
692                    while(_dead.count() > 0 && _dying.value() == 0 )
693                    {
694              	 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "ThreadPool:: removing and joining dead thread");
695              	 Thread *dead = _dead.remove_first();
696              	 
697              	 if(dead == 0)
698              	    throw NullPointer();
699              	 dead->join();
700              	 delete dead;
701                    }
702                 }
703                 catch(...)
704 mike    1.2     {
705                 }
706 mday    1.47    
707                 
                   // Queues examined below: index 0 is the pool queue, index 1 the
                   // running queue.
708 chip    1.11    DQueue<Thread> * map[2] =
709 mike    1.2        {
710              	 &_pool, &_running
711                    };
712 chip    1.11 
713              
714 mike    1.2     DQueue<Thread> *q = 0;
715                 int i = 0;
                   // Number of replacement threads to create at the end of the call.
716                 AtomicInt needed(0);
717 chip    1.11 
718 kumpf   1.31 #ifdef PEGASUS_DISABLE_KILLING_HUNG_THREADS
719                 // This change prevents the thread pool from killing "hung" threads.
720                 // The definition of a "hung" thread is one that has been on the run queue
721                 // for longer than the time interval set when the thread pool was created.
722                 // Cancelling "hung" threads has proven to be problematic.
723              
724                 // With this change the thread pool will not cancel "hung" threads.  This
725                 // may prevent a crash depending upon the state of the "hung" thread.  In
726                 // the case that the thread is actually hung, this change causes the
727                 // thread resources not to be reclaimed.
728              
729                 // Idle threads, those that have not executed a routine for a time
730                 // interval, continue to be destroyed.  This is normal and should not
731                 // cause any problems.
732                 for( ; i < 1; i++)
733              #else
734 mday    1.30    for( ; i < 2; i++)
735 kumpf   1.31 #endif
736 mday    1.47    {
737                    auto_mutex(&(this->_monitor)); 
738 mday    1.21       q = map[i];
739 mike    1.2        if(q->count() > 0 )
740                    {
741 chip    1.11 	 try
742 mike    1.2  	 {
743 mday    1.47 	    if(_dying.value())
744              	    {
745              	       return bodies;
746              	    }
747              	    
                 	    // Any exception here ends the sweep via the catch below.
748 mike    1.2  	    q->try_lock();
749              	 }
750 mday    1.18 	 catch(...)
751 mike    1.2  	 {
752 mday    1.18 	    return bodies;
753 mike    1.2  	 }
754              
755              	 struct timeval dt = { 0, 0 };
756              	 struct timeval *dtp;
757              	 Thread *th = 0;
                 	 // next(0) yields the first element of the queue.
758              	 th = q->next(th);
759              	 while (th != 0 )
760              	 {
761 chip    1.11 	    try
762 mike    1.2  	    {
763              	       dtp = (struct timeval *)th->try_reference_tsd("deadlock timer");
764              	    }
765 mday    1.18 	    catch(...)
766 mike    1.2  	    {
767 kumpf   1.25 	       q->unlock();
768 mday    1.18 	       return bodies;
769 mike    1.2  	    }
770 chip    1.11 	
                 	    // Work on a private copy of the timer so the tsd reference can
                 	    // be released right away. If the entry is missing, dt keeps
                 	    // its previous value.
771 mike    1.2  	    if(dtp != 0)
772              	    {
773              	       memcpy(&dt, dtp, sizeof(struct timeval));
774              	    }
775              	    th->dereference_tsd();
776              	    struct timeval deadlock_timeout;
777 mday    1.18 	    Boolean too_long;
778              	    if( i == 0)
779              	    {
780              	       too_long = check_time(&dt, get_deallocate_wait(&deadlock_timeout));
781              	    }
782              	    else 
783              	    {
784 mday    1.22 	       too_long = check_time(&dt, get_deadlock_detect(&deadlock_timeout));
785 mday    1.18 	    }
786              	    
787              	    if( true == too_long)
788 mike    1.2  	    {
789              	       // if we are deallocating from the pool, escape if we are
790 chip    1.11 	       // down to the minimum thread count
                 	       // The counter is decremented first and restored below when
                 	       // the thread is kept.
791 mday    1.13 	       _current_threads--;
792 mday    1.18 	       if( _current_threads.value() < (Uint32)_min_threads )
793 mike    1.2  	       {
794 mday    1.13 		  if( i == 0)
795 mike    1.2  		  {
796 mday    1.13 		     _current_threads++;
797 mike    1.2  		     th = q->next(th);
798              		     continue;
799              		  }
800 chip    1.11 		  else
801 mike    1.2  		  {
802 chip    1.11 		     // we are killing a hung thread and we will drop below the
803 mike    1.2  		     // minimum. create another thread to make up for the one
804              		     // we are about to kill
805              		     needed++;
806              		  }
807              	       }
808 chip    1.11 	
809 mike    1.2  	       th = q->remove_no_lock((void *)th);
810 chip    1.11 	
811 mike    1.2  	       if(th != 0)
812              	       {
813 mday    1.30 		  if( i == 0 )
814 mike    1.2  		  {
                 		     // Idle thread: install _undertaker as its work function,
                 		     // with the thread itself as the argument.
815 mday    1.30 		     th->delete_tsd("work func");
816              		     th->put_tsd("work func", NULL,
817              				 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
818              				 (void *)&_undertaker);
819              		     th->delete_tsd("work parm");
820              		     th->put_tsd("work parm", NULL, sizeof(void *), th);
821              		     
822              		     // signal the thread's sleep semaphore to awaken it
823              		     Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
824              		     
825              		     if(sleep_sem == 0)
826              		     {
827              			q->unlock();
828              			th->dereference_tsd();
829              			throw NullPointer();
830              		     }
831              		     
832              		     bodies++;
833 mike    1.2  		     th->dereference_tsd();
834 mday    1.30 		     _dead.insert_first(th);
835              		     sleep_sem->signal();
                 		     // With th reset to 0 the q->next(th) call below starts
                 		     // again from the head of the queue.
836              		     th = 0;
837              		  }
838              		  else 
839              		  {
840              		     // deadlocked threads
841 mday    1.34 		     Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Killing a deadlocked thread");
842 mday    1.30 		     th->cancel();
843              		     delete th;
844 mike    1.2  		  }
845              	       }
846              	    }
847              	    th = q->next(th);
848 mday    1.20 	    pegasus_sleep(1);
849 mike    1.2  	 }
850              	 q->unlock();
851                    }
852                 }
853 mday    1.47    if(_dying.value() )
854                    return bodies;
855                 
                   // Create the replacement threads recorded in `needed`.
856                 while (needed.value() > 0)   {
857                    _link_pool(_init_thread());
858                    needed--;
859                    pegasus_sleep(0);
860                 }
861 mday    1.18     return bodies; 
862 mike    1.2  }
863              
864 mday    1.12 
865 mike    1.2  Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval)
866              {
867 mday    1.22    // never time out if the interval is zero
868                 if(interval && interval->tv_sec == 0 && interval->tv_usec == 0)
869                    return false;
870                 
871 mday    1.36    struct timeval now, finish, remaining ;
872 mday    1.13    Uint32 usec;
873 mday    1.33    pegasus_gettimeofday(&now);
874 mday    1.36    /* remove valgrind error */
875                 pegasus_gettimeofday(&remaining);
876                 
877 mday    1.13 
878                 finish.tv_sec = start->tv_sec + interval->tv_sec;
879                 usec = start->tv_usec + interval->tv_usec;
880                 finish.tv_sec += (usec / 1000000);
881                 usec %= 1000000;
882                 finish.tv_usec = usec;
883                  
884                 if ( timeval_subtract(&remaining, &finish, &now) )
885 mike    1.2        return true;
886                 else
887                    return false;
888              }
889              
890              PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm )
891              {
892 mday    1.30    exit_thread((PEGASUS_THREAD_RETURN)1);
893                 return (PEGASUS_THREAD_RETURN)1;
894 mike    1.2  }
895 mday    1.19 
896              
897               void ThreadPool::_sleep_sem_del(void *p)
898              {
899                 if(p != 0)
900                 {
901                    delete (Semaphore *)p;
902                 }
903              }
904              
905               void ThreadPool::_check_deadlock(struct timeval *start) throw(Deadlock)
906              {
907                 if (true == check_time(start, &_deadlock_detect))
908                    throw Deadlock(pegasus_thread_self());
909                 return;
910              }
911              
912              
913               Boolean ThreadPool::_check_deadlock_no_throw(struct timeval *start)
914              {
915                 return(check_time(start, &_deadlock_detect));
916 mday    1.19 }
917              
918               Boolean ThreadPool::_check_dealloc(struct timeval *start)
919              {
920                 return(check_time(start, &_deallocate_wait));
921              }
922              
923               Thread *ThreadPool::_init_thread(void) throw(IPCException)
924              {
925                 Thread *th = (Thread *) new Thread(_loop, this, false);
926                 // allocate a sleep semaphore and pass it in the thread context
927                 // initial count is zero, loop function will sleep until
928                 // we signal the semaphore
929                 Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
930                 th->put_tsd("sleep sem", &_sleep_sem_del, sizeof(Semaphore), (void *)sleep_sem);
931                 
932                 struct timeval *dldt = (struct timeval *) ::operator new(sizeof(struct timeval));
933 mday    1.35    pegasus_gettimeofday(dldt);
934                 
935 mday    1.19    th->put_tsd("deadlock timer", thread_data::default_delete, sizeof(struct timeval), (void *)dldt);
936                 // thread will enter _loop(void *) and sleep on sleep_sem until we signal it
937 chuck   1.37   
938 mday    1.19    th->run();
939                 _current_threads++;
940                 pegasus_yield();
941                 
942                 return th;
943              }
944              
945               void ThreadPool::_link_pool(Thread *th) throw(IPCException)
946              {
947                 if(th == 0)
948                    throw NullPointer();
949 mday    1.47    try 
950                 {
951                    
952                    auto_mutex(&(this->_monitor));
953                    if(_dying.value())
954                    {
955              	 th->cancel();
956              	 th->join();
957              	 delete th;
958                    }
959                    
960                    _pool.insert_first(th);
961                    
962                 }
963                 catch(...)
964                 {
965                 }
966 mday    1.19 }
967 mike    1.2  
968              
969              PEGASUS_NAMESPACE_END
970              

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2