/*
**==============================================================================
**
** Open Management Infrastructure (OMI)
**
** Copyright (c) Microsoft Corporation
**
** Licensed under the Apache License, Version 2.0 (the "License"); you may not
** use this file except in compliance with the License. You may obtain a copy
** of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
** KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED
** WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE,
** MERCHANTABLITY OR NON-INFRINGEMENT.
**
** See the Apache 2 License for the specific language governing permissions
** and limitations under the License.
**
**==============================================================================
*/
#include <vector>
#include <cstdlib>
#include <iostream>
#include <ut/ut.h>
#include <unittest/utils.h>
#include <protocol/addr.h>
#include <base/time.h>
#include <protocol/thread.h>
#include <base/result.h>
#include <protocol/sock.h>
#include <protocol/selector.h>
/* Include network headers */
#if defined(CONFIG_OS_WINDOWS)
# include <winsock2.h>
#elif defined(CONFIG_POSIX)
# include <unistd.h>
# include <errno.h>
# include <sys/socket.h>
# include <netinet/tcp.h>
# include <netinet/in.h>
# include <sys/time.h>
# include <sys/types.h>
# include <netdb.h>
# include <fcntl.h>
# include <arpa/inet.h>
#endif
#define BUFFER_SIZE 256
#define DATA_SIZE 100000
using namespace std;
static bool s_done = false;
static bool s_timeout_called = false;
static bool s_stoprunning_called = false;
// client callback data
static bool s_cln_connected = false;
static vector<char> s_cln_out;
static vector<char> s_cln_in;
static vector<char> s_data;
static Handler* s_cln_h;
// server callback
static vector<char> s_srv_data;
static Handler* s_srv_h;
#if defined(_MSC_VER)
#undef BEGIN_EXTERNC
#undef END_EXTERNC
#define BEGIN_EXTERNC
#define END_EXTERNC
#endif
static void setUp()
{
s_timeout_called = false;
s_stoprunning_called = false;
s_data.clear();
s_cln_connected = false;
s_cln_out.clear();
s_cln_in.clear();
s_srv_data.clear();
s_srv_h = 0;
s_cln_h = 0;
}
static void cleanup()
{
}
static void TestAddr()
{
/* sockaddr_in must fit within an Addr object */
{
size_t x = sizeof(struct sockaddr_in);
size_t y = sizeof(Addr);
UT_ASSERT(x <= y);
}
}
static void GenRandData(vector<char>& data)
{
data.clear();
data.reserve(DATA_SIZE);
for (size_t i = 0; i < DATA_SIZE; i++)
{
data.push_back(char(rand()));
}
}
BEGIN_EXTERNC
static MI_Boolean ClientCallback(
Selector* sel,
Handler* handler,
MI_Uint32 mask,
MI_Uint64 currentTimeUsec)
{
MI_Result r;
sel=sel;
currentTimeUsec = currentTimeUsec;
s_cln_h = handler;
/* Process write event */
if (mask & SELECTOR_WRITE)
{
for (;;)
{
if (!s_cln_connected)
{
// Generate random data sequence.
GenRandData(s_data);
s_cln_out = s_data;
// Watch for READ and WRITE events.
handler->mask = SELECTOR_READ|SELECTOR_WRITE;
s_cln_connected = true;
//break;
}
// Write out data to server.
if (s_cln_out.size())
{
size_t n = 0;
size_t m = 7 + (int)(rand() % 256);
if (m > s_cln_out.size())
m = s_cln_out.size();
r = Sock_Write(handler->sock, &s_cln_out[0], m, &n);
s_cln_out.erase(s_cln_out.begin(), s_cln_out.begin() + n);
#if defined(TRACE_IO)
printf("CLIENT.WROTE[%u]\n", n);
#endif
// If all data written, then quit watching for write.
if (s_cln_out.size() == 0)
{
handler->mask = SELECTOR_READ;
}
if (r == MI_RESULT_WOULD_BLOCK)
break;
UT_ASSERT(r == MI_RESULT_OK);
}
else
break;
}
}
/* Process read event */
if (mask & SELECTOR_READ)
{
for (;;)
{
char buf[BUFFER_SIZE];
size_t n = 0;
r = Sock_Read(handler->sock, buf, sizeof(buf), &n);
s_cln_in.insert(s_cln_in.end(), buf, buf + n);
#if defined(TRACE_IO)
printf("CLIENT.READ[%u]\n", n);
#endif
// All data has been written (check whether it is identical).
if (s_cln_in.size() == s_data.size())
{
UT_ASSERT(s_cln_in == s_data);
return MI_FALSE;
}
if (r == MI_RESULT_WOULD_BLOCK)
break;
UT_ASSERT(r == MI_RESULT_OK);
break;
}
}
if (mask & SELECTOR_REMOVE)
{
Sock_Close(handler->sock);
free(handler);
handler = 0;
}
if (mask & SELECTOR_DESTROY)
{
if (handler)
{
Sock_Close(handler->sock);
free(handler);
}
}
return MI_TRUE;
}
END_EXTERNC
BEGIN_EXTERNC
static MI_Boolean ServerCallback(
Selector* sel,
Handler* handler,
MI_Uint32 mask,
MI_Uint64 currentTimeUsec)
{
char buf[BUFFER_SIZE];
MI_Result r;
size_t n = 0;
sel=sel;
currentTimeUsec = currentTimeUsec;
s_srv_h = handler;
// Process READ events.
if (mask & SELECTOR_READ)
{
for (;;)
{
// Read request:
n = 0;
r = Sock_Read(handler->sock, buf, sizeof(buf), &n);
s_srv_data.insert(s_srv_data.end(), buf, buf + n);
#if defined(TRACE_IO)
printf("SERVER.READ[%u]\n", n);
#endif
if (r == MI_RESULT_WOULD_BLOCK)
break;
UT_ASSERT(r == MI_RESULT_OK);
// Did client close connection?
if (n == 0)
{
return MI_FALSE;
}
// Save incoming data.
// Now solicit read and write events.
handler->mask = SELECTOR_READ|SELECTOR_WRITE;
//break;
// ATTN: WINDOWS FEATURE!
// if event 'write' was already set and we have nothing to write
// new event will never arrive
// so we need to try to write data once it's available
mask |= SELECTOR_WRITE;
}
}
// Process WRITE events.
if (mask & SELECTOR_WRITE)
{
for (;;)
{
n = 0;
size_t m = 7 + (int)(rand() % 256);
if (m >= s_srv_data.size())
m = s_srv_data.size();
// Write response:
r = Sock_Write(handler->sock, &s_srv_data[0], m, &n);
s_srv_data.erase(s_srv_data.begin(), s_srv_data.begin() + n);
#if defined(TRACE_IO)
printf("SERVER.WRITE[%u]\n", n);
#endif
if (s_srv_data.size() == 0)
{
/* Watch for read events (but not write events) */
handler->mask = SELECTOR_READ;
break;
}
if (r == MI_RESULT_WOULD_BLOCK)
break;
UT_ASSERT(r == MI_RESULT_OK);
//break;
}
}
if (mask & SELECTOR_REMOVE)
{
r = Sock_Close(handler->sock);
UT_ASSERT(r == MI_RESULT_OK);
s_done = true;
free(handler);
handler = 0;
}
if (mask & SELECTOR_DESTROY)
{
if (handler)
{
r = Sock_Close(handler->sock);
free(handler);
}
}
return MI_TRUE;
}
END_EXTERNC
BEGIN_EXTERNC
static MI_Boolean ListenCallback(
Selector* sel,
Handler* handler,
MI_Uint32 mask,
MI_Uint64 currentTimeUsec)
{
MI_Result r;
Sock s;
Addr addr;
currentTimeUsec = currentTimeUsec;
if (mask & SELECTOR_READ)
{
r = Sock_Accept(handler->sock, &s, &addr);
UT_ASSERT(r == MI_RESULT_OK);
// Set to non-blocking I/O.
r = Sock_SetBlocking(s, MI_FALSE);
UT_ASSERT(r == MI_RESULT_OK);
// Create handler for new client.
{
Handler* h;
h = (Handler*)calloc(1, sizeof(Handler));
UT_ASSERT(h != NULL);
h->sock = s;
h->mask = SELECTOR_READ;
h->callback = ServerCallback;
r = Selector_AddHandler(sel, h);
UT_ASSERT(r == MI_RESULT_OK);
}
}
if (mask & SELECTOR_REMOVE)
{
Sock_Close(handler->sock);
free(handler);
handler = 0;
}
if (mask & SELECTOR_DESTROY)
{
if (handler)
{
Sock_Close(handler->sock);
free(handler);
}
}
return MI_TRUE;
}
END_EXTERNC
BEGIN_EXTERNC
static MI_Boolean TimeoutCallback(
Selector* sel,
Handler* handler,
MI_Uint32 mask,
MI_Uint64 currentTimeUsec)
{
sel = sel;
if (mask & (SELECTOR_READ | SELECTOR_WRITE))
{
UT_ASSERT_FAILED_MSG("TimeoutCallback: unexpected message!");
}
if (mask & SELECTOR_ADD)
{
handler->fireTimeoutAt = currentTimeUsec + 2000;
}
if (mask & SELECTOR_TIMEOUT)
{
s_timeout_called = true;
return MI_FALSE;
}
if (mask & (SELECTOR_REMOVE | SELECTOR_DESTROY))
{
if (INVALID_SOCK != handler->sock)
Sock_Close(handler->sock);
free(handler);
}
return MI_TRUE;
}
END_EXTERNC
BEGIN_EXTERNC
static MI_Boolean _StopRunningCallback(
Selector* sel,
Handler* handler,
MI_Uint32 mask,
MI_Uint64 currentTimeUsec)
{
if (mask & SELECTOR_READ)
{
Sock s;
Addr addr;
Sock_Accept(handler->sock, &s, &addr);
if (INVALID_SOCK != s)
Sock_Close(s);
Selector_StopRunning(sel);
s_stoprunning_called = true;
return MI_TRUE;
}
if (mask & SELECTOR_WRITE)
{
Selector_StopRunning(sel);
s_stoprunning_called = true;
return MI_TRUE;
}
if (mask & SELECTOR_ADD)
{
handler->fireTimeoutAt = currentTimeUsec + 1000;
}
if (mask & SELECTOR_TIMEOUT)
{
handler->fireTimeoutAt = 0;
Selector_StopRunning(sel);
s_stoprunning_called = true;
return MI_TRUE;
}
if (mask & (SELECTOR_REMOVE | SELECTOR_DESTROY))
{
if (INVALID_SOCK != handler->sock)
Sock_Close(handler->sock);
free(handler);
}
return MI_TRUE;
}
END_EXTERNC
static unsigned short PORT = ut::getUnittestPortNumber() + 1;
static Sock CreateClientSocket()
{
Sock sock;
Addr addr;
MI_Result r;
// Initialize address (connect using loopback).
r = Addr_Init(&addr, "127.0.0.1", PORT);
UT_ASSERT(r == MI_RESULT_OK);
// Create client socket.
r = Sock_Create(&sock);
UT_ASSERT(r == MI_RESULT_OK);
// Set to non-blocking I/O.
r = Sock_SetBlocking(sock, MI_FALSE);
// Listen to server.
r = Sock_Connect(sock, &addr);
UT_ASSERT(r == MI_RESULT_OK || r == MI_RESULT_WOULD_BLOCK);
return sock;
}
static Sock CreateListenerSock()
{
Sock sock;
Addr addr;
MI_Result r;
// Initialize listener address and port.
Addr_InitAny(&addr, PORT);
// Create server listener socket.
r = Sock_CreateListener(&sock, &addr);
UT_ASSERT(r == MI_RESULT_OK);
return sock;
}
/*
**==============================================================================
**
** TestClientServer()
**
** This test creates a TCP client and TCP server that runs in the same
** thread. The client connects to the server and sends a long message
** using non-blocking I/O. The server echos the message back. Once the
** entire message has been echoed back, both the client and the server
** exit and the test is finished.
**
**==============================================================================
*/
void TestClientServer()
{
Selector sel;
Sock lsock;
Sock csock;
MI_Result r;
// Initialize the network:
Sock_Start();
// Initialize the selector object.
Selector_Init(&sel);
// Create listener socket:
lsock = CreateListenerSock();
// Add handler for listener socket.
{
Handler* h = (Handler*)calloc(1, sizeof(Handler));
UT_ASSERT(h);
h->sock = lsock;
h->mask = SELECTOR_READ;
h->callback = ListenCallback;
// Watch for connection requests.
r = Selector_AddHandler(&sel, h);
UT_ASSERT(r == MI_RESULT_OK);
}
// Create client socket and attempt connect:
csock = CreateClientSocket();
// Add handler for client socket:
{
Handler* h = (Handler*)calloc(1, sizeof(Handler));
UT_ASSERT(h);
h->sock = csock;
h->mask = SELECTOR_WRITE;
h->callback = ClientCallback;
// Watch for write events (indicating that connection is ready).
r = Selector_AddHandler(&sel, h);
UT_ASSERT(r == MI_RESULT_OK);
}
// Cancel this loop after N seconds.
MI_Uint64 deadline;
r = Time_Now(&deadline);
UT_ASSERT(r == MI_RESULT_OK);
deadline += 10*1000000;
// Wait for connections.
while (!s_done)
{
MI_Uint64 now;
r = Time_Now(&now);
UT_ASSERT(r == MI_RESULT_OK);
if (now >= deadline)
{
if (s_srv_h)
cout << "srv mask " << s_srv_h->mask << "; ";
if (s_cln_h)
cout << "cln mask " << s_cln_h->mask << "; ";
cout << "out size " << s_cln_out.size() <<
"; in size " << s_cln_in.size() <<
"; data size " << s_data.size() <<
"; srv data size " << s_srv_data.size() << endl;
UT_ASSERT_FAILED_MSG("Timed out!");
break;
}
r = Selector_Run(&sel, 1000*1000);
UT_ASSERT(r == MI_RESULT_OK || r == MI_RESULT_TIME_OUT);
}
// Verify that client had received all data back
UT_ASSERT( s_cln_in == s_data );
// Destroy the selector.
Selector_Destroy(&sel);
// Shutdown the network.
Sock_Stop();
}
static void TestTimeNow()
{
MI_Uint64 t1 = 0, t2 = 0;
UT_ASSERT(MI_RESULT_OK == Time_Now(&t1));
// sleep 1 ms
ut::sleep_ms( 1 );
UT_ASSERT(MI_RESULT_OK == Time_Now(&t2));
// expect difference to be 0.1 - 1 sec
if ((t2-t1) <= 100 || (t2-t1) >= 1000000)
std::cout << "warning: unexpected time diff; t1 " << t1 << "; t2 " << t2 << endl;
// With VM-driven environment, timing becomes very tricky and un-reliable
UT_ASSERT(t2 > t1);
}
static void TestSelectorTimeoutEvent()
{
Selector sel;
Sock lsock;
MI_Result r;
// Initialize the network:
Sock_Start();
// Initialize the selector object.
Selector_Init(&sel);
// Create listener socket:
lsock = CreateListenerSock();
// Add handler for listener socket.
{
Handler* h = (Handler*)calloc(1, sizeof(Handler));
UT_ASSERT(h);
h->sock = lsock;
h->mask = SELECTOR_READ;
h->callback = TimeoutCallback;
// Watch for connection requests.
// 'add' will set timeout for 2 ms;
r = Selector_AddHandler(&sel, h);
UT_ASSERT(r == MI_RESULT_OK);
}
r = Selector_Run(&sel, 1000*1000);
UT_ASSERT(s_timeout_called);
// 'timeout' event on socekt is triggered na dremoves socket, so
// selector returns 'failed'
UT_ASSERT(r == MI_RESULT_FAILED);
// Destroy the selector.
Selector_Destroy(&sel);
// Shutdown the network.
Sock_Stop();
}
static void TestSelectorTimeoutWithNoSocketHandler()
{
Selector sel;
MI_Result r;
// Initialize the network:
Sock_Start();
// Initialize the selector object.
Selector_Init(&sel);
// Add handler for listener socket.
{
Handler* h = (Handler*)calloc(1, sizeof(Handler));
UT_ASSERT(h);
h->sock = INVALID_SOCK;
h->callback = TimeoutCallback;
// 'add' will set timeout for 2 ms;
r = Selector_AddHandler(&sel, h);
UT_ASSERT(r == MI_RESULT_OK);
}
s_timeout_called = false;
r = Selector_Run(&sel, 1000*1000);
UT_ASSERT(s_timeout_called);
// 'timeout' event on socekt is triggered na dremoves socket, so
// selector returns 'failed'
UT_ASSERT(r == MI_RESULT_FAILED);
// Destroy the selector.
Selector_Destroy(&sel);
// Shutdown the network.
Sock_Stop();
}
static void TestSelectorStopRunning()
{
Selector sel;
MI_Result r;
// Initialize the network:
Sock_Start();
// Initialize the selector object.
Selector_Init(&sel);
// Add that will call 'stop-running' from handler
{
Handler* h = (Handler*)calloc(1, sizeof(Handler));
UT_ASSERT(h);
h->sock = INVALID_SOCK;
h->callback = _StopRunningCallback;
// 'add' will set timeout for 2 ms;
r = Selector_AddHandler(&sel, h);
UT_ASSERT(r == MI_RESULT_OK);
}
r = Selector_Run(&sel, 60 * 1000*1000);
UT_ASSERT(s_stoprunning_called);
// Stop-running call should return OK
UT_ASSERT(r == MI_RESULT_OK);
// Destroy the selector.
Selector_Destroy(&sel);
// Shutdown the network.
Sock_Stop();
}
BEGIN_EXTERNC
static void* MI_CALL _ConnectorThread(void* )
{
// Initialize the network:
Sock_Start();
Sock s = SockConnectLocal(PORT);
Sock_Close(s);
// Shutdown the network.
Sock_Stop();
return 0;
}
END_EXTERNC
static void TestSelectorWithIndefiniteTimeout()
{
Selector sel;
MI_Result r;
// Initialize the network:
Sock_Start();
// Initialize the selector object.
Selector_Init(&sel);
// Add that will call 'stop-running' from handler
{
Handler* h = (Handler*)calloc(1, sizeof(Handler));
UT_ASSERT(h);
h->sock = CreateListenerSock();;
h->mask = SELECTOR_READ;
h->callback = _StopRunningCallback;
// 'add' will set timeout for 2 ms;
r = Selector_AddHandler(&sel, h);
UT_ASSERT(r == MI_RESULT_OK);
}
ThreadHandle t;
// create a connector thread
UT_ASSERT(MI_RESULT_OK == Thread_Create(
_ConnectorThread, 0, &t));
r = Selector_Run(&sel, TIME_NEVER);
UT_ASSERT(s_stoprunning_called);
// Stop-running call should return OK
UT_ASSERT(r == MI_RESULT_OK);
UT_ASSERT(MI_RESULT_OK == Thread_Destroy( t, MI_TRUE));
// Destroy the selector.
Selector_Destroy(&sel);
// Shutdown the network.
Sock_Stop();
}
#if defined(_MSC_VER)
/* Strange, but for this particular file VS compiler
thinks that 'catch' after 'try {setup()}' is unreachable
(which is true) */
/* warning C4702: unreachable code */
# pragma warning(disable : 4702)
#endif /* _MSC_VER */
static void RunTests()
{
UT_TEST(TestAddr);
UT_TEST(TestClientServer);
UT_TEST(TestTimeNow);
UT_TEST(TestSelectorTimeoutEvent);
UT_TEST(TestSelectorTimeoutWithNoSocketHandler);
UT_TEST(TestSelectorStopRunning);
UT_TEST(TestSelectorWithIndefiniteTimeout);
}
UT_ENTRY_POINT(RunTests);