(file) Return to selector.c CVS log (file) (dir) Up to [OMI] / omi / sock

   1 mike  1.1 /*
   2           **==============================================================================
   3           **
   4           ** Open Management Infrastructure (OMI)
   5           **
   6           ** Copyright (c) Microsoft Corporation
   7           ** 
   8           ** Licensed under the Apache License, Version 2.0 (the "License"); you may not 
   9           ** use this file except in compliance with the License. You may obtain a copy 
  10           ** of the License at 
  11           **
  12           **     http://www.apache.org/licenses/LICENSE-2.0 
  13           **
  14           ** THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15           ** KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED 
  16           ** WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, 
  17           ** MERCHANTABLITY OR NON-INFRINGEMENT. 
  18           **
  19           ** See the Apache 2 License for the specific language governing permissions 
  20           ** and limitations under the License.
  21           **
  22 mike  1.1 **==============================================================================
  23           */
  24           
  25           #include "selector.h"
  26           #include <base/time.h>
  27           #include "thread.h"
  28           #include <base/log.h>
  29           #include <base/result.h>
  30           #include <base/atomic.h>
  31           
  32           /* maximum number of items in the list */
  33           #define LIST_SIZE_LIMIT     321
  34           
  35           /* maximum instances that can be allocated */
  36           #define MAX_ALLOCATED_INSTANCES 500
  37           
  38           
  39           /*
  40           **==============================================================================
  41           **
  42           **
  43 mike  1.1 ** Windows Implementation
  44           **
  45           **
  46           **==============================================================================
  47           */
  48           #if defined(CONFIG_OS_WINDOWS)
  49           # include <winsock2.h>
  50           
  51           /* Notification internal datastructure */
  52           typedef struct _SelectorCallbacksItem
  53           {
  54               /* Links for inserting messages onto linked-lists */
  55               struct _SelectorCallbacksItem* next;
  56               struct _SelectorCallbacksItem* prev;
  57           
  58               Selector_NotificationCallback  callback;
  59               void* callback_self;
  60               /* message has to be add-refed when added and dec-refed upon callback invocation */
  61               Message* message;
  62           }
  63           SelectorCallbacksItem;
  64 mike  1.1 
  65           typedef struct _SelectorCallbacksList
  66           {
  67               /* Linked list of callbacks to call */
  68               ListElem* head;
  69               ListElem* tail;
  70               int         numberOfItem;    /**/
  71           }
  72           SelectorCallbacksList;
  73           
  74           typedef struct _SelectorRep
  75           {
  76               /* Linked list of event watchers */
  77               ListElem* head;
  78               ListElem* tail;
  79           
  80               /* Object for detecting socket events */
  81               WSAEVENT event;
  82           
  83               /* other thread notification */
  84               HANDLE  callbacksAreAvailable;
  85 mike  1.1 
  86               /* list object - never referred directly - only using a pointer */
  87               SelectorCallbacksList   __callback_list_object;
  88               /* list of callbacks to notify ;
  89               storing pointer here, so we can use interlock funciotns for list updating
  90               */
  91               SelectorCallbacksList*  callbacksList;
  92           
  93               /* Number of alive instance */
  94               AtomicInt outstandingInstances;
  95           
  96               /* io thread id */
  97               ThreadHandle    ioThreadHandle;
  98           
  99               /* flag to stop running */
 100               MI_Boolean keepRunning;
 101           
 102               /* flag that allows empty selector running 
 103                   when empty selector runs, it only can be interrupted by
 104                   internal funcitons, since it has no sockets to monitor
 105                   */
 106 mike  1.1     MI_Boolean allowEmptySelector;
 107           }
 108           SelectorRep;
 109           
 110           static SelectorCallbacksList* _GetList(SelectorRep* rep)
 111           {
 112               SelectorCallbacksList * list;
 113           
 114               list = InterlockedExchangePointer( &rep->callbacksList, 0 );
 115           
 116               while ( !list )
 117               {
 118                   Sleep(0);
 119                   list = InterlockedExchangePointer( &rep->callbacksList, 0 );
 120               }
 121           
 122               return list;
 123           }
 124           
 125           static void _SetList(SelectorRep* rep, SelectorCallbacksList * list)
 126           {
 127 mike  1.1     InterlockedExchangePointer( &rep->callbacksList, list );
 128           }
 129           
 130           static MI_Result _SetSockEvents(WSAEVENT event, SOCKET sock, MI_Uint32 mask)
 131           {
 132               long e = 0;
 133           
 134               if (mask & SELECTOR_READ)
 135                   e |= FD_ACCEPT | FD_READ | FD_CLOSE;
 136           
 137               if (mask & SELECTOR_WRITE)
 138                   e |= FD_WRITE | FD_CONNECT;
 139           
 140               if (mask & SELECTOR_EXCEPTION)
 141                   e |= FD_OOB;
 142           
 143               if (WSAEventSelect(sock, event, e) == SOCKET_ERROR)
 144                   return MI_RESULT_FAILED;
 145           
 146               return MI_RESULT_OK;
 147           }
 148 mike  1.1 
 149           static MI_Result _GetSockEvents(WSAEVENT event, SOCKET sock, MI_Uint32* mask)
 150           {
 151               WSANETWORKEVENTS networkEvents;
 152               long x;
 153           
 154               ZeroMemory(&networkEvents, sizeof(networkEvents));
 155           
 156               if (WSAEnumNetworkEvents(sock, event, &networkEvents) != 0)
 157                   return MI_RESULT_FAILED;
 158           
 159               x = networkEvents.lNetworkEvents;
 160               *mask = 0;
 161           
 162               if (x & FD_ACCEPT || x & FD_READ || x & FD_CLOSE || x & FD_CONNECT)
 163                   *mask |= SELECTOR_READ;
 164           
 165               if (x & FD_WRITE)
 166                   *mask |= SELECTOR_WRITE;
 167           
 168               if (x & FD_OOB)
 169 mike  1.1         *mask |= SELECTOR_EXCEPTION;
 170           
 171               return MI_RESULT_OK;
 172           }
 173           
 174           MI_Result Selector_Init(
 175               Selector* self)
 176           {
 177               SelectorRep* rep = (SelectorRep*)calloc(1, sizeof(SelectorRep));
 178           
 179               if (!rep)
 180                   return MI_RESULT_FAILED;
 181           
 182               rep->event = WSACreateEvent();
 183           
 184               if (rep->event == WSA_INVALID_EVENT)
 185               {
 186                   free(rep);
 187                   return MI_RESULT_FAILED;
 188               }
 189           
 190 mike  1.1     rep->callbacksList = &rep->__callback_list_object;
 191           
 192               rep->callbacksAreAvailable = CreateEvent( 0, TRUE, FALSE, NULL );
 193           
 194               if (NULL == rep->callbacksAreAvailable)
 195               {
 196                   CloseHandle(rep->event);
 197                   free(rep);
 198                   return MI_RESULT_FAILED;
 199               }
 200           
 201           
 202               self->rep = rep;
 203               return MI_RESULT_OK;
 204           }
 205           
 206           void Selector_Destroy(Selector* self)
 207           {
 208               SelectorRep* rep = (SelectorRep*)self->rep;
 209               Handler* p;
 210               Handler* next;
 211 mike  1.1 
 212               /* Free all watchers */
 213               for (p = (Handler*)rep->head; p; p = next)
 214               {
 215                   next = (Handler*)p->next;
 216           
 217                   /* Unselect events on this socket */
 218                   _SetSockEvents(rep->event, p->sock, 0);
 219           
 220                   /* Invoke user callback */
 221                   (*p->callback)(self, p, SELECTOR_DESTROY, 0);
 222           
 223                   p = next;
 224               }
 225           
 226               CloseHandle(rep->event);
 227               CloseHandle(rep->callbacksAreAvailable);
 228           
 229               /* free messages in callbacks list */
 230               while (rep->callbacksList->head)
 231               {
 232 mike  1.1         SelectorCallbacksItem* item = (SelectorCallbacksItem*)rep->callbacksList->head;
 233           
 234                   List_Remove(&rep->callbacksList->head, &rep->callbacksList->tail, (ListElem*)item);
 235                   Message_Release(item->message);
 236               }
 237           
 238               free(rep);
 239           }
 240           
 241           MI_Result Selector_AddHandler(
 242               Selector* self,
 243               Handler* handler)
 244           {
 245               SelectorRep* rep = (SelectorRep*)self->rep;
 246               Handler* p;
 247               MI_Uint64 currentTimeUsec = 0;
 248           
 249               if (MI_RESULT_OK != Time_Now(&currentTimeUsec))
 250                   return MI_RESULT_FAILED;
 251           
 252               /* Reject duplicates */
 253 mike  1.1     for (p = (Handler*)rep->head; p; p = (Handler*)p->next)
 254               {
 255                   if (p == handler)
 256                       return MI_RESULT_ALREADY_EXISTS;
 257               }
 258           
 259               /* Add new handler to list */
 260               List_Append(&rep->head, &rep->tail, (ListElem*)handler);
 261           
 262               (*handler->callback)(self, handler, SELECTOR_ADD, currentTimeUsec);
 263           
 264               return MI_RESULT_OK;
 265           }
 266           
 267           MI_Result Selector_RemoveHandler(
 268               Selector* self,
 269               Handler* handler)
 270           {
 271               SelectorRep* rep = (SelectorRep*)self->rep;
 272               Handler* p;
 273           
 274 mike  1.1     /* Find and remove handler from list */
 275               for (p = (Handler*)rep->head; p; p = (Handler*)p->next)
 276               {
 277                   if (p == handler)
 278                   {
 279                       /* Remove handler */
 280                       List_Remove(&rep->head, &rep->tail, (ListElem*)p);
 281           
 282                       /* Unselect events on this socket */
 283                       _SetSockEvents(rep->event, p->sock, 0);
 284                       
 285                       /* Notify handler of removal */
 286                       (*handler->callback)(self, p, SELECTOR_REMOVE, 0);
 287           
 288                       return MI_RESULT_OK;
 289                   }
 290               }
 291           
 292               return MI_RESULT_NOT_FOUND;
 293           }
 294           
 295 mike  1.1 MI_Result Selector_RemoveAllHandlers(
 296               Selector* self)
 297           {
 298               SelectorRep* rep = (SelectorRep*)self->rep;
 299               Handler* p;
 300           
 301               /* Find and remove handler from list */
 302               for (p = (Handler*)rep->head; p; )
 303               {
 304                   Handler* next = p->next;
 305           
 306                   /* Remove handler */
 307                   List_Remove(&rep->head, &rep->tail, (ListElem*)p);
 308           
 309                   /* Unselect events on this socket */
 310                   _SetSockEvents(rep->event, p->sock, 0);
 311                   
 312                   /* Notify handler of removal */
 313                   (*p->callback)(self, p, SELECTOR_REMOVE, 0);
 314           
 315                   p = next;
 316 mike  1.1     }
 317           
 318               return MI_RESULT_OK;
 319           }
 320           
 321           static void _ProcessCallbacks(
 322               SelectorRep* rep)
 323           {
 324               SelectorCallbacksItem* item = 0;
 325               SelectorCallbacksList * list;
 326           
 327               /* remove all items from the list */
 328               list = _GetList(rep);
 329               item = (SelectorCallbacksItem*)list->head;
 330               list->head = list->tail = 0;
 331               list->numberOfItem = 0;
 332               ResetEvent(rep->callbacksAreAvailable);
 333               _SetList(rep, list); 
 334           
 335               /* process all items */
 336               while (item)
 337 mike  1.1     {
 338                   SelectorCallbacksItem* next = item->next;
 339           
 340                   (*item->callback) (item->callback_self, item->message);
 341                   Message_Release(item->message);
 342                   item = next;
 343               }
 344           }
 345           
 346           MI_Result Selector_Run(
 347               Selector* self,
 348               MI_Uint64 timeoutUsec)
 349           {
 350               SelectorRep* rep = (SelectorRep*)self->rep;
 351               MI_Uint64 timeoutSelectorAt = TIME_NEVER;
 352               HANDLE handles[2];
 353               
 354               handles[0] = rep->event;
 355               handles[1] = rep->callbacksAreAvailable;
 356           
 357               if ( TIME_NEVER != timeoutUsec )
 358 mike  1.1     {
 359                   if (MI_RESULT_OK != Time_Now(&timeoutSelectorAt))
 360                       return MI_RESULT_FAILED;
 361           
 362                   /* calculate when to terminate selector */
 363                   timeoutSelectorAt += timeoutUsec;
 364               }
 365           
 366               rep->ioThreadHandle = ThreadSelf();
 367           
 368               for (rep->keepRunning = MI_TRUE; rep->keepRunning;)
 369               {
 370                   Handler* p;
 371                   DWORD result;
 372                   DWORD timeoutMsec;
 373                   MI_Result r;
 374                   MI_Boolean more;
 375                   MI_Uint64 currentTimeUsec = 0;
 376                   MI_Uint64 breakCurrentSelectAt = (MI_Uint64)-1;
 377           
 378                   if (MI_RESULT_OK != Time_Now(&currentTimeUsec))
 379 mike  1.1             return MI_RESULT_FAILED;
 380           
 381                   if (TIME_NEVER != timeoutSelectorAt)
 382                   {
 383                       if (currentTimeUsec >= timeoutSelectorAt)
 384                           return MI_RESULT_TIME_OUT;
 385           
 386                       breakCurrentSelectAt = timeoutSelectorAt;
 387                   }
 388           
 389                   /* calculate timeout */
 390                   for (p = (Handler*)rep->head; p; )
 391                   {
 392                       Handler* next = p->next;
 393           
 394                       /* update event mask */
 395                       if (p->sock != INVALID_SOCK)
 396                       {
 397                           r = _SetSockEvents(rep->event, p->sock, p->mask);
 398                           
 399                           if (r != MI_RESULT_OK)
 400 mike  1.1                     return r;
 401                       }
 402           
 403                       /* find the minimum timeout form the list */
 404                       if (TIME_NEVER != p->fireTimeoutAt)
 405                       {
 406                           /* if expired - don't wait; notification will be issued later in next loop */
 407                           if (currentTimeUsec >= p->fireTimeoutAt)
 408                               breakCurrentSelectAt = currentTimeUsec;
 409                           else if ( p->fireTimeoutAt < breakCurrentSelectAt)
 410                               breakCurrentSelectAt = p->fireTimeoutAt;
 411                       }
 412           
 413                       p = next;
 414                   }
 415           
 416                   /* empty list - return */
 417                   if (!rep->head && !rep->allowEmptySelector)
 418                       return MI_RESULT_FAILED;
 419           
 420                   /* Wait for events on any of the sockets */
 421 mike  1.1         timeoutMsec = 
 422                       (breakCurrentSelectAt == ((MI_Uint64)-1) ) ? /* do we have t least one valid timeout? */
 423                       INFINITE : (DWORD)((breakCurrentSelectAt - currentTimeUsec) / 1000);
 424                   result = WaitForMultipleObjects(MI_COUNT(handles), handles, FALSE, timeoutMsec);
 425           
 426                   if (result == WAIT_FAILED)
 427                       return MI_RESULT_FAILED;
 428           
 429                   //if ((WAIT_OBJECT_0 + 1) == result)  /* other thread wants to call callback */
 430                       _ProcessCallbacks(rep);
 431           
 432                   if (MI_RESULT_OK != Time_Now(&currentTimeUsec))
 433                       return MI_RESULT_FAILED;
 434           
 435                   /* Dispatch events on each socket */
 436                   for (p = (Handler*)rep->head; p; )
 437                   {
 438                       Handler* next = p->next;
 439                       MI_Uint32 mask = 0;
 440           
 441                       /* Get event mask for this socket */
 442 mike  1.1             if (p->sock != INVALID_SOCK)
 443                       {
 444                           r = _GetSockEvents(rep->event, p->sock, &mask);
 445           
 446                           if (r != MI_RESULT_OK)
 447                               return r;
 448                       }
 449           
 450                       if (TIME_NEVER != p->fireTimeoutAt && currentTimeUsec >= p->fireTimeoutAt)
 451                       {
 452                           mask |= SELECTOR_TIMEOUT;
 453                       }
 454           
 455                       /* If there were any events on this socket, dispatch them */
 456                       if (mask)
 457                       {
 458                           /*MI_Uint32 oldMask = p->mask;*/
 459                           more = (*p->callback)(self, p, mask, currentTimeUsec);
 460           
 461                           /* If callback wants to continue getting events */
 462                           if (!more)
 463 mike  1.1                 {
 464                               /* Remove handler */
 465                               List_Remove(&rep->head, &rep->tail, (ListElem*)p);
 466           
 467                               /* Unselect events on this socket */
 468                               _SetSockEvents(rep->event, p->sock, 0);
 469           
 470                               /* Notify handler of removal */
 471                               (*p->callback)(self, p, SELECTOR_REMOVE, currentTimeUsec);
 472                           }
 473                       }
 474           
 475                       p = next;
 476                   }
 477               }
 478               return MI_RESULT_OK;
 479           }
 480           
 481           /* Wakes up selector's thread 
 482               Typical usage is to recalculate timeouts on handlers
 483               when selector's Run is running in different thread */
 484 mike  1.1 MI_Result Selector_Wakeup(
 485               Selector* self)
 486           {
 487               SelectorRep* rep = (SelectorRep*)self->rep;
 488           
 489               /* notify running thread (if different) */
 490               if (rep->ioThreadHandle != ThreadSelf())
 491               {
 492                   SetEvent(rep->callbacksAreAvailable);
 493               }
 494           
 495               return MI_RESULT_OK;
 496           }
 497           
 498           MI_Result Selector_StopRunning(
 499               Selector* self)
 500           {
 501               SelectorRep* rep = (SelectorRep*)self->rep;
 502           
 503               rep->keepRunning = MI_FALSE;
 504           
 505 mike  1.1     return Selector_Wakeup(self);
 506           }
 507           
 508           /* 
 509               * This function guaranties that callback is called in 'Run'/'IO' thread context,
 510               * so no locking is required for accessing sokcet objects, updating buffers etc
 511           */
 512           MI_Result Selector_CallInIOThread(
 513               Selector* self,
 514               Selector_NotificationCallback  callback,
 515               void* callback_self,
 516               Message* message)
 517           {
 518               SelectorRep* rep = (SelectorRep*)self->rep;
 519               SelectorCallbacksItem* newItem;
 520               SelectorCallbacksList * list;
 521               int itemsInList;
 522           
 523               if (rep->ioThreadHandle == ThreadSelf())
 524               {
 525                   /* direct call - we can write to socket instantly */
 526 mike  1.1 
 527                   (*callback)(callback_self, message);
 528                   return MI_RESULT_OK;
 529               }
 530           
 531               /* add item to the list and set event */
 532               newItem = (SelectorCallbacksItem*) Batch_GetClear( message->batch, sizeof(SelectorCallbacksItem));
 533           
 534               if (!newItem)
 535                   return MI_RESULT_FAILED;
 536           
 537               newItem->callback = callback;
 538               newItem->callback_self = callback_self;
 539               newItem->message = message;
 540           
 541               Message_AddRef(message);
 542               list = _GetList(rep);
 543               List_Append(&list->head, &list->tail, (ListElem*)newItem);
 544               list->numberOfItem++;
 545               itemsInList = list->numberOfItem;
 546               _SetList(rep, list); 
 547 mike  1.1     list  = 0;
 548               SetEvent(rep->callbacksAreAvailable);
 549           
 550               while (rep->outstandingInstances > MAX_ALLOCATED_INSTANCES)
 551                   Sleep_ms(1);
 552           #if 0
 553               /* wait until list is empty to avoid memory explosion */
 554               for ( i = 0; itemsInList > LIST_SIZE_LIMIT && i < 500; i++ )
 555               {
 556                   Sleep(1);
 557                   list = _GetList(rep);
 558                   itemsInList = list->numberOfItem;
 559                   _SetList(rep, list); 
 560               }
 561           #endif
 562           
 563               return MI_RESULT_OK;
 564           }
 565           
 566           #endif /* defined(CONFIG_OS_WINDOWS) */
 567           
 568 mike  1.1 /*
 569           **==============================================================================
 570           **
 571           **
 572           ** POSIX Implementation
 573           **
 574           **
 575           **==============================================================================
 576           */
 577           
 578           #if defined(CONFIG_POSIX)
 579           # include <string.h>
 580           # include <unistd.h>
 581           # include <errno.h>
 582           # include <sys/socket.h>
 583           # include <netinet/tcp.h>
 584           # include <netinet/in.h>
 585           # include <sys/time.h>
 586           # include <sys/types.h>
 587           # include <netdb.h>
 588           # include <fcntl.h>
 589 mike  1.1 # include <arpa/inet.h>
 590           
 591           typedef struct _SelectorCallbacksItem
 592           {
 593               Selector_NotificationCallback  callback;
 594               void* callback_self;
 595               /* message has to be add-refed when added and dec-refed upon callback invocation */
 596               Message* message;
 597           }
 598           SelectorCallbacksItem;
 599           
 600           typedef struct _SelectorRep
 601           {
 602               /* File descriptor sets */
 603               fd_set readSet;
 604               fd_set writeSet;
 605               fd_set exceptSet;
 606           
 607               /* Linked list of event watchers */
 608               ListElem* head;
 609               ListElem* tail;
 610 mike  1.1 
 611               /* notifications channel */
 612               int notificationSockets[2];
 613           
 614               /* Number of notification items in a queue */
 615               //AtomicInt queueLength;
 616           
 617               /* Number of alive instance */
 618               AtomicInt outstandingInstances;
 619           
 620               /* flag to stop running */
 621               MI_Boolean keepRunning;
 622           
 623               /* flag that allows empty selector running 
 624                   when empty selector runs, it only can be interrupted by
 625                   internal funcitons, since it has no sockets to monitor
 626                   */
 627               MI_Boolean allowEmptySelector;
 628           
 629               /* io thread id */
 630               ThreadHandle    ioThreadHandle;
 631 mike  1.1 }
 632           SelectorRep;
 633           
 634           static int _Select(
 635               fd_set* readSet,
 636               fd_set* writeSet,
 637               fd_set* exceptSet,
 638               MI_Uint64 timeoutUsec)
 639           {
 640               int n = FD_SETSIZE;
 641               struct timeval tv;
 642               struct timeval* _tv = 0;
 643           
 644               if ((MI_Uint64)-1 != timeoutUsec)
 645               {
 646                   tv.tv_sec = (long)(timeoutUsec / 1000000);
 647                   tv.tv_usec = (long)(timeoutUsec % 1000000);
 648                   _tv = &tv;
 649               }
 650           
 651               return select(n, readSet, writeSet, exceptSet, _tv);
 652 mike  1.1 }
 653           
 654           MI_INLINE void _FDSet(Sock sock, fd_set* set)
 655           {
 656               FD_SET(sock, set);
 657           }
 658           
 659           MI_INLINE void _FDClr(Sock sock, fd_set* set)
 660           {
 661               FD_CLR(sock, set);
 662           }
 663           
 664           MI_Result Selector_Init(
 665               Selector* self)
 666           {
 667               self->rep = (SelectorRep*)calloc(1, sizeof(SelectorRep));
 668           
 669               if (!self->rep)
 670                   return MI_RESULT_FAILED;
 671           
 672               if (pipe(self->rep->notificationSockets) != 0)
 673 mike  1.1         return MI_RESULT_FAILED;
 674           
 675               /* set non-blocking for reader [0] */
 676               Sock_SetBlocking( self->rep->notificationSockets[0], MI_FALSE);
 677           
 678               /* Protect notification sockets from child processes */
 679               if (MI_RESULT_OK != Sock_SetCloseOnExec(self->rep->notificationSockets[0],MI_TRUE) ||
 680                   MI_RESULT_OK != Sock_SetCloseOnExec(self->rep->notificationSockets[1],MI_TRUE))
 681               {
 682                   LOGW_CHAR(("fcntl(F_SETFD) failed"));
 683               }
 684           
 685               return MI_RESULT_OK;
 686           }
 687           
 688           void Selector_Destroy(Selector* self)
 689           {
 690               SelectorRep* rep = (SelectorRep*)self->rep;
 691               Handler* p;
 692               Handler* next;
 693           
 694 mike  1.1     /* Free all watchers */
 695               for (p = (Handler*)rep->head; p; p = next)
 696               {
 697                   next = (Handler*)p->next;
 698           
 699                   (*p->callback)(self, p, SELECTOR_DESTROY, 0);
 700                   p = next;
 701               }
 702           
 703               Sock_Close(rep->notificationSockets[0]);
 704               Sock_Close(rep->notificationSockets[1]);
 705           
 706               free(rep);
 707           }
 708           
 709           MI_Result Selector_AddHandler(
 710               Selector* self,
 711               Handler* handler)
 712           {
 713               SelectorRep* rep = (SelectorRep*)self->rep;
 714               Handler* p;
 715 mike  1.1     MI_Uint64 currentTimeUsec = 0;
 716           
 717               if (MI_RESULT_OK != Time_Now(&currentTimeUsec))
 718                   return MI_RESULT_FAILED;
 719           
 720               /* Reject duplicates */
 721               for (p = (Handler*)rep->head; p; p = (Handler*)p->next)
 722               {
 723                   if (p == handler)
 724                       return MI_RESULT_ALREADY_EXISTS;
 725               }
 726           
 727               /* Add new handler to list */
 728               List_Append(&rep->head, &rep->tail, (ListElem*)handler);
 729           
 730               (*handler->callback)(self, handler, SELECTOR_ADD, currentTimeUsec);
 731           
 732               return MI_RESULT_OK;
 733           }
 734           
 735           MI_Result Selector_RemoveHandler(
 736 mike  1.1     Selector* self,
 737               Handler* handler)
 738           {
 739               SelectorRep* rep = (SelectorRep*)self->rep;
 740               Handler* p;
 741           
 742               /* Find and remove handler from list */
 743               for (p = (Handler*)rep->head; p; p = (Handler*)p->next)
 744               {
 745                   if (p == handler)
 746                   {
 747                       /* Remove handler */
 748                       List_Remove(&rep->head, &rep->tail, (ListElem*)p);
 749           
 750                       /* Notify handler of removal */
 751                       (*handler->callback)(self, p, SELECTOR_REMOVE, 0);
 752           
 753                       return MI_RESULT_OK;
 754                   }
 755               }
 756           
 757 mike  1.1     return MI_RESULT_NOT_FOUND;
 758           }
 759           
 760           MI_Result Selector_RemoveAllHandlers(
 761               Selector* self)
 762           {
 763               SelectorRep* rep = (SelectorRep*)self->rep;
 764               Handler* p;
 765           
 766               /* Find and remove handler from list */
 767               for (p = (Handler*)rep->head; p; )
 768               {
 769                   Handler* next = p->next;
 770           
 771                   /* Remove handler */
 772                   List_Remove(&rep->head, &rep->tail, (ListElem*)p);
 773           
 774                   /* Notify handler of removal */
 775                   (*p->callback)(self, p, SELECTOR_REMOVE, 0);
 776           
 777                   p = next;
 778 mike  1.1     }
 779           
 780               return MI_RESULT_OK;
 781           }
 782           
 783           
 784           
 785           static void _ProcessCallbacks(
 786               SelectorRep* rep)
 787           {
 788               SelectorCallbacksItem* item = 0;
 789               size_t read = 0;
 790           
 791               while( MI_RESULT_OK == Sock_Read( rep->notificationSockets[0], &item, sizeof(item), &read) &&
 792                   read == sizeof(item) )
 793               {
 794               //printf("pipe read: p = %p, sent = %d\n", item, (int)read);
 795                   if (item)
 796                   {
 797                       (*item->callback) (item->callback_self, item->message);
 798                       Message_Release(item->message);
 799 mike  1.1             //AtomicDec( &rep->queueLength );
 800                   }
 801               }
 802           }
 803           
 804           MI_Result Selector_Run(
 805               Selector* self,
 806               MI_Uint64 timeoutUsec)
 807           {
 808               SelectorRep* rep = (SelectorRep*)self->rep;
 809               MI_Uint64 timeoutSelectorAt = TIME_NEVER;
 810           
 811               if ( TIME_NEVER != timeoutUsec )
 812               {
 813                   if (MI_RESULT_OK != Time_Now(&timeoutSelectorAt))
 814                       return MI_RESULT_FAILED;
 815           
 816                   /* calculate when to terminate selector */
 817                   timeoutSelectorAt += timeoutUsec;
 818               }
 819           
 820 mike  1.1     rep->ioThreadHandle = ThreadSelf();
 821           
 822               /* Loop while detecting and dispatching events */
 823               for (rep->keepRunning = MI_TRUE; rep->keepRunning;)
 824               {
 825                   Handler* p;
 826                   MI_Uint64 currentTimeUsec = 0;
 827                   MI_Uint64 breakCurrentSelectAt = (MI_Uint64)-1;
 828                   int n;
 829                   MI_Uint32 count = 0;
 830                   MI_Boolean stressed = Selector_IsStressed(self);
 831           
 832                   if (MI_RESULT_OK != Time_Now(&currentTimeUsec))
 833                       return MI_RESULT_FAILED;
 834           
 835                   if (TIME_NEVER != timeoutSelectorAt)
 836                   {
 837                       if (currentTimeUsec >= timeoutSelectorAt)
 838                           return MI_RESULT_TIME_OUT;
 839           
 840                       breakCurrentSelectAt = timeoutSelectorAt;
 841 mike  1.1         }
 842           
 843                   /* Set up FD sets from handlers */
 844                   memset(&rep->readSet, 0, sizeof(rep->readSet));
 845                   memset(&rep->writeSet, 0, sizeof(rep->writeSet));
 846                   memset(&rep->exceptSet, 0, sizeof(rep->exceptSet));
 847           
 848                   for (p = (Handler*)rep->head; p; p = (Handler*)p->next)
 849                   {
 850                       if (!stressed || (p->mask & SELECTOR_IGNORE_READ_OVERLOAD) == 0)
 851                       {
 852                           if (p->mask & SELECTOR_READ)
 853                           {
 854                               _FDSet(p->sock, &rep->readSet);
 855                               count++;
 856                           }
 857                           if (p->mask & SELECTOR_WRITE)
 858                           {
 859                               _FDSet(p->sock, &rep->writeSet);
 860                               count++;
 861                           }
 862 mike  1.1                 if (p->mask & SELECTOR_EXCEPTION)
 863                           {
 864                               _FDSet(p->sock, &rep->exceptSet);
 865                               count++;
 866                           }
 867                       }
 868                       /* find the minimum timeout form the list */
 869                       if (TIME_NEVER != p->fireTimeoutAt)
 870                       {
 871                           /* if expired - don't wait; notification will be issued later in next loop */
 872                           if (currentTimeUsec >= p->fireTimeoutAt)
 873                               breakCurrentSelectAt = currentTimeUsec;
 874                           else if ( p->fireTimeoutAt < breakCurrentSelectAt)
 875                               breakCurrentSelectAt = p->fireTimeoutAt;
 876                       }
 877                   }
 878           
 879                   _FDSet(rep->notificationSockets[0], &rep->readSet);
 880           
 881                   /* empty list - return */
 882                   if (!rep->head && !rep->allowEmptySelector)
 883 mike  1.1             return MI_RESULT_FAILED;
 884           
 885           
 886                   /* Perform system select */
 887                   n = _Select(&rep->readSet, &rep->writeSet, &rep->exceptSet, 
 888                       breakCurrentSelectAt == (MI_Uint64)-1 ? (MI_Uint64)-1: breakCurrentSelectAt - currentTimeUsec);
 889           
 890                   /* ignore signals, since it canbe part of normal operation */
 891                   if (-1 == n && errno != EINTR)
 892                       return MI_RESULT_FAILED;
 893           
 894                   if (FD_ISSET(rep->notificationSockets[0], &rep->readSet))
 895                       _ProcessCallbacks(rep);
 896           
 897                   /* Notify handlers of outstanding events */
 898                   for (p = (Handler*)rep->head; p; )
 899                   {
 900                       MI_Uint32 mask = 0;
 901                       Handler* next = p->next;
 902           
 903                       if (!stressed || (p->mask & SELECTOR_IGNORE_READ_OVERLOAD) == 0)
 904 mike  1.1             {
 905                           if (p->mask & (SELECTOR_READ | SELECTOR_WRITE | SELECTOR_EXCEPTION))
 906                           {
 907                               /* Check for read event */
 908                               if (FD_ISSET(p->sock, &rep->readSet))
 909                               {
 910                                   mask |= SELECTOR_READ;
 911                                   _FDClr(p->sock, &rep->readSet);
 912                               }
 913           
 914                               /* Check for write event */
 915                               if (FD_ISSET(p->sock, &rep->writeSet))
 916                               {
 917                                   mask |= SELECTOR_WRITE;
 918                                   _FDClr(p->sock, &rep->writeSet);
 919                               }
 920           
 921                               /* Check for exception event */
 922                               if (FD_ISSET(p->sock, &rep->exceptSet))
 923                               {
 924                                   mask |= SELECTOR_EXCEPTION;
 925 mike  1.1                         _FDClr(p->sock, &rep->exceptSet);
 926                               }
 927                           }
 928                       }
 929                       if (TIME_NEVER != p->fireTimeoutAt && currentTimeUsec >= p->fireTimeoutAt)
 930                       {
 931                           mask |= SELECTOR_TIMEOUT;
 932                       }
 933           
 934                       /* If any events, notify watcher */
 935                       if (mask)
 936                       {
 937                           MI_Boolean more;
 938           
 939                           more = (*p->callback)(self, p, mask, currentTimeUsec);
 940           
 941                           if (!more)
 942                           {
 943                               /* Remove handler */
 944                               List_Remove(&rep->head, &rep->tail, (ListElem*)p);
 945           
 946 mike  1.1                     /* Notify handler of removal */
 947                               (*p->callback)(self, p, SELECTOR_REMOVE, currentTimeUsec);
 948                           }
 949           
 950                       }
 951           
 952                       /* Advance to next handler */
 953                       p = next;
 954                   }
 955               }
 956           
 957               return MI_RESULT_OK;
 958           }
 959           
 960           /* Wakes up selector's thread 
 961               Typical usage is to recalculate timeouts on handlers
 962               when selector's Run is running in different thread */
 963           MI_Result Selector_Wakeup(
 964               Selector* self)
 965           {
 966               SelectorRep* rep = (SelectorRep*)self->rep;
 967 mike  1.1 
 968               /* notify running thread */
 969               {
 970                   void* p = 0;
 971                   size_t sent = 0;
 972           
 973                   Sock_Write( rep->notificationSockets[1], &p, sizeof(p), &sent);
 974               }
 975           
 976               return MI_RESULT_OK;
 977           }
 978           
 979           
 980           MI_Result Selector_StopRunning(
 981               Selector* self)
 982           {
 983               SelectorRep* rep = (SelectorRep*)self->rep;
 984           
 985               rep->keepRunning = MI_FALSE;
 986           
 987               return Selector_Wakeup(self);
 988 mike  1.1 }
 989           
 990           /* 
 991               * This funciton guaranties that callback is called in 'Run'/'IO' thread context,
 992               * so locking is required for accessing sokcet objects, updating buffers etc
 993           */
 994           MI_Result Selector_CallInIOThread(
 995               Selector* self,
 996               Selector_NotificationCallback  callback,
 997               void* callback_self,
 998               Message* message)
 999           {
1000               SelectorRep* rep = (SelectorRep*)self->rep;
1001               SelectorCallbacksItem* newItem;
1002               MI_Result r;
1003               size_t sent = 0;
1004           
1005               if (rep->ioThreadHandle == ThreadSelf())
1006               {
1007                   /* direct call - we can write to socket instantly */
1008           
1009 mike  1.1         (*callback)(callback_self, message);
1010                   return MI_RESULT_OK;
1011               }
1012           
1013               /* add item to the list and set event */
1014               newItem = (SelectorCallbacksItem*) Batch_GetClear( message->batch, sizeof(SelectorCallbacksItem));
1015           
1016               if (!newItem)
1017                   return MI_RESULT_FAILED;
1018           
1019               newItem->callback = callback;
1020               newItem->callback_self = callback_self;
1021               newItem->message = message;
1022           
1023               //AtomicInc( &rep->queueLength );
1024           
1025               while (rep->outstandingInstances > MAX_ALLOCATED_INSTANCES)
1026                   Sleep_ms(1);
1027           
1028           #if 0
1029               {
1030 mike  1.1         int counter = 0;
1031           
1032                   while (rep->queueLength > LIST_SIZE_LIMIT )
1033                   {
1034                       counter++;
1035                       /* give system a chance to clear backlog */
1036                       Sleep_ms(1);
1037           
1038                       if (counter > 1000)
1039                       {
1040                           LOGW((MI_T("cannot send message: queue overflow)\n") ));
1041                           return MI_RESULT_FAILED;
1042                       }
1043                   }
1044               }
1045           #endif
1046           
1047               Message_AddRef(message);
1048               r = Sock_Write( rep->notificationSockets[1], &newItem, sizeof(newItem), &sent);
1049           
1050           
1051 mike  1.1     //printf("pipe write: p = %p, sent = %d\n", newItem, (int)sent);
1052               if ( MI_RESULT_OK != r )
1053                   Message_Release(message);
1054           
1055               return r;
1056           }
1057           
1058           #endif /* defined(CONFIG_POSIX) */
1059           
1060           /************************************/
1061           /* generic functionality */
1062           
1063           MI_Result Selector_ContainsHandler(
1064               Selector* self,
1065               Handler* handler)
1066           {
1067               SelectorRep* rep = (SelectorRep*)self->rep;
1068               Handler* p;
1069           
1070               for (p = (Handler*)rep->head; p; p = (Handler*)p->next)
1071               {
1072 mike  1.1         if (p == handler)
1073                   {
1074                       return MI_RESULT_OK;
1075                   }
1076               }
1077           
1078               return MI_RESULT_NOT_FOUND;
1079           }
1080           
1081           static void _dtor(Message* message, void* callbackData)
1082           {
1083               SelectorRep* rep = (SelectorRep*)callbackData;
1084           
1085               MI_UNUSED(message);
1086           
1087               AtomicDec( &rep->outstandingInstances );
1088           }
1089           
1090           void Selector_NewInstanceCreated(
1091               Selector* self,
1092               Message* msg)
1093 mike  1.1 {
1094               SelectorRep* rep = (SelectorRep*)self->rep;
1095           
1096               msg->dtor = _dtor;
1097               msg->dtorData = rep;
1098           
1099               AtomicInc( &rep->outstandingInstances );
1100           }
1101           
1102           MI_Boolean  Selector_IsStressed(
1103               Selector* self)
1104           {
1105               SelectorRep* rep = (SelectorRep*)self->rep;
1106           
1107               /* Are we close to be stressed? */
1108               if (rep->outstandingInstances >= (MAX_ALLOCATED_INSTANCES * 9 / 10))
1109                   return MI_TRUE;
1110           
1111               return MI_FALSE;
1112           }
1113           
1114 mike  1.1 void Selector_SetAllowEmptyFlag(
1115               Selector* self,
1116               MI_Boolean allowEmptySelector)
1117           {
1118               SelectorRep* rep = (SelectorRep*)self->rep;
1119           
1120               rep->allowEmptySelector = allowEmptySelector;
1121           }
1122           

ViewCVS 0.9.2