(file) Return to Semaphore.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 martin 1.17 //%LICENSE////////////////////////////////////////////////////////////////
  2 martin 1.18 //
  3 martin 1.17 // Licensed to The Open Group (TOG) under one or more contributor license
  4             // agreements.  Refer to the OpenPegasusNOTICE.txt file distributed with
  5             // this work for additional information regarding copyright ownership.
  6             // Each contributor licenses this file to you under the OpenPegasus Open
  7             // Source License; you may not use this file except in compliance with the
  8             // License.
  9 martin 1.18 //
 10 martin 1.17 // Permission is hereby granted, free of charge, to any person obtaining a
 11             // copy of this software and associated documentation files (the "Software"),
 12             // to deal in the Software without restriction, including without limitation
 13             // the rights to use, copy, modify, merge, publish, distribute, sublicense,
 14             // and/or sell copies of the Software, and to permit persons to whom the
 15             // Software is furnished to do so, subject to the following conditions:
 16 martin 1.18 //
 17 martin 1.17 // The above copyright notice and this permission notice shall be included
 18             // in all copies or substantial portions of the Software.
 19 martin 1.18 //
 20 martin 1.17 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 21 martin 1.18 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 22 martin 1.17 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
 23             // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
 24             // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
 25             // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
 26             // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 27 martin 1.18 //
 28 martin 1.17 //////////////////////////////////////////////////////////////////////////
 29 mike   1.2  //
 30             //%/////////////////////////////////////////////////////////////////////////////
 31             
 32             #include <Pegasus/Common/Time.h>
 33 kumpf  1.16 #include <Pegasus/Common/Exception.h>
 34             #include <Pegasus/Common/System.h>
 35 mike   1.2  #include "Semaphore.h"
 36             
 37 kumpf  1.7  PEGASUS_NAMESPACE_BEGIN
 38 mike   1.6  
 39             static const Uint32 PEGASUS_SEM_VALUE_MAX = 0x0000ffff;
 40 mike   1.2  
 41             //==============================================================================
 42             //
 43             // PEGASUS_USE_PTHREAD_SEMAPHORE
 44             //
 45             //==============================================================================
 46             
 47             #if defined(PEGASUS_USE_PTHREAD_SEMAPHORE)
 48             
 49             Semaphore::Semaphore(Uint32 initial)
 50             {
 51 mike   1.5      pthread_mutex_init(&_rep.mutex, NULL);
 52                 pthread_cond_init(&_rep.cond, NULL);
 53 mike   1.2  
 54                 if (initial > PEGASUS_SEM_VALUE_MAX)
 55 dmitry.mikulin 1.11     {
 56 kumpf          1.16         _rep.count = PEGASUS_SEM_VALUE_MAX - 1;
 57 dmitry.mikulin 1.11     }
 58 mike           1.2      else
 59 dmitry.mikulin 1.11     {
 60 kumpf          1.16         _rep.count = initial;
 61 dmitry.mikulin 1.11     }
 62 mike           1.2  
 63                         _rep.owner = Threads::self();
 64                         _rep.waiters = 0;
 65                     }
 66                     
 67                     Semaphore::~Semaphore()
 68                     {
 69 ouyang.jian    1.9  #if !defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) \
 70                         && !defined(PEGASUS_PLATFORM_PASE_ISERIES_IBMCXX)
 71 mike           1.5      pthread_mutex_lock(&_rep.mutex);
 72                         int r = 0;
 73                         while ((r = pthread_cond_destroy(&_rep.cond) == EBUSY) ||
 74                                (r == -1 && errno == EBUSY))
 75                         {
 76                             pthread_mutex_unlock(&_rep.mutex);
 77                             Threads::yield();
 78                             pthread_mutex_lock(&_rep.mutex);
 79                         }
 80                         pthread_mutex_unlock(&_rep.mutex);
 81                         pthread_mutex_destroy(&_rep.mutex);
 82 mike           1.2  #else
 83 mike           1.5      int val;
 84                         val = pthread_mutex_destroy(&_rep.mutex);
 85                     
 86                         if (val != 0)
 87 dmitry.mikulin 1.11     {
 88 mike           1.5          pthread_cond_destroy(&_rep.cond);
 89 dmitry.mikulin 1.11     }
 90 mike           1.5      else
 91 dmitry.mikulin 1.11     {
 92 mike           1.5          val = pthread_cond_destroy(&_rep.cond);
 93 dmitry.mikulin 1.11     }
 94 mike           1.5  
 95                         while (EBUSY == val)
 96                         {
 97                             Threads::yield();
 98                             val = pthread_mutex_destroy(&_rep.mutex);
 99                             if (val != 0)
100 dmitry.mikulin 1.11         {
101 mike           1.5              pthread_cond_destroy(&_rep.cond);
102 dmitry.mikulin 1.11         }
103 mike           1.5          else
104 dmitry.mikulin 1.11         {
105 mike           1.5              val = pthread_cond_destroy(&_rep.cond);
106 dmitry.mikulin 1.11         }
107 mike           1.5      }
108 mike           1.2  #endif
109                     }
110                     
111 ouyang.jian    1.9  #if defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) \
112                         || defined(PEGASUS_PLATFORM_PASE_ISERIES_IBMCXX)
113 mike           1.2  // cleanup function
114                     static void semaphore_cleanup(void *arg)
115                     {
116 mike           1.5      // cast back to proper type and unlock mutex
117 dave.sudlik    1.8      SemaphoreRep *s = (SemaphoreRep *) arg;
118 mike           1.5      pthread_mutex_unlock(&s->mutex);
119 mike           1.2  }
120                     #endif
121                     
122                     // block until this semaphore is in a signalled state or
123                     // throw an exception if the wait fails
124 kumpf          1.13 void Semaphore::wait()
125 mike           1.2  {
126 mike           1.5      // Acquire mutex to enter critical section.
127                         pthread_mutex_lock(&_rep.mutex);
128 mike           1.2  
129 mike           1.5      // Push cleanup function onto cleanup stack
130                         // The mutex will unlock if the thread is killed early
131 ouyang.jian    1.9  #if defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) \
132                         || defined(PEGASUS_PLATFORM_PASE_ISERIES_IBMCXX)
133 mike           1.2      Threads::cleanup_push(&semaphore_cleanup, &_rep);
134                     #endif
135                     
136 mike           1.5      // Keep track of the number of waiters so that <sema_post> works correctly.
137                         _rep.waiters++;
138 mike           1.2  
139 mike           1.5      // Wait until the semaphore count is > 0, then atomically release
140                         // <lock_> and wait for <count_nonzero_> to be signaled.
141 kumpf          1.16     while (_rep.count == 0)
142 dmitry.mikulin 1.11     {
143 mike           1.5          pthread_cond_wait(&_rep.cond, &_rep.mutex);
144 dmitry.mikulin 1.11     }
145 mike           1.2  
146 mike           1.5      // <_rep.mutex> is now held.
147 mike           1.2  
148 mike           1.5      // Decrement the waiters count.
149                         _rep.waiters--;
150 mike           1.2  
151 mike           1.5      // Decrement the semaphore's count.
152 kumpf          1.16     _rep.count--;
153 mike           1.2  
154                         // Since we push an unlock onto the cleanup stack
155 mike           1.5      // We will pop it off to release the mutex when leaving the critical
156                         // section.
157 ouyang.jian    1.9  #if defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) \
158                         || defined(PEGASUS_PLATFORM_PASE_ISERIES_IBMCXX)
159 mike           1.5      Threads::cleanup_pop(1);
160 mike           1.2  #endif
161 mike           1.5      // Release mutex to leave critical section.
162                         pthread_mutex_unlock(&_rep.mutex);
163 mike           1.2  }
164                     
165 kumpf          1.14 Boolean Semaphore::time_wait(Uint32 milliseconds)
166 mike           1.2  {
167 mike           1.5      // Acquire mutex to enter critical section.
168                         pthread_mutex_lock(&_rep.mutex);
169                         Boolean timedOut = false;
170 mike           1.2  
171 ouyang.jian    1.9  #if defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) \
172                         || defined(PEGASUS_PLATFORM_PASE_ISERIES_IBMCXX)
173 mike           1.5      // Push cleanup function onto cleanup stack
174                         // The mutex will unlock if the thread is killed early
175 dave.sudlik    1.8      Threads::cleanup_push(&semaphore_cleanup, &_rep);
176 mike           1.2  #endif
177                     
178 mike           1.5      // Keep track of the number of waiters so that <sema_post> works correctly.
179                         _rep.waiters++;
180                     
181                         struct timeval now = { 0, 0 };
182                         struct timespec waittime = { 0, 0 };
183                         gettimeofday(&now, NULL);
184 venkat.puvvada 1.20 
185                         waittime.tv_sec = now.tv_sec + (milliseconds / 1000);
186                         milliseconds = milliseconds % 1000;
187 mike           1.5      waittime.tv_nsec = now.tv_usec + (milliseconds * 1000);     // microseconds
188                         waittime.tv_sec += (waittime.tv_nsec / 1000000);    // roll overflow into
189                         waittime.tv_nsec = (waittime.tv_nsec % 1000000);    // the "seconds" part
190                         waittime.tv_nsec = waittime.tv_nsec * 1000; // convert to nanoseconds
191 mike           1.2  
192 kumpf          1.16     while ((_rep.count == 0) && !timedOut)
193 mike           1.5      {
194                             int r = pthread_cond_timedwait(&_rep.cond, &_rep.mutex, &waittime);
195                     
196 marek          1.21 #ifdef PEGASUS_OS_ZOS
197                             if (((r==-1 && errno==EAGAIN) || (r==ETIMEDOUT)) && _rep.count==0)
198                     #else
199                             if (((r==-1 && errno==ETIMEDOUT) || (r==ETIMEDOUT)) && _rep.count==0)
200                     #endif
201 mike           1.5          {
202                                 timedOut = true;
203                             }
204                         }
205 mike           1.2  
206 mike           1.5      if (!timedOut)
207                         {
208                             // Decrement the semaphore's count.
209 kumpf          1.16         _rep.count--;
210 mike           1.5      }
211                     
212                         // Decrement the waiters count.
213                         _rep.waiters--;
214 mike           1.2  
215 ouyang.jian    1.9  #if defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) \
216                         || defined(PEGASUS_PLATFORM_PASE_ISERIES_IBMCXX)
217 mike           1.5      // Since we push an unlock onto the cleanup stack
218                         // We will pop it off to release the mutex when leaving the critical
219                         // section.
220                         Threads::cleanup_pop(1);
221 mike           1.2  #endif
222                     
223 mike           1.5      // Release mutex to leave critical section.
224                         pthread_mutex_unlock(&_rep.mutex);
225 mike           1.2  
226 kumpf          1.14     return !timedOut;
227 mike           1.2  }
228                     
229                     // increment the count of the semaphore
230                     void Semaphore::signal()
231                     {
232 mike           1.5      pthread_mutex_lock(&_rep.mutex);
233 mike           1.2  
234 mike           1.5      // Always allow one thread to continue if it is waiting.
235                         if (_rep.waiters > 0)
236 dmitry.mikulin 1.11     {
237 mike           1.5          pthread_cond_signal(&_rep.cond);
238 dmitry.mikulin 1.11     }
239 mike           1.2  
240 mike           1.5      // Increment the semaphore's count.
241 kumpf          1.16     _rep.count++;
242 mike           1.2  
243 mike           1.5      pthread_mutex_unlock(&_rep.mutex);
244 mike           1.2  }
245                     
246                     #endif /* PEGASUS_USE_PTHREAD_SEMAPHORE */
247                     
248                     //==============================================================================
249                     //
250                     // PEGASUS_USE_POSIX_SEMAPHORE
251                     //
252                     //==============================================================================
253                     
254                     #if defined(PEGASUS_USE_POSIX_SEMAPHORE)
255                     
256                     Semaphore::Semaphore(Uint32 initial)
257                     {
258 mike           1.5      if (initial > PEGASUS_SEM_VALUE_MAX)
259 dmitry.mikulin 1.11     {
260 mike           1.5          initial = PEGASUS_SEM_VALUE_MAX - 1;
261 dmitry.mikulin 1.11     }
262                     
263 mike           1.5      _rep.owner = Threads::self();
264 dmitry.mikulin 1.11     if (sem_init(&_rep.sem, 0, initial) == -1)
265                         {
266 kumpf          1.16         throw Exception(MessageLoaderParms(
267                                 "Common.InternalException.SEMAPHORE_INIT_FAILED",
268                                 "Semaphore initialization failed: $0",
269                                 PEGASUS_SYSTEM_ERRORMSG_NLS));
270 dmitry.mikulin 1.11     }
271 mike           1.2  }
272                     
273                     Semaphore::~Semaphore()
274                     {
275 dmitry.mikulin 1.11     while (sem_destroy(&_rep.sem) == -1 && errno == EBUSY)
276 mike           1.5      {
277                             Threads::yield();
278                         }
279 mike           1.2  }
280                     
281                     // block until this semaphore is in a signalled state, or
282                     // throw an exception if the wait fails
283 kumpf          1.13 void Semaphore::wait()
284 mike           1.2  {
285                         do
286                         {
287                             int rc = sem_wait(&_rep.sem);
288                             if (rc == 0)
289                                 break;
290                     
291 kumpf          1.13         if (errno != EINTR)
292 mike           1.2          {
293 kumpf          1.16             throw Exception(MessageLoaderParms(
294                                     "Common.InternalException.SEMAPHORE_WAIT_FAILED",
295                                     "Semaphore wait failed: $0",
296                                     PEGASUS_SYSTEM_ERRORMSG_NLS));
297 dmitry.mikulin 1.11         }
298 mike           1.2  
299                             // keep going if above conditions fail
300 mike           1.5      }
301                         while (true);
302 mike           1.2  
303                     }
304                     
305                     // wait for milliseconds and throw an exception
306                     // if wait times out without gaining the semaphore
307 kumpf          1.14 Boolean Semaphore::time_wait(Uint32 milliseconds)
308 mike           1.2  {
309 kumpf          1.12     int retcode;
310 mike           1.5  
311                         struct timeval now, finish, remaining;
312                         Uint32 usec;
313 mike           1.2  
314 mike           1.5      gettimeofday(&finish, NULL);
315                         finish.tv_sec += (milliseconds / 1000);
316 marek          1.22     usec = finish.tv_usec + ((milliseconds % 1000) * 1000);
317 mike           1.5      finish.tv_sec += (usec / 1000000);
318                         finish.tv_usec = usec % 1000000;
319 mike           1.2  
320 mike           1.5      while (1)
321                         {
322                             do
323                             {
324                                 retcode = sem_trywait(&_rep.sem);
325                             }
326                             while (retcode == -1 && errno == EINTR);
327                     
328                             if (retcode == 0)
329 dmitry.mikulin 1.11         {
330 kumpf          1.14             break;
331 dmitry.mikulin 1.11         }
332 mike           1.5  
333                             if (retcode == -1 && errno != EAGAIN)
334 dmitry.mikulin 1.11         {
335 kumpf          1.16             throw Exception(MessageLoaderParms(
336                                     "Common.InternalException.SEMAPHORE_WAIT_FAILED",
337                                     "Semaphore wait failed: $0",
338                                     PEGASUS_SYSTEM_ERRORMSG_NLS));
339 dmitry.mikulin 1.11         }
340                     
341 mike           1.5          gettimeofday(&now, NULL);
342                             if (Time::subtract(&remaining, &finish, &now))
343 dmitry.mikulin 1.11         {
344 kumpf          1.14             return false;
345 dmitry.mikulin 1.11         }
346 marek          1.22         // yield just marks the thread as eligible to be not scheduled by 
347                             // hypervisor, sleep forces thread to actually take a break
348                             // which what is called for here to avoid CPU spikes from close loop
349                             Threads::sleep(milliseconds/100+1);
350 mike           1.5      }
351 kumpf          1.14 
352                         return true;
353 mike           1.2  }
354                     
355                     // increment the count of the semaphore
356                     void Semaphore::signal()
357                     {
358 dmitry.mikulin 1.11     if (sem_post(&_rep.sem) == -1)
359                         {
360 kumpf          1.16         throw Exception(MessageLoaderParms(
361                                 "Common.InternalException.SEMAPHORE_SIGNAL_FAILED",
362                                 "Failed to signal semaphore: $0",
363                                 PEGASUS_SYSTEM_ERRORMSG_NLS));
364 dmitry.mikulin 1.11     }
365 mike           1.2  }
366                     
367                     #endif /* PEGASUS_USE_POSIX_SEMAPHORE */
368                     
369                     //==============================================================================
370                     //
371                     // PEGASUS_USE_WINDOWS_SEMAPHORE
372                     //
373                     //==============================================================================
374                     
375                     #if defined(PEGASUS_USE_WINDOWS_SEMAPHORE)
376                     
377 mike           1.5  Semaphore::Semaphore(Uint32 initial)
378 mike           1.2  {
379 mike           1.5      if (initial > PEGASUS_SEM_VALUE_MAX)
380 dmitry.mikulin 1.11     {
381 mike           1.5          initial = PEGASUS_SEM_VALUE_MAX - 1;
382 dmitry.mikulin 1.11     }
383 mike           1.5      _rep.owner = Threads::self();
384                         _rep.sem = CreateSemaphore(NULL, initial, PEGASUS_SEM_VALUE_MAX, NULL);
385 kavita.gupta   1.19     if (_rep.sem == NULL)
386                         {
387                             throw Exception(MessageLoaderParms(
388                                 "Common.InternalException.SEMAPHORE_INIT_FAILED",
389                                 "Semaphore initialization failed: $0",
390                                 PEGASUS_SYSTEM_ERRORMSG_NLS));
391                         }
392 mike           1.2  }
393                     
394                     Semaphore::~Semaphore()
395                     {
396 mike           1.5      CloseHandle(_rep.sem);
397 mike           1.2  }
398                     
399                     // block until this semaphore is in a signalled state
400 kumpf          1.13 void Semaphore::wait()
401 mike           1.2  {
402                         DWORD errorcode = WaitForSingleObject(_rep.sem, INFINITE);
403 kumpf          1.16     if (errorcode == WAIT_FAILED)
404 dmitry.mikulin 1.11     {
405 kumpf          1.16         throw Exception(MessageLoaderParms(
406                                 "Common.InternalException.SEMAPHORE_WAIT_FAILED",
407                                 "Semaphore wait failed: $0",
408                                 PEGASUS_SYSTEM_ERRORMSG_NLS));
409 dmitry.mikulin 1.11     }
410 mike           1.2  }
411                     
412 kumpf          1.15 Boolean Semaphore::time_wait(Uint32 milliseconds)
413 mike           1.2  {
414                         DWORD errorcode = WaitForSingleObject(_rep.sem, milliseconds);
415 kumpf          1.14 
416                         if (errorcode == WAIT_TIMEOUT)
417 dmitry.mikulin 1.11     {
418 kumpf          1.14         return false;
419 dmitry.mikulin 1.11     }
420 kumpf          1.14 
421                         if (errorcode == WAIT_FAILED)
422                         {
423 kumpf          1.16         throw Exception(MessageLoaderParms(
424                                 "Common.InternalException.SEMAPHORE_WAIT_FAILED",
425                                 "Semaphore wait failed: $0",
426                                 PEGASUS_SYSTEM_ERRORMSG_NLS));
427 kumpf          1.14     }
428                     
429                         return true;
430 mike           1.2  }
431                     
432                     // increment the count of the semaphore
433                     void Semaphore::signal()
434                     {
435                         ReleaseSemaphore(_rep.sem, 1, NULL);
436                     }
437                     
438                     #endif /* PEGASUS_USE_WINDOWS_SEMAPHORE */
439                     
440                     PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2