(file) Return to IPCVms.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.11 //%2006////////////////////////////////////////////////////////////////////////
  2 gs.keenan 1.1  //
  3                // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4                // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5                // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6                // IBM Corp.; EMC Corporation, The Open Group.
  7                // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8                // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9 carson.hovey 1.2  // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10                   // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11 karl         1.11 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 12                   // EMC Corporation; Symantec Corporation; The Open Group.
 13 gs.keenan    1.1  //
 14                   // Permission is hereby granted, free of charge, to any person obtaining a copy
 15                   // of this software and associated documentation files (the "Software"), to
 16                   // deal in the Software without restriction, including without limitation the
 17                   // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 18                   // sell copies of the Software, and to permit persons to whom the Software is
 19                   // furnished to do so, subject to the following conditions:
 20 karl         1.11 // 
 21 gs.keenan    1.1  // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 22                   // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 23                   // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 24                   // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 25                   // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 26                   // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 27                   // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 28                   // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 29                   //
 30                   //==============================================================================
 31                   //
 32                   // Author: Markus Mueller (sedgewick_de@yahoo.de)
 33                   //
 34                   // Modified By: Mike Day (mdday@us.ibm.com) -- added native implementation
 35                   // of AtomicInt class, exceptions
 36 david.dillard 1.3  //          Ramnath Ravindran (Ramnath.Ravindran@compaq.com)
 37                    //          David Eger (dteger@us.ibm.com)
 38                    //          Amit K Arora, IBM (amita@in.ibm.com) for PEP#101
 39                    //          David Dillard, VERITAS Software Corp.
 40                    //              (david.dillard@veritas.com)
 41 gs.keenan     1.8  //          Sean Keenan, Hewlett-Packard Company (sean.keenan@hp.com)
 42 gs.keenan     1.1  //
 43                    //%/////////////////////////////////////////////////////////////////////////////
 44                    
 45                    PEGASUS_NAMESPACE_BEGIN
 46                    
 47 gs.keenan     1.7  #define SEM_VALUE_MAX 0x0000ffff
 48 gs.keenan     1.1  
 49 gs.keenan     1.8  Mutex::Mutex()
 50 gs.keenan     1.1  {
 51 gs.keenan     1.8    pthread_mutexattr_init(&_mutex.mutatt);
 52                      pthread_mutex_init(&_mutex.mut, NULL);
 53 gs.keenan     1.7    _mutex.owner = 0;
 54 gs.keenan     1.1  }
 55                    
 56 gs.keenan     1.8  Mutex::Mutex(int mutex_type)
 57 gs.keenan     1.1  {
 58 gs.keenan     1.8    pthread_mutexattr_init(&_mutex.mutatt);
 59                      pthread_mutex_init(&_mutex.mut, &_mutex.mutatt);
 60 gs.keenan     1.7    _mutex.owner = 0;
 61 gs.keenan     1.1  }
 62                    
 63                    // to be able share the mutex between different condition variables
 64 gs.keenan     1.8  Mutex::Mutex(const Mutex & mutex)
 65 gs.keenan     1.1  {
 66 gs.keenan     1.7    // only copy the handle, not the entire object.
 67                      // avoid calling the destructor twice.
 68                      _mutex.mut = mutex._mutex.mut;
 69                      _mutex.owner = 0;
 70 gs.keenan     1.1  }
 71                    
 72 gs.keenan     1.8  Mutex::~Mutex()
 73 gs.keenan     1.1  {
 74 gs.keenan     1.8    if (0 == pthread_mutex_destroy(&_mutex.mut))
 75                        pthread_mutexattr_destroy(&_mutex.mutatt);
 76 gs.keenan     1.1  }
 77                    
 78 david.dillard 1.3  // block until gaining the lock - throw a deadlock
 79                    // exception if process already holds the lock
 80 gs.keenan     1.1  
 81 gs.keenan     1.8  void Mutex::lock(PEGASUS_THREAD_TYPE caller)
 82 gs.keenan     1.1  {
 83 gs.keenan     1.7    int errorcode;
 84 gs.keenan     1.8    if (0 == (errorcode = pthread_mutex_lock(&(_mutex.mut))))
 85 gs.keenan     1.7    {
 86                        _mutex.owner = caller;
 87                        return;
 88                      }
 89                      if (errorcode == EDEADLK)
 90                      {
 91 gs.keenan     1.8      throw(Deadlock(_mutex.owner));
 92 gs.keenan     1.7    }
 93                      else
 94                      {
 95 gs.keenan     1.8      throw(WaitFailed(_mutex.owner));
 96 gs.keenan     1.7    }
 97 gs.keenan     1.1  }
 98 david.dillard 1.3  
 99                    // try to gain the lock - lock succeeds immediately if the
100 gs.keenan     1.1  // mutex is not already locked. throws an exception and returns
101 david.dillard 1.3  // immediately if the mutex is currently locked.
102 gs.keenan     1.1  
103 gs.keenan     1.8  void Mutex::try_lock(PEGASUS_THREAD_TYPE caller)
104 gs.keenan     1.1  {
105 gs.keenan     1.7    int errorcode;
106 gs.keenan     1.8    if (0 == (errorcode = pthread_mutex_trylock(&_mutex.mut)))
107 gs.keenan     1.7    {
108                        _mutex.owner = caller;
109                        return;
110                      }
111                      else if (errorcode == EBUSY)
112                      {
113 gs.keenan     1.8      throw(AlreadyLocked(_mutex.owner));
114 gs.keenan     1.7    }
115                      else if (errorcode == EDEADLK)
116                      {
117 gs.keenan     1.8      throw(Deadlock(_mutex.owner));
118 gs.keenan     1.7    }
119                      else
120                      {
121 gs.keenan     1.8      throw(WaitFailed(_mutex.owner));
122 gs.keenan     1.7    }
123 gs.keenan     1.1  }
124                    
125                    // wait for milliseconds and throw an exception then return if the wait
126                    // expires without gaining the lock. Otherwise return without throwing an
127                    // exception.
128                    
129 david.dillard 1.3  // Note: I was unable to get the expected behavior using pthread_mutex_timedlock.
130 gs.keenan     1.1  // I don't know excactly why, but the locks were never timing out. Reimplemting
131                    // using pthread_mutex_trylock works reliably. The documentation says that
132                    // pthread_mutex_timedlock works with error checking mutexes but works
133                    // just like pthread_mutex_lock (i.e., it never times out) with other
134                    // kinds of mutexes. I couldn't determine whether or not it actually
135                    // works with any type of mutex other than PTHREAD_MUTEX_TIMED_NP.
136                    // However, we want the mutexes to be error checking whenever possible
137                    // mdday Sun Aug  5 13:08:43 2001
138                    
139                    // pthread_mutex_timedlock is not supported on HUPX
140                    // mdday Sun Aug  5 14:12:22 2001
141                    
142 gs.keenan     1.8  void Mutex::timed_lock(Uint32 milliseconds, PEGASUS_THREAD_TYPE caller)
143 gs.keenan     1.1  {
144                    
145 gs.keenan     1.7    struct timeval now,
146                        finish,
147                        remaining;
148                      int errorcode;
149                    
150                      Uint32 usec;
151                    
152 gs.keenan     1.8    gettimeofday(&finish, NULL);
153 gs.keenan     1.7    finish.tv_sec += (milliseconds / 1000);
154                      milliseconds %= 1000;
155                      usec = finish.tv_usec + (milliseconds * 1000);
156                      finish.tv_sec += (usec / 1000000);
157                      finish.tv_usec = usec % 1000000;
158                    
159                      while (1)
160                      {
161 gs.keenan     1.8      errorcode = pthread_mutex_trylock(&_mutex.mut);
162 gs.keenan     1.7      if (errorcode == 0)
163                        {
164                          break;
165                        }
166                    
167                        if (errorcode == EBUSY)
168                        {
169 gs.keenan     1.8        gettimeofday(&now, NULL);
170                          if (timeval_subtract(&remaining, &finish, &now))
171 gs.keenan     1.1        {
172 gs.keenan     1.8  	throw TimeOut(pegasus_thread_self());
173 gs.keenan     1.1        }
174 gs.keenan     1.8        pegasus_yield();
175 gs.keenan     1.7        continue;
176                        }
177                        if (errorcode == EDEADLK)
178                        {
179 gs.keenan     1.8        throw Deadlock(pegasus_thread_self());
180 gs.keenan     1.7      }
181 gs.keenan     1.8      throw WaitFailed(pegasus_thread_self());
182 gs.keenan     1.7    }
183 gs.keenan     1.1  }
184                    
185                    // unlock the mutex
186                    
187 gs.keenan     1.8  void Mutex::unlock()
188 gs.keenan     1.1  {
189 gs.keenan     1.7    PEGASUS_THREAD_TYPE m_owner = _mutex.owner;
190                      _mutex.owner = 0;
191 gs.keenan     1.8    if (0 != pthread_mutex_unlock(&_mutex.mut))
192 gs.keenan     1.7    {
193                        _mutex.owner = m_owner;
194 gs.keenan     1.8      throw(Permission(_mutex.owner));
195 gs.keenan     1.7    }
196 gs.keenan     1.1  }
197                    
198                    #ifdef PEGASUS_READWRITE_NATIVE
199                    
200                    //-----------------------------------------------------------------
201                    /// Native Implementation of Read/Write semaphore
202                    //-----------------------------------------------------------------
203                    
204 gs.keenan     1.8  ReadWriteSem:: ReadWriteSem():_readers(0), _writers(0)
205 gs.keenan     1.1  {
206 gs.keenan     1.8    pthread_rwlock_init(&_rwlock.rwlock, NULL);
207 gs.keenan     1.7    _rwlock.owner = 0;
208 gs.keenan     1.1  }
209 david.dillard 1.3  
210 gs.keenan     1.8  ReadWriteSem::~ReadWriteSem()
211 gs.keenan     1.1  {
212                    
213 gs.keenan     1.8    while (EBUSY == pthread_rwlock_destroy(&_rwlock.rwlock))
214 gs.keenan     1.7    {
215 gs.keenan     1.8      pegasus_yield();
216 gs.keenan     1.7    }
217 gs.keenan     1.1  }
218                    
219 gs.keenan     1.8  void ReadWriteSem::wait(Uint32 mode, PEGASUS_THREAD_TYPE caller)
220 gs.keenan     1.1  {
221 gs.keenan     1.7    int errorcode;
222                      if (mode == PEG_SEM_READ)
223                      {
224 gs.keenan     1.8      if (0 == (errorcode = pthread_rwlock_rdlock(&_rwlock.rwlock)))
225 gs.keenan     1.7      {
226                          _readers++;
227                          return;
228                        }
229                      }
230                      else if (mode == PEG_SEM_WRITE)
231                      {
232 gs.keenan     1.8      if (0 == (errorcode = pthread_rwlock_wrlock(&_rwlock.rwlock)))
233 gs.keenan     1.7      {
234                          _rwlock.owner = caller;
235                          _writers++;
236                          return;
237                        }
238                      }
239                      else
240 gs.keenan     1.8      throw(Permission(pegasus_thread_self()));
241 gs.keenan     1.7  
242                      if (errorcode == EDEADLK)
243                      {
244 gs.keenan     1.8      throw(Deadlock(_rwlock.owner));
245 gs.keenan     1.7    }
246                      else
247                      {
248 gs.keenan     1.8      throw(WaitFailed(pegasus_thread_self()));
249 gs.keenan     1.7    }
250                    }
251                    
252 gs.keenan     1.8  void ReadWriteSem::try_wait(Uint32 mode, PEGASUS_THREAD_TYPE caller)
253 gs.keenan     1.7  {
254                      int errorcode = 0;
255                      if (mode == PEG_SEM_READ)
256                      {
257 gs.keenan     1.8      if (0 == (errorcode = pthread_rwlock_tryrdlock(&_rwlock.rwlock)))
258 gs.keenan     1.7      {
259                          _readers++;
260                          return;
261                        }
262                      }
263                      else if (mode == PEG_SEM_WRITE)
264                      {
265 gs.keenan     1.8      if (0 == (errorcode = pthread_rwlock_trywrlock(&_rwlock.rwlock)))
266 gs.keenan     1.7      {
267                          _writers++;
268                          _rwlock.owner = caller;
269                          return;
270                        }
271                      }
272                      else
273                      {
274 gs.keenan     1.8      throw(Permission(pegasus_thread_self()));
275 gs.keenan     1.7    }
276                    
277                      if (errorcode == EBUSY)
278                      {
279 gs.keenan     1.8      throw(AlreadyLocked(_rwlock.owner));
280 gs.keenan     1.7    }
281                      else if (errorcode == EDEADLK)
282                      {
283 gs.keenan     1.8      throw(Deadlock(_rwlock.owner));
284 gs.keenan     1.7    }
285                      else
286                      {
287 gs.keenan     1.8      throw(WaitFailed(pegasus_thread_self()));
288 gs.keenan     1.7    }
289 gs.keenan     1.1  }
290                    
291                    // timedrdlock and timedwrlock are not supported on HPUX
292                    // mdday Sun Aug  5 14:21:00 2001
293 gs.keenan     1.8  void ReadWriteSem::timed_wait(Uint32 mode, PEGASUS_THREAD_TYPE caller, int milliseconds)
294 gs.keenan     1.1  {
295 gs.keenan     1.7    int errorcode = 0,
296                        timeout;
297                      struct timeval now,
298                        finish,
299                        remaining;
300                      Uint32 usec;
301                    
302 gs.keenan     1.8    gettimeofday(&finish, NULL);
303 gs.keenan     1.7    finish.tv_sec += (milliseconds / 1000);
304                      milliseconds %= 1000;
305                      usec = finish.tv_usec + (milliseconds * 1000);
306                      finish.tv_sec += (usec / 1000000);
307                      finish.tv_usec = usec % 1000000;
308                    
309                      if (mode == PEG_SEM_READ)
310                      {
311                        do
312                        {
313 gs.keenan     1.8        errorcode = pthread_rwlock_tryrdlock(&_rwlock.rwlock);
314                          gettimeofday(&now, NULL);
315 gs.keenan     1.7      }
316                        while (errorcode == EBUSY &&
317 gs.keenan     1.8  	 (0 == (timeout = timeval_subtract(&remaining, &finish, &now))));
318 gs.keenan     1.7      if (0 == errorcode)
319                        {
320                          _readers++;
321                          return;
322                        }
323                      }
324                      else if (mode == PEG_SEM_WRITE)
325                      {
326                        do
327                        {
328 gs.keenan     1.8        errorcode = pthread_rwlock_trywrlock(&_rwlock.rwlock);
329                          gettimeofday(&now, NULL);
330 gs.keenan     1.7      }
331                        while (errorcode == EBUSY &&
332 gs.keenan     1.8  	 (0 == (timeout = timeval_subtract(&remaining, &finish, &now))));
333 gs.keenan     1.7  
334                        if (0 == errorcode)
335                        {
336                          _writers++;
337                          _rwlock.owner = caller;
338                          return;
339                        }
340                      }
341                      else
342                      {
343 gs.keenan     1.8      throw(Permission(pegasus_thread_self()));
344 gs.keenan     1.7    }
345                      if (timeout != 0)
346                      {
347 gs.keenan     1.8      throw(TimeOut(_rwlock.owner));
348 gs.keenan     1.7    }
349                      else if (errorcode == EDEADLK)
350                      {
351 gs.keenan     1.8      throw(Deadlock(_rwlock.owner));
352 gs.keenan     1.7    }
353                      else
354                      {
355 gs.keenan     1.8      throw(WaitFailed(pegasus_thread_self()));
356 gs.keenan     1.7    }
357                    }
358                    
359 gs.keenan     1.8  void ReadWriteSem::unlock(Uint32 mode, PEGASUS_THREAD_TYPE caller)
360 gs.keenan     1.7  {
361                      PEGASUS_THREAD_TYPE owner;
362                    
363                      if (mode == PEG_SEM_WRITE)
364                      {
365                        owner = _rwlock.owner;
366                        _rwlock.owner = 0;
367                      }
368 gs.keenan     1.8    if (0 != pthread_rwlock_unlock(&_rwlock.rwlock))
369 gs.keenan     1.7    {
370                        _rwlock.owner = owner;
371 gs.keenan     1.8      throw(Permission(pegasus_thread_self()));
372 gs.keenan     1.7    }
373 mike          1.10   if (mode == PEG_SEM_READ && _readers.get() != 0)
374 gs.keenan     1.7    {
375                        _readers--;
376                      }
377 mike          1.10   else if (_writers.get() != 0)
378 gs.keenan     1.7    {
379                        _writers--;
380                      }
381                    }
382                    
383 gs.keenan     1.8  int ReadWriteSem::read_count() const
384 gs.keenan     1.7  {
385 mike          1.10   return (_readers.get());
386 gs.keenan     1.1  }
387                    
388 gs.keenan     1.8  int ReadWriteSem::write_count() const
389 gs.keenan     1.1  {
390 mike          1.10   return (_writers.get());
391 gs.keenan     1.1  }
392                    
393 gs.keenan     1.7  #endif				// PEGASUS_READWRITE_NATIVE
394 gs.keenan     1.1  //-----------------------------------------------------------------
395                    // END of native read/write implementation for unix
396                    //-----------------------------------------------------------------
397                    
398                    //-----------------------------------------------------------------
399                    // Native implementation of Conditional semaphore object
400                    //-----------------------------------------------------------------
401                    
402                    #ifdef PEGASUS_CONDITIONAL_NATIVE
403                    
404                    // Note: I felt uncomfortable exposing the condition mutex outside
405 david.dillard 1.3  // of the class so I defined method calls to lock and unlock the
406 gs.keenan     1.1  // mutex object. This protects the (hidden) conditional mutex from
407                    // being called outside of the control of the object.
408                    
409 david.dillard 1.3  // Further, the use model of conditions seems to require locking the object,
410                    // examining the state of the condition variable while that variable is
411 gs.keenan     1.1  // protected from other threads, and then determining whether to signal or
412                    //    wait on the condition variable. Then afterwards explicitly unlocking
413 david.dillard 1.3  // the condition object.
414 gs.keenan     1.1  
415 david.dillard 1.3  // So I commented out the method calls that do all three operations
416                    // without examining the state of the condition variable.
417                    // i.e., lock, signal, unlock or lock, wait, unlock.
418 gs.keenan     1.1  
419                    //    The method calls I commented out are: wait, signal, time_wait.
420                    // mdday Sun Aug  5 13:19:30 2001
421                    
422                    /// Conditions are implemented as process-wide condition variables
423                    
424 gs.keenan     1.8  Condition::Condition():_disallow(0)
425 gs.keenan     1.1  {
426 gs.keenan     1.8    _cond_mutex.reset(new Mutex());
427 gs.keenan     1.7    _destroy_mut = true;
428 gs.keenan     1.8    pthread_cond_init((PEGASUS_COND_TYPE *) & _condition, 0);
429 gs.keenan     1.1  }
430                    
431 gs.keenan     1.8  Condition:: Condition(Mutex & mutex):_disallow(0)
432 gs.keenan     1.1  {
433 gs.keenan     1.8    _cond_mutex.reset(&mutex);
434 gs.keenan     1.7    _destroy_mut = false;
435 gs.keenan     1.8    pthread_cond_init((PEGASUS_COND_TYPE *) & _condition, 0);
436 gs.keenan     1.1  }
437                    
438 gs.keenan     1.8  Condition::~Condition()
439 gs.keenan     1.1  {
440 gs.keenan     1.7    _disallow++;
441 gs.keenan     1.8    while (EBUSY == pthread_cond_destroy(&_condition))
442 gs.keenan     1.7    {
443 gs.keenan     1.8      pthread_cond_broadcast(&_condition);
444                        pegasus_yield();
445 gs.keenan     1.7    }
446                      if (_destroy_mut == true)
447                      {
448 gs.keenan     1.8      _cond_mutex.reset();
449 gs.keenan     1.7    }
450                      else
451                      {
452 gs.keenan     1.8      _cond_mutex.release();
453 gs.keenan     1.7    }
454 gs.keenan     1.1  }
455                    
456 gs.keenan     1.8  void Condition::signal(PEGASUS_THREAD_TYPE caller)
457 david.dillard 1.3  {
458 gs.keenan     1.8    _cond_mutex->lock(caller);
459                      pthread_cond_broadcast(&_condition);
460                      _cond_mutex->unlock();
461 gs.keenan     1.1  }
462                    
463 gs.keenan     1.8  void Condition::unlocked_signal(PEGASUS_THREAD_TYPE caller)
464 gs.keenan     1.1  {
465 gs.keenan     1.8    if (_cond_mutex->get_owner() != caller)
466 gs.keenan     1.7    {
467 gs.keenan     1.8      throw Permission(_cond_mutex->get_owner());
468 gs.keenan     1.7    }
469 gs.keenan     1.8    pthread_cond_broadcast(&_condition);
470 gs.keenan     1.1  }
471                    
472 gs.keenan     1.8  void Condition::lock_object(PEGASUS_THREAD_TYPE caller)
473 gs.keenan     1.1  {
474 mike          1.10   if (_disallow.get() > 0)
475 gs.keenan     1.7    {
476 gs.keenan     1.8      throw ListClosed();
477 gs.keenan     1.7    }
478 gs.keenan     1.8    _cond_mutex->lock(caller);
479 gs.keenan     1.1  }
480                    
481 gs.keenan     1.8  void Condition::try_lock_object(PEGASUS_THREAD_TYPE caller)
482 gs.keenan     1.1  {
483 mike          1.10   if (_disallow.get() > 0)
484 gs.keenan     1.7    {
485 gs.keenan     1.8      throw ListClosed();
486 gs.keenan     1.7    }
487 gs.keenan     1.8    _cond_mutex->try_lock(caller);
488 gs.keenan     1.1  }
489                    
490 gs.keenan     1.8  void Condition::wait_lock_object(PEGASUS_THREAD_TYPE caller, int milliseconds)
491 gs.keenan     1.1  {
492 mike          1.10   if (_disallow.get() > 0)
493 gs.keenan     1.7    {
494 gs.keenan     1.8      throw ListClosed();
495 gs.keenan     1.7    }
496 gs.keenan     1.8    _cond_mutex->timed_lock(milliseconds, caller);
497 mike          1.10   if (_disallow.get() > 0)
498 gs.keenan     1.7    {
499 gs.keenan     1.8      _cond_mutex->unlock();
500                        throw ListClosed();
501 gs.keenan     1.7    }
502 gs.keenan     1.1  }
503                    
504 gs.keenan     1.8  void Condition::unlock_object()
505 gs.keenan     1.1  {
506 gs.keenan     1.8    _cond_mutex->unlock();
507 gs.keenan     1.1  }
508                    
509 david.dillard 1.3  // block until this semaphore is in a signalled state
510 gs.keenan     1.1  
511 gs.keenan     1.8  void Condition::unlocked_wait(PEGASUS_THREAD_TYPE caller)
512 gs.keenan     1.1  {
513 gs.keenan     1.7    // The caller must own the Mutex in order to wait on the Condition
514                      if (_cond_mutex->get_owner() != caller)
515                      {
516                        throw Permission(_cond_mutex->get_owner());
517                      }
518                    
519 mike          1.10   if (_disallow.get() > 0)
520 gs.keenan     1.7    {
521 gs.keenan     1.8      _cond_mutex->unlock();
522                        throw ListClosed();
523 gs.keenan     1.7    }
524                    
525                      // pthread_cond_timedwait will release the Mutex
526                      _cond_mutex->_set_owner(0);
527                    
528 gs.keenan     1.8    pthread_cond_wait(&_condition, &_cond_mutex->_mutex.mut);
529 gs.keenan     1.7  
530                      // The caller holds the Mutex again when pthread_cond_timedwait returns
531 gs.keenan     1.8    _cond_mutex->_set_owner(caller);
532 gs.keenan     1.1  }
533                    
534 david.dillard 1.3  // block until this semaphore is in a signalled state
535 gs.keenan     1.1  
536 gs.keenan     1.7  void Condition::unlocked_timed_wait(
537 gs.keenan     1.8  				     int milliseconds,
538                    				     PEGASUS_THREAD_TYPE caller)
539 gs.keenan     1.7  {
540                      // The caller must own the Mutex in order to wait on the Condition
541                      if (_cond_mutex->get_owner() != caller)
542                      {
543                        throw Permission(_cond_mutex->get_owner());
544                      }
545                    
546 mike          1.10   if (_disallow.get() > 0)
547 gs.keenan     1.7    {
548 gs.keenan     1.8      _cond_mutex->unlock();
549                        throw ListClosed();
550 gs.keenan     1.7    }
551                      struct timeval now;
552                      struct timespec waittime;
553 gs.keenan     1.8    gettimeofday(&now, NULL);
554 gs.keenan     1.7    waittime.tv_sec = now.tv_sec;
555                      waittime.tv_nsec = now.tv_usec + (milliseconds * 1000);	// microseconds
556                      waittime.tv_sec += (waittime.tv_nsec / 1000000);	// roll overflow into
557                      waittime.tv_nsec = (waittime.tv_nsec % 1000000);	// the "seconds" part
558                      waittime.tv_nsec = waittime.tv_nsec * 1000;	// convert to nanoseconds
559                    
560 gs.keenan     1.8    // pthread_cond_timedwait will release the Mutex
561                      _cond_mutex->_set_owner(0);
562 gs.keenan     1.1  
563 gs.keenan     1.8    int retcode = pthread_cond_timedwait(
564                    		       &_condition, &_cond_mutex->_mutex.mut, &waittime);
565 gs.keenan     1.1  
566 gs.keenan     1.8    // The caller holds the Mutex again when pthread_cond_timedwait returns
567                      _cond_mutex->_set_owner(caller);
568 david.dillard 1.3  
569 gs.keenan     1.8    if (retcode == ETIMEDOUT)
570                      {
571                        throw TimeOut(caller);
572                      }
573                      else if (retcode != EINTR)
574                      {
575                        throw WaitFailed(caller);
576                      }
577 gs.keenan     1.1  }
578                    #endif // native conditional semaphore
579                    
580                    //-----------------------------------------------------------------
581                    // END of native conditional semaphore implementation
582                    //-----------------------------------------------------------------
583                    
584                    //-----------------------------------------------------------------
585                    // Native implementation of semaphore object
586                    //-----------------------------------------------------------------
587                    
588                    //
589                    // implementation as used in ACE derived from Mutex + Condition Variable
590                    //
591                    
592 gs.keenan     1.8  Semaphore::Semaphore(Uint32 initial)
593 gs.keenan     1.1  {
594 gs.keenan     1.8    pthread_mutex_init(&_semaphore.mutex, NULL);
595                      pthread_cond_init(&_semaphore.cond, NULL);
596 gs.keenan     1.7    if (initial > SEM_VALUE_MAX)
597                      {
598                        _count = SEM_VALUE_MAX - 1;
599                      }
600                      else
601                      {
602                        _count = initial;
603                      }
604 gs.keenan     1.8    _semaphore.owner = pegasus_thread_self();
605 gs.keenan     1.7    _semaphore.waiters = 0;
606                    }
607                    
608 gs.keenan     1.8  Semaphore::~Semaphore()
609 gs.keenan     1.7  {
610 gs.keenan     1.8    pthread_mutex_lock(&_semaphore.mutex);
611                      while (EBUSY == pthread_cond_destroy(&_semaphore.cond))
612 gs.keenan     1.7    {
613 gs.keenan     1.8      pthread_mutex_unlock(&_semaphore.mutex);
614                        pegasus_yield();
615                        pthread_mutex_lock(&_semaphore.mutex);
616 gs.keenan     1.7    }
617 gs.keenan     1.8    pthread_mutex_unlock(&_semaphore.mutex);
618                      pthread_mutex_destroy(&_semaphore.mutex);
619 gs.keenan     1.1  }
620                    
621                    // block until this semaphore is in a signalled state or
622                    // throw an exception if the wait fails
623                    
624 gs.keenan     1.8  void Semaphore::wait(Boolean ignoreInterrupt)
625 gs.keenan     1.1  {
626 gs.keenan     1.7    // Acquire mutex to enter critical section.
627 gs.keenan     1.1  
628 gs.keenan     1.8    pthread_mutex_lock(&_semaphore.mutex);
629 gs.keenan     1.1  
630 gs.keenan     1.7    // Keep track of the number of waiters so that <sema_post> works correctly.
631 gs.keenan     1.1  
632 gs.keenan     1.7    _semaphore.waiters++;
633 gs.keenan     1.1  
634 gs.keenan     1.7    // Wait until the semaphore count is > 0, then atomically release
635                      // <lock_> and wait for <count_nonzero_> to be signaled.
636 gs.keenan     1.1  
637 gs.keenan     1.7    while (_count == 0)
638                      {
639 gs.keenan     1.8      pthread_cond_wait(&_semaphore.cond, &_semaphore.mutex);
640 gs.keenan     1.7    }
641 gs.keenan     1.1  
642 gs.keenan     1.7    // <_semaphore.mutex> is now held.
643 gs.keenan     1.1  
644 gs.keenan     1.7    // Decrement the waiters count.
645 gs.keenan     1.1  
646 gs.keenan     1.7    _semaphore.waiters--;
647 gs.keenan     1.1  
648 gs.keenan     1.7    // Decrement the semaphore's count.
649 gs.keenan     1.1  
650 gs.keenan     1.7    _count--;
651 gs.keenan     1.1  
652 gs.keenan     1.7    // Release mutex to leave critical section.
653 gs.keenan     1.1  
654 gs.keenan     1.8    pthread_mutex_unlock(&_semaphore.mutex);
655 gs.keenan     1.1  }
656                    
657 gs.keenan     1.8  void Semaphore::try_wait()
658 gs.keenan     1.1  {
659                    // not implemented
660                    
661 gs.keenan     1.8    throw(WaitFailed(_semaphore.owner));
662 gs.keenan     1.1  }
663                    
664 gs.keenan     1.8  void Semaphore::time_wait(Uint32 milliseconds)
665 gs.keenan     1.1  {
666 gs.keenan     1.9    // Acquire mutex to enter critical section.
667                    
668                      pthread_mutex_lock (&_semaphore.mutex);
669                    
670                      // Keep track of the number of waiters so that <sema_post> works correctly.
671                    
672                      _semaphore.waiters++;
673                    
674                      struct timeval now = {0,0};
675                      struct timespec waittime = {0,0};
676                      int retcode = 0;
677                      gettimeofday(&now, NULL);
678                      waittime.tv_sec = now.tv_sec;
679                      waittime.tv_nsec = now.tv_usec + (milliseconds * 1000);  // microseconds
680                      waittime.tv_sec += (waittime.tv_nsec / 1000000);  // roll overflow into
681                      waittime.tv_nsec = (waittime.tv_nsec % 1000000);  // the "seconds" part
682                      waittime.tv_nsec = waittime.tv_nsec * 1000;  // convert to nanoseconds
683                    
684                      // We are in a sense also sending a signal - as in the Semaphore is released
685                      // after the time has elapsed.
686                    
687 gs.keenan     1.9    int old_count =_count;
688                    
689                      retcode = pthread_cond_timedwait(&_semaphore.cond, &_semaphore.mutex, &waittime) ;
690                    
691                      if (_count != old_count)
692                      {
693                        _count=old_count;
694                      }
695                    
696                      // Decrement the waiters count.
697                    
698                      _semaphore.waiters--;
699 gs.keenan     1.1  
700 gs.keenan     1.9    pthread_mutex_unlock (&_semaphore.mutex);
701 gs.keenan     1.1  }
702                    
703 david.dillard 1.3  // increment the count of the semaphore
704 gs.keenan     1.1  
705 gs.keenan     1.8  void Semaphore::signal()
706 gs.keenan     1.1  {
707 gs.keenan     1.8    pthread_mutex_lock(&_semaphore.mutex);
708 gs.keenan     1.1  
709 gs.keenan     1.7    // Always allow one thread to continue if it is waiting.
710 gs.keenan     1.1  
711 gs.keenan     1.7    if (_semaphore.waiters > 0)
712                      {
713 gs.keenan     1.8      pthread_cond_signal(&_semaphore.cond);
714 gs.keenan     1.7    }
715 gs.keenan     1.1  
716 gs.keenan     1.7    // Increment the semaphore's count.
717 gs.keenan     1.1  
718 gs.keenan     1.7    _count++;
719 gs.keenan     1.1  
720 gs.keenan     1.8    pthread_mutex_unlock(&_semaphore.mutex);
721 gs.keenan     1.1  }
722                    
723                    // return the count of the semaphore
724                    
725 gs.keenan     1.8  int Semaphore::count() const
726 gs.keenan     1.1  {
727 gs.keenan     1.7    return _count;
728 gs.keenan     1.1  }
729                    
730                    PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2