(file) Return to Thread.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mike  1.2 //%/////////////////////////////////////////////////////////////////////////////
  2           //
  3 kumpf 1.17 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
  4            // The Open Group, Tivoli Systems
  5 mike  1.2  //
  6            // Permission is hereby granted, free of charge, to any person obtaining a copy
  7 chip  1.11 // of this software and associated documentation files (the "Software"), to
  8            // deal in the Software without restriction, including without limitation the
  9            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 10 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
 11            // furnished to do so, subject to the following conditions:
 12 kumpf 1.17 // 
 13 chip  1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 14 mike  1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 15            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 16 chip  1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 17            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 18            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 19 mike  1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 20            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 21            //
 22            //==============================================================================
 23            //
 24            // Author: Mike Day (mdday@us.ibm.com)
 25            //
 26            // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
 27 chip  1.11 //              added nsk platform support
 28 mike  1.2  //
 29            //%/////////////////////////////////////////////////////////////////////////////
 30            
 31            #include "Thread.h"
 32            #include <Pegasus/Common/IPC.h>
 33 kumpf 1.14 #include <Pegasus/Common/Tracer.h>
 34 mike  1.2  
 35            #if defined(PEGASUS_OS_TYPE_WINDOWS)
 36 chip  1.11 # include "ThreadWindows.cpp"
 37 mike  1.2  #elif defined(PEGASUS_OS_TYPE_UNIX)
 38            # include "ThreadUnix.cpp"
 39            #elif defined(PEGASUS_OS_TYPE_NSK)
 40            # include "ThreadNsk.cpp"
 41            #else
 42            # error "Unsupported platform"
 43            #endif
 44            
 45            PEGASUS_NAMESPACE_BEGIN
 46            
 47 chip  1.11 void thread_data::default_delete(void * data)
 48            {
 49 mike  1.2     if( data != NULL)
 50 chip  1.11       ::operator delete(data);
 51 mike  1.2  }
 52            
 53            Boolean Thread::_signals_blocked = false;
 54 mday  1.36.4.1 // l10n
 55                PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key;
 56                Boolean Thread::_key_initialized = false;
 57                
 58 mike  1.2      
 59                // for non-native implementations
 60 chip  1.11     #ifndef PEGASUS_THREAD_CLEANUP_NATIVE
 61 mike  1.2      void Thread::cleanup_push( void (*routine)(void *), void *parm) throw(IPCException)
 62                {
 63                    cleanup_handler *cu = new cleanup_handler(routine, parm);
 64 chip  1.11         try
 65                    {
 66                	_cleanup.insert_first(cu);
 67                    }
 68                    catch(IPCException&)
 69 mike  1.2          {
 70                	delete cu;
 71 chip  1.11     	throw;
 72 mike  1.2          }
 73                    return;
 74                }
 75 chip  1.11     	
 76 mike  1.2      void Thread::cleanup_pop(Boolean execute) throw(IPCException)
 77                {
 78                    cleanup_handler *cu ;
 79 chip  1.11         try
 80                    {
 81 mike  1.2      	cu = _cleanup.remove_first() ;
 82                    }
 83 chip  1.11         catch(IPCException&)
 84 mike  1.2          {
 85 chip  1.11     	PEGASUS_ASSERT(0);
 86 mike  1.2           }
 87                    if(execute == true)
 88                	cu->execute();
 89                    delete cu;
 90                }
 91 chip  1.11     		
 92 mike  1.2      #endif
 93                
 94                
 95 kumpf 1.8      //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) throw(IPCException)
 96 mike  1.2      
 97                
 98 chip  1.11     #ifndef PEGASUS_THREAD_EXIT_NATIVE
 99                void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
100                {
101                    // execute the cleanup stack and then return
102 mike  1.2         while( _cleanup.count() )
103                   {
104 chip  1.11            try
105                       {
106                	   cleanup_pop(true);
107                       }
108                       catch(IPCException&)
109                       {
110                	  PEGASUS_ASSERT(0);
111                	  break;
112 mike  1.2             }
113                   }
114                   _exit_code = exit_code;
115                   exit_thread(exit_code);
116 mday  1.4         _handle.thid = 0;
117 mike  1.2      }
118                
119                
120                #endif
121                
122 mday  1.36.4.1 // l10n start
123                Thread * Thread::getCurrent()
124                {
125                    PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::getCurrent");	
126                	if (!Thread::_key_initialized)
127 mday  1.36.4.2 		return NULL;  
128                    PEG_METHOD_EXIT();  
129 mday  1.36.4.1 	return (Thread *)pegasus_get_thread_specific(_platform_thread_key); 
130                }
131                
132                AcceptLanguages * Thread::getLanguages()
133                {
134                    PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::getLanguages");		
135                    
136                	Thread * curThrd = Thread::getCurrent();
137                	if (curThrd == NULL)
138                		return NULL;
139                   	AcceptLanguages * acceptLangs =
140                   		 (AcceptLanguages *)curThrd->reference_tsd("acceptLanguages");
141                	curThrd->dereference_tsd();
142                    PEG_METHOD_EXIT(); 	
143                	return acceptLangs;
144                }
145                
146                void Thread::setLanguages(AcceptLanguages *langs) //l10n
147                {
148                   PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::setLanguages");
149                   		
150 mday  1.36.4.1    Thread * currentThrd = Thread::getCurrent();
151                   if (currentThrd != NULL)
152                   {
153                   		// deletes the old tsd and creates a new one
154                		currentThrd->put_tsd("acceptLanguages",
155                			thread_data::default_delete, 
156                			sizeof(AcceptLanguages *),
157                			langs);   		
158                   }
159                   
160                   PEG_METHOD_EXIT();    		
161                }
162                
163                void Thread::clearLanguages() //l10n
164                {
165                   PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::clearLanguages");
166                   	
167                   Thread * currentThrd = Thread::getCurrent();
168                   if (currentThrd != NULL)
169                   {
170                   		// deletes the old tsd
171 mday  1.36.4.1 		currentThrd->delete_tsd("acceptLanguages");   		
172                   }
173                   
174                   PEG_METHOD_EXIT();   		
175                }
176                // l10n end      
177                
178 mday  1.20     DQueue<ThreadPool> ThreadPool::_pools(true);
179                
180                
181                void ThreadPool::kill_idle_threads(void)
182                {
183                   static struct timeval now, last = {0, 0};
184                   
185                   pegasus_gettimeofday(&now);
186                   if(now.tv_sec - last.tv_sec > 5)
187                   {
188                      _pools.lock();
189                      ThreadPool *p = _pools.next(0);
190                      while(p != 0)
191                      {
192                	 try 
193                	 {
194                	    p->kill_dead_threads();
195                	 }
196                	 catch(...)
197                	 {
198                	 }
199 mday  1.20     	 p = _pools.next(p);
200                      }
201                      _pools.unlock();
202                      pegasus_gettimeofday(&last);
203                   }
204                }
205                
206                
207 mike  1.2      ThreadPool::ThreadPool(Sint16 initial_size,
208 kumpf 1.8      		       const Sint8 *key,
209 mike  1.2      		       Sint16 min,
210                		       Sint16 max,
211                		       struct timeval & alloc_wait,
212 chip  1.11     		       struct timeval & dealloc_wait,
213 mike  1.2      		       struct timeval & deadlock_detect)
214                   : _max_threads(max), _min_threads(min),
215 mday  1.12          _current_threads(0),
216                     _pool(true), _running(true),
217 mike  1.2           _dead(true), _dying(0)
218                {
219                   _allocate_wait.tv_sec = alloc_wait.tv_sec;
220                   _allocate_wait.tv_usec = alloc_wait.tv_usec;
221 chip  1.11        _deallocate_wait.tv_sec = dealloc_wait.tv_sec;
222 mike  1.2         _deallocate_wait.tv_usec = dealloc_wait.tv_usec;
223                   _deadlock_detect.tv_sec = deadlock_detect.tv_sec;
224                   _deadlock_detect.tv_usec = deadlock_detect.tv_usec;
225                   memset(_key, 0x00, 17);
226                   if(key != 0)
227                      strncpy(_key, key, 16);
228 mday  1.21        if(_max_threads > 0 && _max_threads < initial_size)
229 mike  1.2            _max_threads = initial_size;
230                   if(_min_threads > initial_size)
231                      _min_threads = initial_size;
232 chip  1.11     
233 mike  1.2         int i;
234                   for(i = 0; i < initial_size; i++)
235                   {
236                      _link_pool(_init_thread());
237                   }
238 mday  1.20        _pools.insert_last(this);
239                   
240 mday  1.36.4.1    // l10n
241                   if (!Thread::_key_initialized)
242                   {
243 mday  1.36.4.2 	if (pegasus_key_create(&Thread::_platform_thread_key) == 0)
244                	{
245                        	Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
246                	        	  "ThreadPool: able to create a thread key");   
247                	   	Thread::_key_initialized = true;	
248                	}
249                	else
250                	{
251                       		Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
252                	        	  "ThreadPool: ERROR - unable to create a thread key");   
253                printf("ThreadPool: ERROR - unable to create a thread key\n");
254                	}
255 mday  1.36.4.1    }
256 mike  1.2      }
257                
258 chip  1.11     
259 mike  1.2      
260                ThreadPool::~ThreadPool(void)
261                {
262 mday  1.35        try 
263 chip  1.11        {
264 mday  1.35           _pools.remove(this);
265                      _dying++;
266                      Thread *th = 0;
267                      th = _pool.remove_first();
268                      while(th != 0)
269 mike  1.2            {
270 mday  1.35     	 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
271                	 
272                	 if(sleep_sem == 0)
273                	 {
274                	    th->dereference_tsd();
275                	    throw NullPointer();
276                	 }
277                	 
278                	 sleep_sem->signal();
279                	 sleep_sem->signal();
280 mike  1.2      	 th->dereference_tsd();
281 mday  1.35     	 // signal the thread's sleep semaphore
282                	 th->cancel();
283                	 th->join();
284                	 th->empty_tsd();
285                	 delete th;
286                	 th = _pool.remove_first();
287 mike  1.2            }
288 chip  1.11     
289 mday  1.35           th = _running.remove_first();
290                      while(th != 0)
291                      {
292                	 // signal the thread's sleep semaphore
293                	 th->cancel();
294                	 th->join();
295                	 th->empty_tsd();
296                	 delete th;
297                	 th = _running.remove_first();
298                      }
299 mike  1.2      
300 mday  1.35           th = _dead.remove_first();
301                      while(th != 0)
302                      {
303                	 // signal the thread's sleep semaphore
304                	 th->cancel();
305                	 th->join();
306                	 th->empty_tsd();
307                	 delete th;
308                	 th = _dead.remove_first();
309                      }
310 mike  1.2         }
311 mday  1.35        catch(...)
312 mike  1.2         {
313                   }
314                }
315                
316 chip  1.11     // make this static to the class
317 mike  1.2      PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
318                {
319 kumpf 1.14        PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
320                
321 mike  1.2         Thread *myself = (Thread *)parm;
322                   if(myself == 0)
323 kumpf 1.14        {
324                      PEG_METHOD_EXIT();
325 mike  1.2            throw NullPointer();
326 kumpf 1.14        }
327 mday  1.36.4.1    
328                // l10n
329                   // Set myself into thread specific storage
330 mday  1.36.4.2    // This will allow code to get its own Thread
331                   if (Thread::_key_initialized)  
332                   {
333                   	if (pegasus_set_thread_specific(Thread::_platform_thread_key,
334                								 (void *) myself) == 0)
335                        {
336                        	Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
337                	        	  "just set myself into thread specific storage");   
338                        }
339                        else
340                        {
341                printf("ThreadPool: ERROR setting tls\n");
342                        	Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
343                	        	  "ERROR: got error setting thread specific storage");   
344                        }
345                   }
346                   else
347                   {
348                        Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
349                	          "ERROR: thread key is not initialized");   
350                   }
351 mday  1.36.4.1    
352 mike  1.2         ThreadPool *pool = (ThreadPool *)myself->get_parm();
353 kumpf 1.14        if(pool == 0 ) 
354                   {
355                      PEG_METHOD_EXIT();
356 mike  1.2            throw NullPointer();
357 kumpf 1.14        }
358 mike  1.5         Semaphore *sleep_sem = 0;
359 mday  1.13        Semaphore *blocking_sem = 0;
360                   
361 mike  1.5         struct timeval *deadlock_timer = 0;
362 chip  1.11     
363                   try
364 mike  1.2         {
365                      sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
366                      myself->dereference_tsd();
367                      deadlock_timer = (struct timeval *)myself->reference_tsd("deadlock timer");
368 mday  1.22           myself->dereference_tsd(); 
369 mike  1.2         }
370 mike  1.6         catch(IPCException &)
371 mike  1.2         {
372 kumpf 1.14           PEG_METHOD_EXIT();
373 mike  1.2            myself->exit_self(0);
374                   }
375 mday  1.30        catch(...)
376                   {
377                      PEG_METHOD_EXIT();
378                      myself->exit_self(0);
379                   }
380                   
381 mike  1.2         if(sleep_sem == 0 || deadlock_timer == 0)
382 kumpf 1.14        {
383                      PEG_METHOD_EXIT();
384 mike  1.2            throw NullPointer();
385 kumpf 1.14        }
386 mike  1.2      
387                   while(pool->_dying < 1)
388                   {
389                      sleep_sem->wait();
390 mday  1.35             
391 mike  1.2            // when we awaken we reside on the running queue, not the pool queue
392                      if(pool->_dying > 0)
393                	 break;
394 chip  1.11     
395 mike  1.5            PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0;
396                      void *parm = 0;
397 mike  1.2      
398 chip  1.11           try
399 mike  1.2            {
400                	 _work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) \
401                	    myself->reference_tsd("work func");
402                	 myself->dereference_tsd();
403                	 parm = myself->reference_tsd("work parm");
404                	 myself->dereference_tsd();
405 mday  1.13     	 blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
406                	 myself->dereference_tsd();
407                
408 mike  1.2            }
409 mike  1.6            catch(IPCException &)
410 mike  1.2            {
411 kumpf 1.14     	 PEG_METHOD_EXIT();
412 mike  1.2      	 myself->exit_self(0);
413                      }
414 chip  1.11     
415 mike  1.2            if(_work == 0)
416 kumpf 1.14           {
417                         PEG_METHOD_EXIT();
418 mike  1.2      	 throw NullPointer();
419 kumpf 1.14           }
420 kumpf 1.24     
421                      if(_work ==
422                         (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) &_undertaker)
423                      {
424 mday  1.23     	 _work(parm);
425 kumpf 1.24           }
426                
427 mike  1.2            gettimeofday(deadlock_timer, NULL);
428 mday  1.20           try 
429                      {
430                	 _work(parm);
431                      }
432                      catch(...)
433                      {
434                	 gettimeofday(deadlock_timer, NULL);
435                      }
436 mday  1.36.4.1       
437 mday  1.20           gettimeofday(deadlock_timer, NULL);
438 mday  1.13           if( blocking_sem != 0 )
439                	 blocking_sem->signal();
440                      
441 chip  1.11           // put myself back onto the available list
442                      try
443 mike  1.2            {
444                	 pool->_running.remove((void *)myself);
445                	 pool->_link_pool(myself);
446                      }
447 mike  1.6            catch(IPCException &)
448 mike  1.2            {
449 kumpf 1.14     	 PEG_METHOD_EXIT();
450 mike  1.2      	 myself->exit_self(0);
451                      }
452                   }
453                   // wait to be awakend by the thread pool destructor
454                   sleep_sem->wait();
455                   myself->test_cancel();
456 kumpf 1.14     
457                   PEG_METHOD_EXIT();
458 mike  1.2         myself->exit_self(0);
459                   return((PEGASUS_THREAD_RETURN)0);
460                }
461                
462                void ThreadPool::allocate_and_awaken(void *parm,
463                				     PEGASUS_THREAD_RETURN \
464 mday  1.13     				     (PEGASUS_THREAD_CDECL *work)(void *), 
465                				     Semaphore *blocking)
466                
467 mike  1.2         throw(IPCException)
468                {
469 kumpf 1.14        PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
470 mike  1.2         struct timeval start;
471                   gettimeofday(&start, NULL);
472 chip  1.11     
473 mike  1.2         Thread *th = _pool.remove_first();
474 mday  1.12        
475 mday  1.7         // wait for the right interval and try again
476 mday  1.33        while (th == 0 && _dying < 1)
477 mike  1.2         {
478 mday  1.12           _check_deadlock(&start) ;
479                      
480 mday  1.21           if(_max_threads == 0 || _current_threads < _max_threads)
481 mday  1.35           {
482                	 th = _init_thread();
483                	 continue;
484                      }
485                      pegasus_yield();
486 mday  1.7            th = _pool.remove_first();
487                   }
488 chip  1.11     
489                
490 mike  1.2         if(_dying < 1)
491                   {
492                      // initialize the thread data with the work function and parameters
493 kumpf 1.14           Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
494                          "Initializing thread with work function and parameters: parm = %p",
495                          parm);
496                
497 kumpf 1.15           th->delete_tsd("work func");
498 chip  1.11           th->put_tsd("work func", NULL,
499 mike  1.2      		  sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
500                		  (void *)work);
501 kumpf 1.15           th->delete_tsd("work parm");
502 mike  1.2            th->put_tsd("work parm", NULL, sizeof(void *), parm);
503 kumpf 1.15           th->delete_tsd("blocking sem");
504 mday  1.13           if(blocking != 0 )
505                	 th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
506                      
507 chip  1.11           // put the thread on the running list
508 mike  1.2            _running.insert_first(th);
509                
510                      // signal the thread's sleep semaphore to awaken it
511                      Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
512                
513                      if(sleep_sem == 0)
514                      {
515                	 th->dereference_tsd();
516 kumpf 1.14              PEG_METHOD_EXIT();
517 mike  1.2      	 throw NullPointer();
518                      }
519 kumpf 1.14           Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
520 mike  1.2            sleep_sem->signal();
521                      th->dereference_tsd();
522                   }
523                   else
524                      _pool.insert_first(th);
525 kumpf 1.14     
526                   PEG_METHOD_EXIT();
527 mike  1.2      }
528                
529                // caller is responsible for only calling this routine during slack periods
530                // but should call it at least once per _deadlock_detect with the running q
531                // and at least once per _deallocate_wait for the pool q
532                
533 mday  1.12     Uint32 ThreadPool::kill_dead_threads(void)
534 mike  1.2      	 throw(IPCException)
535                {
536                   struct timeval now;
537                   gettimeofday(&now, NULL);
538 mday  1.12        Uint32 bodies = 0;
539                   
540 mike  1.2         // first go thread the dead q and clean it up as much as possible
541                   while(_dead.count() > 0)
542                   {
543 mday  1.34           Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "ThreadPool:: removing and joining dead thread");
544 mike  1.2            Thread *dead = _dead.remove_first();
545                      if(dead == 0)
546                	 throw NullPointer();
547 mday  1.30           dead->join();
548 mike  1.2            delete dead;
549                   }
550 chip  1.11     
551                   DQueue<Thread> * map[2] =
552 mike  1.2            {
553                	 &_pool, &_running
554                      };
555 chip  1.11     
556                
557 mike  1.2         DQueue<Thread> *q = 0;
558                   int i = 0;
559                   AtomicInt needed(0);
560 chip  1.11     
561 kumpf 1.31     #ifdef PEGASUS_DISABLE_KILLING_HUNG_THREADS
562                   // This change prevents the thread pool from killing "hung" threads.
563                   // The definition of a "hung" thread is one that has been on the run queue
564                   // for longer than the time interval set when the thread pool was created.
565                   // Cancelling "hung" threads has proven to be problematic.
566                
567                   // With this change the thread pool will not cancel "hung" threads.  This
568                   // may prevent a crash depending upon the state of the "hung" thread.  In
569                   // the case that the thread is actually hung, this change causes the
570                   // thread resources not to be reclaimed.
571                
572                   // Idle threads, those that have not executed a routine for a time
573                   // interval, continue to be destroyed.  This is normal and should not
574                   // cause any problems.
575                   for( ; i < 1; i++)
576                #else
577 mday  1.30        for( ; i < 2; i++)
578 kumpf 1.31     #endif
579 mday  1.21        { 
580                      q = map[i];
581 mike  1.2            if(q->count() > 0 )
582                      {
583 chip  1.11     	 try
584 mike  1.2      	 {
585                	    q->try_lock();
586                	 }
587 mday  1.18     	 catch(...)
588 mike  1.2      	 {
589 mday  1.18     	    return bodies;
590 mike  1.2      	 }
591                
592                	 struct timeval dt = { 0, 0 };
593                	 struct timeval *dtp;
594                	 Thread *th = 0;
595                	 th = q->next(th);
596                	 while (th != 0 )
597                	 {
598 chip  1.11     	    try
599 mike  1.2      	    {
600                	       dtp = (struct timeval *)th->try_reference_tsd("deadlock timer");
601                	    }
602 mday  1.18     	    catch(...)
603 mike  1.2      	    {
604 kumpf 1.25     	       q->unlock();
605 mday  1.18     	       return bodies;
606 mike  1.2      	    }
607 chip  1.11     	
608 mike  1.2      	    if(dtp != 0)
609                	    {
610                	       memcpy(&dt, dtp, sizeof(struct timeval));
611                	    }
612                	    th->dereference_tsd();
613                	    struct timeval deadlock_timeout;
614 mday  1.18     	    Boolean too_long;
615                	    if( i == 0)
616                	    {
617                	       too_long = check_time(&dt, get_deallocate_wait(&deadlock_timeout));
618                	    }
619                	    else 
620                	    {
621 mday  1.22     	       too_long = check_time(&dt, get_deadlock_detect(&deadlock_timeout));
622 mday  1.18     	    }
623                	    
624                	    if( true == too_long)
625 mike  1.2      	    {
626                	       // if we are deallocating from the pool, escape if we are
627 chip  1.11     	       // down to the minimum thread count
628 mday  1.13     	       _current_threads--;
629 mday  1.18     	       if( _current_threads.value() < (Uint32)_min_threads )
630 mike  1.2      	       {
631 mday  1.13     		  if( i == 0)
632 mike  1.2      		  {
633 mday  1.13     		     _current_threads++;
634 mike  1.2      		     th = q->next(th);
635                		     continue;
636                		  }
637 chip  1.11     		  else
638 mike  1.2      		  {
639 chip  1.11     		     // we are killing a hung thread and we will drop below the
640 mike  1.2      		     // minimum. create another thread to make up for the one
641                		     // we are about to kill
642                		     needed++;
643                		  }
644                	       }
645 chip  1.11     	
646 mike  1.2      	       th = q->remove_no_lock((void *)th);
647 chip  1.11     	
648 mike  1.2      	       if(th != 0)
649                	       {
650 mday  1.30     		  if( i == 0 )
651 mike  1.2      		  {
652 mday  1.30     		     th->delete_tsd("work func");
653                		     th->put_tsd("work func", NULL,
654                				 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
655                				 (void *)&_undertaker);
656                		     th->delete_tsd("work parm");
657                		     th->put_tsd("work parm", NULL, sizeof(void *), th);
658                		     
659                		     // signal the thread's sleep semaphore to awaken it
660                		     Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
661                		     
662                		     if(sleep_sem == 0)
663                		     {
664                			q->unlock();
665                			th->dereference_tsd();
666                			throw NullPointer();
667                		     }
668                		     
669                		     bodies++;
670 mike  1.2      		     th->dereference_tsd();
671 mday  1.30     		     _dead.insert_first(th);
672                		     sleep_sem->signal();
673                		     th = 0;
674                		  }
675                		  else 
676                		  {
677                		     // deadlocked threads
678 mday  1.34     		     Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Killing a deadlocked thread");
679 mday  1.30     		     th->cancel();
680                		     delete th;
681 mike  1.2      		  }
682                	       }
683                	    }
684                	    th = q->next(th);
685 mday  1.20     	    pegasus_sleep(1);
686 mike  1.2      	 }
687                	 q->unlock();
688                	 while (needed.value() > 0)
689                	 {
690                	    _link_pool(_init_thread());
691                	    needed--;
692 mday  1.20     	    pegasus_sleep(0);
693 mike  1.2      	 }
694                      }
695                   }
696 mday  1.18         return bodies; 
697 mike  1.2      }
698                
699 mday  1.12     
700 mike  1.2      Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval)
701                {
702 mday  1.22        // never time out if the interval is zero
703                   if(interval && interval->tv_sec == 0 && interval->tv_usec == 0)
704                      return false;
705                   
706 mday  1.36        struct timeval now, finish, remaining ;
707 mday  1.13        Uint32 usec;
708 mday  1.33        pegasus_gettimeofday(&now);
709 mday  1.36        /* remove valgrind error */
710                   pegasus_gettimeofday(&remaining);
711                   
712 mday  1.13     
713                   finish.tv_sec = start->tv_sec + interval->tv_sec;
714                   usec = start->tv_usec + interval->tv_usec;
715                   finish.tv_sec += (usec / 1000000);
716                   usec %= 1000000;
717                   finish.tv_usec = usec;
718                    
719                   if ( timeval_subtract(&remaining, &finish, &now) )
720 mike  1.2            return true;
721                   else
722                      return false;
723                }
724                
725                PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm )
726                {
727 mday  1.30        exit_thread((PEGASUS_THREAD_RETURN)1);
728                   return (PEGASUS_THREAD_RETURN)1;
729 mike  1.2      }
730 mday  1.19     
731                
732                 void ThreadPool::_sleep_sem_del(void *p)
733                {
734                   if(p != 0)
735                   {
736                      delete (Semaphore *)p;
737                   }
738                }
739                
740                 void ThreadPool::_check_deadlock(struct timeval *start) throw(Deadlock)
741                {
742                   if (true == check_time(start, &_deadlock_detect))
743                      throw Deadlock(pegasus_thread_self());
744                   return;
745                }
746                
747                
748                 Boolean ThreadPool::_check_deadlock_no_throw(struct timeval *start)
749                {
750                   return(check_time(start, &_deadlock_detect));
751 mday  1.19     }
752                
753                 Boolean ThreadPool::_check_dealloc(struct timeval *start)
754                {
755                   return(check_time(start, &_deallocate_wait));
756                }
757                
758                 Thread *ThreadPool::_init_thread(void) throw(IPCException)
759                {
760                   Thread *th = (Thread *) new Thread(_loop, this, false);
761                   // allocate a sleep semaphore and pass it in the thread context
762                   // initial count is zero, loop function will sleep until
763                   // we signal the semaphore
764                   Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
765                   th->put_tsd("sleep sem", &_sleep_sem_del, sizeof(Semaphore), (void *)sleep_sem);
766                   
767                   struct timeval *dldt = (struct timeval *) ::operator new(sizeof(struct timeval));
768 mday  1.35        pegasus_gettimeofday(dldt);
769                   
770 mday  1.19        th->put_tsd("deadlock timer", thread_data::default_delete, sizeof(struct timeval), (void *)dldt);
771                   // thread will enter _loop(void *) and sleep on sleep_sem until we signal it
772 mday  1.36.4.1   
773 mday  1.19        th->run();
774                   _current_threads++;
775                   pegasus_yield();
776                   
777                   return th;
778                }
779                
780                 void ThreadPool::_link_pool(Thread *th) throw(IPCException)
781                {
782                   if(th == 0)
783                      throw NullPointer();
784                   _pool.insert_first(th);
785                }
786 mike  1.2      
787                
788                PEGASUS_NAMESPACE_END
789                

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2