1 krisbash 1.1 #include <stdlib.h>
2 #include "lock.h"
3 #include "cpu.h"
4 #include "atomic.h"
5 #include "sem.h"
6 #include "sleep.h"
7
/* Assumed cache-line size; WaitBucket is padded to this to avoid false
 * sharing between adjacent buckets. */
#define CACHE_LINE_SIZE 128

#if defined(CONFIG_FAVORSIZE)
#define WAIT_BUCKETS 16
#else
#define WAIT_BUCKETS 1024
#endif /* defined(CONFIG_FAVORSIZE) */

/* WAIT_BUCKETS must be a power of two for this mask to be valid. */
#define WAIT_BUCKET_MASK (WAIT_BUCKETS - 1)
/* refs == 1 means the last leaver is resetting the WaitRefs slot. */
#define WAIT_RESETTING 1
/* High bit of refs: a broadcast is in progress on this slot. */
#define WAIT_BROADCASTING 0x80000000
/* Mask extracting the reference count (all bits below the flag). */
#define WAIT_COUNT 0x7fffffff
#define SEM_POOL_SIZE 64

/* Sentinel key values; real keys are remapped away from these. */
#define KEY_EMPTY 0
#define KEY_COLLISION -1

/* spinState thresholds: >= SPIN_SIGN selects the low spin count;
 * the counter saturates at SPIN_MAX. */
#define SPIN_SIGN 0x80
#define SPIN_MAX 0xff

/* Backoff bounds (milliseconds) for yielding retry loops. */
#define SLEEP_MIN 5
#define SLEEP_MAX 1500
30
/* One wait slot: a pooled semaphore, a count of threads using it, and
 * the key it currently serves (for collision detection). */
typedef struct _WaitRefs
{
    volatile ptrdiff_t sem;  /* Assumes Sem is isomorphic to ptrdiff_t! */
    volatile ptrdiff_t refs; /* Counts threads that call Sem_Wait. */
    volatile ptrdiff_t key;  /* Detects hash collisions. */
} WaitRefs;

/* A hash bucket. Two WaitRefs slots alternate by the parity of
 * 'signals' so a new wait generation can start while the previous
 * one drains. */
typedef struct _WaitBucket
{
    WaitRefs state[2];
    volatile ptrdiff_t signals;   /* Bumped by Signal/Broadcast; spinners watch it. */
    unsigned char spinState;      /* Adaptive spin heuristic, 0..SPIN_MAX. */

    /* Pad to CACHE_LINE_SIZE. NOTE(review): the formula reserves
     * 2*sizeof(ptrdiff_t) for signals+spinState, slightly more than
     * needed, so the struct still rounds to one cache line. */
    char padding[CACHE_LINE_SIZE
                 - 2*sizeof(WaitRefs)
                 - 2*sizeof(ptrdiff_t)];
} WaitBucket;
48
/* One-time initialization state for the wait pool (see InitializeWaitPool). */
typedef enum _WaitPoolState
{
    Unloaded,
    Loaded,
} WaitPoolState;

/* Hash table of wait buckets; keys map to buckets via WaitBucketForKey. */
static WaitBucket s_waitPool[WAIT_BUCKETS] = {{{{0}}}};
/* Spin counts; zeroed on single-CPU machines by InitializeWaitPool. */
static size_t s_highSpinCount = 32768;
static size_t s_lowSpinCount = 1024;
static volatile ptrdiff_t s_waitPoolState = Unloaded;

/* LIFO free pool of semaphores (stored as ptrdiff_t; 0 == empty slot). */
static ptrdiff_t s_semPool[SEM_POOL_SIZE] = {0};
static volatile ptrdiff_t s_semPoolCount = 0;
62
63 static void ATEXIT_API ShutdownWaitPool(void)
64 krisbash 1.1 {
65 /* Possible this function could be run multiple times. */
66 ptrdiff_t count = Atomic_Swap(&s_semPoolCount, 0);
67 ptrdiff_t i;
68
69 for (i = 0; i < count; i++)
70 Sem_Destroy((Sem *)&s_semPool[i]);
71 }
72
73 static void InitializeWaitPool()
74 {
75 if (CPU_GetCount() == 1)
76 {
77 /* Spinning is useless on single-CPU machines. */
78 s_highSpinCount = 0;
79 s_lowSpinCount = 0;
80 }
81
82 /* Possible multiple threads could get here, but that's okay. */
83 PAL_Atexit(ShutdownWaitPool);
84
85 krisbash 1.1 /* Atomic swap used as memory barrier. */
86 Atomic_Swap(&s_waitPoolState, Loaded);
87 }
88
/* Pops a semaphore from the lock-free free pool, creating a fresh one
 * when the pool is empty. Returns the semaphore cast to ptrdiff_t, or
 * 0 on failure (Sem_Init failed). */
static ptrdiff_t GetPooledSemaphore()
{
    ptrdiff_t oldCount, newCount, swapCount;
    ptrdiff_t temp;

    for(;;)
    {
        oldCount = PAL_PREFETCH(&s_semPoolCount);

        if (oldCount == 0)
        {
            /* The pool is empty. Create the semaphore the normal way. */
            if (Sem_Init((Sem*)&temp, SEM_USER_ACCESS_DEFAULT, 0) != 0)
                return 0;
            return temp;
        }

        newCount = oldCount - 1;
        temp = PAL_PREFETCH(s_semPool + newCount);

        /* Check to see if the slot is NULL. If so, spin. */
        /* (A recycler reserved the slot but has not stored into it yet.) */
        if (temp == 0)
            continue;

        /* Try to take ownership of the semaphore. */
        temp = Atomic_Swap(s_semPool + newCount, 0);
        if (temp == 0)
            continue;

        /* Got ownership. Try to decrement the count. */
        swapCount = Atomic_CompareAndSwap(&s_semPoolCount, oldCount, newCount);
        if (swapCount != oldCount)
        {
            /* The count changed. Put the semaphore back and retry. */
            s_semPool[newCount] = temp;
            continue;
        }

        /* Successfully decremented the count. */
        return temp;
    }
}
131
/* Returns a semaphore to the free pool for reuse, destroying it when
 * the pool is full. Protocol: reserve a count slot via CAS first, then
 * fill the (still-0) array slot; poppers spin while a slot reads 0. */
static void RecyclePooledSemaphore(
    _In_ ptrdiff_t sem
    )
{
    ptrdiff_t oldCount, newCount, swapCount;

    for (;;)
    {
        oldCount = PAL_PREFETCH(&s_semPoolCount);

        if (oldCount == SEM_POOL_SIZE)
        {
            /* The pool is full. Destroy the semaphore the normal way. */
            Sem_Destroy((Sem*)&sem);
            return;
        }

        newCount = oldCount + 1;

        /* Try to allocate space for the semaphore. */
        swapCount = Atomic_CompareAndSwap(&s_semPoolCount, oldCount, newCount);
        if (swapCount != oldCount)
            continue;

        /* We own this slot until we make it non-NULL. */
        s_semPool[oldCount] = sem;
        return;
    }
}
161
/* Drops one reference acquired by EnterWaitPool. The thread whose
 * decrement leaves refs == WAIT_RESETTING becomes the slot's sole
 * owner: it detaches the semaphore, clears the key, and only then
 * publishes refs == 0 so the slot can be reused. */
static void LeaveWaitPool(
    _Inout_ WaitRefs* state
    )
{
    ptrdiff_t sem;
    ptrdiff_t newRefs;

    /* Mask off WAIT_BROADCASTING so only the count is compared. */
    newRefs = Atomic_Dec(&state->refs) & WAIT_COUNT;
    if (newRefs == WAIT_RESETTING)
    {
        /* The WAIT_RESETTING state is used to clear sem/key safely.
         * No one can change states except us. */
        sem = state->sem;

        state->sem = 0;
        state->key = KEY_EMPTY;

        /* Atomic swap used as memory barrier. */
        Atomic_Swap(&state->refs, 0);

        if (sem != 0)
            RecyclePooledSemaphore(sem);
    }
}
186
/* Takes a reference on a WaitRefs slot and ensures it carries a valid
 * semaphore. 'broadcast' != 0 also sets WAIT_BROADCASTING in refs.
 * Returns 0 on success (refs >= 2, state->sem valid; caller must pair
 * with LeaveWaitPool). Returns 1 when the caller should yield and
 * retry: the slot is resetting, a broadcast is in flight, or no
 * semaphore could be obtained. */
static int EnterWaitPool(
    _Inout_ WaitRefs* state,
    int broadcast
    )
{
    ptrdiff_t sem, swapSem;
    ptrdiff_t oldRefs, newRefs, swapRefs;

    /* Check for one-time initialization work. */
    if (s_waitPoolState != Loaded)
        InitializeWaitPool();

    for (;;)
    {
        oldRefs = PAL_PREFETCH(&state->refs);

        if (oldRefs & WAIT_BROADCASTING)
            return 1; /* Someone is broadcasting. Caller should yield. */
        else if (oldRefs == 0)
            newRefs = 2; /* Extra reference for WAIT_RESETTING. */
        else if (oldRefs > 1)
            newRefs = oldRefs + 1;
        else
            return 1; /* oldRefs == WAIT_RESETTING. Caller should yield. */

        if (broadcast)
            newRefs |= WAIT_BROADCASTING;

        swapRefs = Atomic_CompareAndSwap(&state->refs, oldRefs, newRefs);
        if (swapRefs == oldRefs)
            break;
    }

    /* Holding a reference; now make sure the slot has a semaphore. */
    if (state->sem == 0)
    {
        sem = GetPooledSemaphore();
        if (sem == 0)
        {
            /* This is not fatal. The caller should yield and try again. */
            NitsIgnoringError();
            LeaveWaitPool(state);
            return 1;
        }

        /* Try to install the semaphore. Somebody might beat us to it. */
        swapSem = Atomic_CompareAndSwap(&state->sem, 0, sem);
        if (swapSem != 0)
            RecyclePooledSemaphore(sem);
    }

    /* The WaitRefs structure now has a valid semaphore and refs >= 2. */
    return 0;
}
240
/* Mix the key's bits so nearby values spread across buckets, then mask
 * down to a bucket index (WAIT_BUCKETS is a power of two). */
#define WaitBucketHash(x) ((x) ^ ((x) >> 3) ^ ((x) >> 12) ^ ((x) >> 22))
#define WaitBucketForKey(x) (s_waitPool + (WaitBucketHash(x) & WAIT_BUCKET_MASK))
243
/* Waits for a change signaled on 'key'. Returns 1 immediately if
 * *destination already differs from 'comparand' or if a signal arrives
 * while spinning; returns 0 after parking on the bucket's semaphore.
 * Keys that hash to the same bucket share state, so spurious wakeups
 * are possible — callers must re-check their predicate in a loop. */
int CondLock_Wait(
    ptrdiff_t key,
    _In_ volatile const ptrdiff_t* destination,
    ptrdiff_t comparand,
    size_t spinCount)
{
    WaitBucket* bucket;
    WaitRefs* state;
    size_t spins;
    ptrdiff_t startValue;
    ptrdiff_t oldKey, swapKey;
    int sleep = SLEEP_MIN;

    bucket = WaitBucketForKey(key);
    startValue = bucket->signals;

    /* Read bucket->signals before reading *destination. */
    NonX86MemoryBarrier();

    /* Determine whether we already missed a state change. */
    if (*destination != comparand)
        return 1;

    /* Select a spin count. spinState is an adaptive heuristic: high
     * values mean spinning rarely paid off recently, so spin less. */
    if (spinCount == CONDLOCK_DEFAULT_SPINCOUNT)
        spinCount = (bucket->spinState >= SPIN_SIGN) ?
            s_lowSpinCount :
            s_highSpinCount;
    else if (spinCount == CONDLOCK_HIGH_SPINCOUNT)
        spinCount = s_highSpinCount;
    else if (spinCount == CONDLOCK_LOW_SPINCOUNT)
        spinCount = s_lowSpinCount;

    for (spins = 0; spins < spinCount; spins++)
    {
        if (bucket->signals != startValue)
        {
            /* Signal/Broadcast was called during spinning. */
            /* Reward spinning by decaying spinState toward 0.
             * Deliberately imprecise: unsynchronized updates only
             * perturb the heuristic, never correctness. */
            bucket->spinState = (bucket->spinState > 2) ?
                (bucket->spinState - 2) : 0;
            return 1;
        }
    }

    /* Select one of the two WaitRefs structures by signal parity —
     * the same choice Signal/Broadcast make for this generation. */
    state = bucket->state + (startValue & 1);

    for (;;)
    {
        if (bucket->signals != startValue)
            break;

        /* This prevents the semaphore from becoming invalid. */
        /* All failure paths yield, then try again. */
        if (EnterWaitPool(state, 0) != 0)
        {
            /* Exponential backoff capped at SLEEP_MAX milliseconds. */
            sleep += sleep;
            if (sleep > SLEEP_MAX)
                sleep = SLEEP_MAX;

            Sleep_Milliseconds(sleep);
            continue;
        }

        for (;;)
        {
            /* KEY_EMPTY is a special value meaning no key is set. */
            /* A literal 0 key is remapped to 1 so it cannot look empty. */
            if (key == KEY_EMPTY)
                key++;

            /* Determine if the stored key is the same. */
            oldKey = state->key;
            if (key == oldKey)
                break;

            /* If the stored key is non-empty, we have a collision. */
            if (oldKey != KEY_EMPTY)
                key = KEY_COLLISION;

            swapKey = Atomic_CompareAndSwap(&state->key, oldKey, key);
            if (swapKey == oldKey)
                break;
        }

        /* Last check before parking; a signal may have just arrived. */
        if (bucket->signals == startValue)
            Sem_Wait((Sem*)&state->sem);

        LeaveWaitPool(state);
    }

    /* Didn't succeed with spinning; woke up later. */
    /* Raise spinState (saturating at SPIN_MAX). Deliberately imprecise. */
    bucket->spinState = (bucket->spinState < SPIN_MAX) ?
        (bucket->spinState + 1) :
        bucket->spinState;
    return 0;
}
342
/* Wakes every thread waiting on 'key' (and any key hashing to the same
 * bucket). Incrementing 'signals' completes spinning waiters and flips
 * the parity bit, steering new waiters to the other WaitRefs slot. */
void CondLock_Broadcast(
    ptrdiff_t key)
{
    WaitBucket* bucket;
    WaitRefs* state;
    ptrdiff_t startValue;
    ptrdiff_t refs;

    bucket = WaitBucketForKey(key);

    /* Bump the number of signals. Spinning Wait calls will complete. */
    startValue = Atomic_Inc(&bucket->signals) - 1;

    /* Choose the same WaitRefs that the Wait calls did. */
    state = bucket->state + (startValue & 1);

    /* Short-circuit if there is no one to wake up. */
    if (state->refs == 0)
        return;

    /* This prevents the semaphore from becoming invalid.
     * Passing broadcast=1 sets WAIT_BROADCASTING so late arrivals
     * back off instead of joining this generation. */
    if (EnterWaitPool(state, 1) != 0)
        return;

    /* Signal everyone except us and the ref for WAIT_RESETTING. */
    /* Spurious wakeups are possible, but this is transient and rare. */
    refs = (state->refs & WAIT_COUNT) - 2;
    Sem_Post((Sem*)&state->sem, (unsigned int)refs);

    LeaveWaitPool(state);
}
374
375 void CondLock_BroadcastSpinners(
376 ptrdiff_t key)
377 {
378 WaitBucket* bucket;
379 krisbash 1.1
380 bucket = WaitBucketForKey(key);
381
382 /* Bump signals without changing which semaphore gets posted next. */
383 Atomic_Add(&bucket->signals, 2);
384 }
385
/* Wakes one thread waiting on 'key'. If the slot's key records a hash
 * collision (KEY_COLLISION), the right thread cannot be identified, so
 * this degenerates to waking everyone, as in CondLock_Broadcast. */
void CondLock_Signal(
    ptrdiff_t key)
{
    WaitBucket* bucket;
    WaitRefs* state;
    ptrdiff_t startValue;
    ptrdiff_t refs;

    bucket = WaitBucketForKey(key);

    /* Bump the number of signals. Spinning Wait calls will complete. */
    startValue = Atomic_Inc(&bucket->signals) - 1;

    /* Choose the same WaitRefs that the Wait calls did. */
    state = bucket->state + (startValue & 1);

    /* Short-circuit if there is no one to wake up. */
    if (state->refs == 0)
        return;

    /* This prevents the semaphore from becoming invalid. */
    if (EnterWaitPool(state, 0) != 0)
        return;

    /* Wake a single waiter. */
    Sem_Post((Sem*)&state->sem, 1);

    /* Second EnterWaitPool takes its own reference (with the broadcast
     * flag), released by the LeaveWaitPool inside the block. */
    if (state->key == KEY_COLLISION &&
        EnterWaitPool(state, 1) == 0)
    {
        /* There is no way to prevent waking the wrong thread.
         * Fall back to waking everyone. */

        /* Cannot factor in the Post we already made, because new threads were
         * not notified of the broadcast flag until now. */
        refs = (state->refs & WAIT_COUNT) - 2;
        Sem_Post((Sem*)&state->sem, (unsigned int)refs);

        LeaveWaitPool(state);
    }

    LeaveWaitPool(state);
}
|