/* **============================================================================== ** ** Open Management Infrastructure (OMI) ** ** Copyright (c) Microsoft Corporation ** ** Licensed under the Apache License, Version 2.0 (the "License"); you may not ** use this file except in compliance with the License. You may obtain a copy ** of the License at ** ** http://www.apache.org/licenses/LICENSE-2.0 ** ** THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ** KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED ** WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, ** MERCHANTABLITY OR NON-INFRINGEMENT. ** ** See the Apache 2 License for the specific language governing permissions ** and limitations under the License. ** **============================================================================== */ #include #include #include #include #include #include #include #include #include #include #include /* Include network headers */ #if defined(CONFIG_OS_WINDOWS) # include #elif defined(CONFIG_POSIX) # include # include # include # include # include # include # include # include # include # include #endif #define BUFFER_SIZE 256 #define DATA_SIZE 100000 using namespace std; static bool s_done = false; static bool s_timeout_called = false; static bool s_stoprunning_called = false; // client callback data static bool s_cln_connected = false; static vector s_cln_out; static vector s_cln_in; static vector s_data; static Handler* s_cln_h; // server callback static vector s_srv_data; static Handler* s_srv_h; #if defined(_MSC_VER) #undef BEGIN_EXTERNC #undef END_EXTERNC #define BEGIN_EXTERNC #define END_EXTERNC #endif static void setUp() { s_timeout_called = false; s_stoprunning_called = false; s_data.clear(); s_cln_connected = false; s_cln_out.clear(); s_cln_in.clear(); s_srv_data.clear(); s_srv_h = 0; s_cln_h = 0; } static void cleanup() { } static void TestAddr() { /* sockaddr_in must fit within an Addr object */ { size_t x = sizeof(struct sockaddr_in); size_t y = sizeof(Addr); UT_ASSERT(x <= y); } } static void GenRandData(vector& data) { data.clear(); data.reserve(DATA_SIZE); for (size_t i = 0; i < DATA_SIZE; i++) { data.push_back(char(rand())); } } BEGIN_EXTERNC static MI_Boolean ClientCallback( Selector* sel, Handler* handler, MI_Uint32 mask, MI_Uint64 currentTimeUsec) { MI_Result r; sel=sel; currentTimeUsec = currentTimeUsec; s_cln_h = handler; /* Process write event */ if (mask & SELECTOR_WRITE) { for (;;) { if (!s_cln_connected) { // Generate random data sequence. GenRandData(s_data); s_cln_out = s_data; // Watch for READ and WRITE events. handler->mask = SELECTOR_READ|SELECTOR_WRITE; s_cln_connected = true; //break; } // Write out data to server. if (s_cln_out.size()) { size_t n = 0; size_t m = 7 + (int)(rand() % 256); if (m > s_cln_out.size()) m = s_cln_out.size(); r = Sock_Write(handler->sock, &s_cln_out[0], m, &n); s_cln_out.erase(s_cln_out.begin(), s_cln_out.begin() + n); #if defined(TRACE_IO) printf("CLIENT.WROTE[%u]\n", n); #endif // If all data written, then quit watching for write. if (s_cln_out.size() == 0) { handler->mask = SELECTOR_READ; } if (r == MI_RESULT_WOULD_BLOCK) break; UT_ASSERT(r == MI_RESULT_OK); } else break; } } /* Process read event */ if (mask & SELECTOR_READ) { for (;;) { char buf[BUFFER_SIZE]; size_t n = 0; r = Sock_Read(handler->sock, buf, sizeof(buf), &n); s_cln_in.insert(s_cln_in.end(), buf, buf + n); #if defined(TRACE_IO) printf("CLIENT.READ[%u]\n", n); #endif // All data has been written (check whether it is identical). if (s_cln_in.size() == s_data.size()) { UT_ASSERT(s_cln_in == s_data); return MI_FALSE; } if (r == MI_RESULT_WOULD_BLOCK) break; UT_ASSERT(r == MI_RESULT_OK); break; } } if (mask & SELECTOR_REMOVE) { Sock_Close(handler->sock); free(handler); handler = 0; } if (mask & SELECTOR_DESTROY) { if (handler) { Sock_Close(handler->sock); free(handler); } } return MI_TRUE; } END_EXTERNC BEGIN_EXTERNC static MI_Boolean ServerCallback( Selector* sel, Handler* handler, MI_Uint32 mask, MI_Uint64 currentTimeUsec) { char buf[BUFFER_SIZE]; MI_Result r; size_t n = 0; sel=sel; currentTimeUsec = currentTimeUsec; s_srv_h = handler; // Process READ events. if (mask & SELECTOR_READ) { for (;;) { // Read request: n = 0; r = Sock_Read(handler->sock, buf, sizeof(buf), &n); s_srv_data.insert(s_srv_data.end(), buf, buf + n); #if defined(TRACE_IO) printf("SERVER.READ[%u]\n", n); #endif if (r == MI_RESULT_WOULD_BLOCK) break; UT_ASSERT(r == MI_RESULT_OK); // Did client close connection? if (n == 0) { return MI_FALSE; } // Save incoming data. // Now solicit read and write events. handler->mask = SELECTOR_READ|SELECTOR_WRITE; //break; // ATTN: WINDOWS FEATURE! // if event 'write' was already set and we have nothing to write // new event will never arrive // so we need to try to write data once it's available mask |= SELECTOR_WRITE; } } // Process WRITE events. if (mask & SELECTOR_WRITE) { for (;;) { n = 0; size_t m = 7 + (int)(rand() % 256); if (m >= s_srv_data.size()) m = s_srv_data.size(); // Write response: r = Sock_Write(handler->sock, &s_srv_data[0], m, &n); s_srv_data.erase(s_srv_data.begin(), s_srv_data.begin() + n); #if defined(TRACE_IO) printf("SERVER.WRITE[%u]\n", n); #endif if (s_srv_data.size() == 0) { /* Watch for read events (but not write events) */ handler->mask = SELECTOR_READ; break; } if (r == MI_RESULT_WOULD_BLOCK) break; UT_ASSERT(r == MI_RESULT_OK); //break; } } if (mask & SELECTOR_REMOVE) { r = Sock_Close(handler->sock); UT_ASSERT(r == MI_RESULT_OK); s_done = true; free(handler); handler = 0; } if (mask & SELECTOR_DESTROY) { if (handler) { r = Sock_Close(handler->sock); free(handler); } } return MI_TRUE; } END_EXTERNC BEGIN_EXTERNC static MI_Boolean ListenCallback( Selector* sel, Handler* handler, MI_Uint32 mask, MI_Uint64 currentTimeUsec) { MI_Result r; Sock s; Addr addr; currentTimeUsec = currentTimeUsec; if (mask & SELECTOR_READ) { r = Sock_Accept(handler->sock, &s, &addr); UT_ASSERT(r == MI_RESULT_OK); // Set to non-blocking I/O. r = Sock_SetBlocking(s, MI_FALSE); UT_ASSERT(r == MI_RESULT_OK); // Create handler for new client. { Handler* h; h = (Handler*)calloc(1, sizeof(Handler)); UT_ASSERT(h != NULL); h->sock = s; h->mask = SELECTOR_READ; h->callback = ServerCallback; r = Selector_AddHandler(sel, h); UT_ASSERT(r == MI_RESULT_OK); } } if (mask & SELECTOR_REMOVE) { Sock_Close(handler->sock); free(handler); handler = 0; } if (mask & SELECTOR_DESTROY) { if (handler) { Sock_Close(handler->sock); free(handler); } } return MI_TRUE; } END_EXTERNC BEGIN_EXTERNC static MI_Boolean TimeoutCallback( Selector* sel, Handler* handler, MI_Uint32 mask, MI_Uint64 currentTimeUsec) { sel = sel; if (mask & (SELECTOR_READ | SELECTOR_WRITE)) { UT_ASSERT_FAILED_MSG("TimeoutCallback: unexpected message!"); } if (mask & SELECTOR_ADD) { handler->fireTimeoutAt = currentTimeUsec + 2000; } if (mask & SELECTOR_TIMEOUT) { s_timeout_called = true; return MI_FALSE; } if (mask & (SELECTOR_REMOVE | SELECTOR_DESTROY)) { if (INVALID_SOCK != handler->sock) Sock_Close(handler->sock); free(handler); } return MI_TRUE; } END_EXTERNC BEGIN_EXTERNC static MI_Boolean _StopRunningCallback( Selector* sel, Handler* handler, MI_Uint32 mask, MI_Uint64 currentTimeUsec) { if (mask & SELECTOR_READ) { Sock s; Addr addr; Sock_Accept(handler->sock, &s, &addr); if (INVALID_SOCK != s) Sock_Close(s); Selector_StopRunning(sel); s_stoprunning_called = true; return MI_TRUE; } if (mask & SELECTOR_WRITE) { Selector_StopRunning(sel); s_stoprunning_called = true; return MI_TRUE; } if (mask & SELECTOR_ADD) { handler->fireTimeoutAt = currentTimeUsec + 1000; } if (mask & SELECTOR_TIMEOUT) { handler->fireTimeoutAt = 0; Selector_StopRunning(sel); s_stoprunning_called = true; return MI_TRUE; } if (mask & (SELECTOR_REMOVE | SELECTOR_DESTROY)) { if (INVALID_SOCK != handler->sock) Sock_Close(handler->sock); free(handler); } return MI_TRUE; } END_EXTERNC static unsigned short PORT = ut::getUnittestPortNumber() + 1; static Sock CreateClientSocket() { Sock sock; Addr addr; MI_Result r; // Initialize address (connect using loopback). r = Addr_Init(&addr, "127.0.0.1", PORT); UT_ASSERT(r == MI_RESULT_OK); // Create client socket. r = Sock_Create(&sock); UT_ASSERT(r == MI_RESULT_OK); // Set to non-blocking I/O. r = Sock_SetBlocking(sock, MI_FALSE); // Listen to server. r = Sock_Connect(sock, &addr); UT_ASSERT(r == MI_RESULT_OK || r == MI_RESULT_WOULD_BLOCK); return sock; } static Sock CreateListenerSock() { Sock sock; Addr addr; MI_Result r; // Initialize listener address and port. Addr_InitAny(&addr, PORT); // Create server listener socket. r = Sock_CreateListener(&sock, &addr); UT_ASSERT(r == MI_RESULT_OK); return sock; } /* **============================================================================== ** ** TestClientServer() ** ** This test creates a TCP client and TCP server that runs in the same ** thread. The client connects to the server and sends a long message ** using non-blocking I/O. The server echos the message back. Once the ** entire message has been echoed back, both the client and the server ** exit and the test is finished. ** **============================================================================== */ void TestClientServer() { Selector sel; Sock lsock; Sock csock; MI_Result r; // Initialize the network: Sock_Start(); // Initialize the selector object. Selector_Init(&sel); // Create listener socket: lsock = CreateListenerSock(); // Add handler for listener socket. { Handler* h = (Handler*)calloc(1, sizeof(Handler)); UT_ASSERT(h); h->sock = lsock; h->mask = SELECTOR_READ; h->callback = ListenCallback; // Watch for connection requests. r = Selector_AddHandler(&sel, h); UT_ASSERT(r == MI_RESULT_OK); } // Create client socket and attempt connect: csock = CreateClientSocket(); // Add handler for client socket: { Handler* h = (Handler*)calloc(1, sizeof(Handler)); UT_ASSERT(h); h->sock = csock; h->mask = SELECTOR_WRITE; h->callback = ClientCallback; // Watch for write events (indicating that connection is ready). r = Selector_AddHandler(&sel, h); UT_ASSERT(r == MI_RESULT_OK); } // Cancel this loop after N seconds. MI_Uint64 deadline; r = Time_Now(&deadline); UT_ASSERT(r == MI_RESULT_OK); deadline += 10*1000000; // Wait for connections. while (!s_done) { MI_Uint64 now; r = Time_Now(&now); UT_ASSERT(r == MI_RESULT_OK); if (now >= deadline) { if (s_srv_h) cout << "srv mask " << s_srv_h->mask << "; "; if (s_cln_h) cout << "cln mask " << s_cln_h->mask << "; "; cout << "out size " << s_cln_out.size() << "; in size " << s_cln_in.size() << "; data size " << s_data.size() << "; srv data size " << s_srv_data.size() << endl; UT_ASSERT_FAILED_MSG("Timed out!"); break; } r = Selector_Run(&sel, 1000*1000); UT_ASSERT(r == MI_RESULT_OK || r == MI_RESULT_TIME_OUT); } // Verify that client had received all data back UT_ASSERT( s_cln_in == s_data ); // Destroy the selector. Selector_Destroy(&sel); // Shutdown the network. Sock_Stop(); } static void TestTimeNow() { MI_Uint64 t1 = 0, t2 = 0; UT_ASSERT(MI_RESULT_OK == Time_Now(&t1)); // sleep 1 ms ut::sleep_ms( 1 ); UT_ASSERT(MI_RESULT_OK == Time_Now(&t2)); // expect difference to be 0.1 - 1 sec if ((t2-t1) <= 100 || (t2-t1) >= 1000000) std::cout << "warning: unexpected time diff; t1 " << t1 << "; t2 " << t2 << endl; // With VM-driven environment, timing becomes very tricky and un-reliable UT_ASSERT(t2 > t1); } static void TestSelectorTimeoutEvent() { Selector sel; Sock lsock; MI_Result r; // Initialize the network: Sock_Start(); // Initialize the selector object. Selector_Init(&sel); // Create listener socket: lsock = CreateListenerSock(); // Add handler for listener socket. { Handler* h = (Handler*)calloc(1, sizeof(Handler)); UT_ASSERT(h); h->sock = lsock; h->mask = SELECTOR_READ; h->callback = TimeoutCallback; // Watch for connection requests. // 'add' will set timeout for 2 ms; r = Selector_AddHandler(&sel, h); UT_ASSERT(r == MI_RESULT_OK); } r = Selector_Run(&sel, 1000*1000); UT_ASSERT(s_timeout_called); // 'timeout' event on socekt is triggered na dremoves socket, so // selector returns 'failed' UT_ASSERT(r == MI_RESULT_FAILED); // Destroy the selector. Selector_Destroy(&sel); // Shutdown the network. Sock_Stop(); } static void TestSelectorTimeoutWithNoSocketHandler() { Selector sel; MI_Result r; // Initialize the network: Sock_Start(); // Initialize the selector object. Selector_Init(&sel); // Add handler for listener socket. { Handler* h = (Handler*)calloc(1, sizeof(Handler)); UT_ASSERT(h); h->sock = INVALID_SOCK; h->callback = TimeoutCallback; // 'add' will set timeout for 2 ms; r = Selector_AddHandler(&sel, h); UT_ASSERT(r == MI_RESULT_OK); } s_timeout_called = false; r = Selector_Run(&sel, 1000*1000); UT_ASSERT(s_timeout_called); // 'timeout' event on socekt is triggered na dremoves socket, so // selector returns 'failed' UT_ASSERT(r == MI_RESULT_FAILED); // Destroy the selector. Selector_Destroy(&sel); // Shutdown the network. Sock_Stop(); } static void TestSelectorStopRunning() { Selector sel; MI_Result r; // Initialize the network: Sock_Start(); // Initialize the selector object. Selector_Init(&sel); // Add that will call 'stop-running' from handler { Handler* h = (Handler*)calloc(1, sizeof(Handler)); UT_ASSERT(h); h->sock = INVALID_SOCK; h->callback = _StopRunningCallback; // 'add' will set timeout for 2 ms; r = Selector_AddHandler(&sel, h); UT_ASSERT(r == MI_RESULT_OK); } r = Selector_Run(&sel, 60 * 1000*1000); UT_ASSERT(s_stoprunning_called); // Stop-running call should return OK UT_ASSERT(r == MI_RESULT_OK); // Destroy the selector. Selector_Destroy(&sel); // Shutdown the network. Sock_Stop(); } BEGIN_EXTERNC static void* MI_CALL _ConnectorThread(void* ) { // Initialize the network: Sock_Start(); Sock s = SockConnectLocal(PORT); Sock_Close(s); // Shutdown the network. Sock_Stop(); return 0; } END_EXTERNC static void TestSelectorWithIndefiniteTimeout() { Selector sel; MI_Result r; // Initialize the network: Sock_Start(); // Initialize the selector object. Selector_Init(&sel); // Add that will call 'stop-running' from handler { Handler* h = (Handler*)calloc(1, sizeof(Handler)); UT_ASSERT(h); h->sock = CreateListenerSock();; h->mask = SELECTOR_READ; h->callback = _StopRunningCallback; // 'add' will set timeout for 2 ms; r = Selector_AddHandler(&sel, h); UT_ASSERT(r == MI_RESULT_OK); } ThreadHandle t; // create a connector thread UT_ASSERT(MI_RESULT_OK == Thread_Create( _ConnectorThread, 0, &t)); r = Selector_Run(&sel, TIME_NEVER); UT_ASSERT(s_stoprunning_called); // Stop-running call should return OK UT_ASSERT(r == MI_RESULT_OK); UT_ASSERT(MI_RESULT_OK == Thread_Destroy( t, MI_TRUE)); // Destroy the selector. Selector_Destroy(&sel); // Shutdown the network. Sock_Stop(); } #if defined(_MSC_VER) /* Strange, but for this particular file VS compiler thinks that 'catch' after 'try {setup()}' is unreachable (which is true) */ /* warning C4702: unreachable code */ # pragma warning(disable : 4702) #endif /* _MSC_VER */ static void RunTests() { UT_TEST(TestAddr); UT_TEST(TestClientServer); UT_TEST(TestTimeNow); UT_TEST(TestSelectorTimeoutEvent); UT_TEST(TestSelectorTimeoutWithNoSocketHandler); UT_TEST(TestSelectorStopRunning); UT_TEST(TestSelectorWithIndefiniteTimeout); } UT_ENTRY_POINT(RunTests);