(file) Return to Semaphore.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mike  1.2 //%2006////////////////////////////////////////////////////////////////////////
  2           //
  3           // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4           // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5           // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6           // IBM Corp.; EMC Corporation, The Open Group.
  7           // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8           // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9           // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10           // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11           // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 12           // EMC Corporation; Symantec Corporation; The Open Group.
 13           //
 14           // Permission is hereby granted, free of charge, to any person obtaining a copy
 15           // of this software and associated documentation files (the "Software"), to
 16           // deal in the Software without restriction, including without limitation the
 17           // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 18           // sell copies of the Software, and to permit persons to whom the Software is
 19           // furnished to do so, subject to the following conditions:
 20           // 
 21           // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 22 mike  1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 23           // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 24           // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 25           // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 26           // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 27           // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 28           // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 29           //
 30           //==============================================================================
 31           //
 32           //%/////////////////////////////////////////////////////////////////////////////
 33           
 34           #include <Pegasus/Common/Time.h>
 35           #include <Pegasus/Common/IPCExceptions.h>
 36           #include "Semaphore.h"
 37           
 38 kumpf 1.7 PEGASUS_NAMESPACE_BEGIN
 39 mike  1.6 
 40           static const Uint32 PEGASUS_SEM_VALUE_MAX = 0x0000ffff;
 41 mike  1.2 
 42           //==============================================================================
 43           //
 44           // PEGASUS_USE_PTHREAD_SEMAPHORE
 45           //
 46           //==============================================================================
 47           
 48           #if defined(PEGASUS_USE_PTHREAD_SEMAPHORE)
 49           
 50           Semaphore::Semaphore(Uint32 initial)
 51           {
 52 mike  1.5     pthread_mutex_init(&_rep.mutex, NULL);
 53               pthread_cond_init(&_rep.cond, NULL);
 54 mike  1.2 
 55               if (initial > PEGASUS_SEM_VALUE_MAX)
 56 mike  1.5         _count = PEGASUS_SEM_VALUE_MAX - 1;
 57 mike  1.2     else
 58 mike  1.5         _count = initial;
 59 mike  1.2 
 60               _rep.owner = Threads::self();
 61               _rep.waiters = 0;
 62           }
 63           
 64           Semaphore::~Semaphore()
 65           {
 66 ouyang.jian 1.9 #if !defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) \
 67                     && !defined(PEGASUS_PLATFORM_PASE_ISERIES_IBMCXX)
 68 mike        1.5     pthread_mutex_lock(&_rep.mutex);
 69                     int r = 0;
 70                     while ((r = pthread_cond_destroy(&_rep.cond) == EBUSY) ||
 71                            (r == -1 && errno == EBUSY))
 72                     {
 73                         pthread_mutex_unlock(&_rep.mutex);
 74                         Threads::yield();
 75                         pthread_mutex_lock(&_rep.mutex);
 76                     }
 77                     pthread_mutex_unlock(&_rep.mutex);
 78                     pthread_mutex_destroy(&_rep.mutex);
 79 mike        1.2 #else
 80 mike        1.5     int val;
 81                     val = pthread_mutex_destroy(&_rep.mutex);
 82                 
 83                     if (val != 0)
 84                         pthread_cond_destroy(&_rep.cond);
 85                     else
 86                         val = pthread_cond_destroy(&_rep.cond);
 87                 
 88                     while (EBUSY == val)
 89                     {
 90                         Threads::yield();
 91                         val = pthread_mutex_destroy(&_rep.mutex);
 92                         if (val != 0)
 93                             pthread_cond_destroy(&_rep.cond);
 94                         else
 95                             val = pthread_cond_destroy(&_rep.cond);
 96                     }
 97 mike        1.2 #endif
 98                 }
 99                 
100 ouyang.jian 1.9 #if defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) \
101                     || defined(PEGASUS_PLATFORM_PASE_ISERIES_IBMCXX)
102 mike        1.2 // cleanup function
103                 static void semaphore_cleanup(void *arg)
104                 {
105 mike        1.5     // cast back to proper type and unlock mutex
106 dave.sudlik 1.8     SemaphoreRep *s = (SemaphoreRep *) arg;
107 mike        1.5     pthread_mutex_unlock(&s->mutex);
108 mike        1.2 }
109                 #endif
110                 
111                 // block until this semaphore is in a signalled state or
112                 // throw an exception if the wait fails
113                 void Semaphore::wait(Boolean ignoreInterrupt)
114                 {
115 mike        1.5     // Acquire mutex to enter critical section.
116                     pthread_mutex_lock(&_rep.mutex);
117 mike        1.2 
118 mike        1.5     // Push cleanup function onto cleanup stack
119                     // The mutex will unlock if the thread is killed early
120 ouyang.jian 1.9 #if defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) \
121                     || defined(PEGASUS_PLATFORM_PASE_ISERIES_IBMCXX)
122 mike        1.2     Threads::cleanup_push(&semaphore_cleanup, &_rep);
123                 #endif
124                 
125 mike        1.5     // Keep track of the number of waiters so that <sema_post> works correctly.
126                     _rep.waiters++;
127 mike        1.2 
128 mike        1.5     // Wait until the semaphore count is > 0, then atomically release
129                     // <lock_> and wait for <count_nonzero_> to be signaled.
130                     while (_count == 0)
131                         pthread_cond_wait(&_rep.cond, &_rep.mutex);
132 mike        1.2 
133 mike        1.5     // <_rep.mutex> is now held.
134 mike        1.2 
135 mike        1.5     // Decrement the waiters count.
136                     _rep.waiters--;
137 mike        1.2 
138 mike        1.5     // Decrement the semaphore's count.
139                     _count--;
140 mike        1.2 
141                     // Since we push an unlock onto the cleanup stack
142 mike        1.5     // We will pop it off to release the mutex when leaving the critical
143                     // section.
144 ouyang.jian 1.9 #if defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) \
145                     || defined(PEGASUS_PLATFORM_PASE_ISERIES_IBMCXX)
146 mike        1.5     Threads::cleanup_pop(1);
147 mike        1.2 #endif
148 mike        1.5     // Release mutex to leave critical section.
149                     pthread_mutex_unlock(&_rep.mutex);
150 mike        1.2 }
151                 
152                 void Semaphore::try_wait()
153                 {
154                 // not implemented
155 mike        1.5     throw(WaitFailed(_rep.owner));
156 mike        1.2 }
157                 
158                 void Semaphore::time_wait(Uint32 milliseconds)
159                 {
160 mike        1.5     // Acquire mutex to enter critical section.
161                     pthread_mutex_lock(&_rep.mutex);
162                     Boolean timedOut = false;
163 mike        1.2 
164 ouyang.jian 1.9 #if defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) \
165                     || defined(PEGASUS_PLATFORM_PASE_ISERIES_IBMCXX)
166 mike        1.5     // Push cleanup function onto cleanup stack
167                     // The mutex will unlock if the thread is killed early
168 dave.sudlik 1.8     Threads::cleanup_push(&semaphore_cleanup, &_rep);
169 mike        1.2 #endif
170                 
171 mike        1.5     // Keep track of the number of waiters so that <sema_post> works correctly.
172                     _rep.waiters++;
173                 
174                     struct timeval now = { 0, 0 };
175                     struct timespec waittime = { 0, 0 };
176                     gettimeofday(&now, NULL);
177                     waittime.tv_sec = now.tv_sec;
178                     waittime.tv_nsec = now.tv_usec + (milliseconds * 1000);     // microseconds
179                     waittime.tv_sec += (waittime.tv_nsec / 1000000);    // roll overflow into
180                     waittime.tv_nsec = (waittime.tv_nsec % 1000000);    // the "seconds" part
181                     waittime.tv_nsec = waittime.tv_nsec * 1000; // convert to nanoseconds
182 mike        1.2 
183 mike        1.5     while ((_count == 0) && !timedOut)
184                     {
185                         int r = pthread_cond_timedwait(&_rep.cond, &_rep.mutex, &waittime);
186                 
187                         if (((r == -1 && errno == ETIMEDOUT) || (r == ETIMEDOUT)) &&
188                             _count == 0)
189                         {
190                             timedOut = true;
191                         }
192                     }
193 mike        1.2 
194 mike        1.5     if (!timedOut)
195                     {
196                         // Decrement the semaphore's count.
197                         _count--;
198                     }
199                 
200                     // Decrement the waiters count.
201                     _rep.waiters--;
202 mike        1.2 
203 ouyang.jian 1.9 #if defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) \
204                     || defined(PEGASUS_PLATFORM_PASE_ISERIES_IBMCXX)
205 mike        1.5     // Since we push an unlock onto the cleanup stack
206                     // We will pop it off to release the mutex when leaving the critical
207                     // section.
208                     Threads::cleanup_pop(1);
209 mike        1.2 #endif
210                 
211 mike        1.5     // Release mutex to leave critical section.
212                     pthread_mutex_unlock(&_rep.mutex);
213 mike        1.2 
214 mike        1.5     if (timedOut)
215                     {
216                         throw TimeOut(Threads::self());
217                     }
218 mike        1.2 }
219                 
220                 // increment the count of the semaphore
221                 void Semaphore::signal()
222                 {
223 mike        1.5     pthread_mutex_lock(&_rep.mutex);
224 mike        1.2 
225 mike        1.5     // Always allow one thread to continue if it is waiting.
226                     if (_rep.waiters > 0)
227                         pthread_cond_signal(&_rep.cond);
228 mike        1.2 
229 mike        1.5     // Increment the semaphore's count.
230                     _count++;
231 mike        1.2 
232 mike        1.5     pthread_mutex_unlock(&_rep.mutex);
233 mike        1.2 }
234                 
235                 // return the count of the semaphore
236                 int Semaphore::count() const
237                 {
238 mike        1.5     return _count;
239 mike        1.2 }
240                 
241                 #endif /* PEGASUS_USE_PTHREAD_SEMAPHORE */
242                 
243                 //==============================================================================
244                 //
245                 // PEGASUS_USE_POSIX_SEMAPHORE
246                 //
247                 //==============================================================================
248                 
249                 #if defined(PEGASUS_USE_POSIX_SEMAPHORE)
250                 
251                 Semaphore::Semaphore(Uint32 initial)
252                 {
253 mike        1.5     if (initial > PEGASUS_SEM_VALUE_MAX)
254                         initial = PEGASUS_SEM_VALUE_MAX - 1;
255                     sem_init(&_rep.sem, 0, initial);
256                     _rep.owner = Threads::self();
257 mike        1.2 }
258                 
259                 Semaphore::~Semaphore()
260                 {
261 mike        1.5     while (EBUSY == sem_destroy(&_rep.sem))
262                     {
263                         Threads::yield();
264                     }
265 mike        1.2 }
266                 
267                 // block until this semaphore is in a signalled state, or
268                 // throw an exception if the wait fails
269                 void Semaphore::wait(Boolean ignoreInterrupt)
270                 {
271                     do
272                     {
273                         int rc = sem_wait(&_rep.sem);
274                         if (rc == 0)
275                             break;
276                 
277                         int e = errno;
278                         if (e == EINTR)
279                         {
280                             if (ignoreInterrupt == false)
281                                 throw(WaitInterrupted(_rep.owner));
282                         }
283 mike        1.5         else
284                             throw(WaitFailed(_rep.owner));
285 mike        1.2 
286                         // keep going if above conditions fail
287 mike        1.5     }
288                     while (true);
289 mike        1.2 
290                 }
291                 
292                 // wait succeeds immediately if semaphore has a non-zero count,
293                 // return immediately and throw and exception if the
294                 // count is zero.
295                 void Semaphore::try_wait()
296                 {
297 mike        1.5     if (sem_trywait(&_rep.sem))
298                         throw(WaitFailed(_rep.owner));
299 mike        1.2 }
300                 
301                 
302                 
303                 
304                 // Note: I could not get sem_timed_wait to work reliably.
305                 // See my comments above on mut timed_wait.
306                 // I reimplemented using try_wait, which works reliably.
307                 // mdd Sun Aug  5 13:25:31 2001
308                 
309                 // wait for milliseconds and throw an exception
310                 // if wait times out without gaining the semaphore
311                 void Semaphore::time_wait(Uint32 milliseconds)
312                 {
313 mike        1.5     int retcode, i = 0;
314                 
315                     struct timeval now, finish, remaining;
316                     Uint32 usec;
317 mike        1.2 
318 mike        1.5     gettimeofday(&finish, NULL);
319                     finish.tv_sec += (milliseconds / 1000);
320                     milliseconds %= 1000;
321                     usec = finish.tv_usec + (milliseconds * 1000);
322                     finish.tv_sec += (usec / 1000000);
323                     finish.tv_usec = usec % 1000000;
324 mike        1.2 
325 mike        1.5     while (1)
326                     {
327                         do
328                         {
329                             retcode = sem_trywait(&_rep.sem);
330                         }
331                         while (retcode == -1 && errno == EINTR);
332                 
333                         if (retcode == 0)
334                             return;
335                 
336                         if (retcode == -1 && errno != EAGAIN)
337                             throw IPCException(Threads::self());
338                         gettimeofday(&now, NULL);
339                         if (Time::subtract(&remaining, &finish, &now))
340                             throw TimeOut(Threads::self());
341                         Threads::yield();
342                     }
343 mike        1.2 }
344                 
345                 // increment the count of the semaphore
346                 void Semaphore::signal()
347                 {
348 mike        1.5     sem_post(&_rep.sem);
349 mike        1.2 }
350                 
351                 // return the count of the semaphore
352                 int Semaphore::count() const
353                 {
354 mike        1.5     sem_getvalue(&_rep.sem, &_count);
355                     return _count;
356 mike        1.2 }
357                 
358                 #endif /* PEGASUS_USE_POSIX_SEMAPHORE */
359                 
360                 //==============================================================================
361                 //
362                 // PEGASUS_USE_WINDOWS_SEMAPHORE
363                 //
364                 //==============================================================================
365                 
366                 #if defined(PEGASUS_USE_WINDOWS_SEMAPHORE)
367                 
368 mike        1.5 Semaphore::Semaphore(Uint32 initial)
369 mike        1.2 {
370 mike        1.5     if (initial > PEGASUS_SEM_VALUE_MAX)
371                         initial = PEGASUS_SEM_VALUE_MAX - 1;
372                     _count = initial;
373                     _rep.owner = Threads::self();
374                     _rep.sem = CreateSemaphore(NULL, initial, PEGASUS_SEM_VALUE_MAX, NULL);
375 mike        1.2 }
376                 
377                 Semaphore::~Semaphore()
378                 {
379 mike        1.5     CloseHandle(_rep.sem);
380 mike        1.2 }
381                 
382                 // block until this semaphore is in a signalled state
383                 // note that windows does not support interrupt
384                 void Semaphore::wait(Boolean ignoreInterrupt)
385                 {
386                     DWORD errorcode = WaitForSingleObject(_rep.sem, INFINITE);
387 mike        1.5     if (errorcode != WAIT_FAILED)
388 mike        1.2         _count--;
389                     else
390                         throw(WaitFailed(Threads::self()));
391                 }
392                 
393                 // wait succeeds immediately if semaphore has a non-zero count,
394                 // return immediately and throw and exception if the
395                 // count is zero.
396                 void Semaphore::try_wait()
397                 {
398                     DWORD errorcode = WaitForSingleObject(_rep.sem, 0);
399 mike        1.5     if (errorcode == WAIT_TIMEOUT || errorcode == WAIT_FAILED)
400 mike        1.2         throw(WaitFailed(Threads::self()));
401                     _count--;
402                 }
403                 
404                 
405                 // wait for milliseconds and throw an exception
406                 // if wait times out without gaining the semaphore
407                 void Semaphore::time_wait(Uint32 milliseconds)
408                 {
409                     DWORD errorcode = WaitForSingleObject(_rep.sem, milliseconds);
410                     if (errorcode == WAIT_TIMEOUT || errorcode == WAIT_FAILED)
411                         throw(TimeOut(Threads::self()));
412                     _count--;
413                 }
414                 
415                 // increment the count of the semaphore
416                 void Semaphore::signal()
417                 {
418                     _count++;
419                     ReleaseSemaphore(_rep.sem, 1, NULL);
420                 }
421 mike        1.2 
422                 // return the count of the semaphore
423                 int Semaphore::count() const
424                 {
425 kumpf       1.7     return _count;
426 mike        1.2 }
427                 
428                 #endif /* PEGASUS_USE_WINDOWS_SEMAPHORE */
429                 
430                 PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2