version 1.2, 2015/04/20 18:10:35
|
version 1.3, 2015/04/20 18:20:34
|
|
|
*/ | */ |
| |
#include "selector.h" | #include "selector.h" |
#include <base/time.h> |
#include <pal/sleep.h> |
#include "thread.h" |
#include <pal/thread.h> |
|
#include <pal/sleep.h> |
#include <base/log.h> | #include <base/log.h> |
#include <base/result.h> | #include <base/result.h> |
#include <base/atomic.h> |
#include <pal/atomic.h> |
|
|
|
// #define ENABLE_TRACING 1 |
|
#ifdef ENABLE_TRACING |
|
# define TRACING_LEVEL 4 |
|
# include <deprecated/logging/logging.h> |
|
#else |
|
# define LOGE2(a) |
|
# define LOGW2(a) |
|
# define LOGD2(a) |
|
# define LOGX2(a) |
|
#endif |
| |
/* maximum number of items in the list */ | /* maximum number of items in the list */ |
#define LIST_SIZE_LIMIT 321 | #define LIST_SIZE_LIMIT 321 |
|
|
/* maximum instances that can be allocated */ | /* maximum instances that can be allocated */ |
#define MAX_ALLOCATED_INSTANCES 500 | #define MAX_ALLOCATED_INSTANCES 500 |
| |
|
|
/* | /* |
**============================================================================== | **============================================================================== |
** | ** |
|
|
*/ | */ |
SelectorCallbacksList* callbacksList; | SelectorCallbacksList* callbacksList; |
| |
/* Number of alive instance */ |
|
AtomicInt outstandingInstances; |
|
|
|
/* io thread id */ | /* io thread id */ |
ThreadHandle ioThreadHandle; |
ThreadID ioThreadHandle; |
| |
/* flag to stop running */ | /* flag to stop running */ |
MI_Boolean keepRunning; | MI_Boolean keepRunning; |
|
MI_Boolean keepRunningNoReadsMode; |
|
|
|
/* flag to retry a dispatch loop */ |
|
MI_Boolean keepDispatching; |
| |
/* flag that allows empty selector running | /* flag that allows empty selector running |
when empty selector runs, it only can be interrupted by | when empty selector runs, it only can be interrupted by |
|
|
InterlockedExchangePointer( &rep->callbacksList, list ); | InterlockedExchangePointer( &rep->callbacksList, list ); |
} | } |
| |
static MI_Result _SetSockEvents(WSAEVENT event, SOCKET sock, MI_Uint32 mask) |
static MI_Result _SetSockEvents(SelectorRep* rep, Handler* p, MI_Uint32 mask, MI_Boolean noReadsMode ) |
{ | { |
long e = 0; | long e = 0; |
| |
if (mask & SELECTOR_READ) |
if( !noReadsMode && (mask & SELECTOR_READ) ) |
e |= FD_ACCEPT | FD_READ | FD_CLOSE; | e |= FD_ACCEPT | FD_READ | FD_CLOSE; |
| |
if (mask & SELECTOR_WRITE) | if (mask & SELECTOR_WRITE) |
|
|
if (mask & SELECTOR_EXCEPTION) | if (mask & SELECTOR_EXCEPTION) |
e |= FD_OOB; | e |= FD_OOB; |
| |
if (WSAEventSelect(sock, event, e) == SOCKET_ERROR) |
if (WSAEventSelect(p->sock, rep->event, e) == SOCKET_ERROR) |
return MI_RESULT_FAILED; | return MI_RESULT_FAILED; |
| |
return MI_RESULT_OK; | return MI_RESULT_OK; |
} | } |
| |
static MI_Result _GetSockEvents(WSAEVENT event, SOCKET sock, MI_Uint32* mask) |
static MI_Result _GetSockEvents(SelectorRep* rep, Handler* p, MI_Uint32* mask) |
{ | { |
WSANETWORKEVENTS networkEvents; | WSANETWORKEVENTS networkEvents; |
long x; | long x; |
| |
ZeroMemory(&networkEvents, sizeof(networkEvents)); | ZeroMemory(&networkEvents, sizeof(networkEvents)); |
| |
if (WSAEnumNetworkEvents(sock, event, &networkEvents) != 0) |
if (WSAEnumNetworkEvents(p->sock, rep->event, &networkEvents) != 0) |
return MI_RESULT_FAILED; | return MI_RESULT_FAILED; |
| |
x = networkEvents.lNetworkEvents; | x = networkEvents.lNetworkEvents; |
|
|
MI_Result Selector_Init( | MI_Result Selector_Init( |
Selector* self) | Selector* self) |
{ | { |
SelectorRep* rep = (SelectorRep*)calloc(1, sizeof(SelectorRep)); |
SelectorRep* rep = (SelectorRep*)PAL_Calloc(1, sizeof(SelectorRep)); |
| |
if (!rep) | if (!rep) |
return MI_RESULT_FAILED; | return MI_RESULT_FAILED; |
|
|
| |
if (rep->event == WSA_INVALID_EVENT) | if (rep->event == WSA_INVALID_EVENT) |
{ | { |
free(rep); |
PAL_Free(rep); |
return MI_RESULT_FAILED; | return MI_RESULT_FAILED; |
} | } |
| |
|
|
if (NULL == rep->callbacksAreAvailable) | if (NULL == rep->callbacksAreAvailable) |
{ | { |
CloseHandle(rep->event); | CloseHandle(rep->event); |
free(rep); |
PAL_Free(rep); |
return MI_RESULT_FAILED; | return MI_RESULT_FAILED; |
} | } |
| |
|
|
next = (Handler*)p->next; | next = (Handler*)p->next; |
| |
/* Unselect events on this socket */ | /* Unselect events on this socket */ |
_SetSockEvents(rep->event, p->sock, 0); |
_SetSockEvents(rep, p, 0, MI_FALSE); |
| |
/* Invoke user callback */ | /* Invoke user callback */ |
(*p->callback)(self, p, SELECTOR_DESTROY, 0); | (*p->callback)(self, p, SELECTOR_DESTROY, 0); |
|
|
p = next; |
|
} | } |
| |
CloseHandle(rep->event); | CloseHandle(rep->event); |
|
|
Message_Release(item->message); | Message_Release(item->message); |
} | } |
| |
free(rep); |
PAL_Free(rep); |
} | } |
| |
MI_Result Selector_AddHandler( | MI_Result Selector_AddHandler( |
|
|
Handler* p; | Handler* p; |
MI_Uint64 currentTimeUsec = 0; | MI_Uint64 currentTimeUsec = 0; |
| |
if (MI_RESULT_OK != Time_Now(¤tTimeUsec)) |
if (PAL_TRUE != PAL_Time(¤tTimeUsec)) |
return MI_RESULT_FAILED; | return MI_RESULT_FAILED; |
| |
/* Reject duplicates */ | /* Reject duplicates */ |
|
|
List_Remove(&rep->head, &rep->tail, (ListElem*)p); | List_Remove(&rep->head, &rep->tail, (ListElem*)p); |
| |
/* Unselect events on this socket */ | /* Unselect events on this socket */ |
_SetSockEvents(rep->event, p->sock, 0); |
_SetSockEvents(rep, p, 0, MI_FALSE); |
| |
/* Notify handler of removal */ | /* Notify handler of removal */ |
(*handler->callback)(self, p, SELECTOR_REMOVE, 0); | (*handler->callback)(self, p, SELECTOR_REMOVE, 0); |
|
|
List_Remove(&rep->head, &rep->tail, (ListElem*)p); | List_Remove(&rep->head, &rep->tail, (ListElem*)p); |
| |
/* Unselect events on this socket */ | /* Unselect events on this socket */ |
_SetSockEvents(rep->event, p->sock, 0); |
_SetSockEvents(rep, p, 0, MI_FALSE); |
| |
/* Notify handler of removal */ | /* Notify handler of removal */ |
(*p->callback)(self, p, SELECTOR_REMOVE, 0); | (*p->callback)(self, p, SELECTOR_REMOVE, 0); |
|
|
static void _ProcessCallbacks( | static void _ProcessCallbacks( |
SelectorRep* rep) | SelectorRep* rep) |
{ | { |
SelectorCallbacksItem* item = 0; |
SelectorCallbacksItem* item = NULL; |
SelectorCallbacksList * list; | SelectorCallbacksList * list; |
| |
/* remove all items from the list */ | /* remove all items from the list */ |
|
|
} | } |
} | } |
| |
MI_Result Selector_Run( |
void _Selector_WakeupFromWait( |
Selector* self, |
SelectorRep* rep) |
MI_Uint64 timeoutUsec) |
|
{ |
|
SelectorRep* rep = (SelectorRep*)self->rep; |
|
MI_Uint64 timeoutSelectorAt = TIME_NEVER; |
|
HANDLE handles[2]; |
|
|
|
handles[0] = rep->event; |
|
handles[1] = rep->callbacksAreAvailable; |
|
|
|
if ( TIME_NEVER != timeoutUsec ) |
|
{ |
|
if (MI_RESULT_OK != Time_Now(&timeoutSelectorAt)) |
|
return MI_RESULT_FAILED; |
|
|
|
/* calculate when to terminate selector */ |
|
timeoutSelectorAt += timeoutUsec; |
|
} |
|
|
|
rep->ioThreadHandle = ThreadSelf(); |
|
|
|
for (rep->keepRunning = MI_TRUE; rep->keepRunning;) |
|
{ |
|
Handler* p; |
|
DWORD result; |
|
DWORD timeoutMsec; |
|
MI_Result r; |
|
MI_Boolean more; |
|
MI_Uint64 currentTimeUsec = 0; |
|
MI_Uint64 breakCurrentSelectAt = (MI_Uint64)-1; |
|
|
|
if (MI_RESULT_OK != Time_Now(¤tTimeUsec)) |
|
return MI_RESULT_FAILED; |
|
|
|
if (TIME_NEVER != timeoutSelectorAt) |
|
{ |
|
if (currentTimeUsec >= timeoutSelectorAt) |
|
return MI_RESULT_TIME_OUT; |
|
|
|
breakCurrentSelectAt = timeoutSelectorAt; |
|
} |
|
|
|
/* calculate timeout */ |
|
for (p = (Handler*)rep->head; p; ) |
|
{ |
|
Handler* next = p->next; |
|
|
|
/* update event mask */ |
|
if (p->sock != INVALID_SOCK) |
|
{ |
|
r = _SetSockEvents(rep->event, p->sock, p->mask); |
|
|
|
if (r != MI_RESULT_OK) |
|
return r; |
|
} |
|
|
|
/* find the minimum timeout form the list */ |
|
if (TIME_NEVER != p->fireTimeoutAt) |
|
{ |
|
/* if expired - don't wait; notification will be issued later in next loop */ |
|
if (currentTimeUsec >= p->fireTimeoutAt) |
|
breakCurrentSelectAt = currentTimeUsec; |
|
else if ( p->fireTimeoutAt < breakCurrentSelectAt) |
|
breakCurrentSelectAt = p->fireTimeoutAt; |
|
} |
|
|
|
p = next; |
|
} |
|
|
|
/* empty list - return */ |
|
if (!rep->head && !rep->allowEmptySelector) |
|
return MI_RESULT_FAILED; |
|
|
|
/* Wait for events on any of the sockets */ |
|
timeoutMsec = |
|
(breakCurrentSelectAt == ((MI_Uint64)-1) ) ? /* do we have t least one valid timeout? */ |
|
INFINITE : (DWORD)((breakCurrentSelectAt - currentTimeUsec) / 1000); |
|
result = WaitForMultipleObjects(MI_COUNT(handles), handles, FALSE, timeoutMsec); |
|
|
|
if (result == WAIT_FAILED) |
|
return MI_RESULT_FAILED; |
|
|
|
//if ((WAIT_OBJECT_0 + 1) == result) /* other thread wants to call callback */ |
|
_ProcessCallbacks(rep); |
|
|
|
if (MI_RESULT_OK != Time_Now(¤tTimeUsec)) |
|
return MI_RESULT_FAILED; |
|
|
|
/* Dispatch events on each socket */ |
|
for (p = (Handler*)rep->head; p; ) |
|
{ |
|
Handler* next = p->next; |
|
MI_Uint32 mask = 0; |
|
|
|
/* Get event mask for this socket */ |
|
if (p->sock != INVALID_SOCK) |
|
{ |
|
r = _GetSockEvents(rep->event, p->sock, &mask); |
|
|
|
if (r != MI_RESULT_OK) |
|
return r; |
|
} |
|
|
|
if (TIME_NEVER != p->fireTimeoutAt && currentTimeUsec >= p->fireTimeoutAt) |
|
{ |
|
mask |= SELECTOR_TIMEOUT; |
|
} |
|
|
|
/* If there were any events on this socket, dispatch them */ |
|
if (mask) |
|
{ |
|
/*MI_Uint32 oldMask = p->mask;*/ |
|
more = (*p->callback)(self, p, mask, currentTimeUsec); |
|
|
|
/* If callback wants to continue getting events */ |
|
if (!more) |
|
{ |
|
/* Remove handler */ |
|
List_Remove(&rep->head, &rep->tail, (ListElem*)p); |
|
|
|
/* Unselect events on this socket */ |
|
_SetSockEvents(rep->event, p->sock, 0); |
|
|
|
/* Notify handler of removal */ |
|
(*p->callback)(self, p, SELECTOR_REMOVE, currentTimeUsec); |
|
} |
|
} |
|
|
|
p = next; |
|
} |
|
} |
|
return MI_RESULT_OK; |
|
} |
|
|
|
/* Wakes up selector's thread |
|
Typical usage is to recalculate timeouts on handlers |
|
when selector's Run is running in different thread */ |
|
MI_Result Selector_Wakeup( |
|
Selector* self) |
|
{ |
|
SelectorRep* rep = (SelectorRep*)self->rep; |
|
|
|
/* notify running thread (if different) */ |
|
if (rep->ioThreadHandle != ThreadSelf()) |
|
{ | { |
SetEvent(rep->callbacksAreAvailable); | SetEvent(rep->callbacksAreAvailable); |
} | } |
| |
return MI_RESULT_OK; |
|
} |
|
|
|
MI_Result Selector_StopRunning( |
|
Selector* self) |
|
{ |
|
SelectorRep* rep = (SelectorRep*)self->rep; |
|
|
|
rep->keepRunning = MI_FALSE; |
|
|
|
return Selector_Wakeup(self); |
|
} |
|
|
|
/* | /* |
* This function guaranties that callback is called in 'Run'/'IO' thread context, |
* This function guarantees that callback is called in 'Run'/'IO' thread context, |
* so no locking is required for accessing sokcet objects, updating buffers etc |
* so no locking is required for accessing socket objects, updating buffers, etc. |
*/ | */ |
MI_Result Selector_CallInIOThread( | MI_Result Selector_CallInIOThread( |
Selector* self, | Selector* self, |
|
|
SelectorCallbacksItem* newItem; | SelectorCallbacksItem* newItem; |
SelectorCallbacksList * list; | SelectorCallbacksList * list; |
int itemsInList; | int itemsInList; |
|
ThreadID current = Thread_ID(); |
| |
if (rep->ioThreadHandle == ThreadSelf()) |
if (Thread_Equal(&rep->ioThreadHandle, ¤t)) |
{ | { |
/* direct call - we can write to socket instantly */ | /* direct call - we can write to socket instantly */ |
| |
|
trace_Sock_SendingOnOwnThread( |
|
message, |
|
message->tag, |
|
MessageName(message->tag), |
|
message->operationId ); |
|
|
(*callback)(callback_self, message); | (*callback)(callback_self, message); |
return MI_RESULT_OK; | return MI_RESULT_OK; |
} | } |
|
|
list = _GetList(rep); | list = _GetList(rep); |
List_Append(&list->head, &list->tail, (ListElem*)newItem); | List_Append(&list->head, &list->tail, (ListElem*)newItem); |
list->numberOfItem++; | list->numberOfItem++; |
itemsInList = list->numberOfItem; |
|
_SetList(rep, list); |
|
list = 0; |
|
SetEvent(rep->callbacksAreAvailable); |
|
| |
while (rep->outstandingInstances > MAX_ALLOCATED_INSTANCES) |
trace_Sock_Sending( |
Sleep_ms(1); |
message, |
|
message->tag, |
|
MessageName(message->tag), |
|
message->operationId, |
|
0, |
|
MAX_ALLOCATED_INSTANCES, |
|
list->numberOfItem ); |
|
|
#if 0 | #if 0 |
/* wait until list is empty to avoid memory explosion */ |
|
for ( i = 0; itemsInList > LIST_SIZE_LIMIT && i < 500; i++ ) |
|
{ |
|
Sleep(1); |
|
list = _GetList(rep); |
|
itemsInList = list->numberOfItem; | itemsInList = list->numberOfItem; |
_SetList(rep, list); |
|
} |
|
#endif | #endif |
|
_SetList(rep, list); |
|
list = 0; |
|
SetEvent(rep->callbacksAreAvailable); |
| |
return MI_RESULT_OK; | return MI_RESULT_OK; |
} | } |
|
|
/* notifications channel */ | /* notifications channel */ |
int notificationSockets[2]; | int notificationSockets[2]; |
| |
/* Number of notification items in a queue */ |
|
//AtomicInt queueLength; |
|
|
|
/* Number of alive instance */ |
|
AtomicInt outstandingInstances; |
|
|
|
/* flag to stop running */ | /* flag to stop running */ |
MI_Boolean keepRunning; | MI_Boolean keepRunning; |
|
MI_Boolean keepRunningNoReadsMode; |
|
|
|
/* flag to retry a dispatch loop */ |
|
MI_Boolean keepDispatching; |
| |
/* flag that allows empty selector running | /* flag that allows empty selector running |
when empty selector runs, it only can be interrupted by | when empty selector runs, it only can be interrupted by |
|
|
MI_Boolean allowEmptySelector; | MI_Boolean allowEmptySelector; |
| |
/* io thread id */ | /* io thread id */ |
ThreadHandle ioThreadHandle; |
ThreadID ioThreadHandle; |
} | } |
SelectorRep; | SelectorRep; |
| |
|
|
fd_set* readSet, | fd_set* readSet, |
fd_set* writeSet, | fd_set* writeSet, |
fd_set* exceptSet, | fd_set* exceptSet, |
MI_Uint64 timeoutUsec) |
MI_Uint64 timeoutUsec, |
|
MI_Boolean* keepRunning) |
{ | { |
int n = FD_SETSIZE; | int n = FD_SETSIZE; |
|
int r; |
struct timeval tv; | struct timeval tv; |
struct timeval* _tv = 0; | struct timeval* _tv = 0; |
| |
|
|
_tv = &tv; | _tv = &tv; |
} | } |
| |
return select(n, readSet, writeSet, exceptSet, _tv); |
do |
|
{ |
|
r = select(n, readSet, writeSet, exceptSet, _tv); |
|
} |
|
while( (*keepRunning == MI_TRUE) && ( -1 == r ) && ( errno == EINTR ) ); |
|
|
|
return r; |
} | } |
| |
MI_INLINE void _FDSet(Sock sock, fd_set* set) | MI_INLINE void _FDSet(Sock sock, fd_set* set) |
|
|
MI_Result Selector_Init( | MI_Result Selector_Init( |
Selector* self) | Selector* self) |
{ | { |
self->rep = (SelectorRep*)calloc(1, sizeof(SelectorRep)); |
self->rep = (SelectorRep*)PAL_Calloc(1, sizeof(SelectorRep)); |
| |
if (!self->rep) | if (!self->rep) |
return MI_RESULT_FAILED; | return MI_RESULT_FAILED; |
|
|
if (MI_RESULT_OK != Sock_SetCloseOnExec(self->rep->notificationSockets[0],MI_TRUE) || | if (MI_RESULT_OK != Sock_SetCloseOnExec(self->rep->notificationSockets[0],MI_TRUE) || |
MI_RESULT_OK != Sock_SetCloseOnExec(self->rep->notificationSockets[1],MI_TRUE)) | MI_RESULT_OK != Sock_SetCloseOnExec(self->rep->notificationSockets[1],MI_TRUE)) |
{ | { |
LOGW_CHAR(("fcntl(F_SETFD) failed")); |
trace_fcntl_failed( errno ); |
} | } |
| |
return MI_RESULT_OK; | return MI_RESULT_OK; |
|
|
Sock_Close(rep->notificationSockets[0]); | Sock_Close(rep->notificationSockets[0]); |
Sock_Close(rep->notificationSockets[1]); | Sock_Close(rep->notificationSockets[1]); |
| |
free(rep); |
PAL_Free(rep); |
} | } |
| |
MI_Result Selector_AddHandler( | MI_Result Selector_AddHandler( |
|
|
Handler* p; | Handler* p; |
MI_Uint64 currentTimeUsec = 0; | MI_Uint64 currentTimeUsec = 0; |
| |
if (MI_RESULT_OK != Time_Now(¤tTimeUsec)) |
if (PAL_TRUE != PAL_Time(¤tTimeUsec)) |
return MI_RESULT_FAILED; | return MI_RESULT_FAILED; |
| |
/* Reject duplicates */ | /* Reject duplicates */ |
|
|
return MI_RESULT_OK; | return MI_RESULT_OK; |
} | } |
| |
|
|
|
|
static void _ProcessCallbacks( | static void _ProcessCallbacks( |
SelectorRep* rep) | SelectorRep* rep) |
{ | { |
SelectorCallbacksItem* item = 0; |
SelectorCallbacksItem* item = NULL; |
size_t read = 0; | size_t read = 0; |
|
MI_Result r; |
| |
while( MI_RESULT_OK == Sock_Read( rep->notificationSockets[0], &item, sizeof(item), &read) && |
LOGD2((ZT("_ProcessCallbacks - Begin. notification socket: %d"), rep->notificationSockets[0])); |
|
while( (r = Sock_Read(rep->notificationSockets[0], &item, sizeof item, &read)) == MI_RESULT_OK && |
read == sizeof(item) ) | read == sizeof(item) ) |
{ | { |
//printf("pipe read: p = %p, sent = %d\n", item, (int)read); |
|
if (item) | if (item) |
{ | { |
|
LOGD2((ZT("_ProcessCallbacks - Calling item callback"))); |
(*item->callback) (item->callback_self, item->message); | (*item->callback) (item->callback_self, item->message); |
Message_Release(item->message); | Message_Release(item->message); |
//AtomicDec( &rep->queueLength ); |
//Atomic_Dec( &rep->queueLength ); |
} | } |
} | } |
|
LOGD2((ZT("_ProcessCallbacks - End. last result: %d (%s)"), r, mistrerror(r))); |
} | } |
| |
MI_Result Selector_Run( |
void _Selector_WakeupFromWait( |
|
SelectorRep* rep) |
|
{ |
|
void* p = 0; |
|
size_t sent = 0; |
|
|
|
Sock_Write( rep->notificationSockets[1], &p, sizeof(p), &sent); |
|
} |
|
|
|
/* |
|
* This function guaranties that callback is called in 'Run'/'IO' thread context, |
|
* so locking is required for accessing sokcet objects, updating buffers etc |
|
*/ |
|
MI_Result Selector_CallInIOThread( |
Selector* self, | Selector* self, |
MI_Uint64 timeoutUsec) |
Selector_NotificationCallback callback, |
|
void* callback_self, |
|
Message* message) |
{ | { |
SelectorRep* rep = (SelectorRep*)self->rep; | SelectorRep* rep = (SelectorRep*)self->rep; |
MI_Uint64 timeoutSelectorAt = TIME_NEVER; |
SelectorCallbacksItem* newItem; |
|
MI_Result r; |
|
size_t sent = 0; |
|
ThreadID current = Thread_ID(); |
| |
if ( TIME_NEVER != timeoutUsec ) |
if (Thread_Equal(&rep->ioThreadHandle, ¤t)) |
{ | { |
if (MI_RESULT_OK != Time_Now(&timeoutSelectorAt)) |
/* direct call - we can write to socket instantly */ |
return MI_RESULT_FAILED; |
|
| |
/* calculate when to terminate selector */ |
trace_Sock_SendingOnOwnThread( |
timeoutSelectorAt += timeoutUsec; |
message, |
} |
message->tag, |
|
MessageName(message->tag), |
|
message->operationId); |
| |
rep->ioThreadHandle = ThreadSelf(); |
(*callback)(callback_self, message); |
|
return MI_RESULT_OK; |
|
} |
| |
/* Loop while detecting and dispatching events */ |
/* add item to the list and set event */ |
for (rep->keepRunning = MI_TRUE; rep->keepRunning;) |
newItem = (SelectorCallbacksItem*) Batch_GetClear( message->batch, sizeof(SelectorCallbacksItem)); |
{ |
|
Handler* p; |
|
MI_Uint64 currentTimeUsec = 0; |
|
MI_Uint64 breakCurrentSelectAt = (MI_Uint64)-1; |
|
int n; |
|
MI_Uint32 count = 0; |
|
MI_Boolean stressed = Selector_IsStressed(self); |
|
| |
if (MI_RESULT_OK != Time_Now(¤tTimeUsec)) |
if (!newItem) |
return MI_RESULT_FAILED; | return MI_RESULT_FAILED; |
| |
if (TIME_NEVER != timeoutSelectorAt) |
newItem->callback = callback; |
{ |
newItem->callback_self = callback_self; |
if (currentTimeUsec >= timeoutSelectorAt) |
newItem->message = message; |
return MI_RESULT_TIME_OUT; |
|
| |
breakCurrentSelectAt = timeoutSelectorAt; |
Message_AddRef(message); |
} |
r = Sock_Write(rep->notificationSockets[1], &newItem, sizeof(newItem), &sent); |
| |
/* Set up FD sets from handlers */ |
trace_Sock_SentResult( |
memset(&rep->readSet, 0, sizeof(rep->readSet)); |
message, |
memset(&rep->writeSet, 0, sizeof(rep->writeSet)); |
message->tag, |
memset(&rep->exceptSet, 0, sizeof(rep->exceptSet)); |
MessageName(message->tag), |
|
message->operationId, |
|
r); |
| |
for (p = (Handler*)rep->head; p; p = (Handler*)p->next) |
if ( MI_RESULT_OK != r ) |
{ |
Message_Release(message); |
if (!stressed || (p->mask & SELECTOR_IGNORE_READ_OVERLOAD) == 0) |
|
|
return r; |
|
} |
|
|
|
static MI_Result _SetSockEvents(SelectorRep* rep, Handler* p, MI_Uint32 mask, MI_Boolean noReadsMode ) |
{ | { |
if (p->mask & SELECTOR_READ) |
if( !noReadsMode && (mask & SELECTOR_READ) ) |
{ | { |
_FDSet(p->sock, &rep->readSet); | _FDSet(p->sock, &rep->readSet); |
count++; |
|
} | } |
if (p->mask & SELECTOR_WRITE) |
if (mask & SELECTOR_WRITE) |
{ | { |
_FDSet(p->sock, &rep->writeSet); | _FDSet(p->sock, &rep->writeSet); |
count++; |
|
} |
|
if (p->mask & SELECTOR_EXCEPTION) |
|
{ |
|
_FDSet(p->sock, &rep->exceptSet); |
|
count++; |
|
} |
|
} |
|
/* find the minimum timeout form the list */ |
|
if (TIME_NEVER != p->fireTimeoutAt) |
|
{ |
|
/* if expired - don't wait; notification will be issued later in next loop */ |
|
if (currentTimeUsec >= p->fireTimeoutAt) |
|
breakCurrentSelectAt = currentTimeUsec; |
|
else if ( p->fireTimeoutAt < breakCurrentSelectAt) |
|
breakCurrentSelectAt = p->fireTimeoutAt; |
|
} | } |
|
|
|
return MI_RESULT_OK; |
} | } |
| |
_FDSet(rep->notificationSockets[0], &rep->readSet); |
static MI_Result _GetSockEvents(SelectorRep* rep, Handler* p, MI_Uint32* mask) |
|
|
/* empty list - return */ |
|
if (!rep->head && !rep->allowEmptySelector) |
|
return MI_RESULT_FAILED; |
|
|
|
|
|
/* Perform system select */ |
|
n = _Select(&rep->readSet, &rep->writeSet, &rep->exceptSet, |
|
breakCurrentSelectAt == (MI_Uint64)-1 ? (MI_Uint64)-1: breakCurrentSelectAt - currentTimeUsec); |
|
|
|
/* ignore signals, since it canbe part of normal operation */ |
|
if (-1 == n && errno != EINTR) |
|
return MI_RESULT_FAILED; |
|
|
|
if (FD_ISSET(rep->notificationSockets[0], &rep->readSet)) |
|
_ProcessCallbacks(rep); |
|
|
|
/* Notify handlers of outstanding events */ |
|
for (p = (Handler*)rep->head; p; ) |
|
{ |
|
MI_Uint32 mask = 0; |
|
Handler* next = p->next; |
|
|
|
if (!stressed || (p->mask & SELECTOR_IGNORE_READ_OVERLOAD) == 0) |
|
{ | { |
|
*mask = 0; |
if (p->mask & (SELECTOR_READ | SELECTOR_WRITE | SELECTOR_EXCEPTION)) | if (p->mask & (SELECTOR_READ | SELECTOR_WRITE | SELECTOR_EXCEPTION)) |
{ | { |
/* Check for read event */ | /* Check for read event */ |
if (FD_ISSET(p->sock, &rep->readSet)) | if (FD_ISSET(p->sock, &rep->readSet)) |
{ | { |
mask |= SELECTOR_READ; |
*mask |= SELECTOR_READ; |
_FDClr(p->sock, &rep->readSet); | _FDClr(p->sock, &rep->readSet); |
} | } |
| |
/* Check for write event */ | /* Check for write event */ |
if (FD_ISSET(p->sock, &rep->writeSet)) | if (FD_ISSET(p->sock, &rep->writeSet)) |
{ | { |
mask |= SELECTOR_WRITE; |
*mask |= SELECTOR_WRITE; |
_FDClr(p->sock, &rep->writeSet); | _FDClr(p->sock, &rep->writeSet); |
} | } |
| |
/* Check for exception event */ | /* Check for exception event */ |
if (FD_ISSET(p->sock, &rep->exceptSet)) | if (FD_ISSET(p->sock, &rep->exceptSet)) |
{ | { |
mask |= SELECTOR_EXCEPTION; |
|
_FDClr(p->sock, &rep->exceptSet); | _FDClr(p->sock, &rep->exceptSet); |
} | } |
} | } |
} |
|
if (TIME_NEVER != p->fireTimeoutAt && currentTimeUsec >= p->fireTimeoutAt) |
return MI_RESULT_OK; |
{ |
|
mask |= SELECTOR_TIMEOUT; |
|
} | } |
| |
/* If any events, notify watcher */ |
#endif /* defined(CONFIG_POSIX) */ |
if (mask) |
|
{ |
|
MI_Boolean more; |
|
| |
more = (*p->callback)(self, p, mask, currentTimeUsec); |
/************************************/ |
|
/* generic functionality */ |
| |
if (!more) |
MI_Result Selector_ContainsHandler( |
|
Selector* self, |
|
Handler* handler) |
{ | { |
/* Remove handler */ |
SelectorRep* rep = (SelectorRep*)self->rep; |
List_Remove(&rep->head, &rep->tail, (ListElem*)p); |
Handler* p; |
| |
/* Notify handler of removal */ |
for (p = (Handler*)rep->head; p; p = (Handler*)p->next) |
(*p->callback)(self, p, SELECTOR_REMOVE, currentTimeUsec); |
{ |
|
if (p == handler) |
|
{ |
|
return MI_RESULT_OK; |
} | } |
|
|
} | } |
| |
/* Advance to next handler */ |
return MI_RESULT_NOT_FOUND; |
p = next; |
|
} |
|
} | } |
| |
return MI_RESULT_OK; |
void Selector_SetAllowEmptyFlag( |
|
Selector* self, |
|
MI_Boolean allowEmptySelector) |
|
{ |
|
SelectorRep* rep = (SelectorRep*)self->rep; |
|
|
|
rep->allowEmptySelector = allowEmptySelector; |
} | } |
| |
/* Wakes up selector's thread | /* Wakes up selector's thread |
Typical usage is to recalculate timeouts on handlers | Typical usage is to recalculate timeouts on handlers |
when selector's Run is running in different thread */ | when selector's Run is running in different thread */ |
MI_Result Selector_Wakeup( | MI_Result Selector_Wakeup( |
Selector* self) |
_In_ Selector* self, |
|
MI_Boolean retryDispatching ) |
{ | { |
SelectorRep* rep = (SelectorRep*)self->rep; | SelectorRep* rep = (SelectorRep*)self->rep; |
|
ThreadID current = Thread_ID(); |
| |
/* notify running thread */ |
/* notify running thread (if different) */ |
|
if( !Thread_Equal(&rep->ioThreadHandle, ¤t) ) |
{ | { |
void* p = 0; |
_Selector_WakeupFromWait(rep); |
size_t sent = 0; |
} |
|
else if( retryDispatching ) |
Sock_Write( rep->notificationSockets[1], &p, sizeof(p), &sent); |
{ |
|
rep->keepDispatching = MI_TRUE; |
} | } |
| |
return MI_RESULT_OK; | return MI_RESULT_OK; |
} | } |
| |
|
|
MI_Result Selector_StopRunning( | MI_Result Selector_StopRunning( |
Selector* self) | Selector* self) |
{ | { |
|
|
| |
rep->keepRunning = MI_FALSE; | rep->keepRunning = MI_FALSE; |
| |
return Selector_Wakeup(self); |
return Selector_Wakeup(self, MI_FALSE); |
} | } |
| |
/* |
MI_Result Selector_StopRunningNoReadsMode( |
* This funciton guaranties that callback is called in 'Run'/'IO' thread context, |
Selector* self) |
* so locking is required for accessing sokcet objects, updating buffers etc |
|
*/ |
|
MI_Result Selector_CallInIOThread( |
|
Selector* self, |
|
Selector_NotificationCallback callback, |
|
void* callback_self, |
|
Message* message) |
|
{ | { |
SelectorRep* rep = (SelectorRep*)self->rep; | SelectorRep* rep = (SelectorRep*)self->rep; |
SelectorCallbacksItem* newItem; |
|
MI_Result r; |
|
size_t sent = 0; |
|
| |
if (rep->ioThreadHandle == ThreadSelf()) |
rep->keepRunningNoReadsMode = MI_FALSE; |
{ |
|
/* direct call - we can write to socket instantly */ |
|
| |
(*callback)(callback_self, message); |
return Selector_Wakeup(self, MI_FALSE); |
return MI_RESULT_OK; |
|
} | } |
| |
/* add item to the list and set event */ |
MI_Result Selector_Run( |
newItem = (SelectorCallbacksItem*) Batch_GetClear( message->batch, sizeof(SelectorCallbacksItem)); |
Selector* self, |
|
MI_Uint64 timeoutUsec, |
|
MI_Boolean noReadsMode ) |
|
{ |
|
SelectorRep* rep = (SelectorRep*)self->rep; |
|
MI_Uint64 timeoutSelectorAt = TIME_NEVER; |
|
MI_Boolean* keepRunningVar; |
|
#if defined(CONFIG_OS_WINDOWS) |
|
HANDLE handles[2]; |
| |
if (!newItem) |
trace_SelectorRun_Enter( self, timeoutUsec, noReadsMode ); |
return MI_RESULT_FAILED; |
|
| |
newItem->callback = callback; |
handles[0] = rep->event; |
newItem->callback_self = callback_self; |
handles[1] = rep->callbacksAreAvailable; |
newItem->message = message; |
#endif // defined(CONFIG_OS_WINDOWS) |
| |
//AtomicInc( &rep->queueLength ); |
if( noReadsMode ) |
|
{ |
|
keepRunningVar = &rep->keepRunningNoReadsMode; |
|
} |
|
else |
|
{ |
|
keepRunningVar = &rep->keepRunning; |
|
} |
| |
while (rep->outstandingInstances > MAX_ALLOCATED_INSTANCES) |
LOGD2((ZT("Selector_Run - Begin. timeoutUsec: %ld"), (long)timeoutUsec)); |
Sleep_ms(1); |
|
| |
#if 0 |
if ( TIME_NEVER != timeoutUsec ) |
{ | { |
int counter = 0; |
if (PAL_TRUE != PAL_Time(&timeoutSelectorAt)) |
|
{ |
|
trace_SelectorRun_InitPALTIME_Error( self ); |
|
return MI_RESULT_FAILED; |
|
} |
|
|
|
/* calculate when to terminate selector */ |
|
LOGD2((ZT("Selector_Run - After timeout calculation. time now: %s, timeoutSelectorAt: %s"), FmtTime(timeoutSelectorAt), FmtTime(timeoutSelectorAt + timeoutUsec))); |
|
timeoutSelectorAt += timeoutUsec; |
|
} |
|
|
|
rep->ioThreadHandle = Thread_ID(); |
| |
while (rep->queueLength > LIST_SIZE_LIMIT ) |
/* Loop while detecting and dispatching events */ |
|
for (*keepRunningVar = MI_TRUE; *keepRunningVar; ) |
{ | { |
counter++; |
Handler* p; |
/* give system a chance to clear backlog */ |
MI_Uint64 currentTimeUsec = 0; |
Sleep_ms(1); |
MI_Uint64 breakCurrentSelectAt = (MI_Uint64)-1; |
|
MI_Boolean more; |
|
MI_Result r; |
|
#if defined(CONFIG_OS_WINDOWS) |
|
DWORD result; |
|
DWORD timeoutMsec; |
|
#else |
|
int n; |
|
#endif |
| |
if (counter > 1000) |
if (PAL_TRUE != PAL_Time(¤tTimeUsec)) |
{ | { |
LOGW((MI_T("cannot send message: queue overflow)\n") )); |
trace_SelectorRun_InitPALTIME_Error( self ); |
|
LOGE2((ZT("Selector_Run - PAL_Time failed"))); |
return MI_RESULT_FAILED; | return MI_RESULT_FAILED; |
} | } |
|
|
|
if (TIME_NEVER != timeoutSelectorAt) |
|
{ |
|
if (currentTimeUsec >= timeoutSelectorAt) |
|
{ |
|
LOGW2((ZT("Selector_Run - Selector timeout. current time: %s, limit time: %s, interval: %ld ms"), |
|
FmtTime(timeoutSelectorAt), FmtTime(currentTimeUsec), |
|
(long)(currentTimeUsec - timeoutSelectorAt))); |
|
return MI_RESULT_TIME_OUT; |
} | } |
|
|
|
breakCurrentSelectAt = timeoutSelectorAt; |
} | } |
#endif |
|
| |
Message_AddRef(message); |
#if defined(CONFIG_POSIX) |
r = Sock_Write( rep->notificationSockets[1], &newItem, sizeof(newItem), &sent); |
/* Set up FD sets from handlers */ |
|
memset(&rep->readSet, 0, sizeof(rep->readSet)); |
|
memset(&rep->writeSet, 0, sizeof(rep->writeSet)); |
|
memset(&rep->exceptSet, 0, sizeof(rep->exceptSet)); |
|
#endif /* defined(CONFIG_POSIX) */ |
| |
|
/* calculate timeout */ |
|
for (p = (Handler*)rep->head; p; ) |
|
{ |
|
Handler* next = p->next; |
| |
//printf("pipe write: p = %p, sent = %d\n", newItem, (int)sent); |
/* update event mask */ |
if ( MI_RESULT_OK != r ) |
#if defined(CONFIG_OS_WINDOWS) |
Message_Release(message); |
if( p->sock != INVALID_SOCK ) |
|
#endif |
|
{ |
|
r = _SetSockEvents(rep, p, p->mask, noReadsMode ); |
| |
|
if (r != MI_RESULT_OK) |
|
{ |
|
LOGE2((ZT("Selector_Run - _SetSockEvents failed"))); |
|
trace_SelectorRun_SetSocketEventsError( self, r, p ); |
return r; | return r; |
} | } |
|
} |
| |
#endif /* defined(CONFIG_POSIX) */ |
/* find the minimum timeout form the list */ |
|
if (TIME_NEVER != p->fireTimeoutAt) |
|
{ |
|
/* if expired - don't wait; notification will be issued later in next loop */ |
|
if (currentTimeUsec >= p->fireTimeoutAt) |
|
breakCurrentSelectAt = currentTimeUsec; |
|
else if ( p->fireTimeoutAt < breakCurrentSelectAt) |
|
breakCurrentSelectAt = p->fireTimeoutAt; |
|
} |
| |
/************************************/ |
p = next; |
/* generic functionality */ |
} |
| |
MI_Result Selector_ContainsHandler( |
#if defined(CONFIG_POSIX) |
Selector* self, |
_FDSet(rep->notificationSockets[0], &rep->readSet); |
Handler* handler) |
#endif /* defined(CONFIG_POSIX) */ |
|
|
|
/* empty list - return */ |
|
if (!rep->head && !rep->allowEmptySelector) |
{ | { |
SelectorRep* rep = (SelectorRep*)self->rep; |
LOGE2((ZT("Selector_Run - Empty list"))); |
Handler* p; |
trace_SelectorRun_EmptyList( self ); |
|
return MI_RESULT_FAILED; |
|
} |
| |
for (p = (Handler*)rep->head; p; p = (Handler*)p->next) |
#if defined(CONFIG_OS_WINDOWS) |
|
/* Wait for events on any of the sockets */ |
|
timeoutMsec = |
|
(breakCurrentSelectAt == ((MI_Uint64)-1) ) ? /* do we have t least one valid timeout? */ |
|
INFINITE : (DWORD)((breakCurrentSelectAt - currentTimeUsec) / 1000); |
|
result = WaitForMultipleObjectsEx(MI_COUNT(handles), handles, FALSE, timeoutMsec, FALSE); |
|
|
|
if (result == WAIT_FAILED) |
{ | { |
if (p == handler) |
trace_SelectorRun_WaitError( self, result ); |
|
return MI_RESULT_FAILED; |
|
} |
|
#else |
|
/* Perform system select */ |
|
n = _Select(&rep->readSet, &rep->writeSet, NULL, |
|
breakCurrentSelectAt == (MI_Uint64)-1 ? (MI_Uint64)-1: breakCurrentSelectAt - currentTimeUsec, |
|
keepRunningVar); |
|
|
|
/* ignore signals, since it canbe part of normal operation */ |
|
if (-1 == n && errno != EINTR) |
{ | { |
return MI_RESULT_OK; |
LOGE2((ZT("Selector_Run - _Select failed. errno: %d (%s)"), errno, strerror(errno))); |
|
trace_SelectorRun_WaitError( self, errno ); |
|
return MI_RESULT_FAILED; |
} | } |
|
#endif |
|
|
|
do |
|
{ |
|
rep->keepDispatching = MI_FALSE; |
|
|
|
#if defined(CONFIG_OS_WINDOWS) |
|
//if ((WAIT_OBJECT_0 + 1) == result) /* other thread wants to call callback */ |
|
#else |
|
if (FD_ISSET(rep->notificationSockets[0], &rep->readSet)) |
|
#endif |
|
{ |
|
_ProcessCallbacks(rep); |
} | } |
| |
return MI_RESULT_NOT_FOUND; |
/* Dispatch events on each socket */ |
|
for (p = (Handler*)rep->head; p; ) |
|
{ |
|
Handler* next = p->next; |
|
MI_Uint32 mask = 0; |
|
|
|
/* Refresh current time stamp */ |
|
if (PAL_TRUE != PAL_Time(¤tTimeUsec)) |
|
{ |
|
LOGE2((ZT("Selector_Run - PAL_Time failed"))); |
|
trace_SelectorRun_PALTimeError( self ); |
|
return MI_RESULT_FAILED; |
} | } |
| |
static void _dtor(Message* message, void* callbackData) |
/* Get event mask for this socket */ |
|
#if defined(CONFIG_OS_WINDOWS) |
|
if (p->sock != INVALID_SOCK) |
|
#endif |
{ | { |
SelectorRep* rep = (SelectorRep*)callbackData; |
r = _GetSockEvents(rep, p, &mask); |
| |
MI_UNUSED(message); |
if (r != MI_RESULT_OK) |
|
{ |
|
LOGE2((ZT("Selector_Run - _GetSockEvents failed with result: %d (%s)"), (int)r, mistrerror(r))); |
|
trace_SelectorRun_GetSocketEventsError( self, r, p ); |
|
return r; |
|
} |
|
} |
| |
AtomicDec( &rep->outstandingInstances ); |
if (TIME_NEVER != p->fireTimeoutAt && currentTimeUsec >= p->fireTimeoutAt) |
|
{ |
|
mask |= SELECTOR_TIMEOUT; |
} | } |
| |
void Selector_NewInstanceCreated( |
/* If there were any events on this socket, dispatch them */ |
Selector* self, |
if (mask) |
Message* msg) |
|
{ | { |
SelectorRep* rep = (SelectorRep*)self->rep; |
/*MI_Uint32 oldMask = p->mask;*/ |
|
more = (*p->callback)(self, p, mask, currentTimeUsec); |
| |
msg->dtor = _dtor; |
/* If callback wants to continue getting events */ |
msg->dtorData = rep; |
if (!more) |
|
{ |
|
/* Remove handler */ |
|
List_Remove(&rep->head, &rep->tail, (ListElem*)p); |
| |
AtomicInc( &rep->outstandingInstances ); |
/* Refresh current time stamp */ |
|
if (PAL_TRUE != PAL_Time(¤tTimeUsec)) |
|
{ |
|
LOGE2((ZT("Selector_Run - PAL_Time failed"))); |
|
trace_SelectorRun_PALTimeError( self ); |
|
return MI_RESULT_FAILED; |
} | } |
| |
MI_Boolean Selector_IsStressed( |
#if defined(CONFIG_OS_WINDOWS) |
Selector* self) |
/* Unselect events on this socket */ |
{ |
_SetSockEvents(rep, p, 0, noReadsMode); |
SelectorRep* rep = (SelectorRep*)self->rep; |
#endif |
|
|
|
/* Notify handler of removal */ |
|
(*p->callback)(self, p, SELECTOR_REMOVE, currentTimeUsec); |
|
} |
|
} |
| |
/* Are we close to be stressed? */ |
p = next; |
if (rep->outstandingInstances >= (MAX_ALLOCATED_INSTANCES * 9 / 10)) |
} |
return MI_TRUE; |
} |
|
while( rep->keepDispatching ); |
|
} |
| |
return MI_FALSE; |
LOGE2((ZT("Selector_Run - OK exit"))); |
|
trace_SelectorRun_Exit( self ); |
|
return MI_RESULT_OK; |
} | } |
| |
void Selector_SetAllowEmptyFlag( |
int Selector_IsSelectorThread(Selector* self, ThreadID *id) |
Selector* self, |
{ |
MI_Boolean allowEmptySelector) |
if( NULL == self || NULL == self->rep ) |
|
{ |
|
// Some UTs may not have an actual selector |
|
return 0; |
|
} |
|
else |
{ | { |
SelectorRep* rep = (SelectorRep*)self->rep; | SelectorRep* rep = (SelectorRep*)self->rep; |
|
return Thread_Equal(&rep->ioThreadHandle, id); |
rep->allowEmptySelector = allowEmptySelector; |
|
} | } |
|
} |
|
|
| |