(file) Return to Thread.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

   1 mike  1.2 //%/////////////////////////////////////////////////////////////////////////////
   2           //
   3 kumpf 1.17 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
   4            // The Open Group, Tivoli Systems
   5 mike  1.2  //
   6            // Permission is hereby granted, free of charge, to any person obtaining a copy
   7 chip  1.11 // of this software and associated documentation files (the "Software"), to
   8            // deal in the Software without restriction, including without limitation the
   9            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  10 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
  11            // furnished to do so, subject to the following conditions:
  12 kumpf 1.17 // 
  13 chip  1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  14 mike  1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  15            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  16 chip  1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  17            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  18            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  19 mike  1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  20            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  21            //
  22            //==============================================================================
  23            //
  24            // Author: Mike Day (mdday@us.ibm.com)
  25            //
  26            // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
  27 chip  1.11 //              added nsk platform support
  28 mike  1.2  //
  29            //%/////////////////////////////////////////////////////////////////////////////
  30            
  31            #include "Thread.h"
  32            #include <Pegasus/Common/IPC.h>
  33 kumpf 1.14 #include <Pegasus/Common/Tracer.h>
  34 mike  1.2  
  35            #if defined(PEGASUS_OS_TYPE_WINDOWS)
  36 chip  1.11 # include "ThreadWindows.cpp"
  37 mike  1.2  #elif defined(PEGASUS_OS_TYPE_UNIX)
  38            # include "ThreadUnix.cpp"
  39            #elif defined(PEGASUS_OS_TYPE_NSK)
  40            # include "ThreadNsk.cpp"
  41            #else
  42            # error "Unsupported platform"
  43            #endif
  44            
  45            PEGASUS_NAMESPACE_BEGIN
  46            
  47 mday  1.42 
  48 chip  1.11 void thread_data::default_delete(void * data)
  49            {
  50 mike  1.2     if( data != NULL)
  51 chip  1.11       ::operator delete(data);
  52 mike  1.2  }
  53            
  54 chuck 1.43 // l10n start
  55            void language_delete(void * data)
  56            {
  57               if( data != NULL)
  58               {
  59                  AcceptLanguages * al = static_cast<AcceptLanguages *>(data);
  60 chuck 1.44       delete al;
  61 chuck 1.43    }
  62            }
  63            // l10n end
  64            
  // Definitions of Thread's static data members.
  65 mike  1.2  Boolean Thread::_signals_blocked = false;
  66 chuck 1.37 // l10n
  // Platform key under which setCurrent() stores, and getCurrent() reads
  // back, the calling thread's Thread pointer. Starts at -1 until
  // initializeKey() has created the key.
  67 mday  1.48 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key = -1;
  // Set to true by initializeKey() once pegasus_key_create() has succeeded.
  68 chuck 1.37 Boolean Thread::_key_initialized = false;
  // Set to true by initializeKey() when key creation fails, so that later
  // calls report the error without retrying.
  69 chuck 1.41 Boolean Thread::_key_error = false;
  70 chuck 1.37 
  71 mike  1.2  
  72            // for non-native implementations
  73 chip  1.11 #ifndef PEGASUS_THREAD_CLEANUP_NATIVE
  74 mike  1.2  void Thread::cleanup_push( void (*routine)(void *), void *parm) throw(IPCException)
  75            {
  76                cleanup_handler *cu = new cleanup_handler(routine, parm);
  77 chip  1.11     try
  78                {
  79            	_cleanup.insert_first(cu);
  80                }
  81                catch(IPCException&)
  82 mike  1.2      {
  83            	delete cu;
  84 chip  1.11 	throw;
  85 mike  1.2      }
  86                return;
  87            }
  88 chip  1.11 	
  89 mike  1.2  void Thread::cleanup_pop(Boolean execute) throw(IPCException)
  90            {
  91                cleanup_handler *cu ;
  92 chip  1.11     try
  93                {
  94 mike  1.2  	cu = _cleanup.remove_first() ;
  95                }
  96 chip  1.11     catch(IPCException&)
  97 mike  1.2      {
  98 chip  1.11 	PEGASUS_ASSERT(0);
  99 mike  1.2       }
 100                if(execute == true)
 101            	cu->execute();
 102                delete cu;
 103            }
 104 chip  1.11 		
 105 mike  1.2  #endif
 106            
 107            
 108 kumpf 1.8  //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) throw(IPCException)
 109 mike  1.2  
 110            
 111 chip  1.11 #ifndef PEGASUS_THREAD_EXIT_NATIVE
 112            void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
 113            {
 114                // execute the cleanup stack and then return
 115 mike  1.2     while( _cleanup.count() )
 116               {
 117 chip  1.11        try
 118                   {
 119            	   cleanup_pop(true);
 120                   }
 121                   catch(IPCException&)
 122                   {
 123            	  PEGASUS_ASSERT(0);
 124            	  break;
 125 mike  1.2         }
 126               }
 127               _exit_code = exit_code;
 128               exit_thread(exit_code);
 129 mday  1.4     _handle.thid = 0;
 130 mike  1.2  }
 131            
 132            
 133            #endif
 134            
 135 chuck 1.37 // l10n start
 136 chuck 1.39 Sint8 Thread::initializeKey()
 137            {
 138               PEG_METHOD_ENTER(TRC_THREAD, "Thread::initializeKey");
 139               if (!Thread::_key_initialized)
 140               {
 141 chuck 1.41 	if (Thread::_key_error)
 142            	{
 143                   		Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
 144            	        	  "Thread: ERROR - thread key error"); 
 145            		return -1;
 146            	}
 147            
 148 chuck 1.39 	if (pegasus_key_create(&Thread::_platform_thread_key) == 0)
 149            	{
 150                    	Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
 151            	        	  "Thread: able to create a thread key");   
 152            	   	Thread::_key_initialized = true;	
 153            	}
 154            	else
 155            	{
 156                   		Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
 157            	        	  "Thread: ERROR - unable to create a thread key"); 
 158 chuck 1.41 	   	Thread::_key_error = true;
 159 chuck 1.39 		return -1;
 160            	}
 161               }
 162            
 163               PEG_METHOD_EXIT();
 164               return 0;  
 165            }
 166            
 167 chuck 1.37 Thread * Thread::getCurrent()
 168            {
 169 chuck 1.39     PEG_METHOD_ENTER(TRC_THREAD, "Thread::getCurrent");	
 170 chuck 1.40     if (Thread::initializeKey() != 0)
 171 chuck 1.39     {
 172            	return NULL;  
 173                }
 174 chuck 1.38     PEG_METHOD_EXIT();  
 175 chuck 1.39     return (Thread *)pegasus_get_thread_specific(_platform_thread_key); 
 176            }
 177            
 178            void Thread::setCurrent(Thread * thrd)
 179            {
 180               PEG_METHOD_ENTER(TRC_THREAD, "Thread::setCurrent");
 181               if (Thread::initializeKey() == 0)
 182               {
 183               	if (pegasus_set_thread_specific(Thread::_platform_thread_key,
 184            								 (void *) thrd) == 0)
 185                    {
 186                    	Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
 187            	        	  "Successful set Thread * into thread specific storage");   
 188                    }
 189                    else
 190                    {
 191                    	Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
 192            	        	  "ERROR: got error setting Thread * into thread specific storage");   
 193                    }
 194               }
 195               PEG_METHOD_EXIT();  
 196 chuck 1.37 }
 197            
 198            AcceptLanguages * Thread::getLanguages()
 199            {
 200 chuck 1.39     PEG_METHOD_ENTER(TRC_THREAD, "Thread::getLanguages");		
 201 chuck 1.37     
 202            	Thread * curThrd = Thread::getCurrent();
 203            	if (curThrd == NULL)
 204            		return NULL;
 205               	AcceptLanguages * acceptLangs =
 206               		 (AcceptLanguages *)curThrd->reference_tsd("acceptLanguages");
 207            	curThrd->dereference_tsd();
 208                PEG_METHOD_EXIT(); 	
 209            	return acceptLangs;
 210            }
 211            
 212            void Thread::setLanguages(AcceptLanguages *langs) //l10n
 213            {
 214 chuck 1.39    PEG_METHOD_ENTER(TRC_THREAD, "Thread::setLanguages");
 215 chuck 1.37    		
 216               Thread * currentThrd = Thread::getCurrent();
 217               if (currentThrd != NULL)
 218               {
 219               		// deletes the old tsd and creates a new one
 220            		currentThrd->put_tsd("acceptLanguages",
 221 chuck 1.43 			language_delete, 
 222 chuck 1.37 			sizeof(AcceptLanguages *),
 223            			langs);   		
 224               }
 225               
 226               PEG_METHOD_EXIT();    		
 227            }
 228            
 229            void Thread::clearLanguages() //l10n
 230            {
 231 chuck 1.39    PEG_METHOD_ENTER(TRC_THREAD, "Thread::clearLanguages");
 232 chuck 1.37    	
 233               Thread * currentThrd = Thread::getCurrent();
 234               if (currentThrd != NULL)
 235               {
 236               		// deletes the old tsd
 237            		currentThrd->delete_tsd("acceptLanguages");   		
 238               }
 239               
 240               PEG_METHOD_EXIT();   		
 241            }
 242            // l10n end      
 243            
 244 mday  1.52 
 245            
 246            // two special synchronization classes for ThreadPool
 247            // 
 248            
 249            class timed_mutex 
 250            {
 251               public:
 252                  timed_mutex(Mutex* mut, int msec)
 253            	 :_mut(mut)
 254                  {
 255            	 _mut->timed_lock(msec, pegasus_thread_self());
 256                  }
 257                  ~timed_mutex(void)
 258                  {
 259            	 _mut->unlock();
 260                  }
 261                  Mutex* _mut;
 262            };
 263            
 264            
 265 mday  1.52 class try_mutex
 266            {
 267               public:
 268                  try_mutex(Mutex* mut)
 269            	 :_mut(mut)
 270                  {
 271            	 _mut->try_lock(pegasus_thread_self());
 272                  }
 273                  ~try_mutex(void)
 274                  {
 275            	 _mut->unlock();
 276                  }
 277                  
 278                  Mutex* _mut;
 279            };
 280            
 281            
 // Global list of ThreadPool instances: the ThreadPool constructor appends
 // itself, the destructor removes itself, and kill_idle_threads() walks the
 // list.
 282 mday  1.20 DQueue<ThreadPool> ThreadPool::_pools(true);
 283            
 284            
 285            void ThreadPool::kill_idle_threads(void)
 286            {
 287               static struct timeval now, last = {0, 0};
 288               
 289               pegasus_gettimeofday(&now);
 290               if(now.tv_sec - last.tv_sec > 5)
 291               {
 292                  _pools.lock();
 293                  ThreadPool *p = _pools.next(0);
 294                  while(p != 0)
 295                  {
 296            	 try 
 297            	 {
 298            	    p->kill_dead_threads();
 299            	 }
 300            	 catch(...)
 301            	 {
 302            	 }
 303 mday  1.20 	 p = _pools.next(p);
 304                  }
 305                  _pools.unlock();
 306                  pegasus_gettimeofday(&last);
 307               }
 308            }
 309            
 310            
 311 mike  1.2  ThreadPool::ThreadPool(Sint16 initial_size,
 312 kumpf 1.8  		       const Sint8 *key,
 313 mike  1.2  		       Sint16 min,
 314            		       Sint16 max,
 315            		       struct timeval & alloc_wait,
 316 chip  1.11 		       struct timeval & dealloc_wait,
 317 mike  1.2  		       struct timeval & deadlock_detect)
 318               : _max_threads(max), _min_threads(min),
 319 mday  1.12      _current_threads(0),
 320                 _pool(true), _running(true),
 321 mike  1.2       _dead(true), _dying(0)
 322            {
 323               _allocate_wait.tv_sec = alloc_wait.tv_sec;
 324               _allocate_wait.tv_usec = alloc_wait.tv_usec;
 325 chip  1.11    _deallocate_wait.tv_sec = dealloc_wait.tv_sec;
 326 mike  1.2     _deallocate_wait.tv_usec = dealloc_wait.tv_usec;
 327               _deadlock_detect.tv_sec = deadlock_detect.tv_sec;
 328               _deadlock_detect.tv_usec = deadlock_detect.tv_usec;
 329               memset(_key, 0x00, 17);
 330               if(key != 0)
 331                  strncpy(_key, key, 16);
 332 mday  1.21    if(_max_threads > 0 && _max_threads < initial_size)
 333 mike  1.2        _max_threads = initial_size;
 334               if(_min_threads > initial_size)
 335                  _min_threads = initial_size;
 336 chip  1.11 
 337 mike  1.2     int i;
 338               for(i = 0; i < initial_size; i++)
 339               {
 340                  _link_pool(_init_thread());
 341               }
 342 mday  1.20    _pools.insert_last(this);
 343 mike  1.2  }
 344            
 345 chip  1.11 
 346 mday  1.52 // Note:   <<< Fri Oct 17 09:19:03 2003 mdd >>>
 347            // the pegasus_yield() calls that precede each th->join() are to
 348            // give a thread on the running list a chance to reach a cancellation
 349            // point before the join 
 350 mike  1.2  
 351            ThreadPool::~ThreadPool(void)
 352            {
 353 mday  1.35    try 
 354 mday  1.47    {      
 355 mday  1.52       // set the dying flag so all thread know the destructor has been entered
 356 mday  1.47       {
 357            	 auto_mutex(&(this->_monitor));
 358            	 _dying++;
 359                  }
 360 mday  1.52       // remove from the global pools list 
 361 mday  1.35       _pools.remove(this);
 362 mday  1.52 
 363                  // start with idle threads. 
 364 mday  1.35       Thread *th = 0;
 365                  th = _pool.remove_first();
 366 mday  1.52       Semaphore* sleep_sem;
 367                  
 368 mday  1.35       while(th != 0)
 369 mike  1.2        {
 370 mday  1.52 	 sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
 371 mday  1.35 	 if(sleep_sem == 0)
 372            	 {
 373            	    th->dereference_tsd();
 374            	    throw NullPointer();
 375            	 }
 376 mday  1.52 	 
 377 s.hills 1.49 	 // Signal to get the thread out of the work loop.
 378              	 sleep_sem->signal();
 379 mday    1.52 
 380 s.hills 1.49 	 // Signal to get the thread past the end. See the comment
 381              	 // "wait to be awakend by the thread pool destructor"
 382              	 // Note: the current implementation of Thread for Windows
 383              	 // does not implement "pthread" cancelation points so this
 384              	 // is needed.
 385 mday    1.35 	 sleep_sem->signal();
 386 mike    1.2  	 th->dereference_tsd();
 387 mday    1.35 	 th->cancel();
 388              	 th->join();
 389              	 delete th;
 390              	 th = _pool.remove_first();
 391 mike    1.2        }
 392 mday    1.47       th = _dead.remove_first();
 393 mday    1.35       while(th != 0)
 394                    {
 395 mday    1.52 	 sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
 396              
 397 mday    1.47 	 if(sleep_sem == 0)
 398              	 {
 399              	    th->dereference_tsd();
 400              	    throw NullPointer();
 401              	 }
 402              	 
 403 mday    1.52 	 // signal the thread's sleep semaphore
 404              	 sleep_sem->signal();
 405 mday    1.47 	 sleep_sem->signal();
 406 mday    1.52 	 th->dereference_tsd();	 
 407 mday    1.35 	 th->cancel();
 408              	 th->join();
 409              	 delete th;
 410 mday    1.47 	 th = _dead.remove_first();
 411 mday    1.35       }
 412 mday    1.52 
 413 mday    1.47       {
 414 mday    1.52 	 th = _running.remove_first();
 415              	 while(th != 0)
 416              	 {	 
 417              	    // signal the thread's sleep semaphore
 418              
 419              	    sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
 420              	    if(sleep_sem == 0 )
 421              	    {
 422              	       th->dereference_tsd();
 423              	       throw NullPointer();
 424              	    }
 425              	    
 426              	    sleep_sem->signal();
 427              	    sleep_sem->signal();
 428 mday    1.47 	    th->dereference_tsd();
 429 mday    1.52 	    th->cancel();
 430              	    pegasus_yield();
 431              	    
 432              	    th->join();
 433              	    delete th;
 434              	    th = _running.remove_first();
 435 mday    1.47 	 }
 436 mday    1.35       }
 437 mday    1.47       
 438 mike    1.2     }
 439 mday    1.52    
 440 mday    1.35    catch(...)
 441 mike    1.2     {
 442                 }
 443              }
 444              
 445 chip    1.11 // make this static to the class
 446 mike    1.2  PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
 447              {
 448 kumpf   1.14    PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
 449              
 450 mike    1.2     Thread *myself = (Thread *)parm;
 451                 if(myself == 0)
 452 kumpf   1.14    {
 453                    PEG_METHOD_EXIT();
 454 mike    1.2        throw NullPointer();
 455 kumpf   1.14    }
 456 chuck   1.37    
 457              // l10n
 458                 // Set myself into thread specific storage
 459 chuck   1.38    // This will allow code to get its own Thread
 460 chuck   1.39    Thread::setCurrent(myself);	
 461              
 462 mike    1.2     ThreadPool *pool = (ThreadPool *)myself->get_parm();
 463 kumpf   1.14    if(pool == 0 ) 
 464                 {
 465                    PEG_METHOD_EXIT();
 466 mike    1.2        throw NullPointer();
 467 kumpf   1.14    }
 468 mday    1.52    if(pool->_dying.value())
 469                 {
 470                    PEG_METHOD_EXIT();
 471                    return((PEGASUS_THREAD_RETURN)0);
 472                 }
 473                 
 474 mike    1.5     Semaphore *sleep_sem = 0;
 475 mday    1.13    Semaphore *blocking_sem = 0;
 476                 
 477 mike    1.5     struct timeval *deadlock_timer = 0;
 478 mday    1.47    
 479 chip    1.11    try
 480 mike    1.2     {
 481                    sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
 482                    myself->dereference_tsd();
 483                    deadlock_timer = (struct timeval *)myself->reference_tsd("deadlock timer");
 484 mday    1.22       myself->dereference_tsd(); 
 485 mike    1.2     }
 486 mday    1.52 
 487 mday    1.30    catch(...)
 488                 {
 489                    PEG_METHOD_EXIT();
 490 mday    1.52       return((PEGASUS_THREAD_RETURN)0);
 491 mday    1.30    }
 492                 
 493 mike    1.2     if(sleep_sem == 0 || deadlock_timer == 0)
 494 kumpf   1.14    {
 495                    PEG_METHOD_EXIT();
 496 mday    1.52       return((PEGASUS_THREAD_RETURN)0);
 497 kumpf   1.14    }
 498 mike    1.2  
 499 mday    1.47    while(pool->_dying.value() < 1)
 500 mike    1.2     {
 501 mday    1.52       try 
 502                    {
 503              	 sleep_sem->wait();
 504                    }
 505                    catch(IPCException& )
 506                    {
 507              	 PEG_METHOD_EXIT();
 508              	 return((PEGASUS_THREAD_RETURN)0);
 509                    }
 510                    
 511 mike    1.2        // when we awaken we reside on the running queue, not the pool queue
 512 mday    1.52       if(pool->_dying.value())
 513                    {
 514              	 PEG_METHOD_EXIT();
 515              	 return((PEGASUS_THREAD_RETURN)0);
 516                    }
 517                    
 518 mday    1.47       
 519 mike    1.5        PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0;
 520                    void *parm = 0;
 521 mike    1.2  
 522 chip    1.11       try
 523 mike    1.2        {
 524              	 _work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) \
 525              	    myself->reference_tsd("work func");
 526              	 myself->dereference_tsd();
 527              	 parm = myself->reference_tsd("work parm");
 528              	 myself->dereference_tsd();
 529 mday    1.13 	 blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
 530              	 myself->dereference_tsd();
 531              
 532 mike    1.2        }
 533 mike    1.6        catch(IPCException &)
 534 mike    1.2        {
 535 kumpf   1.14 	 PEG_METHOD_EXIT();
 536 mday    1.52 	 return((PEGASUS_THREAD_RETURN)0);
 537 mike    1.2        }
 538 mday    1.52       
 539 mike    1.2        if(_work == 0)
 540 kumpf   1.14       {
 541                       PEG_METHOD_EXIT();
 542 mike    1.2  	 throw NullPointer();
 543 kumpf   1.14       }
 544 kumpf   1.24 
 545                    if(_work ==
 546                       (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) &_undertaker)
 547                    {
 548 mday    1.23 	 _work(parm);
 549 kumpf   1.24       }
 550              
 551 mike    1.2        gettimeofday(deadlock_timer, NULL);
 552 mday    1.20       try 
 553                    {
 554 mday    1.47 	 {
 555 mday    1.52 	    timed_mutex(&(pool->_monitor), 1000);
 556 mday    1.47 	    if(pool->_dying.value())
 557              	    {
 558 mday    1.52 	       _undertaker(parm);
 559 mday    1.47 	    }
 560              	 }
 561 mday    1.20 	 _work(parm);
 562                    }
 563                    catch(...)
 564                    {
 565 mday    1.47 	 return((PEGASUS_THREAD_RETURN)0);
 566 mday    1.20       }
 567 chuck   1.37       
 568 chip    1.11       // put myself back onto the available list
 569                    try
 570 mike    1.2        {
 571 mday    1.52 	 timed_mutex(&(pool->_monitor), 1000);
 572 mday    1.47 	 if(pool->_dying.value() == 0)
 573              	 {
 574              	    gettimeofday(deadlock_timer, NULL);
 575              	    if( blocking_sem != 0 )
 576              	       blocking_sem->signal();
 577                    
 578 s.hills 1.53 	    // If we are not on _running then ~ThreadPool has removed
 579              	    // us and now "owns" our pointer.
 580              	    if( pool->_running.remove((void *)myself) != 0 )
 581              	        pool->_pool.insert_first(myself);
 582 mday    1.47 	 }
 583              	 else
 584              	 {
 585              	    PEG_METHOD_EXIT();
 586              	    return((PEGASUS_THREAD_RETURN)0);
 587              	 }
 588 mike    1.2        }
 589 mday    1.52       catch(...)
 590 mike    1.2        {
 591 kumpf   1.14 	 PEG_METHOD_EXIT();
 592 mday    1.47 	 return((PEGASUS_THREAD_RETURN)0);
 593 mike    1.2        }
 594 mday    1.51       
 595 mike    1.2     }
 596 s.hills 1.49 
 597                 // TODO: Why is this needed? Why not just continue?
 598 mike    1.2     // wait to be awakend by the thread pool destructor
 599 mday    1.50    //sleep_sem->wait();
 600 s.hills 1.49 
 601 mike    1.2     myself->test_cancel();
 602 kumpf   1.14 
 603                 PEG_METHOD_EXIT();
 604 mike    1.2     return((PEGASUS_THREAD_RETURN)0);
 605              }
 606              
 607              void ThreadPool::allocate_and_awaken(void *parm,
 608              				     PEGASUS_THREAD_RETURN \
 609 mday    1.13 				     (PEGASUS_THREAD_CDECL *work)(void *), 
 610              				     Semaphore *blocking)
 611              
 612 mike    1.2     throw(IPCException)
 613              {
 614 kumpf   1.14    PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
 615 mike    1.2     struct timeval start;
 616                 gettimeofday(&start, NULL);
 617 mday    1.47    Thread *th = 0;
 618                 
 619                 try 
 620                 {
 621 mday    1.52       timed_mutex(&(this->_monitor), 1000);
 622 mday    1.47       if(_dying.value())
 623                    {
 624              	 return;
 625                    }
 626                    th = _pool.remove_first();
 627                 }
 628                 catch(...)
 629                 {
 630                    return;
 631                    
 632                 }
 633                 
 634 mday    1.12    
 635 mday    1.7     // wait for the right interval and try again
 636 mday    1.47    while (th == 0 && _dying.value() < 1)
 637 mike    1.2     {
 638 mday    1.47       // will throw an IPCException& 
 639 mday    1.12       _check_deadlock(&start) ;
 640                    
 641 mday    1.21       if(_max_threads == 0 || _current_threads < _max_threads)
 642 mday    1.35       {
 643              	 th = _init_thread();
 644              	 continue;
 645                    }
 646                    pegasus_yield();
 647 mday    1.47       try
 648                    {
 649 mday    1.52 	 timed_mutex(&(this->_monitor), 1000);
 650 mday    1.47 	 if(_dying.value())
 651              	 {
 652              	    return;
 653              	 }
 654              	 th = _pool.remove_first();
 655                    }
 656                    catch(...)
 657                    {
 658              	 return ;
 659                    }
 660 mday    1.7     }
 661 chip    1.11 
 662 mday    1.47    if(_dying.value() < 1)
 663 mike    1.2     {
 664                    // initialize the thread data with the work function and parameters
 665 kumpf   1.14       Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
 666                        "Initializing thread with work function and parameters: parm = %p",
 667                        parm);
 668              
 669 kumpf   1.15       th->delete_tsd("work func");
 670 chip    1.11       th->put_tsd("work func", NULL,
 671 mike    1.2  		  sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
 672              		  (void *)work);
 673 kumpf   1.15       th->delete_tsd("work parm");
 674 mike    1.2        th->put_tsd("work parm", NULL, sizeof(void *), parm);
 675 kumpf   1.15       th->delete_tsd("blocking sem");
 676 mday    1.13       if(blocking != 0 )
 677              	 th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
 678 mday    1.47       try 
 679                    {
 680 mday    1.52 	 timed_mutex(&(this->_monitor), 1000);
 681 mday    1.47 	 if(_dying.value())
 682              	 {
 683              	    th->cancel();
 684              	    th->join();
 685              	    delete th;
 686              	    return;
 687              	 }
 688              	 
 689              	 // put the thread on the running list
 690              
 691 mike    1.2  
 692 mday    1.47 	 _running.insert_first(th);
 693 mike    1.2        // signal the thread's sleep semaphore to awaken it
 694 mday    1.47 	 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
 695              	 
 696              	 if(sleep_sem == 0)
 697              	 {
 698              	    th->dereference_tsd();
 699              	    PEG_METHOD_EXIT();
 700              	    throw NullPointer();
 701              	 }
 702              	 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
 703              	 sleep_sem->signal();
 704              	 th->dereference_tsd();
 705                    }
 706                    catch(...)
 707 mike    1.2        {
 708 mday    1.47 	 PEG_METHOD_EXIT();
 709              	 return;
 710 mike    1.2        }
 711 mday    1.47       
 712 mike    1.2     }
 713                 else
 714 mday    1.47    {
 715                    th->cancel();
 716                    th->join();
 717                    delete th;
 718                 }
 719                 
 720 kumpf   1.14    PEG_METHOD_EXIT();
 721 mike    1.2  }
 722              
 723              // caller is responsible for only calling this routine during slack periods
 724              // but should call it at least once per _deadlock_detect with the running q
 725              // and at least once per _deallocate_wait for the pool q
 726              
 727 mday    1.12 Uint32 ThreadPool::kill_dead_threads(void)
 728 mike    1.2  	 throw(IPCException)
 729              {
 730                 struct timeval now;
 731                 gettimeofday(&now, NULL);
 732 mday    1.12    Uint32 bodies = 0;
 733                 
 734 mike    1.2     // first go thread the dead q and clean it up as much as possible
 735 mday    1.47    try 
 736                 {
 737 mday    1.52       timed_mutex(&(this->_monitor), 1000);
 738 mday    1.47       if(_dying.value() )
 739                    {
 740              	 return 0;
 741                    }
 742                    
 743                    while(_dead.count() > 0 && _dying.value() == 0 )
 744                    {
 745              	 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "ThreadPool:: removing and joining dead thread");
 746              	 Thread *dead = _dead.remove_first();
 747              	 
 748              	 if(dead == 0)
 749              	    throw NullPointer();
 750              	 dead->join();
 751              	 delete dead;
 752                    }
 753                 }
 754                 catch(...)
 755 mike    1.2     {
 756                 }
 757 mday    1.47    
 758                 
 759 chip    1.11    DQueue<Thread> * map[2] =
 760 mike    1.2        {
 761              	 &_pool, &_running
 762                    };
 763 chip    1.11 
 764              
 765 mike    1.2     DQueue<Thread> *q = 0;
 766                 int i = 0;
 767                 AtomicInt needed(0);
 768 chip    1.11 
 769 kumpf   1.31 #ifdef PEGASUS_DISABLE_KILLING_HUNG_THREADS
 770                 // This change prevents the thread pool from killing "hung" threads.
 771                 // The definition of a "hung" thread is one that has been on the run queue
 772                 // for longer than the time interval set when the thread pool was created.
 773                 // Cancelling "hung" threads has proven to be problematic.
 774              
 775                 // With this change the thread pool will not cancel "hung" threads.  This
 776                 // may prevent a crash depending upon the state of the "hung" thread.  In
 777                 // the case that the thread is actually hung, this change causes the
 778                 // thread resources not to be reclaimed.
 779              
 780                 // Idle threads, those that have not executed a routine for a time
 781                 // interval, continue to be destroyed.  This is normal and should not
 782                 // cause any problems.
 783                 for( ; i < 1; i++)
 784              #else
 785 mday    1.30    for( ; i < 2; i++)
 786 kumpf   1.31 #endif
 787 mday    1.47    {
 788 mday    1.52       try 
 789                    {
 790              	 try_mutex(&(this->_monitor)); 
 791                    }
 792                    catch(IPCException&)
 793                    {
 794              	 return bodies;
 795                    }
 796                    
 797 mday    1.21       q = map[i];
 798 mike    1.2        if(q->count() > 0 )
 799                    {
 800 chip    1.11 	 try
 801 mike    1.2  	 {
 802 mday    1.47 	    if(_dying.value())
 803              	    {
 804              	       return bodies;
 805              	    }
 806              	    
 807 mike    1.2  	    q->try_lock();
 808              	 }
 809 mday    1.18 	 catch(...)
 810 mike    1.2  	 {
 811 mday    1.18 	    return bodies;
 812 mike    1.2  	 }
 813              
 814              	 struct timeval dt = { 0, 0 };
 815              	 struct timeval *dtp;
 816              	 Thread *th = 0;
 817              	 th = q->next(th);
 818              	 while (th != 0 )
 819              	 {
 820 chip    1.11 	    try
 821 mike    1.2  	    {
 822              	       dtp = (struct timeval *)th->try_reference_tsd("deadlock timer");
 823              	    }
 824 mday    1.18 	    catch(...)
 825 mike    1.2  	    {
 826 kumpf   1.25 	       q->unlock();
 827 mday    1.18 	       return bodies;
 828 mike    1.2  	    }
 829 chip    1.11 	
 830 mike    1.2  	    if(dtp != 0)
 831              	    {
 832              	       memcpy(&dt, dtp, sizeof(struct timeval));
 833              	    }
 834              	    th->dereference_tsd();
 835              	    struct timeval deadlock_timeout;
 836 mday    1.18 	    Boolean too_long;
 837              	    if( i == 0)
 838              	    {
 839              	       too_long = check_time(&dt, get_deallocate_wait(&deadlock_timeout));
 840              	    }
 841              	    else 
 842              	    {
 843 mday    1.22 	       too_long = check_time(&dt, get_deadlock_detect(&deadlock_timeout));
 844 mday    1.18 	    }
 845              	    
 846              	    if( true == too_long)
 847 mike    1.2  	    {
 848              	       // if we are deallocating from the pool, escape if we are
 849 chip    1.11 	       // down to the minimum thread count
 850 mday    1.13 	       _current_threads--;
 851 mday    1.18 	       if( _current_threads.value() < (Uint32)_min_threads )
 852 mike    1.2  	       {
 853 mday    1.13 		  if( i == 0)
 854 mike    1.2  		  {
 855 mday    1.13 		     _current_threads++;
 856 mike    1.2  		     th = q->next(th);
 857              		     continue;
 858              		  }
 859 chip    1.11 		  else
 860 mike    1.2  		  {
 861 chip    1.11 		     // we are killing a hung thread and we will drop below the
 862 mike    1.2  		     // minimum. create another thread to make up for the one
 863              		     // we are about to kill
 864              		     needed++;
 865              		  }
 866              	       }
 867 chip    1.11 	
 868 mike    1.2  	       th = q->remove_no_lock((void *)th);
 869 chip    1.11 	
 870 mike    1.2  	       if(th != 0)
 871              	       {
 872 mday    1.30 		  if( i == 0 )
 873 mike    1.2  		  {
 874 mday    1.30 		     th->delete_tsd("work func");
 875              		     th->put_tsd("work func", NULL,
 876              				 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
 877              				 (void *)&_undertaker);
 878              		     th->delete_tsd("work parm");
 879              		     th->put_tsd("work parm", NULL, sizeof(void *), th);
 880              		     
 881              		     // signal the thread's sleep semaphore to awaken it
 882              		     Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
 883              		     
 884              		     if(sleep_sem == 0)
 885              		     {
 886              			q->unlock();
 887              			th->dereference_tsd();
 888              			throw NullPointer();
 889              		     }
 890              		     
 891              		     bodies++;
 892 mike    1.2  		     th->dereference_tsd();
 893 mday    1.30 		     _dead.insert_first(th);
 894              		     sleep_sem->signal();
 895              		     th = 0;
 896              		  }
 897              		  else 
 898              		  {
 899              		     // deadlocked threads
 900 mday    1.34 		     Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Killing a deadlocked thread");
 901 mday    1.30 		     th->cancel();
 902              		     delete th;
 903 mike    1.2  		  }
 904              	       }
 905              	    }
 906              	    th = q->next(th);
 907 mday    1.52 	    pegasus_yield();
 908 mike    1.2  	 }
 909              	 q->unlock();
 910                    }
 911                 }
 912 mday    1.47    if(_dying.value() )
 913                    return bodies;
 914                 
 915                 while (needed.value() > 0)   {
 916                    _link_pool(_init_thread());
 917                    needed--;
 918                    pegasus_sleep(0);
 919                 }
 920 mday    1.18     return bodies; 
 921 mike    1.2  }
 922              
 923 mday    1.12 
 924 mike    1.2  Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval)
 925              {
 926 mday    1.22    // never time out if the interval is zero
 927                 if(interval && interval->tv_sec == 0 && interval->tv_usec == 0)
 928                    return false;
 929                 
 930 mday    1.36    struct timeval now, finish, remaining ;
 931 mday    1.13    Uint32 usec;
 932 mday    1.33    pegasus_gettimeofday(&now);
 933 mday    1.36    /* remove valgrind error */
 934                 pegasus_gettimeofday(&remaining);
 935                 
 936 mday    1.13 
 937                 finish.tv_sec = start->tv_sec + interval->tv_sec;
 938                 usec = start->tv_usec + interval->tv_usec;
 939                 finish.tv_sec += (usec / 1000000);
 940                 usec %= 1000000;
 941                 finish.tv_usec = usec;
 942                  
 943                 if ( timeval_subtract(&remaining, &finish, &now) )
 944 mike    1.2        return true;
 945                 else
 946                    return false;
 947              }
 948              
 949              PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm )
 950              {
 951 mday    1.30    exit_thread((PEGASUS_THREAD_RETURN)1);
 952                 return (PEGASUS_THREAD_RETURN)1;
 953 mike    1.2  }
 954 mday    1.19 
 955              
 956               void ThreadPool::_sleep_sem_del(void *p)
 957              {
 958                 if(p != 0)
 959                 {
 960                    delete (Semaphore *)p;
 961                 }
 962              }
 963              
 964               void ThreadPool::_check_deadlock(struct timeval *start) throw(Deadlock)
 965              {
 966                 if (true == check_time(start, &_deadlock_detect))
 967                    throw Deadlock(pegasus_thread_self());
 968                 return;
 969              }
 970              
 971              
 972               Boolean ThreadPool::_check_deadlock_no_throw(struct timeval *start)
 973              {
 974                 return(check_time(start, &_deadlock_detect));
 975 mday    1.19 }
 976              
 977               Boolean ThreadPool::_check_dealloc(struct timeval *start)
 978              {
 979                 return(check_time(start, &_deallocate_wait));
 980              }
 981              
 982               Thread *ThreadPool::_init_thread(void) throw(IPCException)
 983              {
 984                 Thread *th = (Thread *) new Thread(_loop, this, false);
 985                 // allocate a sleep semaphore and pass it in the thread context
 986                 // initial count is zero, loop function will sleep until
 987                 // we signal the semaphore
 988                 Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
 989                 th->put_tsd("sleep sem", &_sleep_sem_del, sizeof(Semaphore), (void *)sleep_sem);
 990                 
 991                 struct timeval *dldt = (struct timeval *) ::operator new(sizeof(struct timeval));
 992 mday    1.35    pegasus_gettimeofday(dldt);
 993                 
 994 mday    1.19    th->put_tsd("deadlock timer", thread_data::default_delete, sizeof(struct timeval), (void *)dldt);
 995                 // thread will enter _loop(void *) and sleep on sleep_sem until we signal it
 996 chuck   1.37   
 997 mday    1.19    th->run();
 998                 _current_threads++;
 999                 pegasus_yield();
1000                 
1001                 return th;
1002              }
1003              
1004               void ThreadPool::_link_pool(Thread *th) throw(IPCException)
1005              {
1006                 if(th == 0)
1007                    throw NullPointer();
1008 mday    1.47    try 
1009                 {
1010                    
1011 mday    1.52       timed_mutex(&(this->_monitor), 1000);
1012 mday    1.47       if(_dying.value())
1013                    {
1014              	 th->cancel();
1015              	 th->join();
1016              	 delete th;
1017                    }
1018                    
1019                    _pool.insert_first(th);
1020                    
1021                 }
1022                 catch(...)
1023                 {
1024                 }
1025 mday    1.19 }
1026 mike    1.2  
1027              
1028              PEGASUS_NAMESPACE_END
1029              

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2