1 mike 1.1 /*
2 **==============================================================================
3 **
4 ** Open Management Infrastructure (OMI)
5 **
6 ** Copyright (c) Microsoft Corporation
7 **
8 ** Licensed under the Apache License, Version 2.0 (the "License"); you may not
9 ** use this file except in compliance with the License. You may obtain a copy
10 ** of the License at
11 **
12 ** http://www.apache.org/licenses/LICENSE-2.0
13 **
14 ** THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 ** KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED
16 ** WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE,
17 ** MERCHANTABLITY OR NON-INFRINGEMENT.
18 **
19 ** See the Apache 2 License for the specific language governing permissions
20 ** and limitations under the License.
21 **
22 mike 1.1 **==============================================================================
23 */
24
25 #include "selector.h"
26 #include <base/time.h>
27 #include "thread.h"
28 #include <base/log.h>
29 #include <base/result.h>
30 #include <base/atomic.h>
31
32 /* maximum number of items in the list */
33 #define LIST_SIZE_LIMIT 321
34
35 /* maximum instances that can be allocated */
36 #define MAX_ALLOCATED_INSTANCES 500
37
38
39 /*
40 **==============================================================================
41 **
42 **
43 mike 1.1 ** Windows Implementation
44 **
45 **
46 **==============================================================================
47 */
48 #if defined(CONFIG_OS_WINDOWS)
49 # include <winsock2.h>
50
51 /* Notification internal datastructure */
52 typedef struct _SelectorCallbacksItem
53 {
54 /* Links for inserting messages onto linked-lists */
55 struct _SelectorCallbacksItem* next;
56 struct _SelectorCallbacksItem* prev;
57
58 Selector_NotificationCallback callback;
59 void* callback_self;
60 /* message has to be add-refed when added and dec-refed upon callback invocation */
61 Message* message;
62 }
63 SelectorCallbacksItem;
64 mike 1.1
65 typedef struct _SelectorCallbacksList
66 {
67 /* Linked list of callbacks to call */
68 ListElem* head;
69 ListElem* tail;
70 int numberOfItem; /**/
71 }
72 SelectorCallbacksList;
73
74 typedef struct _SelectorRep
75 {
76 /* Linked list of event watchers */
77 ListElem* head;
78 ListElem* tail;
79
80 /* Object for detecting socket events */
81 WSAEVENT event;
82
83 /* other thread notification */
84 HANDLE callbacksAreAvailable;
85 mike 1.1
86 /* list object - never referred directly - only using a pointer */
87 SelectorCallbacksList __callback_list_object;
88 /* list of callbacks to notify ;
89 storing pointer here, so we can use interlock funciotns for list updating
90 */
91 SelectorCallbacksList* callbacksList;
92
93 /* Number of alive instance */
94 AtomicInt outstandingInstances;
95
96 /* io thread id */
97 ThreadHandle ioThreadHandle;
98
99 /* flag to stop running */
100 MI_Boolean keepRunning;
101
102 /* flag that allows empty selector running
103 when empty selector runs, it only can be interrupted by
104 internal funcitons, since it has no sockets to monitor
105 */
106 mike 1.1 MI_Boolean allowEmptySelector;
107 }
108 SelectorRep;
109
110 static SelectorCallbacksList* _GetList(SelectorRep* rep)
111 {
112 SelectorCallbacksList * list;
113
114 list = InterlockedExchangePointer( &rep->callbacksList, 0 );
115
116 while ( !list )
117 {
118 Sleep(0);
119 list = InterlockedExchangePointer( &rep->callbacksList, 0 );
120 }
121
122 return list;
123 }
124
125 static void _SetList(SelectorRep* rep, SelectorCallbacksList * list)
126 {
127 mike 1.1 InterlockedExchangePointer( &rep->callbacksList, list );
128 }
129
130 static MI_Result _SetSockEvents(WSAEVENT event, SOCKET sock, MI_Uint32 mask)
131 {
132 long e = 0;
133
134 if (mask & SELECTOR_READ)
135 e |= FD_ACCEPT | FD_READ | FD_CLOSE;
136
137 if (mask & SELECTOR_WRITE)
138 e |= FD_WRITE | FD_CONNECT;
139
140 if (mask & SELECTOR_EXCEPTION)
141 e |= FD_OOB;
142
143 if (WSAEventSelect(sock, event, e) == SOCKET_ERROR)
144 return MI_RESULT_FAILED;
145
146 return MI_RESULT_OK;
147 }
148 mike 1.1
149 static MI_Result _GetSockEvents(WSAEVENT event, SOCKET sock, MI_Uint32* mask)
150 {
151 WSANETWORKEVENTS networkEvents;
152 long x;
153
154 ZeroMemory(&networkEvents, sizeof(networkEvents));
155
156 if (WSAEnumNetworkEvents(sock, event, &networkEvents) != 0)
157 return MI_RESULT_FAILED;
158
159 x = networkEvents.lNetworkEvents;
160 *mask = 0;
161
162 if (x & FD_ACCEPT || x & FD_READ || x & FD_CLOSE || x & FD_CONNECT)
163 *mask |= SELECTOR_READ;
164
165 if (x & FD_WRITE)
166 *mask |= SELECTOR_WRITE;
167
168 if (x & FD_OOB)
169 mike 1.1 *mask |= SELECTOR_EXCEPTION;
170
171 return MI_RESULT_OK;
172 }
173
174 MI_Result Selector_Init(
175 Selector* self)
176 {
177 SelectorRep* rep = (SelectorRep*)calloc(1, sizeof(SelectorRep));
178
179 if (!rep)
180 return MI_RESULT_FAILED;
181
182 rep->event = WSACreateEvent();
183
184 if (rep->event == WSA_INVALID_EVENT)
185 {
186 free(rep);
187 return MI_RESULT_FAILED;
188 }
189
190 mike 1.1 rep->callbacksList = &rep->__callback_list_object;
191
192 rep->callbacksAreAvailable = CreateEvent( 0, TRUE, FALSE, NULL );
193
194 if (NULL == rep->callbacksAreAvailable)
195 {
196 CloseHandle(rep->event);
197 free(rep);
198 return MI_RESULT_FAILED;
199 }
200
201
202 self->rep = rep;
203 return MI_RESULT_OK;
204 }
205
206 void Selector_Destroy(Selector* self)
207 {
208 SelectorRep* rep = (SelectorRep*)self->rep;
209 Handler* p;
210 Handler* next;
211 mike 1.1
212 /* Free all watchers */
213 for (p = (Handler*)rep->head; p; p = next)
214 {
215 next = (Handler*)p->next;
216
217 /* Unselect events on this socket */
218 _SetSockEvents(rep->event, p->sock, 0);
219
220 /* Invoke user callback */
221 (*p->callback)(self, p, SELECTOR_DESTROY, 0);
222
223 p = next;
224 }
225
226 CloseHandle(rep->event);
227 CloseHandle(rep->callbacksAreAvailable);
228
229 /* free messages in callbacks list */
230 while (rep->callbacksList->head)
231 {
232 mike 1.1 SelectorCallbacksItem* item = (SelectorCallbacksItem*)rep->callbacksList->head;
233
234 List_Remove(&rep->callbacksList->head, &rep->callbacksList->tail, (ListElem*)item);
235 Message_Release(item->message);
236 }
237
238 free(rep);
239 }
240
241 MI_Result Selector_AddHandler(
242 Selector* self,
243 Handler* handler)
244 {
245 SelectorRep* rep = (SelectorRep*)self->rep;
246 Handler* p;
247 MI_Uint64 currentTimeUsec = 0;
248
249 if (MI_RESULT_OK != Time_Now(¤tTimeUsec))
250 return MI_RESULT_FAILED;
251
252 /* Reject duplicates */
253 mike 1.1 for (p = (Handler*)rep->head; p; p = (Handler*)p->next)
254 {
255 if (p == handler)
256 return MI_RESULT_ALREADY_EXISTS;
257 }
258
259 /* Add new handler to list */
260 List_Append(&rep->head, &rep->tail, (ListElem*)handler);
261
262 (*handler->callback)(self, handler, SELECTOR_ADD, currentTimeUsec);
263
264 return MI_RESULT_OK;
265 }
266
267 MI_Result Selector_RemoveHandler(
268 Selector* self,
269 Handler* handler)
270 {
271 SelectorRep* rep = (SelectorRep*)self->rep;
272 Handler* p;
273
274 mike 1.1 /* Find and remove handler from list */
275 for (p = (Handler*)rep->head; p; p = (Handler*)p->next)
276 {
277 if (p == handler)
278 {
279 /* Remove handler */
280 List_Remove(&rep->head, &rep->tail, (ListElem*)p);
281
282 /* Unselect events on this socket */
283 _SetSockEvents(rep->event, p->sock, 0);
284
285 /* Notify handler of removal */
286 (*handler->callback)(self, p, SELECTOR_REMOVE, 0);
287
288 return MI_RESULT_OK;
289 }
290 }
291
292 return MI_RESULT_NOT_FOUND;
293 }
294
295 mike 1.1 MI_Result Selector_RemoveAllHandlers(
296 Selector* self)
297 {
298 SelectorRep* rep = (SelectorRep*)self->rep;
299 Handler* p;
300
301 /* Find and remove handler from list */
302 for (p = (Handler*)rep->head; p; )
303 {
304 Handler* next = p->next;
305
306 /* Remove handler */
307 List_Remove(&rep->head, &rep->tail, (ListElem*)p);
308
309 /* Unselect events on this socket */
310 _SetSockEvents(rep->event, p->sock, 0);
311
312 /* Notify handler of removal */
313 (*p->callback)(self, p, SELECTOR_REMOVE, 0);
314
315 p = next;
316 mike 1.1 }
317
318 return MI_RESULT_OK;
319 }
320
321 static void _ProcessCallbacks(
322 SelectorRep* rep)
323 {
324 SelectorCallbacksItem* item = 0;
325 SelectorCallbacksList * list;
326
327 /* remove all items from the list */
328 list = _GetList(rep);
329 item = (SelectorCallbacksItem*)list->head;
330 list->head = list->tail = 0;
331 list->numberOfItem = 0;
332 ResetEvent(rep->callbacksAreAvailable);
333 _SetList(rep, list);
334
335 /* process all items */
336 while (item)
337 mike 1.1 {
338 SelectorCallbacksItem* next = item->next;
339
340 (*item->callback) (item->callback_self, item->message);
341 Message_Release(item->message);
342 item = next;
343 }
344 }
345
346 MI_Result Selector_Run(
347 Selector* self,
348 MI_Uint64 timeoutUsec)
349 {
350 SelectorRep* rep = (SelectorRep*)self->rep;
351 MI_Uint64 timeoutSelectorAt = TIME_NEVER;
352 HANDLE handles[2];
353
354 handles[0] = rep->event;
355 handles[1] = rep->callbacksAreAvailable;
356
357 if ( TIME_NEVER != timeoutUsec )
358 mike 1.1 {
359 if (MI_RESULT_OK != Time_Now(&timeoutSelectorAt))
360 return MI_RESULT_FAILED;
361
362 /* calculate when to terminate selector */
363 timeoutSelectorAt += timeoutUsec;
364 }
365
366 rep->ioThreadHandle = ThreadSelf();
367
368 for (rep->keepRunning = MI_TRUE; rep->keepRunning;)
369 {
370 Handler* p;
371 DWORD result;
372 DWORD timeoutMsec;
373 MI_Result r;
374 MI_Boolean more;
375 MI_Uint64 currentTimeUsec = 0;
376 MI_Uint64 breakCurrentSelectAt = (MI_Uint64)-1;
377
378 if (MI_RESULT_OK != Time_Now(¤tTimeUsec))
379 mike 1.1 return MI_RESULT_FAILED;
380
381 if (TIME_NEVER != timeoutSelectorAt)
382 {
383 if (currentTimeUsec >= timeoutSelectorAt)
384 return MI_RESULT_TIME_OUT;
385
386 breakCurrentSelectAt = timeoutSelectorAt;
387 }
388
389 /* calculate timeout */
390 for (p = (Handler*)rep->head; p; )
391 {
392 Handler* next = p->next;
393
394 /* update event mask */
395 if (p->sock != INVALID_SOCK)
396 {
397 r = _SetSockEvents(rep->event, p->sock, p->mask);
398
399 if (r != MI_RESULT_OK)
400 mike 1.1 return r;
401 }
402
403 /* find the minimum timeout form the list */
404 if (TIME_NEVER != p->fireTimeoutAt)
405 {
406 /* if expired - don't wait; notification will be issued later in next loop */
407 if (currentTimeUsec >= p->fireTimeoutAt)
408 breakCurrentSelectAt = currentTimeUsec;
409 else if ( p->fireTimeoutAt < breakCurrentSelectAt)
410 breakCurrentSelectAt = p->fireTimeoutAt;
411 }
412
413 p = next;
414 }
415
416 /* empty list - return */
417 if (!rep->head && !rep->allowEmptySelector)
418 return MI_RESULT_FAILED;
419
420 /* Wait for events on any of the sockets */
421 mike 1.1 timeoutMsec =
422 (breakCurrentSelectAt == ((MI_Uint64)-1) ) ? /* do we have t least one valid timeout? */
423 INFINITE : (DWORD)((breakCurrentSelectAt - currentTimeUsec) / 1000);
424 result = WaitForMultipleObjects(MI_COUNT(handles), handles, FALSE, timeoutMsec);
425
426 if (result == WAIT_FAILED)
427 return MI_RESULT_FAILED;
428
429 //if ((WAIT_OBJECT_0 + 1) == result) /* other thread wants to call callback */
430 _ProcessCallbacks(rep);
431
432 if (MI_RESULT_OK != Time_Now(¤tTimeUsec))
433 return MI_RESULT_FAILED;
434
435 /* Dispatch events on each socket */
436 for (p = (Handler*)rep->head; p; )
437 {
438 Handler* next = p->next;
439 MI_Uint32 mask = 0;
440
441 /* Get event mask for this socket */
442 mike 1.1 if (p->sock != INVALID_SOCK)
443 {
444 r = _GetSockEvents(rep->event, p->sock, &mask);
445
446 if (r != MI_RESULT_OK)
447 return r;
448 }
449
450 if (TIME_NEVER != p->fireTimeoutAt && currentTimeUsec >= p->fireTimeoutAt)
451 {
452 mask |= SELECTOR_TIMEOUT;
453 }
454
455 /* If there were any events on this socket, dispatch them */
456 if (mask)
457 {
458 /*MI_Uint32 oldMask = p->mask;*/
459 more = (*p->callback)(self, p, mask, currentTimeUsec);
460
461 /* If callback wants to continue getting events */
462 if (!more)
463 mike 1.1 {
464 /* Remove handler */
465 List_Remove(&rep->head, &rep->tail, (ListElem*)p);
466
467 /* Unselect events on this socket */
468 _SetSockEvents(rep->event, p->sock, 0);
469
470 /* Notify handler of removal */
471 (*p->callback)(self, p, SELECTOR_REMOVE, currentTimeUsec);
472 }
473 }
474
475 p = next;
476 }
477 }
478 return MI_RESULT_OK;
479 }
480
481 /* Wakes up selector's thread
482 Typical usage is to recalculate timeouts on handlers
483 when selector's Run is running in different thread */
484 mike 1.1 MI_Result Selector_Wakeup(
485 Selector* self)
486 {
487 SelectorRep* rep = (SelectorRep*)self->rep;
488
489 /* notify running thread (if different) */
490 if (rep->ioThreadHandle != ThreadSelf())
491 {
492 SetEvent(rep->callbacksAreAvailable);
493 }
494
495 return MI_RESULT_OK;
496 }
497
498 MI_Result Selector_StopRunning(
499 Selector* self)
500 {
501 SelectorRep* rep = (SelectorRep*)self->rep;
502
503 rep->keepRunning = MI_FALSE;
504
505 mike 1.1 return Selector_Wakeup(self);
506 }
507
/*
 * This function guarantees that the callback is called in the 'Run'/'IO' thread context,
 * so no locking is required for accessing socket objects, updating buffers etc.
 */
512 MI_Result Selector_CallInIOThread(
513 Selector* self,
514 Selector_NotificationCallback callback,
515 void* callback_self,
516 Message* message)
517 {
518 SelectorRep* rep = (SelectorRep*)self->rep;
519 SelectorCallbacksItem* newItem;
520 SelectorCallbacksList * list;
521 int itemsInList;
522
523 if (rep->ioThreadHandle == ThreadSelf())
524 {
525 /* direct call - we can write to socket instantly */
526 mike 1.1
527 (*callback)(callback_self, message);
528 return MI_RESULT_OK;
529 }
530
531 /* add item to the list and set event */
532 newItem = (SelectorCallbacksItem*) Batch_GetClear( message->batch, sizeof(SelectorCallbacksItem));
533
534 if (!newItem)
535 return MI_RESULT_FAILED;
536
537 newItem->callback = callback;
538 newItem->callback_self = callback_self;
539 newItem->message = message;
540
541 Message_AddRef(message);
542 list = _GetList(rep);
543 List_Append(&list->head, &list->tail, (ListElem*)newItem);
544 list->numberOfItem++;
545 itemsInList = list->numberOfItem;
546 _SetList(rep, list);
547 mike 1.1 list = 0;
548 SetEvent(rep->callbacksAreAvailable);
549
550 while (rep->outstandingInstances > MAX_ALLOCATED_INSTANCES)
551 Sleep_ms(1);
552 #if 0
553 /* wait until list is empty to avoid memory explosion */
554 for ( i = 0; itemsInList > LIST_SIZE_LIMIT && i < 500; i++ )
555 {
556 Sleep(1);
557 list = _GetList(rep);
558 itemsInList = list->numberOfItem;
559 _SetList(rep, list);
560 }
561 #endif
562
563 return MI_RESULT_OK;
564 }
565
566 #endif /* defined(CONFIG_OS_WINDOWS) */
567
568 mike 1.1 /*
569 **==============================================================================
570 **
571 **
572 ** POSIX Implementation
573 **
574 **
575 **==============================================================================
576 */
577
578 #if defined(CONFIG_POSIX)
579 # include <string.h>
580 # include <unistd.h>
581 # include <errno.h>
582 # include <sys/socket.h>
583 # include <netinet/tcp.h>
584 # include <netinet/in.h>
585 # include <sys/time.h>
586 # include <sys/types.h>
587 # include <netdb.h>
588 # include <fcntl.h>
589 mike 1.1 # include <arpa/inet.h>
590
591 typedef struct _SelectorCallbacksItem
592 {
593 Selector_NotificationCallback callback;
594 void* callback_self;
595 /* message has to be add-refed when added and dec-refed upon callback invocation */
596 Message* message;
597 }
598 SelectorCallbacksItem;
599
600 typedef struct _SelectorRep
601 {
602 /* File descriptor sets */
603 fd_set readSet;
604 fd_set writeSet;
605 fd_set exceptSet;
606
607 /* Linked list of event watchers */
608 ListElem* head;
609 ListElem* tail;
610 mike 1.1
611 /* notifications channel */
612 int notificationSockets[2];
613
614 /* Number of notification items in a queue */
615 //AtomicInt queueLength;
616
617 /* Number of alive instance */
618 AtomicInt outstandingInstances;
619
620 /* flag to stop running */
621 MI_Boolean keepRunning;
622
623 /* flag that allows empty selector running
624 when empty selector runs, it only can be interrupted by
625 internal funcitons, since it has no sockets to monitor
626 */
627 MI_Boolean allowEmptySelector;
628
629 /* io thread id */
630 ThreadHandle ioThreadHandle;
631 mike 1.1 }
632 SelectorRep;
633
634 static int _Select(
635 fd_set* readSet,
636 fd_set* writeSet,
637 fd_set* exceptSet,
638 MI_Uint64 timeoutUsec)
639 {
640 int n = FD_SETSIZE;
641 struct timeval tv;
642 struct timeval* _tv = 0;
643
644 if ((MI_Uint64)-1 != timeoutUsec)
645 {
646 tv.tv_sec = (long)(timeoutUsec / 1000000);
647 tv.tv_usec = (long)(timeoutUsec % 1000000);
648 _tv = &tv;
649 }
650
651 return select(n, readSet, writeSet, exceptSet, _tv);
652 mike 1.1 }
653
654 MI_INLINE void _FDSet(Sock sock, fd_set* set)
655 {
656 FD_SET(sock, set);
657 }
658
659 MI_INLINE void _FDClr(Sock sock, fd_set* set)
660 {
661 FD_CLR(sock, set);
662 }
663
664 MI_Result Selector_Init(
665 Selector* self)
666 {
667 self->rep = (SelectorRep*)calloc(1, sizeof(SelectorRep));
668
669 if (!self->rep)
670 return MI_RESULT_FAILED;
671
672 if (pipe(self->rep->notificationSockets) != 0)
673 mike 1.1 return MI_RESULT_FAILED;
674
675 /* set non-blocking for reader [0] */
676 Sock_SetBlocking( self->rep->notificationSockets[0], MI_FALSE);
677
678 /* Protect notification sockets from child processes */
679 if (MI_RESULT_OK != Sock_SetCloseOnExec(self->rep->notificationSockets[0],MI_TRUE) ||
680 MI_RESULT_OK != Sock_SetCloseOnExec(self->rep->notificationSockets[1],MI_TRUE))
681 {
682 LOGW_CHAR(("fcntl(F_SETFD) failed"));
683 }
684
685 return MI_RESULT_OK;
686 }
687
688 void Selector_Destroy(Selector* self)
689 {
690 SelectorRep* rep = (SelectorRep*)self->rep;
691 Handler* p;
692 Handler* next;
693
694 mike 1.1 /* Free all watchers */
695 for (p = (Handler*)rep->head; p; p = next)
696 {
697 next = (Handler*)p->next;
698
699 (*p->callback)(self, p, SELECTOR_DESTROY, 0);
700 p = next;
701 }
702
703 Sock_Close(rep->notificationSockets[0]);
704 Sock_Close(rep->notificationSockets[1]);
705
706 free(rep);
707 }
708
709 MI_Result Selector_AddHandler(
710 Selector* self,
711 Handler* handler)
712 {
713 SelectorRep* rep = (SelectorRep*)self->rep;
714 Handler* p;
715 mike 1.1 MI_Uint64 currentTimeUsec = 0;
716
717 if (MI_RESULT_OK != Time_Now(¤tTimeUsec))
718 return MI_RESULT_FAILED;
719
720 /* Reject duplicates */
721 for (p = (Handler*)rep->head; p; p = (Handler*)p->next)
722 {
723 if (p == handler)
724 return MI_RESULT_ALREADY_EXISTS;
725 }
726
727 /* Add new handler to list */
728 List_Append(&rep->head, &rep->tail, (ListElem*)handler);
729
730 (*handler->callback)(self, handler, SELECTOR_ADD, currentTimeUsec);
731
732 return MI_RESULT_OK;
733 }
734
735 MI_Result Selector_RemoveHandler(
736 mike 1.1 Selector* self,
737 Handler* handler)
738 {
739 SelectorRep* rep = (SelectorRep*)self->rep;
740 Handler* p;
741
742 /* Find and remove handler from list */
743 for (p = (Handler*)rep->head; p; p = (Handler*)p->next)
744 {
745 if (p == handler)
746 {
747 /* Remove handler */
748 List_Remove(&rep->head, &rep->tail, (ListElem*)p);
749
750 /* Notify handler of removal */
751 (*handler->callback)(self, p, SELECTOR_REMOVE, 0);
752
753 return MI_RESULT_OK;
754 }
755 }
756
757 mike 1.1 return MI_RESULT_NOT_FOUND;
758 }
759
760 MI_Result Selector_RemoveAllHandlers(
761 Selector* self)
762 {
763 SelectorRep* rep = (SelectorRep*)self->rep;
764 Handler* p;
765
766 /* Find and remove handler from list */
767 for (p = (Handler*)rep->head; p; )
768 {
769 Handler* next = p->next;
770
771 /* Remove handler */
772 List_Remove(&rep->head, &rep->tail, (ListElem*)p);
773
774 /* Notify handler of removal */
775 (*p->callback)(self, p, SELECTOR_REMOVE, 0);
776
777 p = next;
778 mike 1.1 }
779
780 return MI_RESULT_OK;
781 }
782
783
784
785 static void _ProcessCallbacks(
786 SelectorRep* rep)
787 {
788 SelectorCallbacksItem* item = 0;
789 size_t read = 0;
790
791 while( MI_RESULT_OK == Sock_Read( rep->notificationSockets[0], &item, sizeof(item), &read) &&
792 read == sizeof(item) )
793 {
794 //printf("pipe read: p = %p, sent = %d\n", item, (int)read);
795 if (item)
796 {
797 (*item->callback) (item->callback_self, item->message);
798 Message_Release(item->message);
799 mike 1.1 //AtomicDec( &rep->queueLength );
800 }
801 }
802 }
803
804 MI_Result Selector_Run(
805 Selector* self,
806 MI_Uint64 timeoutUsec)
807 {
808 SelectorRep* rep = (SelectorRep*)self->rep;
809 MI_Uint64 timeoutSelectorAt = TIME_NEVER;
810
811 if ( TIME_NEVER != timeoutUsec )
812 {
813 if (MI_RESULT_OK != Time_Now(&timeoutSelectorAt))
814 return MI_RESULT_FAILED;
815
816 /* calculate when to terminate selector */
817 timeoutSelectorAt += timeoutUsec;
818 }
819
820 mike 1.1 rep->ioThreadHandle = ThreadSelf();
821
822 /* Loop while detecting and dispatching events */
823 for (rep->keepRunning = MI_TRUE; rep->keepRunning;)
824 {
825 Handler* p;
826 MI_Uint64 currentTimeUsec = 0;
827 MI_Uint64 breakCurrentSelectAt = (MI_Uint64)-1;
828 int n;
829 MI_Uint32 count = 0;
830 MI_Boolean stressed = Selector_IsStressed(self);
831
832 if (MI_RESULT_OK != Time_Now(¤tTimeUsec))
833 return MI_RESULT_FAILED;
834
835 if (TIME_NEVER != timeoutSelectorAt)
836 {
837 if (currentTimeUsec >= timeoutSelectorAt)
838 return MI_RESULT_TIME_OUT;
839
840 breakCurrentSelectAt = timeoutSelectorAt;
841 mike 1.1 }
842
843 /* Set up FD sets from handlers */
844 memset(&rep->readSet, 0, sizeof(rep->readSet));
845 memset(&rep->writeSet, 0, sizeof(rep->writeSet));
846 memset(&rep->exceptSet, 0, sizeof(rep->exceptSet));
847
848 for (p = (Handler*)rep->head; p; p = (Handler*)p->next)
849 {
850 if (!stressed || (p->mask & SELECTOR_IGNORE_READ_OVERLOAD) == 0)
851 {
852 if (p->mask & SELECTOR_READ)
853 {
854 _FDSet(p->sock, &rep->readSet);
855 count++;
856 }
857 if (p->mask & SELECTOR_WRITE)
858 {
859 _FDSet(p->sock, &rep->writeSet);
860 count++;
861 }
862 mike 1.1 if (p->mask & SELECTOR_EXCEPTION)
863 {
864 _FDSet(p->sock, &rep->exceptSet);
865 count++;
866 }
867 }
868 /* find the minimum timeout form the list */
869 if (TIME_NEVER != p->fireTimeoutAt)
870 {
871 /* if expired - don't wait; notification will be issued later in next loop */
872 if (currentTimeUsec >= p->fireTimeoutAt)
873 breakCurrentSelectAt = currentTimeUsec;
874 else if ( p->fireTimeoutAt < breakCurrentSelectAt)
875 breakCurrentSelectAt = p->fireTimeoutAt;
876 }
877 }
878
879 _FDSet(rep->notificationSockets[0], &rep->readSet);
880
881 /* empty list - return */
882 if (!rep->head && !rep->allowEmptySelector)
883 mike 1.1 return MI_RESULT_FAILED;
884
885
886 /* Perform system select */
887 n = _Select(&rep->readSet, &rep->writeSet, &rep->exceptSet,
888 breakCurrentSelectAt == (MI_Uint64)-1 ? (MI_Uint64)-1: breakCurrentSelectAt - currentTimeUsec);
889
890 /* ignore signals, since it canbe part of normal operation */
891 if (-1 == n && errno != EINTR)
892 return MI_RESULT_FAILED;
893
894 if (FD_ISSET(rep->notificationSockets[0], &rep->readSet))
895 _ProcessCallbacks(rep);
896
897 /* Notify handlers of outstanding events */
898 for (p = (Handler*)rep->head; p; )
899 {
900 MI_Uint32 mask = 0;
901 Handler* next = p->next;
902
903 if (!stressed || (p->mask & SELECTOR_IGNORE_READ_OVERLOAD) == 0)
904 mike 1.1 {
905 if (p->mask & (SELECTOR_READ | SELECTOR_WRITE | SELECTOR_EXCEPTION))
906 {
907 /* Check for read event */
908 if (FD_ISSET(p->sock, &rep->readSet))
909 {
910 mask |= SELECTOR_READ;
911 _FDClr(p->sock, &rep->readSet);
912 }
913
914 /* Check for write event */
915 if (FD_ISSET(p->sock, &rep->writeSet))
916 {
917 mask |= SELECTOR_WRITE;
918 _FDClr(p->sock, &rep->writeSet);
919 }
920
921 /* Check for exception event */
922 if (FD_ISSET(p->sock, &rep->exceptSet))
923 {
924 mask |= SELECTOR_EXCEPTION;
925 mike 1.1 _FDClr(p->sock, &rep->exceptSet);
926 }
927 }
928 }
929 if (TIME_NEVER != p->fireTimeoutAt && currentTimeUsec >= p->fireTimeoutAt)
930 {
931 mask |= SELECTOR_TIMEOUT;
932 }
933
934 /* If any events, notify watcher */
935 if (mask)
936 {
937 MI_Boolean more;
938
939 more = (*p->callback)(self, p, mask, currentTimeUsec);
940
941 if (!more)
942 {
943 /* Remove handler */
944 List_Remove(&rep->head, &rep->tail, (ListElem*)p);
945
946 mike 1.1 /* Notify handler of removal */
947 (*p->callback)(self, p, SELECTOR_REMOVE, currentTimeUsec);
948 }
949
950 }
951
952 /* Advance to next handler */
953 p = next;
954 }
955 }
956
957 return MI_RESULT_OK;
958 }
959
960 /* Wakes up selector's thread
961 Typical usage is to recalculate timeouts on handlers
962 when selector's Run is running in different thread */
963 MI_Result Selector_Wakeup(
964 Selector* self)
965 {
966 SelectorRep* rep = (SelectorRep*)self->rep;
967 mike 1.1
968 /* notify running thread */
969 {
970 void* p = 0;
971 size_t sent = 0;
972
973 Sock_Write( rep->notificationSockets[1], &p, sizeof(p), &sent);
974 }
975
976 return MI_RESULT_OK;
977 }
978
979
980 MI_Result Selector_StopRunning(
981 Selector* self)
982 {
983 SelectorRep* rep = (SelectorRep*)self->rep;
984
985 rep->keepRunning = MI_FALSE;
986
987 return Selector_Wakeup(self);
988 mike 1.1 }
989
/*
 * This function guarantees that the callback is called in the 'Run'/'IO' thread context,
 * so no locking is required for accessing socket objects, updating buffers etc.
 */
994 MI_Result Selector_CallInIOThread(
995 Selector* self,
996 Selector_NotificationCallback callback,
997 void* callback_self,
998 Message* message)
999 {
1000 SelectorRep* rep = (SelectorRep*)self->rep;
1001 SelectorCallbacksItem* newItem;
1002 MI_Result r;
1003 size_t sent = 0;
1004
1005 if (rep->ioThreadHandle == ThreadSelf())
1006 {
1007 /* direct call - we can write to socket instantly */
1008
1009 mike 1.1 (*callback)(callback_self, message);
1010 return MI_RESULT_OK;
1011 }
1012
1013 /* add item to the list and set event */
1014 newItem = (SelectorCallbacksItem*) Batch_GetClear( message->batch, sizeof(SelectorCallbacksItem));
1015
1016 if (!newItem)
1017 return MI_RESULT_FAILED;
1018
1019 newItem->callback = callback;
1020 newItem->callback_self = callback_self;
1021 newItem->message = message;
1022
1023 //AtomicInc( &rep->queueLength );
1024
1025 while (rep->outstandingInstances > MAX_ALLOCATED_INSTANCES)
1026 Sleep_ms(1);
1027
1028 #if 0
1029 {
1030 mike 1.1 int counter = 0;
1031
1032 while (rep->queueLength > LIST_SIZE_LIMIT )
1033 {
1034 counter++;
1035 /* give system a chance to clear backlog */
1036 Sleep_ms(1);
1037
1038 if (counter > 1000)
1039 {
1040 LOGW((MI_T("cannot send message: queue overflow)\n") ));
1041 return MI_RESULT_FAILED;
1042 }
1043 }
1044 }
1045 #endif
1046
1047 Message_AddRef(message);
1048 r = Sock_Write( rep->notificationSockets[1], &newItem, sizeof(newItem), &sent);
1049
1050
1051 mike 1.1 //printf("pipe write: p = %p, sent = %d\n", newItem, (int)sent);
1052 if ( MI_RESULT_OK != r )
1053 Message_Release(message);
1054
1055 return r;
1056 }
1057
1058 #endif /* defined(CONFIG_POSIX) */
1059
1060 /************************************/
1061 /* generic functionality */
1062
1063 MI_Result Selector_ContainsHandler(
1064 Selector* self,
1065 Handler* handler)
1066 {
1067 SelectorRep* rep = (SelectorRep*)self->rep;
1068 Handler* p;
1069
1070 for (p = (Handler*)rep->head; p; p = (Handler*)p->next)
1071 {
1072 mike 1.1 if (p == handler)
1073 {
1074 return MI_RESULT_OK;
1075 }
1076 }
1077
1078 return MI_RESULT_NOT_FOUND;
1079 }
1080
1081 static void _dtor(Message* message, void* callbackData)
1082 {
1083 SelectorRep* rep = (SelectorRep*)callbackData;
1084
1085 MI_UNUSED(message);
1086
1087 AtomicDec( &rep->outstandingInstances );
1088 }
1089
1090 void Selector_NewInstanceCreated(
1091 Selector* self,
1092 Message* msg)
1093 mike 1.1 {
1094 SelectorRep* rep = (SelectorRep*)self->rep;
1095
1096 msg->dtor = _dtor;
1097 msg->dtorData = rep;
1098
1099 AtomicInc( &rep->outstandingInstances );
1100 }
1101
1102 MI_Boolean Selector_IsStressed(
1103 Selector* self)
1104 {
1105 SelectorRep* rep = (SelectorRep*)self->rep;
1106
1107 /* Are we close to be stressed? */
1108 if (rep->outstandingInstances >= (MAX_ALLOCATED_INSTANCES * 9 / 10))
1109 return MI_TRUE;
1110
1111 return MI_FALSE;
1112 }
1113
1114 mike 1.1 void Selector_SetAllowEmptyFlag(
1115 Selector* self,
1116 MI_Boolean allowEmptySelector)
1117 {
1118 SelectorRep* rep = (SelectorRep*)self->rep;
1119
1120 rep->allowEmptySelector = allowEmptySelector;
1121 }
1122
|