(file) Return to IPCUnix.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.68 //%2006////////////////////////////////////////////////////////////////////////
  2 mike  1.2  //
  3 karl  1.48 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4            // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5            // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6 karl  1.41 // IBM Corp.; EMC Corporation, The Open Group.
  7 karl  1.48 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8            // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9 karl  1.53 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10            // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11 karl  1.68 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 12            // EMC Corporation; Symantec Corporation; The Open Group.
 13 mike  1.2  //
 14            // Permission is hereby granted, free of charge, to any person obtaining a copy
 15 kumpf 1.23 // of this software and associated documentation files (the "Software"), to
 16            // deal in the Software without restriction, including without limitation the
 17            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 18 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
 19            // furnished to do so, subject to the following conditions:
 20 karl  1.68 // 
 21 kumpf 1.23 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 22 mike  1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 23            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 24 kumpf 1.23 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 25            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 26            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 27 mike  1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 28            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 29            //
 30            //==============================================================================
 31            //
 32            // Author: Markus Mueller (sedgewick_de@yahoo.de)
 33            //
 34            // Modified By: Mike Day (mdday@us.ibm.com) -- added native implementation
 35            // of AtomicInt class, exceptions
 36 david.dillard 1.59 //          Ramnath Ravindran (Ramnath.Ravindran@compaq.com)
 37                    //          David Eger (dteger@us.ibm.com)
 38                    //          Amit K Arora, IBM (amita@in.ibm.com) for PEP#101
 39                    //          Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
 40                    //          David Dillard, VERITAS Software Corp.
 41                    //              (david.dillardd@veritas.com)
 42 aruran.ms     1.62 //          Aruran, IBM (ashanmug@in.ibm.com) for BUG# 3518
 43 mike          1.2  //
 44                    //%/////////////////////////////////////////////////////////////////////////////
 45                    
 46 mike          1.3  PEGASUS_NAMESPACE_BEGIN
 47 mike          1.2  
 48 mike          1.3  #ifdef PEGASUS_PLATFORM_SOLARIS_SPARC_GNU
 49                    # define SEM_VALUE_MAX 0x0000ffff
 50                    #endif
 51 mike          1.2  
 52                    Mutex::Mutex()
 53                    {
 54 sage          1.6     pthread_mutexattr_init(&_mutex.mutatt);
 55 chuck         1.35 
 56                    #if defined(PEGASUS_PLATFORM_OS400_ISERIES_IBM)
 57                       // Needed because of EDEADLK checks below.
 58                       // Otherwise, the thread will block if it already owns the mutex.
 59                       pthread_mutexattr_settype(&_mutex.mutatt, PTHREAD_MUTEX_ERRORCHECK);
 60                       pthread_mutex_init(&_mutex.mut, &_mutex.mutatt);
 61 david.eger    1.38 #elif defined(PEGASUS_PLATFORM_LINUX_GENERIC_GNU)
 62 konrad.r      1.45 
 63                    #if defined(PEGASUS_OS_LSB)
 64                       pthread_mutexattr_settype(&_mutex.mutatt, PTHREAD_MUTEX_ERRORCHECK);
 65                    #else
 66 david.eger    1.38    pthread_mutexattr_settype(&_mutex.mutatt, PTHREAD_MUTEX_ERRORCHECK_NP);
 67 r.kieninger   1.54 #endif
 68 konrad.r      1.45 
 69 david.eger    1.38    pthread_mutex_init(&_mutex.mut, &_mutex.mutatt);
 70 chuck         1.35 #else
 71 mike          1.2     pthread_mutex_init(&_mutex.mut, NULL);
 72 chuck         1.35 #endif
 73                    
 74 mike          1.2     _mutex.owner = 0;
 75                    }
 76                    
 77 r.kieninger   1.54 Mutex::Mutex(int mutex_type)
 78 mike          1.2  {
 79                       pthread_mutexattr_init(&_mutex.mutatt);
 80 tony          1.42 #if !defined(SUNOS_5_6)
 81 mike          1.2     pthread_mutexattr_settype(&_mutex.mutatt, mutex_type);
 82 tony          1.42 #endif
 83 mike          1.2     pthread_mutex_init(&_mutex.mut,&_mutex.mutatt);
 84                       _mutex.owner = 0;
 85                    }
 86                    
 87                    // to be able share the mutex between different condition variables
 88                    Mutex::Mutex(const Mutex& mutex)
 89                    {
 90 r.kieninger   1.54    // only copy the handle, not the entire object.
 91                       // avoid calling the destructor twice.
 92 mike          1.2     _mutex.mut  = mutex._mutex.mut;
 93                       _mutex.owner = 0;
 94                    }
 95                    
 96                    Mutex::~Mutex()
 97                    {
 98 mday          1.40 //   while( EBUSY == pthread_mutex_destroy(&_mutex.mut))
 99                       //  {
100                    //      pegasus_yield();
101                    //   }
102                       // <<< Fri Oct 17 10:34:42 2003 mdd >>>
103 r.kieninger   1.54    //
104 mday          1.40    // don't hang if some thread exited without releasing
105 r.kieninger   1.54    // a semaphore.
106 mday          1.40    if(0 == pthread_mutex_destroy(&_mutex.mut))
107                          pthread_mutexattr_destroy(&_mutex.mutatt);
108 mike          1.2  }
109                    
110                    
111 r.kieninger   1.54 // block until gaining the lock - throw a deadlock
112                    // exception if process already holds the lock
113 david.dillard 1.59  void Mutex::lock(PEGASUS_THREAD_TYPE caller)
114 mday          1.9  {
115                       int errorcode;
116 r.kieninger   1.54    if( 0 == (errorcode = pthread_mutex_lock(&(_mutex.mut))))
117 mday          1.9     {
118                          _mutex.owner = caller;
119                          return;
120                       }
121 r.kieninger   1.54    if (errorcode == EDEADLK)
122 mday          1.9        throw( Deadlock( _mutex.owner ) );
123 r.kieninger   1.54    else
124 mday          1.9        throw( WaitFailed( _mutex.owner) );
125                    }
126 r.kieninger   1.54 
127                    // try to gain the lock - lock succeeds immediately if the
128 mday          1.9  // mutex is not already locked. throws an exception and returns
129 r.kieninger   1.54 // immediately if the mutex is currently locked.
130 david.dillard 1.59  void Mutex::try_lock(PEGASUS_THREAD_TYPE caller)
131 mday          1.9  {
132                       int errorcode ;
133 r.kieninger   1.54    if(0 == (errorcode = pthread_mutex_trylock(&_mutex.mut)))
134 mday          1.9     {
135                          _mutex.owner = caller;
136                          return;
137                       }
138 r.kieninger   1.54    else if (errorcode == EBUSY)
139 mday          1.9        throw(AlreadyLocked(_mutex.owner));
140 r.kieninger   1.54    else if (errorcode == EDEADLK)
141 mday          1.9        throw(Deadlock(_mutex.owner));
142                       else
143                          throw(WaitFailed(_mutex.owner));
144                    }
145                    
146                    // wait for milliseconds and throw an exception then return if the wait
147                    // expires without gaining the lock. Otherwise return without throwing an
148                    // exception.
149                    
150 r.kieninger   1.54 // Note: I was unable to get the expected behavior using pthread_mutex_timedlock.
151 mday          1.9  // I don't know exactly why, but the locks were never timing out. Reimplementing
152                    // using pthread_mutex_trylock works reliably. The documentation says that
153                    // pthread_mutex_timedlock works with error checking mutexes but works
154                    // just like pthread_mutex_lock (i.e., it never times out) with other
155                    // kinds of mutexes. I couldn't determine whether or not it actually
156                    // works with any type of mutex other than PTHREAD_MUTEX_TIMED_NP.
157                    // However, we want the mutexes to be error checking whenever possible
158                    // mdday Sun Aug  5 13:08:43 2001
159                    
160                    // pthread_mutex_timedlock is not supported on HP-UX
161                    // mdday Sun Aug  5 14:12:22 2001
162                    
163 r.kieninger   1.54  void Mutex::timed_lock( Uint32 milliseconds , PEGASUS_THREAD_TYPE caller)
164 mday          1.9  {
165                    
166 mday          1.11    struct timeval now, finish, remaining;
167 mday          1.9     int errorcode;
168 r.kieninger   1.54 
169 mday          1.11    Uint32 usec;
170 r.kieninger   1.54 
171 mday          1.11    gettimeofday(&finish, NULL);
172                       finish.tv_sec += (milliseconds / 1000 );
173                       milliseconds %= 1000;
174                       usec = finish.tv_usec + ( milliseconds * 1000 );
175                       finish.tv_sec += (usec / 1000000);
176                       finish.tv_usec = usec % 1000000;
177 mday          1.9  
178                       while(1)
179                       {
180                          errorcode = pthread_mutex_trylock(&_mutex.mut);
181                          if (errorcode == 0 )
182 david.dillard 1.59          break;
183 r.kieninger   1.54 
184 mday          1.9        if(errorcode == EBUSY)
185                          {
186 kumpf         1.60          gettimeofday(&now, NULL);
187                             if ( timeval_subtract( &remaining, &finish, &now ))
188                             {
189                                throw TimeOut(pegasus_thread_self());
190                             }
191                             pegasus_yield();
192                             continue;
193 mday          1.9        }
194                          if( errorcode == EDEADLK )
195 kumpf         1.60          throw Deadlock(pegasus_thread_self());
196 mday          1.9        throw WaitFailed(pegasus_thread_self());
197                       }
198                    }
199                    
200                    // unlock the mutex
201 david.dillard 1.59  void Mutex::unlock()
202 mday          1.9  {
203                       PEGASUS_THREAD_TYPE m_owner = _mutex.owner;
204                       _mutex.owner = 0;
205 r.kieninger   1.54    if(0 != pthread_mutex_unlock(&_mutex.mut))
206 mday          1.9     {
207                          _mutex.owner = m_owner;
208                          throw(Permission(_mutex.owner));
209                       }
210                    }
211                    
212 mike          1.2  
213 r.kieninger   1.54 #ifdef PEGASUS_READWRITE_NATIVE
214 mike          1.2  //-----------------------------------------------------------------
215                    /// Native Implementation of Read/Write semaphore
216                    //-----------------------------------------------------------------
217                    
218                    
219 david.dillard 1.59 ReadWriteSem::ReadWriteSem() :  _readers(0), _writers(0)
220 mike          1.2  {
221                       pthread_rwlock_init(&_rwlock.rwlock, NULL);
222                       _rwlock.owner = 0;
223                    }
224 r.kieninger   1.54 
225 david.dillard 1.59 ReadWriteSem::~ReadWriteSem()
226 mike          1.2  {
227                    
228                       while( EBUSY == pthread_rwlock_destroy(&_rwlock.rwlock))
229                       {
230                          pegasus_yield();
231                       }
232                    }
233                    
234                    
235 r.kieninger   1.54 void ReadWriteSem::wait(Uint32 mode, PEGASUS_THREAD_TYPE caller)
236 mike          1.2  {
237                       int errorcode;
238 r.kieninger   1.54    if (mode == PEG_SEM_READ)
239 mike          1.2     {
240                          if(0 == (errorcode = pthread_rwlock_rdlock(&_rwlock.rwlock)))
241                          {
242 kumpf         1.60          _readers++;
243                             return;
244 mike          1.2        }
245                       }
246                       else if (mode == PEG_SEM_WRITE)
247                       {
248                          if( 0 == (errorcode = pthread_rwlock_wrlock(&_rwlock.rwlock)))
249                          {
250 kumpf         1.60          _rwlock.owner = caller;
251                             _writers++;
252                             return;
253 mike          1.2        }
254                       }
255 r.kieninger   1.54    else
256 sage          1.10       throw(Permission(pegasus_thread_self()));
257 r.kieninger   1.54 
258 mike          1.2     if (errorcode == EDEADLK)
259                          throw(Deadlock(_rwlock.owner));
260                       else
261 sage          1.10       throw(WaitFailed(pegasus_thread_self()));
262 mike          1.2  }
263                    
264 r.kieninger   1.54 void ReadWriteSem::try_wait(Uint32 mode, PEGASUS_THREAD_TYPE caller)
265 mike          1.2  {
266                       int errorcode = 0;
267 r.kieninger   1.54    if (mode == PEG_SEM_READ)
268 mike          1.2     {
269                          if( 0 == (errorcode = pthread_rwlock_tryrdlock(&_rwlock.rwlock)))
270                          {
271 kumpf         1.60          _readers++;
272                             return;
273 mike          1.2        }
274                       }
275 r.kieninger   1.54    else if (mode == PEG_SEM_WRITE)
276 mike          1.2     {
277                          if(0 == (errorcode = pthread_rwlock_trywrlock(&_rwlock.rwlock)))
278                          {
279 kumpf         1.60          _writers++;
280                             _rwlock.owner = caller;
281                             return;
282 mike          1.2        }
283                       }
284 r.kieninger   1.54    else
285 sage          1.10       throw(Permission(pegasus_thread_self()));
286 mike          1.2  
287                       if (errorcode == EBUSY)
288                          throw(AlreadyLocked(_rwlock.owner));
289                       else if (errorcode == EDEADLK)
290                          throw(Deadlock(_rwlock.owner));
291                       else
292 sage          1.10       throw(WaitFailed(pegasus_thread_self()));
293 mike          1.2  }
294                    
295                    
296                    // timedrdlock and timedwrlock are not supported on HPUX
297                    // mdday Sun Aug  5 14:21:00 2001
298 r.kieninger   1.54 void ReadWriteSem::timed_wait(Uint32 mode, PEGASUS_THREAD_TYPE caller, int milliseconds)
299 mike          1.2  {
300 mday          1.11    int errorcode = 0, timeout ;
301                       struct timeval now, finish, remaining;
302                       Uint32 usec;
303 r.kieninger   1.54 
304 mday          1.11    gettimeofday(&finish, NULL);
305                       finish.tv_sec += (milliseconds / 1000 );
306                       milliseconds %= 1000;
307                       usec = finish.tv_usec + ( milliseconds * 1000 );
308                       finish.tv_sec += (usec / 1000000);
309                       finish.tv_usec = usec % 1000000;
310 r.kieninger   1.54 
311 mike          1.2     if (mode == PEG_SEM_READ)
312                       {
313                          do
314                          {
315 kumpf         1.60          errorcode = pthread_rwlock_tryrdlock(&_rwlock.rwlock);
316                             gettimeofday(&now, NULL);
317 mike          1.2        }
318 r.kieninger   1.54       while (errorcode == EBUSY &&
319 kumpf         1.60              ( 0 == (timeout = timeval_subtract(&remaining, &finish, &now ))));
320 mike          1.2        if(0 == errorcode)
321                          {
322 kumpf         1.60          _readers++;
323                             return;
324 mike          1.2        }
325                       }
326                       else if (mode == PEG_SEM_WRITE)
327                       {
328                          do
329                          {
330 kumpf         1.60          errorcode = pthread_rwlock_trywrlock(&_rwlock.rwlock);
331                             gettimeofday(&now, NULL);
332 mike          1.2        }
333 r.kieninger   1.54       while (errorcode == EBUSY &&
334 kumpf         1.60              ( 0 == (timeout = timeval_subtract(&remaining, &finish, &now ))));
335 mike          1.2  
336                          if(0 == errorcode)
337                          {
338 kumpf         1.60          _writers++;
339                             _rwlock.owner = caller;
340                             return;
341 mike          1.2        }
342                       }
343                       else
344 kumpf         1.22       throw(Permission(pegasus_thread_self()));
345 mday          1.11    if (timeout != 0 )
346 mike          1.2        throw(TimeOut(_rwlock.owner));
347                       else if (errorcode == EDEADLK)
348                          throw(Deadlock(_rwlock.owner));
349                       else
350 sage          1.10       throw(WaitFailed(pegasus_thread_self()));
351 mike          1.2  }
352                    
353 david.dillard 1.59 void ReadWriteSem::unlock(Uint32 mode, PEGASUS_THREAD_TYPE caller)
354 mike          1.2  {
355                       PEGASUS_THREAD_TYPE owner;
356                    
357                       if (mode == PEG_SEM_WRITE)
358                       {
359                          owner = _rwlock.owner;
360                          _rwlock.owner = 0;
361                       }
362                       if(0 != pthread_rwlock_unlock(&_rwlock.rwlock))
363                       {
364                          _rwlock.owner = owner;
365 sage          1.10       throw(Permission(pegasus_thread_self()));
366 mike          1.2     }
367 mike          1.64    if(mode == PEG_SEM_READ && _readers.get() != 0 )
368 mike          1.2        _readers--;
369 mike          1.64    else if (_writers.get() != 0 )
370 mike          1.2        _writers--;
371                    }
372                    
373 kumpf         1.60 int ReadWriteSem::read_count() const
374 mike          1.2  
375                    {
376 david.eger    1.39 #if defined(PEGASUS_PLATFORM_LINUX_GENERIC_GNU)
377 mike          1.64    PEGASUS_ASSERT(_readers.get() ==  _rwlock.rwlock.__rw_readers);
378 mike          1.2  #endif
379 mike          1.64    return( _readers.get() );
380 mike          1.2  }
381                    
382 kumpf         1.60 int ReadWriteSem::write_count() const
383 mike          1.2  {
384 david.eger    1.39 #if defined(PEGASUS_PLATFORM_LINUX_GENERIC_GNU)
385 r.kieninger   1.54    if(_rwlock.rwlock.__rw_writer != NULL)
386 mike          1.2     {
387 mike          1.64       PEGASUS_ASSERT(_writers.get()  == 1);
388 mike          1.2     }
389                    #endif
390 mike          1.64    return( _writers.get() );
391 mike          1.2  }
392                    
393 r.kieninger   1.54 #endif // PEGASUS_READWRITE_NATIVE
394 mike          1.2  //-----------------------------------------------------------------
395                    // END of native read/write implementation for unix
396                    //-----------------------------------------------------------------
397                    
398                    
399                    
400                    //-----------------------------------------------------------------
401                    // Native implementation of Conditional semaphore object
402                    //-----------------------------------------------------------------
403                    
404                    #ifdef PEGASUS_CONDITIONAL_NATIVE
405                    
406                    // Note: I felt uncomfortable exposing the condition mutex outside
407 r.kieninger   1.54 // of the class so I defined method calls to lock and unlock the
408 mike          1.2  // mutex object. This protects the (hidden) conditional mutex from
409                    // being called outside of the control of the object.
410                    
411 r.kieninger   1.54 // Further, the use model of conditions seems to require locking the object,
412                    // examining the state of the condition variable while that variable is
413 mike          1.2  // protected from other threads, and then determining whether to signal or
414                    //    wait on the condition variable. Then afterwards explicitly unlocking
415 r.kieninger   1.54 // the condition object.
416 mike          1.2  
417 r.kieninger   1.54 // So I commented out the method calls that do all three operations
418                    // without examining the state of the condition variable.
419                    // i.e., lock, signal, unlock or lock, wait, unlock.
420 mike          1.2  
421                    //    The method calls I commented out are: wait, signal, time_wait.
422                    // mdday Sun Aug  5 13:19:30 2001
423                    
424                    /// Conditions are implemented as process-wide condition variables
425                    Condition::Condition() : _disallow(0)
426                    {
427 a.arora       1.44    _cond_mutex.reset(new Mutex());
428 mike          1.2     _destroy_mut = true;
429                       pthread_cond_init((PEGASUS_COND_TYPE *)&_condition, 0);
430                    
431 kumpf         1.31 // #ifdef PEGASUS_PLATFORM_HPUX_ACC
432 kumpf         1.14 //    // HP-UX can not deal with the non-static structure assignment.
433                    //    // Also, the (PEGASUS_COND_TYPE) cast seems to break the HP-UX compile.
434 mike          1.2  //    PEGASUS_COND_TYPE tmpCond = PTHREAD_COND_INITIALIZER;
435                    //    memcpy(&_condition, &tmpCond, sizeof(PEGASUS_COND_TYPE));
436                    // #else
437                    //    _condition = (PEGASUS_COND_TYPE) PTHREAD_COND_INITIALIZER;
438                    // #endif
439                    }
440                    
441                    //#if defined(PEGASUS_PLATFORM_LINUX_IX86_GNU)
442 aruran.ms     1.62 Condition::Condition(Mutex& mutex)  : _disallow(0)
443 mike          1.2  {
444 aruran.ms     1.62    _cond_mutex.reset(&mutex);
445 mike          1.2     _destroy_mut = false;
446                       pthread_cond_init((PEGASUS_COND_TYPE *)&_condition, 0);
447                    }
448 kumpf         1.31 // #elif defined(PEGASUS_PLATFORM_HPUX_ACC)
449 mike          1.2  // Condition::Condition(const Mutex& mutex)  : _disallow(0)
450                    // {
451                    //    _cond_mutex = Mutex(mutex);
452                    //    PEGASUS_COND_TYPE tmpCond = PTHREAD_COND_INITIALIZER;
453                    //    memcpy(&_condition, &tmpCond, sizeof(PEGASUS_COND_TYPE));
454                    // }
455                    // #endif
456                    
457                    Condition::~Condition()
458                    {
459                       _disallow++;
460                       while(EBUSY == pthread_cond_destroy(&_condition))
461                       {
462                          pthread_cond_broadcast(&_condition);
463                          pegasus_yield();
464                       }
465                       if(_destroy_mut == true)
466 a.arora       1.44       _cond_mutex.reset();
467                       else
468                          _cond_mutex.release();
469 mike          1.2  }
470                    
471 kumpf         1.58 void Condition::signal(PEGASUS_THREAD_TYPE caller)
472 r.kieninger   1.54 {
473                       _cond_mutex->lock(caller);
474 mday          1.9     pthread_cond_broadcast(&_condition);
475 r.kieninger   1.54    _cond_mutex->unlock();
476 mday          1.9  }
477                    
478                    
479 kumpf         1.58 void Condition::unlocked_signal(PEGASUS_THREAD_TYPE caller)
480 mday          1.9  {
481 kumpf         1.58    if (_cond_mutex->get_owner() != caller)
482                       {
483 mday          1.9        throw Permission(_cond_mutex->get_owner());
484 kumpf         1.58    }
485                    
486 mday          1.9     pthread_cond_broadcast(&_condition);
487                    }
488                    
489                    
490 kumpf         1.58 void Condition::lock_object(PEGASUS_THREAD_TYPE caller)
491 mday          1.9  {
492 mday          1.18 
493 mike          1.63    if(_disallow.get() > 0)
494 mday          1.9        throw ListClosed();
495                       _cond_mutex->lock(caller);
496                    }
497                    
498 kumpf         1.58 void Condition::try_lock_object(PEGASUS_THREAD_TYPE caller)
499 mday          1.9  {
500 mike          1.63    if(_disallow.get() > 0)
501 mday          1.9        throw ListClosed();
502                       _cond_mutex->try_lock(caller);
503                    }
504                    
505 kumpf         1.58 void Condition::wait_lock_object(PEGASUS_THREAD_TYPE caller, int milliseconds)
506 mday          1.9  {
507 mike          1.63    if(_disallow.get() > 0)
508 mday          1.9        throw ListClosed();
509                       _cond_mutex->timed_lock(milliseconds, caller);
510 mike          1.63    if( _disallow.get() > 0 )
511 mday          1.9     {
512                          _cond_mutex->unlock();
513                          throw ListClosed();
514                       }
515                    }
516                    
517 david.dillard 1.59 void Condition::unlock_object()
518 mday          1.9  {
519                       _cond_mutex->unlock();
520                    }
521                    
522                    
523 r.kieninger   1.54 // block until this semaphore is in a signalled state
524 kumpf         1.58 void Condition::unlocked_wait(PEGASUS_THREAD_TYPE caller)
525 mday          1.9  {
526 kumpf         1.58    // The caller must own the Mutex in order to wait on the Condition
527                       if (_cond_mutex->get_owner() != caller)
528                       {
529                          throw Permission(_cond_mutex->get_owner());
530                       }
531                    
532 mike          1.63    if(_disallow.get() > 0)
533 mday          1.9     {
534                          _cond_mutex->unlock();
535                          throw ListClosed();
536                       }
537 kumpf         1.58 
538                       // pthread_cond_timedwait will release the Mutex
539 david         1.65 #ifdef PEGASUS_OS_OS400
540                       PEGASUS_THREAD_TYPE tmp;
541                       tmp =0;
542                       _cond_mutex->_set_owner(tmp);
543                    #else
544 kumpf         1.58    _cond_mutex->_set_owner(0);
545 david         1.65 #endif
546 kumpf         1.58 
547 mday          1.9     pthread_cond_wait(&_condition, &_cond_mutex->_mutex.mut);
548 kumpf         1.58 
549                       // The caller holds the Mutex again when pthread_cond_timedwait returns
550 mday          1.9     _cond_mutex->_set_owner(caller);
551                    }
552                    
553 r.kieninger   1.54 // block until this semaphore is in a signalled state
554 kumpf         1.58 void Condition::unlocked_timed_wait(
555                       int milliseconds,
556 david.dillard 1.59    PEGASUS_THREAD_TYPE caller)
557 mday          1.9  {
558 kumpf         1.58    // The caller must own the Mutex in order to wait on the Condition
559                       if (_cond_mutex->get_owner() != caller)
560                       {
561                          throw Permission(_cond_mutex->get_owner());
562                       }
563                    
564 mike          1.63    if (_disallow.get() > 0)
565 mday          1.9     {
566                          _cond_mutex->unlock();
567                          throw ListClosed();
568                       }
569 kumpf         1.58 
570 mday          1.9     struct timeval now;
571                       struct timespec waittime;
572                       gettimeofday(&now, NULL);
573                       waittime.tv_sec = now.tv_sec;
574                       waittime.tv_nsec = now.tv_usec + (milliseconds * 1000);  // microseconds
575                       waittime.tv_sec += (waittime.tv_nsec / 1000000);  // roll overflow into
576                       waittime.tv_nsec = (waittime.tv_nsec % 1000000);  // the "seconds" part
577                       waittime.tv_nsec = waittime.tv_nsec * 1000;  // convert to nanoseconds
578                    
579 kumpf         1.58    // pthread_cond_timedwait will release the Mutex
580 david         1.65 #ifdef PEGASUS_OS_OS400
581                       PEGASUS_THREAD_TYPE tmp;
582                       tmp =0;
583                       _cond_mutex->_set_owner(tmp);
584                    #else
585 kumpf         1.58    _cond_mutex->_set_owner(0);
586 david         1.65 #endif
587 kumpf         1.58 
588                       int retcode = pthread_cond_timedwait(
589                          &_condition, &_cond_mutex->_mutex.mut, &waittime);
590 mday          1.9  
591 kumpf         1.58    // The caller holds the Mutex again when pthread_cond_timedwait returns
592 mday          1.9     _cond_mutex->_set_owner(caller);
593 r.kieninger   1.54 
594 kumpf         1.58    if (retcode == ETIMEDOUT)
595                       {
596                          throw TimeOut(caller);
597                       }
598                       else if (retcode != EINTR)
599                       {
600                          throw WaitFailed(caller);
601                       }
602 mday          1.9  }
603                    
604 mike          1.2  #endif // native conditional semaphore
605                    //-----------------------------------------------------------------
606                    // END of native conditional semaphore implementation
607                    //-----------------------------------------------------------------
608                    
609 mday          1.9  
610                    
611                    //-----------------------------------------------------------------
612                    // Native implementation of semaphore object
613                    //-----------------------------------------------------------------
614                    
615 konrad.r      1.57 #if !defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM) && !defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) && !defined(PEGASUS_PLATFORM_DARWIN_PPC_GNU) && !defined(USE_PTHREAD_COND_IN_SEMAPHORE)
616 r.kieninger   1.54 
617                    Semaphore::Semaphore(Uint32 initial)
618 mike          1.2  {
619                       if(initial > SEM_VALUE_MAX)
620                          initial = SEM_VALUE_MAX - 1;
621                       sem_init(&_semaphore.sem,0,initial);
622 kumpf         1.22    _semaphore.owner = pegasus_thread_self();
623 mike          1.2  }
624                    
625                    Semaphore::~Semaphore()
626                    {
627 kumpf         1.60    while (EBUSY == sem_destroy(&_semaphore.sem))
628 mike          1.2     {
629                          pegasus_yield();
630                       }
631                    }
632                    
633 chuck         1.37 // block until this semaphore is in a signalled state, or
634 r.kieninger   1.54 // throw an exception if the wait fails
635 david.dillard 1.59 void Semaphore::wait(Boolean ignoreInterrupt)
636 mday          1.9  {
637 kumpf         1.60     do
638                        {
639                            int rc = sem_wait(&_semaphore.sem);
640                            if (rc == 0)
641                                break;
642                    
643                            int e = errno;
644                            if (e == EINTR)
645                            {
646                                if (ignoreInterrupt == false)
647                                    throw(WaitInterrupted(_semaphore.owner));
648                            }
649                            else throw(WaitFailed(_semaphore.owner));
650 brian.campbell 1.50 
651 kumpf          1.60         // keep going if above conditions fail
652                         } while (true);
653 r.kieninger    1.54 
654 mday           1.9  }
655                     
656 r.kieninger    1.54 // wait succeeds immediately if semaphore has a non-zero count,
657                     // return immediately and throw and exception if the
658                     // count is zero.
659 david.dillard  1.59 void Semaphore::try_wait()
660 mday           1.9  {
661 r.kieninger    1.54    if (sem_trywait(&_semaphore.sem))
662 mday           1.9        throw(WaitFailed(_semaphore.owner));
663                     }
664                     
665                     
666                     
667                     
668 r.kieninger    1.54 // Note: I could not get sem_timed_wait to work reliably.
669                     // See my comments above on mut timed_wait.
670                     // I reimplemented using try_wait, which works reliably.
671 mday           1.9  // mdd Sun Aug  5 13:25:31 2001
672                     
673                     // wait for milliseconds and throw an exception
674                     // if wait times out without gaining the semaphore
675 david.dillard  1.59 void Semaphore::time_wait(Uint32 milliseconds)
676 mday           1.9  {
677                        int retcode, i = 0;
678                     
679                        struct timeval now, finish, remaining;
680 mday           1.11    Uint32 usec;
681 r.kieninger    1.54 
682 mday           1.11    gettimeofday(&finish, NULL);
683                        finish.tv_sec += (milliseconds / 1000 );
684                        milliseconds %= 1000;
685                        usec = finish.tv_usec + ( milliseconds * 1000 );
686                        finish.tv_sec += (usec / 1000000);
687                        finish.tv_usec = usec % 1000000;
688 r.kieninger    1.54 
689 mday           1.9     while( 1 )
690                        {
691 r.kieninger    1.54       do
692 mday           1.9        {
693 kumpf          1.60          retcode = sem_trywait(&_semaphore.sem);
694 mday           1.9        } while (retcode == -1 && errno == EINTR);
695                     
696                           if ( retcode == 0 )
697 kumpf          1.60          return ;
698 mday           1.9  
699                           if( retcode == -1 && errno != EAGAIN )
700 kumpf          1.60          throw IPCException(pegasus_thread_self());
701 mday           1.9        gettimeofday(&now, NULL);
702                           if (  timeval_subtract( &remaining, &finish, &now ) )
703 kumpf          1.60          throw TimeOut(pegasus_thread_self());
704 mday           1.9        pegasus_yield();
705                        }
706                     }
707                     
708 r.kieninger    1.54 // increment the count of the semaphore
709 kumpf          1.58 void Semaphore::signal()
710 mday           1.9  {
711                        sem_post(&_semaphore.sem);
712                     }
713                     
714                     // return the count of the semaphore
715 david.dillard  1.59 int Semaphore::count() const
716 mday           1.9  {
717                        sem_getvalue(&_semaphore.sem,&_count);
718 dudhe.girish   1.43    return _count;
719                     }
720                     
721                     #elif defined(PEGASUS_PLATFORM_DARWIN_PPC_GNU)
722                     
723                     Semaphore::Semaphore(Uint32 initial)
724                     {
725                        if(initial > SEM_VALUE_MAX)
726                           initial = SEM_VALUE_MAX - 1;
727                        _semaphore.sem = sem_open("peg4",O_CREAT,0,initial);
728                        sem_unlink("peg4");
729                        _semaphore.owner = pegasus_thread_self();
730                        _semaphore.waiters = 0;
731                     
732                     }
733                     
734                     Semaphore::~Semaphore()
735                     {
736 kumpf          1.60     if ( _semaphore.waiters == 0 )
737                             sem_close(_semaphore.sem);
738 dudhe.girish   1.43 }
739                     
740                     // block until this semaphore is in a signalled state, or
741                     // throw an exception if the wait fails
742 david.dillard  1.59 void Semaphore::wait(Boolean ignoreInterrupt)
743 dudhe.girish   1.43 {
744                        _semaphore.waiters++;
745                        if (sem_wait(_semaphore.sem))
746                           throw(WaitFailed(_semaphore.owner));
747                        _semaphore.waiters--;
748                     }
749                     
750                     // wait succeeds immediately if semaphore has a non-zero count,
751                     // return immediately and throw and exception if the
752                     // count is zero.
753 david.dillard  1.59 void Semaphore::try_wait()
754 dudhe.girish   1.43 {
755                        if (sem_trywait(_semaphore.sem))
756                           throw(WaitFailed(_semaphore.owner));
757                     }
758                     
759                     // wait for milliseconds and throw an exception
760                     // if wait times out without gaining the semaphore
761 david.dillard  1.59 void Semaphore::time_wait(Uint32 milliseconds)
762 dudhe.girish   1.43 {
763                        int retcode, i = 0;
764                     
765                        struct timeval now, finish, remaining;
766                        Uint32 usec;
767                     
768                        gettimeofday(&finish, NULL);
769                        finish.tv_sec += (milliseconds / 1000 );
770                        milliseconds %= 1000;
771                        usec = finish.tv_usec + ( milliseconds * 1000 );
772                        finish.tv_sec += (usec / 1000000);
773                        finish.tv_usec = usec % 1000000;
774                     
775                        while( 1 )
776                        {
777                           do
778                           {
779                              retcode = sem_trywait(_semaphore.sem);
780                           } while (retcode == -1 && errno == EINTR);
781                     
782                           if ( retcode == 0 )
783 dudhe.girish   1.43          return ;
784                     
785                           if( retcode == -1 && errno != EAGAIN )
786                              throw IPCException(pegasus_thread_self());
787                           gettimeofday(&now, NULL);
788                           if (  timeval_subtract( &remaining, &finish, &now ) )
789                              throw TimeOut(pegasus_thread_self());
790                           pegasus_yield();
791                        }
792                     }
793                     
794                     // increment the count of the semaphore
795 kumpf          1.58 void Semaphore::signal()
796 dudhe.girish   1.43 {
797                        sem_post(_semaphore.sem);
798                     }
799                     
800                     // return the count of the semaphore
801 kumpf          1.61  int Semaphore::count() const
802 dudhe.girish   1.43 {
803                        sem_getvalue(_semaphore.sem,&_count);
804 mday           1.9     return _count;
805                     }
806 mike           1.2  
807                     #else
808                     //
809                     // implementation as used in ACE derived from Mutex + Condition Variable
810                     //
811 r.kieninger    1.54 Semaphore::Semaphore(Uint32 initial)
812 mike           1.2  {
813 sage           1.4      pthread_mutex_init (&_semaphore.mutex,NULL);
814                         pthread_cond_init (&_semaphore.cond,NULL);
815 mike           1.2      if (initial > SEM_VALUE_MAX)
816                              _count = SEM_VALUE_MAX - 1;
817                         else
818                              _count = initial;
819 sage           1.5      _semaphore.owner = pegasus_thread_self();
820 mike           1.2      _semaphore.waiters = 0;
821                     }
822                     
823                     Semaphore::~Semaphore()
824                     {
825 dan            1.30 #ifndef PEGASUS_PLATFORM_AIX_RS_IBMCXX
826 mike           1.2     pthread_mutex_lock(&_semaphore.mutex);
827                        while( EBUSY == pthread_cond_destroy(&_semaphore.cond))
828                        {
829                           pthread_mutex_unlock(&_semaphore.mutex);
830                           pegasus_yield();
831 sage           1.25       pthread_mutex_lock(&_semaphore.mutex);
832 mike           1.2     }
833                        pthread_mutex_unlock(&_semaphore.mutex);
834                        pthread_mutex_destroy(&_semaphore.mutex);
835 dan            1.30 #else
836                        int val;
837                        val = pthread_mutex_destroy(&_semaphore.mutex);
838                        if (val != 0)
839                           pthread_cond_destroy(&_semaphore.cond);
840 r.kieninger    1.54    else
841 dan            1.30       val = pthread_cond_destroy(&_semaphore.cond);
842                     
843                        while( EBUSY == val )
844                        {
845                           pegasus_yield();
846                           val = pthread_mutex_destroy(&_semaphore.mutex);
847 r.kieninger    1.54       if (val != 0)
848 dan            1.30          pthread_cond_destroy(&_semaphore.cond);
849 r.kieninger    1.54       else
850 dan            1.30          val = pthread_cond_destroy(&_semaphore.cond);
851                        }
852                     #endif
853 mike           1.2  }
854                     
855 dan            1.29 #if defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM) || defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX)
856 r.kieninger    1.54 // cleanup function
857 sage           1.25 static void semaphore_cleanup(void *arg)
858                     {
859                        //cast back to proper type and unlock mutex
860                        PEGASUS_SEM_HANDLE *s = (PEGASUS_SEM_HANDLE *)arg;
861                        pthread_mutex_unlock(&s->mutex);
862                     }
863 sage           1.28 #endif
864 sage           1.25 
865                     
866 chuck          1.37 // block until this semaphore is in a signalled state or
867                     // throw an exception if the wait fails
868 david.dillard  1.59 void Semaphore::wait(Boolean ignoreInterrupt)
869 mike           1.2  {
870                        // Acquire mutex to enter critical section.
871                        pthread_mutex_lock (&_semaphore.mutex);
872                     
873 sage           1.25    // Push cleanup function onto cleanup stack
874                        // The mutex will unlock if the thread is killed early
875 dan            1.29 #if defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM) || defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX)
876 sage           1.27    native_cleanup_push(&semaphore_cleanup, &_semaphore);
877 sage           1.28 #endif
878 sage           1.25 
879 mike           1.2     // Keep track of the number of waiters so that <sema_post> works correctly.
880                        _semaphore.waiters++;
881                     
882                        // Wait until the semaphore count is > 0, then atomically release
883 r.kieninger    1.54    // <lock_> and wait for <count_nonzero_> to be signaled.
884 sage           1.4     while (_count == 0)
885                           pthread_cond_wait (&_semaphore.cond,&_semaphore.mutex);
886 mike           1.2  
887                        // <_semaphore.mutex> is now held.
888                     
889                        // Decrement the waiters count.
890                        _semaphore.waiters--;
891                     
892                        // Decrement the semaphore's count.
893                        _count--;
894                     
895 sage           1.25     // Since we push an unlock onto the cleanup stack
896                        // We will pop it off to release the mutex when leaving the critical section.
897 dan            1.29 #if defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM) || defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX)
898 sage           1.25    native_cleanup_pop(1);
899 konrad.r       1.55 #endif
900 sage           1.28    // Release mutex to leave critical section.
901                        pthread_mutex_unlock (&_semaphore.mutex);
902 mike           1.2  }
903                     
904 david.dillard  1.59 void Semaphore::try_wait()
905 mike           1.2  {
906                     // not implemented
907 sage           1.25       throw(WaitFailed(_semaphore.owner));
908 mike           1.2  }
909                     
910 david.dillard  1.59 void Semaphore::time_wait(Uint32 milliseconds)
911 mike           1.2  {
912 konrad.r       1.55    // Acquire mutex to enter critical section.
913                        pthread_mutex_lock (&_semaphore.mutex);
914                     
915 kumpf          1.67 #if defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM) || \
916                         defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX)
917 konrad.r       1.55    // Push cleanup function onto cleanup stack
918                        // The mutex will unlock if the thread is killed early
919                        native_cleanup_push(&semaphore_cleanup, &_semaphore);
920                     #endif
921                     
922                        // Keep track of the number of waiters so that <sema_post> works correctly.
923                        _semaphore.waiters++;
924                     
925                        struct timeval now = {0,0};
926                        struct timespec waittime = {0,0};
927                        gettimeofday(&now, NULL);
928                        waittime.tv_sec = now.tv_sec;
929                        waittime.tv_nsec = now.tv_usec + (milliseconds * 1000);  // microseconds
930                        waittime.tv_sec += (waittime.tv_nsec / 1000000);  // roll overflow into
931                        waittime.tv_nsec = (waittime.tv_nsec % 1000000);  // the "seconds" part
932                        waittime.tv_nsec = waittime.tv_nsec * 1000;  // convert to nanoseconds
933                     
934 kumpf          1.67    Boolean timedOut = false;
935 konrad.r       1.55 
936 kumpf          1.67    while ((_count == 0) && !timedOut)
937                        {
938                           int retcode = pthread_cond_timedwait(
939                              &_semaphore.cond, &_semaphore.mutex, &waittime);
940                     
941                           if ((retcode == -1) && (errno = ETIMEDOUT) && (_count == 0))
942                           {
943                              timedOut = true;
944                           }
945                        }
946                     
947                        if (!timedOut)
948                        {
949                           // Decrement the semaphore's count.
950                           _count--;
951                        }
952 konrad.r       1.55 
953                        // Decrement the waiters count.
954                        _semaphore.waiters--;
955                     
956 kumpf          1.67 #if defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM) || \
957                         defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX)
958                        // Since we push an unlock onto the cleanup stack
959 konrad.r       1.55    // We will pop it off to release the mutex when leaving the critical section.
960                        native_cleanup_pop(1);
961                     #endif
962                     
963 kumpf          1.67    // Release mutex to leave critical section.
964 konrad.r       1.55    pthread_mutex_unlock (&_semaphore.mutex);
965                     
966 kumpf          1.67    if (timedOut)
967                        {
968                           throw TimeOut(pegasus_thread_self());
969                        }
970 mike           1.2  }
971                     
972 r.kieninger    1.54 // increment the count of the semaphore
973 mike           1.2  void Semaphore::signal()
974                     {
975                        pthread_mutex_lock (&_semaphore.mutex);
976                     
977                        // Always allow one thread to continue if it is waiting.
978                        if (_semaphore.waiters > 0)
979 sage           1.4        pthread_cond_signal (&_semaphore.cond);
980 mike           1.2  
981                        // Increment the semaphore's count.
982                        _count++;
983                     
984                        pthread_mutex_unlock (&_semaphore.mutex);
985                     }
986                     
987                     // return the count of the semaphore
988 kumpf          1.61 int Semaphore::count() const
989 mike           1.2  {
990                        return _count;
991                     }
992 mday           1.9  
993 mike           1.2  #endif
994                     
995                     PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2