(file) Return to Thread.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mday  1.1.2.1 //%/////////////////////////////////////////////////////////////////////////////
  2               //
  3               // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM
  4               //
  5               // Permission is hereby granted, free of charge, to any person obtaining a copy
  6               // of this software and associated documentation files (the "Software"), to 
  7               // deal in the Software without restriction, including without limitation the 
  8               // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 
  9               // sell copies of the Software, and to permit persons to whom the Software is
 10               // furnished to do so, subject to the following conditions:
 11               // 
 12               // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN 
 13               // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 14               // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 15               // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR 
 16               // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT 
 17               // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 
 18               // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 19               // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 20               //
 21               //==============================================================================
 22 mday  1.1.2.1 //
 23 mday  1.1.2.2 // Author: Mike Day (mdday@us.ibm.com)
 24 mday  1.1.2.1 //
 25 mday  1.1.2.2 // Modified By: 
 26 mday  1.1.2.1 //
 27               //%/////////////////////////////////////////////////////////////////////////////
 28               
 29               #include "Thread.h"
 30 mday  1.1.2.4 #include <Pegasus/Common/IPC.h>
 31 mday  1.1.2.1 
 32               #if defined(PEGASUS_OS_TYPE_WINDOWS)
 33               # include "ThreadWindows.cpp"
 34               #elif defined(PEGASUS_OS_TYPE_UNIX)
 35               # include "ThreadUnix.cpp"
 36               #else
 37               # error "Unsupported platform"
 38               #endif
 39               
 40               PEGASUS_NAMESPACE_BEGIN
 41               
 42 mday  1.1.2.8 void thread_data::default_delete(void * data) 
 43               { 
 44                  if( data != NULL)
 45                     ::operator delete(data); 
 46               }
 47               
  // Out-of-class definition of the static member Thread::_signals_blocked;
  // the flag starts out false.
  48 mday  1.1.2.3 Boolean Thread::_signals_blocked = false;
 49 mday  1.1.2.1 
 50               // for non-native implementations
 51               #ifndef PEGASUS_THREAD_CLEANUP_NATIVE 
 52               void Thread::cleanup_push( void (*routine)(void *), void *parm) throw(IPCException)
 53               {
 54 mday  1.1.2.5     cleanup_handler *cu = new cleanup_handler(routine, parm);
 55                   try 
 56                   {
 57               	_cleanup.insert_first(cu); 
 58                   } 
 59                   catch(IPCException& e) 
 60                   { 
 61 mday  1.1.2.7 	delete cu;
 62 mday  1.1.2.5 	throw; 
 63                   }
 64                   return;
 65 mday  1.1.2.1 }
 66 mday  1.1.2.5 	  
 67 kumpf 1.1.2.6 void Thread::cleanup_pop(Boolean execute) throw(IPCException)
 68 mday  1.1.2.1 {
 69 mday  1.1.2.5     cleanup_handler *cu ;
 70                   try 
 71                   { 
 72               	cu = _cleanup.remove_first() ;
 73                   }
 74                   catch(IPCException& e) 
 75 mday  1.1.2.7     {
 76 kumpf 1.1.2.6 	PEGASUS_ASSERT(0); 
 77 mday  1.1.2.5     }
 78                   if(execute == true)
 79               	cu->execute();
 80                   delete cu;
 81 mday  1.1.2.1 }
 82 mday  1.1.2.5 		    
 83 mday  1.1.2.3 #endif
 84 mday  1.1.2.4 
 85               
 86               //thread_data *Thread::put_tsd(Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) throw(IPCException)
 87               
 88 mday  1.1.2.2 
 89 mday  1.1.2.3 #ifndef PEGASUS_THREAD_EXIT_NATIVE 
 90 mday  1.1.2.2 void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code) 
 91               { 
 92 mday  1.1.2.5     // execute the cleanup stack and then return 
 93                  while( _cleanup.count() )
 94                  {
 95                      try 
 96                      { 
 97               	   cleanup_pop(true); 
 98                      }
 99                      catch(IPCException& e) 
100                      { 
101 mday  1.1.2.7 	  PEGASUS_ASSERT(0); 
102               	  break; 
103 mday  1.1.2.5        } 
104                  }
105 mday  1.1.2.7    _exit_code = exit_code;
106                  exit_thread(exit_code);
107 mday  1.1.2.2 }
108 mday  1.1.2.4 
109 mday  1.1.2.7 
110 mday  1.1.2.1 #endif
111 mday  1.1.2.9 
112               
113               ThreadPool::ThreadPool(Sint16 initial_size, 
114               		       Sint16 max, 
115               		       Sint16 min, 
116               		       Sint8 *key)
117                  : _max_threads(max), _min_threads(min),
118                    _current_threads(0), _waiters(initial_size), 
119                    _pool_sem(0), _pool(true), _running(true), 
120                    _dying(0)
121               {
122                  _allocate_wait.tv_sec = 1;
123                  _allocate_wait.tv_usec = 0;
124                  _deallocate_wait.tv_sec = 30;
125                  _deallocate_wait.tv_usec = 0;
126                  _deadlock_detect.tv_sec = 60;
127                  _deadlock_detect.tv_usec = 0;
128                  memset(_key, 0x00, 17);
129                  if(key != 0)
130                     strncpy(_key, key, 16);
131                  if(_max_threads < initial_size)
132 mday  1.1.2.9       _max_threads = initial_size;
133                  if(_min_threads > initial_size)
134                     _min_threads = initial_size;
135                  
136                  int i;
137                  for(i = 0; i < initial_size; i++)
138                  {
139                     _link_pool(_init_thread());
140                  }
141               }
142               
143               ThreadPool::~ThreadPool(void)
144               {
145                  _dying++;
146                  Thread *th = _pool.remove_first();
147                  while(th != 0)
148                  {
149                     // signal the thread's sleep semaphore
150                     th->cancel();
151                     th->join();
152                     th->empty_tsd();
153 mday  1.1.2.9       delete th;
154                     th = _pool.remove_first();
155                  }
156               }
157               
158               // make this static to the class 
159               PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
160               {
161                  Thread *myself = (Thread *)parm;
162                  if(myself == 0)
163                     throw NullPointer();
164                  ThreadPool *pool = (ThreadPool *)myself->get_parm();
165                  if(pool == 0 )
166                     throw NullPointer();
167                  Semaphore *sleep_sem;
168                  struct timeval *deadlock_timer;
169                  
170                  try 
171                  {
172                     sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
173                     myself->dereference_tsd();
174 mday  1.1.2.9       deadlock_timer = (struct timeval *)myself->reference_tsd("deadlock timer");
175                     myself->dereference_tsd();
176                  }
177                  catch(IPCException & e)
178                  {
179                     myself->exit_self(0);
180                  }
181                  if(sleep_sem == 0 || deadlock_timer == 0)
182                     throw NullPointer();
183               
184                  while(pool->_dying < 1)
185                  {
186                     myself->test_cancel();
187                     sleep_sem->wait();
188                     // when we awaken we reside on the running queue, not the pool queue
189                     myself->test_cancel();
190                     gettimeofday(deadlock_timer, NULL);
191                     
192                     PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *);
193                     void *parm;
194               
195 mday  1.1.2.9       try 
196                     {
197               	 _work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) \
198               	    myself->reference_tsd("work func");
199               	 myself->dereference_tsd();
200               	 parm = myself->reference_tsd("work parm");
201               	 myself->dereference_tsd();
202                     }
203                     catch(IPCException & e)
204                     {
205               	 myself->exit_self(0);
206                     }
207                     
208                     if(_work == 0)
209               	 throw NullPointer();
210                     _work(parm);
211               	    
212                     // put myself back onto the available list 
213                     try 
214                     {
215               	 pool->_running.remove((void *)myself);
216 mday  1.1.2.9 	 pool->_link_pool(myself);
217                     }
218                     catch(IPCException & e)
219                     {
220               	 myself->exit_self(0);
221                     }
222                  }
223                  myself->exit_self(0);
224                  return((PEGASUS_THREAD_RETURN)0);
225               }
226               
227               
228               void ThreadPool::allocate_and_awaken(void *parm,
229               				     PEGASUS_THREAD_RETURN \
230               				     (PEGASUS_THREAD_CDECL *work)(void *))
231                  throw(IPCException)
232               {
233                  struct timeval start;
234                  gettimeofday(&start, NULL);
235                  
236                  Thread *th = _pool.remove_first();
237 mday  1.1.2.9 
238                  while (th == 0 && _dying < 1)
239                  {
240                     try  // we couldn't get a free thread from the pool
241                     {
242               	 // wait for the right interval and try again
243               	 while(th == 0 && _dying < 1)
244               	 {
245               	    _check_deadlock(&start);
246               	    Uint32 interval = _allocate_wait.tv_sec * 1000;
247               	    if(_allocate_wait.tv_usec > 0)
248               	       interval += (_deallocate_wait.tv_usec / 1000);
249               	    // will throw a timeout if no thread comes free
250               	    _pool_sem.time_wait(interval);
251               	    th = _pool.remove_first();
252               	 }
253                     }
254                     catch(TimeOut & to)
255                     {
256               	 if(_current_threads < _max_threads)
257               	 {
258 mday  1.1.2.9 	    th = _init_thread();
259               	    break;
260               	 } 
261                     } 
262                     // will throw a Deadlock Exception before falling out of the loop
263                     _check_deadlock(&start);
264                  } // while th == null
265                  
266                  if(_dying < 1)
267                  {
268                     // initialize the thread data with the work function and parameters
269                     th->remove_tsd("work func");
270                     th->put_tsd("work func", NULL, 
271               		  sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
272               		  (void *)work);
273                     th->remove_tsd("work parm");
274                     th->put_tsd("work parm", NULL, sizeof(void *), parm);
275                     
276                     // put the thread on the running list 
277                     _running.insert_first(th);
278               
279 mday  1.1.2.9       // signal the thread's sleep semaphore to awaken it
280                     Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
281                     if(sleep_sem == 0)
282               	 throw NullPointer();
283                     sleep_sem->signal();
284                  }
285                  else
286                     _pool.insert_first(th);
287               }
288               
289               // caller is responsible for only calling this routine during slack periods
290               // but should call it at least once per _deadlock_detect with the running q
291               // and at least once per _deallocate_wait for the pool q
292               
293               void ThreadPool::_kill_dead_threads(DQueue<Thread> *q, Boolean (*check)(struct timeval *)) 
294                  throw(IPCException)
295               {
296                  struct timeval now;
297                  gettimeofday(&now, NULL);
298                  
299 mday  1.1.2.11    DQueue<Thread> dead(true) ;
300 mday  1.1.2.9     
301                   if(q->count() > 0 )
302                   {
303                      try 
304                      {
305                	 q->try_lock();
306                      }
307                      catch(AlreadyLocked & a)
308                      {
309                	 return;
310                      }
311                
312                      Thread *context = 0;
313                      struct timeval dt = { 0, 0 };
314                      struct timeval *dtp;
315                      Thread *th = q->next(context);
316                      while (th != 0 )
317                      {
318                	 try 
319                	 {
320                	    dtp = (struct timeval *)th->try_reference_tsd("deadlock timer");
321 mday  1.1.2.9  	 }
322                	 catch(AlreadyLocked & a)
323                	 {
324                	    context = th;
325                	    th = q->next(context);
326                	    continue;
327                	 }
328                	 
329                	 if(dtp != 0)
330                	 {
331                	    memcpy(&dt, dtp, sizeof(struct timeval));
332                	    
333                	 }
334                	 th->dereference_tsd();
335                	 if( true == check(&dt))
336                	 {
337                	    th = q->remove_no_lock((void *)th);
338                	    
339                	    if(th != 0)
340                	    {
341                	       dead.insert_first(th);
342 mday  1.1.2.9  	       th = 0;
343                	    }
344                	 }
345                	 context = th;
346                	 th = q->next(context);
347                      }
348                      q->unlock();
349                   }
350                   
351                   if(dead.count())
352                   {
353                      Thread *th = dead.remove_first();
354                      while(th != 0)
355                      {
356                	 th->cancel();
357                	 th->join();
358                	 delete th;
359                	 th = dead.remove_first();
360                      }
361                   }
362                   return;
363 mday  1.1.2.9  }
364                
365                Boolean ThreadPool::_check_time(struct timeval *start, struct timeval *interval)
366                {
367                   struct timeval now;
368                   gettimeofday(&now, NULL);
369                   if( (now.tv_sec - start->tv_sec) > interval->tv_sec || 
370                       (((now.tv_sec - start->tv_sec) == interval->tv_sec) &&
371                	((now.tv_usec - start->tv_usec) >= interval->tv_usec ) ) )
372                      return true;
373                   else
374                      return false;
375                }
376                
377 mday  1.1.2.1  
378                PEGASUS_NAMESPACE_END
379 mday  1.1.2.7  

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2