(file) Return to Thread.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.71 //%2004////////////////////////////////////////////////////////////////////////
  2 mike  1.2  //
  3 karl  1.71 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4            // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5            // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6 karl  1.56 // IBM Corp.; EMC Corporation, The Open Group.
  7 karl  1.71 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8            // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9 mike  1.2  //
 10            // Permission is hereby granted, free of charge, to any person obtaining a copy
 11 chip  1.11 // of this software and associated documentation files (the "Software"), to
 12            // deal in the Software without restriction, including without limitation the
 13            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 14 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
 15            // furnished to do so, subject to the following conditions:
 16 kumpf 1.17 // 
 17 chip  1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 18 mike  1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 19            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 20 chip  1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 21            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 22            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 23 mike  1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 24            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 25            //
 26            //==============================================================================
 27            //
 28            // Author: Mike Day (mdday@us.ibm.com)
 29            //
 30            // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
 31 chip  1.11 //              added nsk platform support
 32 kumpf 1.59 //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
 33 a.arora 1.64 //              Amit K Arora, IBM (amita@in.ibm.com) for PEP#101
 34 mike    1.2  //
 35              //%/////////////////////////////////////////////////////////////////////////////
 36              
 37              #include "Thread.h"
 38 kumpf   1.68 #include <exception>
 39 mike    1.2  #include <Pegasus/Common/IPC.h>
 40 kumpf   1.14 #include <Pegasus/Common/Tracer.h>
 41 mike    1.2  
 42              #if defined(PEGASUS_OS_TYPE_WINDOWS)
 43 chip    1.11 # include "ThreadWindows.cpp"
 44 mike    1.2  #elif defined(PEGASUS_OS_TYPE_UNIX)
 45              # include "ThreadUnix.cpp"
 46              #elif defined(PEGASUS_OS_TYPE_NSK)
 47              # include "ThreadNsk.cpp"
 48              #else
 49              # error "Unsupported platform"
 50              #endif
 51              
 52 kumpf   1.69 PEGASUS_USING_STD;
 53 mike    1.2  PEGASUS_NAMESPACE_BEGIN
 54              
 55 mday    1.42 
 56 chip    1.11 void thread_data::default_delete(void * data)
 57              {
 58 mike    1.2     if( data != NULL)
 59 chip    1.11       ::operator delete(data);
 60 mike    1.2  }
 61              
 62 chuck   1.43 // l10n start
 63              void language_delete(void * data)
 64              {
 65                 if( data != NULL)
 66                 {
 67 a.arora 1.64       AutoPtr<AcceptLanguages> al(static_cast<AcceptLanguages *>(data));
 68 chuck   1.43    }
 69              }
 70              // l10n end
 71              
 72 mike    1.2  Boolean Thread::_signals_blocked = false;
 73 chuck   1.37 // l10n
 74 marek   1.63 #ifndef PEGASUS_OS_ZOS
 75 w.otsuka 1.71.2.2 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key =
 76                       PEGASUS_THREAD_KEY_TYPE(-1);
 77 marek    1.63     #else
 78                   PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key;
 79                   #endif
 80 chuck    1.37     Boolean Thread::_key_initialized = false;
 81 chuck    1.41     Boolean Thread::_key_error = false;
 82 chuck    1.37     
 83 mike     1.2      
 84                   void Thread::cleanup_push( void (*routine)(void *), void *parm) throw(IPCException)
 85                   {
 86 a.arora  1.64         AutoPtr<cleanup_handler> cu(new cleanup_handler(routine, parm));
 87 a.arora  1.65         _cleanup.insert_first(cu.get());
 88 a.arora  1.64         cu.release();
 89 mike     1.2          return;
 90                   }
 91 chip     1.11     	
 92 mike     1.2      void Thread::cleanup_pop(Boolean execute) throw(IPCException)
 93                   {
 94 a.arora  1.64         AutoPtr<cleanup_handler> cu ;
 95 chip     1.11         try
 96                       {
 97 a.arora  1.64     	cu.reset(_cleanup.remove_first());
 98 mike     1.2          }
 99 chip     1.11         catch(IPCException&)
100 mike     1.2          {
101 chip     1.11     	PEGASUS_ASSERT(0);
102 mike     1.2           }
103                       if(execute == true)
104                   	cu->execute();
105                   }
106 kumpf    1.71.2.3 		
107 mike     1.2      
108 kumpf    1.8      //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) throw(IPCException)
109 mike     1.2      
110                   
111 chip     1.11     #ifndef PEGASUS_THREAD_EXIT_NATIVE
112                   void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
113                   {
114                       // execute the cleanup stack and then return
115 mike     1.2         while( _cleanup.count() )
116                      {
117 chip     1.11            try
118                          {
119                   	   cleanup_pop(true);
120                          }
121                          catch(IPCException&)
122                          {
123                   	  PEGASUS_ASSERT(0);
124                   	  break;
125 mike     1.2             }
126                      }
127                      _exit_code = exit_code;
128                      exit_thread(exit_code);
129 mday     1.4         _handle.thid = 0;
130 mike     1.2      }
131                   
132                   
133                   #endif
134                   
135 chuck    1.37     // l10n start
136 chuck    1.39     Sint8 Thread::initializeKey()
137                   {
138                      PEG_METHOD_ENTER(TRC_THREAD, "Thread::initializeKey");
139                      if (!Thread::_key_initialized)
140                      {
141 chuck    1.41     	if (Thread::_key_error)
142                   	{
143                          		Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
144                   	        	  "Thread: ERROR - thread key error"); 
145                   		return -1;
146                   	}
147                   
148 chuck    1.39     	if (pegasus_key_create(&Thread::_platform_thread_key) == 0)
149                   	{
150                           	Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
151                   	        	  "Thread: able to create a thread key");   
152                   	   	Thread::_key_initialized = true;	
153                   	}
154                   	else
155                   	{
156                          		Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
157                   	        	  "Thread: ERROR - unable to create a thread key"); 
158 chuck    1.41     	   	Thread::_key_error = true;
159 chuck    1.39     		return -1;
160                   	}
161                      }
162                   
163                      PEG_METHOD_EXIT();
164                      return 0;  
165                   }
166                   
167 chuck    1.37     Thread * Thread::getCurrent()
168                   {
169 chuck    1.39         PEG_METHOD_ENTER(TRC_THREAD, "Thread::getCurrent");	
170 chuck    1.40         if (Thread::initializeKey() != 0)
171 chuck    1.39         {
172                   	return NULL;  
173                       }
174 chuck    1.38         PEG_METHOD_EXIT();  
175 chuck    1.39         return (Thread *)pegasus_get_thread_specific(_platform_thread_key); 
176                   }
177                   
178                   void Thread::setCurrent(Thread * thrd)
179                   {
180                      PEG_METHOD_ENTER(TRC_THREAD, "Thread::setCurrent");
181                      if (Thread::initializeKey() == 0)
182                      {
183                      	if (pegasus_set_thread_specific(Thread::_platform_thread_key,
184                   								 (void *) thrd) == 0)
185                           {
186                           	Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
187                   	        	  "Successful set Thread * into thread specific storage");   
188                           }
189                           else
190                           {
191                           	Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
192                   	        	  "ERROR: got error setting Thread * into thread specific storage");   
193                           }
194                      }
195                      PEG_METHOD_EXIT();  
196 chuck    1.37     }
197                   
198                   AcceptLanguages * Thread::getLanguages()
199                   {
200 chuck    1.39         PEG_METHOD_ENTER(TRC_THREAD, "Thread::getLanguages");		
201 chuck    1.37         
202                   	Thread * curThrd = Thread::getCurrent();
203                   	if (curThrd == NULL)
204                   		return NULL;
205                      	AcceptLanguages * acceptLangs =
206                      		 (AcceptLanguages *)curThrd->reference_tsd("acceptLanguages");
207                   	curThrd->dereference_tsd();
208                       PEG_METHOD_EXIT(); 	
209                   	return acceptLangs;
210                   }
211                   
212                   void Thread::setLanguages(AcceptLanguages *langs) //l10n
213                   {
214 chuck    1.39        PEG_METHOD_ENTER(TRC_THREAD, "Thread::setLanguages");
215 chuck    1.37        		
216                      Thread * currentThrd = Thread::getCurrent();
217                      if (currentThrd != NULL)
218                      {
219                      		// deletes the old tsd and creates a new one
220                   		currentThrd->put_tsd("acceptLanguages",
221 chuck    1.43     			language_delete, 
222 chuck    1.37     			sizeof(AcceptLanguages *),
223                   			langs);   		
224                      }
225                      
226                      PEG_METHOD_EXIT();    		
227                   }
228                   
229                   void Thread::clearLanguages() //l10n
230                   {
231 chuck    1.39        PEG_METHOD_ENTER(TRC_THREAD, "Thread::clearLanguages");
232 chuck    1.37        	
233                      Thread * currentThrd = Thread::getCurrent();
234                      if (currentThrd != NULL)
235                      {
236                      		// deletes the old tsd
237                   		currentThrd->delete_tsd("acceptLanguages");   		
238                      }
239                      
240                      PEG_METHOD_EXIT();   		
241                   }
242                   // l10n end      
243                   
#if 0
// two special synchronization classes for ThreadPool
//

// Scope guard (currently compiled out): performs a timed lock on the given
// mutex when constructed and unlocks it when destroyed.
class timed_mutex
{
   public:
      timed_mutex(Mutex* mut, int msec)
         : _mut(mut)
      {
         _mut->timed_lock(msec, pegasus_thread_self());
      }

      ~timed_mutex()
      {
         _mut->unlock();
      }

      Mutex* _mut;
};
#endif
263 mday     1.52     
264                   class try_mutex
265                   {
266                      public:
267                         try_mutex(Mutex* mut)
268                   	 :_mut(mut)
269                         {
270                   	 _mut->try_lock(pegasus_thread_self());
271                         }
272                         ~try_mutex(void)
273                         {
274                   	 _mut->unlock();
275                         }
276                         
277                         Mutex* _mut;
278                   };
279                   
280 mday     1.58     class auto_int
281                   {
282                      public:
283                         auto_int(AtomicInt* num)
284                   	 : _int(num)
285                         {
286                   	 _int->operator++();
287                         }
288                         ~auto_int(void)
289                         {
290                   	 _int->operator--();
291                         }
292                         AtomicInt *_int;
293                   };
294                   
295                   
296                   AtomicInt _idle_control;
297 mday     1.52     
298 mday     1.20     DQueue<ThreadPool> ThreadPool::_pools(true);
299                   
300                   void ThreadPool::kill_idle_threads(void)
301                   {
302                      static struct timeval now, last = {0, 0};
303                      
304                      pegasus_gettimeofday(&now);
305                      if(now.tv_sec - last.tv_sec > 5)
306                      {
307                         _pools.lock();
308                         ThreadPool *p = _pools.next(0);
309                         while(p != 0)
310                         {
311                   	 try 
312                   	 {
313                   	    p->kill_dead_threads();
314                   	 }
315                   	 catch(...)
316                   	 {
317                   	 }
318                   	 p = _pools.next(p);
319 mday     1.20           }
320                         _pools.unlock();
321                         pegasus_gettimeofday(&last);
322                      }
323                   }
324                   
325                   
326 mike     1.2      ThreadPool::ThreadPool(Sint16 initial_size,
327 kumpf    1.8      		       const Sint8 *key,
328 mike     1.2      		       Sint16 min,
329                   		       Sint16 max,
330                   		       struct timeval & alloc_wait,
331 chip     1.11     		       struct timeval & dealloc_wait,
332 mike     1.2      		       struct timeval & deadlock_detect)
333                      : _max_threads(max), _min_threads(min),
334 mday     1.12          _current_threads(0),
335                        _pool(true), _running(true),
336 mike     1.2           _dead(true), _dying(0)
337                   {
338                      _allocate_wait.tv_sec = alloc_wait.tv_sec;
339                      _allocate_wait.tv_usec = alloc_wait.tv_usec;
340 chip     1.11        _deallocate_wait.tv_sec = dealloc_wait.tv_sec;
341 mike     1.2         _deallocate_wait.tv_usec = dealloc_wait.tv_usec;
342                      _deadlock_detect.tv_sec = deadlock_detect.tv_sec;
343                      _deadlock_detect.tv_usec = deadlock_detect.tv_usec;
344                      memset(_key, 0x00, 17);
345                      if(key != 0)
346                         strncpy(_key, key, 16);
347 mday     1.21        if(_max_threads > 0 && _max_threads < initial_size)
348 mike     1.2            _max_threads = initial_size;
349                      if(_min_threads > initial_size)
350                         _min_threads = initial_size;
351 chip     1.11     
352 mike     1.2         int i;
353                      for(i = 0; i < initial_size; i++)
354                      {
355                         _link_pool(_init_thread());
356                      }
357 mday     1.20        _pools.insert_last(this);
358 mike     1.2      }
359                   
360                   ThreadPool::~ThreadPool(void)
361                   {
362 kumpf    1.57        PEG_METHOD_ENTER(TRC_THREAD, "Thread::~ThreadPool");
363 mday     1.35        try 
364 mday     1.47        {      
365 kumpf    1.57           // Set the dying flag so all thread know the destructor has been entered
366 mday     1.58           _dying++;
367                         
368 mday     1.52           // remove from the global pools list 
369 mday     1.35           _pools.remove(this);
370 mday     1.52     
371 kumpf    1.71.2.6       while(_current_threads.value() > 0)
372 mike     1.2            {
373 kumpf    1.71.2.6          Thread* thread = _pool.remove_first();
374                            if (thread != 0)
375 kumpf    1.57              {
376 kumpf    1.71.2.6             _cleanupThread(thread);
377                               _current_threads--;
378 kumpf    1.57              }
379                   
380                            else
381                            {
382 kumpf    1.71.2.6             pegasus_yield();
383 kumpf    1.57              }
384 mday     1.35           }
385 mike     1.2         }
386 mday     1.35        catch(...)
387 mike     1.2         {
388                      }
389                   }
390                   
391 chip     1.11     // make this static to the class
392 mike     1.2      PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
393                   {
394 kumpf    1.14        PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
395                   
396 mike     1.2         Thread *myself = (Thread *)parm;
397                      if(myself == 0)
398 kumpf    1.14        {
399 kumpf    1.57           Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
400                             "ThreadPool::_loop: Thread pointer is null");
401 kumpf    1.14           PEG_METHOD_EXIT();
402 mike     1.2            throw NullPointer();
403 kumpf    1.14        }
404 chuck    1.37        
405                   // l10n
406                      // Set myself into thread specific storage
407 chuck    1.38        // This will allow code to get its own Thread
408 chuck    1.39        Thread::setCurrent(myself);	
409                   
410 mike     1.2         ThreadPool *pool = (ThreadPool *)myself->get_parm();
411 kumpf    1.14        if(pool == 0 ) 
412                      {
413 kumpf    1.57           Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
414                             "ThreadPool::_loop: ThreadPool pointer is null");
415 kumpf    1.14           PEG_METHOD_EXIT();
416 mike     1.2            throw NullPointer();
417 kumpf    1.14        }
418 mday     1.52        
419 mike     1.5         Semaphore *sleep_sem = 0;
420 mday     1.13        Semaphore *blocking_sem = 0;
421                      
422 mike     1.5         struct timeval *deadlock_timer = 0;
423 mday     1.47        
424 chip     1.11        try
425 mike     1.2         {
426                         sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
427                         myself->dereference_tsd();
428 kumpf    1.71.2.6       PEGASUS_ASSERT(sleep_sem != 0);
429                   
430 mike     1.2            deadlock_timer = (struct timeval *)myself->reference_tsd("deadlock timer");
431 mday     1.22           myself->dereference_tsd(); 
432 kumpf    1.71.2.6       PEGASUS_ASSERT(deadlock_timer != 0);
433 mike     1.2         }
434 mday     1.30        catch(...)
435                      {
436 kumpf    1.57           Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
437 konrad.r 1.67     		    "ThreadPool::_loop: Failure getting sleep_sem or deadlock_timer.");
438 kumpf    1.71.2.6       PEGASUS_ASSERT(false);
439                         pool->_pool.remove(myself);
440                         pool->_current_threads--;
441 mday     1.30           PEG_METHOD_EXIT();
442 kumpf    1.71.2.6       return((PEGASUS_THREAD_RETURN)1);
443 mday     1.30        }
444                      
445 mday     1.54        while(1)
446 mike     1.2         {
447 mday     1.52           try 
448                         {
449 kumpf    1.71.2.6 	 sleep_sem->wait();
450 konrad.r 1.67           }
451 kumpf    1.71.2.6       catch(...)
452 mday     1.52           {
453 kumpf    1.57              Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
454 konrad.r 1.67     	   "ThreadPool::_loop: failure on sleep_sem->wait().");
455 kumpf    1.71.2.6          PEGASUS_ASSERT(false);
456                            pool->_pool.remove(myself);
457                            pool->_current_threads--;
458 mday     1.52     	 PEG_METHOD_EXIT();
459 kumpf    1.71.2.6 	 return((PEGASUS_THREAD_RETURN)1);
460 mday     1.52           }
461                         
462 mike     1.2            // when we awaken we reside on the running queue, not the pool queue
463 konrad.r 1.67           /* Hence no need to move the thread to the _dead queue, as the _running
464                          * queue is only dused by kill_dead_threads which makes sure that the
465                          * the threads are cleaned up (unlocking any locked lists in the TSD, etc)
466                          * before killing it.
467                          */
468 mday     1.47           
469 mike     1.5            PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0;
470                         void *parm = 0;
471 kumpf    1.71.2.6       Semaphore* blocking_sem = 0;
472 mike     1.2      
473 chip     1.11           try
474 mike     1.2            {
475                   	 _work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) \
476                   	    myself->reference_tsd("work func");
477                   	 myself->dereference_tsd();
478                   	 parm = myself->reference_tsd("work parm");
479                   	 myself->dereference_tsd();
480 mday     1.13     	 blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
481                   	 myself->dereference_tsd();
482                   
483 mike     1.2            }
484 kumpf    1.71.2.6       catch(...)
485 mike     1.2            {
486 kumpf    1.57              Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
487                              "ThreadPool::_loop: Failure accessing work func, work parm, or blocking sem.");
488 kumpf    1.71.2.6          PEGASUS_ASSERT(false);
489                            pool->_pool.remove(myself);
490                            pool->_current_threads--;
491 kumpf    1.14     	 PEG_METHOD_EXIT();
492 kumpf    1.71.2.6 	 return((PEGASUS_THREAD_RETURN)1);
493 mike     1.2            }
494 mday     1.52           
495 mike     1.2            if(_work == 0)
496 kumpf    1.14           {
497 kumpf    1.57              Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
498 kumpf    1.71.2.6            "ThreadPool::_loop: work func is 0, meaning we should exit.");
499                   	 break;
500 kumpf    1.24           }
501                   
502 mike     1.2            gettimeofday(deadlock_timer, NULL);
503 kumpf    1.57     
504 kumpf    1.71.2.6       try 
505 mday     1.20           {
506 kumpf    1.71.2.6          PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,
507                               "Worker started");
508                            _work(parm);
509                            PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,
510                               "Worker finished");
511                         }
512                         catch(Exception & e)
513                         {
514                            PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
515                               String("Exception from _work in ThreadPool::_loop: ") +
516                                  e.getMessage());
517                         }
518 kumpf    1.68     #if !defined(PEGASUS_OS_LSB)
519 kumpf    1.71.2.6       catch (exception& e)
520                         {
521                            PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
522                               String("Exception from _work in ThreadPool::_loop: ") +
523                                  e.what());
524                         }
525 kumpf    1.68     #endif
526 kumpf    1.71.2.6       catch(...)
527                         {
528                            Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
529                              "ThreadPool::_loop: execution of _work failed.");
530                         }
531 chuck    1.37           
532 chip     1.11           // put myself back onto the available list
533                         try
534 mike     1.2            {
535 kumpf    1.71.2.6 	 gettimeofday(deadlock_timer, NULL);
536                   	 if( blocking_sem != 0 )
537                   	    blocking_sem->signal();
538                   
539                            Boolean removed = pool->_running.remove((void *)myself);
540                            PEGASUS_ASSERT(removed);
541                   
542                            pool->_pool.insert_first(myself);
543 mike     1.2            }
544 mday     1.52           catch(...)
545 mike     1.2            {
546 kumpf    1.57             Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
547                                "ThreadPool::_loop: Adding thread to idle pool failed.");
548 kumpf    1.71.2.6          PEGASUS_ASSERT(false);
549                            pool->_current_threads--;
550 kumpf    1.14     	 PEG_METHOD_EXIT();
551 kumpf    1.71.2.6 	 return((PEGASUS_THREAD_RETURN)1);
552 mike     1.2            }
553 mday     1.51           
554 mike     1.2         }
555 s.hills  1.49     
556 kumpf    1.14        PEG_METHOD_EXIT();
557 mike     1.2         return((PEGASUS_THREAD_RETURN)0);
558                   }
559                   
560 denise.eckstein 1.71.2.5 ThreadStatus ThreadPool::allocate_and_awaken(void *parm,
561                                                                  PEGASUS_THREAD_RETURN \
562                                                                  (PEGASUS_THREAD_CDECL *work)(void *),
563                                                                  Semaphore *blocking)
564                          
565 mike            1.2      {
566 kumpf           1.14        PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
567 kumpf           1.57     
568                             // Allocate_and_awaken will not run if the _dying flag is set.
569                             // Once the lock is acquired, ~ThreadPool will not change
570                             // the value of _dying until the lock is released.
571 mday            1.47        
572 kumpf           1.57        try
573 mday            1.47        {
574 kumpf           1.57           if (_dying.value())
575 mday            1.47           {
576 kumpf           1.57              Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
577                                    "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
578 denise.eckstein 1.71.2.5          return PEGASUS_THREAD_UNAVAILABLE;
579 kumpf           1.57           }
580                                struct timeval start;
581                                gettimeofday(&start, NULL);
582                                Thread *th = 0;
583                             
584 mday            1.47           th = _pool.remove_first();
585                             
586 kumpf           1.59           if (th == 0)
587 kumpf           1.57           {
588                                   // will throw an IPCException& 
589                                   _check_deadlock(&start) ;
590 mday            1.12           
591 kumpf           1.57              if(_max_threads == 0 || _current_threads < _max_threads)
592                                   {
593                          	    th = _init_thread();
594                                   }
595 kumpf           1.59           }
596                          
597                                if (th == 0)
598                                {
599 kumpf           1.60             // ATTN-DME-P3-20031103: This trace message should not be
600                                  // be labeled TRC_DISCARDED_DATA, because it does not
601                                  // necessarily imply that a failure has occurred.  However,
602                                  // this label is being used temporarily to help isolate
603                                  // the cause of client timeout problems.
604                          
605                                  Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
606                                     "ThreadPool::allocate_and_awaken: Insufficient resources: "
607                                     " pool = %s, running threads = %d, idle threads = %d, dead threads = %d ",
608                                     _key, _running.count(), _pool.count(), _dead.count());
609 denise.eckstein 1.71.2.5          return PEGASUS_THREAD_INSUFFICIENT_RESOURCES;
610 mday            1.47           }
611 chip            1.11     
612 mike            1.2            // initialize the thread data with the work function and parameters
613 kumpf           1.14           Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
614 kumpf           1.57              "Initializing thread with work function and parameters: parm = %p",
615 kumpf           1.14               parm);
616                          
617 kumpf           1.15           th->delete_tsd("work func");
618 chip            1.11           th->put_tsd("work func", NULL,
619 mike            1.2      		  sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
620                          		  (void *)work);
621 kumpf           1.15           th->delete_tsd("work parm");
622 mike            1.2            th->put_tsd("work parm", NULL, sizeof(void *), parm);
623 kumpf           1.15           th->delete_tsd("blocking sem");
624 mday            1.13           if(blocking != 0 )
625 kumpf           1.57                th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
626 mday            1.47     
627 kumpf           1.57           // put the thread on the running list
628                                _running.insert_first(th);
629 mike            1.2      
630                                // signal the thread's sleep semaphore to awaken it
631 kumpf           1.57           Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
632 denise.eckstein 1.71.2.5       PEGASUS_ASSERT(sleep_sem != 0);
633                          
634 kumpf           1.57           Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
635                                sleep_sem->signal();
636                                th->dereference_tsd();
637 mike            1.2         }
638 kumpf           1.57        catch (...)
639 mday            1.47        {
640 kumpf           1.57           Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
641                                    "ThreadPool::allocate_and_awaken: Operation Failed.");
642                                PEG_METHOD_EXIT();
643 kumpf           1.59           // ATTN: Error result has not yet been defined
644 denise.eckstein 1.71.2.5       return PEGASUS_THREAD_SETUP_FAILURE;
645 mday            1.47        }
646 kumpf           1.14        PEG_METHOD_EXIT();
647 denise.eckstein 1.71.2.5    return PEGASUS_THREAD_OK;
648 mike            1.2      }
649                          
650                          // caller is responsible for only calling this routine during slack periods
651                          // but should call it at least once per _deadlock_detect with the running q
652                          // and at least once per _deallocate_wait for the pool q
653                          
654 mday            1.12     Uint32 ThreadPool::kill_dead_threads(void)
655 mike            1.2      	 throw(IPCException)
656                          {
657 konrad.r        1.67        PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::kill_dead_threads");
658 kumpf           1.57     
659 kumpf           1.71.2.6    Uint32 numThreadsCleanedUp = 0;
660                          
661                              Uint32 numIdleThreads = _pool.count();
662                              for (Uint32 i = 0; i < numIdleThreads; i++)
663 kumpf           1.57         {
664 kumpf           1.71.2.6         // Do not dip below the minimum thread count
665                                  if (_current_threads.value() <= (Uint32)_min_threads)
666                                  {
667                                      break;
668                                  }
669                          
670                                  Thread* thread = _pool.remove_last();
671                          
672                                  // If there are no more threads in the _pool queue, we're done.
673                                  if (thread == 0)
674                                  {
675                                      break;
676                                  }
677                          
678                                  struct timeval* lastActivityTime;
679                                  try
680                                  {
681                                      lastActivityTime = (struct timeval *)thread->try_reference_tsd(
682                                          "deadlock timer");
683                                      PEGASUS_ASSERT(lastActivityTime != 0);
684                                  }
685 kumpf           1.71.2.6         catch (...)
686                                  {
687                                      PEGASUS_ASSERT(false);
688                                      _pool.insert_last(thread);
689                                      break;
690                                  }
691                          
692                                  Boolean cleanupThisThread =
693                                      check_time(lastActivityTime, &_deallocate_wait);
694                                  thread->dereference_tsd();
695                          
696                                  if (cleanupThisThread)
697                                  {
698                                      _cleanupThread(thread);
699                                      _current_threads--;
700                                      numThreadsCleanedUp++;
701                                  }
702                                  else
703                                  {
704                                      _pool.insert_first(thread);
705                                  }
706 kumpf           1.57         }
707 kumpf           1.71.2.6 
708                              PEG_METHOD_EXIT();
709                              return numThreadsCleanedUp;
710 mike            1.2      }
711                          
712 kumpf           1.71.2.6 void ThreadPool::_cleanupThread(Thread* th)
713                          {
714                              PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
715                          
716                              // Set the "work func" and "work parm" to 0 so _loop() knows to exit.
717                              th->delete_tsd("work func");
718                              th->put_tsd(
719                                  "work func", NULL,
720                                  sizeof(PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
721                                  (void *) 0);
722                              th->delete_tsd("work parm");
723                              th->put_tsd("work parm", NULL, sizeof(void *), 0);
724                          
725                              // signal the thread's sleep semaphore to awaken it
726                              Semaphore* sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
727                              PEGASUS_ASSERT(sleep_sem != 0);
728                              sleep_sem->signal();
729                              th->dereference_tsd();
730                          
731                              th->join();
732                              delete th;
733 kumpf           1.71.2.6 
734                              PEG_METHOD_EXIT();
735                          }
736 mday            1.12     
737 mike            1.2      Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval)
738                          {
739 mday            1.22        // never time out if the interval is zero
740                             if(interval && interval->tv_sec == 0 && interval->tv_usec == 0)
741                                return false;
742                             
743 mday            1.55        struct timeval now , finish , remaining ;
744 mday            1.13        Uint32 usec;
745 mday            1.33        pegasus_gettimeofday(&now);
746 mday            1.36        /* remove valgrind error */
747                             pegasus_gettimeofday(&remaining);
748                             
749 mday            1.13     
750                             finish.tv_sec = start->tv_sec + interval->tv_sec;
751                             usec = start->tv_usec + interval->tv_usec;
752                             finish.tv_sec += (usec / 1000000);
753                             usec %= 1000000;
754                             finish.tv_usec = usec;
755                              
756                             if ( timeval_subtract(&remaining, &finish, &now) )
757 mike            1.2            return true;
758                             else
759                                return false;
760                          }
761                          
762                          PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm )
763                          {
764 konrad.r        1.67        
765                             PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_undertaker");
766 mday            1.30        exit_thread((PEGASUS_THREAD_RETURN)1);
767 konrad.r        1.67        PEG_METHOD_EXIT();
768 mday            1.30        return (PEGASUS_THREAD_RETURN)1;
769 mike            1.2      }
770 mday            1.19     
771 konrad.r        1.67     PEGASUS_THREAD_RETURN ThreadPool::_graveyard(Thread *t)
772                          {
773                            PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_graveyard");
774                            ThreadPool *pool = (ThreadPool *)t->get_parm();
775                            if(pool == 0 ) {
776                              Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
777                          		  "Could not obtain the pool information from the Thread.", t);
778                          
779                                return (PEGASUS_THREAD_RETURN)1;
780                            }
781                            if (pool->_pool.exists(t))
782                              {
783                                if (pool->_pool.remove( (void *) t) != 0)
784                          	{
785                              	Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
786                          		"Moving thread %p", t);
787                          	/* We are moving the thread to the _running queue b/c
788                          	_only_ kill_dead_threads has enough logic to take care 
789                          	of cleaning up the threads.*/
790                          
791                          	  pool->_running.insert_first( t );	  
792 konrad.r        1.67     	}
793                                else
794                          	{
795                          	  Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
796                          			"Could not move Thread %p from _pool to _runing queue.", t);
797                          	  return (PEGASUS_THREAD_RETURN)1;
798                          	}
799                              }
800                          
801                            else if (pool->_running.exists(t))
802                              {
803                          	 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
804                          			"Thread %p is on _running queue. Letting kill_dead_threads take care of the problem.", t);
805                          	  return (PEGASUS_THREAD_RETURN)1;
806                              }
807                            if (!pool->_dead.exists(t)) 
808                              {
809                                Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
810                          		    "Thread is not on any queue! Moving it to the running queue.");
811                                pool->_running.insert_first( t );	
812                              }
813 konrad.r        1.67       PEG_METHOD_EXIT();
814                            return (PEGASUS_THREAD_RETURN)0;
815                          }
816 mday            1.19     
817                           void ThreadPool::_sleep_sem_del(void *p)
818                          {
819                             if(p != 0)
820                             {
821                                delete (Semaphore *)p;
822                             }
823                          }
824                          
825                           void ThreadPool::_check_deadlock(struct timeval *start) throw(Deadlock)
826                          {
827                             if (true == check_time(start, &_deadlock_detect))
828                                throw Deadlock(pegasus_thread_self());
829                             return;
830                          }
831                          
832                          
833                           Boolean ThreadPool::_check_deadlock_no_throw(struct timeval *start)
834                          {
835                             return(check_time(start, &_deadlock_detect));
836                          }
837 mday            1.19     
838                           Boolean ThreadPool::_check_dealloc(struct timeval *start)
839                          {
840                             return(check_time(start, &_deallocate_wait));
841                          }
842                          
843                           Thread *ThreadPool::_init_thread(void) throw(IPCException)
844                          {
845 konrad.r        1.67       PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_init_thread");
846 mday            1.19        Thread *th = (Thread *) new Thread(_loop, this, false);
847                             // allocate a sleep semaphore and pass it in the thread context
848                             // initial count is zero, loop function will sleep until
849                             // we signal the semaphore
850                             Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
851                             th->put_tsd("sleep sem", &_sleep_sem_del, sizeof(Semaphore), (void *)sleep_sem);
852                             
853                             struct timeval *dldt = (struct timeval *) ::operator new(sizeof(struct timeval));
854 mday            1.35        pegasus_gettimeofday(dldt);
855                             
856 mday            1.19        th->put_tsd("deadlock timer", thread_data::default_delete, sizeof(struct timeval), (void *)dldt);
857                             // thread will enter _loop(void *) and sleep on sleep_sem until we signal it
858 denise.eckstein 1.71.2.5 
859                             if (th->run() != PEGASUS_THREAD_OK)
860 kumpf           1.59        {
861 denise.eckstein 1.71.2.5        Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
862                                    "Could not create thread. Error code is %d.", errno);
863                                  delete th;
864                                  return 0;
865 kumpf           1.59        }
866 mday            1.19        _current_threads++;
867                             pegasus_yield();
868 denise.eckstein 1.71.2.5 
869                             PEG_METHOD_EXIT();
870 mday            1.19        return th;
871                          }
872                          
873                           void ThreadPool::_link_pool(Thread *th) throw(IPCException)
874                          {
875                             if(th == 0)
876 kumpf           1.57        {
877                                Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
878                                    "ThreadPool::_link_pool: Thread pointer is null.");
879 mday            1.19           throw NullPointer();
880 kumpf           1.57        }
881 mday            1.47        try 
882                             {
883                                _pool.insert_first(th);
884                             }
885                             catch(...)
886                             {
887 kumpf           1.57           Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
888                                    "ThreadPool::_link_pool: _pool.insert_first failed.");
889 mday            1.47        }
890 mday            1.19     }
891 mike            1.2      
892                          
893                          PEGASUS_NAMESPACE_END
894                          

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2