(file) Return to Thread.h CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mike  1.2 //%///////////-*-c++-*-//////////////////////////////////////////////////////
  2           //
  3           // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM
  4           //
  5           // Permission is hereby granted, free of charge, to any person obtaining a copy
  6           // of this software and associated documentation files (the "Software"), to 
  7           // deal in the Software without restriction, including without limitation the 
  8           // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 
  9           // sell copies of the Software, and to permit persons to whom the Software is
 10           // furnished to do so, subject to the following conditions:
 11           // 
 12           // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN 
 13           // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 14           // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 15           // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR 
 16           // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT 
 17           // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 
 18           // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 19           // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 20           //
 21           //==============================================================================
 22 mike  1.2 //
 23           // Author: Mike Day (mdday@us.ibm.com)
 24           //
 25           // Modified By: Markus Mueller
 26           //
 27           //%/////////////////////////////////////////////////////////////////////////////
 28           
 29           
 30           #ifndef Pegasus_Thread_h
 31           #define Pegasus_Thread_h
 32           #include <Pegasus/Common/Config.h>
 33 mike  1.5 #include <Pegasus/Common/IPC.h>
 34 mike  1.2 #include <Pegasus/Common/Exception.h>
 35           #include <Pegasus/Common/DQueue.h>
 36           
 37 mike  1.4 // REVIEW: Spend time getting to know this.
 38           
 39 mike  1.2 PEGASUS_NAMESPACE_BEGIN
 40           
 41           class PEGASUS_COMMON_LINKAGE cleanup_handler
 42           {
 43           
 44              public:
 45                 cleanup_handler( void (*routine)(void *), void *arg  ) : _routine(routine), _arg(arg)  {}
 46                 ~cleanup_handler()  {; }
 47                 inline Boolean operator==(const void *key) const
 48                 { 
 49           	 if(key == (void *)_routine) 
 50           	    return true; 
 51           	 return false; 
 52                 }
 53                 inline Boolean operator ==(const cleanup_handler & b) const 
 54                 {
 55           	 return(operator==((const void *)b._routine));
 56                 }
 57              private:
 58                 void execute(void) { _routine(_arg); } 
 59                 cleanup_handler();
 60 mike  1.2       void (*_routine)(void *);
 61           
 62                 void *_arg; 
 63                 PEGASUS_CLEANUP_HANDLE _cleanup_buffer;
 64                 friend class DQueue<class cleanup_handler>;
 65                 friend class Thread;
 66           };
 67           
 68           ///////////////////////////////////////////////////////////////////////////////
 69           
 70           
 71           class  PEGASUS_COMMON_LINKAGE thread_data
 72           {
 73           
 74              public:
 75                 static void default_delete(void *data);
 76                 
 77 kumpf 1.7       thread_data( const Sint8 *key ) : _delete_func(NULL) , _data(NULL), _size(0)
 78 mike  1.2       {
 79           	 PEGASUS_ASSERT(key != NULL);
 80           	 size_t keysize = strlen(key);
 81           	 _key = new Sint8 [keysize + 1];
 82           	 memcpy(_key, key, keysize);
 83           	 _key[keysize] = 0x00;
 84           	 
 85                 }
 86             
 87 kumpf 1.7       thread_data(const Sint8 *key, size_t size) : _delete_func(default_delete), _size(size)
 88 mike  1.2       {
 89           	 PEGASUS_ASSERT(key != NULL);
 90           	 size_t keysize = strlen(key);
 91           	 _key = new Sint8 [keysize + 1];
 92           	 memcpy(_key, key, keysize);
 93           	 _key[keysize] = 0x00;
 94           	 _data = ::operator new(_size) ;
 95           
 96                 }
 97           
 98 kumpf 1.7       thread_data(const Sint8 *key, size_t size, void *data) : _delete_func(default_delete), _size(size)
 99 mike  1.2       {
100           	 PEGASUS_ASSERT(key != NULL);
101           	 PEGASUS_ASSERT(data != NULL);
102           	 size_t keysize = strlen(key);
103           
104           	 _key = new Sint8[keysize + 1];
105           	 memcpy(_key, key, keysize);
106           	 _key[keysize] = 0x00;
107           	 _data = ::operator new(_size);
108           	 memcpy(_data, data, size);
109                 }
110           
111                 ~thread_data() 
112                 { 
113           	 if( _data != NULL) 
114           	    if(_delete_func != NULL)
115 sage  1.9             {
116 mike  1.2 	       _delete_func( _data ); 
117 sage  1.9             }
118 mike  1.2 	 if( _key != NULL )
119           	    delete [] _key;
120                 }  
121           
122                 void put_data(void (*del)(void *), size_t size, void *data ) throw(NullPointer)
123                 {
124           	 if(_data != NULL)
125           	    if(_delete_func != NULL)
126           	       _delete_func(_data);
127           
128           	 _delete_func = del;
129           	 _data = data;
130           	 _size = size;
131           	 return ;
132                 }
133           
134                 size_t get_size(void) { return _size; }
135           
136                 void get_data(void **data, size_t *size) 
137                 {  
138           	 if(data == NULL || size == NULL)
139 mike  1.2 	    throw NullPointer();
140           	 
141           	 *data = _data;
142           	 *size = _size;
143           	 return;
144           	 
145                 }
146           
147                 void copy_data(void **buf, size_t *size) throw(BufferTooSmall, NullPointer)
148                 {
149           	 if((buf == NULL) || (size == NULL)) 
150           	    throw NullPointer() ; 
151           	 *buf = ::operator new(_size);
152           	 *size = _size;
153           	 memcpy(*buf, _data, _size);
154           	 return;
155                 }
156                 
157                 inline Boolean operator==(const void *key) const 
158                 { 
159           	 if ( ! strcmp(_key, (Sint8 *)key)) 
160 mike  1.2 	    return(true); 
161           	 return(false);
162                 } 
163 mday  1.8 
164 mike  1.2       inline Boolean operator==(const thread_data& b) const
165                 {
166           	 return(operator==((const void *)b._key));
167                 }
168           
169              private:
170                 void (*_delete_func) (void *data) ;
171                 thread_data();
172                 void *_data;
173                 size_t _size;
174                 Sint8 *_key;
175           
176                 friend class DQueue<thread_data>;
177                 friend class Thread;
178           };
179           
180           
181           ///////////////////////////////////////////////////////////////////////////
182           
183           class PEGASUS_COMMON_LINKAGE ThreadPool;
184           
185 mike  1.2 class PEGASUS_COMMON_LINKAGE Thread
186           {
187           
188              public:
189                 Thread( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *start )(void *),
190           	      void *parameter, Boolean detached );
191           
192                 ~Thread();
193           
194                 void run(void);
195           
196                 // get the user parameter 
197                 inline void *get_parm(void) { return _thread_parm; }
198           
199                 // send the thread a signal -- may not be appropriate due to Windows 
200                 //  void kill(int signum); 
201           
202                 // cancellation must be deferred (not asynchronous)
203                 // for user-level threads the thread itself can decide
204                 // when it should die. 
205                 void cancel(void);
206 mike  1.2 
207                 // cancel if there is a pending cancellation request
208                 void test_cancel(void);
209           
210 mday  1.3       Boolean is_cancelled(void);
211                 
212 mike  1.2       // for user-level threads  - put the calling thread
213                 // to sleep and jump to the thread scheduler. 
214                 // platforms with preemptive scheduling and native threads 
215                 // can define this to be a no-op. 
216                 // platforms without preemptive scheduling like NetWare 
217                 // or gnu portable threads will have an existing 
218                 // routine that can be mapped to this method 
219           
220                 void thread_switch(void);
221           
222 kumpf 1.11 #if defined(PEGASUS_PLATFORM_LINUX_IX86_GNU) || defined(PEGASUS_PLATFORM_LINUX_GENERIC_GNU)
223 mike  1.2        // suspend this thread 
224                  void suspend(void) ;
225            
226                  // resume this thread
227                  void resume(void) ;
228            #endif
229            
230                  static void sleep(Uint32 msec) ;
231            
232                  // block the calling thread until this thread terminates
233                  void join( void );
234                  void thread_init(void);
235            
236                  // thread routine needs to call this function when
237                  // it is ready to exit
238                  void exit_self(PEGASUS_THREAD_RETURN return_code) ;
239            
240                  // stack of functions to be called when thread terminates
241                  // will be called last in first out (LIFO)
242                  void cleanup_push( void (*routine) (void *), void *parm ) throw(IPCException);
243                  void cleanup_pop(Boolean execute = true) throw(IPCException);
244 mike  1.2  
245                  // create and initialize a tsd
246 kumpf 1.7        inline void create_tsd(const Sint8 *key, int size, void *buffer) throw(IPCException)
247 mike  1.2        {
248            	 thread_data *tsd = new thread_data(key, size, buffer);
249            	 try { _tsd.insert_first(tsd); }
250            	 catch(IPCException& e) { e = e; delete tsd; throw; }
251                  }
252            
253                  // get the buffer associated with the key
254                  // NOTE: this call leaves the tsd LOCKED !!!! 
255 kumpf 1.7        inline void *reference_tsd(const Sint8 *key) throw(IPCException)
256 mike  1.2        {
257            	 _tsd.lock(); 
258 kumpf 1.7  	 thread_data *tsd = _tsd.reference((const void *)key);
259 mike  1.2  	 if(tsd != NULL)
260            	    return( (void *)(tsd->_data) );
261            	 else
262            	    return(NULL);
263                  }
264            
265 kumpf 1.7        inline void *try_reference_tsd(const Sint8 *key) throw(IPCException)
266 mike  1.2        {
267            	 _tsd.try_lock();
268 kumpf 1.7  	 thread_data *tsd = _tsd.reference((const void *)key);
269 mike  1.2  	 if(tsd != NULL)
270            	    return((void *)(tsd->_data) );
271            	 else
272            	    return(NULL);
273                  }
274                  
275            
276                  // release the lock held on the tsd
277                  // NOTE: assumes a corresponding and prior call to reference_tsd() !!!
278                  inline void dereference_tsd(void) throw(IPCException)
279                  {
280            	 _tsd.unlock();
281                  }
282            
283                  // delete the tsd associated with the key
284 kumpf 1.7        inline void delete_tsd(const Sint8 *key) throw(IPCException)
285 mike  1.2        {
286 kumpf 1.7  	 thread_data *tsd = _tsd.remove((const void *)key);
287 mike  1.2  	 if(tsd != NULL)
288            	    delete tsd;
289                  }
290            
291 kumpf 1.7        inline void *remove_tsd(const Sint8 *key) throw(IPCException)
292 mike  1.2        {
293 kumpf 1.7  	 return(_tsd.remove((const void *)key));
294 mike  1.2        }
295                  
296                  inline void empty_tsd(void) throw(IPCException)
297                  {
298            	 _tsd.empty_list();
299                  }
300                  
301                  // create or re-initialize tsd associated with the key
302                  // if the tsd already exists, return the existing buffer
303 kumpf 1.7        thread_data *put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) 
304 mike  1.2  	 throw(IPCException)
305            
306                  {
307            	 PEGASUS_ASSERT(key != NULL);
308            	 thread_data *tsd ;
309 kumpf 1.7  	 tsd = _tsd.remove((const void *)key);  // may throw an IPC exception 
310 mike  1.2  	 thread_data *ntsd = new thread_data(key);
311            	 ntsd->put_data(delete_func, size, value);
312            	 try { _tsd.insert_first(ntsd); }
313            	 catch(IPCException& e) { e = e; delete ntsd; throw; }
314            	 return(tsd);
315                  }
316                  inline PEGASUS_THREAD_RETURN get_exit(void) { return _exit_code; }
317                  inline PEGASUS_THREAD_TYPE self(void) {return pegasus_thread_self(); }
318            
319                  PEGASUS_THREAD_HANDLE getThreadHandle() {return _handle;}
320            
321                  inline Boolean operator==(const void *key) const 
322                  { 
323            	 if ( (void *)this == key) 
324            	    return(true); 
325            	 return(false);
326                  } 
327                  inline Boolean operator==(const Thread & b) const
328                  {
329            	 return(operator==((const void *)&b ));
330                  }
331 mike  1.2  
332                  void detach(void);
333              
334               private:
335                  Thread();
336 kumpf 1.7        inline void create_tsd(const Sint8 *key ) throw(IPCException)
337 mike  1.2        {
338            	 thread_data *tsd = new thread_data(key);
339            	 try { _tsd.insert_first(tsd); }
340            	 catch(IPCException& e) { e = e; delete tsd; throw; }
341                  }
342                  PEGASUS_THREAD_HANDLE _handle;
343                  Boolean _is_detached;
344                  Boolean _cancel_enabled;
345                  Boolean _cancelled; 
346              
347                  PEGASUS_SEM_HANDLE _suspend_count;
348            
349                  // always pass this * as the void * parameter to the thread
350                  // store the user parameter in _thread_parm 
351            
352                  PEGASUS_THREAD_RETURN  ( PEGASUS_THREAD_CDECL *_start)(void *) ;
353                  DQueue<class cleanup_handler> _cleanup;
354                  DQueue<class thread_data> _tsd;
355            
356                  void *_thread_parm;
357                  PEGASUS_THREAD_RETURN _exit_code;
358 mike  1.2        static Boolean _signals_blocked;
359                  friend class ThreadPool;
360            } ;
361            
362            
363            class PEGASUS_COMMON_LINKAGE ThreadPool
364            {
365               public:
366            
367                  ThreadPool(Sint16 initial_size,
368 kumpf 1.7  		 const Sint8 *key,
369 mike  1.2  		 Sint16 min,
370            		 Sint16 max,
371            		 struct timeval & alloc_wait,
372            		 struct timeval & dealloc_wait, 
373            		 struct timeval & deadlock_detect);
374                  
375                  ~ThreadPool(void);
376            
377 mday  1.8        void allocate_and_awaken(void *parm, 
378 mday  1.10 			       PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *work)(void *), 
379            			       Semaphore *blocking = 0)
380 mday  1.8  	 throw(IPCException);
381                  
382            
383                  Uint32 kill_dead_threads( void ) 
384 mike  1.2  	 throw(IPCException);
385                  
386                  void get_key(Sint8 *buf, int bufsize);
387            
388                  inline Boolean operator==(const void *key) const 
389                  { 
390            	 if ( ! strncmp( reinterpret_cast<Sint8 *>(const_cast<void *>(key)), _key, 16  )) 
391            	    return(true); 
392            	 return(false);
393                  } 
394                  inline Boolean operator==(const ThreadPool & b) const
395                  {
396            	 return(operator==((const void *) b._key ));
397                  }
398            
399                  inline void set_min_threads(Sint16 min)
400                  {
401            	 _min_threads = min;
402                  }
403                  
404                  inline Sint16 get_min_threads(void) const
405 mike  1.2        {
406            	 return _min_threads;
407                  }
408            
409                  inline void set_max_threads(Sint16 max)
410                  {
411            	 _max_threads = max;
412                  }
413                  
414                  inline Sint16 get_max_threads(void) const
415                  {
416            	 return _max_threads;
417                  }
418                  
419                  inline void set_allocate_wait(const struct timeval & alloc_wait)
420                  {
421            	 _allocate_wait.tv_sec = alloc_wait.tv_sec;
422            	 _allocate_wait.tv_usec = alloc_wait.tv_usec;
423                  }
424                  
425                  inline struct timeval *get_allocate_wait(struct timeval *buffer) const
426 mike  1.2        {
427            	 if(buffer == 0)
428            	    throw NullPointer();
429            	 buffer->tv_sec = _allocate_wait.tv_sec;
430            	 buffer->tv_usec = _allocate_wait.tv_usec;
431            	 return buffer;
432                  }
433            
434                  inline void set_deallocate_wait(const struct timeval & dealloc_wait)
435                  {
436            	 _deallocate_wait.tv_sec = dealloc_wait.tv_sec;
437            	 _deallocate_wait.tv_usec = dealloc_wait.tv_usec;
438                  }
439                  
440                  inline struct timeval *get_deallocate_wait(struct timeval *buffer) const
441                  {
442            	 if(buffer == 0)
443            	    throw NullPointer();
444            	 buffer->tv_sec = _deallocate_wait.tv_sec;
445            	 buffer->tv_usec = _deallocate_wait.tv_usec;
446            	 return buffer;
447 mike  1.2        }
448            
449                  inline void set_deadlock_detect(const struct timeval & deadlock)
450                  {
451            	 _deadlock_detect.tv_sec = deadlock.tv_sec;
452            	 _deadlock_detect.tv_usec = deadlock.tv_usec;
453                  }
454                  
455                  inline struct timeval * get_deadlock_detect(struct timeval *buffer) const
456                  {
457            	 if(buffer == 0)
458            	    throw NullPointer();
459            	 buffer->tv_sec = _deadlock_detect.tv_sec;
460            	 buffer->tv_usec = _deadlock_detect.tv_usec;
461            	 return buffer;
462                  }
463            
464                  inline Uint32 running_count(void)
465                  {
466            	 return _running.count();
467                  }
468 mike  1.2        
469                  static Boolean check_time(struct timeval *start, struct timeval *interval);
470            
471               private:
472                  ThreadPool(void);
473                  Sint16 _max_threads;
474                  Sint16 _min_threads;
475                  AtomicInt _current_threads;
476                  struct timeval _allocate_wait;
477                  struct timeval _deallocate_wait;
478                  struct timeval _deadlock_detect;
479                  static PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL _loop(void *);
480                  Sint8 _key[17];
481                  DQueue<Thread> _pool;
482                  DQueue<Thread> _running;
483                  DQueue<Thread> _dead;
484                  AtomicInt _dying;
485                  
486            
487                  static void _sleep_sem_del(void *p);
488                  
489 mike  1.2        void _check_deadlock(struct timeval *start) throw(Deadlock);
490                  Boolean _check_deadlock_no_throw(struct timeval *start);
491                  Boolean _check_dealloc(struct timeval *start);
492                  Thread *_init_thread(void) throw(IPCException);
493                  void _link_pool(Thread *th) throw(IPCException);
494                  static PEGASUS_THREAD_RETURN  _undertaker(void *);
495                  
496             };
497            
498            
499            inline void ThreadPool::_sleep_sem_del(void *p)
500            {
501               if(p != 0)
502               {
503                  delete (Semaphore *)p;
504               }
505            }
506            
507            inline void ThreadPool::_check_deadlock(struct timeval *start) throw(Deadlock)
508            {
509               if (true == check_time(start, &_deadlock_detect))
510 mike  1.2        throw Deadlock(pegasus_thread_self());
511               return;
512            }
513            
514            
515            inline Boolean ThreadPool::_check_deadlock_no_throw(struct timeval *start)
516            {
517               return(check_time(start, &_deadlock_detect));
518            }
519            
520            inline Boolean ThreadPool::_check_dealloc(struct timeval *start)
521            {
522               return(check_time(start, &_deallocate_wait));
523            }
524            
525            inline Thread *ThreadPool::_init_thread(void) throw(IPCException)
526            {
527 mday  1.8     Thread *th = (Thread *) new Thread(_loop, this, false);
528 mike  1.2     // allocate a sleep semaphore and pass it in the thread context
529               // initial count is zero, loop function will sleep until
530               // we signal the semaphore
531               Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
532               th->put_tsd("sleep sem", &_sleep_sem_del, sizeof(Semaphore), (void *)sleep_sem);
533 mday  1.8     
534 mike  1.2     struct timeval *dldt = (struct timeval *) ::operator new(sizeof(struct timeval));
535               th->put_tsd("deadlock timer", thread_data::default_delete, sizeof(struct timeval), (void *)dldt);
536               // thread will enter _loop(void *) and sleep on sleep_sem until we signal it
537               th->run();
538               _current_threads++;
539 mday  1.8     pegasus_yield();
540               
541 mike  1.2     return th;
542            }
543            
544            inline void ThreadPool::_link_pool(Thread *th) throw(IPCException)
545            {
546               if(th == 0)
547                  throw NullPointer();
548               _pool.insert_first(th);
549            }
550            
551            
552            #if defined(PEGASUS_OS_TYPE_WINDOWS)
553            # include "ThreadWindows_inline.h"
554 sage  1.6  #elif defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM)
555            # include "ThreadzOS_inline.h"
556 mike  1.2  #elif defined(PEGASUS_OS_TYPE_UNIX)
557            # include "ThreadUnix_inline.h"
558            #endif
559            
560            PEGASUS_NAMESPACE_END
561            
562            #endif // Pegasus_Thread_h

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2