(file) Return to selector.c CVS log (file) (dir) Up to [OMI] / omi / sock

Diff for /omi/sock/selector.c between version 1.2 and 1.3

version 1.2, 2015/04/20 18:10:35 version 1.3, 2015/04/20 18:20:34
Line 23 
Line 23 
 */ */
  
 #include "selector.h" #include "selector.h"
 #include <base/time.h>  #include <pal/sleep.h>
 #include "thread.h"  #include <pal/thread.h>
   #include <pal/sleep.h>
 #include <base/log.h> #include <base/log.h>
 #include <base/result.h> #include <base/result.h>
 #include <base/atomic.h>  #include <pal/atomic.h>
   
   // #define  ENABLE_TRACING 1
   #ifdef ENABLE_TRACING
   # define TRACING_LEVEL 4
   # include <deprecated/logging/logging.h>
   #else
   # define LOGE2(a)
   # define LOGW2(a)
   # define LOGD2(a)
   # define LOGX2(a)
   #endif
  
 /* maximum number of items in the list */ /* maximum number of items in the list */
 #define LIST_SIZE_LIMIT     321 #define LIST_SIZE_LIMIT     321
Line 35 
Line 47 
 /* maximum instances that can be allocated */ /* maximum instances that can be allocated */
 #define MAX_ALLOCATED_INSTANCES 500 #define MAX_ALLOCATED_INSTANCES 500
  
   
 /* /*
 **============================================================================== **==============================================================================
 ** **
Line 90 
Line 101 
     */     */
     SelectorCallbacksList*  callbacksList;     SelectorCallbacksList*  callbacksList;
  
     /* Number of alive instance */  
     AtomicInt outstandingInstances;  
   
     /* io thread id */     /* io thread id */
     ThreadHandle    ioThreadHandle;      ThreadID    ioThreadHandle;
  
     /* flag to stop running */     /* flag to stop running */
     MI_Boolean keepRunning;     MI_Boolean keepRunning;
       MI_Boolean keepRunningNoReadsMode;
   
       /* flag to retry a dispatch loop */
       MI_Boolean keepDispatching;
  
     /* flag that allows empty selector running     /* flag that allows empty selector running
         when empty selector runs, it only can be interrupted by         when empty selector runs, it only can be interrupted by
Line 127 
Line 139 
     InterlockedExchangePointer( &rep->callbacksList, list );     InterlockedExchangePointer( &rep->callbacksList, list );
 } }
  
 static MI_Result _SetSockEvents(WSAEVENT event, SOCKET sock, MI_Uint32 mask)  static MI_Result _SetSockEvents(SelectorRep* rep, Handler* p, MI_Uint32 mask, MI_Boolean noReadsMode )
 { {
     long e = 0;     long e = 0;
  
     if (mask & SELECTOR_READ)      if( !noReadsMode && (mask & SELECTOR_READ) )
         e |= FD_ACCEPT | FD_READ | FD_CLOSE;         e |= FD_ACCEPT | FD_READ | FD_CLOSE;
  
     if (mask & SELECTOR_WRITE)     if (mask & SELECTOR_WRITE)
Line 140 
Line 152 
     if (mask & SELECTOR_EXCEPTION)     if (mask & SELECTOR_EXCEPTION)
         e |= FD_OOB;         e |= FD_OOB;
  
     if (WSAEventSelect(sock, event, e) == SOCKET_ERROR)      if (WSAEventSelect(p->sock, rep->event, e) == SOCKET_ERROR)
         return MI_RESULT_FAILED;         return MI_RESULT_FAILED;
  
     return MI_RESULT_OK;     return MI_RESULT_OK;
 } }
  
 static MI_Result _GetSockEvents(WSAEVENT event, SOCKET sock, MI_Uint32* mask)  static MI_Result _GetSockEvents(SelectorRep* rep, Handler* p, MI_Uint32* mask)
 { {
     WSANETWORKEVENTS networkEvents;     WSANETWORKEVENTS networkEvents;
     long x;     long x;
  
     ZeroMemory(&networkEvents, sizeof(networkEvents));     ZeroMemory(&networkEvents, sizeof(networkEvents));
  
     if (WSAEnumNetworkEvents(sock, event, &networkEvents) != 0)      if (WSAEnumNetworkEvents(p->sock, rep->event, &networkEvents) != 0)
         return MI_RESULT_FAILED;         return MI_RESULT_FAILED;
  
     x = networkEvents.lNetworkEvents;     x = networkEvents.lNetworkEvents;
Line 174 
Line 186 
 MI_Result Selector_Init( MI_Result Selector_Init(
     Selector* self)     Selector* self)
 { {
     SelectorRep* rep = (SelectorRep*)calloc(1, sizeof(SelectorRep));      SelectorRep* rep = (SelectorRep*)PAL_Calloc(1, sizeof(SelectorRep));
  
     if (!rep)     if (!rep)
         return MI_RESULT_FAILED;         return MI_RESULT_FAILED;
Line 183 
Line 195 
  
     if (rep->event == WSA_INVALID_EVENT)     if (rep->event == WSA_INVALID_EVENT)
     {     {
         free(rep);          PAL_Free(rep);
         return MI_RESULT_FAILED;         return MI_RESULT_FAILED;
     }     }
  
Line 194 
Line 206 
     if (NULL == rep->callbacksAreAvailable)     if (NULL == rep->callbacksAreAvailable)
     {     {
         CloseHandle(rep->event);         CloseHandle(rep->event);
         free(rep);          PAL_Free(rep);
         return MI_RESULT_FAILED;         return MI_RESULT_FAILED;
     }     }
  
Line 215 
Line 227 
         next = (Handler*)p->next;         next = (Handler*)p->next;
  
         /* Unselect events on this socket */         /* Unselect events on this socket */
         _SetSockEvents(rep->event, p->sock, 0);          _SetSockEvents(rep, p, 0, MI_FALSE);
  
         /* Invoke user callback */         /* Invoke user callback */
         (*p->callback)(self, p, SELECTOR_DESTROY, 0);         (*p->callback)(self, p, SELECTOR_DESTROY, 0);
   
         p = next;  
     }     }
  
     CloseHandle(rep->event);     CloseHandle(rep->event);
Line 235 
Line 245 
         Message_Release(item->message);         Message_Release(item->message);
     }     }
  
     free(rep);      PAL_Free(rep);
 } }
  
 MI_Result Selector_AddHandler( MI_Result Selector_AddHandler(
Line 246 
Line 256 
     Handler* p;     Handler* p;
     MI_Uint64 currentTimeUsec = 0;     MI_Uint64 currentTimeUsec = 0;
  
     if (MI_RESULT_OK != Time_Now(&currentTimeUsec))      if (PAL_TRUE != PAL_Time(&currentTimeUsec))
         return MI_RESULT_FAILED;         return MI_RESULT_FAILED;
  
     /* Reject duplicates */     /* Reject duplicates */
Line 280 
Line 290 
             List_Remove(&rep->head, &rep->tail, (ListElem*)p);             List_Remove(&rep->head, &rep->tail, (ListElem*)p);
  
             /* Unselect events on this socket */             /* Unselect events on this socket */
             _SetSockEvents(rep->event, p->sock, 0);              _SetSockEvents(rep, p, 0, MI_FALSE);
  
             /* Notify handler of removal */             /* Notify handler of removal */
             (*handler->callback)(self, p, SELECTOR_REMOVE, 0);             (*handler->callback)(self, p, SELECTOR_REMOVE, 0);
Line 307 
Line 317 
         List_Remove(&rep->head, &rep->tail, (ListElem*)p);         List_Remove(&rep->head, &rep->tail, (ListElem*)p);
  
         /* Unselect events on this socket */         /* Unselect events on this socket */
         _SetSockEvents(rep->event, p->sock, 0);          _SetSockEvents(rep, p, 0, MI_FALSE);
  
         /* Notify handler of removal */         /* Notify handler of removal */
         (*p->callback)(self, p, SELECTOR_REMOVE, 0);         (*p->callback)(self, p, SELECTOR_REMOVE, 0);
Line 321 
Line 331 
 static void _ProcessCallbacks( static void _ProcessCallbacks(
     SelectorRep* rep)     SelectorRep* rep)
 { {
     SelectorCallbacksItem* item = 0;      SelectorCallbacksItem* item = NULL;
     SelectorCallbacksList * list;     SelectorCallbacksList * list;
  
     /* remove all items from the list */     /* remove all items from the list */
Line 343 
Line 353 
     }     }
 } }
  
 MI_Result Selector_Run(  void _Selector_WakeupFromWait(
     Selector* self,      SelectorRep* rep)
     MI_Uint64 timeoutUsec)  
 {  
     SelectorRep* rep = (SelectorRep*)self->rep;  
     MI_Uint64 timeoutSelectorAt = TIME_NEVER;  
     HANDLE handles[2];  
   
     handles[0] = rep->event;  
     handles[1] = rep->callbacksAreAvailable;  
   
     if ( TIME_NEVER != timeoutUsec )  
     {  
         if (MI_RESULT_OK != Time_Now(&timeoutSelectorAt))  
             return MI_RESULT_FAILED;  
   
         /* calculate when to terminate selector */  
         timeoutSelectorAt += timeoutUsec;  
     }  
   
     rep->ioThreadHandle = ThreadSelf();  
   
     for (rep->keepRunning = MI_TRUE; rep->keepRunning;)  
     {  
         Handler* p;  
         DWORD result;  
         DWORD timeoutMsec;  
         MI_Result r;  
         MI_Boolean more;  
         MI_Uint64 currentTimeUsec = 0;  
         MI_Uint64 breakCurrentSelectAt = (MI_Uint64)-1;  
   
         if (MI_RESULT_OK != Time_Now(&currentTimeUsec))  
             return MI_RESULT_FAILED;  
   
         if (TIME_NEVER != timeoutSelectorAt)  
         {  
             if (currentTimeUsec >= timeoutSelectorAt)  
                 return MI_RESULT_TIME_OUT;  
   
             breakCurrentSelectAt = timeoutSelectorAt;  
         }  
   
         /* calculate timeout */  
         for (p = (Handler*)rep->head; p; )  
         {  
             Handler* next = p->next;  
   
             /* update event mask */  
             if (p->sock != INVALID_SOCK)  
             {  
                 r = _SetSockEvents(rep->event, p->sock, p->mask);  
   
                 if (r != MI_RESULT_OK)  
                     return r;  
             }  
   
             /* find the minimum timeout form the list */  
             if (TIME_NEVER != p->fireTimeoutAt)  
             {  
                 /* if expired - don't wait; notification will be issued later in next loop */  
                 if (currentTimeUsec >= p->fireTimeoutAt)  
                     breakCurrentSelectAt = currentTimeUsec;  
                 else if ( p->fireTimeoutAt < breakCurrentSelectAt)  
                     breakCurrentSelectAt = p->fireTimeoutAt;  
             }  
   
             p = next;  
         }  
   
         /* empty list - return */  
         if (!rep->head && !rep->allowEmptySelector)  
             return MI_RESULT_FAILED;  
   
         /* Wait for events on any of the sockets */  
         timeoutMsec =  
             (breakCurrentSelectAt == ((MI_Uint64)-1) ) ? /* do we have t least one valid timeout? */  
             INFINITE : (DWORD)((breakCurrentSelectAt - currentTimeUsec) / 1000);  
         result = WaitForMultipleObjects(MI_COUNT(handles), handles, FALSE, timeoutMsec);  
   
         if (result == WAIT_FAILED)  
             return MI_RESULT_FAILED;  
   
         //if ((WAIT_OBJECT_0 + 1) == result)  /* other thread wants to call callback */  
             _ProcessCallbacks(rep);  
   
         if (MI_RESULT_OK != Time_Now(&currentTimeUsec))  
             return MI_RESULT_FAILED;  
   
         /* Dispatch events on each socket */  
         for (p = (Handler*)rep->head; p; )  
         {  
             Handler* next = p->next;  
             MI_Uint32 mask = 0;  
   
             /* Get event mask for this socket */  
             if (p->sock != INVALID_SOCK)  
             {  
                 r = _GetSockEvents(rep->event, p->sock, &mask);  
   
                 if (r != MI_RESULT_OK)  
                     return r;  
             }  
   
             if (TIME_NEVER != p->fireTimeoutAt && currentTimeUsec >= p->fireTimeoutAt)  
             {  
                 mask |= SELECTOR_TIMEOUT;  
             }  
   
             /* If there were any events on this socket, dispatch them */  
             if (mask)  
             {  
                 /*MI_Uint32 oldMask = p->mask;*/  
                 more = (*p->callback)(self, p, mask, currentTimeUsec);  
   
                 /* If callback wants to continue getting events */  
                 if (!more)  
                 {  
                     /* Remove handler */  
                     List_Remove(&rep->head, &rep->tail, (ListElem*)p);  
   
                     /* Unselect events on this socket */  
                     _SetSockEvents(rep->event, p->sock, 0);  
   
                     /* Notify handler of removal */  
                     (*p->callback)(self, p, SELECTOR_REMOVE, currentTimeUsec);  
                 }  
             }  
   
             p = next;  
         }  
     }  
     return MI_RESULT_OK;  
 }  
   
 /* Wakes up selector's thread  
     Typical usage is to recalculate timeouts on handlers  
     when selector's Run is running in different thread */  
 MI_Result Selector_Wakeup(  
     Selector* self)  
 {  
     SelectorRep* rep = (SelectorRep*)self->rep;  
   
     /* notify running thread (if different) */  
     if (rep->ioThreadHandle != ThreadSelf())  
     {     {
         SetEvent(rep->callbacksAreAvailable);         SetEvent(rep->callbacksAreAvailable);
     }     }
  
     return MI_RESULT_OK;  
 }  
   
 MI_Result Selector_StopRunning(  
     Selector* self)  
 {  
     SelectorRep* rep = (SelectorRep*)self->rep;  
   
     rep->keepRunning = MI_FALSE;  
   
     return Selector_Wakeup(self);  
 }  
   
 /* /*
     * This function guaranties that callback is called in 'Run'/'IO' thread context,      * This function guarantees that callback is called in 'Run'/'IO' thread context,
     * so no locking is required for accessing sokcet objects, updating buffers etc      * so no locking is required for accessing socket objects, updating buffers, etc.
 */ */
 MI_Result Selector_CallInIOThread( MI_Result Selector_CallInIOThread(
     Selector* self,     Selector* self,
Line 519 
Line 373 
     SelectorCallbacksItem* newItem;     SelectorCallbacksItem* newItem;
     SelectorCallbacksList * list;     SelectorCallbacksList * list;
     int itemsInList;     int itemsInList;
       ThreadID current = Thread_ID();
  
     if (rep->ioThreadHandle == ThreadSelf())      if (Thread_Equal(&rep->ioThreadHandle, &current))
     {     {
         /* direct call - we can write to socket instantly */         /* direct call - we can write to socket instantly */
  
           trace_Sock_SendingOnOwnThread(
               message,
               message->tag,
               MessageName(message->tag),
               message->operationId );
   
         (*callback)(callback_self, message);         (*callback)(callback_self, message);
         return MI_RESULT_OK;         return MI_RESULT_OK;
     }     }
Line 542 
Line 403 
     list = _GetList(rep);     list = _GetList(rep);
     List_Append(&list->head, &list->tail, (ListElem*)newItem);     List_Append(&list->head, &list->tail, (ListElem*)newItem);
     list->numberOfItem++;     list->numberOfItem++;
     itemsInList = list->numberOfItem;  
     _SetList(rep, list);  
     list  = 0;  
     SetEvent(rep->callbacksAreAvailable);  
  
     while (rep->outstandingInstances > MAX_ALLOCATED_INSTANCES)      trace_Sock_Sending(
         Sleep_ms(1);          message,
           message->tag,
           MessageName(message->tag),
           message->operationId,
           0,
           MAX_ALLOCATED_INSTANCES,
           list->numberOfItem );
   
 #if 0 #if 0
     /* wait until list is empty to avoid memory explosion */  
     for ( i = 0; itemsInList > LIST_SIZE_LIMIT && i < 500; i++ )  
     {  
         Sleep(1);  
         list = _GetList(rep);  
         itemsInList = list->numberOfItem;         itemsInList = list->numberOfItem;
         _SetList(rep, list);  
     }  
 #endif #endif
       _SetList(rep, list);
       list  = 0;
       SetEvent(rep->callbacksAreAvailable);
  
     return MI_RESULT_OK;     return MI_RESULT_OK;
 } }
Line 611 
Line 471 
     /* notifications channel */     /* notifications channel */
     int notificationSockets[2];     int notificationSockets[2];
  
     /* Number of notification items in a queue */  
     //AtomicInt queueLength;  
   
     /* Number of alive instance */  
     AtomicInt outstandingInstances;  
   
     /* flag to stop running */     /* flag to stop running */
     MI_Boolean keepRunning;     MI_Boolean keepRunning;
       MI_Boolean keepRunningNoReadsMode;
   
       /* flag to retry a dispatch loop */
       MI_Boolean keepDispatching;
  
     /* flag that allows empty selector running     /* flag that allows empty selector running
         when empty selector runs, it only can be interrupted by         when empty selector runs, it only can be interrupted by
Line 627 
Line 485 
     MI_Boolean allowEmptySelector;     MI_Boolean allowEmptySelector;
  
     /* io thread id */     /* io thread id */
     ThreadHandle    ioThreadHandle;      ThreadID    ioThreadHandle;
 } }
 SelectorRep; SelectorRep;
  
Line 635 
Line 493 
     fd_set* readSet,     fd_set* readSet,
     fd_set* writeSet,     fd_set* writeSet,
     fd_set* exceptSet,     fd_set* exceptSet,
     MI_Uint64 timeoutUsec)      MI_Uint64 timeoutUsec,
       MI_Boolean* keepRunning)
 { {
     int n = FD_SETSIZE;     int n = FD_SETSIZE;
       int r;
     struct timeval tv;     struct timeval tv;
     struct timeval* _tv = 0;     struct timeval* _tv = 0;
  
Line 648 
Line 508 
         _tv = &tv;         _tv = &tv;
     }     }
  
     return select(n, readSet, writeSet, exceptSet, _tv);      do
       {
           r = select(n, readSet, writeSet, exceptSet, _tv);
       }
       while( (*keepRunning == MI_TRUE) && ( -1 == r ) && ( errno == EINTR ) );
   
       return r;
 } }
  
 MI_INLINE void _FDSet(Sock sock, fd_set* set) MI_INLINE void _FDSet(Sock sock, fd_set* set)
Line 664 
Line 530 
 MI_Result Selector_Init( MI_Result Selector_Init(
     Selector* self)     Selector* self)
 { {
     self->rep = (SelectorRep*)calloc(1, sizeof(SelectorRep));      self->rep = (SelectorRep*)PAL_Calloc(1, sizeof(SelectorRep));
  
     if (!self->rep)     if (!self->rep)
         return MI_RESULT_FAILED;         return MI_RESULT_FAILED;
Line 679 
Line 545 
     if (MI_RESULT_OK != Sock_SetCloseOnExec(self->rep->notificationSockets[0],MI_TRUE) ||     if (MI_RESULT_OK != Sock_SetCloseOnExec(self->rep->notificationSockets[0],MI_TRUE) ||
         MI_RESULT_OK != Sock_SetCloseOnExec(self->rep->notificationSockets[1],MI_TRUE))         MI_RESULT_OK != Sock_SetCloseOnExec(self->rep->notificationSockets[1],MI_TRUE))
     {     {
         LOGW_CHAR(("fcntl(F_SETFD) failed"));          trace_fcntl_failed( errno );
     }     }
  
     return MI_RESULT_OK;     return MI_RESULT_OK;
Line 703 
Line 569 
     Sock_Close(rep->notificationSockets[0]);     Sock_Close(rep->notificationSockets[0]);
     Sock_Close(rep->notificationSockets[1]);     Sock_Close(rep->notificationSockets[1]);
  
     free(rep);      PAL_Free(rep);
 } }
  
 MI_Result Selector_AddHandler( MI_Result Selector_AddHandler(
Line 714 
Line 580 
     Handler* p;     Handler* p;
     MI_Uint64 currentTimeUsec = 0;     MI_Uint64 currentTimeUsec = 0;
  
     if (MI_RESULT_OK != Time_Now(&currentTimeUsec))      if (PAL_TRUE != PAL_Time(&currentTimeUsec))
         return MI_RESULT_FAILED;         return MI_RESULT_FAILED;
  
     /* Reject duplicates */     /* Reject duplicates */
Line 780 
Line 646 
     return MI_RESULT_OK;     return MI_RESULT_OK;
 } }
  
   
   
 static void _ProcessCallbacks( static void _ProcessCallbacks(
     SelectorRep* rep)     SelectorRep* rep)
 { {
     SelectorCallbacksItem* item = 0;      SelectorCallbacksItem* item = NULL;
     size_t read = 0;     size_t read = 0;
       MI_Result r;
  
     while( MI_RESULT_OK == Sock_Read( rep->notificationSockets[0], &item, sizeof(item), &read) &&      LOGD2((ZT("_ProcessCallbacks - Begin. notification socket: %d"), rep->notificationSockets[0]));
       while( (r = Sock_Read(rep->notificationSockets[0], &item, sizeof item, &read)) == MI_RESULT_OK &&
         read == sizeof(item) )         read == sizeof(item) )
     {     {
     //printf("pipe read: p = %p, sent = %d\n", item, (int)read);  
         if (item)         if (item)
         {         {
               LOGD2((ZT("_ProcessCallbacks - Calling item callback")));
             (*item->callback) (item->callback_self, item->message);             (*item->callback) (item->callback_self, item->message);
             Message_Release(item->message);             Message_Release(item->message);
             //AtomicDec( &rep->queueLength );              //Atomic_Dec( &rep->queueLength );
         }         }
     }     }
       LOGD2((ZT("_ProcessCallbacks - End. last result: %d (%s)"), r, mistrerror(r)));
 } }
  
 MI_Result Selector_Run(  void _Selector_WakeupFromWait(
       SelectorRep* rep)
   {
       void* p = 0;
       size_t sent = 0;
   
       Sock_Write( rep->notificationSockets[1], &p, sizeof(p), &sent);
   }
   
   /*
       * This function guaranties that callback is called in 'Run'/'IO' thread context,
       * so locking is required for accessing sokcet objects, updating buffers etc
   */
   MI_Result Selector_CallInIOThread(
     Selector* self,     Selector* self,
     MI_Uint64 timeoutUsec)      Selector_NotificationCallback  callback,
       void* callback_self,
       Message* message)
 { {
     SelectorRep* rep = (SelectorRep*)self->rep;     SelectorRep* rep = (SelectorRep*)self->rep;
     MI_Uint64 timeoutSelectorAt = TIME_NEVER;      SelectorCallbacksItem* newItem;
       MI_Result r;
       size_t sent = 0;
       ThreadID current = Thread_ID();
  
     if ( TIME_NEVER != timeoutUsec )      if (Thread_Equal(&rep->ioThreadHandle, &current))
     {     {
         if (MI_RESULT_OK != Time_Now(&timeoutSelectorAt))          /* direct call - we can write to socket instantly */
             return MI_RESULT_FAILED;  
  
         /* calculate when to terminate selector */          trace_Sock_SendingOnOwnThread(
         timeoutSelectorAt += timeoutUsec;              message,
     }              message->tag,
               MessageName(message->tag),
               message->operationId);
  
     rep->ioThreadHandle = ThreadSelf();          (*callback)(callback_self, message);
           return MI_RESULT_OK;
       }
  
     /* Loop while detecting and dispatching events */      /* add item to the list and set event */
     for (rep->keepRunning = MI_TRUE; rep->keepRunning;)      newItem = (SelectorCallbacksItem*) Batch_GetClear( message->batch, sizeof(SelectorCallbacksItem));
     {  
         Handler* p;  
         MI_Uint64 currentTimeUsec = 0;  
         MI_Uint64 breakCurrentSelectAt = (MI_Uint64)-1;  
         int n;  
         MI_Uint32 count = 0;  
         MI_Boolean stressed = Selector_IsStressed(self);  
  
         if (MI_RESULT_OK != Time_Now(&currentTimeUsec))      if (!newItem)
             return MI_RESULT_FAILED;             return MI_RESULT_FAILED;
  
         if (TIME_NEVER != timeoutSelectorAt)      newItem->callback = callback;
         {      newItem->callback_self = callback_self;
             if (currentTimeUsec >= timeoutSelectorAt)      newItem->message = message;
                 return MI_RESULT_TIME_OUT;  
  
             breakCurrentSelectAt = timeoutSelectorAt;      Message_AddRef(message);
         }      r = Sock_Write(rep->notificationSockets[1], &newItem, sizeof(newItem), &sent);
  
         /* Set up FD sets from handlers */      trace_Sock_SentResult(
         memset(&rep->readSet, 0, sizeof(rep->readSet));          message,
         memset(&rep->writeSet, 0, sizeof(rep->writeSet));          message->tag,
         memset(&rep->exceptSet, 0, sizeof(rep->exceptSet));          MessageName(message->tag),
           message->operationId,
           r);
  
         for (p = (Handler*)rep->head; p; p = (Handler*)p->next)      if ( MI_RESULT_OK != r )
         {          Message_Release(message);
             if (!stressed || (p->mask & SELECTOR_IGNORE_READ_OVERLOAD) == 0)  
       return r;
   }
   
   static MI_Result _SetSockEvents(SelectorRep* rep, Handler* p, MI_Uint32 mask, MI_Boolean noReadsMode )
             {             {
                 if (p->mask & SELECTOR_READ)      if( !noReadsMode && (mask & SELECTOR_READ) )
                 {                 {
                     _FDSet(p->sock, &rep->readSet);                     _FDSet(p->sock, &rep->readSet);
                     count++;  
                 }                 }
                 if (p->mask & SELECTOR_WRITE)      if (mask & SELECTOR_WRITE)
                 {                 {
                     _FDSet(p->sock, &rep->writeSet);                     _FDSet(p->sock, &rep->writeSet);
                     count++;  
                 }  
                 if (p->mask & SELECTOR_EXCEPTION)  
                 {  
                     _FDSet(p->sock, &rep->exceptSet);  
                     count++;  
                 }  
             }  
             /* find the minimum timeout form the list */  
             if (TIME_NEVER != p->fireTimeoutAt)  
             {  
                 /* if expired - don't wait; notification will be issued later in next loop */  
                 if (currentTimeUsec >= p->fireTimeoutAt)  
                     breakCurrentSelectAt = currentTimeUsec;  
                 else if ( p->fireTimeoutAt < breakCurrentSelectAt)  
                     breakCurrentSelectAt = p->fireTimeoutAt;  
             }             }
   
       return MI_RESULT_OK;
         }         }
  
         _FDSet(rep->notificationSockets[0], &rep->readSet);  static MI_Result _GetSockEvents(SelectorRep* rep, Handler* p, MI_Uint32* mask)
   
         /* empty list - return */  
         if (!rep->head && !rep->allowEmptySelector)  
             return MI_RESULT_FAILED;  
   
   
         /* Perform system select */  
         n = _Select(&rep->readSet, &rep->writeSet, &rep->exceptSet,  
             breakCurrentSelectAt == (MI_Uint64)-1 ? (MI_Uint64)-1: breakCurrentSelectAt - currentTimeUsec);  
   
         /* ignore signals, since it canbe part of normal operation */  
         if (-1 == n && errno != EINTR)  
             return MI_RESULT_FAILED;  
   
         if (FD_ISSET(rep->notificationSockets[0], &rep->readSet))  
             _ProcessCallbacks(rep);  
   
         /* Notify handlers of outstanding events */  
         for (p = (Handler*)rep->head; p; )  
         {  
             MI_Uint32 mask = 0;  
             Handler* next = p->next;  
   
             if (!stressed || (p->mask & SELECTOR_IGNORE_READ_OVERLOAD) == 0)  
             {             {
       *mask = 0;
                 if (p->mask & (SELECTOR_READ | SELECTOR_WRITE | SELECTOR_EXCEPTION))                 if (p->mask & (SELECTOR_READ | SELECTOR_WRITE | SELECTOR_EXCEPTION))
                 {                 {
                     /* Check for read event */                     /* Check for read event */
                     if (FD_ISSET(p->sock, &rep->readSet))                     if (FD_ISSET(p->sock, &rep->readSet))
                     {                     {
                         mask |= SELECTOR_READ;              *mask |= SELECTOR_READ;
                         _FDClr(p->sock, &rep->readSet);                         _FDClr(p->sock, &rep->readSet);
                     }                     }
  
                     /* Check for write event */                     /* Check for write event */
                     if (FD_ISSET(p->sock, &rep->writeSet))                     if (FD_ISSET(p->sock, &rep->writeSet))
                     {                     {
                         mask |= SELECTOR_WRITE;              *mask |= SELECTOR_WRITE;
                         _FDClr(p->sock, &rep->writeSet);                         _FDClr(p->sock, &rep->writeSet);
                     }                     }
  
                     /* Check for exception event */                     /* Check for exception event */
                     if (FD_ISSET(p->sock, &rep->exceptSet))                     if (FD_ISSET(p->sock, &rep->exceptSet))
                     {                     {
                         mask |= SELECTOR_EXCEPTION;  
                         _FDClr(p->sock, &rep->exceptSet);                         _FDClr(p->sock, &rep->exceptSet);
                     }                     }
                 }                 }
             }  
             if (TIME_NEVER != p->fireTimeoutAt && currentTimeUsec >= p->fireTimeoutAt)      return MI_RESULT_OK;
             {  
                 mask |= SELECTOR_TIMEOUT;  
             }             }
  
             /* If any events, notify watcher */  #endif /* defined(CONFIG_POSIX) */
             if (mask)  
             {  
                 MI_Boolean more;  
  
                 more = (*p->callback)(self, p, mask, currentTimeUsec);  /************************************/
   /* generic functionality */
  
                 if (!more)  MI_Result Selector_ContainsHandler(
       Selector* self,
       Handler* handler)
                 {                 {
                     /* Remove handler */      SelectorRep* rep = (SelectorRep*)self->rep;
                     List_Remove(&rep->head, &rep->tail, (ListElem*)p);      Handler* p;
  
                     /* Notify handler of removal */      for (p = (Handler*)rep->head; p; p = (Handler*)p->next)
                     (*p->callback)(self, p, SELECTOR_REMOVE, currentTimeUsec);      {
           if (p == handler)
           {
               return MI_RESULT_OK;
                 }                 }
   
             }             }
  
             /* Advance to next handler */      return MI_RESULT_NOT_FOUND;
             p = next;  
         }  
     }     }
  
     return MI_RESULT_OK;  void Selector_SetAllowEmptyFlag(
       Selector* self,
       MI_Boolean allowEmptySelector)
   {
       SelectorRep* rep = (SelectorRep*)self->rep;
   
       rep->allowEmptySelector = allowEmptySelector;
 } }
  
 /* Wakes up selector's thread /* Wakes up selector's thread
     Typical usage is to recalculate timeouts on handlers     Typical usage is to recalculate timeouts on handlers
     when selector's Run is running in different thread */     when selector's Run is running in different thread */
 MI_Result Selector_Wakeup( MI_Result Selector_Wakeup(
     Selector* self)      _In_    Selector*   self,
               MI_Boolean  retryDispatching )
 { {
     SelectorRep* rep = (SelectorRep*)self->rep;     SelectorRep* rep = (SelectorRep*)self->rep;
       ThreadID current = Thread_ID();
  
     /* notify running thread */      /* notify running thread (if different) */
       if( !Thread_Equal(&rep->ioThreadHandle, &current) )
     {     {
         void* p = 0;          _Selector_WakeupFromWait(rep);
         size_t sent = 0;      }
       else if( retryDispatching )
         Sock_Write( rep->notificationSockets[1], &p, sizeof(p), &sent);      {
           rep->keepDispatching = MI_TRUE;
     }     }
  
     return MI_RESULT_OK;     return MI_RESULT_OK;
 } }
  
   
 MI_Result Selector_StopRunning( MI_Result Selector_StopRunning(
     Selector* self)     Selector* self)
 { {
Line 984 
Line 838 
  
     rep->keepRunning = MI_FALSE;     rep->keepRunning = MI_FALSE;
  
     return Selector_Wakeup(self);      return Selector_Wakeup(self, MI_FALSE);
 } }
  
 /*  MI_Result Selector_StopRunningNoReadsMode(
     * This funciton guaranties that callback is called in 'Run'/'IO' thread context,      Selector* self)
     * so locking is required for accessing sokcet objects, updating buffers etc  
 */  
 MI_Result Selector_CallInIOThread(  
     Selector* self,  
     Selector_NotificationCallback  callback,  
     void* callback_self,  
     Message* message)  
 { {
     SelectorRep* rep = (SelectorRep*)self->rep;     SelectorRep* rep = (SelectorRep*)self->rep;
     SelectorCallbacksItem* newItem;  
     MI_Result r;  
     size_t sent = 0;  
  
     if (rep->ioThreadHandle == ThreadSelf())      rep->keepRunningNoReadsMode = MI_FALSE;
     {  
         /* direct call - we can write to socket instantly */  
  
         (*callback)(callback_self, message);      return Selector_Wakeup(self, MI_FALSE);
         return MI_RESULT_OK;  
     }     }
  
     /* add item to the list and set event */  MI_Result Selector_Run(
     newItem = (SelectorCallbacksItem*) Batch_GetClear( message->batch, sizeof(SelectorCallbacksItem));      Selector* self,
       MI_Uint64 timeoutUsec,
       MI_Boolean noReadsMode )
   {
       SelectorRep* rep = (SelectorRep*)self->rep;
       MI_Uint64 timeoutSelectorAt = TIME_NEVER;
       MI_Boolean* keepRunningVar;
   #if defined(CONFIG_OS_WINDOWS)
       HANDLE handles[2];
  
     if (!newItem)      trace_SelectorRun_Enter( self, timeoutUsec, noReadsMode );
         return MI_RESULT_FAILED;  
  
     newItem->callback = callback;      handles[0] = rep->event;
     newItem->callback_self = callback_self;      handles[1] = rep->callbacksAreAvailable;
     newItem->message = message;  #endif // defined(CONFIG_OS_WINDOWS)
  
     //AtomicInc( &rep->queueLength );      if( noReadsMode )
       {
           keepRunningVar = &rep->keepRunningNoReadsMode;
       }
       else
       {
           keepRunningVar = &rep->keepRunning;
       }
  
     while (rep->outstandingInstances > MAX_ALLOCATED_INSTANCES)      LOGD2((ZT("Selector_Run - Begin. timeoutUsec: %ld"), (long)timeoutUsec));
         Sleep_ms(1);  
  
 #if 0      if ( TIME_NEVER != timeoutUsec )
     {     {
         int counter = 0;          if (PAL_TRUE != PAL_Time(&timeoutSelectorAt))
           {
               trace_SelectorRun_InitPALTIME_Error( self );
               return MI_RESULT_FAILED;
           }
   
           /* calculate when to terminate selector */
           LOGD2((ZT("Selector_Run - After timeout calculation. time now: %s, timeoutSelectorAt: %s"), FmtTime(timeoutSelectorAt), FmtTime(timeoutSelectorAt + timeoutUsec)));
           timeoutSelectorAt += timeoutUsec;
       }
   
       rep->ioThreadHandle = Thread_ID();
  
         while (rep->queueLength > LIST_SIZE_LIMIT )      /* Loop while detecting and dispatching events */
       for (*keepRunningVar = MI_TRUE; *keepRunningVar; )
         {         {
             counter++;          Handler* p;
             /* give system a chance to clear backlog */          MI_Uint64 currentTimeUsec = 0;
             Sleep_ms(1);          MI_Uint64 breakCurrentSelectAt = (MI_Uint64)-1;
           MI_Boolean more;
           MI_Result r;
   #if defined(CONFIG_OS_WINDOWS)
           DWORD result;
           DWORD timeoutMsec;
   #else
           int n;
   #endif
  
             if (counter > 1000)          if (PAL_TRUE != PAL_Time(&currentTimeUsec))
             {             {
                 LOGW((MI_T("cannot send message: queue overflow)\n") ));              trace_SelectorRun_InitPALTIME_Error( self );
               LOGE2((ZT("Selector_Run - PAL_Time failed")));
                 return MI_RESULT_FAILED;                 return MI_RESULT_FAILED;
             }             }
   
           if (TIME_NEVER != timeoutSelectorAt)
           {
               if (currentTimeUsec >= timeoutSelectorAt)
               {
                   LOGW2((ZT("Selector_Run - Selector timeout. current time: %s, limit time: %s, interval: %ld ms"),
                           FmtTime(timeoutSelectorAt), FmtTime(currentTimeUsec),
                           (long)(currentTimeUsec - timeoutSelectorAt)));
                   return MI_RESULT_TIME_OUT;
         }         }
   
               breakCurrentSelectAt = timeoutSelectorAt;
     }     }
 #endif  
  
     Message_AddRef(message);  #if defined(CONFIG_POSIX)
     r = Sock_Write( rep->notificationSockets[1], &newItem, sizeof(newItem), &sent);          /* Set up FD sets from handlers */
           memset(&rep->readSet, 0, sizeof(rep->readSet));
           memset(&rep->writeSet, 0, sizeof(rep->writeSet));
           memset(&rep->exceptSet, 0, sizeof(rep->exceptSet));
   #endif /* defined(CONFIG_POSIX) */
  
           /* calculate timeout */
           for (p = (Handler*)rep->head; p; )
           {
               Handler* next = p->next;
  
     //printf("pipe write: p = %p, sent = %d\n", newItem, (int)sent);              /* update event mask */
     if ( MI_RESULT_OK != r )  #if defined(CONFIG_OS_WINDOWS)
         Message_Release(message);              if( p->sock != INVALID_SOCK )
   #endif
               {
                   r = _SetSockEvents(rep, p, p->mask, noReadsMode );
  
                   if (r != MI_RESULT_OK)
                   {
                       LOGE2((ZT("Selector_Run - _SetSockEvents failed")));
                       trace_SelectorRun_SetSocketEventsError( self, r, p );
     return r;     return r;
 } }
               }
  
 #endif /* defined(CONFIG_POSIX) */              /* find the minimum timeout form the list */
               if (TIME_NEVER != p->fireTimeoutAt)
               {
                   /* if expired - don't wait; notification will be issued later in next loop */
                   if (currentTimeUsec >= p->fireTimeoutAt)
                       breakCurrentSelectAt = currentTimeUsec;
                   else if ( p->fireTimeoutAt < breakCurrentSelectAt)
                       breakCurrentSelectAt = p->fireTimeoutAt;
               }
  
 /************************************/              p = next;
 /* generic functionality */          }
  
 MI_Result Selector_ContainsHandler(  #if defined(CONFIG_POSIX)
     Selector* self,          _FDSet(rep->notificationSockets[0], &rep->readSet);
     Handler* handler)  #endif /* defined(CONFIG_POSIX) */
   
           /* empty list - return */
           if (!rep->head && !rep->allowEmptySelector)
 { {
     SelectorRep* rep = (SelectorRep*)self->rep;              LOGE2((ZT("Selector_Run - Empty list")));
     Handler* p;              trace_SelectorRun_EmptyList( self );
               return MI_RESULT_FAILED;
           }
  
     for (p = (Handler*)rep->head; p; p = (Handler*)p->next)  #if defined(CONFIG_OS_WINDOWS)
           /* Wait for events on any of the sockets */
           timeoutMsec =
               (breakCurrentSelectAt == ((MI_Uint64)-1) ) ? /* do we have t least one valid timeout? */
               INFINITE : (DWORD)((breakCurrentSelectAt - currentTimeUsec) / 1000);
           result = WaitForMultipleObjectsEx(MI_COUNT(handles), handles, FALSE, timeoutMsec, FALSE);
   
           if (result == WAIT_FAILED)
     {     {
         if (p == handler)              trace_SelectorRun_WaitError( self, result );
               return MI_RESULT_FAILED;
           }
   #else
           /* Perform system select */
           n = _Select(&rep->readSet, &rep->writeSet, NULL,
               breakCurrentSelectAt == (MI_Uint64)-1 ? (MI_Uint64)-1: breakCurrentSelectAt - currentTimeUsec,
               keepRunningVar);
   
           /* ignore signals, since it canbe part of normal operation */
           if (-1 == n && errno != EINTR)
         {         {
             return MI_RESULT_OK;              LOGE2((ZT("Selector_Run - _Select failed. errno: %d (%s)"), errno, strerror(errno)));
               trace_SelectorRun_WaitError( self, errno );
               return MI_RESULT_FAILED;
         }         }
   #endif
   
           do
           {
               rep->keepDispatching = MI_FALSE;
   
   #if defined(CONFIG_OS_WINDOWS)
               //if ((WAIT_OBJECT_0 + 1) == result)  /* other thread wants to call callback */
   #else
               if (FD_ISSET(rep->notificationSockets[0], &rep->readSet))
   #endif
               {
                   _ProcessCallbacks(rep);
     }     }
  
     return MI_RESULT_NOT_FOUND;              /* Dispatch events on each socket */
               for (p = (Handler*)rep->head; p; )
               {
                   Handler* next = p->next;
                   MI_Uint32 mask = 0;
   
                   /* Refresh current time stamp */
                   if (PAL_TRUE != PAL_Time(&currentTimeUsec))
                   {
                       LOGE2((ZT("Selector_Run - PAL_Time failed")));
                       trace_SelectorRun_PALTimeError( self );
                       return MI_RESULT_FAILED;
 } }
  
 static void _dtor(Message* message, void* callbackData)                  /* Get event mask for this socket */
   #if defined(CONFIG_OS_WINDOWS)
                   if (p->sock != INVALID_SOCK)
   #endif
 { {
     SelectorRep* rep = (SelectorRep*)callbackData;                      r = _GetSockEvents(rep, p, &mask);
  
     MI_UNUSED(message);                      if (r != MI_RESULT_OK)
                       {
                           LOGE2((ZT("Selector_Run - _GetSockEvents failed with result: %d (%s)"), (int)r, mistrerror(r)));
                           trace_SelectorRun_GetSocketEventsError( self, r, p );
                           return r;
                       }
                   }
  
     AtomicDec( &rep->outstandingInstances );                  if (TIME_NEVER != p->fireTimeoutAt && currentTimeUsec >= p->fireTimeoutAt)
                   {
                       mask |= SELECTOR_TIMEOUT;
 } }
  
 void Selector_NewInstanceCreated(                  /* If there were any events on this socket, dispatch them */
     Selector* self,                  if (mask)
     Message* msg)  
 { {
     SelectorRep* rep = (SelectorRep*)self->rep;                      /*MI_Uint32 oldMask = p->mask;*/
                       more = (*p->callback)(self, p, mask, currentTimeUsec);
  
     msg->dtor = _dtor;                      /* If callback wants to continue getting events */
     msg->dtorData = rep;                      if (!more)
                       {
                           /* Remove handler */
                           List_Remove(&rep->head, &rep->tail, (ListElem*)p);
  
     AtomicInc( &rep->outstandingInstances );                          /* Refresh current time stamp */
                           if (PAL_TRUE != PAL_Time(&currentTimeUsec))
                           {
                               LOGE2((ZT("Selector_Run - PAL_Time failed")));
                               trace_SelectorRun_PALTimeError( self );
                               return MI_RESULT_FAILED;
 } }
  
 MI_Boolean  Selector_IsStressed(  #if defined(CONFIG_OS_WINDOWS)
     Selector* self)                          /* Unselect events on this socket */
 {                          _SetSockEvents(rep, p, 0, noReadsMode);
     SelectorRep* rep = (SelectorRep*)self->rep;  #endif
   
                           /* Notify handler of removal */
                           (*p->callback)(self, p, SELECTOR_REMOVE, currentTimeUsec);
                       }
                   }
  
     /* Are we close to be stressed? */                  p = next;
     if (rep->outstandingInstances >= (MAX_ALLOCATED_INSTANCES * 9 / 10))              }
         return MI_TRUE;          }
           while( rep->keepDispatching );
       }
  
     return MI_FALSE;      LOGE2((ZT("Selector_Run - OK exit")));
       trace_SelectorRun_Exit( self );
       return MI_RESULT_OK;
 } }
  
 void Selector_SetAllowEmptyFlag(  int Selector_IsSelectorThread(Selector* self, ThreadID *id)
     Selector* self,  {
     MI_Boolean allowEmptySelector)      if( NULL == self || NULL == self->rep )
       {
           // Some UTs may not have an actual selector
           return 0;
       }
       else
 { {
     SelectorRep* rep = (SelectorRep*)self->rep;     SelectorRep* rep = (SelectorRep*)self->rep;
           return Thread_Equal(&rep->ioThreadHandle, id);
     rep->allowEmptySelector = allowEmptySelector;  
 } }
   }
   
  


Legend:
Removed from v.1.2  
changed lines
  Added in v.1.3

ViewCVS 0.9.2