(file) Return to Semaphore.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mike  1.2 //%2006////////////////////////////////////////////////////////////////////////
  2           //
  3           // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4           // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5           // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6           // IBM Corp.; EMC Corporation, The Open Group.
  7           // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8           // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9           // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10           // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11           // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 12           // EMC Corporation; Symantec Corporation; The Open Group.
 13           //
 14           // Permission is hereby granted, free of charge, to any person obtaining a copy
 15           // of this software and associated documentation files (the "Software"), to
 16           // deal in the Software without restriction, including without limitation the
 17           // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 18           // sell copies of the Software, and to permit persons to whom the Software is
 19           // furnished to do so, subject to the following conditions:
 20           // 
 21           // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 22 mike  1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 23           // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 24           // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 25           // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 26           // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 27           // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 28           // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 29           //
 30           //==============================================================================
 31           //
 32           // Author: Mike Day (mdday@us.ibm.com)
 33           //
 34           //%/////////////////////////////////////////////////////////////////////////////
 35           
 36           #include <Pegasus/Common/Time.h>
 37           #include <Pegasus/Common/IPCExceptions.h>
 38           #include "Semaphore.h"
 39           
 40           PEGASUS_NAMESPACE_BEGIN
 41           
 42           static const Uint32 PEGASUS_SEM_VALUE_MAX = 0x0000ffff;
 43 mike  1.2 
 44           //==============================================================================
 45           //
 46           // PEGASUS_USE_PTHREAD_SEMAPHORE
 47           //
 48           //==============================================================================
 49           
 50           #if defined(PEGASUS_USE_PTHREAD_SEMAPHORE)
 51           
 52           Semaphore::Semaphore(Uint32 initial)
 53           {
 54               pthread_mutex_init (&_rep.mutex, NULL);
 55               pthread_cond_init (&_rep.cond, NULL);
 56           
 57               if (initial > PEGASUS_SEM_VALUE_MAX)
 58                    _count = PEGASUS_SEM_VALUE_MAX - 1;
 59               else
 60                    _count = initial;
 61           
 62               _rep.owner = Threads::self();
 63               _rep.waiters = 0;
 64 mike  1.2 }
 65           
 66           Semaphore::~Semaphore()
 67           {
 68           #ifndef PEGASUS_PLATFORM_AIX_RS_IBMCXX
 69              pthread_mutex_lock(&_rep.mutex);
 70              while( EBUSY == pthread_cond_destroy(&_rep.cond))
 71              {
 72                 pthread_mutex_unlock(&_rep.mutex);
 73                 Threads::yield();
 74                 pthread_mutex_lock(&_rep.mutex);
 75              }
 76              pthread_mutex_unlock(&_rep.mutex);
 77              pthread_mutex_destroy(&_rep.mutex);
 78           #else
 79              int val;
 80              val = pthread_mutex_destroy(&_rep.mutex);
 81              if (val != 0)
 82                 pthread_cond_destroy(&_rep.cond);
 83              else
 84                 val = pthread_cond_destroy(&_rep.cond);
 85 mike  1.2 
 86              while( EBUSY == val )
 87              {
 88                 Threads::yield();
 89                 val = pthread_mutex_destroy(&_rep.mutex);
 90                 if (val != 0)
 91                    pthread_cond_destroy(&_rep.cond);
 92                 else
 93                    val = pthread_cond_destroy(&_rep.cond);
 94              }
 95           #endif
 96           }
 97           
 98           #if defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX)
 99           // cleanup function
100           static void semaphore_cleanup(void *arg)
101           {
102              //cast back to proper type and unlock mutex
103              PEGASUS_SEM_HANDLE *s = (PEGASUS_SEM_HANDLE *)arg;
104              pthread_mutex_unlock(&s->mutex);
105           }
106 mike  1.2 #endif
107           
108           // block until this semaphore is in a signalled state or
109           // throw an exception if the wait fails
110           void Semaphore::wait(Boolean ignoreInterrupt)
111           {
112              // Acquire mutex to enter critical section.
113              pthread_mutex_lock (&_rep.mutex);
114           
115              // Push cleanup function onto cleanup stack
116              // The mutex will unlock if the thread is killed early
117           #if defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX)
118               Threads::cleanup_push(&semaphore_cleanup, &_rep);
119           #endif
120           
121              // Keep track of the number of waiters so that <sema_post> works correctly.
122              _rep.waiters++;
123           
124              // Wait until the semaphore count is > 0, then atomically release
125              // <lock_> and wait for <count_nonzero_> to be signaled.
126              while (_count == 0)
127 mike  1.2       pthread_cond_wait (&_rep.cond,&_rep.mutex);
128           
129              // <_rep.mutex> is now held.
130           
131              // Decrement the waiters count.
132              _rep.waiters--;
133           
134              // Decrement the semaphore's count.
135              _count--;
136           
137               // Since we push an unlock onto the cleanup stack
138              // We will pop it off to release the mutex when leaving the critical section.
139           #if defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX)
140              Threads::cleanup_pop(1);
141           #endif
142              // Release mutex to leave critical section.
143              pthread_mutex_unlock (&_rep.mutex);
144           }
145           
146           void Semaphore::try_wait()
147           {
148 mike  1.2 // not implemented
149                 throw(WaitFailed(_rep.owner));
150           }
151           
152           void Semaphore::time_wait(Uint32 milliseconds)
153           {
154              // Acquire mutex to enter critical section.
155              pthread_mutex_lock (&_rep.mutex);
156              Boolean timedOut = false;
157           
158           #if defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX)
159              // Push cleanup function onto cleanup stack
160              // The mutex will unlock if the thread is killed early
161              Thread::cleanup_push(&semaphore_cleanup, &_rep);
162           #endif
163           
164              // Keep track of the number of waiters so that <sema_post> works correctly.
165              _rep.waiters++;
166           
167              struct timeval now = {0,0};
168              struct timespec waittime = {0,0};
169 mike  1.2    gettimeofday(&now, NULL);
170              waittime.tv_sec = now.tv_sec;
171              waittime.tv_nsec = now.tv_usec + (milliseconds * 1000);  // microseconds
172              waittime.tv_sec += (waittime.tv_nsec / 1000000);  // roll overflow into
173              waittime.tv_nsec = (waittime.tv_nsec % 1000000);  // the "seconds" part
174              waittime.tv_nsec = waittime.tv_nsec * 1000;  // convert to nanoseconds
175           
176              while ((_count == 0) && !timedOut)
177              {
178 mike  1.3       int r = pthread_cond_timedwait(
179 mike  1.2          &_rep.cond, &_rep.mutex, &waittime);
180           
181 mike  1.3       if (((r == -1 && errno == ETIMEDOUT) || (r == ETIMEDOUT)) && _count == 0)
182 mike  1.2       {
183                    timedOut = true;
184                 }
185              }
186           
187              if (!timedOut)
188              {
189                 // Decrement the semaphore's count.
190                 _count--;
191              }
192           
193              // Decrement the waiters count.
194              _rep.waiters--;
195           
196           #if defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX)
197              // Since we push an unlock onto the cleanup stack
198              // We will pop it off to release the mutex when leaving the critical section.
199              Threads::cleanup_pop(1);
200           #endif
201           
202              // Release mutex to leave critical section.
203 mike  1.2    pthread_mutex_unlock (&_rep.mutex);
204           
205              if (timedOut)
206              {
207                 throw TimeOut(Threads::self());
208              }
209           }
210           
211           // increment the count of the semaphore
212           void Semaphore::signal()
213           {
214              pthread_mutex_lock (&_rep.mutex);
215           
216              // Always allow one thread to continue if it is waiting.
217              if (_rep.waiters > 0)
218                 pthread_cond_signal (&_rep.cond);
219           
220              // Increment the semaphore's count.
221              _count++;
222           
223              pthread_mutex_unlock (&_rep.mutex);
224 mike  1.2 }
225           
226           // return the count of the semaphore
227           int Semaphore::count() const
228           {
229              return _count;
230           }
231           
232           #endif /* PEGASUS_USE_PTHREAD_SEMAPHORE */
233           
234           //==============================================================================
235           //
236           // PEGASUS_USE_POSIX_SEMAPHORE
237           //
238           //==============================================================================
239           
240           #if defined(PEGASUS_USE_POSIX_SEMAPHORE)
241           
242           Semaphore::Semaphore(Uint32 initial)
243           {
244              if(initial > PEGASUS_SEM_VALUE_MAX)
245 mike  1.2       initial = PEGASUS_SEM_VALUE_MAX - 1;
246              sem_init(&_rep.sem,0,initial);
247              _rep.owner = Threads::self();
248           }
249           
250           Semaphore::~Semaphore()
251           {
252              while (EBUSY == sem_destroy(&_rep.sem))
253              {
254                 Threads::yield();
255              }
256           }
257           
258           // block until this semaphore is in a signalled state, or
259           // throw an exception if the wait fails
260           void Semaphore::wait(Boolean ignoreInterrupt)
261           {
262               do
263               {
264                   int rc = sem_wait(&_rep.sem);
265                   if (rc == 0)
266 mike  1.2             break;
267           
268                   int e = errno;
269                   if (e == EINTR)
270                   {
271                       if (ignoreInterrupt == false)
272                           throw(WaitInterrupted(_rep.owner));
273                   }
274                   else throw(WaitFailed(_rep.owner));
275           
276                   // keep going if above conditions fail
277               } while (true);
278           
279           }
280           
281           // wait succeeds immediately if semaphore has a non-zero count,
282           // return immediately and throw and exception if the
283           // count is zero.
284           void Semaphore::try_wait()
285           {
286              if (sem_trywait(&_rep.sem))
287 mike  1.2       throw(WaitFailed(_rep.owner));
288           }
289           
290           
291           
292           
293           // Note: I could not get sem_timed_wait to work reliably.
294           // See my comments above on mut timed_wait.
295           // I reimplemented using try_wait, which works reliably.
296           // mdd Sun Aug  5 13:25:31 2001
297           
298           // wait for milliseconds and throw an exception
299           // if wait times out without gaining the semaphore
300           void Semaphore::time_wait(Uint32 milliseconds)
301           {
302              int retcode, i = 0;
303           
304              struct timeval now, finish, remaining;
305              Uint32 usec;
306           
307              gettimeofday(&finish, NULL);
308 mike  1.2    finish.tv_sec += (milliseconds / 1000 );
309              milliseconds %= 1000;
310              usec = finish.tv_usec + ( milliseconds * 1000 );
311              finish.tv_sec += (usec / 1000000);
312              finish.tv_usec = usec % 1000000;
313           
314              while( 1 )
315              {
316                 do
317                 {
318                    retcode = sem_trywait(&_rep.sem);
319                 } while (retcode == -1 && errno == EINTR);
320           
321                 if ( retcode == 0 )
322                    return ;
323           
324                 if( retcode == -1 && errno != EAGAIN )
325                    throw IPCException(Threads::self());
326                 gettimeofday(&now, NULL);
327                 if (  Time::subtract( &remaining, &finish, &now ) )
328                    throw TimeOut(Threads::self());
329 mike  1.2       Threads::yield();
330              }
331           }
332           
333           // increment the count of the semaphore
334           void Semaphore::signal()
335           {
336              sem_post(&_rep.sem);
337           }
338           
339           // return the count of the semaphore
340           int Semaphore::count() const
341           {
342              sem_getvalue(&_rep.sem,&_count);
343              return _count;
344           }
345           
346           #endif /* PEGASUS_USE_POSIX_SEMAPHORE */
347           
348           //==============================================================================
349           //
350 mike  1.2 // PEGASUS_USE_WINDOWS_SEMAPHORE
351           //
352           //==============================================================================
353           
354           #if defined(PEGASUS_USE_WINDOWS_SEMAPHORE)
355           
356           Semaphore::Semaphore(Uint32 initial) 
357           {
358              if(initial > PEGASUS_SEM_VALUE_MAX)
359                 initial = PEGASUS_SEM_VALUE_MAX - 1;
360              _count = initial;
361              _rep.owner = Threads::self();
362              _rep.sem = CreateSemaphore(NULL, initial, PEGASUS_SEM_VALUE_MAX, NULL);
363           }
364           
365           Semaphore::~Semaphore()
366           {
367              CloseHandle(_rep.sem);
368           }
369           
370           // block until this semaphore is in a signalled state
371 mike  1.2 // note that windows does not support interrupt
372           void Semaphore::wait(Boolean ignoreInterrupt)
373           {
374               DWORD errorcode = WaitForSingleObject(_rep.sem, INFINITE);
375               if(errorcode != WAIT_FAILED)
376                   _count--;
377               else
378                   throw(WaitFailed(Threads::self()));
379           }
380           
381           // wait succeeds immediately if semaphore has a non-zero count,
382           // return immediately and throw and exception if the
383           // count is zero.
384           void Semaphore::try_wait()
385           {
386               DWORD errorcode = WaitForSingleObject(_rep.sem, 0);
387               if(errorcode == WAIT_TIMEOUT || errorcode == WAIT_FAILED)
388                   throw(WaitFailed(Threads::self()));
389               _count--;
390           }
391           
392 mike  1.2 
393           // wait for milliseconds and throw an exception
394           // if wait times out without gaining the semaphore
395           void Semaphore::time_wait(Uint32 milliseconds)
396           {
397               DWORD errorcode = WaitForSingleObject(_rep.sem, milliseconds);
398               if (errorcode == WAIT_TIMEOUT || errorcode == WAIT_FAILED)
399                   throw(TimeOut(Threads::self()));
400               _count--;
401           }
402           
403           // increment the count of the semaphore
404           void Semaphore::signal()
405           {
406               _count++;
407               ReleaseSemaphore(_rep.sem, 1, NULL);
408           }
409           
410           // return the count of the semaphore
411           int Semaphore::count() const
412           {
413 mike  1.2     return(_count);
414           }
415           
416           #endif /* PEGASUS_USE_WINDOWS_SEMAPHORE */
417           
418           PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2