(file) Return to condlockatomic.c CVS log (file) (dir) Up to [OMI] / omi / pal

  1 krisbash 1.1 #include <stdlib.h>
  2              #include "lock.h"
  3              #include "cpu.h"
  4              #include "atomic.h"
  5              #include "sem.h"
  6              #include "sleep.h"
  7              
  8              #define CACHE_LINE_SIZE 128
  9              
 10              #if defined(CONFIG_FAVORSIZE)
 11              #define WAIT_BUCKETS 16
 12              #else
 13              #define WAIT_BUCKETS 1024
 14              #endif /* defined(CONFIG_FAVORSIZE) */
 15              
 16              #define WAIT_BUCKET_MASK (WAIT_BUCKETS - 1)
 17              #define WAIT_RESETTING 1
 18              #define WAIT_BROADCASTING 0x80000000
 19              #define WAIT_COUNT 0x7fffffff
 20              #define SEM_POOL_SIZE 64
 21              
 22 krisbash 1.1 #define KEY_EMPTY     0
 23              #define KEY_COLLISION -1
 24              
 25              #define SPIN_SIGN 0x80
 26              #define SPIN_MAX  0xff
 27              
 28              #define SLEEP_MIN 5 
 29              #define SLEEP_MAX 1500
 30              
 31              typedef struct _WaitRefs
 32              {
 33                  volatile ptrdiff_t sem;  /* Assumes Sem is isomorphic to ptrdiff_t! */
 34                  volatile ptrdiff_t refs; /* Counts threads that call Sem_Wait. */
 35                  volatile ptrdiff_t key;  /* Detects hash collisions. */
 36              } WaitRefs;
 37              
 38              typedef struct _WaitBucket
 39              {
 40                  WaitRefs state[2];
 41                  volatile ptrdiff_t signals;
 42                  unsigned char spinState;
 43 krisbash 1.1 
 44                  char padding[CACHE_LINE_SIZE
 45                      - 2*sizeof(WaitRefs)
 46                      - 2*sizeof(ptrdiff_t)];
 47              } WaitBucket;
 48              
 49              typedef enum _WaitPoolState
 50              {
 51                  Unloaded,
 52                  Loaded,
 53              } WaitPoolState;
 54              
 55              static WaitBucket s_waitPool[WAIT_BUCKETS] = {{{{0}}}};
 56              static size_t s_highSpinCount = 32768;
 57              static size_t s_lowSpinCount = 1024;
 58              static volatile ptrdiff_t s_waitPoolState = Unloaded;
 59              
 60              static ptrdiff_t s_semPool[SEM_POOL_SIZE] = {0};
 61              static volatile ptrdiff_t s_semPoolCount = 0;
 62              
 63              static void ATEXIT_API ShutdownWaitPool(void)
 64 krisbash 1.1 {
 65                  /* Possible this function could be run multiple times. */
 66                  ptrdiff_t count = Atomic_Swap(&s_semPoolCount, 0);
 67                  ptrdiff_t i;
 68              
 69                  for (i = 0; i < count; i++)
 70                      Sem_Destroy((Sem *)&s_semPool[i]);
 71              }
 72              
 73              static void InitializeWaitPool()
 74              {
 75                  if (CPU_GetCount() == 1)
 76                  {
 77                      /* Spinning is useless on single-CPU machines. */
 78                      s_highSpinCount = 0;
 79                      s_lowSpinCount = 0;
 80                  }
 81              
 82                  /* Possible multiple threads could get here, but that's okay. */
 83                  PAL_Atexit(ShutdownWaitPool);
 84              
 85 krisbash 1.1     /* Atomic swap used as memory barrier. */
 86                  Atomic_Swap(&s_waitPoolState, Loaded);
 87              }
 88              
 89              static ptrdiff_t GetPooledSemaphore()
 90              {
 91                  ptrdiff_t oldCount, newCount, swapCount;
 92                  ptrdiff_t temp;
 93              
 94                  for(;;)
 95                  {
 96                      oldCount = PAL_PREFETCH(&s_semPoolCount);
 97              
 98                      if (oldCount == 0)
 99                      {
100                          /* The pool is empty. Create the semaphore the normal way. */
101                          if (Sem_Init((Sem*)&temp, SEM_USER_ACCESS_DEFAULT, 0) != 0)
102                              return 0;
103                          return temp;
104                      }
105              
106 krisbash 1.1         newCount = oldCount - 1;
107                      temp = PAL_PREFETCH(s_semPool + newCount);
108              
109                      /* Check to see if the slot is NULL. If so, spin. */
110                      if (temp == 0)
111                          continue;
112              
113                      /* Try to take ownership of the semaphore. */
114                      temp = Atomic_Swap(s_semPool + newCount, 0);
115                      if (temp == 0)
116                          continue;
117              
118                      /* Got ownership. Try to decrement the count. */
119                      swapCount = Atomic_CompareAndSwap(&s_semPoolCount, oldCount, newCount);
120                      if (swapCount != oldCount)
121                      {
122                          /* The count changed. Put the semaphore back and retry. */
123                          s_semPool[newCount] = temp;
124                          continue;
125                      }
126              
127 krisbash 1.1         /* Successfully decremented the count. */
128                      return temp;
129                  }
130              }
131              
132              static void RecyclePooledSemaphore(
133                  _In_ ptrdiff_t sem
134              )
135              {
136                  ptrdiff_t oldCount, newCount, swapCount;
137              
138                  for (;;)
139                  {
140                      oldCount = PAL_PREFETCH(&s_semPoolCount);
141              
142                      if (oldCount == SEM_POOL_SIZE)
143                      {
144                          /* The pool is full. Destroy the semaphore the normal way. */
145                          Sem_Destroy((Sem*)&sem);
146                          return;
147                      }
148 krisbash 1.1 
149                      newCount = oldCount + 1;
150              
151                      /* Try to allocate space for the semaphore. */
152                      swapCount = Atomic_CompareAndSwap(&s_semPoolCount, oldCount, newCount);
153                      if (swapCount != oldCount)
154                          continue;
155              
156                      /* We own this slot until we make it non-NULL. */
157                      s_semPool[oldCount] = sem;
158                      return;
159                  }
160              }
161              
162              static void LeaveWaitPool(
163                  _Inout_ WaitRefs* state
164              )
165              {
166                  ptrdiff_t sem;
167                  ptrdiff_t newRefs;
168              
169 krisbash 1.1     newRefs = Atomic_Dec(&state->refs) & WAIT_COUNT;
170                  if (newRefs == WAIT_RESETTING)
171                  {
172                      /* The WAIT_RESETTING state is used to clear sem/key safely.
173                       * No one can change states except us. */
174                      sem = state->sem;
175              
176                      state->sem = 0;
177                      state->key = KEY_EMPTY;
178              
179                      /* Atomic swap used as memory barrier. */
180                      Atomic_Swap(&state->refs, 0);
181              
182                      if (sem != 0)
183                          RecyclePooledSemaphore(sem);
184                  }
185              }
186              
187              static int EnterWaitPool(
188                  _Inout_ WaitRefs* state,
189                          int broadcast
190 krisbash 1.1 )
191              {
192                  ptrdiff_t sem, swapSem;
193                  ptrdiff_t oldRefs, newRefs, swapRefs;
194              
195                  /* Check for one-time initialization work. */
196                  if (s_waitPoolState != Loaded)
197                      InitializeWaitPool();
198              
199                  for (;;)
200                  {
201                      oldRefs = PAL_PREFETCH(&state->refs);
202              
203                      if (oldRefs & WAIT_BROADCASTING)
204                          return 1; /* Someone is broadcasting. Caller should yield. */
205                      else if (oldRefs == 0)
206                          newRefs = 2; /* Extra reference for WAIT_RESETTING. */
207                      else if (oldRefs > 1)
208                          newRefs = oldRefs + 1;
209                      else
210                          return 1; /* oldRefs == WAIT_RESETTING. Caller should yield. */
211 krisbash 1.1 
212                      if (broadcast)
213                          newRefs |= WAIT_BROADCASTING;
214              
215                      swapRefs = Atomic_CompareAndSwap(&state->refs, oldRefs, newRefs);
216                      if (swapRefs == oldRefs)
217                          break;
218                  }
219              
220                  if (state->sem == 0)
221                  {
222                      sem = GetPooledSemaphore();
223                      if (sem == 0)
224                      {
225                          /* This is not fatal. The caller should yield and try again. */            
226                          NitsIgnoringError();
227                          LeaveWaitPool(state);
228                          return 1;
229                      }
230              
231                      /* Try to install the semaphore. Somebody might beat us to it. */
232 krisbash 1.1         swapSem = Atomic_CompareAndSwap(&state->sem, 0, sem);
233                      if (swapSem != 0)
234                          RecyclePooledSemaphore(sem);
235                  }
236              
237                  /* The WaitRefs structure now has a valid semaphore and refs >= 2. */
238                  return 0;
239              }
240              
/*
 * Mixes the bits of a key so that nearby keys spread over the bucket table.
 * The key is converted to size_t before shifting: right-shifting a negative
 * signed value is implementation-defined, while unsigned shifts are not.
 * The bits kept by WAIT_BUCKET_MASK are the same as with an arithmetic
 * shift, so bucket selection does not change.
 * Note: the argument is evaluated several times; pass a plain value.
 */
#define WaitBucketHash(x) \
    ((size_t)(x) ^ ((size_t)(x) >> 3) ^ ((size_t)(x) >> 12) ^ ((size_t)(x) >> 22))

/* Maps a key to the address of its bucket inside s_waitPool. */
#define WaitBucketForKey(x) (s_waitPool + (WaitBucketHash(x) & WAIT_BUCKET_MASK))
243              
244              int CondLock_Wait(
245                                      ptrdiff_t key,
246                  _In_ volatile const ptrdiff_t* destination,
247                                      ptrdiff_t comparand,
248                                      size_t spinCount)
249              {
250                  WaitBucket* bucket;
251                  WaitRefs* state;
252                  size_t spins;
253 krisbash 1.1     ptrdiff_t startValue;
254                  ptrdiff_t oldKey, swapKey;
255                  int sleep = SLEEP_MIN;
256              
257                  bucket = WaitBucketForKey(key);
258                  startValue = bucket->signals;
259              
260                  /* Read bucket->signals before reading *destination. */
261                  NonX86MemoryBarrier();
262              
263                  /* Determine whether we already missed a state change. */
264                  if (*destination != comparand)
265                      return 1;
266              
267                  /* Select a spin count. */
268                  if (spinCount == CONDLOCK_DEFAULT_SPINCOUNT)
269                      spinCount = (bucket->spinState >= SPIN_SIGN) ?
270                          s_lowSpinCount :
271                          s_highSpinCount;
272                  else if (spinCount == CONDLOCK_HIGH_SPINCOUNT)
273                      spinCount = s_highSpinCount;
274 krisbash 1.1     else if (spinCount == CONDLOCK_LOW_SPINCOUNT)
275                      spinCount = s_lowSpinCount;
276              
277                  for (spins = 0; spins < spinCount; spins++)
278                  {
279                      if (bucket->signals != startValue)
280                      {
281                          /* Signal/Broadcast was called during spinning. */
282                          /* Deliberately imprecise. */
283                          bucket->spinState = (bucket->spinState > 2) ?
284                              (bucket->spinState - 2) : 0;
285                          return 1;
286                      }
287                  }
288              
289                  /* Select one of the two WaitRefs structures. */
290                  state = bucket->state + (startValue & 1);
291              
292                  for (;;)
293                  {
294                      if (bucket->signals != startValue)
295 krisbash 1.1             break;
296              
297                      /* This prevents the semaphore from becoming invalid. */
298                      /* All failure paths yield, then try again. */
299                      if (EnterWaitPool(state, 0) != 0)
300                      {
301                          sleep += sleep;
302                          if (sleep > SLEEP_MAX)
303                              sleep = SLEEP_MAX;
304              
305                          Sleep_Milliseconds(sleep);
306                          continue;
307                      }
308              
309                      for (;;)
310                      {
311                          /* KEY_EMPTY is a special value meaning no key is set. */
312                          if (key == KEY_EMPTY)
313                              key++;
314              
315                          /* Determine if the stored key is the same. */
316 krisbash 1.1             oldKey = state->key;
317                          if (key == oldKey)
318                              break;
319              
320                          /* If the stored key is non-empty, we have a collision. */
321                          if (oldKey != KEY_EMPTY)
322                              key = KEY_COLLISION;
323              
324                          swapKey = Atomic_CompareAndSwap(&state->key, oldKey, key);
325                          if (swapKey == oldKey)
326                              break;
327                      }
328              
329                      if (bucket->signals == startValue)
330                          Sem_Wait((Sem*)&state->sem);
331              
332                      LeaveWaitPool(state);
333                  }
334                  
335                  /* Didn't succeed with spinning; woke up later. */
336                  /* Deliberately imprecise. */
337 krisbash 1.1     bucket->spinState = (bucket->spinState < SPIN_MAX) ?
338                      (bucket->spinState + 1) :
339                      bucket->spinState;
340                  return 0;
341              }
342              
343              void CondLock_Broadcast(
344                  ptrdiff_t key)
345              {
346                  WaitBucket* bucket;
347                  WaitRefs* state;
348                  ptrdiff_t startValue;
349                  ptrdiff_t refs;
350                  
351                  bucket = WaitBucketForKey(key);
352              
353                  /* Bump the number of signals. Spinning Wait calls will complete. */
354                  startValue = Atomic_Inc(&bucket->signals) - 1;
355              
356                  /* Choose the same WaitRefs that the Wait calls did. */
357                  state = bucket->state + (startValue & 1);
358 krisbash 1.1 
359                  /* Short-circuit if there is no one to wake up. */
360                  if (state->refs == 0)
361                      return;
362              
363                  /* This prevents the semaphore from becoming invalid. */
364                  if (EnterWaitPool(state, 1) != 0)
365                      return;
366              
367                  /* Signal everyone except us and the ref for WAIT_RESETTING. */
368                  /* Spurious wakeups are possible, but this is transient and rare. */
369                  refs = (state->refs & WAIT_COUNT) - 2;
370                  Sem_Post((Sem*)&state->sem, (unsigned int)refs);
371              
372                  LeaveWaitPool(state);
373              }
374              
375              void CondLock_BroadcastSpinners(
376                  ptrdiff_t key)
377              {
378                  WaitBucket* bucket;
379 krisbash 1.1     
380                  bucket = WaitBucketForKey(key);
381              
382                  /* Bump signals without changing which semaphore gets posted next. */
383                  Atomic_Add(&bucket->signals, 2);
384              }
385              
386              void CondLock_Signal(
387                  ptrdiff_t key)
388              {
389                  WaitBucket* bucket;
390                  WaitRefs* state;
391                  ptrdiff_t startValue;
392                  ptrdiff_t refs;
393              
394                  bucket = WaitBucketForKey(key);
395              
396                  /* Bump the number of signals. Spinning Wait calls will complete. */
397                  startValue = Atomic_Inc(&bucket->signals) - 1;
398              
399                  /* Choose the same WaitRefs that the Wait calls did. */
400 krisbash 1.1     state = bucket->state + (startValue & 1);
401              
402                  /* Short-circuit if there is no one to wake up. */
403                  if (state->refs == 0)
404                      return;
405              
406                  /* This prevents the semaphore from becoming invalid. */
407                  if (EnterWaitPool(state, 0) != 0)
408                      return;
409              
410                  Sem_Post((Sem*)&state->sem, 1);
411              
412                  if (state->key == KEY_COLLISION &&
413                      EnterWaitPool(state, 1) == 0)
414                  {
415                      /* There is no way to prevent waking the wrong thread.
416                       * Fall back to waking everyone. */
417              
418                      /* Cannot factor in the Post we already made, because new threads were
419                       * not notified of the broadcast flag until now. */
420                      refs = (state->refs & WAIT_COUNT) - 2;
421 krisbash 1.1         Sem_Post((Sem*)&state->sem, (unsigned int)refs);
422              
423                      LeaveWaitPool(state);
424                  }
425              
426                  LeaveWaitPool(state);
427              }

ViewCVS 0.9.2