(file) Return to test_selector.cpp CVS log (file) (dir) Up to [OMI] / omi / protocol / tests

File: [OMI] / omi / protocol / tests / Attic / test_selector.cpp (download)
Revision: 1.1.1.1 (vendor branch), Wed May 30 21:47:49 2012 UTC (12 years, 1 month ago) by mike
Branch: TOG
CVS Tags: OMI_1_0_2_Branch, OMI_1_0_2, OMI_1_0_1_PRE, OMI_1_0_1, OMI_1_0_0
Changes since 1.1: +0 -0 lines
Initial Import

/*
**==============================================================================
**
** Open Management Infrastructure (OMI)
**
** Copyright (c) Microsoft Corporation
** 
** Licensed under the Apache License, Version 2.0 (the "License"); you may not 
** use this file except in compliance with the License. You may obtain a copy 
** of the License at 
**
**     http://www.apache.org/licenses/LICENSE-2.0 
**
** THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
** KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED 
** WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, 
** MERCHANTABLITY OR NON-INFRINGEMENT. 
**
** See the Apache 2 License for the specific language governing permissions 
** and limitations under the License.
**
**==============================================================================
*/

#include <vector>
#include <cstdlib>
#include <iostream>
#include <ut/ut.h>
#include <unittest/utils.h>
#include <protocol/addr.h>
#include <base/time.h>
#include <protocol/thread.h>
#include <base/result.h>
#include <protocol/sock.h>
#include <protocol/selector.h>

/* Include network headers */
#if defined(CONFIG_OS_WINDOWS)
# include <winsock2.h>
#elif defined(CONFIG_POSIX)
# include <unistd.h>
# include <errno.h>
# include <sys/socket.h>
# include <netinet/tcp.h>
# include <netinet/in.h>
# include <sys/time.h>
# include <sys/types.h>
# include <netdb.h>
# include <fcntl.h>
# include <arpa/inet.h>
#endif

// Size in bytes of the stack buffers the callbacks pass to Sock_Read.
#define BUFFER_SIZE 256
// Number of random bytes produced by GenRandData (the echo payload size).
#define DATA_SIZE 100000

using namespace std;

// Set by ServerCallback when its handler is removed; TestClientServer loops
// until it becomes true.
static bool s_done = false;
// Set by TimeoutCallback when it receives SELECTOR_TIMEOUT.
static bool s_timeout_called = false;
// Set by _StopRunningCallback whenever it calls Selector_StopRunning.
static bool s_stoprunning_called = false;

// --- Client callback state ---
// True once ClientCallback has seen its first write event and generated data.
static bool s_cln_connected = false;
// Bytes the client still has to send to the server.
static vector<char> s_cln_out;
// Bytes the client has received back from the server so far.
static vector<char> s_cln_in;
// Full reference payload; the echoed bytes are compared against it.
static vector<char> s_data;
// Handler most recently passed to ClientCallback (read by timeout diagnostics).
static Handler* s_cln_h;

// --- Server callback state ---
// Bytes received from the client that have not been echoed back yet.
static vector<char> s_srv_data;
// Handler most recently passed to ServerCallback (read by timeout diagnostics).
static Handler* s_srv_h;

// On MSVC, redefine BEGIN_EXTERNC / END_EXTERNC as empty so the wrappers
// placed around the callbacks below expand to nothing.
// NOTE(review): presumably works around a compiler complaint about the
// wrapped static callbacks - confirm before changing.
#if defined(_MSC_VER)
#undef BEGIN_EXTERNC
#undef END_EXTERNC
#define BEGIN_EXTERNC
#define END_EXTERNC
#endif

static void setUp()
{
    s_timeout_called = false;
    s_stoprunning_called = false;
    s_data.clear();

    s_cln_connected = false;
    s_cln_out.clear();
    s_cln_in.clear();

    s_srv_data.clear();
    s_srv_h = 0;
    s_cln_h = 0;
}

// Test fixture hook counterpart of setUp(); intentionally does nothing.
static void cleanup()
{
}

// Checks that the opaque Addr type is large enough to hold a sockaddr_in.
static void TestAddr()
{
    const size_t sockaddrBytes = sizeof(struct sockaddr_in);
    const size_t addrBytes = sizeof(Addr);

    /* sockaddr_in must fit within an Addr object */
    UT_ASSERT(sockaddrBytes <= addrBytes);
}


// Replaces the contents of 'data' with DATA_SIZE bytes, each taken from one
// call to rand() truncated to char.
static void GenRandData(vector<char>& data)
{
    data.clear();
    data.reserve(DATA_SIZE);

    size_t remaining = DATA_SIZE;

    while (remaining > 0)
    {
        data.push_back(static_cast<char>(rand()));
        --remaining;
    }
}

BEGIN_EXTERNC
/*
 * Selector callback for the client end of the echo test.
 *
 * SELECTOR_WRITE:
 *     On the first write event (connection ready) generates the random
 *     payload, then sends it in small random-sized chunks until everything
 *     is written or the socket would block.
 * SELECTOR_READ:
 *     Performs one read of echoed data per event. Once as many bytes as were
 *     sent have come back, verifies them against s_data and returns MI_FALSE.
 * SELECTOR_REMOVE / SELECTOR_DESTROY:
 *     Closes the socket, frees the handler and clears s_cln_h so that nobody
 *     dereferences the freed handler afterwards.
 *
 * Returns MI_TRUE to stay registered, MI_FALSE when the echo is complete.
 * NOTE(review): MI_FALSE presumably makes the selector remove this handler -
 * confirm against the Selector implementation.
 */
static MI_Boolean ClientCallback(
    Selector* sel,
    Handler* handler,
    MI_Uint32 mask,
    MI_Uint64 currentTimeUsec)
{
    MI_Result r;

    // Unused parameters.
    (void)sel;
    (void)currentTimeUsec;

    // Remember the handler for the timeout diagnostics in TestClientServer.
    s_cln_h = handler;

    /* Process write event */
    if (mask & SELECTOR_WRITE)
    {
        if (!s_cln_connected)
        {
            // First write event: generate the random data sequence.
            GenRandData(s_data);
            s_cln_out = s_data;

            // Watch for READ and WRITE events.
            handler->mask = SELECTOR_READ|SELECTOR_WRITE;

            s_cln_connected = true;
        }

        // Write out data to server until done or the socket would block.
        while (!s_cln_out.empty())
        {
            size_t n = 0;
            size_t m = 7 + (int)(rand() % 256);

            if (m > s_cln_out.size())
                m = s_cln_out.size();

            r = Sock_Write(handler->sock, &s_cln_out[0], m, &n);

            s_cln_out.erase(s_cln_out.begin(), s_cln_out.begin() + n);
#if defined(TRACE_IO)
            // Cast: 'n' is a size_t, which does not match %u on LP64 targets.
            printf("CLIENT.WROTE[%u]\n", (unsigned int)n);
#endif

            // If all data written, then quit watching for write.
            if (s_cln_out.empty())
            {
                handler->mask = SELECTOR_READ;
            }

            if (r == MI_RESULT_WOULD_BLOCK)
                break;

            UT_ASSERT(r == MI_RESULT_OK);
        }
    }

    /* Process read event (a single read per event) */
    if (mask & SELECTOR_READ)
    {
        char buf[BUFFER_SIZE];
        size_t n = 0;

        r = Sock_Read(handler->sock, buf, sizeof(buf), &n);

        s_cln_in.insert(s_cln_in.end(), buf, buf + n);

#if defined(TRACE_IO)
        printf("CLIENT.READ[%u]\n", (unsigned int)n);
#endif
        // All data has been echoed back (check whether it is identical).
        if (s_cln_in.size() == s_data.size())
        {
            UT_ASSERT(s_cln_in == s_data);
            return MI_FALSE;
        }

        if (r != MI_RESULT_WOULD_BLOCK)
        {
            UT_ASSERT(r == MI_RESULT_OK);
        }
    }

    /* Tear-down: release the socket and the handler exactly once */
    if ((mask & (SELECTOR_REMOVE | SELECTOR_DESTROY)) && handler)
    {
        Sock_Close(handler->sock);
        free(handler);

        // The handler is gone; do not leave a dangling pointer behind.
        s_cln_h = 0;
    }

    return MI_TRUE;
}
END_EXTERNC

BEGIN_EXTERNC
/*
 * Selector callback for an accepted server-side connection; echoes back
 * whatever the client sends.
 *
 * SELECTOR_READ:
 *     Reads until the socket would block, appending everything to
 *     s_srv_data. A successful zero-byte read is treated as "client closed
 *     the connection" and returns MI_FALSE.
 * SELECTOR_WRITE:
 *     Writes the buffered data back in small random-sized chunks until the
 *     buffer is empty or the socket would block.
 * SELECTOR_REMOVE:
 *     Closes the socket, sets s_done, frees the handler.
 * SELECTOR_DESTROY:
 *     Closes the socket and frees the handler (s_done is not touched).
 *
 * Returns MI_TRUE to stay registered, MI_FALSE when the client disconnected.
 * NOTE(review): MI_FALSE presumably makes the selector remove this handler -
 * confirm against the Selector implementation.
 */
static MI_Boolean ServerCallback(
    Selector* sel,
    Handler* handler,
    MI_Uint32 mask,
    MI_Uint64 currentTimeUsec)
{
    char buf[BUFFER_SIZE];
    MI_Result r;
    size_t n = 0;

    // Unused parameters.
    (void)sel;
    (void)currentTimeUsec;

    // Remember the handler for the timeout diagnostics in TestClientServer.
    s_srv_h = handler;

    // Process READ events.
    if (mask & SELECTOR_READ)
    {
        for (;;)
        {
            // Read request and save incoming data.
            n = 0;
            r = Sock_Read(handler->sock, buf, sizeof(buf), &n);
            s_srv_data.insert(s_srv_data.end(), buf, buf + n);

#if defined(TRACE_IO)
            // Cast: 'n' is a size_t, which does not match %u on LP64 targets.
            printf("SERVER.READ[%u]\n", (unsigned int)n);
#endif
            if (r == MI_RESULT_WOULD_BLOCK)
                break;

            UT_ASSERT(r == MI_RESULT_OK);

            // Did client close connection?
            if (n == 0)
            {
                return MI_FALSE;
            }

            // Now solicit read and write events.
            handler->mask = SELECTOR_READ|SELECTOR_WRITE;

            // ATTN: WINDOWS FEATURE!
            // if event 'write' was already set and we have nothing to write
            // new event will never arrive
            // so we need to try to write data once it's available
            mask |= SELECTOR_WRITE;
        }
    }

    // Process WRITE events.
    if (mask & SELECTOR_WRITE)
    {
        if (s_srv_data.empty())
        {
            // Nothing buffered: taking &s_srv_data[0] of an empty vector is
            // undefined, so just stop watching for write events.
            handler->mask = SELECTOR_READ;
        }
        else
        {
            for (;;)
            {
                n = 0;
                size_t m = 7 + (int)(rand() % 256);

                if (m >= s_srv_data.size())
                    m = s_srv_data.size();

                // Write response:
                r = Sock_Write(handler->sock, &s_srv_data[0], m, &n);

                s_srv_data.erase(s_srv_data.begin(), s_srv_data.begin() + n);
#if defined(TRACE_IO)
                printf("SERVER.WRITE[%u]\n", (unsigned int)n);
#endif
                if (s_srv_data.empty())
                {
                    /* Watch for read events (but not write events) */
                    handler->mask = SELECTOR_READ;
                    break;
                }

                if (r == MI_RESULT_WOULD_BLOCK)
                    break;

                UT_ASSERT(r == MI_RESULT_OK);
            }
        }
    }

    if (mask & SELECTOR_REMOVE)
    {
        r = Sock_Close(handler->sock);
        UT_ASSERT(r == MI_RESULT_OK);

        // Tell TestClientServer that the exchange is over.
        s_done = true;

        free(handler);
        handler = 0;

        // The handler is gone; do not leave a dangling pointer behind.
        s_srv_h = 0;
    }

    if (mask & SELECTOR_DESTROY)
    {
        if (handler)
        {
            Sock_Close(handler->sock);
            free(handler);
            s_srv_h = 0;
        }
    }

    return MI_TRUE;
}
END_EXTERNC

BEGIN_EXTERNC
/*
 * Selector callback for the listening socket.
 *
 * SELECTOR_READ: accepts the pending connection, switches it to non-blocking
 *     mode and registers a new ServerCallback handler for it on 'sel'.
 * SELECTOR_REMOVE / SELECTOR_DESTROY: closes the listener and frees the
 *     handler.
 *
 * Always returns MI_TRUE.
 */
static MI_Boolean ListenCallback(
    Selector* sel,
    Handler* handler,
    MI_Uint32 mask,
    MI_Uint64 currentTimeUsec)
{
    (void)currentTimeUsec;

    if (mask & SELECTOR_READ)
    {
        Sock accepted;
        Addr peer;

        MI_Result r = Sock_Accept(handler->sock, &accepted, &peer);
        UT_ASSERT(r == MI_RESULT_OK);

        // The accepted connection uses non-blocking I/O.
        r = Sock_SetBlocking(accepted, MI_FALSE);
        UT_ASSERT(r == MI_RESULT_OK);

        // Hand the new connection over to the echo server callback.
        Handler* h = (Handler*)calloc(1, sizeof(Handler));
        UT_ASSERT(h != NULL);

        h->sock = accepted;
        h->mask = SELECTOR_READ;
        h->callback = ServerCallback;

        r = Selector_AddHandler(sel, h);
        UT_ASSERT(r == MI_RESULT_OK);
    }

    // REMOVE always releases; DESTROY releases only a non-null handler.
    const bool removing = (mask & SELECTOR_REMOVE) != 0;
    const bool destroying = (mask & SELECTOR_DESTROY) != 0 && handler != 0;

    if (removing || destroying)
    {
        Sock_Close(handler->sock);
        free(handler);
    }

    return MI_TRUE;
}
END_EXTERNC

BEGIN_EXTERNC
/*
 * Selector callback used by the timeout tests.
 *
 * SELECTOR_ADD: arms the handler's timeout 2000 usec after 'currentTimeUsec'.
 * SELECTOR_TIMEOUT: records the event in s_timeout_called, returns MI_FALSE.
 * SELECTOR_READ / SELECTOR_WRITE: never expected; reported as a failure.
 * SELECTOR_REMOVE / SELECTOR_DESTROY: closes the socket (if any) and frees
 *     the handler.
 *
 * Returns MI_TRUE except on SELECTOR_TIMEOUT.
 */
static MI_Boolean TimeoutCallback(
    Selector* sel,
    Handler* handler,
    MI_Uint32 mask,
    MI_Uint64 currentTimeUsec)
{
    const MI_Uint32 ioEvents = SELECTOR_READ | SELECTOR_WRITE;
    const MI_Uint32 teardownEvents = SELECTOR_REMOVE | SELECTOR_DESTROY;

    (void)sel;

    if ((mask & ioEvents) != 0)
    {
        UT_ASSERT_FAILED_MSG("TimeoutCallback: unexpected message!");
    }

    if ((mask & SELECTOR_ADD) != 0)
    {
        handler->fireTimeoutAt = currentTimeUsec + 2000;
    }

    if ((mask & SELECTOR_TIMEOUT) != 0)
    {
        s_timeout_called = true;
        return MI_FALSE;
    }

    if ((mask & teardownEvents) != 0)
    {
        // The handler may have been registered without a socket.
        if (handler->sock != INVALID_SOCK)
        {
            Sock_Close(handler->sock);
        }

        free(handler);
    }

    return MI_TRUE;
}
END_EXTERNC

BEGIN_EXTERNC
/*
 * Selector callback that stops the selector loop from inside a handler.
 *
 * SELECTOR_READ: accepts (and immediately closes) the pending connection,
 *     then calls Selector_StopRunning.
 * SELECTOR_WRITE: calls Selector_StopRunning.
 * SELECTOR_ADD: arms the handler's timeout 1000 usec after 'currentTimeUsec'.
 * SELECTOR_TIMEOUT: clears the timeout and calls Selector_StopRunning.
 * SELECTOR_REMOVE / SELECTOR_DESTROY: closes the socket (if any) and frees
 *     the handler.
 *
 * Every stop request is recorded in s_stoprunning_called.
 * Always returns MI_TRUE.
 */
static MI_Boolean _StopRunningCallback(
    Selector* sel,
    Handler* handler,
    MI_Uint32 mask,
    MI_Uint64 currentTimeUsec)
{
    if (mask & SELECTOR_READ)
    {
        // Start from a known value: if the accept fails, 's' must not be
        // read while indeterminate.
        Sock s = INVALID_SOCK;
        Addr addr;

        const MI_Result r = Sock_Accept(handler->sock, &s, &addr);

        if (r == MI_RESULT_OK && INVALID_SOCK != s)
            Sock_Close(s);

        Selector_StopRunning(sel);
        s_stoprunning_called = true;

        return MI_TRUE;
    }

    if (mask & SELECTOR_WRITE)
    {
        Selector_StopRunning(sel);
        s_stoprunning_called = true;
        return MI_TRUE;
    }

    if (mask & SELECTOR_ADD)
    {
        handler->fireTimeoutAt = currentTimeUsec + 1000;
    }

    if (mask & SELECTOR_TIMEOUT)
    {
        // Disarm the timeout before stopping the loop.
        handler->fireTimeoutAt = 0;
        Selector_StopRunning(sel);
        s_stoprunning_called = true;
        return MI_TRUE;
    }

    if (mask & (SELECTOR_REMOVE | SELECTOR_DESTROY))
    {
        // The handler may have been registered without a socket.
        if (INVALID_SOCK != handler->sock)
            Sock_Close(handler->sock);

        free(handler);
    }

    return MI_TRUE;
}
END_EXTERNC

// TCP port used by every listener and connector in this file: one above the
// value returned by ut::getUnittestPortNumber().
static unsigned short PORT = ut::getUnittestPortNumber() + 1;

/*
 * Creates a non-blocking client socket and starts connecting it to
 * 127.0.0.1:PORT.
 *
 * Returns the socket; the connect may still be in progress
 * (MI_RESULT_WOULD_BLOCK is accepted), so the caller should wait for a write
 * event before using it.
 */
static Sock CreateClientSocket()
{
    Sock sock;
    Addr addr;
    MI_Result r;

    // Initialize address (connect using loopback).
    r = Addr_Init(&addr, "127.0.0.1", PORT);
    UT_ASSERT(r == MI_RESULT_OK);

    // Create client socket.
    r = Sock_Create(&sock);
    UT_ASSERT(r == MI_RESULT_OK);

    // Set to non-blocking I/O; a failure here would turn the connect below
    // into a blocking call, so it must not go unnoticed.
    r = Sock_SetBlocking(sock, MI_FALSE);
    UT_ASSERT(r == MI_RESULT_OK);

    // Connect to server.
    r = Sock_Connect(sock, &addr);
    UT_ASSERT(r == MI_RESULT_OK || r == MI_RESULT_WOULD_BLOCK);

    return sock;
}

/*
 * Creates a listener socket on PORT, using an address initialized with
 * Addr_InitAny, and returns it.
 */
static Sock CreateListenerSock()
{
    // Listener address and port.
    Addr anyAddr;
    Addr_InitAny(&anyAddr, PORT);

    // Server listener socket.
    Sock listener;
    const MI_Result rc = Sock_CreateListener(&listener, &anyAddr);
    UT_ASSERT(rc == MI_RESULT_OK);

    return listener;
}

/*
**==============================================================================
**
** TestClientServer()
**
**     This test creates a TCP client and a TCP server that run in the same
**     thread. The client connects to the server and sends a long message
**     using non-blocking I/O. The server echoes the message back. Once the
**     entire message has been echoed back, both the client and the server
**     exit and the test is finished.
**
**     The event loop gives up after 10 seconds. In that case diagnostics are
**     printed, the selector and the network are shut down, and only then is
**     the failure reported, so the listener does not stay open behind the
**     failed test.
**
**==============================================================================
*/
void TestClientServer()
{
    Selector sel;
    Sock lsock;
    Sock csock;
    MI_Result r;
    bool timedOut = false;

    // The wait loop below is driven by s_done; start from a known state so a
    // value left over from an earlier run cannot skip the loop.
    s_done = false;

    // Initialize the network:
    Sock_Start();

    // Initialize the selector object.
    Selector_Init(&sel);

    // Create listener socket:
    lsock = CreateListenerSock();

    // Add handler for listener socket.
    {
        Handler* h = (Handler*)calloc(1, sizeof(Handler));

        UT_ASSERT(h);

        h->sock = lsock;
        h->mask = SELECTOR_READ;
        h->callback = ListenCallback;

        // Watch for connection requests.
        r = Selector_AddHandler(&sel, h);
        UT_ASSERT(r == MI_RESULT_OK);
    }

    // Create client socket and attempt connect:
    csock = CreateClientSocket();

    // Add handler for client socket:
    {
        Handler* h = (Handler*)calloc(1, sizeof(Handler));

        UT_ASSERT(h);

        h->sock = csock;
        h->mask = SELECTOR_WRITE;
        h->callback = ClientCallback;

        // Watch for write events (indicating that connection is ready).
        r = Selector_AddHandler(&sel, h);
        UT_ASSERT(r == MI_RESULT_OK);
    }

    // Cancel this loop after 10 seconds.
    MI_Uint64 deadline;
    r = Time_Now(&deadline);
    UT_ASSERT(r == MI_RESULT_OK);
    deadline += 10*1000000;

    // Run the selector until the server side reports completion.
    while (!s_done)
    {
        MI_Uint64 now;
        r = Time_Now(&now);
        UT_ASSERT(r == MI_RESULT_OK);

        if (now >= deadline)
        {
            // Dump the state of both ends to help diagnose the hang.
            if (s_srv_h)
                cout << "srv mask " << s_srv_h->mask << "; ";

            if (s_cln_h)
                cout << "cln mask " << s_cln_h->mask << "; ";

            cout << "out size " << s_cln_out.size() << 
                "; in size " << s_cln_in.size() <<
                "; data size " << s_data.size() <<
                "; srv data size " << s_srv_data.size() << endl;

            // Report the failure only after the clean-up below.
            timedOut = true;
            break;
        }

        r = Selector_Run(&sel, 1000*1000);
        UT_ASSERT(r == MI_RESULT_OK || r == MI_RESULT_TIME_OUT);
    }

    // Destroy the selector.
    Selector_Destroy(&sel);

    // Shutdown the network.
    Sock_Stop();

    if (timedOut)
    {
        UT_ASSERT_FAILED_MSG("Timed out!");
    }

    // Verify that client had received all data back
    UT_ASSERT( s_cln_in == s_data );
}

static void TestTimeNow()
{
    MI_Uint64 t1 = 0, t2 = 0;

    UT_ASSERT(MI_RESULT_OK == Time_Now(&t1));

    // sleep 1 ms
    ut::sleep_ms( 1 );
    UT_ASSERT(MI_RESULT_OK == Time_Now(&t2));

    // expect difference to be 0.1 -  1 sec
    if ((t2-t1) <= 100 || (t2-t1) >= 1000000)
        std::cout << "warning: unexpected time diff; t1 " << t1 << "; t2 " << t2 << endl;

    // With VM-driven environment, timing becomes very tricky and un-reliable
    UT_ASSERT(t2 > t1);
}

static void TestSelectorTimeoutEvent()
{
    Selector sel;
    Sock lsock;
    MI_Result r;

    // Initialize the network:
    Sock_Start();

    // Initialize the selector object.
    Selector_Init(&sel);

    // Create listener socket:
    lsock = CreateListenerSock();
    
    // Add  handler for listener socket.
    {
        Handler* h = (Handler*)calloc(1, sizeof(Handler));

        UT_ASSERT(h);

        h->sock = lsock;
        h->mask = SELECTOR_READ;
        h->callback = TimeoutCallback;

        // Watch for connection requests.
        // 'add' will set timeout for 2 ms;
        r = Selector_AddHandler(&sel, h);
        UT_ASSERT(r == MI_RESULT_OK);
    }

    r = Selector_Run(&sel, 1000*1000);
    UT_ASSERT(s_timeout_called);
    // 'timeout' event on socekt is triggered na dremoves socket, so 
    // selector returns 'failed'
    UT_ASSERT(r == MI_RESULT_FAILED);

    // Destroy the selector.
    Selector_Destroy(&sel);

    // Shutdown the network.
    Sock_Stop();
}

static void TestSelectorTimeoutWithNoSocketHandler()
{
    Selector sel;
    MI_Result r;

    // Initialize the network:
    Sock_Start();

    // Initialize the selector object.
    Selector_Init(&sel);

    // Add  handler for listener socket.
    {
        Handler* h = (Handler*)calloc(1, sizeof(Handler));

        UT_ASSERT(h);

        h->sock = INVALID_SOCK;
        h->callback = TimeoutCallback;

        // 'add' will set timeout for 2 ms;
        r = Selector_AddHandler(&sel, h);
        UT_ASSERT(r == MI_RESULT_OK);
    }

    s_timeout_called = false;
    r = Selector_Run(&sel, 1000*1000);
    UT_ASSERT(s_timeout_called);
    // 'timeout' event on socekt is triggered na dremoves socket, so 
    // selector returns 'failed'
    UT_ASSERT(r == MI_RESULT_FAILED);

    // Destroy the selector.
    Selector_Destroy(&sel);

    // Shutdown the network.
    Sock_Stop();
}

static void TestSelectorStopRunning()
{
    Selector sel;
    MI_Result r;

    // Initialize the network:
    Sock_Start();

    // Initialize the selector object.
    Selector_Init(&sel);

    // Add  that will call 'stop-running' from handler
    {
        Handler* h = (Handler*)calloc(1, sizeof(Handler));

        UT_ASSERT(h);

        h->sock = INVALID_SOCK;
        h->callback = _StopRunningCallback;

        // 'add' will set timeout for 2 ms;
        r = Selector_AddHandler(&sel, h);
        UT_ASSERT(r == MI_RESULT_OK);
    }

    r = Selector_Run(&sel, 60 * 1000*1000);

    UT_ASSERT(s_stoprunning_called);

    // Stop-running call should return OK
    UT_ASSERT(r == MI_RESULT_OK);

    // Destroy the selector.
    Selector_Destroy(&sel);

    // Shutdown the network.
    Sock_Stop();
}


BEGIN_EXTERNC
/*
 * Thread routine: connects to the local listener on PORT via
 * SockConnectLocal and closes the connection straight away. Returns 0.
 */
static void* MI_CALL _ConnectorThread(void* )
{
    // Bring up the network for this thread's use.
    Sock_Start();

    // Connect and immediately hang up.
    Sock_Close(SockConnectLocal(PORT));

    // Shutdown the network.
    Sock_Stop();

    return 0;
}
END_EXTERNC

static void TestSelectorWithIndefiniteTimeout()
{
    Selector sel;
    MI_Result r;

    // Initialize the network:
    Sock_Start();

    // Initialize the selector object.
    Selector_Init(&sel);

    // Add  that will call 'stop-running' from handler
    {
        Handler* h = (Handler*)calloc(1, sizeof(Handler));

        UT_ASSERT(h);

        h->sock =  CreateListenerSock();;
        h->mask = SELECTOR_READ;
        h->callback = _StopRunningCallback;

        // 'add' will set timeout for 2 ms;
        r = Selector_AddHandler(&sel, h);
        UT_ASSERT(r == MI_RESULT_OK);
    }

    ThreadHandle t;

    // create a connector thread
    UT_ASSERT(MI_RESULT_OK == Thread_Create(
        _ConnectorThread, 0, &t));

    r = Selector_Run(&sel, TIME_NEVER);

    UT_ASSERT(s_stoprunning_called);

    // Stop-running call should return OK
    UT_ASSERT(r == MI_RESULT_OK);

    UT_ASSERT(MI_RESULT_OK == Thread_Destroy( t, MI_TRUE));
    // Destroy the selector.
    Selector_Destroy(&sel);

    // Shutdown the network.
    Sock_Stop();

}

#if defined(_MSC_VER)
/* Strange, but for this particular file VS compiler 
    thinks that 'catch' after 'try {setup()}' is unreachable 
    (which is true) */

/* warning C4702: unreachable code */
# pragma warning(disable : 4702)

#endif /* _MSC_VER */

// Invokes every test in this file, in order, through the UT_TEST macro.
// NOTE(review): UT_TEST appears to wrap each test with setUp()/cleanup() -
// confirm in ut/ut.h.
static void RunTests()
{
    UT_TEST(TestAddr);
    UT_TEST(TestClientServer);
    UT_TEST(TestTimeNow);
    UT_TEST(TestSelectorTimeoutEvent);
    UT_TEST(TestSelectorTimeoutWithNoSocketHandler);
    UT_TEST(TestSelectorStopRunning);
    UT_TEST(TestSelectorWithIndefiniteTimeout);
}

UT_ENTRY_POINT(RunTests);

ViewCVS 0.9.2