(file) Return to Thread.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

   1 karl  1.56 //%2003////////////////////////////////////////////////////////////////////////
   2 mike  1.2  //
   3 karl  1.56 // Copyright (c) 2000, 2001, 2002  BMC Software, Hewlett-Packard Development
   4            // Company, L. P., IBM Corp., The Open Group, Tivoli Systems.
   5            // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L. P.;
   6            // IBM Corp.; EMC Corporation, The Open Group.
   7 mike  1.2  //
   8            // Permission is hereby granted, free of charge, to any person obtaining a copy
   9 chip  1.11 // of this software and associated documentation files (the "Software"), to
  10            // deal in the Software without restriction, including without limitation the
  11            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  12 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
  13            // furnished to do so, subject to the following conditions:
  14 kumpf 1.17 // 
  15 chip  1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  16 mike  1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  17            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  18 chip  1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  19            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  20            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  21 mike  1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  22            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  23            //
  24            //==============================================================================
  25            //
  26            // Author: Mike Day (mdday@us.ibm.com)
  27            //
  28            // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
  29 chip  1.11 //              added nsk platform support
  30 kumpf 1.59 //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
  31 a.arora 1.64 //              Amit K Arora, IBM (amita@in.ibm.com) for PEP#101
  32 mike    1.2  //
  33              //%/////////////////////////////////////////////////////////////////////////////
  34              
  35              #include "Thread.h"
  36              #include <Pegasus/Common/IPC.h>
  37 kumpf   1.14 #include <Pegasus/Common/Tracer.h>
  38 mike    1.2  
  39              #if defined(PEGASUS_OS_TYPE_WINDOWS)
  40 chip    1.11 # include "ThreadWindows.cpp"
  41 mike    1.2  #elif defined(PEGASUS_OS_TYPE_UNIX)
  42              # include "ThreadUnix.cpp"
  43              #elif defined(PEGASUS_OS_TYPE_NSK)
  44              # include "ThreadNsk.cpp"
  45              #else
  46              # error "Unsupported platform"
  47              #endif
  48              
  49              PEGASUS_NAMESPACE_BEGIN
  50              
  51 mday    1.42 
  52 chip    1.11 void thread_data::default_delete(void * data)
  53              {
  54 mike    1.2     if( data != NULL)
  55 chip    1.11       ::operator delete(data);
  56 mike    1.2  }
  57              
  58 chuck   1.43 // l10n start
  59              void language_delete(void * data)
  60              {
  61                 if( data != NULL)
  62                 {
  63 a.arora 1.64       AutoPtr<AcceptLanguages> al(static_cast<AcceptLanguages *>(data));
  64 chuck   1.43    }
  65              }
  66              // l10n end
  67              
  68 mike    1.2  Boolean Thread::_signals_blocked = false;
  69 chuck   1.37 // l10n
  70 marek   1.63 #ifndef PEGASUS_OS_ZOS
  71 mday    1.48 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key = -1;
  72 marek   1.63 #else
  73              PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key;
  74              #endif
  75 chuck   1.37 Boolean Thread::_key_initialized = false;
  76 chuck   1.41 Boolean Thread::_key_error = false;
  77 chuck   1.37 
  78 mike    1.2  
  79              // for non-native implementations
  80 chip    1.11 #ifndef PEGASUS_THREAD_CLEANUP_NATIVE
  81 mike    1.2  void Thread::cleanup_push( void (*routine)(void *), void *parm) throw(IPCException)
  82              {
  83 a.arora 1.64     AutoPtr<cleanup_handler> cu(new cleanup_handler(routine, parm));
  84 chip    1.11     try
  85                  {
  86 a.arora 1.64 	_cleanup.insert_first(cu.get());
  87 chip    1.11     }
  88                  catch(IPCException&)
  89 mike    1.2      {
  90 chip    1.11 	throw;
  91 mike    1.2      }
  92 a.arora 1.64     cu.release();
  93 mike    1.2      return;
  94              }
  95 chip    1.11 	
  96 mike    1.2  void Thread::cleanup_pop(Boolean execute) throw(IPCException)
  97              {
  98 a.arora 1.64     AutoPtr<cleanup_handler> cu ;
  99 chip    1.11     try
 100                  {
 101 a.arora 1.64 	cu.reset(_cleanup.remove_first());
 102 mike    1.2      }
 103 chip    1.11     catch(IPCException&)
 104 mike    1.2      {
 105 chip    1.11 	PEGASUS_ASSERT(0);
 106 mike    1.2       }
 107                  if(execute == true)
 108              	cu->execute();
 109              }
 110 chip    1.11 		
 111 mike    1.2  #endif
 112              
 113              
 114 kumpf   1.8  //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) throw(IPCException)
 115 mike    1.2  
 116              
 117 chip    1.11 #ifndef PEGASUS_THREAD_EXIT_NATIVE
 118              void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
 119              {
 120                  // execute the cleanup stack and then return
 121 mike    1.2     while( _cleanup.count() )
 122                 {
 123 chip    1.11        try
 124                     {
 125              	   cleanup_pop(true);
 126                     }
 127                     catch(IPCException&)
 128                     {
 129              	  PEGASUS_ASSERT(0);
 130              	  break;
 131 mike    1.2         }
 132                 }
 133                 _exit_code = exit_code;
 134                 exit_thread(exit_code);
 135 mday    1.4     _handle.thid = 0;
 136 mike    1.2  }
 137              
 138              
 139              #endif
 140              
 141 chuck   1.37 // l10n start
 142 chuck   1.39 Sint8 Thread::initializeKey()
 143              {
 144                 PEG_METHOD_ENTER(TRC_THREAD, "Thread::initializeKey");
 145                 if (!Thread::_key_initialized)
 146                 {
 147 chuck   1.41 	if (Thread::_key_error)
 148              	{
 149                     		Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
 150              	        	  "Thread: ERROR - thread key error"); 
 151              		return -1;
 152              	}
 153              
 154 chuck   1.39 	if (pegasus_key_create(&Thread::_platform_thread_key) == 0)
 155              	{
 156                      	Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
 157              	        	  "Thread: able to create a thread key");   
 158              	   	Thread::_key_initialized = true;	
 159              	}
 160              	else
 161              	{
 162                     		Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
 163              	        	  "Thread: ERROR - unable to create a thread key"); 
 164 chuck   1.41 	   	Thread::_key_error = true;
 165 chuck   1.39 		return -1;
 166              	}
 167                 }
 168              
 169                 PEG_METHOD_EXIT();
 170                 return 0;  
 171              }
 172              
 173 chuck   1.37 Thread * Thread::getCurrent()
 174              {
 175 chuck   1.39     PEG_METHOD_ENTER(TRC_THREAD, "Thread::getCurrent");	
 176 chuck   1.40     if (Thread::initializeKey() != 0)
 177 chuck   1.39     {
 178              	return NULL;  
 179                  }
 180 chuck   1.38     PEG_METHOD_EXIT();  
 181 chuck   1.39     return (Thread *)pegasus_get_thread_specific(_platform_thread_key); 
 182              }
 183              
 184              void Thread::setCurrent(Thread * thrd)
 185              {
 186                 PEG_METHOD_ENTER(TRC_THREAD, "Thread::setCurrent");
 187                 if (Thread::initializeKey() == 0)
 188                 {
 189                 	if (pegasus_set_thread_specific(Thread::_platform_thread_key,
 190              								 (void *) thrd) == 0)
 191                      {
 192                      	Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
 193              	        	  "Successful set Thread * into thread specific storage");   
 194                      }
 195                      else
 196                      {
 197                      	Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
 198              	        	  "ERROR: got error setting Thread * into thread specific storage");   
 199                      }
 200                 }
 201                 PEG_METHOD_EXIT();  
 202 chuck   1.37 }
 203              
 204              AcceptLanguages * Thread::getLanguages()
 205              {
 206 chuck   1.39     PEG_METHOD_ENTER(TRC_THREAD, "Thread::getLanguages");		
 207 chuck   1.37     
 208              	Thread * curThrd = Thread::getCurrent();
 209              	if (curThrd == NULL)
 210              		return NULL;
 211                 	AcceptLanguages * acceptLangs =
 212                 		 (AcceptLanguages *)curThrd->reference_tsd("acceptLanguages");
 213              	curThrd->dereference_tsd();
 214                  PEG_METHOD_EXIT(); 	
 215              	return acceptLangs;
 216              }
 217              
 218              void Thread::setLanguages(AcceptLanguages *langs) //l10n
 219              {
 220 chuck   1.39    PEG_METHOD_ENTER(TRC_THREAD, "Thread::setLanguages");
 221 chuck   1.37    		
 222                 Thread * currentThrd = Thread::getCurrent();
 223                 if (currentThrd != NULL)
 224                 {
 225                 		// deletes the old tsd and creates a new one
 226              		currentThrd->put_tsd("acceptLanguages",
 227 chuck   1.43 			language_delete, 
 228 chuck   1.37 			sizeof(AcceptLanguages *),
 229              			langs);   		
 230                 }
 231                 
 232                 PEG_METHOD_EXIT();    		
 233              }
 234              
 235              void Thread::clearLanguages() //l10n
 236              {
 237 chuck   1.39    PEG_METHOD_ENTER(TRC_THREAD, "Thread::clearLanguages");
 238 chuck   1.37    	
 239                 Thread * currentThrd = Thread::getCurrent();
 240                 if (currentThrd != NULL)
 241                 {
 242                 		// deletes the old tsd
 243              		currentThrd->delete_tsd("acceptLanguages");   		
 244                 }
 245                 
 246                 PEG_METHOD_EXIT();   		
 247              }
 248              // l10n end      
 249              
 250 kumpf   1.57 #if 0
 251 mday    1.52 // two special synchronization classes for ThreadPool
 252 kumpf   1.57 //
 253 mday    1.52 
 254 kumpf   1.57 class timed_mutex
 255 mday    1.52 {
 256                 public:
 257                    timed_mutex(Mutex* mut, int msec)
 258 kumpf   1.57          :_mut(mut)
 259 mday    1.52       {
 260 kumpf   1.57          _mut->timed_lock(msec, pegasus_thread_self());
 261 mday    1.52       }
 262                    ~timed_mutex(void)
 263                    {
 264 kumpf   1.57          _mut->unlock();
 265 mday    1.52       }
 266                    Mutex* _mut;
 267              };
 268 kumpf   1.57 #endif
 269 mday    1.52 
 270              class try_mutex
 271              {
 272                 public:
 273                    try_mutex(Mutex* mut)
 274              	 :_mut(mut)
 275                    {
 276              	 _mut->try_lock(pegasus_thread_self());
 277                    }
 278                    ~try_mutex(void)
 279                    {
 280              	 _mut->unlock();
 281                    }
 282                    
 283                    Mutex* _mut;
 284              };
 285              
 286 mday    1.58 class auto_int
 287              {
 288                 public:
 289                    auto_int(AtomicInt* num)
 290              	 : _int(num)
 291                    {
 292              	 _int->operator++();
 293                    }
 294                    ~auto_int(void)
 295                    {
 296              	 _int->operator--();
 297                    }
 298                    AtomicInt *_int;
 299              };
 300              
 301              
 302              AtomicInt _idle_control;
 303 mday    1.52 
 304 mday    1.20 DQueue<ThreadPool> ThreadPool::_pools(true);
 305              
 306              void ThreadPool::kill_idle_threads(void)
 307              {
 308                 static struct timeval now, last = {0, 0};
 309                 
 310                 pegasus_gettimeofday(&now);
 311                 if(now.tv_sec - last.tv_sec > 5)
 312                 {
 313                    _pools.lock();
 314                    ThreadPool *p = _pools.next(0);
 315                    while(p != 0)
 316                    {
 317              	 try 
 318              	 {
 319              	    p->kill_dead_threads();
 320              	 }
 321              	 catch(...)
 322              	 {
 323              	 }
 324              	 p = _pools.next(p);
 325 mday    1.20       }
 326                    _pools.unlock();
 327                    pegasus_gettimeofday(&last);
 328                 }
 329              }
 330              
 331              
 332 mike    1.2  ThreadPool::ThreadPool(Sint16 initial_size,
 333 kumpf   1.8  		       const Sint8 *key,
 334 mike    1.2  		       Sint16 min,
 335              		       Sint16 max,
 336              		       struct timeval & alloc_wait,
 337 chip    1.11 		       struct timeval & dealloc_wait,
 338 mike    1.2  		       struct timeval & deadlock_detect)
 339                 : _max_threads(max), _min_threads(min),
 340 mday    1.12      _current_threads(0),
 341                   _pool(true), _running(true),
 342 mike    1.2       _dead(true), _dying(0)
 343              {
 344                 _allocate_wait.tv_sec = alloc_wait.tv_sec;
 345                 _allocate_wait.tv_usec = alloc_wait.tv_usec;
 346 chip    1.11    _deallocate_wait.tv_sec = dealloc_wait.tv_sec;
 347 mike    1.2     _deallocate_wait.tv_usec = dealloc_wait.tv_usec;
 348                 _deadlock_detect.tv_sec = deadlock_detect.tv_sec;
 349                 _deadlock_detect.tv_usec = deadlock_detect.tv_usec;
 350                 memset(_key, 0x00, 17);
 351                 if(key != 0)
 352                    strncpy(_key, key, 16);
 353 mday    1.21    if(_max_threads > 0 && _max_threads < initial_size)
 354 mike    1.2        _max_threads = initial_size;
 355                 if(_min_threads > initial_size)
 356                    _min_threads = initial_size;
 357 chip    1.11 
 358 mike    1.2     int i;
 359                 for(i = 0; i < initial_size; i++)
 360                 {
 361                    _link_pool(_init_thread());
 362                 }
 363 mday    1.20    _pools.insert_last(this);
 364 mike    1.2  }
 365              
 366 chip    1.11 
 367 mday    1.52 // Note:   <<< Fri Oct 17 09:19:03 2003 mdd >>>
 368              // the pegasus_yield() calls that preceed each th->join() are to 
 369              // give a thread on the running list a chance to reach a cancellation
 370              // point before the join 
 371 mike    1.2  
 372              ThreadPool::~ThreadPool(void)
 373              {
 374 kumpf   1.57    PEG_METHOD_ENTER(TRC_THREAD, "Thread::~ThreadPool");
 375 mday    1.35    try 
 376 mday    1.47    {      
 377 kumpf   1.57       // Set the dying flag so all thread know the destructor has been entered
 378 mday    1.58       _dying++;
 379                    
 380 mday    1.52       // remove from the global pools list 
 381 mday    1.35       _pools.remove(this);
 382 mday    1.52 
 383                    // start with idle threads. 
 384 mday    1.35       Thread *th = 0;
 385                    th = _pool.remove_first();
 386 mday    1.52       Semaphore* sleep_sem;
 387                    
 388 mday    1.35       while(th != 0)
 389 mike    1.2        {
 390 mday    1.52 	 sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
 391 kumpf   1.57          PEGASUS_ASSERT(sleep_sem != 0);
 392              
 393 mday    1.35 	 if(sleep_sem == 0)
 394              	 {
 395              	    th->dereference_tsd();
 396              	 }
 397 kumpf   1.57          else
 398                       {
 399              	    // Signal to get the thread out of the work loop.
 400              	    sleep_sem->signal();
 401 mday    1.52 
 402 kumpf   1.57 	    // Signal to get the thread past the end. See the comment
 403              	    // "wait to be awakend by the thread pool destructor"
 404              	    // Note: the current implementation of Thread for Windows
 405              	    // does not implement "pthread" cancelation points so this
 406              	    // is needed.
 407              	    sleep_sem->signal();
 408              	    th->dereference_tsd();
 409              	    th->cancel();
 410              	    th->join();
 411              	    delete th;
 412                       }
 413 mday    1.35 	 th = _pool.remove_first();
 414 mike    1.2        }
 415 kumpf   1.57 
 416 mday    1.58       while(_idle_control.value())
 417              	 pegasus_yield();
 418                    
 419 mday    1.47       th = _dead.remove_first();
 420 mday    1.35       while(th != 0)
 421                    {
 422 mday    1.52 	 sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
 423 kumpf   1.57          PEGASUS_ASSERT(sleep_sem != 0);
 424 mday    1.58 	 
 425 mday    1.47 	 if(sleep_sem == 0)
 426              	 {
 427              	    th->dereference_tsd();
 428              	 }
 429 kumpf   1.57          else
 430                       {
 431                          //ATTN-DME-P3-20030322: _dead queue processing in
 432                          //ThreadPool::~ThreadPool is inconsistent with the
 433                          //processing in kill_dead_threads.  Is this correct?
 434 mday    1.58 	    
 435 kumpf   1.57 	    // signal the thread's sleep semaphore
 436              	    sleep_sem->signal();
 437              	    sleep_sem->signal();
 438              	    th->dereference_tsd();	 
 439              	    th->cancel();
 440              	    th->join();
 441              	    delete th;
 442                       }
 443 mday    1.47 	 th = _dead.remove_first();
 444 mday    1.35       }
 445 mday    1.52 
 446 mday    1.47       {
 447 mday    1.52 	 th = _running.remove_first();
 448              	 while(th != 0)
 449              	 {	 
 450              	    // signal the thread's sleep semaphore
 451              
 452              	    sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
 453 kumpf   1.57             PEGASUS_ASSERT(sleep_sem != 0);
 454              
 455 mday    1.52 	    if(sleep_sem == 0 )
 456              	    {
 457              	       th->dereference_tsd();
 458              	    }
 459 kumpf   1.57             else
 460                          {
 461              	       sleep_sem->signal();
 462              	       sleep_sem->signal();
 463              	       th->dereference_tsd();
 464              	       th->cancel();
 465              	       pegasus_yield();
 466 mday    1.52 	    
 467 kumpf   1.57 	       th->join();
 468              	       delete th;
 469              	    }
 470 mday    1.52 	    th = _running.remove_first();
 471 kumpf   1.57          }
 472                    }  
 473 mike    1.2     }
 474 mday    1.52    
 475 mday    1.35    catch(...)
 476 mike    1.2     {
 477                 }
 478              }
 479              
 480 chip    1.11 // make this static to the class
 481 mike    1.2  PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
 482              {
 483 kumpf   1.14    PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
 484              
 485 mike    1.2     Thread *myself = (Thread *)parm;
 486                 if(myself == 0)
 487 kumpf   1.14    {
 488 kumpf   1.57       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 489                        "ThreadPool::_loop: Thread pointer is null");
 490 kumpf   1.14       PEG_METHOD_EXIT();
 491 mike    1.2        throw NullPointer();
 492 kumpf   1.14    }
 493 chuck   1.37    
 494              // l10n
 495                 // Set myself into thread specific storage
 496 chuck   1.38    // This will allow code to get its own Thread
 497 chuck   1.39    Thread::setCurrent(myself);	
 498              
 499 mike    1.2     ThreadPool *pool = (ThreadPool *)myself->get_parm();
 500 kumpf   1.14    if(pool == 0 ) 
 501                 {
 502 kumpf   1.57       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 503                        "ThreadPool::_loop: ThreadPool pointer is null");
 504 kumpf   1.14       PEG_METHOD_EXIT();
 505 mike    1.2        throw NullPointer();
 506 kumpf   1.14    }
 507 mday    1.52    if(pool->_dying.value())
 508                 {
 509 kumpf   1.57       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 510                        "ThreadPool::_loop: ThreadPool is dying(1)");
 511 mday    1.52       PEG_METHOD_EXIT();
 512                    return((PEGASUS_THREAD_RETURN)0);
 513                 }
 514                 
 515 mike    1.5     Semaphore *sleep_sem = 0;
 516 mday    1.13    Semaphore *blocking_sem = 0;
 517                 
 518 mike    1.5     struct timeval *deadlock_timer = 0;
 519 mday    1.47    
 520 chip    1.11    try
 521 mike    1.2     {
 522                    sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
 523                    myself->dereference_tsd();
 524                    deadlock_timer = (struct timeval *)myself->reference_tsd("deadlock timer");
 525 mday    1.22       myself->dereference_tsd(); 
 526 mike    1.2     }
 527 mday    1.52 
 528 mday    1.30    catch(...)
 529                 {
 530 kumpf   1.57       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 531                        "ThreadPool::_loop: Failure getting sleep_sem or deadlock_timer");
 532 mday    1.30       PEG_METHOD_EXIT();
 533 mday    1.52       return((PEGASUS_THREAD_RETURN)0);
 534 mday    1.30    }
 535                 
 536 mike    1.2     if(sleep_sem == 0 || deadlock_timer == 0)
 537 kumpf   1.14    {
 538 kumpf   1.57       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 539                        "ThreadPool::_loop: sleep_sem or deadlock_timer are null.");
 540 kumpf   1.14       PEG_METHOD_EXIT();
 541 mday    1.52       return((PEGASUS_THREAD_RETURN)0);
 542 kumpf   1.14    }
 543 mike    1.2  
 544 mday    1.54    while(1)
 545 mike    1.2     {
 546 mday    1.58       if(pool->_dying.value())
 547              	 break;
 548                    
 549 mday    1.52       try 
 550                    {
 551              	 sleep_sem->wait();
 552                    }
 553                    catch(IPCException& )
 554                    {
 555 kumpf   1.57          Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 556                        "ThreadPool::_loop: failure on sleep_sem->wait().");
 557 mday    1.52 	 PEG_METHOD_EXIT();
 558              	 return((PEGASUS_THREAD_RETURN)0);
 559                    }
 560                    
 561 mike    1.2        // when we awaken we reside on the running queue, not the pool queue
 562 mday    1.47       
 563 mike    1.5        PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0;
 564                    void *parm = 0;
 565 mike    1.2  
 566 chip    1.11       try
 567 mike    1.2        {
 568              	 _work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) \
 569              	    myself->reference_tsd("work func");
 570              	 myself->dereference_tsd();
 571              	 parm = myself->reference_tsd("work parm");
 572              	 myself->dereference_tsd();
 573 mday    1.13 	 blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
 574              	 myself->dereference_tsd();
 575              
 576 mike    1.2        }
 577 mike    1.6        catch(IPCException &)
 578 mike    1.2        {
 579 kumpf   1.57          Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 580                         "ThreadPool::_loop: Failure accessing work func, work parm, or blocking sem.");
 581 kumpf   1.14 	 PEG_METHOD_EXIT();
 582 mday    1.52 	 return((PEGASUS_THREAD_RETURN)0);
 583 mike    1.2        }
 584 mday    1.52       
 585 mike    1.2        if(_work == 0)
 586 kumpf   1.14       {
 587 kumpf   1.57          Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 588                         "ThreadPool::_loop: work func is null.");
 589 kumpf   1.14          PEG_METHOD_EXIT();
 590 kumpf   1.57 	 return((PEGASUS_THREAD_RETURN)0);
 591 kumpf   1.14       }
 592 kumpf   1.24 
 593                    if(_work ==
 594                       (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) &_undertaker)
 595                    {
 596 kumpf   1.57          PEG_METHOD_EXIT();
 597 mday    1.23 	 _work(parm);
 598 kumpf   1.24       }
 599              
 600 mike    1.2        gettimeofday(deadlock_timer, NULL);
 601 kumpf   1.57 
 602                    if (pool->_dying.value() == 0)
 603 mday    1.20       {
 604 kumpf   1.57          try 
 605                       {
 606              	    _work(parm);
 607                       }
 608 kumpf   1.59          catch(Exception & e)
 609                       {
 610                          PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 611                             String("Exception from _work in ThreadPool::_loop: ") +
 612                                e.getMessage());
 613                          PEG_METHOD_EXIT();
 614                          return((PEGASUS_THREAD_RETURN)0);
 615                       }
 616 kumpf   1.57          catch(...)
 617                       {
 618                          Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 619                            "ThreadPool::_loop: execution of _work failed.");
 620                          PEG_METHOD_EXIT();
 621              	    return((PEGASUS_THREAD_RETURN)0);
 622                       }
 623                     }
 624 chuck   1.37       
 625 chip    1.11       // put myself back onto the available list
 626                    try
 627 mike    1.2        {
 628 mday    1.47 	 if(pool->_dying.value() == 0)
 629              	 {
 630              	    gettimeofday(deadlock_timer, NULL);
 631              	    if( blocking_sem != 0 )
 632              	       blocking_sem->signal();
 633                    
 634 s.hills 1.53 	    // If we are not on _running then ~ThreadPool has removed
 635              	    // us and now "owns" our pointer.
 636 kumpf   1.57 	    if ( pool->_running.remove((void *)myself) != 0 )
 637                          {
 638              	       pool->_pool.insert_first(myself);
 639                          }
 640                          else
 641                          {
 642 kumpf   1.60                Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 643                                "ThreadPool::_loop: Failed to remove thread from running queue.");
 644 kumpf   1.57                PEG_METHOD_EXIT();
 645 mday    1.54 	       return((PEGASUS_THREAD_RETURN)0);
 646 kumpf   1.57             }
 647 mday    1.47 	 }
 648              	 else
 649              	 {
 650              	    PEG_METHOD_EXIT();
 651              	    return((PEGASUS_THREAD_RETURN)0);
 652              	 }
 653 mike    1.2        }
 654 mday    1.52       catch(...)
 655 mike    1.2        {
 656 kumpf   1.57         Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 657                           "ThreadPool::_loop: Adding thread to idle pool failed.");
 658 kumpf   1.14 	 PEG_METHOD_EXIT();
 659 mday    1.47 	 return((PEGASUS_THREAD_RETURN)0);
 660 mike    1.2        }
 661 mday    1.51       
 662 mike    1.2     }
 663 s.hills 1.49 
 664                 // TODO: Why is this needed? Why not just continue?
 665 mike    1.2     // wait to be awakend by the thread pool destructor
 666 mday    1.50    //sleep_sem->wait();
 667 s.hills 1.49 
 668 mike    1.2     myself->test_cancel();
 669 kumpf   1.14 
 670                 PEG_METHOD_EXIT();
 671 mike    1.2     return((PEGASUS_THREAD_RETURN)0);
 672              }
 673              
 674 kumpf   1.59 Boolean ThreadPool::allocate_and_awaken(void *parm,
 675              				        PEGASUS_THREAD_RETURN \
 676              				        (PEGASUS_THREAD_CDECL *work)(void *), 
 677              				        Semaphore *blocking)
 678 mike    1.2     throw(IPCException)
 679              {
 680 kumpf   1.14    PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
 681 kumpf   1.57 
 682                 // Allocate_and_awaken will not run if the _dying flag is set.
 683                 // Once the lock is acquired, ~ThreadPool will not change
 684                 // the value of _dying until the lock is released.
 685 mday    1.47    
 686 kumpf   1.57    try
 687 mday    1.47    {
 688 kumpf   1.57       if (_dying.value())
 689 mday    1.47       {
 690 kumpf   1.57          Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 691                        "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
 692 kumpf   1.59          // ATTN: Error result has not yet been defined
 693                       return true;
 694 kumpf   1.57       }
 695                    struct timeval now;
 696                    struct timeval start;
 697                    gettimeofday(&start, NULL);
 698                    Thread *th = 0;
 699                 
 700 mday    1.47       th = _pool.remove_first();
 701                 
 702 kumpf   1.59       if (th == 0)
 703 kumpf   1.57       {
 704                       // will throw an IPCException& 
 705                       _check_deadlock(&start) ;
 706 mday    1.12       
 707 kumpf   1.57          if(_max_threads == 0 || _current_threads < _max_threads)
 708                       {
 709              	    th = _init_thread();
 710                       }
 711 kumpf   1.59       }
 712              
 713                    if (th == 0)
 714                    {
 715 kumpf   1.60         // ATTN-DME-P3-20031103: This trace message should not be
 716                      // be labeled TRC_DISCARDED_DATA, because it does not
 717                      // necessarily imply that a failure has occurred.  However,
 718                      // this label is being used temporarily to help isolate
 719                      // the cause of client timeout problems.
 720              
 721                      Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 722                         "ThreadPool::allocate_and_awaken: Insufficient resources: "
 723                         " pool = %s, running threads = %d, idle threads = %d, dead threads = %d ",
 724                         _key, _running.count(), _pool.count(), _dead.count());
 725 kumpf   1.59          return false;
 726 mday    1.47       }
 727 chip    1.11 
 728 mike    1.2        // initialize the thread data with the work function and parameters
 729 kumpf   1.14       Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
 730 kumpf   1.57          "Initializing thread with work function and parameters: parm = %p",
 731 kumpf   1.14           parm);
 732              
 733 kumpf   1.15       th->delete_tsd("work func");
 734 chip    1.11       th->put_tsd("work func", NULL,
 735 mike    1.2  		  sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
 736              		  (void *)work);
 737 kumpf   1.15       th->delete_tsd("work parm");
 738 mike    1.2        th->put_tsd("work parm", NULL, sizeof(void *), parm);
 739 kumpf   1.15       th->delete_tsd("blocking sem");
 740 mday    1.13       if(blocking != 0 )
 741 kumpf   1.57            th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
 742 mday    1.47 
 743 kumpf   1.57       // put the thread on the running list
 744                    _running.insert_first(th);
 745 mike    1.2  
 746                    // signal the thread's sleep semaphore to awaken it
 747 kumpf   1.57       Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
 748 mday    1.47 	 
 749 kumpf   1.57       if(sleep_sem == 0)
 750 mike    1.2        {
 751 kumpf   1.57          th->dereference_tsd();
 752                       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 753                         "ThreadPool::allocate_and_awaken: thread data is corrupted.");
 754                       PEG_METHOD_EXIT();
 755                       throw NullPointer();
 756 mike    1.2        }
 757 kumpf   1.57       Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
 758                    sleep_sem->signal();
 759                    th->dereference_tsd();
 760 mike    1.2     }
 761 kumpf   1.57    catch (...)
 762 mday    1.47    {
 763 kumpf   1.57       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 764                        "ThreadPool::allocate_and_awaken: Operation Failed.");
 765                    PEG_METHOD_EXIT();
 766 kumpf   1.59       // ATTN: Error result has not yet been defined
 767                    return true;
 768 mday    1.47    }
 769 kumpf   1.14    PEG_METHOD_EXIT();
 770 kumpf   1.59    return true;
 771 mike    1.2  }
 772              
 773              // caller is responsible for only calling this routine during slack periods
 774              // but should call it at least once per _deadlock_detect with the running q
 775              // and at least once per _deallocate_wait for the pool q
 776              
 777 mday    1.12 Uint32 ThreadPool::kill_dead_threads(void)
 778 mike    1.2  	 throw(IPCException)
 779              {
 780 kumpf   1.57    // Since the kill_dead_threads, ThreadPool or allocate_and_awaken 
 781                 // manipulate the threads on the ThreadPool queues, they should never 
 782                 // be allowed to run at the same time. 
 783              
 784 mday    1.58    // << Thu Oct 23 14:41:02 2003 mdd >> 
 785                 // not true, the queues are thread safe. they are syncrhonized. 
 786 kumpf   1.57 
 787 mday    1.58    auto_int do_not_destruct(&_idle_control);
 788                 
 789 kumpf   1.57    try
 790 mday    1.47    {
 791 kumpf   1.57       if (_dying.value())
 792 mday    1.47       {
 793 kumpf   1.57          return 0;
 794 mday    1.47       }
 795 mday    1.58       
 796 kumpf   1.57       struct timeval now;
 797                    gettimeofday(&now, NULL);
 798                    Uint32 bodies = 0;
 799                 
 800                    // first go thread the dead q and clean it up as much as possible
 801                    try 
 802 mday    1.47       {
 803 mday    1.58          while(_dying.value() == 0 && _dead.count() > 0)
 804 kumpf   1.57          {
 805              	    Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "ThreadPool:: removing and joining dead thread");
 806                          Thread *dead = _dead.remove_first();
 807 mday    1.58 	    
 808              	    if(dead )
 809              	    {
 810              	       dead->join();
 811              	       delete dead;
 812              	    }
 813 kumpf   1.57          }
 814                    }
 815                    catch(...)
 816                    {
 817 mday    1.47       }
 818                 
 819 mday    1.58       if (_dying.value())
 820                    {
 821                       return 0;
 822                    }
 823 mday    1.47    
 824 kumpf   1.57       DQueue<Thread> * map[2] =
 825                       {
 826                    	    &_pool, &_running
 827                       };
 828 chip    1.11 
 829              
 830 kumpf   1.57       DQueue<Thread> *q = 0;
 831                    int i = 0;
 832                    AtomicInt needed(0);
 833 mday    1.58       Thread *th = 0;
 834                    internal_dq idq;
 835                    
 836 kumpf   1.62 #ifdef PEGASUS_KILL_LONG_RUNNING_THREADS
 837                    // Defining PEGASUS_KILL_LONG_RUNNING_THREADS causes the thread pool
 838                    // to kill threads that are on the _running queue longer than the
 839                    // _deadlock_detect time interval specified for the thread pool.
 840                    // Cancelling long-running threads has proven to be problematic and
 841                    // may cause a crash depending on the state of the thread when it is
 842                    // killed.  Use this option with care.
 843                    for( ; i < 2; i++)
 844              #else
 845 kumpf   1.57       for( ; i < 1; i++)
 846 kumpf   1.31 #endif
 847 mday    1.52       {
 848 kumpf   1.57          q = map[i];
 849                       if(q->count() > 0 )
 850                       {
 851 chip    1.11 	    try
 852 mike    1.2  	    {
 853 kumpf   1.57 	       q->try_lock();
 854 mike    1.2  	    }
 855 mday    1.18 	    catch(...)
 856 mike    1.2  	    {
 857 mday    1.18 	       return bodies;
 858 mike    1.2  	    }
 859 kumpf   1.57 
 860              	    struct timeval dt = { 0, 0 };
 861              	    struct timeval *dtp;
 862 mday    1.58 
 863 kumpf   1.57 	    th = q->next(th);
 864              	    while (th != 0 )
 865              	    {
 866              	       try
 867              	       {
 868              	          dtp = (struct timeval *)th->try_reference_tsd("deadlock timer");
 869              	       }
 870              	       catch(...)
 871              	       {
 872              	          q->unlock();
 873              	          return bodies;
 874              	       }
 875 chip    1.11 	
 876 kumpf   1.57 	       if(dtp != 0)
 877              	       {
 878              	          memcpy(&dt, dtp, sizeof(struct timeval));
 879              	       }
 880              	       th->dereference_tsd();
 881              	       struct timeval deadlock_timeout;
 882              	       Boolean too_long;
 883              	       if( i == 0)
 884              	       {
 885              	          too_long = check_time(&dt, get_deallocate_wait(&deadlock_timeout));
 886              	       }
 887              	       else 
 888              	       {
 889              	          too_long = check_time(&dt, get_deadlock_detect(&deadlock_timeout));
 890              	       }
 891 mday    1.18 	    
 892 kumpf   1.57 	       if( true == too_long)
 893 mike    1.2  	       {
 894 kumpf   1.57 	          // if we are deallocating from the pool, escape if we are
 895              	          // down to the minimum thread count
 896              	          _current_threads--;
 897              	          if( _current_threads.value() < (Uint32)_min_threads )
 898              	          {
 899              		     if( i == 0)
 900              		     {
 901              		        _current_threads++;
 902              		        th = q->next(th);
 903              		        continue;
 904              		     }
 905              		     else
 906              		     {
 907              		        // we are killing a hung thread and we will drop below the
 908              		        // minimum. create another thread to make up for the one
 909              		        // we are about to kill
 910              		        needed++;
 911              		     }
 912              	          }
 913 chip    1.11 	
 914 kumpf   1.57 	          th = q->remove_no_lock((void *)th);
 915 mday    1.58 		  idq.insert_first((void*)th);
 916 mike    1.2  	       }
 917 kumpf   1.57 	       th = q->next(th);
 918 mike    1.2  	    }
 919 kumpf   1.57 	    q->unlock();
 920                       }
 921 mday    1.58 
 922              	 th = (Thread*)idq.remove_last();
 923              	 while(th != 0)
 924              	 {
 925              	    if( i == 0 )
 926              	    {
 927              	       th->delete_tsd("work func");
 928              	       th->put_tsd("work func", NULL,
 929              			   sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
 930              			   (void *)&_undertaker);
 931              	       th->delete_tsd("work parm");
 932              	       th->put_tsd("work parm", NULL, sizeof(void *), th);
 933              	       
 934              	       // signal the thread's sleep semaphore to awaken it
 935              	       Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
 936 kumpf   1.61 	       PEGASUS_ASSERT(sleep_sem != 0);
 937 mday    1.58 	       
 938              	       bodies++;
 939              	       th->dereference_tsd();
 940 kumpf   1.59 	       // Putting thread on _dead queue delays availability to others
 941              	       //_dead.insert_first(th);
 942 mday    1.58 	       sleep_sem->signal();
 943 kumpf   1.59 	       th->join();  // Note: Clean up the thread here rather than
 944              	       delete th;   // leave it sitting unused on the _dead queue
 945 mday    1.58 	       th = 0;
 946              	    }
 947              	    else 
 948              	    {
 949              	       // deadlocked threads
 950 kumpf   1.62                struct timeval deadlock_timeout;
 951                             Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
 952                                           "A thread has run longer than %u seconds and "
 953                                               "will be cancelled.",
 954                                           Uint32(_deadlock_detect.tv_sec));
 955                             Logger::put_l(Logger::ERROR_LOG, System::CIMSERVER,
 956                                           Logger::SEVERE,
 957                                           "Common.Thread.CANCEL_LONG_RUNNING_THREAD",
 958                                           "A thread has run longer than {0} seconds and "
 959                                               "will be cancelled.",
 960                                           Uint32(_deadlock_detect.tv_sec));
 961 mday    1.58 	       th->cancel();
 962              	       delete th;
 963              	    }
 964              	    th = (Thread*)idq.remove_last();
 965              	 }
 966 mike    1.2        }
 967 kumpf   1.57 
 968                    while (needed.value() > 0)   {
 969                       _link_pool(_init_thread());
 970                       needed--;
 971                       pegasus_sleep(0);
 972                    }
 973                     return bodies; 
 974                  }
 975 mday    1.58     catch (...)
 976 kumpf   1.57     {
 977                  }
 978                  return 0;
 979 mike    1.2  }
 980              
 981 mday    1.12 
 982 mike    1.2  Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval)
 983              {
 984 mday    1.22    // never time out if the interval is zero
 985                 if(interval && interval->tv_sec == 0 && interval->tv_usec == 0)
 986                    return false;
 987                 
 988 mday    1.55    struct timeval now , finish , remaining ;
 989 mday    1.13    Uint32 usec;
 990 mday    1.33    pegasus_gettimeofday(&now);
 991 mday    1.36    /* remove valgrind error */
 992                 pegasus_gettimeofday(&remaining);
 993                 
 994 mday    1.13 
 995                 finish.tv_sec = start->tv_sec + interval->tv_sec;
 996                 usec = start->tv_usec + interval->tv_usec;
 997                 finish.tv_sec += (usec / 1000000);
 998                 usec %= 1000000;
 999                 finish.tv_usec = usec;
1000                  
1001                 if ( timeval_subtract(&remaining, &finish, &now) )
1002 mike    1.2        return true;
1003                 else
1004                    return false;
1005              }
1006              
1007              PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm )
1008              {
1009 mday    1.30    exit_thread((PEGASUS_THREAD_RETURN)1);
1010                 return (PEGASUS_THREAD_RETURN)1;
1011 mike    1.2  }
1012 mday    1.19 
1013              
1014               void ThreadPool::_sleep_sem_del(void *p)
1015              {
1016                 if(p != 0)
1017                 {
1018                    delete (Semaphore *)p;
1019                 }
1020              }
1021              
1022               void ThreadPool::_check_deadlock(struct timeval *start) throw(Deadlock)
1023              {
1024                 if (true == check_time(start, &_deadlock_detect))
1025                    throw Deadlock(pegasus_thread_self());
1026                 return;
1027              }
1028              
1029              
1030               Boolean ThreadPool::_check_deadlock_no_throw(struct timeval *start)
1031              {
1032                 return(check_time(start, &_deadlock_detect));
1033 mday    1.19 }
1034              
1035               Boolean ThreadPool::_check_dealloc(struct timeval *start)
1036              {
1037                 return(check_time(start, &_deallocate_wait));
1038              }
1039              
1040               Thread *ThreadPool::_init_thread(void) throw(IPCException)
1041              {
1042                 Thread *th = (Thread *) new Thread(_loop, this, false);
1043                 // allocate a sleep semaphore and pass it in the thread context
1044                 // initial count is zero, loop function will sleep until
1045                 // we signal the semaphore
1046                 Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
1047                 th->put_tsd("sleep sem", &_sleep_sem_del, sizeof(Semaphore), (void *)sleep_sem);
1048                 
1049                 struct timeval *dldt = (struct timeval *) ::operator new(sizeof(struct timeval));
1050 mday    1.35    pegasus_gettimeofday(dldt);
1051                 
1052 mday    1.19    th->put_tsd("deadlock timer", thread_data::default_delete, sizeof(struct timeval), (void *)dldt);
1053                 // thread will enter _loop(void *) and sleep on sleep_sem until we signal it
1054 chuck   1.37   
1055 kumpf   1.59    if (!th->run())
1056                 {
1057                    delete th;
1058                    return 0;
1059                 }
1060 mday    1.19    _current_threads++;
1061                 pegasus_yield();
1062                 
1063                 return th;
1064              }
1065              
1066               void ThreadPool::_link_pool(Thread *th) throw(IPCException)
1067              {
1068                 if(th == 0)
1069 kumpf   1.57    {
1070                    Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
1071                        "ThreadPool::_link_pool: Thread pointer is null.");
1072 mday    1.19       throw NullPointer();
1073 kumpf   1.57    }
1074 mday    1.47    try 
1075                 {
1076                    _pool.insert_first(th);
1077                 }
1078                 catch(...)
1079                 {
1080 kumpf   1.57       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
1081                        "ThreadPool::_link_pool: _pool.insert_first failed.");
1082 mday    1.47    }
1083 mday    1.19 }
1084 mike    1.2  
1085              
1086              PEGASUS_NAMESPACE_END
1087              

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2