(file) Return to Thread.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mday  1.1.2.17 
  2 mday  1.1.2.1  //%/////////////////////////////////////////////////////////////////////////////
  3                //
  4 rudy  1.1.2.13 // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM,
  5                // Compaq Computer Corporation
  6 mday  1.1.2.1  //
  7                // Permission is hereby granted, free of charge, to any person obtaining a copy
  8                // of this software and associated documentation files (the "Software"), to 
  9                // deal in the Software without restriction, including without limitation the 
 10                // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 
 11                // sell copies of the Software, and to permit persons to whom the Software is
 12                // furnished to do so, subject to the following conditions:
 13                // 
 14                // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN 
 15                // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 16                // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 17                // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR 
 18                // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT 
 19                // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 
 20                // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 21                // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 22                //
 23                //==============================================================================
 24                //
 25 mday  1.1.2.2  // Author: Mike Day (mdday@us.ibm.com)
 26 mday  1.1.2.1  //
 27 rudy  1.1.2.13 // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
 28                //              added nsk platform support  
 29 mday  1.1.2.1  //
 30                //%/////////////////////////////////////////////////////////////////////////////
 31                
 32                #include "Thread.h"
 33 mday  1.1.2.4  #include <Pegasus/Common/IPC.h>
 34 mday  1.1.2.1  
 35                #if defined(PEGASUS_OS_TYPE_WINDOWS)
 36 mday  1.1.2.16 # include "ThreadWindows.cpp"
 37 mday  1.1.2.1  #elif defined(PEGASUS_OS_TYPE_UNIX)
 38                # include "ThreadUnix.cpp"
 39 rudy  1.1.2.13 #elif defined(PEGASUS_OS_TYPE_NSK)
 40                # include "ThreadNsk.cpp"
 41 mday  1.1.2.1  #else
 42                # error "Unsupported platform"
 43                #endif
 44                
 45                PEGASUS_NAMESPACE_BEGIN
 46                
 47 mday  1.1.2.8  void thread_data::default_delete(void * data) 
 48                { 
 49                   if( data != NULL)
 50                      ::operator delete(data); 
 51                }
 52                
// Storage for the class-wide Thread::_signals_blocked flag; it starts out false.
Boolean Thread::_signals_blocked = false;
 54 mday  1.1.2.1  
 55                // for non-native implementations
 56                #ifndef PEGASUS_THREAD_CLEANUP_NATIVE 
 57                void Thread::cleanup_push( void (*routine)(void *), void *parm) throw(IPCException)
 58                {
 59 mday  1.1.2.5      cleanup_handler *cu = new cleanup_handler(routine, parm);
 60                    try 
 61                    {
 62                	_cleanup.insert_first(cu); 
 63                    } 
 64                    catch(IPCException& e) 
 65                    { 
 66 mday  1.1.2.7  	delete cu;
 67 mday  1.1.2.5  	throw; 
 68                    }
 69                    return;
 70 mday  1.1.2.1  }
 71 mday  1.1.2.5  	  
 72 kumpf 1.1.2.6  void Thread::cleanup_pop(Boolean execute) throw(IPCException)
 73 mday  1.1.2.1  {
 74 mday  1.1.2.5      cleanup_handler *cu ;
 75                    try 
 76                    { 
 77                	cu = _cleanup.remove_first() ;
 78                    }
 79                    catch(IPCException& e) 
 80 mday  1.1.2.7      {
 81 kumpf 1.1.2.6  	PEGASUS_ASSERT(0); 
 82 mday  1.1.2.14      }
 83 mday  1.1.2.5      if(execute == true)
 84                	cu->execute();
 85                    delete cu;
 86 mday  1.1.2.1  }
 87 mday  1.1.2.5  		    
 88 mday  1.1.2.3  #endif
 89 mday  1.1.2.4  
 90                
 91                //thread_data *Thread::put_tsd(Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) throw(IPCException)
 92                
 93 mday  1.1.2.2  
 94 mday  1.1.2.3  #ifndef PEGASUS_THREAD_EXIT_NATIVE 
 95 mday  1.1.2.2  void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code) 
 96                { 
 97 mday  1.1.2.5      // execute the cleanup stack and then return 
 98 mday  1.1.2.17    while( _cleanup.count() )
 99 mday  1.1.2.5     {
100                       try 
101                       { 
102                	   cleanup_pop(true); 
103                       }
104                       catch(IPCException& e) 
105                       { 
106 mday  1.1.2.7  	  PEGASUS_ASSERT(0); 
107                	  break; 
108 mday  1.1.2.5         } 
109                   }
110 mday  1.1.2.7     _exit_code = exit_code;
111                   exit_thread(exit_code);
112 mday  1.1.2.2  }
113 mday  1.1.2.4  
114 mday  1.1.2.7  
115 mday  1.1.2.1  #endif
116 mday  1.1.2.9  
117 mday  1.1.2.14 ThreadPool::ThreadPool(Sint16 initial_size,
118                		       Sint8 *key,
119                		       Sint16 min,
120                		       Sint16 max,
121                		       struct timeval & alloc_wait,
122                		       struct timeval & dealloc_wait, 
123                		       struct timeval & deadlock_detect)
124 mday  1.1.2.9     : _max_threads(max), _min_threads(min),
125                     _current_threads(0), _waiters(initial_size), 
126                     _pool_sem(0), _pool(true), _running(true), 
127 mday  1.1.2.14      _dead(true), _dying(0)
128 mday  1.1.2.9  {
129 mday  1.1.2.14    _allocate_wait.tv_sec = alloc_wait.tv_sec;
130                   _allocate_wait.tv_usec = alloc_wait.tv_usec;
131                   _deallocate_wait.tv_sec = dealloc_wait.tv_sec; 
132                   _deallocate_wait.tv_usec = dealloc_wait.tv_usec;
133                   _deadlock_detect.tv_sec = deadlock_detect.tv_sec;
134                   _deadlock_detect.tv_usec = deadlock_detect.tv_usec;
135 mday  1.1.2.9     memset(_key, 0x00, 17);
136                   if(key != 0)
137                      strncpy(_key, key, 16);
138                   if(_max_threads < initial_size)
139                      _max_threads = initial_size;
140                   if(_min_threads > initial_size)
141                      _min_threads = initial_size;
142                   
143                   int i;
144                   for(i = 0; i < initial_size; i++)
145                   {
146                      _link_pool(_init_thread());
147                   }
148                }
149                
150 mday  1.1.2.14    
151                
152 mday  1.1.2.9  ThreadPool::~ThreadPool(void)
153                {
154                   _dying++;
155                   Thread *th = _pool.remove_first();
156                   while(th != 0)
157 mday  1.1.2.14    {      
158                      Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
159                
160                      if(sleep_sem == 0)
161                      {
162                	 th->dereference_tsd();
163                	 throw NullPointer();
164                      }
165                      
166                      sleep_sem->signal();
167                      sleep_sem->signal();
168                      th->dereference_tsd();
169 mday  1.1.2.9        // signal the thread's sleep semaphore
170                      th->cancel();
171                      th->join();
172                      th->empty_tsd();
173                      delete th;
174                      th = _pool.remove_first();
175                   }
176 mday  1.1.2.14 
177                   th = _running.remove_first();
178                   while(th != 0)
179                   {
180                      // signal the thread's sleep semaphore
181                      th->cancel();
182                      th->join();
183                      th->empty_tsd();
184                      delete th;
185                      th = _running.remove_first();
186                   }
187                
188                   th = _dead.remove_first();
189                   while(th != 0)
190                   {
191                      // signal the thread's sleep semaphore
192                      th->cancel();
193                      th->join();
194                      th->empty_tsd();
195                      delete th;
196                      th = _dead.remove_first();
197 mday  1.1.2.14    }
198 mday  1.1.2.9  }
199                
200                // make this static to the class 
201                PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
202                {
203                   Thread *myself = (Thread *)parm;
204                   if(myself == 0)
205                      throw NullPointer();
206                   ThreadPool *pool = (ThreadPool *)myself->get_parm();
207                   if(pool == 0 )
208                      throw NullPointer();
209                   Semaphore *sleep_sem;
210                   struct timeval *deadlock_timer;
211                   
212                   try 
213                   {
214                      sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
215                      myself->dereference_tsd();
216                      deadlock_timer = (struct timeval *)myself->reference_tsd("deadlock timer");
217                      myself->dereference_tsd();
218                   }
219 mday  1.1.2.9     catch(IPCException & e)
220                   {
221                      myself->exit_self(0);
222                   }
223                   if(sleep_sem == 0 || deadlock_timer == 0)
224                      throw NullPointer();
225                
226                   while(pool->_dying < 1)
227                   {
228                      sleep_sem->wait();
229                      // when we awaken we reside on the running queue, not the pool queue
230 mday  1.1.2.14       if(pool->_dying > 0)
231                	 break;
232 mday  1.1.2.19      
233 mday  1.1.2.9        
234                      PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *);
235                      void *parm;
236                
237                      try 
238                      {
239                	 _work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) \
240                	    myself->reference_tsd("work func");
241                	 myself->dereference_tsd();
242                	 parm = myself->reference_tsd("work parm");
243                	 myself->dereference_tsd();
244                      }
245                      catch(IPCException & e)
246                      {
247                	 myself->exit_self(0);
248                      }
249                      
250                      if(_work == 0)
251                	 throw NullPointer();
252 mday  1.1.2.19       gettimeofday(deadlock_timer, NULL);
253 mday  1.1.2.9        _work(parm);
254                	    
255                      // put myself back onto the available list 
256                      try 
257                      {
258                	 pool->_running.remove((void *)myself);
259                	 pool->_link_pool(myself);
260                      }
261                      catch(IPCException & e)
262                      {
263                	 myself->exit_self(0);
264                      }
265                   }
266 mday  1.1.2.14    // wait to be awakend by the thread pool destructor
267                   sleep_sem->wait();
268                   myself->test_cancel();
269 mday  1.1.2.9     myself->exit_self(0);
270                   return((PEGASUS_THREAD_RETURN)0);
271                }
272                
273                
274                void ThreadPool::allocate_and_awaken(void *parm,
275                				     PEGASUS_THREAD_RETURN \
276                				     (PEGASUS_THREAD_CDECL *work)(void *))
277                   throw(IPCException)
278                {
279                   struct timeval start;
280                   gettimeofday(&start, NULL);
281                   
282                   Thread *th = _pool.remove_first();
283                
284                   while (th == 0 && _dying < 1)
285                   {
286                      try  // we couldn't get a free thread from the pool
287                      {
288                	 // wait for the right interval and try again
289                	 while(th == 0 && _dying < 1)
290 mday  1.1.2.9  	 {
291                	    _check_deadlock(&start);
292                	    Uint32 interval = _allocate_wait.tv_sec * 1000;
293                	    if(_allocate_wait.tv_usec > 0)
294                	       interval += (_deallocate_wait.tv_usec / 1000);
295                	    // will throw a timeout if no thread comes free
296                	    _pool_sem.time_wait(interval);
297                	    th = _pool.remove_first();
298                	 }
299                      }
300                      catch(TimeOut & to)
301                      {
302                	 if(_current_threads < _max_threads)
303                	 {
304                	    th = _init_thread();
305                	    break;
306                	 } 
307                      } 
308                      // will throw a Deadlock Exception before falling out of the loop
309 mday  1.1.2.14 
310                      _check_deadlock(&start); 
311                      
312 mday  1.1.2.9     } // while th == null
313                   
314                   if(_dying < 1)
315                   {
316                      // initialize the thread data with the work function and parameters
317                      th->remove_tsd("work func");
318                      th->put_tsd("work func", NULL, 
319                		  sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
320                		  (void *)work);
321                      th->remove_tsd("work parm");
322                      th->put_tsd("work parm", NULL, sizeof(void *), parm);
323                      
324                      // put the thread on the running list 
325                      _running.insert_first(th);
326                
327                      // signal the thread's sleep semaphore to awaken it
328                      Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
329 mday  1.1.2.14 
330 mday  1.1.2.9        if(sleep_sem == 0)
331 mday  1.1.2.14       {
332                	 th->dereference_tsd();
333 mday  1.1.2.9  	 throw NullPointer();
334 mday  1.1.2.14       }
335                      
336 mday  1.1.2.9        sleep_sem->signal();
337 mday  1.1.2.14       th->dereference_tsd();
338 mday  1.1.2.9     }
339                   else
340                      _pool.insert_first(th);
341                }
342                
343                // caller is responsible for only calling this routine during slack periods
344                // but should call it at least once per _deadlock_detect with the running q
345                // and at least once per _deallocate_wait for the pool q
346                
347 mday  1.1.2.14 void ThreadPool::kill_dead_threads(void) 
348                	 throw(IPCException)
349 mday  1.1.2.9  {
350                   struct timeval now;
351                   gettimeofday(&now, NULL);
352                   
353 mday  1.1.2.14 
354                   // first go thread the dead q and clean it up as much as possible
355                   while(_dead.count() > 0)
356 mday  1.1.2.9     {
357 mday  1.1.2.14       Thread *dead = _dead.remove_first();
358                      if(dead == 0)
359                	 throw NullPointer();
360                      if(dead->_handle.thid != 0)
361 mday  1.1.2.9        {
362 mday  1.1.2.17 	 dead->detach();
363 mday  1.1.2.14 	 destroy_thread(dead->_handle.thid, 0);
364 mday  1.1.2.17 	 dead->_handle.thid = 0;
365 mday  1.1.2.14 	 while(dead->_cleanup.count() )
366                	 {
367 mday  1.1.2.18 	    // this may throw a permission exception, 
368                	    // which I will remove from the code prior to stabilizing
369 mday  1.1.2.14 	    dead->cleanup_pop(true);
370                	 }
371 mday  1.1.2.9        }
372 mday  1.1.2.14       delete dead;
373                   }
374                   
375                   DQueue<Thread> * map[2] = 
376 mday  1.1.2.9        {
377 mday  1.1.2.14 	 &_pool, &_running
378                      };
379                   
380                   
381                   DQueue<Thread> *q = 0;
382                   int i = 0;
383                   AtomicInt needed(0);
384                   
385                   for( q = map[i] ; i < 2; i++, q = map[i])
386                   {
387                      if(q->count() > 0 )
388 mday  1.1.2.9        {
389                	 try 
390                	 {
391 mday  1.1.2.14 	    q->try_lock();
392 mday  1.1.2.9  	 }
393                	 catch(AlreadyLocked & a)
394                	 {
395 mday  1.1.2.14 	    q++;
396 mday  1.1.2.9  	    continue;
397                	 }
398 mday  1.1.2.14 
399                	 struct timeval dt = { 0, 0 };
400                	 struct timeval *dtp;
401                	 Thread *th = 0;
402                	 th = q->next(th);
403                	 while (th != 0 )
404 mday  1.1.2.9  	 {
405 mday  1.1.2.14 	    try 
406                	    {
407                	       dtp = (struct timeval *)th->try_reference_tsd("deadlock timer");
408                	    }
409                	    catch(AlreadyLocked & a)
410                	    {
411                	       th = q->next(th);
412                	       continue;
413                	    }
414                	 
415                	    if(dtp != 0)
416                	    {
417                	       memcpy(&dt, dtp, sizeof(struct timeval));
418 mday  1.1.2.9  	    
419 mday  1.1.2.14 	    }
420                	    th->dereference_tsd();
421                	    struct timeval deadlock_timeout;
422                	    if( true == check_time(&dt, get_deadlock_detect(&deadlock_timeout) ))
423 mday  1.1.2.9  	    {
424 mday  1.1.2.14 	       // if we are deallocating from the pool, escape if we are
425                	       // down to the minimum thread count 
426                	       if( _current_threads.value() <= (Uint32)_min_threads )
427                	       {
428                		  if( i == 1)
429                		  {
430                		     th = q->next(th);
431                		     continue;
432                		  }
433                		  else 
434                		  {
435                		     // we are killing a hung thread and we will drop below the 
436                		     // minimum. create another thread to make up for the one
437                		     // we are about to kill
438                		     needed++;
439                		  }
440                	       }
441                	       
442                	       th = q->remove_no_lock((void *)th);
443                	    
444                	       if(th != 0)
445 mday  1.1.2.14 	       {
446                		  th->remove_tsd("work func");
447                		  th->put_tsd("work func", NULL, 
448                			      sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
449                			      (void *)&_undertaker);
450                		  th->remove_tsd("work parm");
451                		  th->put_tsd("work parm", NULL, sizeof(void *), th);
452                	        
453                		  // signal the thread's sleep semaphore to awaken it
454                		  Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
455                	       
456                		  if(sleep_sem == 0)
457                		  {
458                		     th->dereference_tsd();
459                		     throw NullPointer();
460                		  }
461                		  // put the thread on the dead  list 
462                		  _dead.insert_first(th);
463                		  sleep_sem->signal(); 
464                		  th->dereference_tsd();
465                		  th = 0;
466 mday  1.1.2.14 	       }
467 mday  1.1.2.9  	    }
468 mday  1.1.2.14 	    th = q->next(th);
469                	 }
470                	 q->unlock();
471                	 while (needed.value() > 0)
472                	 {
473                	    _link_pool(_init_thread());
474                	    needed--;
475 mday  1.1.2.9  	 }
476                      }
477                   }
478                   
479 mday  1.1.2.14    
480 mday  1.1.2.9     return;
481                }
482                
483 mday  1.1.2.14 Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval)
484 mday  1.1.2.9  {
485                   struct timeval now;
486                   gettimeofday(&now, NULL);
487                   if( (now.tv_sec - start->tv_sec) > interval->tv_sec || 
488                       (((now.tv_sec - start->tv_sec) == interval->tv_sec) &&
489                	((now.tv_usec - start->tv_usec) >= interval->tv_usec ) ) )
490                      return true;
491                   else
492                      return false;
493 mday  1.1.2.14 }
494                
495                
496                PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm )
497                {
498                   Thread *myself = reinterpret_cast<Thread *>(parm);
499                   if(myself != 0)
500                   {
501                      myself->detach();
502                      myself->_handle.thid = 0;
503                      myself->cancel();
504                      myself->test_cancel();
505                      myself->exit_self(0);
506                   }
507                   return((PEGASUS_THREAD_RETURN)0);
508 mday  1.1.2.9  }
509                
510 mday  1.1.2.1  
511                PEGASUS_NAMESPACE_END
512 mday  1.1.2.7  

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2