(file) Return to protocol.c CVS log (file) (dir) Up to [OMI] / omi / protocol

   1 mike  1.1 /*
   2           **==============================================================================
   3           **
   4           ** Open Management Infrastructure (OMI)
   5           **
   6           ** Copyright (c) Microsoft Corporation
   7           ** 
   8           ** Licensed under the Apache License, Version 2.0 (the "License"); you may not 
   9           ** use this file except in compliance with the License. You may obtain a copy 
  10           ** of the License at 
  11           **
  12           **     http://www.apache.org/licenses/LICENSE-2.0 
  13           **
  14           ** THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15           ** KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED 
  16           ** WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, 
  17           ** MERCHANTABLITY OR NON-INFRINGEMENT. 
  18           **
  19           ** See the Apache 2 License for the specific language governing permissions 
  20           ** and limitations under the License.
  21           **
  22 mike  1.1 **==============================================================================
  23           */
  24           
  25           #include <assert.h>
  26           #include "protocol.h"
  27           #include "addr.h"
  28           #include "sock.h"
  29           #include "selector.h"
  30           #include "header.h"
  31           #include "thread.h"
  32           #include <base/buf.h>
  33           #include <base/log.h>
  34           #include <base/result.h>
  35           #include <base/user.h>
  36           #include <base/io.h>
  37           
  38           #define T MI_T
  39           
  40           #if 0
  41           #define ENABLE_TRACING
  42           #endif
  43 mike  1.1 
  44           #ifdef  ENABLE_TRACING
  45           #define PRINTF(a)  printf a
  46           #else
  47           #define PRINTF(a)
  48           #endif
  49           
  50           /*
  51           **==============================================================================
  52           **
  53           ** Local definitions:
  54           **
  55           **==============================================================================
  56           */
  57           
  58           static const MI_Uint32 _MAGIC = 0xC764445E;
  59           #define SR_SOCKET_OUT_QUEUE_WATERMARK_LOW  4
  60           #define SR_SOCKET_OUT_QUEUE_WATERMARK_HIGH  12
  61           #define SR_SOCKET_OUT_QUEUE_WATERMARK_CRITICAL  256
  62           
  63           typedef enum _Protocol_AuthState
  64 mike  1.1 {
  65               /* authentication failed (intentionaly takes value '0')*/
  66               PRT_AUTH_FAILED,
  67           
  68               /* listener (server) waits for connect request */
  69               PRT_AUTH_WAIT_CONNECTION_REQUEST,
  70           
  71               /* listener (server) waits for second connect request with random data from file */
  72               PRT_AUTH_WAIT_CONNECTION_REQUEST_WITH_FILE_DATA,
  73           
  74               /* connector (client) waits for server's response */
  75               PRT_AUTH_WAIT_CONNECTION_RESPONSE,
  76           
  77               /* authentication completed */
  78               PRT_AUTH_OK
  79           }
  80           Protocol_AuthState;
  81           
  82           typedef enum _Protocol_Type
  83           {
  84               PRT_TYPE_LISTENER,
  85 mike  1.1     PRT_TYPE_CONNECTOR,
  86               PRT_TYPE_FROM_SOCKET
  87           }
  88           Protocol_Type;
  89           
  90           struct _Protocol
  91           {
  92               MI_Uint32 magic;
  93               Selector internal_selector;
  94               Selector*   selector;
  95               Addr addr;
  96               struct _Protocol_SR_SocketData* connectorHandle;
  97               ProtocolCallback callback;
  98               void* callbackData;
  99               ProtocolEventCallback eventCallback;
 100               void* eventCallbackData;
 101               Protocol_Type   type;
 102               MI_Boolean  internal_selector_used;
 103               /* Indicates whether instance has to be upacked or stored as byte array */
 104               MI_Boolean  skipInstanceUnpack;
 105           };
 106 mike  1.1 
 107           /* Keeps data for file-based authentication */
 108           typedef struct _Protocol_AuthData
 109           {
 110               char path[MAX_PATH_SIZE];
 111               char   authRandom[AUTH_RANDOM_DATA_SIZE];
 112           }
 113           Protocol_AuthData;
 114           
 115           typedef struct _Protocol_SR_SocketData
 116           {
 117               /* based member*/
 118               Handler     base;
 119           
 120               /* sending data */
 121               /* Linked list of messages to send */
 122               ListElem* head;
 123               ListElem* tail;
 124           
 125               /* ref counter 
 126               NOTE:
 127 mike  1.1         socket may be disconnected, but structure is still alive
 128                   if some outstanding request are not completed.
 129                   refcounter gets '+1' for being connected and '+1' for each outstanding request */
 130               int     refcounter;
 131           
 132               /* currently sending message */
 133               Message*    message;
 134               size_t          sentCurrentBlockBytes;
 135               int             sendingPageIndex; /* 0 for header otherwise 1-N page index */
 136               int             sendingQueueLength; /* number of messages in list to send; -1 to disbale watermarks */
 137           
 138               /* receiving data */
 139               Batch       *receivingBatch;
 140               size_t          receivedCurrentBlockBytes;
 141               int             receivingPageIndex; /* 0 for header otherwise 1-N page index */
 142           
 143               /* send/recv buffers */
 144               Header recv_buffer;
 145               Header send_buffer;
 146           
 147               /* Auth state */
 148 mike  1.1     Protocol_AuthState  authState;
 149               /* server side - auhtenticated user's ids */
 150               uid_t uid;
 151               gid_t gid;
 152               Protocol_AuthData* authData;
 153           
 154               /* Whether this is a connector */
 155               MI_Boolean isConnector;
 156           
 157               /* Whether connection has been established */
 158               MI_Boolean isConnected;
 159           
 160           } 
 161           Protocol_SR_SocketData;
 162           
 163           /* helper functions result */
 164           typedef enum _Protocol_CallbackResult
 165           {
 166               PRT_CONTINUE,
 167               PRT_RETURN_TRUE,
 168               PRT_RETURN_FALSE
 169 mike  1.1 }
 170           Protocol_CallbackResult;
 171           
 172           /* Forward declaration */
 173           static void _PrepareMessageForSending(
 174               Protocol_SR_SocketData *handler);
 175           
 176           static MI_Boolean _RequestCallbackWrite(
 177               Protocol_SR_SocketData* handler);
 178           
 179           /**************** Auth-support **********************************************************/
 180           /* remove auth file and free auth data */
 181           static void _FreeAuthData(
 182               Protocol_SR_SocketData* h)
 183           {
 184               if (h->authData)
 185               {
 186           #if defined(CONFIG_POSIX)
 187                   unlink(h->authData->path);
 188           #endif
 189                   free(h->authData);
 190 mike  1.1         h->authData = 0;
 191               }
 192           }
 193           
 194           /* Creates and sends authentication request message */
 195           static MI_Boolean _SendAuthRequest(
 196               Protocol_SR_SocketData* h,
 197               const char* user,
 198               const char* password,
 199               const char* fileContent)
 200           {
 201               BinProtocolNotification* req;
 202           
 203               req = BinProtocolNotification_New(BinNotificationConnectRequest);
 204           
 205               if (!req)
 206                   return MI_FALSE;
 207           
 208               if (user && *user)
 209                   req->user = Batch_Strdup(req->base.batch, user);
 210           
 211 mike  1.1     if (password && *password)
 212                   req->password = Batch_Strdup(req->base.batch, password);
 213           
 214               req->uid = geteuid();
 215               req->gid = getegid();
 216           
 217               if (fileContent)
 218               {
 219                   memcpy(req->authData, fileContent, sizeof(req->authData));
 220               }
 221           
 222               /* send message */
 223               {
 224                   /* add message to the list */
 225                   List_Prepend(&h->head, &h->tail, (ListElem*)req);
 226           
 227                   if (-1 != h->sendingQueueLength)
 228                       h->sendingQueueLength++;
 229           
 230                   Message_AddRef(&req->base);
 231           
 232 mike  1.1         _PrepareMessageForSending(h);
 233                   _RequestCallbackWrite(h);
 234               }
 235           
 236               BinProtocolNotification_Release(req);
 237           
 238               return MI_TRUE;
 239           }
 240           
 241           static MI_Boolean _SendAuthResponse(
 242               Protocol_SR_SocketData* h,
 243               MI_Result result,
 244               const char* path)
 245           {
 246               BinProtocolNotification* req;
 247           
 248               req = BinProtocolNotification_New(BinNotificationConnectResponse);
 249           
 250               if (!req)
 251                   return MI_FALSE;
 252           
 253 mike  1.1     req->result = result;
 254               if (path && *path)
 255                   req->authFile = Batch_Strdup(req->base.batch, path);
 256           
 257               /* send message */
 258               {
 259                   /* add message to the list */
 260                   List_Prepend(&h->head, &h->tail, (ListElem*)req);
 261           
 262                   if (-1 != h->sendingQueueLength)
 263                       h->sendingQueueLength++;
 264           
 265                   Message_AddRef(&req->base);
 266           
 267                   _PrepareMessageForSending(h);
 268                   _RequestCallbackWrite(h);
 269               }
 270           
 271               BinProtocolNotification_Release(req);
 272           
 273               return MI_TRUE;
 274 mike  1.1 }
 275           
 276           /*
 277               Processes auht message while waiting second connect request 
 278               with content of the file.
 279               Updates auth states correspondingly.
 280               Parameters:
 281               handler - socket handler
 282               binMsg - BinProtocolNotification message with connect request/response
 283           
 284               Return:
 285               "TRUE" if connection should stay open; "FALSE" if auth failed
 286                   and conneciton should be closed immediately
 287           */
 288           static MI_Boolean _ProcessAuthMessageWaitingConnectRequestFileData(
 289               Protocol_SR_SocketData* handler,
 290               BinProtocolNotification* binMsg)
 291           {
 292               /* un-expected message */
 293               if (BinNotificationConnectRequest != binMsg->type)
 294                   return MI_FALSE;
 295 mike  1.1 
 296               /* Check internal state */
 297               if (!handler->authData)
 298                   return MI_FALSE;
 299           
 300               if (0 == memcmp(binMsg->authData, handler->authData->authRandom, AUTH_RANDOM_DATA_SIZE))
 301               {
 302                   if (!_SendAuthResponse(handler, MI_RESULT_OK, NULL))
 303                       return MI_FALSE;
 304           
 305                   /* Auth ok */
 306                   handler->authState = PRT_AUTH_OK;
 307                   _FreeAuthData(handler);
 308           
 309                   /* Get gid from user name */
 310                   if (0 != GetUserGidByUid(handler->uid, &handler->gid))
 311                   {
 312                       LOGW_CHAR(("cannot get user's gid for uid %d", (int)handler->uid));
 313                       return MI_FALSE;
 314                   }
 315           
 316 mike  1.1         return MI_TRUE;
 317               }
 318           
 319               LOGW_CHAR(("auth failed - random data mismatch"));
 320           
 321               /* Auth failed */
 322               _SendAuthResponse(handler, MI_RESULT_ACCESS_DENIED, NULL);
 323               handler->authState = PRT_AUTH_FAILED;
 324               return MI_FALSE;
 325           }
 326           
 327           /*
 328               Processes auht message while waiting connect request 
 329               Updates auth states correspondingly.
 330               Parameters:
 331               handler - socket handler
 332               binMsg - BinProtocolNotification message with connect request/response
 333           
 334               Return:
 335               "TRUE" if connection should stay open; "FALSE" if auth failed
 336                   and conneciton should be closed immediately
 337 mike  1.1 */
 338           static MI_Boolean _ProcessAuthMessageWaitingConnectRequest(
 339               Protocol_SR_SocketData* handler,
 340               BinProtocolNotification* binMsg)
 341           {
 342               /* un-expected message */
 343               if (BinNotificationConnectRequest != binMsg->type)
 344                   return MI_FALSE;
 345           
 346               /* Use explicit credentials if provided */
 347               if (binMsg->user)
 348               {
 349                   /* use empty password if not set */
 350                   if (!binMsg->password)
 351                       binMsg->password = "";
 352           
 353                   if ( 0 == AuthenticateUser(binMsg->user, binMsg->password) &&
 354                       0 == LookupUser(binMsg->user, &handler->uid, &handler->gid))
 355                   {
 356                       if (!_SendAuthResponse(handler, MI_RESULT_OK, NULL))
 357                           return MI_FALSE;
 358 mike  1.1 
 359                       /* Auth ok */
 360                       handler->authState = PRT_AUTH_OK;
 361                       _FreeAuthData(handler);
 362                       return MI_TRUE;
 363                   }
 364           
 365                   LOGW_CHAR(("auth failed for user [%s]", binMsg->user));
 366           
 367                   /* Auth failed */
 368                   _SendAuthResponse(handler, MI_RESULT_ACCESS_DENIED, NULL);
 369                   handler->authState = PRT_AUTH_FAILED;
 370                   return MI_FALSE;
 371               }
 372           
 373               /* If system supports connection-based auth, use it for 
 374                   implicit auth */
 375               if (0 == GetUIDByConnection((int)handler->base.sock, &handler->uid, &handler->gid))
 376               {
 377                   if (!_SendAuthResponse(handler, MI_RESULT_OK, NULL))
 378                       return MI_FALSE;
 379 mike  1.1 
 380                   /* Auth ok */
 381                   handler->authState = PRT_AUTH_OK;
 382                   return MI_TRUE;
 383               }
 384           #if defined(CONFIG_OS_WINDOWS)
 385               /* ignore auth on Windows */
 386               {
 387                   if (!_SendAuthResponse(handler, MI_RESULT_OK, NULL))
 388                       return MI_FALSE;
 389           
 390                   /* Auth ok */
 391                   handler->uid = -1;
 392                   handler->gid = -1;
 393                   handler->authState = PRT_AUTH_OK;
 394                   return MI_TRUE;
 395               }
 396           #else
 397           
 398               /* If valid uid provided, try implicit credentials (file-based) 
 399                   gid will be taken from user name */
 400 mike  1.1     {
 401                   handler->authData = (Protocol_AuthData*)calloc(1, sizeof(Protocol_AuthData));
 402           
 403                   if (!handler->authData)
 404                   {
 405                       /* Auth failed */
 406                       _SendAuthResponse(handler, MI_RESULT_ACCESS_DENIED, NULL);
 407                       handler->authState = PRT_AUTH_FAILED;
 408                       return MI_FALSE;
 409                   }
 410           
 411                   if (0 != CreateAuthFile(binMsg->uid, handler->authData->authRandom, AUTH_RANDOM_DATA_SIZE, handler->authData->path))
 412                   {
 413                       LOGW_CHAR(("cannot create file for user uid [%d]", (int)binMsg->uid));
 414           
 415                       /* Auth failed */
 416                       _SendAuthResponse(handler, MI_RESULT_ACCESS_DENIED, NULL);
 417                       handler->authState = PRT_AUTH_FAILED;
 418                       return MI_FALSE;
 419                   }
 420           
 421 mike  1.1         /* send file name to the client */
 422                   if (!_SendAuthResponse(handler, MI_RESULT_IN_PROGRESS, handler->authData->path))
 423                       return MI_FALSE;
 424           
 425                   /* Auth posponed */
 426                   handler->authState = PRT_AUTH_WAIT_CONNECTION_REQUEST_WITH_FILE_DATA;
 427           
 428                   /* Remember uid we used to create file */
 429                   handler->uid = binMsg->uid;
 430                   handler->gid = -1;
 431           
 432                   return MI_TRUE;
 433           
 434               }
 435           #endif
 436           }
 437           
 438           /*
 439               Processes auht message (either connect request or connect-response)
 440               Updates auth states correspondingly.
 441               Parameters:
 442 mike  1.1     handler - socket handler
 443               msg - BinProtocolNotification message with connect request/response
 444           
 445               Return:
 446               "TRUE" if connection should stay open; "FALSE" if auth failed
 447                   and conneciton should be closed immediately
 448           */
 449           static MI_Boolean _ProcessAuthMessage(
 450               Protocol_SR_SocketData* handler,
 451               Message *msg)
 452           {
 453               BinProtocolNotification* binMsg;
 454           
 455               if (msg->tag != BinProtocolNotificationTag)
 456                   return MI_FALSE;
 457           
 458               binMsg = (BinProtocolNotification*) msg;
 459           
 460               /* server waiting client's first request? */
 461               if (PRT_AUTH_WAIT_CONNECTION_REQUEST == handler->authState)
 462               {
 463 mike  1.1         return _ProcessAuthMessageWaitingConnectRequest(handler, binMsg);
 464               }
 465           
 466               /* server waiting for client's file's content request? */
 467               if (PRT_AUTH_WAIT_CONNECTION_REQUEST_WITH_FILE_DATA == handler->authState)
 468               {
 469                   return _ProcessAuthMessageWaitingConnectRequestFileData(handler, binMsg);
 470               }
 471           
 472               /* client waiting for server's response? */
 473               if (PRT_AUTH_WAIT_CONNECTION_RESPONSE == handler->authState)
 474               {
 475                   /* un-expected message */
 476                   if (BinNotificationConnectResponse != binMsg->type)
 477                       return MI_FALSE;
 478           
 479                   if (binMsg->result == MI_RESULT_OK)
 480                   {
 481                       handler->authState = PRT_AUTH_OK;
 482           
 483                       /* process backlog items (if any) */
 484 mike  1.1             _PrepareMessageForSending(handler);
 485                       _RequestCallbackWrite(handler);
 486                       return MI_TRUE;
 487                   }
 488                   else if (binMsg->result == MI_RESULT_IN_PROGRESS && binMsg->authFile)
 489                   {
 490                       /* send back file's content */
 491                       char buf[AUTH_RANDOM_DATA_SIZE];
 492                       FILE* is = Fopen(binMsg->authFile, "r");
 493           
 494                       if (!is)
 495                       {
 496                           LOGE_CHAR(("cannot open auth data file: %s", binMsg->authFile));
 497                           return MI_FALSE;
 498                       }
 499           
 500                       /* Read auth data from the file. */
 501                       if (sizeof(buf) != fread(buf, 1, sizeof(buf), is))
 502                       {
 503                           LOGE_CHAR(("cannot read from auth data file: %s", binMsg->authFile));
 504                           fclose(is);
 505 mike  1.1                 return MI_FALSE;
 506                       }
 507           
 508                       fclose(is);
 509                       return _SendAuthRequest(handler, 0, 0, buf);
 510                   }
 511                   else
 512                   {
 513                       /* PROTOCOLEVENT_DISCONNECT */
 514                       if (handler->isConnector)
 515                       {
 516                           Protocol* self = (Protocol*)handler->base.data;
 517                           if (self->eventCallback)
 518                           {
 519                               (*self->eventCallback)(self,  handler->isConnected ? PROTOCOLEVENT_DISCONNECT : PROTOCOLEVENT_CONNECT_FAILED,
 520                                   self->eventCallbackData);
 521                           }
 522                           handler->isConnected = MI_FALSE;
 523                       }
 524                   }
 525           
 526 mike  1.1         return MI_FALSE;
 527               }
 528           
 529               /* unknown state? */
 530               return MI_FALSE;
 531           }
 532           
 533           
 534           /****************************************************************************************/
 535           static void _RemoveAllMessages(
 536               Protocol_SR_SocketData* handler)
 537           {
 538               while (handler->head)
 539               {
 540                   Message* msg = (Message*)handler->head;
 541           
 542                   List_Remove(&handler->head, &handler->tail, (ListElem*)msg);
 543                   Message_Release(msg);
 544           
 545                   if (-1 != handler->sendingQueueLength)
 546                       handler->sendingQueueLength--;
 547 mike  1.1     }
 548           }
 549           
 550           static void _Release(
 551               Protocol_SR_SocketData* handler)
 552           {
 553               if (--handler->refcounter == 0)
 554               {
 555                   free(handler);
 556               }
 557           }
 558           
 559           static void _PrepareMessageForSending(
 560               Protocol_SR_SocketData *handler)
 561           {
 562               /* check for hi watermark */
 563               if (handler->sendingQueueLength == SR_SOCKET_OUT_QUEUE_WATERMARK_HIGH)
 564                   handler->base.mask &= ~SELECTOR_READ;
 565           
 566               /* check for hi watermark */
 567               if (handler->sendingQueueLength == SR_SOCKET_OUT_QUEUE_WATERMARK_LOW)
 568 mike  1.1         handler->base.mask |= SELECTOR_READ;
 569           
 570               if (handler->message)
 571                   return; /* already sending */
 572           
 573               if (!handler->head)
 574               {
 575                   handler->base.mask &= ~SELECTOR_WRITE;
 576                   return; /*nothing to do*/
 577               }
 578           
 579               /* before auht is complete, only auht-related messages should be sent */
 580               if (PRT_AUTH_OK != handler->authState && BinProtocolNotificationTag != ((Message*)handler->head)->tag)
 581               {
 582                   handler->base.mask &= ~SELECTOR_WRITE;
 583                   return; /*nothing to do*/
 584               }
 585           
 586               handler->message = (Message*)handler->head;
 587               List_Remove(&handler->head, &handler->tail, (ListElem*)handler->message);
 588           
 589 mike  1.1     if (-1 != handler->sendingQueueLength)
 590                   handler->sendingQueueLength--;
 591           
 592               /* reset sending attributes */
 593               handler->sendingPageIndex = 0;
 594               handler->sentCurrentBlockBytes = 0;
 595           
 596               memset(&handler->send_buffer,0,sizeof(handler->send_buffer));
 597               handler->send_buffer.base.magic = PROTOCOL_MAGIC;
 598               handler->send_buffer.base.version = PROTOCOL_VERSION;
 599               handler->send_buffer.base.pageCount = (MI_Uint32)Batch_GetPageCount(handler->message->batch);
 600               handler->send_buffer.base.originalMessagePointer = handler->message;
 601           
 602               /* ATTN! */
 603               assert (handler->send_buffer.base.pageCount <= PROTOCOL_HEADER_MAX_PAGES);
 604           
 605               /* get page info */
 606           
 607               Batch_GetPageInfo(
 608                   handler->message->batch, handler->send_buffer.batchInfo);
 609           
 610 mike  1.1     /* mark handler as 'want-write' */
 611               handler->base.mask |= SELECTOR_WRITE;
 612           
 613           }
 614           
 615           static MI_Boolean _RequestCallbackWrite(
 616               Protocol_SR_SocketData* handler)
 617           {
 618               /* try to write to socket as much as possible */
 619               size_t sent;
 620               MI_Result r;
 621           
 622               for(;;)
 623               {
 624                   /* buffers to write */
 625                   IOVec   buffers[32];
 626                   size_t counter;
 627           
 628                   if ( !handler->message )
 629                   { /* nothing to send */
 630                       handler->base.mask &= ~SELECTOR_WRITE;
 631 mike  1.1             return MI_TRUE;
 632                   }
 633           
 634                   for ( counter = 0; counter < MI_COUNT(buffers); counter++ )
 635                   {
 636                       const char* buf;
 637                       MI_Uint32 index = (MI_Uint32)(handler->sendingPageIndex + counter);
 638           
 639                       buf = (index == 0) ? 
 640                           &handler->send_buffer : 
 641                           handler->send_buffer.batchInfo[index - 1].pagePointer;
 642           
 643                       if (!counter)
 644                           buf += handler->sentCurrentBlockBytes;
 645           
 646                       buffers[counter].ptr = (void*)buf;
 647           
 648                       buffers[counter].len = (index == 0) ? (sizeof(HeaderBase) + sizeof(Header_BatchInfoItem) * handler->send_buffer.base.pageCount)
 649                           : handler->send_buffer.batchInfo[index - 1].pageSize;
 650           
 651                       if (!counter)
 652 mike  1.1                 buffers[counter].len -= handler->sentCurrentBlockBytes;
 653           
 654                       if ( index == handler->send_buffer.base.pageCount)
 655                       {
 656                           counter++;
 657                           break;
 658                       }
 659                   }
 660           
 661                   sent = 0;
 662           
 663                   r = Sock_WriteV(handler->base.sock, buffers, counter, &sent);
 664           
 665                   PRINTF(("sent %d\n", sent));
 666           
 667                   if ( r == MI_RESULT_OK && 0 == sent )
 668                       return MI_FALSE; /* conection closed */
 669           
 670                   if ( r != MI_RESULT_OK && r != MI_RESULT_WOULD_BLOCK )
 671                       return MI_FALSE;
 672           
 673 mike  1.1         if (!sent)
 674                       return MI_TRUE;
 675           
 676                   /* update index */
 677                   for ( counter = 0; counter < MI_COUNT(buffers); counter++ )
 678                   {
 679                       if (!sent)
 680                           break;
 681           
 682                       if (sent >= buffers[counter].len)
 683                       {
 684                           sent -= buffers[counter].len;
 685                           handler->sendingPageIndex++;
 686                           handler->sentCurrentBlockBytes = 0;
 687                           continue;
 688                       }
 689           
 690                       handler->sentCurrentBlockBytes += sent;
 691                       break;
 692                   }
 693           
 694 mike  1.1         if ( (handler->sendingPageIndex - 1) == (int)handler->send_buffer.base.pageCount )
 695                   {
 696                       PRINTF(("done with sending message tag %d\n", handler->message->tag));
 697           
 698                       /* next message */
 699                       Message_Release(handler->message);
 700                       handler->message = 0;
 701           
 702                       _PrepareMessageForSending(handler);
 703                   }
 704               }
 705           }
 706           
 707           /*
 708               Processes incoming message, including:
 709                   - decoding message from batch
 710                   - invoking callback to process message
 711           
 712               Parameters:
 713                   handler - pointer to received data
 714               Returns:
 715 mike  1.1         it returns result if 'callback' with the followinf meaning:
 716                   MI_TRUE - to continue normal operations
 717                   MI_FALSE - to close connection
 718           */
 719           static MI_Boolean _ProcessReceivedMessage(
 720               Protocol_SR_SocketData* handler)
 721           {
 722               MI_Result r;
 723               Message* msg = 0;
 724               Protocol* self = (Protocol*)handler->base.data;
 725               MI_Boolean ret = MI_TRUE;
 726           
 727               /* create a message from a batch */
 728               r = MessageFromBatch(
 729                   handler->receivingBatch,
 730                   handler->recv_buffer.base.originalMessagePointer,
 731                   handler->recv_buffer.batchInfo,
 732                   handler->recv_buffer.base.pageCount,
 733                   self->skipInstanceUnpack,
 734                   &msg);
 735           
 736 mike  1.1     if (MI_RESULT_OK == r)
 737               {
 738                   PRINTF(("done with receiving message tag %d\n", msg->tag));
 739           
 740                   if (PRT_AUTH_OK != handler->authState)
 741                   {
 742                       ret = _ProcessAuthMessage(handler, msg);
 743                   }
 744                   else
 745                   {
 746                       /* attach client id */
 747                       msg->clientID = PtrToUint64(handler);
 748           
 749                       /* +1 for incoming request */
 750                       handler->refcounter++;
 751           
 752                       /* auth info */
 753                       msg->uid = handler->uid;
 754                       msg->gid = handler->gid;
 755           
 756                       /* count message in for back-pressure feature (only Instances) */
 757 mike  1.1             if (PostInstanceMsgTag == msg->tag &&
 758                           PRT_TYPE_FROM_SOCKET == self->type)
 759                           Selector_NewInstanceCreated(self->selector, msg);
 760           
 761                       ret = (*self->callback)(self, msg, self->callbackData);
 762                   }
 763           
 764                   Message_Release(msg);
 765               }
 766               else
 767               {
 768                   LOGW((T("failed to restore message %d [%s]\n"), r,
 769                       Result_ToString(r)));
 770                   Batch_Destroy( handler->receivingBatch );
 771               }
 772           
 773               /* clean up the state */
 774               handler->receivingBatch = 0;
 775               handler->receivingPageIndex = 0;
 776               memset(&handler->recv_buffer,0,sizeof(handler->recv_buffer));
 777           
 778 mike  1.1     return ret;
 779           }
 780           
 781           
 782           static Protocol_CallbackResult _ReadHeader(
 783               Protocol_SR_SocketData* handler)
 784           {
 785               char* buf;
 786               size_t buf_size, received;
 787               MI_Result r;
 788               MI_Uint32 index;
 789           
 790               /* are we done with header? */
 791               if (0!= handler->receivingPageIndex)
 792                   return PRT_CONTINUE;
 793           
 794               for (;;)
 795               {
 796                   buf = (char*)&handler->recv_buffer;
 797                   buf_size = (sizeof(HeaderBase) + sizeof(Header_BatchInfoItem) * handler->recv_buffer.base.pageCount);
 798                   received = 0;
 799 mike  1.1 
 800                   r = Sock_Read(handler->base.sock, buf + handler->receivedCurrentBlockBytes, buf_size - handler->receivedCurrentBlockBytes, &received);
 801           
 802                   PRINTF(("read %d\n", received));
 803           
 804                   if ( r == MI_RESULT_OK && 0 == received )
 805                       return PRT_RETURN_FALSE; /* conection closed */
 806           
 807                   if ( r != MI_RESULT_OK && r != MI_RESULT_WOULD_BLOCK )
 808                       return PRT_RETURN_FALSE;
 809           
 810                   if (!received)
 811                       return PRT_RETURN_TRUE;
 812           
 813                   handler->receivedCurrentBlockBytes += received;
 814           
 815                   if (handler->receivedCurrentBlockBytes == buf_size)
 816                   {
 817                       /* got header - validate/allocate as required */
 818                       if (handler->recv_buffer.base.pageCount > PROTOCOL_HEADER_MAX_PAGES)
 819                           return PRT_RETURN_FALSE;
 820 mike  1.1 
 821                       if (handler->recv_buffer.base.magic != PROTOCOL_MAGIC)
 822                           return PRT_RETURN_FALSE;
 823           
 824                       for (index =0; index < handler->recv_buffer.base.pageCount; index++)
 825                       {
 826                           if (handler->recv_buffer.batchInfo[index].pageSize > (64*1024))
 827                               return PRT_RETURN_FALSE;
 828                       }
 829           
 830                       /* check if page info is also retrieved */
 831                       if (buf_size != ((sizeof(HeaderBase) + sizeof(Header_BatchInfoItem) * handler->recv_buffer.base.pageCount)) )
 832                           continue;
 833           
 834                       /* create a batch */
 835                       if (!Batch_CreateBatchByPageInfo(
 836                           &handler->receivingBatch, 
 837                           handler->recv_buffer.batchInfo,
 838                           handler->recv_buffer.base.pageCount))
 839                           return PRT_RETURN_FALSE;
 840           
 841 mike  1.1             /* skip to next page */
 842                       handler->receivingPageIndex++;
 843                       handler->receivedCurrentBlockBytes = 0;
 844           
 845                       if ( (handler->receivingPageIndex - 1) == (int)handler->recv_buffer.base.pageCount )
 846                       {   /* received the whole message - process it */
 847                           if (!_ProcessReceivedMessage(handler))
 848                               return PRT_RETURN_FALSE;
 849                       }
 850                       break;
 851                   } /* if we read the whole buffer */
 852               } /* for(;;)*/
 853               return PRT_CONTINUE;
 854           }
 855           
 856           
 857           static Protocol_CallbackResult _ReadAllPages(
 858               Protocol_SR_SocketData* handler)
 859           {
 860               size_t received;
 861               MI_Result r;
 862 mike  1.1     /* buffers to write */
 863               IOVec   buffers[32];
 864               size_t counter;
 865           
 866               /* are we done with header? - if not, return 'continue' */
 867               if (0== handler->receivingPageIndex)
 868                   return PRT_CONTINUE;
 869           
 870           
 871               for ( counter = 0; counter < MI_COUNT(buffers); counter++ )
 872               {
 873                   const char* buf;
 874                   MI_Uint32 index = (MI_Uint32)(handler->receivingPageIndex + counter);
 875           
 876                   buf = Batch_GetPageByIndex(handler->receivingBatch, index - 1);
 877                   
 878                   if (!counter)
 879                       buf += handler->receivedCurrentBlockBytes;
 880           
 881                   buffers[counter].ptr = (void*)buf;
 882                   buffers[counter].len = handler->recv_buffer.batchInfo[index - 1].pageSize;
 883 mike  1.1 
 884                   if (!counter)
 885                       buffers[counter].len -= handler->receivedCurrentBlockBytes;
 886           
 887                   if ( index == handler->recv_buffer.base.pageCount)
 888                   {
 889                       counter++;
 890                       break;
 891                   }
 892               }
 893           
 894               received = 0;
 895           
 896               r = Sock_ReadV(handler->base.sock, buffers, counter, &received);
 897           
 898               PRINTF(("read %d\n", received));
 899           
 900               if ( r == MI_RESULT_OK && 0 == received )
 901                   return PRT_RETURN_FALSE; /* conection closed */
 902           
 903               if ( r != MI_RESULT_OK && r != MI_RESULT_WOULD_BLOCK )
 904 mike  1.1         return PRT_RETURN_FALSE;
 905           
 906               if (!received)
 907                   return PRT_RETURN_TRUE;
 908           
 909               /* update index */
 910               for ( counter = 0; counter < MI_COUNT(buffers); counter++ )
 911               {
 912                   if (!received)
 913                       break;
 914           
 915                   if (received >= buffers[counter].len)
 916                   {
 917                       received -= buffers[counter].len;
 918                       handler->receivingPageIndex++;
 919                       handler->receivedCurrentBlockBytes = 0;
 920                       continue;
 921                   }
 922           
 923                   handler->receivedCurrentBlockBytes += received;
 924                   break;
 925 mike  1.1     }
 926           
 927               if ( (handler->receivingPageIndex - 1) == (int)handler->recv_buffer.base.pageCount )
 928               {   /* received the whole message - process it */
 929                   if (!_ProcessReceivedMessage(handler))
 930                       return PRT_RETURN_FALSE;
 931               }
 932           
 933               return PRT_CONTINUE;
 934           }
 935           
 936           static MI_Boolean _RequestCallbackRead(
 937               Protocol_SR_SocketData* handler)
 938           {
 939               int fullMessagesREceived = 0;
 940           
 941               /* we have to keep repeating read until 'WOULD_BLOCK is returned;
 942                   windows does not reset event until read buffer is empty */
 943               for (;fullMessagesREceived < 3;)
 944               {
 945                   switch (_ReadHeader(handler))
 946 mike  1.1         {
 947                   case PRT_CONTINUE: break;
 948                   case PRT_RETURN_TRUE: return MI_TRUE;
 949                   case PRT_RETURN_FALSE: return MI_FALSE;
 950                   }
 951           
 952                   switch (_ReadAllPages(handler))
 953                   {
 954                   case PRT_CONTINUE: break;
 955                   case PRT_RETURN_TRUE: return MI_TRUE;
 956                   case PRT_RETURN_FALSE: return MI_FALSE;
 957                   }
 958               } /* for(;;)*/
 959               return MI_TRUE;
 960           }
 961           
 962           static MI_Boolean _RequestCallback(
 963               Selector* sel,
 964               Handler* handlerIn,
 965               MI_Uint32 mask, 
 966               MI_Uint64 currentTimeUsec)
 967 mike  1.1 {
 968               Protocol_SR_SocketData* handler = (Protocol_SR_SocketData*)handlerIn;
 969           
 970               MI_UNUSED(sel);
 971               MI_UNUSED(currentTimeUsec);
 972           
 973               if (mask & SELECTOR_READ)
 974               {
 975                   if (!_RequestCallbackRead(handler))
 976                   {
 977                       /* PROTOCOLEVENT_DISCONNECT */
 978                       if (handler->isConnector)
 979                       {
 980                           Protocol* self = (Protocol*)handler->base.data;
 981                           if (self->eventCallback)
 982                           {
 983                               (*self->eventCallback)(self,  handler->isConnected ? PROTOCOLEVENT_DISCONNECT : PROTOCOLEVENT_CONNECT_FAILED,
 984                                   self->eventCallbackData);
 985                           }
 986                           handler->isConnected = MI_FALSE;
 987                       }
 988 mike  1.1             goto closeConnection;
 989                   }
 990                   else if (handler->isConnector && !handler->isConnected)
 991                   {
 992                       Protocol* self = (Protocol*)handler->base.data;
 993                       if (self->eventCallback)
 994                       {
 995                           (*self->eventCallback)(self, PROTOCOLEVENT_CONNECT,
 996                               self->eventCallbackData);
 997                       }
 998                       handler->isConnected = MI_TRUE;
 999                   }
1000               }
1001           
1002               if (mask & SELECTOR_WRITE)
1003               {
1004                   if (!_RequestCallbackWrite(handler))
1005                   {
1006                       /* PROTOCOLEVENT_DISCONNECT */
1007                       if (handler->isConnector && handler->isConnected)
1008                       {
1009 mike  1.1                 Protocol* self = (Protocol*)handler->base.data;
1010                           if (self->eventCallback)
1011                           {
1012                               (*self->eventCallback)(self, PROTOCOLEVENT_DISCONNECT,
1013                                   self->eventCallbackData);
1014                           }
1015                           handler->isConnected = MI_FALSE;
1016                       }
1017                       goto closeConnection;
1018                   }
1019                   else if (handler->isConnector && !handler->isConnected)
1020                   {
1021                       Protocol* self = (Protocol*)handler->base.data;
1022                       if (self->eventCallback)
1023                       {
1024                           (*self->eventCallback)(self, PROTOCOLEVENT_CONNECT,
1025                               self->eventCallbackData);
1026                       }
1027                       handler->isConnected = MI_TRUE;
1028                   }
1029               }
1030 mike  1.1 
1031               /* Close connection by timeout */
1032               if (mask & SELECTOR_TIMEOUT)
1033                   return MI_FALSE;
1034           
1035               if ((mask & SELECTOR_REMOVE) != 0 ||
1036                   (mask & SELECTOR_DESTROY) != 0)
1037               {
1038                   Protocol* self = (Protocol*)handler->base.data;
1039           
1040                   _FreeAuthData(handler);
1041           
1042                   /* free outstanding messages, batch */
1043                   if (handler->receivingBatch)
1044                       Batch_Destroy( handler->receivingBatch );
1045           
1046                   handler->receivingBatch = 0;
1047           
1048                   if (handler->message)
1049                       Message_Release(handler->message);
1050           
1051 mike  1.1         handler->message = 0;
1052           
1053                   _RemoveAllMessages(handler);
1054           
1055                   Sock_Close(handler->base.sock);
1056           
1057                   /* Mark handler as closed */
1058                   handler->base.sock = INVALID_SOCK;
1059           
1060                   /* if connection sokcet was released, invalidate pointer to it */
1061                   if (self && handler == self->connectorHandle)
1062                       self->connectorHandle = 0;
1063           
1064                   if (handler->isConnector)
1065                       free(handler);
1066                   else
1067                       _Release(handler);
1068               }
1069           
1070               return MI_TRUE;
1071           
1072 mike  1.1 closeConnection:
1073           
1074               PRINTF(("LOG: closed client connection\n"));
1075           
1076               return MI_FALSE;
1077           }
1078           
1079           static MI_Boolean _ListenerCallback(
1080               Selector* sel,
1081               Handler* handler,
1082               MI_Uint32 mask, 
1083               MI_Uint64 currentTimeUsec)
1084           {
1085               Protocol* self = (Protocol*)handler->data;
1086               MI_Result r;
1087               Sock s;
1088               Addr addr;
1089               Protocol_SR_SocketData* h;
1090           
1091               sel=sel;
1092               mask=mask;
1093 mike  1.1     currentTimeUsec = currentTimeUsec;
1094           
1095               if (mask & SELECTOR_READ)
1096               {
1097                   /* Accept the incoming connection */
1098                   r = Sock_Accept(handler->sock, &s, &addr);
1099           
1100                   if (MI_RESULT_WOULD_BLOCK == r)
1101                       return MI_TRUE;
1102           
1103                   if (r != MI_RESULT_OK)
1104                   {
1105                       LOGW((T("Sock_Accept() failed; err %d\n"), Sock_GetLastError()));
1106                       return MI_TRUE;
1107                   }
1108           
1109                   r = Sock_SetBlocking(s, MI_FALSE);
1110                   if (r != MI_RESULT_OK)
1111                   {
1112                       LOGW((T("Sock_SetBlocking() failed\n")));
1113                       Sock_Close(s);
1114 mike  1.1             return MI_TRUE;
1115                   }
1116           
1117                   /* Create handler */
1118                   h = (Protocol_SR_SocketData*)calloc(1, sizeof(Protocol_SR_SocketData));
1119           
1120                   if (!h)
1121                   {
1122                       Sock_Close(s);
1123                       return MI_TRUE;
1124                   }
1125           
1126                   h->base.sock = s;
1127                   h->base.mask = SELECTOR_READ | SELECTOR_EXCEPTION;
1128                   h->base.callback = _RequestCallback;
1129                   h->base.data = self;
1130           
1131                   /* get '1' for connected */
1132                   h->refcounter = 1;
1133           
1134                   /* waiting for connect-request */
1135 mike  1.1         h->authState = PRT_AUTH_WAIT_CONNECTION_REQUEST;
1136           
1137                   /* Watch for read events on the incoming connection */
1138                   r = Selector_AddHandler(self->selector, &h->base);
1139           
1140                   if (r != MI_RESULT_OK)
1141                   {
1142                       LOGW((T("Selector_AddHandler() failed\n")));
1143                       return MI_TRUE;
1144                   }
1145               }
1146           
1147               if ((mask & SELECTOR_REMOVE) != 0 ||
1148                   (mask & SELECTOR_DESTROY) != 0)
1149               {
1150                   Sock_Close(handler->sock);
1151                   free(handler);
1152               }
1153           
1154               return MI_TRUE;
1155           }
1156 mike  1.1 
1157           static MI_Result _CreateListener(
1158               Sock* s,
1159               const char* locator)
1160           {
1161               const char* posColon;
1162           
1163               posColon = strchr(locator, ':');
1164           
1165               if (!posColon)
1166                   return Sock_CreateLocalListener(s, locator);
1167           
1168               /* create listener for remote address like host:port or :port (ANYADDR) */
1169               {
1170                   unsigned short port = (unsigned short)atol(posColon+1);
1171                   char host[128];
1172                   unsigned int len = (unsigned int)(posColon - locator);
1173                   Addr addr;
1174                   MI_Result r;
1175           
1176                   if (len > 0)
1177 mike  1.1         {
1178                       if (len >= sizeof(host))
1179                           return MI_RESULT_FAILED;
1180           
1181                       memcpy(host, locator, len);
1182                       host[len] = 0;
1183           
1184                       // Initialize address.
1185                       r = Addr_Init(&addr, host, port);
1186                       if (r != MI_RESULT_OK)
1187                           return MI_RESULT_FAILED;
1188                   }
1189                   else
1190                   {
1191                       Addr_InitAny(&addr, port);
1192                   }
1193           
1194                   return Sock_CreateListener(s, &addr);
1195               }
1196           }
1197           
1198 mike  1.1 static MI_Result _CreateConnector(
1199               Sock* s,
1200               const char* locator)
1201           {
1202               const char* posColon;
1203           
1204               posColon = strchr(locator, ':');
1205           
1206               if (!posColon)
1207                   return Sock_CreateLocalConnector(s, locator);
1208           
1209               /* create connector to remote address like host:port */
1210               {
1211                   unsigned short port = (unsigned short)atol(posColon+1);
1212                   char host[128];
1213                   unsigned int len = (unsigned int)(posColon - locator);
1214                   Addr addr;
1215                   MI_Result r;
1216           
1217                   if (len >= sizeof(host))
1218                       return MI_RESULT_FAILED;
1219 mike  1.1 
1220                   memcpy(host, locator, len);
1221                   host[len] = 0;
1222           
1223                   // Initialize address.
1224                   r = Addr_Init(&addr, host, port);
1225                   if (r != MI_RESULT_OK)
1226                       return MI_RESULT_FAILED;
1227           
1228                   // Create client socket.
1229                   r = Sock_Create(s);
1230                   if (r != MI_RESULT_OK)
1231                   {
1232                       Sock_Close(*s);
1233                       return MI_RESULT_FAILED;
1234                   }
1235           
1236                   r = Sock_SetBlocking(*s, MI_FALSE);
1237                   if (r != MI_RESULT_OK)
1238                   {
1239                       Sock_Close(*s);
1240 mike  1.1             return MI_RESULT_FAILED;
1241                   }
1242           
1243                   // Connect to server.
1244                   r = Sock_Connect(*s, &addr);
1245                   if (r != MI_RESULT_OK && r != MI_RESULT_WOULD_BLOCK)
1246                   {
1247                       Sock_Close(*s);
1248                       return MI_RESULT_FAILED;
1249                   }
1250                   return r;
1251               }
1252           }
1253           
1254           static MI_Result _New_Protocol(
1255               Protocol** selfOut,
1256               Selector* selector, /*optional, maybe NULL*/
1257               ProtocolCallback callback,
1258               void* callbackData,
1259               ProtocolEventCallback eventCallback,
1260               void* eventCallbackData)
1261 mike  1.1 {
1262               Protocol* self;
1263           
1264               /* Check parameters */
1265               if (!selfOut)
1266                   return MI_RESULT_INVALID_PARAMETER;
1267           
1268               /* Clear output parameter */
1269               *selfOut = NULL;
1270           
1271               /* Allocate structure */
1272               {
1273                   self = (Protocol*)calloc(1, sizeof(Protocol));
1274           
1275                   if (!self)
1276                       return MI_RESULT_FAILED;
1277               }
1278           
1279               if (selector)
1280               {   /* attach the exisiting selector */
1281                   self->selector = selector;
1282 mike  1.1         self->internal_selector_used = MI_FALSE;
1283               }
1284               else
1285               {   /* creaet a new selector */
1286                   /* Initialize the network */
1287                   Sock_Start();
1288           
1289                   /* Initialize the selector */
1290                   if (Selector_Init(&self->internal_selector) != MI_RESULT_OK)
1291                   {
1292                       free(self);
1293                       return MI_RESULT_FAILED;
1294                   }
1295                   self->selector = &self->internal_selector;
1296                   self->internal_selector_used = MI_TRUE;
1297               }
1298           
1299               /* Save the callback and callbackData */
1300               self->callback = callback;
1301               self->callbackData = callbackData;
1302               self->eventCallback = eventCallback;
1303 mike  1.1     self->eventCallbackData = eventCallbackData;
1304           
1305               /* Set the magic number */
1306               self->magic = _MAGIC;
1307           
1308               /* Set output parameter */
1309               *selfOut = self;
1310               return MI_RESULT_OK;
1311           }
1312           
1313           /*
1314           **==============================================================================
1315           **
1316           ** Public definitions:
1317           **
1318           **==============================================================================
1319           */
1320           
1321           MI_Result Protocol_New_Listener(
1322               Protocol** selfOut,
1323               Selector* selector, /*optional, maybe NULL*/
1324 mike  1.1     const char* locator,
1325               ProtocolCallback callback,
1326               void* callbackData)
1327           {
1328               Protocol* self;
1329               MI_Result r;
1330               Sock listener;
1331           
1332               r = _New_Protocol(selfOut, selector, callback, callbackData, NULL, NULL);
1333           
1334               if (MI_RESULT_OK != r)
1335                   return r;
1336           
1337               self = *selfOut;
1338           
1339               self->type = PRT_TYPE_LISTENER;
1340           
1341               /* Create listener socket */
1342               {
1343                   r = _CreateListener(&listener, locator);
1344           
1345 mike  1.1         if (r != MI_RESULT_OK)
1346                   {
1347                       Protocol_Delete(self);
1348                       return r;
1349                   }
1350           
1351                   r = Sock_SetBlocking(listener, MI_FALSE);
1352           
1353                   if (r != MI_RESULT_OK)
1354                   {
1355                       Sock_Close(listener);
1356                       Protocol_Delete(self);
1357                       return r;
1358                   }
1359               }
1360           
1361               /* Watch for read events on the listener socket (client connections) */
1362               {
1363                   Handler* h = (Handler*)calloc(1, sizeof(Handler));
1364           
1365                   if (!h)
1366 mike  1.1         {
1367                       Sock_Close(listener);
1368                       Protocol_Delete(self);
1369                       return MI_RESULT_FAILED;
1370                   }
1371           
1372                   h->sock = listener;
1373                   h->mask = SELECTOR_READ | SELECTOR_EXCEPTION;
1374                   h->callback = _ListenerCallback;
1375                   h->data = self;
1376           
1377                   r = Selector_AddHandler(self->selector, h);
1378           
1379                   if (r != MI_RESULT_OK)
1380                   {
1381                       Sock_Close(listener);
1382                       free(h);
1383                       Protocol_Delete(self);
1384                       return r;
1385                   }
1386               }
1387 mike  1.1 
1388               return MI_RESULT_OK;
1389           }
1390           
1391           MI_Result Protocol_New_Connector(
1392               Protocol** selfOut,
1393               Selector* selector, /*optional, maybe NULL*/
1394               const char* locator,
1395               ProtocolCallback callback,
1396               void* callbackData,
1397               ProtocolEventCallback eventCallback,
1398               void* eventCallbackData,
1399               const char* user,
1400               const char* password)
1401           {
1402               Protocol* self;
1403               MI_Result r;
1404               Sock connector;
1405           
1406               r = _New_Protocol(selfOut, selector, callback, callbackData,
1407                   eventCallback, eventCallbackData);
1408 mike  1.1 
1409               if (MI_RESULT_OK != r)
1410                   return r;
1411           
1412               self = *selfOut;
1413               *selfOut = 0;
1414           
1415               self->type = PRT_TYPE_CONNECTOR;
1416           
1417               /* Create connector socket */
1418               {
1419                   // Connect to server.
1420                   r = _CreateConnector(&connector, locator);
1421                   if (r != MI_RESULT_OK && r != MI_RESULT_WOULD_BLOCK)
1422                   {
1423                       Protocol_Delete(self);
1424                       return MI_RESULT_FAILED;
1425                   }
1426               }
1427           
1428               /* Allocating connector's structure */
1429 mike  1.1     {
1430                   Protocol_SR_SocketData* h = (Protocol_SR_SocketData*)calloc(1, sizeof(Protocol_SR_SocketData));
1431           
1432           
1433                   if (!h)
1434                   {
1435                       Sock_Close(connector);
1436                       Protocol_Delete(self);
1437                       return MI_RESULT_FAILED;
1438                   }
1439           
1440                   h->base.sock = connector;
1441                   h->base.mask = SELECTOR_READ | SELECTOR_WRITE | SELECTOR_EXCEPTION;
1442                   h->base.callback = _RequestCallback;
1443                   h->base.data = self;
1444                   h->sendingQueueLength = -1; /* disable watermarks for client */
1445                   h->isConnector = MI_TRUE;
1446                   h->isConnected = MI_FALSE;
1447                   h->authState = PRT_AUTH_WAIT_CONNECTION_RESPONSE;
1448           
1449                   r = Selector_AddHandler(self->selector, &h->base);
1450 mike  1.1 
1451                   if (r != MI_RESULT_OK)
1452                   {
1453                       Sock_Close(connector);
1454                       Protocol_Delete(self);
1455                       free(h);
1456                       return MI_RESULT_FAILED;
1457                   }
1458                   self->connectorHandle = h;
1459           
1460                   /* send connect request */
1461                   if (!_SendAuthRequest(h, user, password, NULL))
1462                   {
1463                       /* remove handler will free 'h' pointer */
1464                       Selector_RemoveHandler(self->selector, &h->base);
1465                       Protocol_Delete(self);
1466                       return MI_RESULT_FAILED;
1467                   }
1468               }
1469           
1470               /* Set output parameter */
1471 mike  1.1     *selfOut = self;
1472               return MI_RESULT_OK;
1473           }
1474           
1475           MI_Result Protocol_New_From_Socket(
1476               Protocol** selfOut,
1477               Selector* selector, /*optional, maybe NULL*/
1478               Sock s,
1479               MI_Boolean skipInstanceUnpack,
1480               ProtocolCallback callback,
1481               void* callbackData,
1482               ProtocolEventCallback eventCallback,
1483               void* eventCallbackData)
1484           {
1485               Protocol* self;
1486               MI_Result r;
1487           
1488               r = _New_Protocol(selfOut, selector, callback, callbackData,
1489                   eventCallback, eventCallbackData);
1490           
1491               if (MI_RESULT_OK != r)
1492 mike  1.1         return r;
1493           
1494               self = *selfOut;
1495               *selfOut = 0;
1496           
1497               self->type = PRT_TYPE_FROM_SOCKET;
1498           
1499               self->skipInstanceUnpack = skipInstanceUnpack;
1500           
1501               /* Attach provided socket to connector */
1502               {
1503                   Protocol_SR_SocketData* h = (Protocol_SR_SocketData*)calloc(1, sizeof(Protocol_SR_SocketData));
1504           
1505                   if (!h)
1506                   {
1507                       Protocol_Delete(self);
1508                       return MI_RESULT_FAILED;
1509                   }
1510           
1511                   h->base.sock = s;
1512                   h->base.mask = SELECTOR_READ  | SELECTOR_EXCEPTION;
1513 mike  1.1 
1514                   if (skipInstanceUnpack)
1515                   {
1516                       /* skipInstanceUnpack indicates that call made from server
1517                           and socket connected to the agent
1518                           In that case we can use back=pressure feature and
1519                           ignore socket operations under stress */
1520                       h->base.mask |= SELECTOR_IGNORE_READ_OVERLOAD;
1521                   }
1522           
1523                   h->base.callback = _RequestCallback;
1524                   h->base.data = self;
1525                   h->isConnector = MI_TRUE;
1526                   h->isConnected = MI_TRUE;
1527                   /* skip authentication for established connections 
1528                       (only used in server/agent communication) */
1529                   h->authState = PRT_AUTH_OK;
1530           
1531                   r = Selector_AddHandler(self->selector, &h->base);
1532           
1533                   if (r != MI_RESULT_OK)
1534 mike  1.1         {
1535                       Protocol_Delete(self);
1536                       free(h);
1537                       return MI_RESULT_FAILED;
1538                   }
1539                   self->connectorHandle = h;
1540               }
1541           
1542               /* Set output parameter */
1543               *selfOut = self;
1544               return MI_RESULT_OK;
1545           }
1546           
1547           MI_Result Protocol_Delete(
1548               Protocol* self)
1549           {
1550               /* Check parameters */
1551               if (!self)
1552                   return MI_RESULT_INVALID_PARAMETER;
1553           
1554               /* Check magic number */
1555 mike  1.1     if (self->magic != _MAGIC)
1556                   return MI_RESULT_INVALID_PARAMETER;
1557           
1558               if (self->internal_selector_used)
1559               {
1560                   /* Release selector;
1561                   Note: selector-destory closes all sockects in a list including connector and listener */
1562                   Selector_Destroy(self->selector);
1563           
1564                   /* Shutdown the network */
1565                   Sock_Stop();
1566               }
1567           
1568               /* if connector, invalide 'self' pointr in connector */
1569               if (self->connectorHandle)
1570                   self->connectorHandle->base.data = 0;
1571           
1572               /* Clear magic number */
1573               self->magic = 0xDDDDDDDD;
1574           
1575               /* Free self pointer */
1576 mike  1.1     free(self);
1577           
1578           
1579               return MI_RESULT_OK;
1580           }
1581           
1582           MI_Result Protocol_Run(
1583               Protocol* self,
1584               MI_Uint64 timeoutUsec)
1585           {
1586               /* Run the selector */
1587               return Selector_Run(self->selector, timeoutUsec);
1588           }
1589           
1590           static MI_Result _SendIN_IO_thread(
1591               void* self_,
1592               Message* message)
1593           {
1594               Protocol* self = (Protocol*)self_;
1595               Protocol_SR_SocketData* sendSock;
1596           
1597 mike  1.1     /* check params */
1598               if (!self || !message )
1599                   return MI_RESULT_INVALID_PARAMETER;
1600           
1601               if (self->magic != _MAGIC)
1602               {
1603                   LOGW((T("_SendIN_IO_thread: invalid magic!") ));
1604                   return MI_RESULT_INVALID_PARAMETER;
1605               }
1606           
1607               /* find where to send it */
1608               if (self->connectorHandle)
1609                   sendSock = self->connectorHandle;
1610               else
1611                   sendSock = (Protocol_SR_SocketData*)Uint64ToPtr(message->clientID);
1612           
1613               /* validate handler */
1614               if (!sendSock || INVALID_SOCK == sendSock->base.sock) 
1615               {
1616                   //LOGW((T("cannot send message: expired handler (msg->clientID) %p\n"), sendSock));
1617           
1618 mike  1.1         /* connection was closed - ignore message, but release handler if needed */
1619                   if (sendSock && Message_IsFinalRepsonse(message))
1620                       _Release(sendSock);
1621           
1622                   return MI_RESULT_FAILED;
1623               }
1624           
1625               /* decrement number of outstanding requests */
1626               if (Message_IsFinalRepsonse(message) && !sendSock->isConnector)
1627               {
1628                   DEBUG_ASSERT(sendSock->refcounter > 1);
1629                   _Release(sendSock);
1630               }
1631           
1632           
1633               /* add message to the list */
1634               List_Append(&sendSock->head, &sendSock->tail, (ListElem*)message);
1635           
1636               if (-1 != sendSock->sendingQueueLength)
1637                   sendSock->sendingQueueLength++;
1638           
1639 mike  1.1     Message_AddRef(message);
1640           
1641               _PrepareMessageForSending(sendSock);
1642               
1643               if (!_RequestCallbackWrite(sendSock) && !sendSock->isConnector)
1644               {
1645                   //LOGW((T("cannot send message: queue overflow) %p\n"), sendSock));
1646                   _RemoveAllMessages(sendSock);
1647                   return MI_RESULT_FAILED;
1648               }
1649           
1650               {
1651                   int counter = 0;
1652           
1653                   while (sendSock->sendingQueueLength > SR_SOCKET_OUT_QUEUE_WATERMARK_CRITICAL)
1654                   {
1655                       _RequestCallbackWrite(sendSock);
1656           
1657                       counter++;
1658                       /* give system a chance to clear backlog */
1659                       Sleep_ms(1);
1660 mike  1.1 
1661                       if (counter > 40000)
1662                       {
1663                           LOGW((T("cannot send message: queue overflow) %p\n"), sendSock));
1664                           _RemoveAllMessages(sendSock);
1665                           return MI_RESULT_FAILED;
1666                       }
1667                   }
1668               }
1669               return MI_RESULT_OK;
1670           }
1671           
1672           /* Signature must not have return type so we created this wrapper */
1673           static void _SendIN_IO_thread_wrapper(void* self, Message* message)
1674           {
1675               MI_Result r;
1676               r = _SendIN_IO_thread(self, message);
1677           
1678               /* ATTN: log failed result? */
1679           }
1680           
1681 mike  1.1 MI_Result Protocol_Send(
1682               Protocol* self,
1683               Message* message)
1684           {
1685               return Selector_CallInIOThread(
1686                   self->selector, _SendIN_IO_thread_wrapper, self, message );
1687           }

ViewCVS 0.9.2