(file) Return to Thread.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mike  1.2 //%/////////////////////////////////////////////////////////////////////////////
  2           //
  3           // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM,
  4           // Compaq Computer Corporation
  5           //
  6           // Permission is hereby granted, free of charge, to any person obtaining a copy
  7           // of this software and associated documentation files (the "Software"), to 
  8           // deal in the Software without restriction, including without limitation the 
  9           // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 
 10           // sell copies of the Software, and to permit persons to whom the Software is
 11           // furnished to do so, subject to the following conditions:
 12           // 
 13           // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN 
 14           // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 15           // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 16           // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR 
 17           // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT 
 18           // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 
 19           // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 20           // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 21           //
 22 mike  1.2 //==============================================================================
 23           //
 24           // Author: Mike Day (mdday@us.ibm.com)
 25           //
 26           // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
 27           //              added nsk platform support  
 28           //
 29           //%/////////////////////////////////////////////////////////////////////////////
 30           
 31           #include "Thread.h"
 32           #include <Pegasus/Common/IPC.h>
 33           
 34           #if defined(PEGASUS_OS_TYPE_WINDOWS)
 35           # include "ThreadWindows.cpp"
 36           #elif defined(PEGASUS_OS_TYPE_UNIX)
 37           # include "ThreadUnix.cpp"
 38           #elif defined(PEGASUS_OS_TYPE_NSK)
 39           # include "ThreadNsk.cpp"
 40           #else
 41           # error "Unsupported platform"
 42           #endif
 43 mike  1.2 
 44           PEGASUS_NAMESPACE_BEGIN
 45           
 46           void thread_data::default_delete(void * data) 
 47           { 
 48              if( data != NULL)
 49                 ::operator delete(data); 
 50           }
 51           
 // Definition of the static member Thread::_signals_blocked; it starts out false.
 // NOTE(review): presumably records whether signal blocking has already been
 // applied -- confirm against Thread.h and the platform-specific sources.
 52           Boolean Thread::_signals_blocked = false;
 53           
 54           // for non-native implementations
 55           #ifndef PEGASUS_THREAD_CLEANUP_NATIVE 
 56           void Thread::cleanup_push( void (*routine)(void *), void *parm) throw(IPCException)
 57           {
 58               cleanup_handler *cu = new cleanup_handler(routine, parm);
 59               try 
 60               {
 61           	_cleanup.insert_first(cu); 
 62               } 
 63 mike  1.6     catch(IPCException&) 
 64 mike  1.2     { 
 65           	delete cu;
 66           	throw; 
 67               }
 68               return;
 69           }
 70           	  
 71           void Thread::cleanup_pop(Boolean execute) throw(IPCException)
 72           {
 73               cleanup_handler *cu ;
 74               try 
 75               { 
 76           	cu = _cleanup.remove_first() ;
 77               }
 78 mike  1.6     catch(IPCException&) 
 79 mike  1.2     {
 80           	PEGASUS_ASSERT(0); 
 81                }
 82               if(execute == true)
 83           	cu->execute();
 84               delete cu;
 85           }
 86           		    
 87           #endif
 88           
 89           
 90           //thread_data *Thread::put_tsd(Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) throw(IPCException)
 91           
 92           
 93           #ifndef PEGASUS_THREAD_EXIT_NATIVE 
 94           void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code) 
 95           { 
 96               // execute the cleanup stack and then return 
 97              while( _cleanup.count() )
 98              {
 99                  try 
100 mike  1.2        { 
101           	   cleanup_pop(true); 
102                  }
103 mike  1.6        catch(IPCException&) 
104 mike  1.2        { 
105           	  PEGASUS_ASSERT(0); 
106           	  break; 
107                  } 
108              }
109              _exit_code = exit_code;
110              exit_thread(exit_code);
111 mday  1.4    _handle.thid = 0;
112 mike  1.2 }
113           
114           
115           #endif
116           
117           ThreadPool::ThreadPool(Sint16 initial_size,
118           		       Sint8 *key,
119           		       Sint16 min,
120           		       Sint16 max,
121           		       struct timeval & alloc_wait,
122           		       struct timeval & dealloc_wait, 
123           		       struct timeval & deadlock_detect)
124              : _max_threads(max), _min_threads(min),
125                _current_threads(0), _waiters(initial_size), 
126                _pool_sem(0), _pool(true), _running(true), 
127                _dead(true), _dying(0)
128           {
129              _allocate_wait.tv_sec = alloc_wait.tv_sec;
130              _allocate_wait.tv_usec = alloc_wait.tv_usec;
131              _deallocate_wait.tv_sec = dealloc_wait.tv_sec; 
132              _deallocate_wait.tv_usec = dealloc_wait.tv_usec;
133 mike  1.2    _deadlock_detect.tv_sec = deadlock_detect.tv_sec;
134              _deadlock_detect.tv_usec = deadlock_detect.tv_usec;
135              memset(_key, 0x00, 17);
136              if(key != 0)
137                 strncpy(_key, key, 16);
138              if(_max_threads < initial_size)
139                 _max_threads = initial_size;
140              if(_min_threads > initial_size)
141                 _min_threads = initial_size;
142              
143              int i;
144              for(i = 0; i < initial_size; i++)
145              {
146                 _link_pool(_init_thread());
147              }
148           }
149           
150              
151           
152           ThreadPool::~ThreadPool(void)
153           {
154 mike  1.2    _dying++;
155              Thread *th = _pool.remove_first();
156              while(th != 0)
157              {      
158                 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
159           
160                 if(sleep_sem == 0)
161                 {
162           	 th->dereference_tsd();
163           	 throw NullPointer();
164                 }
165                 
166                 sleep_sem->signal();
167                 sleep_sem->signal();
168                 th->dereference_tsd();
169                 // signal the thread's sleep semaphore
170                 th->cancel();
171                 th->join();
172                 th->empty_tsd();
173                 delete th;
174                 th = _pool.remove_first();
175 mike  1.2    }
176           
177              th = _running.remove_first();
178              while(th != 0)
179              {
180                 // signal the thread's sleep semaphore
181                 th->cancel();
182                 th->join();
183                 th->empty_tsd();
184                 delete th;
185                 th = _running.remove_first();
186              }
187           
188              th = _dead.remove_first();
189              while(th != 0)
190              {
191                 // signal the thread's sleep semaphore
192                 th->cancel();
193                 th->join();
194                 th->empty_tsd();
195                 delete th;
196 mike  1.2       th = _dead.remove_first();
197              }
198           }
199           
200           // make this static to the class 
201           PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
202           {
203              Thread *myself = (Thread *)parm;
204              if(myself == 0)
205                 throw NullPointer();
206              ThreadPool *pool = (ThreadPool *)myself->get_parm();
207              if(pool == 0 )
208                 throw NullPointer();
209 mike  1.5    Semaphore *sleep_sem = 0;
210              struct timeval *deadlock_timer = 0;
211 mike  1.2    
212              try 
213              {
214                 sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
215                 myself->dereference_tsd();
216                 deadlock_timer = (struct timeval *)myself->reference_tsd("deadlock timer");
217                 myself->dereference_tsd();
218              }
219 mike  1.6    catch(IPCException &)
220 mike  1.2    {
221                 myself->exit_self(0);
222              }
223              if(sleep_sem == 0 || deadlock_timer == 0)
224                 throw NullPointer();
225           
226              while(pool->_dying < 1)
227              {
228                 sleep_sem->wait();
229                 // when we awaken we reside on the running queue, not the pool queue
230                 if(pool->_dying > 0)
231           	 break;
232                
233                 
234 mike  1.5       PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0;
235                 void *parm = 0;
236 mike  1.2 
237                 try 
238                 {
239           	 _work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) \
240           	    myself->reference_tsd("work func");
241           	 myself->dereference_tsd();
242           	 parm = myself->reference_tsd("work parm");
243           	 myself->dereference_tsd();
244                 }
245 mike  1.6       catch(IPCException &)
246 mike  1.2       {
247           	 myself->exit_self(0);
248                 }
249                 
250                 if(_work == 0)
251           	 throw NullPointer();
252                 gettimeofday(deadlock_timer, NULL);
253                 _work(parm);
254           	    
255                 // put myself back onto the available list 
256                 try 
257                 {
258           	 pool->_running.remove((void *)myself);
259           	 pool->_link_pool(myself);
260                 }
261 mike  1.6       catch(IPCException &)
262 mike  1.2       {
263           	 myself->exit_self(0);
264                 }
265              }
266              // wait to be awakend by the thread pool destructor
267              sleep_sem->wait();
268              myself->test_cancel();
269              myself->exit_self(0);
270              return((PEGASUS_THREAD_RETURN)0);
271           }
272           
273           
274           void ThreadPool::allocate_and_awaken(void *parm,
275           				     PEGASUS_THREAD_RETURN \
276           				     (PEGASUS_THREAD_CDECL *work)(void *))
277              throw(IPCException)
278           {
279              struct timeval start;
280              gettimeofday(&start, NULL);
281              
282              Thread *th = _pool.remove_first();
283 mike  1.2 
284              while (th == 0 && _dying < 1)
285              {
286                 try  // we couldn't get a free thread from the pool
287                 {
288           	 // wait for the right interval and try again
289           	 while(th == 0 && _dying < 1)
290           	 {
291           	    _check_deadlock(&start);
292           	    Uint32 interval = _allocate_wait.tv_sec * 1000;
293           	    if(_allocate_wait.tv_usec > 0)
294           	       interval += (_deallocate_wait.tv_usec / 1000);
295           	    // will throw a timeout if no thread comes free
296           	    _pool_sem.time_wait(interval);
297           	    th = _pool.remove_first();
298           	 }
299                 }
300 mike  1.6       catch(TimeOut &)
301 mike  1.2       {
302           	 if(_current_threads < _max_threads)
303           	 {
304           	    th = _init_thread();
305           	    break;
306           	 } 
307                 } 
308                 // will throw a Deadlock Exception before falling out of the loop
309           
310                 _check_deadlock(&start); 
311                 
312              } // while th == null
313              
314              if(_dying < 1)
315              {
316                 // initialize the thread data with the work function and parameters
317                 th->remove_tsd("work func");
318                 th->put_tsd("work func", NULL, 
319           		  sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
320           		  (void *)work);
321                 th->remove_tsd("work parm");
322 mike  1.2       th->put_tsd("work parm", NULL, sizeof(void *), parm);
323                 
324                 // put the thread on the running list 
325                 _running.insert_first(th);
326           
327                 // signal the thread's sleep semaphore to awaken it
328                 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
329           
330                 if(sleep_sem == 0)
331                 {
332           	 th->dereference_tsd();
333           	 throw NullPointer();
334                 }
335                 
336                 sleep_sem->signal();
337                 th->dereference_tsd();
338              }
339              else
340                 _pool.insert_first(th);
341           }
342           
343 mike  1.2 // caller is responsible for only calling this routine during slack periods
344           // but should call it at least once per _deadlock_detect with the running q
345           // and at least once per _deallocate_wait for the pool q
346           
347           void ThreadPool::kill_dead_threads(void) 
348           	 throw(IPCException)
349           {
350              struct timeval now;
351              gettimeofday(&now, NULL);
352              
353           
354              // first go thread the dead q and clean it up as much as possible
355              while(_dead.count() > 0)
356              {
357                 Thread *dead = _dead.remove_first();
358                 if(dead == 0)
359           	 throw NullPointer();
360                 if(dead->_handle.thid != 0)
361                 {
362           	 dead->detach();
363           	 destroy_thread(dead->_handle.thid, 0);
364 mike  1.2 	 dead->_handle.thid = 0;
365           	 while(dead->_cleanup.count() )
366           	 {
367           	    // this may throw a permission exception, 
368           	    // which I will remove from the code prior to stabilizing
369           	    dead->cleanup_pop(true);
370           	 }
371                 }
372                 delete dead;
373              }
374              
375              DQueue<Thread> * map[2] = 
376                 {
377           	 &_pool, &_running
378                 };
379              
380              
381              DQueue<Thread> *q = 0;
382              int i = 0;
383              AtomicInt needed(0);
384              
385 mike  1.2    for( q = map[i] ; i < 2; i++, q = map[i])
386              {
387                 if(q->count() > 0 )
388                 {
389           	 try 
390           	 {
391           	    q->try_lock();
392           	 }
393 mike  1.6 	 catch(AlreadyLocked &)
394 mike  1.2 	 {
395           	    q++;
396           	    continue;
397           	 }
398           
399           	 struct timeval dt = { 0, 0 };
400           	 struct timeval *dtp;
401           	 Thread *th = 0;
402           	 th = q->next(th);
403           	 while (th != 0 )
404           	 {
405           	    try 
406           	    {
407           	       dtp = (struct timeval *)th->try_reference_tsd("deadlock timer");
408           	    }
409 mike  1.6 	    catch(AlreadyLocked &)
410 mike  1.2 	    {
411           	       th = q->next(th);
412           	       continue;
413           	    }
414           	 
415           	    if(dtp != 0)
416           	    {
417           	       memcpy(&dt, dtp, sizeof(struct timeval));
418           	    
419           	    }
420           	    th->dereference_tsd();
421           	    struct timeval deadlock_timeout;
422           	    if( true == check_time(&dt, get_deadlock_detect(&deadlock_timeout) ))
423           	    {
424           	       // if we are deallocating from the pool, escape if we are
425           	       // down to the minimum thread count 
426           	       if( _current_threads.value() <= (Uint32)_min_threads )
427           	       {
428           		  if( i == 1)
429           		  {
430           		     th = q->next(th);
431 mike  1.2 		     continue;
432           		  }
433           		  else 
434           		  {
435           		     // we are killing a hung thread and we will drop below the 
436           		     // minimum. create another thread to make up for the one
437           		     // we are about to kill
438           		     needed++;
439           		  }
440           	       }
441           	       
442           	       th = q->remove_no_lock((void *)th);
443           	    
444           	       if(th != 0)
445           	       {
446           		  th->remove_tsd("work func");
447           		  th->put_tsd("work func", NULL, 
448           			      sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
449           			      (void *)&_undertaker);
450           		  th->remove_tsd("work parm");
451           		  th->put_tsd("work parm", NULL, sizeof(void *), th);
452 mike  1.2 	        
453           		  // signal the thread's sleep semaphore to awaken it
454           		  Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
455           	       
456           		  if(sleep_sem == 0)
457           		  {
458           		     th->dereference_tsd();
459           		     throw NullPointer();
460           		  }
461           		  // put the thread on the dead  list 
462           		  _dead.insert_first(th);
463           		  sleep_sem->signal(); 
464           		  th->dereference_tsd();
465           		  th = 0;
466           	       }
467           	    }
468           	    th = q->next(th);
469           	 }
470           	 q->unlock();
471           	 while (needed.value() > 0)
472           	 {
473 mike  1.2 	    _link_pool(_init_thread());
474           	    needed--;
475           	 }
476                 }
477              }
478              
479              
480              return;
481           }
482           
483           Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval)
484           {
485              struct timeval now;
486              gettimeofday(&now, NULL);
487              if( (now.tv_sec - start->tv_sec) > interval->tv_sec || 
488                  (((now.tv_sec - start->tv_sec) == interval->tv_sec) &&
489           	((now.tv_usec - start->tv_usec) >= interval->tv_usec ) ) )
490                 return true;
491              else
492                 return false;
493           }
494 mike  1.2 
495           
496           PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm )
497           {
498              Thread *myself = reinterpret_cast<Thread *>(parm);
499              if(myself != 0)
500              {
501                 myself->detach();
502                 myself->_handle.thid = 0;
503                 myself->cancel();
504                 myself->test_cancel();
505                 myself->exit_self(0);
506              }
507              return((PEGASUS_THREAD_RETURN)0);
508           }
509           
510           
511           PEGASUS_NAMESPACE_END
512           

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2