1 mike 1.1 /*
2 **==============================================================================
3 **
4 ** Open Management Infrastructure (OMI)
5 **
6 ** Copyright (c) Microsoft Corporation
7 **
8 ** Licensed under the Apache License, Version 2.0 (the "License"); you may not
9 ** use this file except in compliance with the License. You may obtain a copy
10 ** of the License at
11 **
12 ** http://www.apache.org/licenses/LICENSE-2.0
13 **
14 ** THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 ** KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED
16 ** WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE,
17 ** MERCHANTABLITY OR NON-INFRINGEMENT.
18 **
19 ** See the Apache 2 License for the specific language governing permissions
20 ** and limitations under the License.
21 **
22 mike 1.1 **==============================================================================
23 */
24
25 #include <assert.h>
26 #include "protocol.h"
27 #include "addr.h"
28 #include "sock.h"
29 #include "selector.h"
30 #include "header.h"
31 #include "thread.h"
32 #include <base/buf.h>
33 #include <base/log.h>
34 #include <base/result.h>
35 #include <base/user.h>
36 #include <base/io.h>
37
/* Short alias for MI_T; used in this file to wrap log format strings. */
#define T MI_T

/* Change the 0 to 1 to compile in the PRINTF tracing below. */
#if 0
# define ENABLE_TRACING
#endif

/* PRINTF((fmt, ...)) forwards to printf when ENABLE_TRACING is defined and
   expands to nothing otherwise (arguments are then not evaluated).
   Note the double parentheses at the call site. */
#if defined(ENABLE_TRACING)
# define PRINTF(a) printf a
#else
# define PRINTF(a)
#endif
49
50 /*
51 **==============================================================================
52 **
53 ** Local definitions:
54 **
55 **==============================================================================
56 */
57
58 static const MI_Uint32 _MAGIC = 0xC764445E;
59 #define SR_SOCKET_OUT_QUEUE_WATERMARK_LOW 4
60 #define SR_SOCKET_OUT_QUEUE_WATERMARK_HIGH 12
61 #define SR_SOCKET_OUT_QUEUE_WATERMARK_CRITICAL 256
62
/* Authentication progress of a single socket connection. */
typedef enum _Protocol_AuthState
{
    /* authentication failed (intentionally takes value '0') */
    PRT_AUTH_FAILED = 0,

    /* listener (server) waits for the first connect request */
    PRT_AUTH_WAIT_CONNECTION_REQUEST = 1,

    /* listener (server) waits for the second connect request, which must
       carry the random data read from the auth file */
    PRT_AUTH_WAIT_CONNECTION_REQUEST_WITH_FILE_DATA = 2,

    /* connector (client) waits for the server's response */
    PRT_AUTH_WAIT_CONNECTION_RESPONSE = 3,

    /* authentication completed */
    PRT_AUTH_OK = 4
}
Protocol_AuthState;
81
/* Role of a Protocol object. */
typedef enum _Protocol_Type
{
    PRT_TYPE_LISTENER = 0,
    PRT_TYPE_CONNECTOR = 1,
    PRT_TYPE_FROM_SOCKET = 2
}
Protocol_Type;
89
90 struct _Protocol
91 {
92 MI_Uint32 magic;
93 Selector internal_selector;
94 Selector* selector;
95 Addr addr;
96 struct _Protocol_SR_SocketData* connectorHandle;
97 ProtocolCallback callback;
98 void* callbackData;
99 ProtocolEventCallback eventCallback;
100 void* eventCallbackData;
101 Protocol_Type type;
102 MI_Boolean internal_selector_used;
103 /* Indicates whether instance has to be upacked or stored as byte array */
104 MI_Boolean skipInstanceUnpack;
105 };
106 mike 1.1
107 /* Keeps data for file-based authentication */
108 typedef struct _Protocol_AuthData
109 {
110 char path[MAX_PATH_SIZE];
111 char authRandom[AUTH_RANDOM_DATA_SIZE];
112 }
113 Protocol_AuthData;
114
115 typedef struct _Protocol_SR_SocketData
116 {
117 /* based member*/
118 Handler base;
119
120 /* sending data */
121 /* Linked list of messages to send */
122 ListElem* head;
123 ListElem* tail;
124
125 /* ref counter
126 NOTE:
127 mike 1.1 socket may be disconnected, but structure is still alive
128 if some outstanding request are not completed.
129 refcounter gets '+1' for being connected and '+1' for each outstanding request */
130 int refcounter;
131
132 /* currently sending message */
133 Message* message;
134 size_t sentCurrentBlockBytes;
135 int sendingPageIndex; /* 0 for header otherwise 1-N page index */
136 int sendingQueueLength; /* number of messages in list to send; -1 to disbale watermarks */
137
138 /* receiving data */
139 Batch *receivingBatch;
140 size_t receivedCurrentBlockBytes;
141 int receivingPageIndex; /* 0 for header otherwise 1-N page index */
142
143 /* send/recv buffers */
144 Header recv_buffer;
145 Header send_buffer;
146
147 /* Auth state */
148 mike 1.1 Protocol_AuthState authState;
149 /* server side - auhtenticated user's ids */
150 uid_t uid;
151 gid_t gid;
152 Protocol_AuthData* authData;
153
154 /* Whether this is a connector */
155 MI_Boolean isConnector;
156
157 /* Whether connection has been established */
158 MI_Boolean isConnected;
159
160 }
161 Protocol_SR_SocketData;
162
/* Result of the read helper functions:
   PRT_CONTINUE     - keep going with the next read step
   PRT_RETURN_TRUE  - stop and return MI_TRUE to the selector
   PRT_RETURN_FALSE - stop and return MI_FALSE (close the connection) */
typedef enum _Protocol_CallbackResult
{
    PRT_CONTINUE = 0,
    PRT_RETURN_TRUE = 1,
    PRT_RETURN_FALSE = 2
}
Protocol_CallbackResult;
171
172 /* Forward declaration */
173 static void _PrepareMessageForSending(
174 Protocol_SR_SocketData *handler);
175
176 static MI_Boolean _RequestCallbackWrite(
177 Protocol_SR_SocketData* handler);
178
179 /**************** Auth-support **********************************************************/
180 /* remove auth file and free auth data */
181 static void _FreeAuthData(
182 Protocol_SR_SocketData* h)
183 {
184 if (h->authData)
185 {
186 #if defined(CONFIG_POSIX)
187 unlink(h->authData->path);
188 #endif
189 free(h->authData);
190 mike 1.1 h->authData = 0;
191 }
192 }
193
194 /* Creates and sends authentication request message */
195 static MI_Boolean _SendAuthRequest(
196 Protocol_SR_SocketData* h,
197 const char* user,
198 const char* password,
199 const char* fileContent)
200 {
201 BinProtocolNotification* req;
202
203 req = BinProtocolNotification_New(BinNotificationConnectRequest);
204
205 if (!req)
206 return MI_FALSE;
207
208 if (user && *user)
209 req->user = Batch_Strdup(req->base.batch, user);
210
211 mike 1.1 if (password && *password)
212 req->password = Batch_Strdup(req->base.batch, password);
213
214 req->uid = geteuid();
215 req->gid = getegid();
216
217 if (fileContent)
218 {
219 memcpy(req->authData, fileContent, sizeof(req->authData));
220 }
221
222 /* send message */
223 {
224 /* add message to the list */
225 List_Prepend(&h->head, &h->tail, (ListElem*)req);
226
227 if (-1 != h->sendingQueueLength)
228 h->sendingQueueLength++;
229
230 Message_AddRef(&req->base);
231
232 mike 1.1 _PrepareMessageForSending(h);
233 _RequestCallbackWrite(h);
234 }
235
236 BinProtocolNotification_Release(req);
237
238 return MI_TRUE;
239 }
240
241 static MI_Boolean _SendAuthResponse(
242 Protocol_SR_SocketData* h,
243 MI_Result result,
244 const char* path)
245 {
246 BinProtocolNotification* req;
247
248 req = BinProtocolNotification_New(BinNotificationConnectResponse);
249
250 if (!req)
251 return MI_FALSE;
252
253 mike 1.1 req->result = result;
254 if (path && *path)
255 req->authFile = Batch_Strdup(req->base.batch, path);
256
257 /* send message */
258 {
259 /* add message to the list */
260 List_Prepend(&h->head, &h->tail, (ListElem*)req);
261
262 if (-1 != h->sendingQueueLength)
263 h->sendingQueueLength++;
264
265 Message_AddRef(&req->base);
266
267 _PrepareMessageForSending(h);
268 _RequestCallbackWrite(h);
269 }
270
271 BinProtocolNotification_Release(req);
272
273 return MI_TRUE;
274 mike 1.1 }
275
276 /*
277 Processes auht message while waiting second connect request
278 with content of the file.
279 Updates auth states correspondingly.
280 Parameters:
281 handler - socket handler
282 binMsg - BinProtocolNotification message with connect request/response
283
284 Return:
285 "TRUE" if connection should stay open; "FALSE" if auth failed
286 and conneciton should be closed immediately
287 */
288 static MI_Boolean _ProcessAuthMessageWaitingConnectRequestFileData(
289 Protocol_SR_SocketData* handler,
290 BinProtocolNotification* binMsg)
291 {
292 /* un-expected message */
293 if (BinNotificationConnectRequest != binMsg->type)
294 return MI_FALSE;
295 mike 1.1
296 /* Check internal state */
297 if (!handler->authData)
298 return MI_FALSE;
299
300 if (0 == memcmp(binMsg->authData, handler->authData->authRandom, AUTH_RANDOM_DATA_SIZE))
301 {
302 if (!_SendAuthResponse(handler, MI_RESULT_OK, NULL))
303 return MI_FALSE;
304
305 /* Auth ok */
306 handler->authState = PRT_AUTH_OK;
307 _FreeAuthData(handler);
308
309 /* Get gid from user name */
310 if (0 != GetUserGidByUid(handler->uid, &handler->gid))
311 {
312 LOGW_CHAR(("cannot get user's gid for uid %d", (int)handler->uid));
313 return MI_FALSE;
314 }
315
316 mike 1.1 return MI_TRUE;
317 }
318
319 LOGW_CHAR(("auth failed - random data mismatch"));
320
321 /* Auth failed */
322 _SendAuthResponse(handler, MI_RESULT_ACCESS_DENIED, NULL);
323 handler->authState = PRT_AUTH_FAILED;
324 return MI_FALSE;
325 }
326
327 /*
328 Processes auht message while waiting connect request
329 Updates auth states correspondingly.
330 Parameters:
331 handler - socket handler
332 binMsg - BinProtocolNotification message with connect request/response
333
334 Return:
335 "TRUE" if connection should stay open; "FALSE" if auth failed
336 and conneciton should be closed immediately
337 mike 1.1 */
338 static MI_Boolean _ProcessAuthMessageWaitingConnectRequest(
339 Protocol_SR_SocketData* handler,
340 BinProtocolNotification* binMsg)
341 {
342 /* un-expected message */
343 if (BinNotificationConnectRequest != binMsg->type)
344 return MI_FALSE;
345
346 /* Use explicit credentials if provided */
347 if (binMsg->user)
348 {
349 /* use empty password if not set */
350 if (!binMsg->password)
351 binMsg->password = "";
352
353 if ( 0 == AuthenticateUser(binMsg->user, binMsg->password) &&
354 0 == LookupUser(binMsg->user, &handler->uid, &handler->gid))
355 {
356 if (!_SendAuthResponse(handler, MI_RESULT_OK, NULL))
357 return MI_FALSE;
358 mike 1.1
359 /* Auth ok */
360 handler->authState = PRT_AUTH_OK;
361 _FreeAuthData(handler);
362 return MI_TRUE;
363 }
364
365 LOGW_CHAR(("auth failed for user [%s]", binMsg->user));
366
367 /* Auth failed */
368 _SendAuthResponse(handler, MI_RESULT_ACCESS_DENIED, NULL);
369 handler->authState = PRT_AUTH_FAILED;
370 return MI_FALSE;
371 }
372
373 /* If system supports connection-based auth, use it for
374 implicit auth */
375 if (0 == GetUIDByConnection((int)handler->base.sock, &handler->uid, &handler->gid))
376 {
377 if (!_SendAuthResponse(handler, MI_RESULT_OK, NULL))
378 return MI_FALSE;
379 mike 1.1
380 /* Auth ok */
381 handler->authState = PRT_AUTH_OK;
382 return MI_TRUE;
383 }
384 #if defined(CONFIG_OS_WINDOWS)
385 /* ignore auth on Windows */
386 {
387 if (!_SendAuthResponse(handler, MI_RESULT_OK, NULL))
388 return MI_FALSE;
389
390 /* Auth ok */
391 handler->uid = -1;
392 handler->gid = -1;
393 handler->authState = PRT_AUTH_OK;
394 return MI_TRUE;
395 }
396 #else
397
398 /* If valid uid provided, try implicit credentials (file-based)
399 gid will be taken from user name */
400 mike 1.1 {
401 handler->authData = (Protocol_AuthData*)calloc(1, sizeof(Protocol_AuthData));
402
403 if (!handler->authData)
404 {
405 /* Auth failed */
406 _SendAuthResponse(handler, MI_RESULT_ACCESS_DENIED, NULL);
407 handler->authState = PRT_AUTH_FAILED;
408 return MI_FALSE;
409 }
410
411 if (0 != CreateAuthFile(binMsg->uid, handler->authData->authRandom, AUTH_RANDOM_DATA_SIZE, handler->authData->path))
412 {
413 LOGW_CHAR(("cannot create file for user uid [%d]", (int)binMsg->uid));
414
415 /* Auth failed */
416 _SendAuthResponse(handler, MI_RESULT_ACCESS_DENIED, NULL);
417 handler->authState = PRT_AUTH_FAILED;
418 return MI_FALSE;
419 }
420
421 mike 1.1 /* send file name to the client */
422 if (!_SendAuthResponse(handler, MI_RESULT_IN_PROGRESS, handler->authData->path))
423 return MI_FALSE;
424
425 /* Auth posponed */
426 handler->authState = PRT_AUTH_WAIT_CONNECTION_REQUEST_WITH_FILE_DATA;
427
428 /* Remember uid we used to create file */
429 handler->uid = binMsg->uid;
430 handler->gid = -1;
431
432 return MI_TRUE;
433
434 }
435 #endif
436 }
437
438 /*
439 Processes auht message (either connect request or connect-response)
440 Updates auth states correspondingly.
441 Parameters:
442 mike 1.1 handler - socket handler
443 msg - BinProtocolNotification message with connect request/response
444
445 Return:
446 "TRUE" if connection should stay open; "FALSE" if auth failed
447 and conneciton should be closed immediately
448 */
449 static MI_Boolean _ProcessAuthMessage(
450 Protocol_SR_SocketData* handler,
451 Message *msg)
452 {
453 BinProtocolNotification* binMsg;
454
455 if (msg->tag != BinProtocolNotificationTag)
456 return MI_FALSE;
457
458 binMsg = (BinProtocolNotification*) msg;
459
460 /* server waiting client's first request? */
461 if (PRT_AUTH_WAIT_CONNECTION_REQUEST == handler->authState)
462 {
463 mike 1.1 return _ProcessAuthMessageWaitingConnectRequest(handler, binMsg);
464 }
465
466 /* server waiting for client's file's content request? */
467 if (PRT_AUTH_WAIT_CONNECTION_REQUEST_WITH_FILE_DATA == handler->authState)
468 {
469 return _ProcessAuthMessageWaitingConnectRequestFileData(handler, binMsg);
470 }
471
472 /* client waiting for server's response? */
473 if (PRT_AUTH_WAIT_CONNECTION_RESPONSE == handler->authState)
474 {
475 /* un-expected message */
476 if (BinNotificationConnectResponse != binMsg->type)
477 return MI_FALSE;
478
479 if (binMsg->result == MI_RESULT_OK)
480 {
481 handler->authState = PRT_AUTH_OK;
482
483 /* process backlog items (if any) */
484 mike 1.1 _PrepareMessageForSending(handler);
485 _RequestCallbackWrite(handler);
486 return MI_TRUE;
487 }
488 else if (binMsg->result == MI_RESULT_IN_PROGRESS && binMsg->authFile)
489 {
490 /* send back file's content */
491 char buf[AUTH_RANDOM_DATA_SIZE];
492 FILE* is = Fopen(binMsg->authFile, "r");
493
494 if (!is)
495 {
496 LOGE_CHAR(("cannot open auth data file: %s", binMsg->authFile));
497 return MI_FALSE;
498 }
499
500 /* Read auth data from the file. */
501 if (sizeof(buf) != fread(buf, 1, sizeof(buf), is))
502 {
503 LOGE_CHAR(("cannot read from auth data file: %s", binMsg->authFile));
504 fclose(is);
505 mike 1.1 return MI_FALSE;
506 }
507
508 fclose(is);
509 return _SendAuthRequest(handler, 0, 0, buf);
510 }
511 else
512 {
513 /* PROTOCOLEVENT_DISCONNECT */
514 if (handler->isConnector)
515 {
516 Protocol* self = (Protocol*)handler->base.data;
517 if (self->eventCallback)
518 {
519 (*self->eventCallback)(self, handler->isConnected ? PROTOCOLEVENT_DISCONNECT : PROTOCOLEVENT_CONNECT_FAILED,
520 self->eventCallbackData);
521 }
522 handler->isConnected = MI_FALSE;
523 }
524 }
525
526 mike 1.1 return MI_FALSE;
527 }
528
529 /* unknown state? */
530 return MI_FALSE;
531 }
532
533
534 /****************************************************************************************/
535 static void _RemoveAllMessages(
536 Protocol_SR_SocketData* handler)
537 {
538 while (handler->head)
539 {
540 Message* msg = (Message*)handler->head;
541
542 List_Remove(&handler->head, &handler->tail, (ListElem*)msg);
543 Message_Release(msg);
544
545 if (-1 != handler->sendingQueueLength)
546 handler->sendingQueueLength--;
547 mike 1.1 }
548 }
549
550 static void _Release(
551 Protocol_SR_SocketData* handler)
552 {
553 if (--handler->refcounter == 0)
554 {
555 free(handler);
556 }
557 }
558
559 static void _PrepareMessageForSending(
560 Protocol_SR_SocketData *handler)
561 {
562 /* check for hi watermark */
563 if (handler->sendingQueueLength == SR_SOCKET_OUT_QUEUE_WATERMARK_HIGH)
564 handler->base.mask &= ~SELECTOR_READ;
565
566 /* check for hi watermark */
567 if (handler->sendingQueueLength == SR_SOCKET_OUT_QUEUE_WATERMARK_LOW)
568 mike 1.1 handler->base.mask |= SELECTOR_READ;
569
570 if (handler->message)
571 return; /* already sending */
572
573 if (!handler->head)
574 {
575 handler->base.mask &= ~SELECTOR_WRITE;
576 return; /*nothing to do*/
577 }
578
579 /* before auht is complete, only auht-related messages should be sent */
580 if (PRT_AUTH_OK != handler->authState && BinProtocolNotificationTag != ((Message*)handler->head)->tag)
581 {
582 handler->base.mask &= ~SELECTOR_WRITE;
583 return; /*nothing to do*/
584 }
585
586 handler->message = (Message*)handler->head;
587 List_Remove(&handler->head, &handler->tail, (ListElem*)handler->message);
588
589 mike 1.1 if (-1 != handler->sendingQueueLength)
590 handler->sendingQueueLength--;
591
592 /* reset sending attributes */
593 handler->sendingPageIndex = 0;
594 handler->sentCurrentBlockBytes = 0;
595
596 memset(&handler->send_buffer,0,sizeof(handler->send_buffer));
597 handler->send_buffer.base.magic = PROTOCOL_MAGIC;
598 handler->send_buffer.base.version = PROTOCOL_VERSION;
599 handler->send_buffer.base.pageCount = (MI_Uint32)Batch_GetPageCount(handler->message->batch);
600 handler->send_buffer.base.originalMessagePointer = handler->message;
601
602 /* ATTN! */
603 assert (handler->send_buffer.base.pageCount <= PROTOCOL_HEADER_MAX_PAGES);
604
605 /* get page info */
606
607 Batch_GetPageInfo(
608 handler->message->batch, handler->send_buffer.batchInfo);
609
610 mike 1.1 /* mark handler as 'want-write' */
611 handler->base.mask |= SELECTOR_WRITE;
612
613 }
614
615 static MI_Boolean _RequestCallbackWrite(
616 Protocol_SR_SocketData* handler)
617 {
618 /* try to write to socket as much as possible */
619 size_t sent;
620 MI_Result r;
621
622 for(;;)
623 {
624 /* buffers to write */
625 IOVec buffers[32];
626 size_t counter;
627
628 if ( !handler->message )
629 { /* nothing to send */
630 handler->base.mask &= ~SELECTOR_WRITE;
631 mike 1.1 return MI_TRUE;
632 }
633
634 for ( counter = 0; counter < MI_COUNT(buffers); counter++ )
635 {
636 const char* buf;
637 MI_Uint32 index = (MI_Uint32)(handler->sendingPageIndex + counter);
638
639 buf = (index == 0) ?
640 &handler->send_buffer :
641 handler->send_buffer.batchInfo[index - 1].pagePointer;
642
643 if (!counter)
644 buf += handler->sentCurrentBlockBytes;
645
646 buffers[counter].ptr = (void*)buf;
647
648 buffers[counter].len = (index == 0) ? (sizeof(HeaderBase) + sizeof(Header_BatchInfoItem) * handler->send_buffer.base.pageCount)
649 : handler->send_buffer.batchInfo[index - 1].pageSize;
650
651 if (!counter)
652 mike 1.1 buffers[counter].len -= handler->sentCurrentBlockBytes;
653
654 if ( index == handler->send_buffer.base.pageCount)
655 {
656 counter++;
657 break;
658 }
659 }
660
661 sent = 0;
662
663 r = Sock_WriteV(handler->base.sock, buffers, counter, &sent);
664
665 PRINTF(("sent %d\n", sent));
666
667 if ( r == MI_RESULT_OK && 0 == sent )
668 return MI_FALSE; /* conection closed */
669
670 if ( r != MI_RESULT_OK && r != MI_RESULT_WOULD_BLOCK )
671 return MI_FALSE;
672
673 mike 1.1 if (!sent)
674 return MI_TRUE;
675
676 /* update index */
677 for ( counter = 0; counter < MI_COUNT(buffers); counter++ )
678 {
679 if (!sent)
680 break;
681
682 if (sent >= buffers[counter].len)
683 {
684 sent -= buffers[counter].len;
685 handler->sendingPageIndex++;
686 handler->sentCurrentBlockBytes = 0;
687 continue;
688 }
689
690 handler->sentCurrentBlockBytes += sent;
691 break;
692 }
693
694 mike 1.1 if ( (handler->sendingPageIndex - 1) == (int)handler->send_buffer.base.pageCount )
695 {
696 PRINTF(("done with sending message tag %d\n", handler->message->tag));
697
698 /* next message */
699 Message_Release(handler->message);
700 handler->message = 0;
701
702 _PrepareMessageForSending(handler);
703 }
704 }
705 }
706
707 /*
708 Processes incoming message, including:
709 - decoding message from batch
710 - invoking callback to process message
711
712 Parameters:
713 handler - pointer to received data
714 Returns:
715 mike 1.1 it returns result if 'callback' with the followinf meaning:
716 MI_TRUE - to continue normal operations
717 MI_FALSE - to close connection
718 */
719 static MI_Boolean _ProcessReceivedMessage(
720 Protocol_SR_SocketData* handler)
721 {
722 MI_Result r;
723 Message* msg = 0;
724 Protocol* self = (Protocol*)handler->base.data;
725 MI_Boolean ret = MI_TRUE;
726
727 /* create a message from a batch */
728 r = MessageFromBatch(
729 handler->receivingBatch,
730 handler->recv_buffer.base.originalMessagePointer,
731 handler->recv_buffer.batchInfo,
732 handler->recv_buffer.base.pageCount,
733 self->skipInstanceUnpack,
734 &msg);
735
736 mike 1.1 if (MI_RESULT_OK == r)
737 {
738 PRINTF(("done with receiving message tag %d\n", msg->tag));
739
740 if (PRT_AUTH_OK != handler->authState)
741 {
742 ret = _ProcessAuthMessage(handler, msg);
743 }
744 else
745 {
746 /* attach client id */
747 msg->clientID = PtrToUint64(handler);
748
749 /* +1 for incoming request */
750 handler->refcounter++;
751
752 /* auth info */
753 msg->uid = handler->uid;
754 msg->gid = handler->gid;
755
756 /* count message in for back-pressure feature (only Instances) */
757 mike 1.1 if (PostInstanceMsgTag == msg->tag &&
758 PRT_TYPE_FROM_SOCKET == self->type)
759 Selector_NewInstanceCreated(self->selector, msg);
760
761 ret = (*self->callback)(self, msg, self->callbackData);
762 }
763
764 Message_Release(msg);
765 }
766 else
767 {
768 LOGW((T("failed to restore message %d [%s]\n"), r,
769 Result_ToString(r)));
770 Batch_Destroy( handler->receivingBatch );
771 }
772
773 /* clean up the state */
774 handler->receivingBatch = 0;
775 handler->receivingPageIndex = 0;
776 memset(&handler->recv_buffer,0,sizeof(handler->recv_buffer));
777
778 mike 1.1 return ret;
779 }
780
781
782 static Protocol_CallbackResult _ReadHeader(
783 Protocol_SR_SocketData* handler)
784 {
785 char* buf;
786 size_t buf_size, received;
787 MI_Result r;
788 MI_Uint32 index;
789
790 /* are we done with header? */
791 if (0!= handler->receivingPageIndex)
792 return PRT_CONTINUE;
793
794 for (;;)
795 {
796 buf = (char*)&handler->recv_buffer;
797 buf_size = (sizeof(HeaderBase) + sizeof(Header_BatchInfoItem) * handler->recv_buffer.base.pageCount);
798 received = 0;
799 mike 1.1
800 r = Sock_Read(handler->base.sock, buf + handler->receivedCurrentBlockBytes, buf_size - handler->receivedCurrentBlockBytes, &received);
801
802 PRINTF(("read %d\n", received));
803
804 if ( r == MI_RESULT_OK && 0 == received )
805 return PRT_RETURN_FALSE; /* conection closed */
806
807 if ( r != MI_RESULT_OK && r != MI_RESULT_WOULD_BLOCK )
808 return PRT_RETURN_FALSE;
809
810 if (!received)
811 return PRT_RETURN_TRUE;
812
813 handler->receivedCurrentBlockBytes += received;
814
815 if (handler->receivedCurrentBlockBytes == buf_size)
816 {
817 /* got header - validate/allocate as required */
818 if (handler->recv_buffer.base.pageCount > PROTOCOL_HEADER_MAX_PAGES)
819 return PRT_RETURN_FALSE;
820 mike 1.1
821 if (handler->recv_buffer.base.magic != PROTOCOL_MAGIC)
822 return PRT_RETURN_FALSE;
823
824 for (index =0; index < handler->recv_buffer.base.pageCount; index++)
825 {
826 if (handler->recv_buffer.batchInfo[index].pageSize > (64*1024))
827 return PRT_RETURN_FALSE;
828 }
829
830 /* check if page info is also retrieved */
831 if (buf_size != ((sizeof(HeaderBase) + sizeof(Header_BatchInfoItem) * handler->recv_buffer.base.pageCount)) )
832 continue;
833
834 /* create a batch */
835 if (!Batch_CreateBatchByPageInfo(
836 &handler->receivingBatch,
837 handler->recv_buffer.batchInfo,
838 handler->recv_buffer.base.pageCount))
839 return PRT_RETURN_FALSE;
840
841 mike 1.1 /* skip to next page */
842 handler->receivingPageIndex++;
843 handler->receivedCurrentBlockBytes = 0;
844
845 if ( (handler->receivingPageIndex - 1) == (int)handler->recv_buffer.base.pageCount )
846 { /* received the whole message - process it */
847 if (!_ProcessReceivedMessage(handler))
848 return PRT_RETURN_FALSE;
849 }
850 break;
851 } /* if we read the whole buffer */
852 } /* for(;;)*/
853 return PRT_CONTINUE;
854 }
855
856
857 static Protocol_CallbackResult _ReadAllPages(
858 Protocol_SR_SocketData* handler)
859 {
860 size_t received;
861 MI_Result r;
862 mike 1.1 /* buffers to write */
863 IOVec buffers[32];
864 size_t counter;
865
866 /* are we done with header? - if not, return 'continue' */
867 if (0== handler->receivingPageIndex)
868 return PRT_CONTINUE;
869
870
871 for ( counter = 0; counter < MI_COUNT(buffers); counter++ )
872 {
873 const char* buf;
874 MI_Uint32 index = (MI_Uint32)(handler->receivingPageIndex + counter);
875
876 buf = Batch_GetPageByIndex(handler->receivingBatch, index - 1);
877
878 if (!counter)
879 buf += handler->receivedCurrentBlockBytes;
880
881 buffers[counter].ptr = (void*)buf;
882 buffers[counter].len = handler->recv_buffer.batchInfo[index - 1].pageSize;
883 mike 1.1
884 if (!counter)
885 buffers[counter].len -= handler->receivedCurrentBlockBytes;
886
887 if ( index == handler->recv_buffer.base.pageCount)
888 {
889 counter++;
890 break;
891 }
892 }
893
894 received = 0;
895
896 r = Sock_ReadV(handler->base.sock, buffers, counter, &received);
897
898 PRINTF(("read %d\n", received));
899
900 if ( r == MI_RESULT_OK && 0 == received )
901 return PRT_RETURN_FALSE; /* conection closed */
902
903 if ( r != MI_RESULT_OK && r != MI_RESULT_WOULD_BLOCK )
904 mike 1.1 return PRT_RETURN_FALSE;
905
906 if (!received)
907 return PRT_RETURN_TRUE;
908
909 /* update index */
910 for ( counter = 0; counter < MI_COUNT(buffers); counter++ )
911 {
912 if (!received)
913 break;
914
915 if (received >= buffers[counter].len)
916 {
917 received -= buffers[counter].len;
918 handler->receivingPageIndex++;
919 handler->receivedCurrentBlockBytes = 0;
920 continue;
921 }
922
923 handler->receivedCurrentBlockBytes += received;
924 break;
925 mike 1.1 }
926
927 if ( (handler->receivingPageIndex - 1) == (int)handler->recv_buffer.base.pageCount )
928 { /* received the whole message - process it */
929 if (!_ProcessReceivedMessage(handler))
930 return PRT_RETURN_FALSE;
931 }
932
933 return PRT_CONTINUE;
934 }
935
936 static MI_Boolean _RequestCallbackRead(
937 Protocol_SR_SocketData* handler)
938 {
939 int fullMessagesREceived = 0;
940
941 /* we have to keep repeating read until 'WOULD_BLOCK is returned;
942 windows does not reset event until read buffer is empty */
943 for (;fullMessagesREceived < 3;)
944 {
945 switch (_ReadHeader(handler))
946 mike 1.1 {
947 case PRT_CONTINUE: break;
948 case PRT_RETURN_TRUE: return MI_TRUE;
949 case PRT_RETURN_FALSE: return MI_FALSE;
950 }
951
952 switch (_ReadAllPages(handler))
953 {
954 case PRT_CONTINUE: break;
955 case PRT_RETURN_TRUE: return MI_TRUE;
956 case PRT_RETURN_FALSE: return MI_FALSE;
957 }
958 } /* for(;;)*/
959 return MI_TRUE;
960 }
961
962 static MI_Boolean _RequestCallback(
963 Selector* sel,
964 Handler* handlerIn,
965 MI_Uint32 mask,
966 MI_Uint64 currentTimeUsec)
967 mike 1.1 {
968 Protocol_SR_SocketData* handler = (Protocol_SR_SocketData*)handlerIn;
969
970 MI_UNUSED(sel);
971 MI_UNUSED(currentTimeUsec);
972
973 if (mask & SELECTOR_READ)
974 {
975 if (!_RequestCallbackRead(handler))
976 {
977 /* PROTOCOLEVENT_DISCONNECT */
978 if (handler->isConnector)
979 {
980 Protocol* self = (Protocol*)handler->base.data;
981 if (self->eventCallback)
982 {
983 (*self->eventCallback)(self, handler->isConnected ? PROTOCOLEVENT_DISCONNECT : PROTOCOLEVENT_CONNECT_FAILED,
984 self->eventCallbackData);
985 }
986 handler->isConnected = MI_FALSE;
987 }
988 mike 1.1 goto closeConnection;
989 }
990 else if (handler->isConnector && !handler->isConnected)
991 {
992 Protocol* self = (Protocol*)handler->base.data;
993 if (self->eventCallback)
994 {
995 (*self->eventCallback)(self, PROTOCOLEVENT_CONNECT,
996 self->eventCallbackData);
997 }
998 handler->isConnected = MI_TRUE;
999 }
1000 }
1001
1002 if (mask & SELECTOR_WRITE)
1003 {
1004 if (!_RequestCallbackWrite(handler))
1005 {
1006 /* PROTOCOLEVENT_DISCONNECT */
1007 if (handler->isConnector && handler->isConnected)
1008 {
1009 mike 1.1 Protocol* self = (Protocol*)handler->base.data;
1010 if (self->eventCallback)
1011 {
1012 (*self->eventCallback)(self, PROTOCOLEVENT_DISCONNECT,
1013 self->eventCallbackData);
1014 }
1015 handler->isConnected = MI_FALSE;
1016 }
1017 goto closeConnection;
1018 }
1019 else if (handler->isConnector && !handler->isConnected)
1020 {
1021 Protocol* self = (Protocol*)handler->base.data;
1022 if (self->eventCallback)
1023 {
1024 (*self->eventCallback)(self, PROTOCOLEVENT_CONNECT,
1025 self->eventCallbackData);
1026 }
1027 handler->isConnected = MI_TRUE;
1028 }
1029 }
1030 mike 1.1
1031 /* Close connection by timeout */
1032 if (mask & SELECTOR_TIMEOUT)
1033 return MI_FALSE;
1034
1035 if ((mask & SELECTOR_REMOVE) != 0 ||
1036 (mask & SELECTOR_DESTROY) != 0)
1037 {
1038 Protocol* self = (Protocol*)handler->base.data;
1039
1040 _FreeAuthData(handler);
1041
1042 /* free outstanding messages, batch */
1043 if (handler->receivingBatch)
1044 Batch_Destroy( handler->receivingBatch );
1045
1046 handler->receivingBatch = 0;
1047
1048 if (handler->message)
1049 Message_Release(handler->message);
1050
1051 mike 1.1 handler->message = 0;
1052
1053 _RemoveAllMessages(handler);
1054
1055 Sock_Close(handler->base.sock);
1056
1057 /* Mark handler as closed */
1058 handler->base.sock = INVALID_SOCK;
1059
1060 /* if connection sokcet was released, invalidate pointer to it */
1061 if (self && handler == self->connectorHandle)
1062 self->connectorHandle = 0;
1063
1064 if (handler->isConnector)
1065 free(handler);
1066 else
1067 _Release(handler);
1068 }
1069
1070 return MI_TRUE;
1071
1072 mike 1.1 closeConnection:
1073
1074 PRINTF(("LOG: closed client connection\n"));
1075
1076 return MI_FALSE;
1077 }
1078
1079 static MI_Boolean _ListenerCallback(
1080 Selector* sel,
1081 Handler* handler,
1082 MI_Uint32 mask,
1083 MI_Uint64 currentTimeUsec)
1084 {
1085 Protocol* self = (Protocol*)handler->data;
1086 MI_Result r;
1087 Sock s;
1088 Addr addr;
1089 Protocol_SR_SocketData* h;
1090
1091 sel=sel;
1092 mask=mask;
1093 mike 1.1 currentTimeUsec = currentTimeUsec;
1094
1095 if (mask & SELECTOR_READ)
1096 {
1097 /* Accept the incoming connection */
1098 r = Sock_Accept(handler->sock, &s, &addr);
1099
1100 if (MI_RESULT_WOULD_BLOCK == r)
1101 return MI_TRUE;
1102
1103 if (r != MI_RESULT_OK)
1104 {
1105 LOGW((T("Sock_Accept() failed; err %d\n"), Sock_GetLastError()));
1106 return MI_TRUE;
1107 }
1108
1109 r = Sock_SetBlocking(s, MI_FALSE);
1110 if (r != MI_RESULT_OK)
1111 {
1112 LOGW((T("Sock_SetBlocking() failed\n")));
1113 Sock_Close(s);
1114 mike 1.1 return MI_TRUE;
1115 }
1116
1117 /* Create handler */
1118 h = (Protocol_SR_SocketData*)calloc(1, sizeof(Protocol_SR_SocketData));
1119
1120 if (!h)
1121 {
1122 Sock_Close(s);
1123 return MI_TRUE;
1124 }
1125
1126 h->base.sock = s;
1127 h->base.mask = SELECTOR_READ | SELECTOR_EXCEPTION;
1128 h->base.callback = _RequestCallback;
1129 h->base.data = self;
1130
1131 /* get '1' for connected */
1132 h->refcounter = 1;
1133
1134 /* waiting for connect-request */
1135 mike 1.1 h->authState = PRT_AUTH_WAIT_CONNECTION_REQUEST;
1136
1137 /* Watch for read events on the incoming connection */
1138 r = Selector_AddHandler(self->selector, &h->base);
1139
1140 if (r != MI_RESULT_OK)
1141 {
1142 LOGW((T("Selector_AddHandler() failed\n")));
1143 return MI_TRUE;
1144 }
1145 }
1146
1147 if ((mask & SELECTOR_REMOVE) != 0 ||
1148 (mask & SELECTOR_DESTROY) != 0)
1149 {
1150 Sock_Close(handler->sock);
1151 free(handler);
1152 }
1153
1154 return MI_TRUE;
1155 }
1156 mike 1.1
1157 static MI_Result _CreateListener(
1158 Sock* s,
1159 const char* locator)
1160 {
1161 const char* posColon;
1162
1163 posColon = strchr(locator, ':');
1164
1165 if (!posColon)
1166 return Sock_CreateLocalListener(s, locator);
1167
1168 /* create listener for remote address like host:port or :port (ANYADDR) */
1169 {
1170 unsigned short port = (unsigned short)atol(posColon+1);
1171 char host[128];
1172 unsigned int len = (unsigned int)(posColon - locator);
1173 Addr addr;
1174 MI_Result r;
1175
1176 if (len > 0)
1177 mike 1.1 {
1178 if (len >= sizeof(host))
1179 return MI_RESULT_FAILED;
1180
1181 memcpy(host, locator, len);
1182 host[len] = 0;
1183
1184 // Initialize address.
1185 r = Addr_Init(&addr, host, port);
1186 if (r != MI_RESULT_OK)
1187 return MI_RESULT_FAILED;
1188 }
1189 else
1190 {
1191 Addr_InitAny(&addr, port);
1192 }
1193
1194 return Sock_CreateListener(s, &addr);
1195 }
1196 }
1197
1198 mike 1.1 static MI_Result _CreateConnector(
1199 Sock* s,
1200 const char* locator)
1201 {
1202 const char* posColon;
1203
1204 posColon = strchr(locator, ':');
1205
1206 if (!posColon)
1207 return Sock_CreateLocalConnector(s, locator);
1208
1209 /* create connector to remote address like host:port */
1210 {
1211 unsigned short port = (unsigned short)atol(posColon+1);
1212 char host[128];
1213 unsigned int len = (unsigned int)(posColon - locator);
1214 Addr addr;
1215 MI_Result r;
1216
1217 if (len >= sizeof(host))
1218 return MI_RESULT_FAILED;
1219 mike 1.1
1220 memcpy(host, locator, len);
1221 host[len] = 0;
1222
1223 // Initialize address.
1224 r = Addr_Init(&addr, host, port);
1225 if (r != MI_RESULT_OK)
1226 return MI_RESULT_FAILED;
1227
1228 // Create client socket.
1229 r = Sock_Create(s);
1230 if (r != MI_RESULT_OK)
1231 {
1232 Sock_Close(*s);
1233 return MI_RESULT_FAILED;
1234 }
1235
1236 r = Sock_SetBlocking(*s, MI_FALSE);
1237 if (r != MI_RESULT_OK)
1238 {
1239 Sock_Close(*s);
1240 mike 1.1 return MI_RESULT_FAILED;
1241 }
1242
1243 // Connect to server.
1244 r = Sock_Connect(*s, &addr);
1245 if (r != MI_RESULT_OK && r != MI_RESULT_WOULD_BLOCK)
1246 {
1247 Sock_Close(*s);
1248 return MI_RESULT_FAILED;
1249 }
1250 return r;
1251 }
1252 }
1253
1254 static MI_Result _New_Protocol(
1255 Protocol** selfOut,
1256 Selector* selector, /*optional, maybe NULL*/
1257 ProtocolCallback callback,
1258 void* callbackData,
1259 ProtocolEventCallback eventCallback,
1260 void* eventCallbackData)
1261 mike 1.1 {
1262 Protocol* self;
1263
1264 /* Check parameters */
1265 if (!selfOut)
1266 return MI_RESULT_INVALID_PARAMETER;
1267
1268 /* Clear output parameter */
1269 *selfOut = NULL;
1270
1271 /* Allocate structure */
1272 {
1273 self = (Protocol*)calloc(1, sizeof(Protocol));
1274
1275 if (!self)
1276 return MI_RESULT_FAILED;
1277 }
1278
1279 if (selector)
1280 { /* attach the exisiting selector */
1281 self->selector = selector;
1282 mike 1.1 self->internal_selector_used = MI_FALSE;
1283 }
1284 else
1285 { /* creaet a new selector */
1286 /* Initialize the network */
1287 Sock_Start();
1288
1289 /* Initialize the selector */
1290 if (Selector_Init(&self->internal_selector) != MI_RESULT_OK)
1291 {
1292 free(self);
1293 return MI_RESULT_FAILED;
1294 }
1295 self->selector = &self->internal_selector;
1296 self->internal_selector_used = MI_TRUE;
1297 }
1298
1299 /* Save the callback and callbackData */
1300 self->callback = callback;
1301 self->callbackData = callbackData;
1302 self->eventCallback = eventCallback;
1303 mike 1.1 self->eventCallbackData = eventCallbackData;
1304
1305 /* Set the magic number */
1306 self->magic = _MAGIC;
1307
1308 /* Set output parameter */
1309 *selfOut = self;
1310 return MI_RESULT_OK;
1311 }
1312
1313 /*
1314 **==============================================================================
1315 **
1316 ** Public definitions:
1317 **
1318 **==============================================================================
1319 */
1320
1321 MI_Result Protocol_New_Listener(
1322 Protocol** selfOut,
1323 Selector* selector, /*optional, maybe NULL*/
1324 mike 1.1 const char* locator,
1325 ProtocolCallback callback,
1326 void* callbackData)
1327 {
1328 Protocol* self;
1329 MI_Result r;
1330 Sock listener;
1331
1332 r = _New_Protocol(selfOut, selector, callback, callbackData, NULL, NULL);
1333
1334 if (MI_RESULT_OK != r)
1335 return r;
1336
1337 self = *selfOut;
1338
1339 self->type = PRT_TYPE_LISTENER;
1340
1341 /* Create listener socket */
1342 {
1343 r = _CreateListener(&listener, locator);
1344
1345 mike 1.1 if (r != MI_RESULT_OK)
1346 {
1347 Protocol_Delete(self);
1348 return r;
1349 }
1350
1351 r = Sock_SetBlocking(listener, MI_FALSE);
1352
1353 if (r != MI_RESULT_OK)
1354 {
1355 Sock_Close(listener);
1356 Protocol_Delete(self);
1357 return r;
1358 }
1359 }
1360
1361 /* Watch for read events on the listener socket (client connections) */
1362 {
1363 Handler* h = (Handler*)calloc(1, sizeof(Handler));
1364
1365 if (!h)
1366 mike 1.1 {
1367 Sock_Close(listener);
1368 Protocol_Delete(self);
1369 return MI_RESULT_FAILED;
1370 }
1371
1372 h->sock = listener;
1373 h->mask = SELECTOR_READ | SELECTOR_EXCEPTION;
1374 h->callback = _ListenerCallback;
1375 h->data = self;
1376
1377 r = Selector_AddHandler(self->selector, h);
1378
1379 if (r != MI_RESULT_OK)
1380 {
1381 Sock_Close(listener);
1382 free(h);
1383 Protocol_Delete(self);
1384 return r;
1385 }
1386 }
1387 mike 1.1
1388 return MI_RESULT_OK;
1389 }
1390
1391 MI_Result Protocol_New_Connector(
1392 Protocol** selfOut,
1393 Selector* selector, /*optional, maybe NULL*/
1394 const char* locator,
1395 ProtocolCallback callback,
1396 void* callbackData,
1397 ProtocolEventCallback eventCallback,
1398 void* eventCallbackData,
1399 const char* user,
1400 const char* password)
1401 {
1402 Protocol* self;
1403 MI_Result r;
1404 Sock connector;
1405
1406 r = _New_Protocol(selfOut, selector, callback, callbackData,
1407 eventCallback, eventCallbackData);
1408 mike 1.1
1409 if (MI_RESULT_OK != r)
1410 return r;
1411
1412 self = *selfOut;
1413 *selfOut = 0;
1414
1415 self->type = PRT_TYPE_CONNECTOR;
1416
1417 /* Create connector socket */
1418 {
1419 // Connect to server.
1420 r = _CreateConnector(&connector, locator);
1421 if (r != MI_RESULT_OK && r != MI_RESULT_WOULD_BLOCK)
1422 {
1423 Protocol_Delete(self);
1424 return MI_RESULT_FAILED;
1425 }
1426 }
1427
1428 /* Allocating connector's structure */
1429 mike 1.1 {
1430 Protocol_SR_SocketData* h = (Protocol_SR_SocketData*)calloc(1, sizeof(Protocol_SR_SocketData));
1431
1432
1433 if (!h)
1434 {
1435 Sock_Close(connector);
1436 Protocol_Delete(self);
1437 return MI_RESULT_FAILED;
1438 }
1439
1440 h->base.sock = connector;
1441 h->base.mask = SELECTOR_READ | SELECTOR_WRITE | SELECTOR_EXCEPTION;
1442 h->base.callback = _RequestCallback;
1443 h->base.data = self;
1444 h->sendingQueueLength = -1; /* disable watermarks for client */
1445 h->isConnector = MI_TRUE;
1446 h->isConnected = MI_FALSE;
1447 h->authState = PRT_AUTH_WAIT_CONNECTION_RESPONSE;
1448
1449 r = Selector_AddHandler(self->selector, &h->base);
1450 mike 1.1
1451 if (r != MI_RESULT_OK)
1452 {
1453 Sock_Close(connector);
1454 Protocol_Delete(self);
1455 free(h);
1456 return MI_RESULT_FAILED;
1457 }
1458 self->connectorHandle = h;
1459
1460 /* send connect request */
1461 if (!_SendAuthRequest(h, user, password, NULL))
1462 {
1463 /* remove handler will free 'h' pointer */
1464 Selector_RemoveHandler(self->selector, &h->base);
1465 Protocol_Delete(self);
1466 return MI_RESULT_FAILED;
1467 }
1468 }
1469
1470 /* Set output parameter */
1471 mike 1.1 *selfOut = self;
1472 return MI_RESULT_OK;
1473 }
1474
1475 MI_Result Protocol_New_From_Socket(
1476 Protocol** selfOut,
1477 Selector* selector, /*optional, maybe NULL*/
1478 Sock s,
1479 MI_Boolean skipInstanceUnpack,
1480 ProtocolCallback callback,
1481 void* callbackData,
1482 ProtocolEventCallback eventCallback,
1483 void* eventCallbackData)
1484 {
1485 Protocol* self;
1486 MI_Result r;
1487
1488 r = _New_Protocol(selfOut, selector, callback, callbackData,
1489 eventCallback, eventCallbackData);
1490
1491 if (MI_RESULT_OK != r)
1492 mike 1.1 return r;
1493
1494 self = *selfOut;
1495 *selfOut = 0;
1496
1497 self->type = PRT_TYPE_FROM_SOCKET;
1498
1499 self->skipInstanceUnpack = skipInstanceUnpack;
1500
1501 /* Attach provided socket to connector */
1502 {
1503 Protocol_SR_SocketData* h = (Protocol_SR_SocketData*)calloc(1, sizeof(Protocol_SR_SocketData));
1504
1505 if (!h)
1506 {
1507 Protocol_Delete(self);
1508 return MI_RESULT_FAILED;
1509 }
1510
1511 h->base.sock = s;
1512 h->base.mask = SELECTOR_READ | SELECTOR_EXCEPTION;
1513 mike 1.1
1514 if (skipInstanceUnpack)
1515 {
1516 /* skipInstanceUnpack indicates that call made from server
1517 and socket connected to the agent
1518 In that case we can use back=pressure feature and
1519 ignore socket operations under stress */
1520 h->base.mask |= SELECTOR_IGNORE_READ_OVERLOAD;
1521 }
1522
1523 h->base.callback = _RequestCallback;
1524 h->base.data = self;
1525 h->isConnector = MI_TRUE;
1526 h->isConnected = MI_TRUE;
1527 /* skip authentication for established connections
1528 (only used in server/agent communication) */
1529 h->authState = PRT_AUTH_OK;
1530
1531 r = Selector_AddHandler(self->selector, &h->base);
1532
1533 if (r != MI_RESULT_OK)
1534 mike 1.1 {
1535 Protocol_Delete(self);
1536 free(h);
1537 return MI_RESULT_FAILED;
1538 }
1539 self->connectorHandle = h;
1540 }
1541
1542 /* Set output parameter */
1543 *selfOut = self;
1544 return MI_RESULT_OK;
1545 }
1546
1547 MI_Result Protocol_Delete(
1548 Protocol* self)
1549 {
1550 /* Check parameters */
1551 if (!self)
1552 return MI_RESULT_INVALID_PARAMETER;
1553
1554 /* Check magic number */
1555 mike 1.1 if (self->magic != _MAGIC)
1556 return MI_RESULT_INVALID_PARAMETER;
1557
1558 if (self->internal_selector_used)
1559 {
1560 /* Release selector;
1561 Note: selector-destory closes all sockects in a list including connector and listener */
1562 Selector_Destroy(self->selector);
1563
1564 /* Shutdown the network */
1565 Sock_Stop();
1566 }
1567
1568 /* if connector, invalide 'self' pointr in connector */
1569 if (self->connectorHandle)
1570 self->connectorHandle->base.data = 0;
1571
1572 /* Clear magic number */
1573 self->magic = 0xDDDDDDDD;
1574
1575 /* Free self pointer */
1576 mike 1.1 free(self);
1577
1578
1579 return MI_RESULT_OK;
1580 }
1581
1582 MI_Result Protocol_Run(
1583 Protocol* self,
1584 MI_Uint64 timeoutUsec)
1585 {
1586 /* Run the selector */
1587 return Selector_Run(self->selector, timeoutUsec);
1588 }
1589
1590 static MI_Result _SendIN_IO_thread(
1591 void* self_,
1592 Message* message)
1593 {
1594 Protocol* self = (Protocol*)self_;
1595 Protocol_SR_SocketData* sendSock;
1596
1597 mike 1.1 /* check params */
1598 if (!self || !message )
1599 return MI_RESULT_INVALID_PARAMETER;
1600
1601 if (self->magic != _MAGIC)
1602 {
1603 LOGW((T("_SendIN_IO_thread: invalid magic!") ));
1604 return MI_RESULT_INVALID_PARAMETER;
1605 }
1606
1607 /* find where to send it */
1608 if (self->connectorHandle)
1609 sendSock = self->connectorHandle;
1610 else
1611 sendSock = (Protocol_SR_SocketData*)Uint64ToPtr(message->clientID);
1612
1613 /* validate handler */
1614 if (!sendSock || INVALID_SOCK == sendSock->base.sock)
1615 {
1616 //LOGW((T("cannot send message: expired handler (msg->clientID) %p\n"), sendSock));
1617
1618 mike 1.1 /* connection was closed - ignore message, but release handler if needed */
1619 if (sendSock && Message_IsFinalRepsonse(message))
1620 _Release(sendSock);
1621
1622 return MI_RESULT_FAILED;
1623 }
1624
1625 /* decrement number of outstanding requests */
1626 if (Message_IsFinalRepsonse(message) && !sendSock->isConnector)
1627 {
1628 DEBUG_ASSERT(sendSock->refcounter > 1);
1629 _Release(sendSock);
1630 }
1631
1632
1633 /* add message to the list */
1634 List_Append(&sendSock->head, &sendSock->tail, (ListElem*)message);
1635
1636 if (-1 != sendSock->sendingQueueLength)
1637 sendSock->sendingQueueLength++;
1638
1639 mike 1.1 Message_AddRef(message);
1640
1641 _PrepareMessageForSending(sendSock);
1642
1643 if (!_RequestCallbackWrite(sendSock) && !sendSock->isConnector)
1644 {
1645 //LOGW((T("cannot send message: queue overflow) %p\n"), sendSock));
1646 _RemoveAllMessages(sendSock);
1647 return MI_RESULT_FAILED;
1648 }
1649
1650 {
1651 int counter = 0;
1652
1653 while (sendSock->sendingQueueLength > SR_SOCKET_OUT_QUEUE_WATERMARK_CRITICAL)
1654 {
1655 _RequestCallbackWrite(sendSock);
1656
1657 counter++;
1658 /* give system a chance to clear backlog */
1659 Sleep_ms(1);
1660 mike 1.1
1661 if (counter > 40000)
1662 {
1663 LOGW((T("cannot send message: queue overflow) %p\n"), sendSock));
1664 _RemoveAllMessages(sendSock);
1665 return MI_RESULT_FAILED;
1666 }
1667 }
1668 }
1669 return MI_RESULT_OK;
1670 }
1671
1672 /* Signature must not have return type so we created this wrapper */
1673 static void _SendIN_IO_thread_wrapper(void* self, Message* message)
1674 {
1675 MI_Result r;
1676 r = _SendIN_IO_thread(self, message);
1677
1678 /* ATTN: log failed result? */
1679 }
1680
1681 mike 1.1 MI_Result Protocol_Send(
1682 Protocol* self,
1683 Message* message)
1684 {
1685 return Selector_CallInIOThread(
1686 self->selector, _SendIN_IO_thread_wrapper, self, message );
1687 }
|