1 martin 1.17 //%LICENSE////////////////////////////////////////////////////////////////
|
2 martin 1.18 //
|
3 martin 1.17 // Licensed to The Open Group (TOG) under one or more contributor license
4 // agreements. Refer to the OpenPegasusNOTICE.txt file distributed with
5 // this work for additional information regarding copyright ownership.
6 // Each contributor licenses this file to you under the OpenPegasus Open
7 // Source License; you may not use this file except in compliance with the
8 // License.
|
9 martin 1.18 //
|
10 martin 1.17 // Permission is hereby granted, free of charge, to any person obtaining a
11 // copy of this software and associated documentation files (the "Software"),
12 // to deal in the Software without restriction, including without limitation
13 // the rights to use, copy, modify, merge, publish, distribute, sublicense,
14 // and/or sell copies of the Software, and to permit persons to whom the
15 // Software is furnished to do so, subject to the following conditions:
|
16 martin 1.18 //
|
17 martin 1.17 // The above copyright notice and this permission notice shall be included
18 // in all copies or substantial portions of the Software.
|
19 martin 1.18 //
|
20 martin 1.17 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
21 martin 1.18 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
22 martin 1.17 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
23 // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
24 // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
25 // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
26 // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
27 martin 1.18 //
|
28 martin 1.17 //////////////////////////////////////////////////////////////////////////
|
29 mike 1.2 //
30 //%/////////////////////////////////////////////////////////////////////////////
31
32 #include <Pegasus/Common/Time.h>
|
33 kumpf 1.16 #include <Pegasus/Common/Exception.h>
34 #include <Pegasus/Common/System.h>
|
35 mike 1.2 #include "Semaphore.h"
36
|
37 kumpf 1.7 PEGASUS_NAMESPACE_BEGIN
|
38 mike 1.6
39 static const Uint32 PEGASUS_SEM_VALUE_MAX = 0x0000ffff;
|
40 mike 1.2
41 //==============================================================================
42 //
43 // PEGASUS_USE_PTHREAD_SEMAPHORE
44 //
45 //==============================================================================
46
47 #if defined(PEGASUS_USE_PTHREAD_SEMAPHORE)
48
49 Semaphore::Semaphore(Uint32 initial)
50 {
|
51 mike 1.5 pthread_mutex_init(&_rep.mutex, NULL);
52 pthread_cond_init(&_rep.cond, NULL);
|
53 mike 1.2
54 if (initial > PEGASUS_SEM_VALUE_MAX)
|
55 dmitry.mikulin 1.11 {
|
56 kumpf 1.16 _rep.count = PEGASUS_SEM_VALUE_MAX - 1;
|
57 dmitry.mikulin 1.11 }
|
58 mike 1.2 else
|
59 dmitry.mikulin 1.11 {
|
60 kumpf 1.16 _rep.count = initial;
|
61 dmitry.mikulin 1.11 }
|
62 mike 1.2
63 _rep.owner = Threads::self();
64 _rep.waiters = 0;
65 }
66
67 Semaphore::~Semaphore()
68 {
|
69 ouyang.jian 1.9 #if !defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) \
70 && !defined(PEGASUS_PLATFORM_PASE_ISERIES_IBMCXX)
|
71 mike 1.5 pthread_mutex_lock(&_rep.mutex);
72 int r = 0;
73 while ((r = pthread_cond_destroy(&_rep.cond) == EBUSY) ||
74 (r == -1 && errno == EBUSY))
75 {
76 pthread_mutex_unlock(&_rep.mutex);
77 Threads::yield();
78 pthread_mutex_lock(&_rep.mutex);
79 }
80 pthread_mutex_unlock(&_rep.mutex);
81 pthread_mutex_destroy(&_rep.mutex);
|
82 mike 1.2 #else
|
83 mike 1.5 int val;
84 val = pthread_mutex_destroy(&_rep.mutex);
85
86 if (val != 0)
|
87 dmitry.mikulin 1.11 {
|
88 mike 1.5 pthread_cond_destroy(&_rep.cond);
|
89 dmitry.mikulin 1.11 }
|
90 mike 1.5 else
|
91 dmitry.mikulin 1.11 {
|
92 mike 1.5 val = pthread_cond_destroy(&_rep.cond);
|
93 dmitry.mikulin 1.11 }
|
94 mike 1.5
95 while (EBUSY == val)
96 {
97 Threads::yield();
98 val = pthread_mutex_destroy(&_rep.mutex);
99 if (val != 0)
|
100 dmitry.mikulin 1.11 {
|
101 mike 1.5 pthread_cond_destroy(&_rep.cond);
|
102 dmitry.mikulin 1.11 }
|
103 mike 1.5 else
|
104 dmitry.mikulin 1.11 {
|
105 mike 1.5 val = pthread_cond_destroy(&_rep.cond);
|
106 dmitry.mikulin 1.11 }
|
107 mike 1.5 }
|
108 mike 1.2 #endif
109 }
110
|
111 ouyang.jian 1.9 #if defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) \
112 || defined(PEGASUS_PLATFORM_PASE_ISERIES_IBMCXX)
|
113 mike 1.2 // cleanup function
114 static void semaphore_cleanup(void *arg)
115 {
|
116 mike 1.5 // cast back to proper type and unlock mutex
|
117 dave.sudlik 1.8 SemaphoreRep *s = (SemaphoreRep *) arg;
|
118 mike 1.5 pthread_mutex_unlock(&s->mutex);
|
119 mike 1.2 }
120 #endif
121
122 // block until this semaphore is in a signalled state or
123 // throw an exception if the wait fails
|
124 kumpf 1.13 void Semaphore::wait()
|
125 mike 1.2 {
|
126 mike 1.5 // Acquire mutex to enter critical section.
127 pthread_mutex_lock(&_rep.mutex);
|
128 mike 1.2
|
129 mike 1.5 // Push cleanup function onto cleanup stack
130 // The mutex will unlock if the thread is killed early
|
131 ouyang.jian 1.9 #if defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) \
132 || defined(PEGASUS_PLATFORM_PASE_ISERIES_IBMCXX)
|
133 mike 1.2 Threads::cleanup_push(&semaphore_cleanup, &_rep);
134 #endif
135
|
136 mike 1.5 // Keep track of the number of waiters so that <sema_post> works correctly.
137 _rep.waiters++;
|
138 mike 1.2
|
139 mike 1.5 // Wait until the semaphore count is > 0, then atomically release
140 // <lock_> and wait for <count_nonzero_> to be signaled.
|
141 kumpf 1.16 while (_rep.count == 0)
|
142 dmitry.mikulin 1.11 {
|
143 mike 1.5 pthread_cond_wait(&_rep.cond, &_rep.mutex);
|
144 dmitry.mikulin 1.11 }
|
145 mike 1.2
|
146 mike 1.5 // <_rep.mutex> is now held.
|
147 mike 1.2
|
148 mike 1.5 // Decrement the waiters count.
149 _rep.waiters--;
|
150 mike 1.2
|
151 mike 1.5 // Decrement the semaphore's count.
|
152 kumpf 1.16 _rep.count--;
|
153 mike 1.2
154 // Since we push an unlock onto the cleanup stack
|
155 mike 1.5 // We will pop it off to release the mutex when leaving the critical
156 // section.
|
157 ouyang.jian 1.9 #if defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) \
158 || defined(PEGASUS_PLATFORM_PASE_ISERIES_IBMCXX)
|
159 mike 1.5 Threads::cleanup_pop(1);
|
160 mike 1.2 #endif
|
161 mike 1.5 // Release mutex to leave critical section.
162 pthread_mutex_unlock(&_rep.mutex);
|
163 mike 1.2 }
164
|
165 kumpf 1.14 Boolean Semaphore::time_wait(Uint32 milliseconds)
|
166 mike 1.2 {
|
167 mike 1.5 // Acquire mutex to enter critical section.
168 pthread_mutex_lock(&_rep.mutex);
169 Boolean timedOut = false;
|
170 mike 1.2
|
171 ouyang.jian 1.9 #if defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) \
172 || defined(PEGASUS_PLATFORM_PASE_ISERIES_IBMCXX)
|
173 mike 1.5 // Push cleanup function onto cleanup stack
174 // The mutex will unlock if the thread is killed early
|
175 dave.sudlik 1.8 Threads::cleanup_push(&semaphore_cleanup, &_rep);
|
176 mike 1.2 #endif
177
|
178 mike 1.5 // Keep track of the number of waiters so that <sema_post> works correctly.
179 _rep.waiters++;
180
181 struct timeval now = { 0, 0 };
182 struct timespec waittime = { 0, 0 };
183 gettimeofday(&now, NULL);
|
184 venkat.puvvada 1.20
185 waittime.tv_sec = now.tv_sec + (milliseconds / 1000);
186 milliseconds = milliseconds % 1000;
|
187 mike 1.5 waittime.tv_nsec = now.tv_usec + (milliseconds * 1000); // microseconds
188 waittime.tv_sec += (waittime.tv_nsec / 1000000); // roll overflow into
189 waittime.tv_nsec = (waittime.tv_nsec % 1000000); // the "seconds" part
190 waittime.tv_nsec = waittime.tv_nsec * 1000; // convert to nanoseconds
|
191 mike 1.2
|
192 kumpf 1.16 while ((_rep.count == 0) && !timedOut)
|
193 mike 1.5 {
194 int r = pthread_cond_timedwait(&_rep.cond, &_rep.mutex, &waittime);
195
|
196 marek 1.21 #ifdef PEGASUS_OS_ZOS
197 if (((r==-1 && errno==EAGAIN) || (r==ETIMEDOUT)) && _rep.count==0)
198 #else
199 if (((r==-1 && errno==ETIMEDOUT) || (r==ETIMEDOUT)) && _rep.count==0)
200 #endif
|
201 mike 1.5 {
202 timedOut = true;
203 }
204 }
|
205 mike 1.2
|
206 mike 1.5 if (!timedOut)
207 {
208 // Decrement the semaphore's count.
|
209 kumpf 1.16 _rep.count--;
|
210 mike 1.5 }
211
212 // Decrement the waiters count.
213 _rep.waiters--;
|
214 mike 1.2
|
215 ouyang.jian 1.9 #if defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) \
216 || defined(PEGASUS_PLATFORM_PASE_ISERIES_IBMCXX)
|
217 mike 1.5 // Since we push an unlock onto the cleanup stack
218 // We will pop it off to release the mutex when leaving the critical
219 // section.
220 Threads::cleanup_pop(1);
|
221 mike 1.2 #endif
222
|
223 mike 1.5 // Release mutex to leave critical section.
224 pthread_mutex_unlock(&_rep.mutex);
|
225 mike 1.2
|
226 kumpf 1.14 return !timedOut;
|
227 mike 1.2 }
228
229 // increment the count of the semaphore
230 void Semaphore::signal()
231 {
|
232 mike 1.5 pthread_mutex_lock(&_rep.mutex);
|
233 mike 1.2
|
234 mike 1.5 // Always allow one thread to continue if it is waiting.
235 if (_rep.waiters > 0)
|
236 dmitry.mikulin 1.11 {
|
237 mike 1.5 pthread_cond_signal(&_rep.cond);
|
238 dmitry.mikulin 1.11 }
|
239 mike 1.2
|
240 mike 1.5 // Increment the semaphore's count.
|
241 kumpf 1.16 _rep.count++;
|
242 mike 1.2
|
243 mike 1.5 pthread_mutex_unlock(&_rep.mutex);
|
244 mike 1.2 }
245
246 #endif /* PEGASUS_USE_PTHREAD_SEMAPHORE */
247
248 //==============================================================================
249 //
250 // PEGASUS_USE_POSIX_SEMAPHORE
251 //
252 //==============================================================================
253
254 #if defined(PEGASUS_USE_POSIX_SEMAPHORE)
255
256 Semaphore::Semaphore(Uint32 initial)
257 {
|
258 mike 1.5 if (initial > PEGASUS_SEM_VALUE_MAX)
|
259 dmitry.mikulin 1.11 {
|
260 mike 1.5 initial = PEGASUS_SEM_VALUE_MAX - 1;
|
261 dmitry.mikulin 1.11 }
262
|
263 mike 1.5 _rep.owner = Threads::self();
|
264 dmitry.mikulin 1.11 if (sem_init(&_rep.sem, 0, initial) == -1)
265 {
|
266 kumpf 1.16 throw Exception(MessageLoaderParms(
267 "Common.InternalException.SEMAPHORE_INIT_FAILED",
268 "Semaphore initialization failed: $0",
269 PEGASUS_SYSTEM_ERRORMSG_NLS));
|
270 dmitry.mikulin 1.11 }
|
271 mike 1.2 }
272
273 Semaphore::~Semaphore()
274 {
|
275 dmitry.mikulin 1.11 while (sem_destroy(&_rep.sem) == -1 && errno == EBUSY)
|
276 mike 1.5 {
277 Threads::yield();
278 }
|
279 mike 1.2 }
280
281 // block until this semaphore is in a signalled state, or
282 // throw an exception if the wait fails
|
283 kumpf 1.13 void Semaphore::wait()
|
284 mike 1.2 {
285 do
286 {
287 int rc = sem_wait(&_rep.sem);
288 if (rc == 0)
289 break;
290
|
291 kumpf 1.13 if (errno != EINTR)
|
292 mike 1.2 {
|
293 kumpf 1.16 throw Exception(MessageLoaderParms(
294 "Common.InternalException.SEMAPHORE_WAIT_FAILED",
295 "Semaphore wait failed: $0",
296 PEGASUS_SYSTEM_ERRORMSG_NLS));
|
297 dmitry.mikulin 1.11 }
|
298 mike 1.2
299 // keep going if above conditions fail
|
300 mike 1.5 }
301 while (true);
|
302 mike 1.2
303 }
304
305 // wait for milliseconds and throw an exception
306 // if wait times out without gaining the semaphore
|
307 kumpf 1.14 Boolean Semaphore::time_wait(Uint32 milliseconds)
|
308 mike 1.2 {
|
309 kumpf 1.12 int retcode;
|
310 mike 1.5
311 struct timeval now, finish, remaining;
312 Uint32 usec;
|
313 mike 1.2
|
314 mike 1.5 gettimeofday(&finish, NULL);
315 finish.tv_sec += (milliseconds / 1000);
|
316 marek 1.22 usec = finish.tv_usec + ((milliseconds % 1000) * 1000);
|
317 mike 1.5 finish.tv_sec += (usec / 1000000);
318 finish.tv_usec = usec % 1000000;
|
319 mike 1.2
|
320 mike 1.5 while (1)
321 {
322 do
323 {
324 retcode = sem_trywait(&_rep.sem);
325 }
326 while (retcode == -1 && errno == EINTR);
327
328 if (retcode == 0)
|
329 dmitry.mikulin 1.11 {
|
330 kumpf 1.14 break;
|
331 dmitry.mikulin 1.11 }
|
332 mike 1.5
333 if (retcode == -1 && errno != EAGAIN)
|
334 dmitry.mikulin 1.11 {
|
335 kumpf 1.16 throw Exception(MessageLoaderParms(
336 "Common.InternalException.SEMAPHORE_WAIT_FAILED",
337 "Semaphore wait failed: $0",
338 PEGASUS_SYSTEM_ERRORMSG_NLS));
|
339 dmitry.mikulin 1.11 }
340
|
341 mike 1.5 gettimeofday(&now, NULL);
342 if (Time::subtract(&remaining, &finish, &now))
|
343 dmitry.mikulin 1.11 {
|
344 kumpf 1.14 return false;
|
345 dmitry.mikulin 1.11 }
|
346 marek 1.22 // yield just marks the thread as eligible to be not scheduled by
347 // hypervisor, sleep forces thread to actually take a break
348 // which what is called for here to avoid CPU spikes from close loop
349 Threads::sleep(milliseconds/100+1);
|
350 mike 1.5 }
|
351 kumpf 1.14
352 return true;
|
353 mike 1.2 }
354
355 // increment the count of the semaphore
356 void Semaphore::signal()
357 {
|
358 dmitry.mikulin 1.11 if (sem_post(&_rep.sem) == -1)
359 {
|
360 kumpf 1.16 throw Exception(MessageLoaderParms(
361 "Common.InternalException.SEMAPHORE_SIGNAL_FAILED",
362 "Failed to signal semaphore: $0",
363 PEGASUS_SYSTEM_ERRORMSG_NLS));
|
364 dmitry.mikulin 1.11 }
|
365 mike 1.2 }
366
367 #endif /* PEGASUS_USE_POSIX_SEMAPHORE */
368
369 //==============================================================================
370 //
371 // PEGASUS_USE_WINDOWS_SEMAPHORE
372 //
373 //==============================================================================
374
375 #if defined(PEGASUS_USE_WINDOWS_SEMAPHORE)
376
|
377 mike 1.5 Semaphore::Semaphore(Uint32 initial)
|
378 mike 1.2 {
|
379 mike 1.5 if (initial > PEGASUS_SEM_VALUE_MAX)
|
380 dmitry.mikulin 1.11 {
|
381 mike 1.5 initial = PEGASUS_SEM_VALUE_MAX - 1;
|
382 dmitry.mikulin 1.11 }
|
383 mike 1.5 _rep.owner = Threads::self();
384 _rep.sem = CreateSemaphore(NULL, initial, PEGASUS_SEM_VALUE_MAX, NULL);
|
385 kavita.gupta 1.19 if (_rep.sem == NULL)
386 {
387 throw Exception(MessageLoaderParms(
388 "Common.InternalException.SEMAPHORE_INIT_FAILED",
389 "Semaphore initialization failed: $0",
390 PEGASUS_SYSTEM_ERRORMSG_NLS));
391 }
|
392 mike 1.2 }
393
394 Semaphore::~Semaphore()
395 {
|
396 mike 1.5 CloseHandle(_rep.sem);
|
397 mike 1.2 }
398
399 // block until this semaphore is in a signalled state
|
400 kumpf 1.13 void Semaphore::wait()
|
401 mike 1.2 {
402 DWORD errorcode = WaitForSingleObject(_rep.sem, INFINITE);
|
403 kumpf 1.16 if (errorcode == WAIT_FAILED)
|
404 dmitry.mikulin 1.11 {
|
405 kumpf 1.16 throw Exception(MessageLoaderParms(
406 "Common.InternalException.SEMAPHORE_WAIT_FAILED",
407 "Semaphore wait failed: $0",
408 PEGASUS_SYSTEM_ERRORMSG_NLS));
|
409 dmitry.mikulin 1.11 }
|
410 mike 1.2 }
411
|
412 kumpf 1.15 Boolean Semaphore::time_wait(Uint32 milliseconds)
|
413 mike 1.2 {
414 DWORD errorcode = WaitForSingleObject(_rep.sem, milliseconds);
|
415 kumpf 1.14
416 if (errorcode == WAIT_TIMEOUT)
|
417 dmitry.mikulin 1.11 {
|
418 kumpf 1.14 return false;
|
419 dmitry.mikulin 1.11 }
|
420 kumpf 1.14
421 if (errorcode == WAIT_FAILED)
422 {
|
423 kumpf 1.16 throw Exception(MessageLoaderParms(
424 "Common.InternalException.SEMAPHORE_WAIT_FAILED",
425 "Semaphore wait failed: $0",
426 PEGASUS_SYSTEM_ERRORMSG_NLS));
|
427 kumpf 1.14 }
428
429 return true;
|
430 mike 1.2 }
431
432 // increment the count of the semaphore
433 void Semaphore::signal()
434 {
435 ReleaseSemaphore(_rep.sem, 1, NULL);
436 }
437
438 #endif /* PEGASUS_USE_WINDOWS_SEMAPHORE */
439
440 PEGASUS_NAMESPACE_END
|