(file) Return to Thread.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mike  1.2 //%/////////////////////////////////////////////////////////////////////////////
  2           //
  3 kumpf 1.17 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
  4            // The Open Group, Tivoli Systems
  5 mike  1.2  //
  6            // Permission is hereby granted, free of charge, to any person obtaining a copy
  7 chip  1.11 // of this software and associated documentation files (the "Software"), to
  8            // deal in the Software without restriction, including without limitation the
  9            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 10 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
 11            // furnished to do so, subject to the following conditions:
 12 kumpf 1.17 // 
 13 chip  1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 14 mike  1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 15            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 16 chip  1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 17            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 18            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 19 mike  1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 20            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 21            //
 22            //==============================================================================
 23            //
 24            // Author: Mike Day (mdday@us.ibm.com)
 25            //
 26            // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
 27 chip  1.11 //              added nsk platform support
 28 mike  1.2  //
 29            //%/////////////////////////////////////////////////////////////////////////////
 30            
 31            #include "Thread.h"
 32            #include <Pegasus/Common/IPC.h>
 33 kumpf 1.14 #include <Pegasus/Common/Tracer.h>
 34 mike  1.2  
 35            #if defined(PEGASUS_OS_TYPE_WINDOWS)
 36 chip  1.11 # include "ThreadWindows.cpp"
 37 mike  1.2  #elif defined(PEGASUS_OS_TYPE_UNIX)
 38            # include "ThreadUnix.cpp"
 39            #elif defined(PEGASUS_OS_TYPE_NSK)
 40            # include "ThreadNsk.cpp"
 41            #else
 42            # error "Unsupported platform"
 43            #endif
 44            
 45            PEGASUS_NAMESPACE_BEGIN
 46            
 47 chip  1.11 void thread_data::default_delete(void * data)
 48            {
 49 mike  1.2     if( data != NULL)
 50 chip  1.11       ::operator delete(data);
 51 mike  1.2  }
 52            
 53            Boolean Thread::_signals_blocked = false; // class-wide flag, starts out false; NOTE(review): presumably set once signals are blocked - no reader/writer visible here, confirm in the platform sources
 54            
 55            // for non-native implementations
 56 chip  1.11 #ifndef PEGASUS_THREAD_CLEANUP_NATIVE
 57 mike  1.2  void Thread::cleanup_push( void (*routine)(void *), void *parm) throw(IPCException)
 58            {
 59                cleanup_handler *cu = new cleanup_handler(routine, parm);
 60 chip  1.11     try
 61                {
 62            	_cleanup.insert_first(cu);
 63                }
 64                catch(IPCException&)
 65 mike  1.2      {
 66            	delete cu;
 67 chip  1.11 	throw;
 68 mike  1.2      }
 69                return;
 70            }
 71 chip  1.11 	
 72 mike  1.2  void Thread::cleanup_pop(Boolean execute) throw(IPCException)
 73            {
 74                cleanup_handler *cu ;
 75 chip  1.11     try
 76                {
 77 mike  1.2  	cu = _cleanup.remove_first() ;
 78                }
 79 chip  1.11     catch(IPCException&)
 80 mike  1.2      {
 81 chip  1.11 	PEGASUS_ASSERT(0);
 82 mike  1.2       }
 83                if(execute == true)
 84            	cu->execute();
 85                delete cu;
 86            }
 87 chip  1.11 		
 88 mike  1.2  #endif
 89            
 90            
 91 kumpf 1.8  //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) throw(IPCException)
 92 mike  1.2  
 93            
 94 chip  1.11 #ifndef PEGASUS_THREAD_EXIT_NATIVE
 95            void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
 96            {
 97                // execute the cleanup stack and then return
 98 mike  1.2     while( _cleanup.count() )
 99               {
100 chip  1.11        try
101                   {
102            	   cleanup_pop(true);
103                   }
104                   catch(IPCException&)
105                   {
106            	  PEGASUS_ASSERT(0);
107            	  break;
108 mike  1.2         }
109               }
110               _exit_code = exit_code;
111               exit_thread(exit_code);
112 mday  1.4     _handle.thid = 0;
113 mike  1.2  }
114            
115            
116            #endif
117            
118 mday  1.20 DQueue<ThreadPool> ThreadPool::_pools(true); // list of live pools: the constructor appends itself, the destructor removes itself, kill_idle_threads() walks it
119            
120            
121            void ThreadPool::kill_idle_threads(void)
122            {
123               static struct timeval now, last = {0, 0};
124               
125               pegasus_gettimeofday(&now);
126               if(now.tv_sec - last.tv_sec > 5)
127               {
128                  _pools.lock();
129                  ThreadPool *p = _pools.next(0);
130                  while(p != 0)
131                  {
132            	 try 
133            	 {
134            	    p->kill_dead_threads();
135            	 }
136            	 catch(...)
137            	 {
138            	 }
139 mday  1.20 	 p = _pools.next(p);
140                  }
141                  _pools.unlock();
142                  pegasus_gettimeofday(&last);
143               }
144            }
145            
146            
147 mike  1.2  ThreadPool::ThreadPool(Sint16 initial_size,
148 kumpf 1.8  		       const Sint8 *key,
149 mike  1.2  		       Sint16 min,
150            		       Sint16 max,
151            		       struct timeval & alloc_wait,
152 chip  1.11 		       struct timeval & dealloc_wait,
153 mike  1.2  		       struct timeval & deadlock_detect)
154               : _max_threads(max), _min_threads(min),
155 mday  1.12      _current_threads(0),
156                 _pool(true), _running(true),
157 mike  1.2       _dead(true), _dying(0)
158            {
159               _allocate_wait.tv_sec = alloc_wait.tv_sec;
160               _allocate_wait.tv_usec = alloc_wait.tv_usec;
161 chip  1.11    _deallocate_wait.tv_sec = dealloc_wait.tv_sec;
162 mike  1.2     _deallocate_wait.tv_usec = dealloc_wait.tv_usec;
163               _deadlock_detect.tv_sec = deadlock_detect.tv_sec;
164               _deadlock_detect.tv_usec = deadlock_detect.tv_usec;
165               memset(_key, 0x00, 17);
166               if(key != 0)
167                  strncpy(_key, key, 16);
168 mday  1.21    if(_max_threads > 0 && _max_threads < initial_size)
169 mike  1.2        _max_threads = initial_size;
170               if(_min_threads > initial_size)
171                  _min_threads = initial_size;
172 chip  1.11 
173 mike  1.2     int i;
174               for(i = 0; i < initial_size; i++)
175               {
176                  _link_pool(_init_thread());
177               }
178 mday  1.20    _pools.insert_last(this);
179               
180 mike  1.2  }
181            
182 chip  1.11 
183 mike  1.2  
184            ThreadPool::~ThreadPool(void)
185            {
186 mday  1.35    try 
187 chip  1.11    {
188 mday  1.35       _pools.remove(this);
189                  _dying++;
190                  Thread *th = 0;
191                  th = _pool.remove_first();
192                  while(th != 0)
193 mike  1.2        {
194 mday  1.35 	 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
195            	 
196            	 if(sleep_sem == 0)
197            	 {
198            	    th->dereference_tsd();
199            	    throw NullPointer();
200            	 }
201            	 
202            	 sleep_sem->signal();
203            	 sleep_sem->signal();
204 mike  1.2  	 th->dereference_tsd();
205 mday  1.35 	 // signal the thread's sleep semaphore
206            	 th->cancel();
207            	 th->join();
208            	 th->empty_tsd();
209            	 delete th;
210            	 th = _pool.remove_first();
211 mike  1.2        }
212 chip  1.11 
213 mday  1.35       th = _running.remove_first();
214                  while(th != 0)
215                  {
216            	 // signal the thread's sleep semaphore
217            	 th->cancel();
218            	 th->join();
219            	 th->empty_tsd();
220            	 delete th;
221            	 th = _running.remove_first();
222                  }
223 mike  1.2  
224 mday  1.35       th = _dead.remove_first();
225                  while(th != 0)
226                  {
227            	 // signal the thread's sleep semaphore
228            	 th->cancel();
229            	 th->join();
230            	 th->empty_tsd();
231            	 delete th;
232            	 th = _dead.remove_first();
233                  }
234 mike  1.2     }
235 mday  1.35    catch(...)
236 mike  1.2     {
237               }
238            }
239            
240 chip  1.11 // make this static to the class
241 mike  1.2  PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
242            {
243 kumpf 1.14    PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
244            
245 mike  1.2     Thread *myself = (Thread *)parm;
246               if(myself == 0)
247 kumpf 1.14    {
248                  PEG_METHOD_EXIT();
249 mike  1.2        throw NullPointer();
250 kumpf 1.14    }
251 mike  1.2     ThreadPool *pool = (ThreadPool *)myself->get_parm();
252 kumpf 1.14    if(pool == 0 ) 
253               {
254                  PEG_METHOD_EXIT();
255 mike  1.2        throw NullPointer();
256 kumpf 1.14    }
257 mike  1.5     Semaphore *sleep_sem = 0;
258 mday  1.13    Semaphore *blocking_sem = 0;
259               
260 mike  1.5     struct timeval *deadlock_timer = 0;
261 chip  1.11 
262               try
263 mike  1.2     {
264                  sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
265                  myself->dereference_tsd();
266                  deadlock_timer = (struct timeval *)myself->reference_tsd("deadlock timer");
267 mday  1.22       myself->dereference_tsd(); 
268 mike  1.2     }
269 mike  1.6     catch(IPCException &)
270 mike  1.2     {
271 kumpf 1.14       PEG_METHOD_EXIT();
272 mike  1.2        myself->exit_self(0);
273               }
274 mday  1.30    catch(...)
275               {
276                  PEG_METHOD_EXIT();
277                  myself->exit_self(0);
278               }
279               
280 mike  1.2     if(sleep_sem == 0 || deadlock_timer == 0)
281 kumpf 1.14    {
282                  PEG_METHOD_EXIT();
283 mike  1.2        throw NullPointer();
284 kumpf 1.14    }
285 mike  1.2  
286               while(pool->_dying < 1)
287               {
288                  sleep_sem->wait();
289 mday  1.35         
290 mike  1.2        // when we awaken we reside on the running queue, not the pool queue
291                  if(pool->_dying > 0)
292            	 break;
293 chip  1.11 
294 mike  1.5        PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0;
295                  void *parm = 0;
296 mike  1.2  
297 chip  1.11       try
298 mike  1.2        {
299            	 _work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) \
300            	    myself->reference_tsd("work func");
301            	 myself->dereference_tsd();
302            	 parm = myself->reference_tsd("work parm");
303            	 myself->dereference_tsd();
304 mday  1.13 	 blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
305            	 myself->dereference_tsd();
306            
307 mike  1.2        }
308 mike  1.6        catch(IPCException &)
309 mike  1.2        {
310 kumpf 1.14 	 PEG_METHOD_EXIT();
311 mike  1.2  	 myself->exit_self(0);
312                  }
313 chip  1.11 
314 mike  1.2        if(_work == 0)
315 kumpf 1.14       {
316                     PEG_METHOD_EXIT();
317 mike  1.2  	 throw NullPointer();
318 kumpf 1.14       }
319 kumpf 1.24 
320                  if(_work ==
321                     (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) &_undertaker)
322                  {
323 mday  1.23 	 _work(parm);
324 kumpf 1.24       }
325            
326 mike  1.2        gettimeofday(deadlock_timer, NULL);
327 mday  1.20       try 
328                  {
329            	 _work(parm);
330                  }
331                  catch(...)
332                  {
333            	 gettimeofday(deadlock_timer, NULL);
334                  }
335                  gettimeofday(deadlock_timer, NULL);
336 mday  1.13       if( blocking_sem != 0 )
337            	 blocking_sem->signal();
338                  
339 chip  1.11       // put myself back onto the available list
340                  try
341 mike  1.2        {
342            	 pool->_running.remove((void *)myself);
343            	 pool->_link_pool(myself);
344                  }
345 mike  1.6        catch(IPCException &)
346 mike  1.2        {
347 kumpf 1.14 	 PEG_METHOD_EXIT();
348 mike  1.2  	 myself->exit_self(0);
349                  }
350               }
351               // wait to be awakend by the thread pool destructor
352               sleep_sem->wait();
353               myself->test_cancel();
354 kumpf 1.14 
355               PEG_METHOD_EXIT();
356 mike  1.2     myself->exit_self(0);
357               return((PEGASUS_THREAD_RETURN)0);
358            }
359            
360            void ThreadPool::allocate_and_awaken(void *parm,
361            				     PEGASUS_THREAD_RETURN \
362 mday  1.13 				     (PEGASUS_THREAD_CDECL *work)(void *), 
363            				     Semaphore *blocking)
364            
365 mike  1.2     throw(IPCException)
366            {
367 kumpf 1.14    PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
368 mike  1.2     struct timeval start;
369               gettimeofday(&start, NULL);
370 chip  1.11 
371 mike  1.2     Thread *th = _pool.remove_first();
372 mday  1.12    
373 mday  1.7     // wait for the right interval and try again
374 mday  1.33    while (th == 0 && _dying < 1)
375 mike  1.2     {
376 mday  1.12       _check_deadlock(&start) ;
377                  
378 mday  1.21       if(_max_threads == 0 || _current_threads < _max_threads)
379 mday  1.35       {
380            	 th = _init_thread();
381            	 continue;
382                  }
383                  pegasus_yield();
384 mday  1.7        th = _pool.remove_first();
385               }
386 chip  1.11 
387            
388 mike  1.2     if(_dying < 1)
389               {
390                  // initialize the thread data with the work function and parameters
391 kumpf 1.14       Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
392                      "Initializing thread with work function and parameters: parm = %p",
393                      parm);
394            
395 kumpf 1.15       th->delete_tsd("work func");
396 chip  1.11       th->put_tsd("work func", NULL,
397 mike  1.2  		  sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
398            		  (void *)work);
399 kumpf 1.15       th->delete_tsd("work parm");
400 mike  1.2        th->put_tsd("work parm", NULL, sizeof(void *), parm);
401 kumpf 1.15       th->delete_tsd("blocking sem");
402 mday  1.13       if(blocking != 0 )
403            	 th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
404                  
405 chip  1.11       // put the thread on the running list
406 mike  1.2        _running.insert_first(th);
407            
408                  // signal the thread's sleep semaphore to awaken it
409                  Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
410            
411                  if(sleep_sem == 0)
412                  {
413            	 th->dereference_tsd();
414 kumpf 1.14          PEG_METHOD_EXIT();
415 mike  1.2  	 throw NullPointer();
416                  }
417 kumpf 1.14       Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
418 mike  1.2        sleep_sem->signal();
419                  th->dereference_tsd();
420               }
421               else
422                  _pool.insert_first(th);
423 kumpf 1.14 
424               PEG_METHOD_EXIT();
425 mike  1.2  }
426            
427            // caller is responsible for only calling this routine during slack periods
428            // but should call it at least once per _deadlock_detect with the running q
429            // and at least once per _deallocate_wait for the pool q
430            
431 mday  1.12 Uint32 ThreadPool::kill_dead_threads(void)
432 mike  1.2  	 throw(IPCException)
433            {
434               struct timeval now;
435               gettimeofday(&now, NULL);
436 mday  1.12    Uint32 bodies = 0;
437               
438 mike  1.2     // first go thread the dead q and clean it up as much as possible
439               while(_dead.count() > 0)
440               {
441 mday  1.34       Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "ThreadPool:: removing and joining dead thread");
442 mike  1.2        Thread *dead = _dead.remove_first();
443                  if(dead == 0)
444            	 throw NullPointer();
445 mday  1.30       dead->join();
446 mike  1.2        delete dead;
447               }
448 chip  1.11 
449               DQueue<Thread> * map[2] =
450 mike  1.2        {
451            	 &_pool, &_running
452                  };
453 chip  1.11 
454            
455 mike  1.2     DQueue<Thread> *q = 0;
456               int i = 0;
457               AtomicInt needed(0);
458 chip  1.11 
459 kumpf 1.31 #ifdef PEGASUS_DISABLE_KILLING_HUNG_THREADS
460               // This change prevents the thread pool from killing "hung" threads.
461               // The definition of a "hung" thread is one that has been on the run queue
462               // for longer than the time interval set when the thread pool was created.
463               // Cancelling "hung" threads has proven to be problematic.
464            
465               // With this change the thread pool will not cancel "hung" threads.  This
466               // may prevent a crash depending upon the state of the "hung" thread.  In
467               // the case that the thread is actually hung, this change causes the
468               // thread resources not to be reclaimed.
469            
470               // Idle threads, those that have not executed a routine for a time
471               // interval, continue to be destroyed.  This is normal and should not
472               // cause any problems.
473               for( ; i < 1; i++)
474            #else
475 mday  1.30    for( ; i < 2; i++)
476 kumpf 1.31 #endif
477 mday  1.21    { 
478                  q = map[i];
479 mike  1.2        if(q->count() > 0 )
480                  {
481 chip  1.11 	 try
482 mike  1.2  	 {
483            	    q->try_lock();
484            	 }
485 mday  1.18 	 catch(...)
486 mike  1.2  	 {
487 mday  1.18 	    return bodies;
488 mike  1.2  	 }
489            
490            	 struct timeval dt = { 0, 0 };
491            	 struct timeval *dtp;
492            	 Thread *th = 0;
493            	 th = q->next(th);
494            	 while (th != 0 )
495            	 {
496 chip  1.11 	    try
497 mike  1.2  	    {
498            	       dtp = (struct timeval *)th->try_reference_tsd("deadlock timer");
499            	    }
500 mday  1.18 	    catch(...)
501 mike  1.2  	    {
502 kumpf 1.25 	       q->unlock();
503 mday  1.18 	       return bodies;
504 mike  1.2  	    }
505 chip  1.11 	
506 mike  1.2  	    if(dtp != 0)
507            	    {
508            	       memcpy(&dt, dtp, sizeof(struct timeval));
509            	    }
510            	    th->dereference_tsd();
511            	    struct timeval deadlock_timeout;
512 mday  1.18 	    Boolean too_long;
513            	    if( i == 0)
514            	    {
515            	       too_long = check_time(&dt, get_deallocate_wait(&deadlock_timeout));
516            	    }
517            	    else 
518            	    {
519 mday  1.22 	       too_long = check_time(&dt, get_deadlock_detect(&deadlock_timeout));
520 mday  1.18 	    }
521            	    
522            	    if( true == too_long)
523 mike  1.2  	    {
524            	       // if we are deallocating from the pool, escape if we are
525 chip  1.11 	       // down to the minimum thread count
526 mday  1.13 	       _current_threads--;
527 mday  1.18 	       if( _current_threads.value() < (Uint32)_min_threads )
528 mike  1.2  	       {
529 mday  1.13 		  if( i == 0)
530 mike  1.2  		  {
531 mday  1.13 		     _current_threads++;
532 mike  1.2  		     th = q->next(th);
533            		     continue;
534            		  }
535 chip  1.11 		  else
536 mike  1.2  		  {
537 chip  1.11 		     // we are killing a hung thread and we will drop below the
538 mike  1.2  		     // minimum. create another thread to make up for the one
539            		     // we are about to kill
540            		     needed++;
541            		  }
542            	       }
543 chip  1.11 	
544 mike  1.2  	       th = q->remove_no_lock((void *)th);
545 chip  1.11 	
546 mike  1.2  	       if(th != 0)
547            	       {
548 mday  1.30 		  if( i == 0 )
549 mike  1.2  		  {
550 mday  1.30 		     th->delete_tsd("work func");
551            		     th->put_tsd("work func", NULL,
552            				 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
553            				 (void *)&_undertaker);
554            		     th->delete_tsd("work parm");
555            		     th->put_tsd("work parm", NULL, sizeof(void *), th);
556            		     
557            		     // signal the thread's sleep semaphore to awaken it
558            		     Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
559            		     
560            		     if(sleep_sem == 0)
561            		     {
562            			q->unlock();
563            			th->dereference_tsd();
564            			throw NullPointer();
565            		     }
566            		     
567            		     bodies++;
568 mike  1.2  		     th->dereference_tsd();
569 mday  1.30 		     _dead.insert_first(th);
570            		     sleep_sem->signal();
571            		     th = 0;
572            		  }
573            		  else 
574            		  {
575            		     // deadlocked threads
576 mday  1.34 		     Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Killing a deadlocked thread");
577 mday  1.30 		     th->cancel();
578            		     delete th;
579 mike  1.2  		  }
580            	       }
581            	    }
582            	    th = q->next(th);
583 mday  1.20 	    pegasus_sleep(1);
584 mike  1.2  	 }
585            	 q->unlock();
586            	 while (needed.value() > 0)
587            	 {
588            	    _link_pool(_init_thread());
589            	    needed--;
590 mday  1.20 	    pegasus_sleep(0);
591 mike  1.2  	 }
592                  }
593               }
594 mday  1.18     return bodies; 
595 mike  1.2  }
596            
597 mday  1.12 
598 mike  1.2  Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval)
599            {
600 mday  1.22    // never time out if the interval is zero
601               if(interval && interval->tv_sec == 0 && interval->tv_usec == 0)
602                  return false;
603               
604 mday  1.36    struct timeval now, finish, remaining ;
605 mday  1.13    Uint32 usec;
606 mday  1.33    pegasus_gettimeofday(&now);
607 mday  1.36    /* remove valgrind error */
608               pegasus_gettimeofday(&remaining);
609               
610 mday  1.13 
611               finish.tv_sec = start->tv_sec + interval->tv_sec;
612               usec = start->tv_usec + interval->tv_usec;
613               finish.tv_sec += (usec / 1000000);
614               usec %= 1000000;
615               finish.tv_usec = usec;
616                
617               if ( timeval_subtract(&remaining, &finish, &now) )
618 mike  1.2        return true;
619               else
620                  return false;
621            }
622            
623            PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm )
624            {
625 mday  1.30    exit_thread((PEGASUS_THREAD_RETURN)1);
626               return (PEGASUS_THREAD_RETURN)1;
627 mike  1.2  }
628 mday  1.19 
629            
630             void ThreadPool::_sleep_sem_del(void *p)
631            {
632               if(p != 0)
633               {
634                  delete (Semaphore *)p;
635               }
636            }
637            
638             void ThreadPool::_check_deadlock(struct timeval *start) throw(Deadlock)
639            {
640               if (true == check_time(start, &_deadlock_detect))
641                  throw Deadlock(pegasus_thread_self());
642               return;
643            }
644            
645            
646             Boolean ThreadPool::_check_deadlock_no_throw(struct timeval *start)
647            {
648               return(check_time(start, &_deadlock_detect));
649 mday  1.19 }
650            
651             Boolean ThreadPool::_check_dealloc(struct timeval *start)
652            {
653               return(check_time(start, &_deallocate_wait));
654            }
655            
656             Thread *ThreadPool::_init_thread(void) throw(IPCException)
657            {
658               Thread *th = (Thread *) new Thread(_loop, this, false);
659               // allocate a sleep semaphore and pass it in the thread context
660               // initial count is zero, loop function will sleep until
661               // we signal the semaphore
662               Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
663               th->put_tsd("sleep sem", &_sleep_sem_del, sizeof(Semaphore), (void *)sleep_sem);
664               
665               struct timeval *dldt = (struct timeval *) ::operator new(sizeof(struct timeval));
666 mday  1.35    pegasus_gettimeofday(dldt);
667               
668 mday  1.19    th->put_tsd("deadlock timer", thread_data::default_delete, sizeof(struct timeval), (void *)dldt);
669               // thread will enter _loop(void *) and sleep on sleep_sem until we signal it
670               th->run();
671               _current_threads++;
672               pegasus_yield();
673               
674               return th;
675            }
676            
677             void ThreadPool::_link_pool(Thread *th) throw(IPCException)
678            {
679               if(th == 0)
680                  throw NullPointer();
681               _pool.insert_first(th);
682            }
683            
684 mike  1.2  
685            
686            PEGASUS_NAMESPACE_END
687            

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2