/*
**==============================================================================
**
** Open Management Infrastructure (OMI)
**
** Copyright (c) Microsoft Corporation
**
** Licensed under the Apache License, Version 2.0 (the "License"); you may not
** use this file except in compliance with the License. You may obtain a copy
** of the License at
**
**     http://www.apache.org/licenses/LICENSE-2.0
**
** THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
** KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED
** WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE,
** MERCHANTABLITY OR NON-INFRINGEMENT.
**
** See the Apache 2 License for the specific language governing permissions
** and limitations under the License.
**
**==============================================================================
*/

#include "selector.h"
#include "thread.h"

/*
 * NOTE(review): four more #include directives stood here (one between
 * "selector.h" and "thread.h", three after "thread.h"), but their <header>
 * names were stripped before this text reached review. Restore them from
 * version control. Presumably they declare the list, time, batch/message and
 * logging helpers this file calls (List_Append, Time_Now, Batch_GetClear,
 * LOGW_CHAR, ...) -- confirm.
 */

/* maximum number of items in the list */
#define LIST_SIZE_LIMIT 321

/* maximum instances that can be allocated */
#define MAX_ALLOCATED_INSTANCES 500

/*
**==============================================================================
**
**
**     Windows Implementation
**
**
**==============================================================================
*/

#if defined(CONFIG_OS_WINDOWS)

/*
 * NOTE(review): the header name of this include was also stripped;
 * <winsock2.h> is presumed from the WSA* APIs used below -- confirm.
 */
# include <winsock2.h>

/*
 * One queued "call me in the IO thread" request.
 * The first two members are list links so an item can be cast to ListElem.
 */
typedef struct _SelectorCallbacksItem
{
    /* Links for inserting messages onto linked-lists */
    struct _SelectorCallbacksItem* next;
    struct _SelectorCallbacksItem* prev;

    /* function to invoke and the opaque pointer passed as its first argument */
    Selector_NotificationCallback callback;
    void* callback_self;

    /* message has to be add-refed when added and
       dec-refed upon callback invocation */
    Message* message;
}
SelectorCallbacksItem;

/* Queue of pending SelectorCallbacksItem entries plus its length. */
typedef struct _SelectorCallbacksList
{
    /* Linked list of callbacks to call */
    ListElem* head;
    ListElem* tail;

    /* number of items currently linked into the list */
    int numberOfItem;
}
SelectorCallbacksList;

/* Private state of a Selector (Windows flavour). */
typedef struct _SelectorRep
{
    /* Linked list of event watchers */
    ListElem* head;
    ListElem* tail;

    /* Object for detecting socket events */
    WSAEVENT event;

    /* other thread notification */
    HANDLE callbacksAreAvailable;

    /* list object - never referred directly - only using a pointer */
    SelectorCallbacksList __callback_list_object;

    /* list of callbacks to notify; the pointer is stored here so the list
       can be claimed and released with interlocked functions
       (see _GetList / _SetList) */
    SelectorCallbacksList* callbacksList;

    /* Number of alive instances */
    AtomicInt outstandingInstances;

    /* io thread id */
    ThreadHandle ioThreadHandle;

    /* flag to stop running */
    MI_Boolean keepRunning;

    /* flag that allows running with no handlers; an empty selector
       can only be interrupted by internal functions, since it has
       no sockets to monitor */
    MI_Boolean allowEmptySelector;
}
SelectorRep;

/*
 * Claims exclusive use of the callbacks list.
 *
 * The list pointer is atomically swapped with NULL; a NULL result means some
 * other thread currently holds the list, so the caller yields and retries.
 * Every successful call must be paired with _SetList().
 *
 * The casts are needed because InterlockedExchangePointer operates on
 * 'PVOID volatile*', which is not implicitly compatible with
 * 'SelectorCallbacksList**'.
 */
static SelectorCallbacksList* _GetList(SelectorRep* rep)
{
    PVOID volatile* slot = (PVOID volatile*)&rep->callbacksList;

    for (;;)
    {
        SelectorCallbacksList* list =
            (SelectorCallbacksList*)InterlockedExchangePointer(slot, NULL);

        if (list)
            return list;

        /* somebody else owns the list right now - give up the time slice */
        Sleep(0);
    }
}

/*
 * Publishes the callbacks list back into 'rep' so other threads can claim it
 * again; counterpart of _GetList().
 */
static void _SetList(SelectorRep* rep, SelectorCallbacksList* list)
{
    InterlockedExchangePointer((PVOID volatile*)&rep->callbacksList, list);
}

/*
 * Associates 'event' with 'sock' for the network events implied by the
 * SELECTOR_* bits in 'mask'. A mask of 0 unselects all events.
 *
 * Returns MI_RESULT_OK, or MI_RESULT_FAILED if WSAEventSelect fails.
 */
static MI_Result _SetSockEvents(WSAEVENT event, SOCKET sock, MI_Uint32 mask)
{
    long wanted = 0;

    if (mask & SELECTOR_READ)
        wanted |= FD_ACCEPT | FD_READ | FD_CLOSE;

    if (mask & SELECTOR_WRITE)
        wanted |= FD_WRITE | FD_CONNECT;

    if (mask & SELECTOR_EXCEPTION)
        wanted |= FD_OOB;

    if (WSAEventSelect(sock, event, wanted) == SOCKET_ERROR)
        return MI_RESULT_FAILED;

    return MI_RESULT_OK;
}

/*
 * Retrieves the network events recorded for 'sock' and translates them into
 * SELECTOR_* bits stored in '*mask' ('*mask' is written only on success).
 *
 * NOTE(review): FD_CONNECT is requested under SELECTOR_WRITE in
 * _SetSockEvents but reported here as SELECTOR_READ; kept as-is because
 * callers may rely on it -- confirm intent.
 *
 * Returns MI_RESULT_OK, or MI_RESULT_FAILED if WSAEnumNetworkEvents fails.
 */
static MI_Result _GetSockEvents(WSAEVENT event, SOCKET sock, MI_Uint32* mask)
{
    WSANETWORKEVENTS networkEvents;
    long fired;

    ZeroMemory(&networkEvents, sizeof(networkEvents));

    if (WSAEnumNetworkEvents(sock, event, &networkEvents) != 0)
        return MI_RESULT_FAILED;

    fired = networkEvents.lNetworkEvents;
    *mask = 0;

    if (fired & (FD_ACCEPT | FD_READ | FD_CLOSE | FD_CONNECT))
        *mask |= SELECTOR_READ;

    if (fired & FD_WRITE)
        *mask |= SELECTOR_WRITE;

    if (fired & FD_OOB)
        *mask |= SELECTOR_EXCEPTION;

    return MI_RESULT_OK;
}

MI_Result Selector_Init( Selector* self) { SelectorRep* rep = (SelectorRep*)calloc(1, sizeof(SelectorRep)); if (!rep) return MI_RESULT_FAILED; rep->event = WSACreateEvent(); if (rep->event == WSA_INVALID_EVENT) { free(rep); return MI_RESULT_FAILED; } rep->callbacksList = &rep->__callback_list_object; rep->callbacksAreAvailable = CreateEvent( 0, TRUE, FALSE, NULL ); if (NULL == rep->callbacksAreAvailable) { CloseHandle(rep->event); free(rep); return MI_RESULT_FAILED; } self->rep = rep; return MI_RESULT_OK; } void Selector_Destroy(Selector* self) { SelectorRep* rep = (SelectorRep*)self->rep; Handler* p; Handler* next; /* Free all watchers */ for (p = (Handler*)rep->head; p; p = next) { next = (Handler*)p->next; /* Unselect events on this socket */ _SetSockEvents(rep->event, p->sock, 0); /* Invoke user callback */ (*p->callback)(self, p, SELECTOR_DESTROY, 0); p = next; } CloseHandle(rep->event); CloseHandle(rep->callbacksAreAvailable); /* free messages in callbacks list */ while (rep->callbacksList->head) { SelectorCallbacksItem* item = (SelectorCallbacksItem*)rep->callbacksList->head; List_Remove(&rep->callbacksList->head, &rep->callbacksList->tail, (ListElem*)item); Message_Release(item->message); } free(rep); } MI_Result Selector_AddHandler( Selector* self, Handler* handler) { SelectorRep* rep = (SelectorRep*)self->rep; Handler* p; MI_Uint64 currentTimeUsec = 0; if (MI_RESULT_OK != Time_Now(¤tTimeUsec)) return MI_RESULT_FAILED; /* Reject duplicates */ for (p = (Handler*)rep->head; p; p = (Handler*)p->next) { if (p == handler) return MI_RESULT_ALREADY_EXISTS; } /* Add new handler to list */ List_Append(&rep->head, &rep->tail, (ListElem*)handler); (*handler->callback)(self, handler, SELECTOR_ADD, currentTimeUsec); return MI_RESULT_OK; } MI_Result Selector_RemoveHandler( Selector* self, Handler* handler) { SelectorRep* rep = (SelectorRep*)self->rep; Handler* p; /* Find and remove handler from list */ for (p = (Handler*)rep->head; p; p = (Handler*)p->next) { if (p == 
handler) { /* Remove handler */ List_Remove(&rep->head, &rep->tail, (ListElem*)p); /* Unselect events on this socket */ _SetSockEvents(rep->event, p->sock, 0); /* Notify handler of removal */ (*handler->callback)(self, p, SELECTOR_REMOVE, 0); return MI_RESULT_OK; } } return MI_RESULT_NOT_FOUND; } MI_Result Selector_RemoveAllHandlers( Selector* self) { SelectorRep* rep = (SelectorRep*)self->rep; Handler* p; /* Find and remove handler from list */ for (p = (Handler*)rep->head; p; ) { Handler* next = p->next; /* Remove handler */ List_Remove(&rep->head, &rep->tail, (ListElem*)p); /* Unselect events on this socket */ _SetSockEvents(rep->event, p->sock, 0); /* Notify handler of removal */ (*p->callback)(self, p, SELECTOR_REMOVE, 0); p = next; } return MI_RESULT_OK; } static void _ProcessCallbacks( SelectorRep* rep) { SelectorCallbacksItem* item = 0; SelectorCallbacksList * list; /* remove all items from the list */ list = _GetList(rep); item = (SelectorCallbacksItem*)list->head; list->head = list->tail = 0; list->numberOfItem = 0; ResetEvent(rep->callbacksAreAvailable); _SetList(rep, list); /* process all items */ while (item) { SelectorCallbacksItem* next = item->next; (*item->callback) (item->callback_self, item->message); Message_Release(item->message); item = next; } } MI_Result Selector_Run( Selector* self, MI_Uint64 timeoutUsec) { SelectorRep* rep = (SelectorRep*)self->rep; MI_Uint64 timeoutSelectorAt = TIME_NEVER; HANDLE handles[2]; handles[0] = rep->event; handles[1] = rep->callbacksAreAvailable; if ( TIME_NEVER != timeoutUsec ) { if (MI_RESULT_OK != Time_Now(&timeoutSelectorAt)) return MI_RESULT_FAILED; /* calculate when to terminate selector */ timeoutSelectorAt += timeoutUsec; } rep->ioThreadHandle = ThreadSelf(); for (rep->keepRunning = MI_TRUE; rep->keepRunning;) { Handler* p; DWORD result; DWORD timeoutMsec; MI_Result r; MI_Boolean more; MI_Uint64 currentTimeUsec = 0; MI_Uint64 breakCurrentSelectAt = (MI_Uint64)-1; if (MI_RESULT_OK != Time_Now(¤tTimeUsec)) 
return MI_RESULT_FAILED; if (TIME_NEVER != timeoutSelectorAt) { if (currentTimeUsec >= timeoutSelectorAt) return MI_RESULT_TIME_OUT; breakCurrentSelectAt = timeoutSelectorAt; } /* calculate timeout */ for (p = (Handler*)rep->head; p; ) { Handler* next = p->next; /* update event mask */ if (p->sock != INVALID_SOCK) { r = _SetSockEvents(rep->event, p->sock, p->mask); if (r != MI_RESULT_OK) return r; } /* find the minimum timeout form the list */ if (TIME_NEVER != p->fireTimeoutAt) { /* if expired - don't wait; notification will be issued later in next loop */ if (currentTimeUsec >= p->fireTimeoutAt) breakCurrentSelectAt = currentTimeUsec; else if ( p->fireTimeoutAt < breakCurrentSelectAt) breakCurrentSelectAt = p->fireTimeoutAt; } p = next; } /* empty list - return */ if (!rep->head && !rep->allowEmptySelector) return MI_RESULT_FAILED; /* Wait for events on any of the sockets */ timeoutMsec = (breakCurrentSelectAt == ((MI_Uint64)-1) ) ? /* do we have t least one valid timeout? */ INFINITE : (DWORD)((breakCurrentSelectAt - currentTimeUsec) / 1000); result = WaitForMultipleObjects(MI_COUNT(handles), handles, FALSE, timeoutMsec); if (result == WAIT_FAILED) return MI_RESULT_FAILED; //if ((WAIT_OBJECT_0 + 1) == result) /* other thread wants to call callback */ _ProcessCallbacks(rep); if (MI_RESULT_OK != Time_Now(¤tTimeUsec)) return MI_RESULT_FAILED; /* Dispatch events on each socket */ for (p = (Handler*)rep->head; p; ) { Handler* next = p->next; MI_Uint32 mask = 0; /* Get event mask for this socket */ if (p->sock != INVALID_SOCK) { r = _GetSockEvents(rep->event, p->sock, &mask); if (r != MI_RESULT_OK) return r; } if (TIME_NEVER != p->fireTimeoutAt && currentTimeUsec >= p->fireTimeoutAt) { mask |= SELECTOR_TIMEOUT; } /* If there were any events on this socket, dispatch them */ if (mask) { /*MI_Uint32 oldMask = p->mask;*/ more = (*p->callback)(self, p, mask, currentTimeUsec); /* If callback wants to continue getting events */ if (!more) { /* Remove handler */ 
List_Remove(&rep->head, &rep->tail, (ListElem*)p); /* Unselect events on this socket */ _SetSockEvents(rep->event, p->sock, 0); /* Notify handler of removal */ (*p->callback)(self, p, SELECTOR_REMOVE, currentTimeUsec); } } p = next; } } return MI_RESULT_OK; } /* Wakes up selector's thread Typical usage is to recalculate timeouts on handlers when selector's Run is running in different thread */ MI_Result Selector_Wakeup( Selector* self) { SelectorRep* rep = (SelectorRep*)self->rep; /* notify running thread (if different) */ if (rep->ioThreadHandle != ThreadSelf()) { SetEvent(rep->callbacksAreAvailable); } return MI_RESULT_OK; } MI_Result Selector_StopRunning( Selector* self) { SelectorRep* rep = (SelectorRep*)self->rep; rep->keepRunning = MI_FALSE; return Selector_Wakeup(self); } /* * This function guaranties that callback is called in 'Run'/'IO' thread context, * so no locking is required for accessing sokcet objects, updating buffers etc */ MI_Result Selector_CallInIOThread( Selector* self, Selector_NotificationCallback callback, void* callback_self, Message* message) { SelectorRep* rep = (SelectorRep*)self->rep; SelectorCallbacksItem* newItem; SelectorCallbacksList * list; int itemsInList; if (rep->ioThreadHandle == ThreadSelf()) { /* direct call - we can write to socket instantly */ (*callback)(callback_self, message); return MI_RESULT_OK; } /* add item to the list and set event */ newItem = (SelectorCallbacksItem*) Batch_GetClear( message->batch, sizeof(SelectorCallbacksItem)); if (!newItem) return MI_RESULT_FAILED; newItem->callback = callback; newItem->callback_self = callback_self; newItem->message = message; Message_AddRef(message); list = _GetList(rep); List_Append(&list->head, &list->tail, (ListElem*)newItem); list->numberOfItem++; itemsInList = list->numberOfItem; _SetList(rep, list); list = 0; SetEvent(rep->callbacksAreAvailable); while (rep->outstandingInstances > MAX_ALLOCATED_INSTANCES) Sleep_ms(1); #if 0 /* wait until list is empty to avoid memory 
explosion */ for ( i = 0; itemsInList > LIST_SIZE_LIMIT && i < 500; i++ ) { Sleep(1); list = _GetList(rep); itemsInList = list->numberOfItem; _SetList(rep, list); } #endif return MI_RESULT_OK; } #endif /* defined(CONFIG_OS_WINDOWS) */ /* **============================================================================== ** ** ** POSIX Implementation ** ** **============================================================================== */ #if defined(CONFIG_POSIX) # include # include # include # include # include # include # include # include # include # include # include typedef struct _SelectorCallbacksItem { Selector_NotificationCallback callback; void* callback_self; /* message has to be add-refed when added and dec-refed upon callback invocation */ Message* message; } SelectorCallbacksItem; typedef struct _SelectorRep { /* File descriptor sets */ fd_set readSet; fd_set writeSet; fd_set exceptSet; /* Linked list of event watchers */ ListElem* head; ListElem* tail; /* notifications channel */ int notificationSockets[2]; /* Number of notification items in a queue */ //AtomicInt queueLength; /* Number of alive instance */ AtomicInt outstandingInstances; /* flag to stop running */ MI_Boolean keepRunning; /* flag that allows empty selector running when empty selector runs, it only can be interrupted by internal funcitons, since it has no sockets to monitor */ MI_Boolean allowEmptySelector; /* io thread id */ ThreadHandle ioThreadHandle; } SelectorRep; static int _Select( fd_set* readSet, fd_set* writeSet, fd_set* exceptSet, MI_Uint64 timeoutUsec) { int n = FD_SETSIZE; struct timeval tv; struct timeval* _tv = 0; if ((MI_Uint64)-1 != timeoutUsec) { tv.tv_sec = (long)(timeoutUsec / 1000000); tv.tv_usec = (long)(timeoutUsec % 1000000); _tv = &tv; } return select(n, readSet, writeSet, exceptSet, _tv); } MI_INLINE void _FDSet(Sock sock, fd_set* set) { FD_SET(sock, set); } MI_INLINE void _FDClr(Sock sock, fd_set* set) { FD_CLR(sock, set); } MI_Result Selector_Init( Selector* 
self) { self->rep = (SelectorRep*)calloc(1, sizeof(SelectorRep)); if (!self->rep) return MI_RESULT_FAILED; if (pipe(self->rep->notificationSockets) != 0) return MI_RESULT_FAILED; /* set non-blocking for reader [0] */ Sock_SetBlocking( self->rep->notificationSockets[0], MI_FALSE); /* Protect notification sockets from child processes */ if (MI_RESULT_OK != Sock_SetCloseOnExec(self->rep->notificationSockets[0],MI_TRUE) || MI_RESULT_OK != Sock_SetCloseOnExec(self->rep->notificationSockets[1],MI_TRUE)) { LOGW_CHAR(("fcntl(F_SETFD) failed")); } return MI_RESULT_OK; } void Selector_Destroy(Selector* self) { SelectorRep* rep = (SelectorRep*)self->rep; Handler* p; Handler* next; /* Free all watchers */ for (p = (Handler*)rep->head; p; p = next) { next = (Handler*)p->next; (*p->callback)(self, p, SELECTOR_DESTROY, 0); p = next; } Sock_Close(rep->notificationSockets[0]); Sock_Close(rep->notificationSockets[1]); free(rep); } MI_Result Selector_AddHandler( Selector* self, Handler* handler) { SelectorRep* rep = (SelectorRep*)self->rep; Handler* p; MI_Uint64 currentTimeUsec = 0; if (MI_RESULT_OK != Time_Now(¤tTimeUsec)) return MI_RESULT_FAILED; /* Reject duplicates */ for (p = (Handler*)rep->head; p; p = (Handler*)p->next) { if (p == handler) return MI_RESULT_ALREADY_EXISTS; } /* Add new handler to list */ List_Append(&rep->head, &rep->tail, (ListElem*)handler); (*handler->callback)(self, handler, SELECTOR_ADD, currentTimeUsec); return MI_RESULT_OK; } MI_Result Selector_RemoveHandler( Selector* self, Handler* handler) { SelectorRep* rep = (SelectorRep*)self->rep; Handler* p; /* Find and remove handler from list */ for (p = (Handler*)rep->head; p; p = (Handler*)p->next) { if (p == handler) { /* Remove handler */ List_Remove(&rep->head, &rep->tail, (ListElem*)p); /* Notify handler of removal */ (*handler->callback)(self, p, SELECTOR_REMOVE, 0); return MI_RESULT_OK; } } return MI_RESULT_NOT_FOUND; } MI_Result Selector_RemoveAllHandlers( Selector* self) { SelectorRep* rep = 
(SelectorRep*)self->rep; Handler* p; /* Find and remove handler from list */ for (p = (Handler*)rep->head; p; ) { Handler* next = p->next; /* Remove handler */ List_Remove(&rep->head, &rep->tail, (ListElem*)p); /* Notify handler of removal */ (*p->callback)(self, p, SELECTOR_REMOVE, 0); p = next; } return MI_RESULT_OK; } static void _ProcessCallbacks( SelectorRep* rep) { SelectorCallbacksItem* item = 0; size_t read = 0; while( MI_RESULT_OK == Sock_Read( rep->notificationSockets[0], &item, sizeof(item), &read) && read == sizeof(item) ) { //printf("pipe read: p = %p, sent = %d\n", item, (int)read); if (item) { (*item->callback) (item->callback_self, item->message); Message_Release(item->message); //AtomicDec( &rep->queueLength ); } } } MI_Result Selector_Run( Selector* self, MI_Uint64 timeoutUsec) { SelectorRep* rep = (SelectorRep*)self->rep; MI_Uint64 timeoutSelectorAt = TIME_NEVER; if ( TIME_NEVER != timeoutUsec ) { if (MI_RESULT_OK != Time_Now(&timeoutSelectorAt)) return MI_RESULT_FAILED; /* calculate when to terminate selector */ timeoutSelectorAt += timeoutUsec; } rep->ioThreadHandle = ThreadSelf(); /* Loop while detecting and dispatching events */ for (rep->keepRunning = MI_TRUE; rep->keepRunning;) { Handler* p; MI_Uint64 currentTimeUsec = 0; MI_Uint64 breakCurrentSelectAt = (MI_Uint64)-1; int n; MI_Uint32 count = 0; MI_Boolean stressed = Selector_IsStressed(self); if (MI_RESULT_OK != Time_Now(¤tTimeUsec)) return MI_RESULT_FAILED; if (TIME_NEVER != timeoutSelectorAt) { if (currentTimeUsec >= timeoutSelectorAt) return MI_RESULT_TIME_OUT; breakCurrentSelectAt = timeoutSelectorAt; } /* Set up FD sets from handlers */ memset(&rep->readSet, 0, sizeof(rep->readSet)); memset(&rep->writeSet, 0, sizeof(rep->writeSet)); memset(&rep->exceptSet, 0, sizeof(rep->exceptSet)); for (p = (Handler*)rep->head; p; p = (Handler*)p->next) { if (!stressed || (p->mask & SELECTOR_IGNORE_READ_OVERLOAD) == 0) { if (p->mask & SELECTOR_READ) { _FDSet(p->sock, &rep->readSet); count++; } if 
(p->mask & SELECTOR_WRITE) { _FDSet(p->sock, &rep->writeSet); count++; } if (p->mask & SELECTOR_EXCEPTION) { _FDSet(p->sock, &rep->exceptSet); count++; } } /* find the minimum timeout form the list */ if (TIME_NEVER != p->fireTimeoutAt) { /* if expired - don't wait; notification will be issued later in next loop */ if (currentTimeUsec >= p->fireTimeoutAt) breakCurrentSelectAt = currentTimeUsec; else if ( p->fireTimeoutAt < breakCurrentSelectAt) breakCurrentSelectAt = p->fireTimeoutAt; } } _FDSet(rep->notificationSockets[0], &rep->readSet); /* empty list - return */ if (!rep->head && !rep->allowEmptySelector) return MI_RESULT_FAILED; /* Perform system select */ n = _Select(&rep->readSet, &rep->writeSet, &rep->exceptSet, breakCurrentSelectAt == (MI_Uint64)-1 ? (MI_Uint64)-1: breakCurrentSelectAt - currentTimeUsec); /* ignore signals, since it canbe part of normal operation */ if (-1 == n && errno != EINTR) return MI_RESULT_FAILED; if (FD_ISSET(rep->notificationSockets[0], &rep->readSet)) _ProcessCallbacks(rep); /* Notify handlers of outstanding events */ for (p = (Handler*)rep->head; p; ) { MI_Uint32 mask = 0; Handler* next = p->next; if (!stressed || (p->mask & SELECTOR_IGNORE_READ_OVERLOAD) == 0) { if (p->mask & (SELECTOR_READ | SELECTOR_WRITE | SELECTOR_EXCEPTION)) { /* Check for read event */ if (FD_ISSET(p->sock, &rep->readSet)) { mask |= SELECTOR_READ; _FDClr(p->sock, &rep->readSet); } /* Check for write event */ if (FD_ISSET(p->sock, &rep->writeSet)) { mask |= SELECTOR_WRITE; _FDClr(p->sock, &rep->writeSet); } /* Check for exception event */ if (FD_ISSET(p->sock, &rep->exceptSet)) { mask |= SELECTOR_EXCEPTION; _FDClr(p->sock, &rep->exceptSet); } } } if (TIME_NEVER != p->fireTimeoutAt && currentTimeUsec >= p->fireTimeoutAt) { mask |= SELECTOR_TIMEOUT; } /* If any events, notify watcher */ if (mask) { MI_Boolean more; more = (*p->callback)(self, p, mask, currentTimeUsec); if (!more) { /* Remove handler */ List_Remove(&rep->head, &rep->tail, (ListElem*)p); /* 
Notify handler of removal */ (*p->callback)(self, p, SELECTOR_REMOVE, currentTimeUsec); } } /* Advance to next handler */ p = next; } } return MI_RESULT_OK; } /* Wakes up selector's thread Typical usage is to recalculate timeouts on handlers when selector's Run is running in different thread */ MI_Result Selector_Wakeup( Selector* self) { SelectorRep* rep = (SelectorRep*)self->rep; /* notify running thread */ { void* p = 0; size_t sent = 0; Sock_Write( rep->notificationSockets[1], &p, sizeof(p), &sent); } return MI_RESULT_OK; } MI_Result Selector_StopRunning( Selector* self) { SelectorRep* rep = (SelectorRep*)self->rep; rep->keepRunning = MI_FALSE; return Selector_Wakeup(self); } /* * This funciton guaranties that callback is called in 'Run'/'IO' thread context, * so locking is required for accessing sokcet objects, updating buffers etc */ MI_Result Selector_CallInIOThread( Selector* self, Selector_NotificationCallback callback, void* callback_self, Message* message) { SelectorRep* rep = (SelectorRep*)self->rep; SelectorCallbacksItem* newItem; MI_Result r; size_t sent = 0; if (rep->ioThreadHandle == ThreadSelf()) { /* direct call - we can write to socket instantly */ (*callback)(callback_self, message); return MI_RESULT_OK; } /* add item to the list and set event */ newItem = (SelectorCallbacksItem*) Batch_GetClear( message->batch, sizeof(SelectorCallbacksItem)); if (!newItem) return MI_RESULT_FAILED; newItem->callback = callback; newItem->callback_self = callback_self; newItem->message = message; //AtomicInc( &rep->queueLength ); while (rep->outstandingInstances > MAX_ALLOCATED_INSTANCES) Sleep_ms(1); #if 0 { int counter = 0; while (rep->queueLength > LIST_SIZE_LIMIT ) { counter++; /* give system a chance to clear backlog */ Sleep_ms(1); if (counter > 1000) { LOGW((MI_T("cannot send message: queue overflow)\n") )); return MI_RESULT_FAILED; } } } #endif Message_AddRef(message); r = Sock_Write( rep->notificationSockets[1], &newItem, sizeof(newItem), &sent); 
//printf("pipe write: p = %p, sent = %d\n", newItem, (int)sent); if ( MI_RESULT_OK != r ) Message_Release(message); return r; } #endif /* defined(CONFIG_POSIX) */ /************************************/ /* generic functionality */ MI_Result Selector_ContainsHandler( Selector* self, Handler* handler) { SelectorRep* rep = (SelectorRep*)self->rep; Handler* p; for (p = (Handler*)rep->head; p; p = (Handler*)p->next) { if (p == handler) { return MI_RESULT_OK; } } return MI_RESULT_NOT_FOUND; } static void _dtor(Message* message, void* callbackData) { SelectorRep* rep = (SelectorRep*)callbackData; MI_UNUSED(message); AtomicDec( &rep->outstandingInstances ); } void Selector_NewInstanceCreated( Selector* self, Message* msg) { SelectorRep* rep = (SelectorRep*)self->rep; msg->dtor = _dtor; msg->dtorData = rep; AtomicInc( &rep->outstandingInstances ); } MI_Boolean Selector_IsStressed( Selector* self) { SelectorRep* rep = (SelectorRep*)self->rep; /* Are we close to be stressed? */ if (rep->outstandingInstances >= (MAX_ALLOCATED_INSTANCES * 9 / 10)) return MI_TRUE; return MI_FALSE; } void Selector_SetAllowEmptyFlag( Selector* self, MI_Boolean allowEmptySelector) { SelectorRep* rep = (SelectorRep*)self->rep; rep->allowEmptySelector = allowEmptySelector; }