/*
**==============================================================================
**
** Open Management Infrastructure (OMI)
**
** Copyright (c) Microsoft Corporation
**
** Licensed under the Apache License, Version 2.0 (the "License"); you may not
** use this file except in compliance with the License. You may obtain a copy
** of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
** KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED
** WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE,
** MERCHANTABLITY OR NON-INFRINGEMENT.
**
** See the Apache 2 License for the specific language governing permissions
** and limitations under the License.
**
**==============================================================================
*/
#include "selector.h"
#include <base/time.h>
#include "thread.h"
#include <base/log.h>
#include <base/result.h>
#include <base/atomic.h>
/* maximum number of items in the list */
#define LIST_SIZE_LIMIT 321
/* maximum instances that can be allocated */
#define MAX_ALLOCATED_INSTANCES 500
/*
**==============================================================================
**
**
** Windows Implementation
**
**
**==============================================================================
*/
#if defined(CONFIG_OS_WINDOWS)
# include <winsock2.h>
/* Notification internal datastructure */
typedef struct _SelectorCallbacksItem
{
/* Links for inserting messages onto linked-lists */
struct _SelectorCallbacksItem* next;
struct _SelectorCallbacksItem* prev;
Selector_NotificationCallback callback;
void* callback_self;
/* message has to be add-refed when added and dec-refed upon callback invocation */
Message* message;
}
SelectorCallbacksItem;
typedef struct _SelectorCallbacksList
{
/* Linked list of callbacks to call */
ListElem* head;
ListElem* tail;
int numberOfItem; /**/
}
SelectorCallbacksList;
typedef struct _SelectorRep
{
/* Linked list of event watchers */
ListElem* head;
ListElem* tail;
/* Object for detecting socket events */
WSAEVENT event;
/* other thread notification */
HANDLE callbacksAreAvailable;
/* list object - never referred directly - only using a pointer */
SelectorCallbacksList __callback_list_object;
/* list of callbacks to notify ;
storing pointer here, so we can use interlock funciotns for list updating
*/
SelectorCallbacksList* callbacksList;
/* Number of alive instance */
AtomicInt outstandingInstances;
/* io thread id */
ThreadHandle ioThreadHandle;
/* flag to stop running */
MI_Boolean keepRunning;
/* flag that allows empty selector running
when empty selector runs, it only can be interrupted by
internal funcitons, since it has no sockets to monitor
*/
MI_Boolean allowEmptySelector;
}
SelectorRep;
static SelectorCallbacksList* _GetList(SelectorRep* rep)
{
SelectorCallbacksList * list;
list = InterlockedExchangePointer( &rep->callbacksList, 0 );
while ( !list )
{
Sleep(0);
list = InterlockedExchangePointer( &rep->callbacksList, 0 );
}
return list;
}
static void _SetList(SelectorRep* rep, SelectorCallbacksList * list)
{
InterlockedExchangePointer( &rep->callbacksList, list );
}
static MI_Result _SetSockEvents(WSAEVENT event, SOCKET sock, MI_Uint32 mask)
{
long e = 0;
if (mask & SELECTOR_READ)
e |= FD_ACCEPT | FD_READ | FD_CLOSE;
if (mask & SELECTOR_WRITE)
e |= FD_WRITE | FD_CONNECT;
if (mask & SELECTOR_EXCEPTION)
e |= FD_OOB;
if (WSAEventSelect(sock, event, e) == SOCKET_ERROR)
return MI_RESULT_FAILED;
return MI_RESULT_OK;
}
static MI_Result _GetSockEvents(WSAEVENT event, SOCKET sock, MI_Uint32* mask)
{
WSANETWORKEVENTS networkEvents;
long x;
ZeroMemory(&networkEvents, sizeof(networkEvents));
if (WSAEnumNetworkEvents(sock, event, &networkEvents) != 0)
return MI_RESULT_FAILED;
x = networkEvents.lNetworkEvents;
*mask = 0;
if (x & FD_ACCEPT || x & FD_READ || x & FD_CLOSE || x & FD_CONNECT)
*mask |= SELECTOR_READ;
if (x & FD_WRITE)
*mask |= SELECTOR_WRITE;
if (x & FD_OOB)
*mask |= SELECTOR_EXCEPTION;
return MI_RESULT_OK;
}
MI_Result Selector_Init(
Selector* self)
{
SelectorRep* rep = (SelectorRep*)calloc(1, sizeof(SelectorRep));
if (!rep)
return MI_RESULT_FAILED;
rep->event = WSACreateEvent();
if (rep->event == WSA_INVALID_EVENT)
{
free(rep);
return MI_RESULT_FAILED;
}
rep->callbacksList = &rep->__callback_list_object;
rep->callbacksAreAvailable = CreateEvent( 0, TRUE, FALSE, NULL );
if (NULL == rep->callbacksAreAvailable)
{
CloseHandle(rep->event);
free(rep);
return MI_RESULT_FAILED;
}
self->rep = rep;
return MI_RESULT_OK;
}
void Selector_Destroy(Selector* self)
{
SelectorRep* rep = (SelectorRep*)self->rep;
Handler* p;
Handler* next;
/* Free all watchers */
for (p = (Handler*)rep->head; p; p = next)
{
next = (Handler*)p->next;
/* Unselect events on this socket */
_SetSockEvents(rep->event, p->sock, 0);
/* Invoke user callback */
(*p->callback)(self, p, SELECTOR_DESTROY, 0);
p = next;
}
CloseHandle(rep->event);
CloseHandle(rep->callbacksAreAvailable);
/* free messages in callbacks list */
while (rep->callbacksList->head)
{
SelectorCallbacksItem* item = (SelectorCallbacksItem*)rep->callbacksList->head;
List_Remove(&rep->callbacksList->head, &rep->callbacksList->tail, (ListElem*)item);
Message_Release(item->message);
}
free(rep);
}
MI_Result Selector_AddHandler(
Selector* self,
Handler* handler)
{
SelectorRep* rep = (SelectorRep*)self->rep;
Handler* p;
MI_Uint64 currentTimeUsec = 0;
if (MI_RESULT_OK != Time_Now(¤tTimeUsec))
return MI_RESULT_FAILED;
/* Reject duplicates */
for (p = (Handler*)rep->head; p; p = (Handler*)p->next)
{
if (p == handler)
return MI_RESULT_ALREADY_EXISTS;
}
/* Add new handler to list */
List_Append(&rep->head, &rep->tail, (ListElem*)handler);
(*handler->callback)(self, handler, SELECTOR_ADD, currentTimeUsec);
return MI_RESULT_OK;
}
MI_Result Selector_RemoveHandler(
Selector* self,
Handler* handler)
{
SelectorRep* rep = (SelectorRep*)self->rep;
Handler* p;
/* Find and remove handler from list */
for (p = (Handler*)rep->head; p; p = (Handler*)p->next)
{
if (p == handler)
{
/* Remove handler */
List_Remove(&rep->head, &rep->tail, (ListElem*)p);
/* Unselect events on this socket */
_SetSockEvents(rep->event, p->sock, 0);
/* Notify handler of removal */
(*handler->callback)(self, p, SELECTOR_REMOVE, 0);
return MI_RESULT_OK;
}
}
return MI_RESULT_NOT_FOUND;
}
MI_Result Selector_RemoveAllHandlers(
Selector* self)
{
SelectorRep* rep = (SelectorRep*)self->rep;
Handler* p;
/* Find and remove handler from list */
for (p = (Handler*)rep->head; p; )
{
Handler* next = p->next;
/* Remove handler */
List_Remove(&rep->head, &rep->tail, (ListElem*)p);
/* Unselect events on this socket */
_SetSockEvents(rep->event, p->sock, 0);
/* Notify handler of removal */
(*p->callback)(self, p, SELECTOR_REMOVE, 0);
p = next;
}
return MI_RESULT_OK;
}
static void _ProcessCallbacks(
SelectorRep* rep)
{
SelectorCallbacksItem* item = 0;
SelectorCallbacksList * list;
/* remove all items from the list */
list = _GetList(rep);
item = (SelectorCallbacksItem*)list->head;
list->head = list->tail = 0;
list->numberOfItem = 0;
ResetEvent(rep->callbacksAreAvailable);
_SetList(rep, list);
/* process all items */
while (item)
{
SelectorCallbacksItem* next = item->next;
(*item->callback) (item->callback_self, item->message);
Message_Release(item->message);
item = next;
}
}
MI_Result Selector_Run(
Selector* self,
MI_Uint64 timeoutUsec)
{
SelectorRep* rep = (SelectorRep*)self->rep;
MI_Uint64 timeoutSelectorAt = TIME_NEVER;
HANDLE handles[2];
handles[0] = rep->event;
handles[1] = rep->callbacksAreAvailable;
if ( TIME_NEVER != timeoutUsec )
{
if (MI_RESULT_OK != Time_Now(&timeoutSelectorAt))
return MI_RESULT_FAILED;
/* calculate when to terminate selector */
timeoutSelectorAt += timeoutUsec;
}
rep->ioThreadHandle = ThreadSelf();
for (rep->keepRunning = MI_TRUE; rep->keepRunning;)
{
Handler* p;
DWORD result;
DWORD timeoutMsec;
MI_Result r;
MI_Boolean more;
MI_Uint64 currentTimeUsec = 0;
MI_Uint64 breakCurrentSelectAt = (MI_Uint64)-1;
if (MI_RESULT_OK != Time_Now(¤tTimeUsec))
return MI_RESULT_FAILED;
if (TIME_NEVER != timeoutSelectorAt)
{
if (currentTimeUsec >= timeoutSelectorAt)
return MI_RESULT_TIME_OUT;
breakCurrentSelectAt = timeoutSelectorAt;
}
/* calculate timeout */
for (p = (Handler*)rep->head; p; )
{
Handler* next = p->next;
/* update event mask */
if (p->sock != INVALID_SOCK)
{
r = _SetSockEvents(rep->event, p->sock, p->mask);
if (r != MI_RESULT_OK)
return r;
}
/* find the minimum timeout form the list */
if (TIME_NEVER != p->fireTimeoutAt)
{
/* if expired - don't wait; notification will be issued later in next loop */
if (currentTimeUsec >= p->fireTimeoutAt)
breakCurrentSelectAt = currentTimeUsec;
else if ( p->fireTimeoutAt < breakCurrentSelectAt)
breakCurrentSelectAt = p->fireTimeoutAt;
}
p = next;
}
/* empty list - return */
if (!rep->head && !rep->allowEmptySelector)
return MI_RESULT_FAILED;
/* Wait for events on any of the sockets */
timeoutMsec =
(breakCurrentSelectAt == ((MI_Uint64)-1) ) ? /* do we have t least one valid timeout? */
INFINITE : (DWORD)((breakCurrentSelectAt - currentTimeUsec) / 1000);
result = WaitForMultipleObjects(MI_COUNT(handles), handles, FALSE, timeoutMsec);
if (result == WAIT_FAILED)
return MI_RESULT_FAILED;
//if ((WAIT_OBJECT_0 + 1) == result) /* other thread wants to call callback */
_ProcessCallbacks(rep);
if (MI_RESULT_OK != Time_Now(¤tTimeUsec))
return MI_RESULT_FAILED;
/* Dispatch events on each socket */
for (p = (Handler*)rep->head; p; )
{
Handler* next = p->next;
MI_Uint32 mask = 0;
/* Get event mask for this socket */
if (p->sock != INVALID_SOCK)
{
r = _GetSockEvents(rep->event, p->sock, &mask);
if (r != MI_RESULT_OK)
return r;
}
if (TIME_NEVER != p->fireTimeoutAt && currentTimeUsec >= p->fireTimeoutAt)
{
mask |= SELECTOR_TIMEOUT;
}
/* If there were any events on this socket, dispatch them */
if (mask)
{
/*MI_Uint32 oldMask = p->mask;*/
more = (*p->callback)(self, p, mask, currentTimeUsec);
/* If callback wants to continue getting events */
if (!more)
{
/* Remove handler */
List_Remove(&rep->head, &rep->tail, (ListElem*)p);
/* Unselect events on this socket */
_SetSockEvents(rep->event, p->sock, 0);
/* Notify handler of removal */
(*p->callback)(self, p, SELECTOR_REMOVE, currentTimeUsec);
}
}
p = next;
}
}
return MI_RESULT_OK;
}
/* Wakes up selector's thread
Typical usage is to recalculate timeouts on handlers
when selector's Run is running in different thread */
MI_Result Selector_Wakeup(
Selector* self)
{
SelectorRep* rep = (SelectorRep*)self->rep;
/* notify running thread (if different) */
if (rep->ioThreadHandle != ThreadSelf())
{
SetEvent(rep->callbacksAreAvailable);
}
return MI_RESULT_OK;
}
MI_Result Selector_StopRunning(
Selector* self)
{
SelectorRep* rep = (SelectorRep*)self->rep;
rep->keepRunning = MI_FALSE;
return Selector_Wakeup(self);
}
/*
* This function guaranties that callback is called in 'Run'/'IO' thread context,
* so no locking is required for accessing sokcet objects, updating buffers etc
*/
MI_Result Selector_CallInIOThread(
Selector* self,
Selector_NotificationCallback callback,
void* callback_self,
Message* message)
{
SelectorRep* rep = (SelectorRep*)self->rep;
SelectorCallbacksItem* newItem;
SelectorCallbacksList * list;
int itemsInList;
if (rep->ioThreadHandle == ThreadSelf())
{
/* direct call - we can write to socket instantly */
(*callback)(callback_self, message);
return MI_RESULT_OK;
}
/* add item to the list and set event */
newItem = (SelectorCallbacksItem*) Batch_GetClear( message->batch, sizeof(SelectorCallbacksItem));
if (!newItem)
return MI_RESULT_FAILED;
newItem->callback = callback;
newItem->callback_self = callback_self;
newItem->message = message;
Message_AddRef(message);
list = _GetList(rep);
List_Append(&list->head, &list->tail, (ListElem*)newItem);
list->numberOfItem++;
itemsInList = list->numberOfItem;
_SetList(rep, list);
list = 0;
SetEvent(rep->callbacksAreAvailable);
while (rep->outstandingInstances > MAX_ALLOCATED_INSTANCES)
Sleep_ms(1);
#if 0
/* wait until list is empty to avoid memory explosion */
for ( i = 0; itemsInList > LIST_SIZE_LIMIT && i < 500; i++ )
{
Sleep(1);
list = _GetList(rep);
itemsInList = list->numberOfItem;
_SetList(rep, list);
}
#endif
return MI_RESULT_OK;
}
#endif /* defined(CONFIG_OS_WINDOWS) */
/*
**==============================================================================
**
**
** POSIX Implementation
**
**
**==============================================================================
*/
#if defined(CONFIG_POSIX)
# include <string.h>
# include <unistd.h>
# include <errno.h>
# include <sys/socket.h>
# include <netinet/tcp.h>
# include <netinet/in.h>
# include <sys/time.h>
# include <sys/types.h>
# include <netdb.h>
# include <fcntl.h>
# include <arpa/inet.h>
typedef struct _SelectorCallbacksItem
{
Selector_NotificationCallback callback;
void* callback_self;
/* message has to be add-refed when added and dec-refed upon callback invocation */
Message* message;
}
SelectorCallbacksItem;
typedef struct _SelectorRep
{
/* File descriptor sets */
fd_set readSet;
fd_set writeSet;
fd_set exceptSet;
/* Linked list of event watchers */
ListElem* head;
ListElem* tail;
/* notifications channel */
int notificationSockets[2];
/* Number of notification items in a queue */
//AtomicInt queueLength;
/* Number of alive instance */
AtomicInt outstandingInstances;
/* flag to stop running */
MI_Boolean keepRunning;
/* flag that allows empty selector running
when empty selector runs, it only can be interrupted by
internal funcitons, since it has no sockets to monitor
*/
MI_Boolean allowEmptySelector;
/* io thread id */
ThreadHandle ioThreadHandle;
}
SelectorRep;
static int _Select(
fd_set* readSet,
fd_set* writeSet,
fd_set* exceptSet,
MI_Uint64 timeoutUsec)
{
int n = FD_SETSIZE;
struct timeval tv;
struct timeval* _tv = 0;
if ((MI_Uint64)-1 != timeoutUsec)
{
tv.tv_sec = (long)(timeoutUsec / 1000000);
tv.tv_usec = (long)(timeoutUsec % 1000000);
_tv = &tv;
}
return select(n, readSet, writeSet, exceptSet, _tv);
}
MI_INLINE void _FDSet(Sock sock, fd_set* set)
{
FD_SET(sock, set);
}
MI_INLINE void _FDClr(Sock sock, fd_set* set)
{
FD_CLR(sock, set);
}
MI_Result Selector_Init(
Selector* self)
{
self->rep = (SelectorRep*)calloc(1, sizeof(SelectorRep));
if (!self->rep)
return MI_RESULT_FAILED;
if (pipe(self->rep->notificationSockets) != 0)
return MI_RESULT_FAILED;
/* set non-blocking for reader [0] */
Sock_SetBlocking( self->rep->notificationSockets[0], MI_FALSE);
/* Protect notification sockets from child processes */
if (MI_RESULT_OK != Sock_SetCloseOnExec(self->rep->notificationSockets[0],MI_TRUE) ||
MI_RESULT_OK != Sock_SetCloseOnExec(self->rep->notificationSockets[1],MI_TRUE))
{
LOGW_CHAR(("fcntl(F_SETFD) failed"));
}
return MI_RESULT_OK;
}
void Selector_Destroy(Selector* self)
{
SelectorRep* rep = (SelectorRep*)self->rep;
Handler* p;
Handler* next;
/* Free all watchers */
for (p = (Handler*)rep->head; p; p = next)
{
next = (Handler*)p->next;
(*p->callback)(self, p, SELECTOR_DESTROY, 0);
p = next;
}
Sock_Close(rep->notificationSockets[0]);
Sock_Close(rep->notificationSockets[1]);
free(rep);
}
MI_Result Selector_AddHandler(
Selector* self,
Handler* handler)
{
SelectorRep* rep = (SelectorRep*)self->rep;
Handler* p;
MI_Uint64 currentTimeUsec = 0;
if (MI_RESULT_OK != Time_Now(¤tTimeUsec))
return MI_RESULT_FAILED;
/* Reject duplicates */
for (p = (Handler*)rep->head; p; p = (Handler*)p->next)
{
if (p == handler)
return MI_RESULT_ALREADY_EXISTS;
}
/* Add new handler to list */
List_Append(&rep->head, &rep->tail, (ListElem*)handler);
(*handler->callback)(self, handler, SELECTOR_ADD, currentTimeUsec);
return MI_RESULT_OK;
}
MI_Result Selector_RemoveHandler(
Selector* self,
Handler* handler)
{
SelectorRep* rep = (SelectorRep*)self->rep;
Handler* p;
/* Find and remove handler from list */
for (p = (Handler*)rep->head; p; p = (Handler*)p->next)
{
if (p == handler)
{
/* Remove handler */
List_Remove(&rep->head, &rep->tail, (ListElem*)p);
/* Notify handler of removal */
(*handler->callback)(self, p, SELECTOR_REMOVE, 0);
return MI_RESULT_OK;
}
}
return MI_RESULT_NOT_FOUND;
}
MI_Result Selector_RemoveAllHandlers(
Selector* self)
{
SelectorRep* rep = (SelectorRep*)self->rep;
Handler* p;
/* Find and remove handler from list */
for (p = (Handler*)rep->head; p; )
{
Handler* next = p->next;
/* Remove handler */
List_Remove(&rep->head, &rep->tail, (ListElem*)p);
/* Notify handler of removal */
(*p->callback)(self, p, SELECTOR_REMOVE, 0);
p = next;
}
return MI_RESULT_OK;
}
static void _ProcessCallbacks(
SelectorRep* rep)
{
SelectorCallbacksItem* item = 0;
size_t read = 0;
while( MI_RESULT_OK == Sock_Read( rep->notificationSockets[0], &item, sizeof(item), &read) &&
read == sizeof(item) )
{
//printf("pipe read: p = %p, sent = %d\n", item, (int)read);
if (item)
{
(*item->callback) (item->callback_self, item->message);
Message_Release(item->message);
//AtomicDec( &rep->queueLength );
}
}
}
MI_Result Selector_Run(
Selector* self,
MI_Uint64 timeoutUsec)
{
SelectorRep* rep = (SelectorRep*)self->rep;
MI_Uint64 timeoutSelectorAt = TIME_NEVER;
if ( TIME_NEVER != timeoutUsec )
{
if (MI_RESULT_OK != Time_Now(&timeoutSelectorAt))
return MI_RESULT_FAILED;
/* calculate when to terminate selector */
timeoutSelectorAt += timeoutUsec;
}
rep->ioThreadHandle = ThreadSelf();
/* Loop while detecting and dispatching events */
for (rep->keepRunning = MI_TRUE; rep->keepRunning;)
{
Handler* p;
MI_Uint64 currentTimeUsec = 0;
MI_Uint64 breakCurrentSelectAt = (MI_Uint64)-1;
int n;
MI_Uint32 count = 0;
MI_Boolean stressed = Selector_IsStressed(self);
if (MI_RESULT_OK != Time_Now(¤tTimeUsec))
return MI_RESULT_FAILED;
if (TIME_NEVER != timeoutSelectorAt)
{
if (currentTimeUsec >= timeoutSelectorAt)
return MI_RESULT_TIME_OUT;
breakCurrentSelectAt = timeoutSelectorAt;
}
/* Set up FD sets from handlers */
memset(&rep->readSet, 0, sizeof(rep->readSet));
memset(&rep->writeSet, 0, sizeof(rep->writeSet));
memset(&rep->exceptSet, 0, sizeof(rep->exceptSet));
for (p = (Handler*)rep->head; p; p = (Handler*)p->next)
{
if (!stressed || (p->mask & SELECTOR_IGNORE_READ_OVERLOAD) == 0)
{
if (p->mask & SELECTOR_READ)
{
_FDSet(p->sock, &rep->readSet);
count++;
}
if (p->mask & SELECTOR_WRITE)
{
_FDSet(p->sock, &rep->writeSet);
count++;
}
if (p->mask & SELECTOR_EXCEPTION)
{
_FDSet(p->sock, &rep->exceptSet);
count++;
}
}
/* find the minimum timeout form the list */
if (TIME_NEVER != p->fireTimeoutAt)
{
/* if expired - don't wait; notification will be issued later in next loop */
if (currentTimeUsec >= p->fireTimeoutAt)
breakCurrentSelectAt = currentTimeUsec;
else if ( p->fireTimeoutAt < breakCurrentSelectAt)
breakCurrentSelectAt = p->fireTimeoutAt;
}
}
_FDSet(rep->notificationSockets[0], &rep->readSet);
/* empty list - return */
if (!rep->head && !rep->allowEmptySelector)
return MI_RESULT_FAILED;
/* Perform system select */
n = _Select(&rep->readSet, &rep->writeSet, &rep->exceptSet,
breakCurrentSelectAt == (MI_Uint64)-1 ? (MI_Uint64)-1: breakCurrentSelectAt - currentTimeUsec);
/* ignore signals, since it canbe part of normal operation */
if (-1 == n && errno != EINTR)
return MI_RESULT_FAILED;
if (FD_ISSET(rep->notificationSockets[0], &rep->readSet))
_ProcessCallbacks(rep);
/* Notify handlers of outstanding events */
for (p = (Handler*)rep->head; p; )
{
MI_Uint32 mask = 0;
Handler* next = p->next;
if (!stressed || (p->mask & SELECTOR_IGNORE_READ_OVERLOAD) == 0)
{
if (p->mask & (SELECTOR_READ | SELECTOR_WRITE | SELECTOR_EXCEPTION))
{
/* Check for read event */
if (FD_ISSET(p->sock, &rep->readSet))
{
mask |= SELECTOR_READ;
_FDClr(p->sock, &rep->readSet);
}
/* Check for write event */
if (FD_ISSET(p->sock, &rep->writeSet))
{
mask |= SELECTOR_WRITE;
_FDClr(p->sock, &rep->writeSet);
}
/* Check for exception event */
if (FD_ISSET(p->sock, &rep->exceptSet))
{
mask |= SELECTOR_EXCEPTION;
_FDClr(p->sock, &rep->exceptSet);
}
}
}
if (TIME_NEVER != p->fireTimeoutAt && currentTimeUsec >= p->fireTimeoutAt)
{
mask |= SELECTOR_TIMEOUT;
}
/* If any events, notify watcher */
if (mask)
{
MI_Boolean more;
more = (*p->callback)(self, p, mask, currentTimeUsec);
if (!more)
{
/* Remove handler */
List_Remove(&rep->head, &rep->tail, (ListElem*)p);
/* Notify handler of removal */
(*p->callback)(self, p, SELECTOR_REMOVE, currentTimeUsec);
}
}
/* Advance to next handler */
p = next;
}
}
return MI_RESULT_OK;
}
/* Wakes up selector's thread
Typical usage is to recalculate timeouts on handlers
when selector's Run is running in different thread */
MI_Result Selector_Wakeup(
Selector* self)
{
SelectorRep* rep = (SelectorRep*)self->rep;
/* notify running thread */
{
void* p = 0;
size_t sent = 0;
Sock_Write( rep->notificationSockets[1], &p, sizeof(p), &sent);
}
return MI_RESULT_OK;
}
MI_Result Selector_StopRunning(
Selector* self)
{
SelectorRep* rep = (SelectorRep*)self->rep;
rep->keepRunning = MI_FALSE;
return Selector_Wakeup(self);
}
/*
* This funciton guaranties that callback is called in 'Run'/'IO' thread context,
* so locking is required for accessing sokcet objects, updating buffers etc
*/
MI_Result Selector_CallInIOThread(
Selector* self,
Selector_NotificationCallback callback,
void* callback_self,
Message* message)
{
SelectorRep* rep = (SelectorRep*)self->rep;
SelectorCallbacksItem* newItem;
MI_Result r;
size_t sent = 0;
if (rep->ioThreadHandle == ThreadSelf())
{
/* direct call - we can write to socket instantly */
(*callback)(callback_self, message);
return MI_RESULT_OK;
}
/* add item to the list and set event */
newItem = (SelectorCallbacksItem*) Batch_GetClear( message->batch, sizeof(SelectorCallbacksItem));
if (!newItem)
return MI_RESULT_FAILED;
newItem->callback = callback;
newItem->callback_self = callback_self;
newItem->message = message;
//AtomicInc( &rep->queueLength );
while (rep->outstandingInstances > MAX_ALLOCATED_INSTANCES)
Sleep_ms(1);
#if 0
{
int counter = 0;
while (rep->queueLength > LIST_SIZE_LIMIT )
{
counter++;
/* give system a chance to clear backlog */
Sleep_ms(1);
if (counter > 1000)
{
LOGW((MI_T("cannot send message: queue overflow)\n") ));
return MI_RESULT_FAILED;
}
}
}
#endif
Message_AddRef(message);
r = Sock_Write( rep->notificationSockets[1], &newItem, sizeof(newItem), &sent);
//printf("pipe write: p = %p, sent = %d\n", newItem, (int)sent);
if ( MI_RESULT_OK != r )
Message_Release(message);
return r;
}
#endif /* defined(CONFIG_POSIX) */
/************************************/
/* generic functionality */
MI_Result Selector_ContainsHandler(
Selector* self,
Handler* handler)
{
SelectorRep* rep = (SelectorRep*)self->rep;
Handler* p;
for (p = (Handler*)rep->head; p; p = (Handler*)p->next)
{
if (p == handler)
{
return MI_RESULT_OK;
}
}
return MI_RESULT_NOT_FOUND;
}
static void _dtor(Message* message, void* callbackData)
{
SelectorRep* rep = (SelectorRep*)callbackData;
MI_UNUSED(message);
AtomicDec( &rep->outstandingInstances );
}
void Selector_NewInstanceCreated(
Selector* self,
Message* msg)
{
SelectorRep* rep = (SelectorRep*)self->rep;
msg->dtor = _dtor;
msg->dtorData = rep;
AtomicInc( &rep->outstandingInstances );
}
MI_Boolean Selector_IsStressed(
Selector* self)
{
SelectorRep* rep = (SelectorRep*)self->rep;
/* Are we close to be stressed? */
if (rep->outstandingInstances >= (MAX_ALLOCATED_INSTANCES * 9 / 10))
return MI_TRUE;
return MI_FALSE;
}
void Selector_SetAllowEmptyFlag(
Selector* self,
MI_Boolean allowEmptySelector)
{
SelectorRep* rep = (SelectorRep*)self->rep;
rep->allowEmptySelector = allowEmptySelector;
}