(file) Return to multiplex.cpp CVS log (file) (dir) Up to [OMI] / omi / sock / multiplex

File: [OMI] / omi / sock / multiplex / multiplex.cpp (download)
Revision: 1.1, Mon Apr 20 17:20:35 2015 UTC (9 years, 2 months ago) by krisbash
Branch: MAIN
CVS Tags: OMI_1_0_8_2, OMI_1_0_8_1, HEAD
OMI 1.0.8-1

#include <vector>
#include <cstdlib>
#include <iostream>
#include <ut/ut.h>
#include <sock/addr.h>
#include <pal/sleep.h>
#include <pal/thread.h>
#include <base/result.h>
#include <sock/sock.h>
#include <sock/selector.h>

#define MAX_MESSAGES 10000
#define NUM_SESSIONS 10
#define TOTAL_MESSAGES (MAX_MESSAGES * NUM_SESSIONS)
#define MAX_QUEUE_SIZE 10
#define PORT 8000

//#define DUMP

// #define CHECK_DATA

using namespace std;

//==============================================================================
//
// Err()
//
//==============================================================================

static void Err(const char* msg)
{
    fprintf(stderr, "error: %s\n", msg);
    exit(1);
}

//==============================================================================
//
// Header:
// Body:
//
//==============================================================================

struct Header
{
    unsigned int word1;
    unsigned int word2;
    unsigned int word3;
    unsigned int word4;
};

struct Body
{
    unsigned char data[16*1024];
};

struct Msg
{
    Header header;
    Body body;
};

//==============================================================================
//
// Client:
//
//==============================================================================

struct ClientHandler : public Handler
{
    bool connected;
    size_t messagesSent;
    size_t offset;
    Msg msg;

    static unsigned long totalMessagesSent;

    ClientHandler()
    {
        connected = false;
        messagesSent = 0;
        offset = 0;

        msg.header.word1 = 0x11111111;
        msg.header.word2 = 0x22222222;
        msg.header.word3 = 0x33333333;
        msg.header.word4 = 0x44444444;
        memset(&msg.body.data, 0xBB, sizeof(msg.body.data));
    }
};

unsigned long ClientHandler::totalMessagesSent = 0;

BEGIN_EXTERNC
static MI_Boolean ClientCallback(
    Selector* sel,
    Handler* handler_,
    MI_Uint32 mask, 
    MI_Uint64 currentTimeUsec)
{
    ClientHandler* handler = (ClientHandler*)handler_;
    MI_UNUSED(sel);

    if (mask & SELECTOR_WRITE)
    {
        for (;;)
        {
            if (!handler->connected)
            {
                handler->mask = SELECTOR_READ | SELECTOR_WRITE;
                handler->connected = true;
            }

            if (handler->messagesSent != MAX_MESSAGES)
            {
                if (handler->offset == sizeof(Msg))
                    handler->offset = 0;

                size_t n = 0;

                MI_Result r = Sock_Write(
                    handler->sock, 
                    (char*)&handler->msg + handler->offset,
                    sizeof(Msg) - handler->offset,
                    &n);

                if (r == MI_RESULT_WOULD_BLOCK)
                    break;

                if (r != MI_RESULT_OK)
                    Err("Sock_Write() failed");

                handler->offset += n;

                if (handler->offset == sizeof(Msg))
                {
                    handler->messagesSent++;
                    ClientHandler::totalMessagesSent++;

                    if (ClientHandler::totalMessagesSent == TOTAL_MESSAGES)
                    {
                        printf("Wrote %lu messages\n",
                            ClientHandler::totalMessagesSent);
                        exit(0);
                    }
                }
            }
            else
            {
                // Quit watching for write events.
                handler->mask = SELECTOR_READ;
                break;
            }
        }
    }

    if (mask & SELECTOR_READ)
    {
        // Should be unreachable since serer sends no data:
        Err("Unexpected!");
    }

    if (mask & SELECTOR_REMOVE)
    {
        Sock_Close(handler->sock);
        delete handler;
    }
    else if (mask & SELECTOR_DESTROY)
    {
        Sock_Close(handler->sock);
        delete handler;
    }

    return MI_TRUE;
}
END_EXTERNC

void RunClient(const char* host)
{
    Sock_Start();

    Selector sel;
    Selector_Init(&sel);
    
    for (unsigned short i = 0; i < NUM_SESSIONS; i++)
    {
        Addr addr;
        Sock sock;

        if (Addr_Init(&addr, host, PORT) != MI_RESULT_OK)
            Err("Addr_Init() failed");

        if (Sock_Create(&sock) != MI_RESULT_OK)
            Err("Sock_Create() failed");

        if (Sock_SetBlocking(sock, MI_FALSE) != MI_RESULT_OK)
            Err("Sock_SetBlocking() failed");

        MI_Result r = Sock_Connect(sock, &addr);

        if (r != MI_RESULT_OK && r != MI_RESULT_WOULD_BLOCK)
            Err("Sock_Connect() failed");

        ClientHandler* h = new ClientHandler;
        h->sock = sock;
        h->mask = SELECTOR_WRITE;
        h->callback = ClientCallback;

        if (Selector_AddHandler(&sel, h) != MI_RESULT_OK)
            Err("Selector_AddHandler() failed");
    }

    for (;;)
    {
        MI_Result r = Selector_Run(&sel, 1000 * 10000);

        if (r != MI_RESULT_OK && r != MI_RESULT_TIME_OUT)
            Err("Selector_Run() failed");
    }

    Selector_Destroy(&sel);
    Sock_Stop();
}

//==============================================================================
//
// Server:
//
//==============================================================================

struct ServerHandler : public Handler
{
    size_t offset;
    bool readHeader;
    static unsigned long totalMessagesRecv;

    ServerHandler()
    {
        offset = 0;
        readHeader = true;
    }
};

unsigned long ServerHandler::totalMessagesRecv;

BEGIN_EXTERNC
static MI_Boolean ServerCallback(
    Selector* sel,
    Handler* handler_,
    MI_Uint32 mask, 
    MI_Uint64 currentTimeUsec)
{
    ServerHandler* handler = (ServerHandler*)handler_;

    // Process READ events.
    if (mask & SELECTOR_READ)
    {
        if (handler->readHeader)
        {
            Header header;
            size_t n = 0;

            MI_Result r = Sock_Read(
                handler->sock, 
                (char*)&header + handler->offset, 
                sizeof(header) - handler->offset,
                &n);

            if (r == MI_RESULT_WOULD_BLOCK)
                return MI_TRUE;

            if (r != MI_RESULT_OK)
                Err("Sock_Read() failed");

            // Did client close the connection?
            if (n == 0)
                return MI_FALSE;

            handler->offset += n;

            if (handler->offset == sizeof(Header))
            {

                if (header.word1 != 0x11111111)
                    Err("Expected 0x11111111");

                if (header.word2 != 0x22222222)
                    Err("Expected 0x22222222");

                if (header.word3 != 0x33333333)
                    Err("Expected 0x33333333");

                if (header.word4 != 0x44444444)
                    Err("Expected 0x44444444");

                handler->readHeader = false;
                handler->offset = 0;
            }

            // Check the data:

        }
        else
        {
            Body body;
            size_t n = 0;

            MI_Result r = Sock_Read(
                handler->sock, 
                (char*)&body + handler->offset, 
                sizeof(body) - handler->offset,
                &n);

            if (r == MI_RESULT_WOULD_BLOCK)
                return MI_TRUE;

            if (r != MI_RESULT_OK)
                Err("Sock_Read() failed");

            // Did client close the connection?
            if (n == 0)
                return MI_FALSE;

            handler->offset += n;

            if (handler->offset == sizeof(Body))
            {
#if defined(CHECK_DATA)
                for (size_t i = 0; i < sizeof(body.data); i++)
                {
                    if (body.data[i] != 0xBB)
                        Err("expected 0xBB");
                }
#endif

                handler->readHeader = true;
                handler->offset = 0;
                ServerHandler::totalMessagesRecv++;

                if (ServerHandler::totalMessagesRecv == TOTAL_MESSAGES)
                {
                    printf("done\n");
                }
            }
        }
    }

    // Process WRITE events.
    if (mask & SELECTOR_WRITE)
    {
        Err("Unexpected write event");
    }

    if (mask & SELECTOR_REMOVE)
    {
#if defined(DUMP)
        printf("Close connection\n");
#endif
        Sock_Close(handler->sock);
        delete handler;
    }
    else if (mask & SELECTOR_DESTROY)
    {
#if defined(DUMP)
        printf("Destory connection\n");
#endif
        Sock_Close(handler->sock);
        delete handler;
    }

    return MI_TRUE;
}
END_EXTERNC

BEGIN_EXTERNC
static MI_Boolean ListenCallback(
    Selector* sel,
    Handler* handler,
    MI_Uint32 mask, 
    MI_Uint64 currentTimeUsec)
{
    if (mask & SELECTOR_READ)
    {
        Sock sock;
        Addr addr;

        if (Sock_Accept(handler->sock, &sock, &addr) != MI_RESULT_OK)
            Err("Sock_Accept() failed");

        if (Sock_SetBlocking(sock, MI_FALSE) != MI_RESULT_OK)
            Err("Sock_SetBlocking() failed");

        // Create handler for new client.
        {
            ServerHandler* h = new ServerHandler;
            h->sock = sock;
            h->mask = SELECTOR_READ;
            h->callback = ServerCallback;

            if (Selector_AddHandler(sel, h) != MI_RESULT_OK)
                Err("Selector_AddHandler() failed");
        }
#if defined(DUMP)
        printf("Accepted connection\n");
#endif
    }

    if (mask & SELECTOR_REMOVE)
    {
        Sock_Close(handler->sock);
        PAL_Free(handler);
    }
    else if (mask & SELECTOR_DESTROY)
    {
        Sock_Close(handler->sock);
        PAL_Free(handler);
    }

    return MI_TRUE;
}
END_EXTERNC

void RunServer(const char* host)
{
    Sock_Start();

    Selector sel;
    Selector_Init(&sel);

    {
        Addr addr;
        Sock sock;

        if (Addr_Init(&addr, host, PORT) != MI_RESULT_OK)
            Err("Addr_Init() failed");

        if (Sock_CreateListener(&sock, &addr) != MI_RESULT_OK)
            Err("Sock_Create() failed");

        Handler* h = (Handler*)PAL_Calloc(1, sizeof(Handler));
        h->sock = sock;
        h->mask = SELECTOR_READ;
        h->callback = ListenCallback;

        if (Selector_AddHandler(&sel, h) != MI_RESULT_OK)
            Err("Selector_AddHandler() failed");
    }

    for (;;)
    {
        MI_Result r = Selector_Run(&sel, 1000 * 10000);

        if (r != MI_RESULT_OK && r != MI_RESULT_TIME_OUT)
            Err("Selector_Run() failed");
    }

    Selector_Destroy(&sel);
    Sock_Stop();
}

//==============================================================================
//
// MClient:
//
//==============================================================================

struct MClientHandler : public Handler
{
    bool connected;

    struct Session
    {
        size_t messagesSent;
        size_t offset;
        Msg msg;

        // Number of messages in the queue. If equal to MAX_QUEUE_SIZE, then
        // wait for continue message from server before writing another message.
        size_t queueSize;
    };

    Session sess[NUM_SESSIONS];
    size_t sessIndex;

    static unsigned long numAcks;
    static unsigned long totalMessagesSent;

    MClientHandler()
    {
        memset(this, 0, sizeof(*this));

        for (size_t i = 0; i < NUM_SESSIONS; i++)
        {
            Msg& msg = sess[i].msg;

            // Use word1 as the session number:
            msg.header.word1 = i;
            msg.header.word2 = 0x22222222;
            msg.header.word3 = 0x33333333;
            msg.header.word4 = 0x44444444;
            memset(&msg.body.data, 0xBB, sizeof(msg.body.data));
        }
    }

    // Select the next session that is ready to write:
    Session* GetReadySession()
    {
        for (size_t i = sessIndex; i < NUM_SESSIONS; i++)
        {
            if (sess[i].queueSize < MAX_QUEUE_SIZE &&
                sess[i].messagesSent < MAX_MESSAGES)
            {
                sessIndex = i;
                return &sess[i];
            }
        }

        for (size_t i = 0; i < sessIndex; i++)
        {
            if (sess[i].queueSize < MAX_QUEUE_SIZE &&
                sess[i].messagesSent < MAX_MESSAGES)
            {
                sessIndex = i;
                return &sess[i];
            }
        }

        // No sessions are ready to write:
        return 0;
    }
};

unsigned long MClientHandler::numAcks = 0;
unsigned long MClientHandler::totalMessagesSent = 0;

BEGIN_EXTERNC
static MI_Boolean MClientCallback(
    Selector* sel,
    Handler* handler_,
    MI_Uint32 mask, 
    MI_Uint64 currentTimeUsec)
{
    MClientHandler* handler = (MClientHandler*)handler_;
    MI_UNUSED(sel);

    if (mask & SELECTOR_WRITE)
    {
        // We just got the connection completion:
        if (!handler->connected)
            handler->connected = true;

        // Write data for each session:
        for (;;)
        {
            MClientHandler::Session* sess = handler->GetReadySession();

            // No session is ready to write, so quit watching write events:
            if (!sess)
            {
                handler->mask = SELECTOR_READ;
                break;
            }

            handler->mask = SELECTOR_READ | SELECTOR_WRITE;

            size_t n = 0;

            MI_Result r = Sock_Write(
                handler->sock, 
                (char*)&sess->msg + sess->offset,
                sizeof(Msg) - sess->offset,
                &n);

#if defined(DUMP)
            printf("send{%u}{%u}\n", (int)handler->sessIndex, (int)n);
#endif

            if (r == MI_RESULT_WOULD_BLOCK)
                break;

            if (r != MI_RESULT_OK)
                Err("Sock_Write() failed");

            if (n == 0)
                return MI_FALSE;

            sess->offset += n;

            // If complete message was written:
            if (sess->offset == sizeof(Msg))
            {
                sess->queueSize++;
                sess->messagesSent++;
                MClientHandler::totalMessagesSent++;
                sess->offset = 0;

                if (++handler->sessIndex == NUM_SESSIONS)
                    handler->sessIndex = 0;
            }
        }
    }

    if (mask & SELECTOR_READ)
    {
        size_t n = 0;

        // Read continue message (consists of session index):

        unsigned char sessIndex = 0;

        MI_Result r = Sock_Read(
            handler->sock, 
            (char*)&sessIndex,
            sizeof(sessIndex),
            &n);

        if (r == MI_RESULT_WOULD_BLOCK)
            return MI_TRUE;

        if (r != MI_RESULT_OK)
            Err("Sock_Read() failed");

        // Did server close the connection?
        if (n == 0)
            return MI_FALSE;

        if (n != sizeof(sessIndex))
            Err("Sock_Read() failed");

#if defined(DUMP)
        printf("recv{%u}{%u}\n", (int)handler->sessIndex, (int)n);
#endif

        // Set this session's queue size back to zero so it can start sending
        // data again:
        handler->sess[sessIndex].queueSize = 0;

        // Watch for write again (may already be enabled)
        handler->mask = SELECTOR_READ | SELECTOR_WRITE;

        MClientHandler::numAcks++;

        if (MClientHandler::numAcks == TOTAL_MESSAGES / MAX_QUEUE_SIZE)
        {
            printf("Wrote %lu messages\n",
                MClientHandler::totalMessagesSent++);
            exit(0);
        }
    }

    if (mask & SELECTOR_REMOVE)
    {
        Sock_Close(handler->sock);
        delete handler;
    }
    else if (mask & SELECTOR_DESTROY)
    {
        Sock_Close(handler->sock);
        delete handler;
    }

    return MI_TRUE;
}
END_EXTERNC

void RunMClient(const char* host)
{
    Sock_Start();

    Selector sel;
    Selector_Init(&sel);
    
    // Connect to server:
    {
        Addr addr;
        Sock sock;

        if (Addr_Init(&addr, host, PORT) != MI_RESULT_OK)
            Err("Addr_Init() failed");

        if (Sock_Create(&sock) != MI_RESULT_OK)
            Err("Sock_Create() failed");

        if (Sock_SetBlocking(sock, MI_FALSE) != MI_RESULT_OK)
            Err("Sock_SetBlocking() failed");

        MI_Result r = Sock_Connect(sock, &addr);

        if (r != MI_RESULT_OK && r != MI_RESULT_WOULD_BLOCK)
            Err("Sock_Connect() failed");

        MClientHandler* h = new MClientHandler;
        h->sock = sock;
        h->mask = SELECTOR_WRITE;
        h->callback = MClientCallback;

        if (Selector_AddHandler(&sel, h) != MI_RESULT_OK)
            Err("Selector_AddHandler() failed");
    }

    for (;;)
    {
        MI_Result r = Selector_Run(&sel, 1000 * 10000);

        if (r != MI_RESULT_OK && r != MI_RESULT_TIME_OUT)
            Err("Selector_Run() failed");
    }

    Selector_Destroy(&sel);
    Sock_Stop();
}

//==============================================================================
//
// MServer:
//
//==============================================================================

struct MServerHandler : public Handler
{
    bool connected;

    struct Session
    {
        size_t queueSize;
    };

    Session sess[NUM_SESSIONS];
    Msg msg;
    size_t offset;
    bool readHeader;

    static unsigned long totalMessagesRecv;

    MServerHandler()
    {
        memset(this, 0, sizeof(*this));
        readHeader = true;
    }
};

unsigned long MServerHandler::totalMessagesRecv;

BEGIN_EXTERNC
static MI_Boolean MServerCallback(
    Selector* sel,
    Handler* handler_,
    MI_Uint32 mask, 
    MI_Uint64 currentTimeUsec)
{
    MServerHandler* handler = (MServerHandler*)handler_;

    // Process READ events.
    if (mask & SELECTOR_READ)
    {
        if (handler->readHeader)
        {
            Header& header = handler->msg.header;

            size_t n = 0;

            MI_Result r = Sock_Read(
                handler->sock, 
                (char*)&header + handler->offset, 
                sizeof(header) - handler->offset,
                &n);

            if (r == MI_RESULT_WOULD_BLOCK)
                return MI_TRUE;

            if (r != MI_RESULT_OK)
                Err("Sock_Read() failed");

            // Did client close the connection?
            if (n == 0)
                return MI_FALSE;

            handler->offset += n;

#if defined(DUMP)
            printf("recv{%u}\n", (int)n);
#endif

            if (handler->offset == sizeof(Header))
            {
                size_t sessIndex = (size_t)header.word1;

                if (sessIndex < 0 || sessIndex >= NUM_SESSIONS)
                    Err("Expected session in range");

                if (header.word2 != 0x22222222)
                    Err("Expected 0x22222222");

                if (header.word3 != 0x33333333)
                    Err("Expected 0x33333333");

                if (header.word4 != 0x44444444)
                    Err("Expected 0x44444444");

                handler->readHeader = false;
                handler->offset = 0;
            }
        }
        else
        {
            Body& body = handler->msg.body;

            size_t n = 0;

            MI_Result r = Sock_Read(
                handler->sock, 
                (char*)&body + handler->offset, 
                sizeof(body) - handler->offset,
                &n);

            if (r == MI_RESULT_WOULD_BLOCK)
                return MI_TRUE;

            if (r != MI_RESULT_OK)
                Err("Sock_Read() failed");

            // Did client close the connection?
            if (n == 0)
                return MI_FALSE;

            handler->offset += n;

            if (handler->offset == sizeof(Body))
            {
#if defined(CHECK_DATA)
                for (size_t i = 0; i < sizeof(body.data); i++)
                {
                    if (body.data[i] != 0xBB)
                        Err("expected 0xBB");
                }
#endif

                size_t sessIndex = handler->msg.header.word1;

#if defined(DUMP)
                printf("recv{%u}{%u}\n", (int)sessIndex, sizeof(Body));
#endif

                if (sessIndex < 0 || sessIndex >= NUM_SESSIONS)
                    Err("sessIndex out of range");

                assert(handler->sess[sessIndex].queueSize < MAX_QUEUE_SIZE);

                if (++handler->sess[sessIndex].queueSize == MAX_QUEUE_SIZE)
                {
                    // Need to write a 'continue' message:
                    handler->mask = SELECTOR_WRITE;
                }

                handler->readHeader = true;
                handler->offset = 0;
                MServerHandler::totalMessagesRecv++;

                if (MServerHandler::totalMessagesRecv == TOTAL_MESSAGES)
                {
                    printf("done\n");
                }
            }
        }
    }

    // Process WRITE events.
    if (mask & SELECTOR_WRITE)
    {
        for (size_t i = 0; i < NUM_SESSIONS; i++)
        {
            if (handler->sess[i].queueSize == MAX_QUEUE_SIZE)
            {
                unsigned char sessIndex = (unsigned char)i;

                size_t n = 0;

                MI_Result r = Sock_Write(
                    handler->sock, 
                    (char*)&sessIndex,
                    sizeof(sessIndex),
                    &n);

                if (r == MI_RESULT_WOULD_BLOCK)
                    return MI_TRUE;

                if (r != MI_RESULT_OK)
                {
                    return MI_FALSE;
                }

                if (n == 0)
                    return MI_FALSE;

#if defined(DUMP)
                printf("send{%u}{%u}\n", (int)sessIndex, (int)n);
#endif

                handler->sess[i].queueSize = 0;
                handler->mask = SELECTOR_WRITE | SELECTOR_READ;
            }
        }

        handler->mask = SELECTOR_READ;
    }

    if (mask & SELECTOR_REMOVE)
    {
#if defined(DUMP)
        printf("Close connection\n");
#endif
        Sock_Close(handler->sock);
        delete handler;
    }
    else if (mask & SELECTOR_DESTROY)
    {
#if defined(DUMP)
        printf("Destory connection\n");
#endif
        Sock_Close(handler->sock);
        delete handler;
    }

    return MI_TRUE;
}
END_EXTERNC

BEGIN_EXTERNC
static MI_Boolean MListenCallback(
    Selector* sel,
    Handler* handler,
    MI_Uint32 mask, 
    MI_Uint64 currentTimeUsec)
{
    if (mask & SELECTOR_READ)
    {
        Sock sock;
        Addr addr;

        if (Sock_Accept(handler->sock, &sock, &addr) != MI_RESULT_OK)
            Err("Sock_Accept() failed");

        if (Sock_SetBlocking(sock, MI_FALSE) != MI_RESULT_OK)
            Err("Sock_SetBlocking() failed");

        // Create handler for new client.
        {
            MServerHandler* h = new MServerHandler;
            h->sock = sock;
            h->mask = SELECTOR_READ;
            h->callback = MServerCallback;

            if (Selector_AddHandler(sel, h) != MI_RESULT_OK)
                Err("Selector_AddHandler() failed");
        }

#if defined(DUMP)
        printf("Accepted connection\n");
#endif
    }

    if (mask & SELECTOR_REMOVE)
    {
        Sock_Close(handler->sock);
        PAL_Free(handler);
    }
    else if (mask & SELECTOR_DESTROY)
    {
        Sock_Close(handler->sock);
        PAL_Free(handler);
    }

    return MI_TRUE;
}
END_EXTERNC

void RunMServer(const char* host)
{
    Sock_Start();

    Selector sel;
    Selector_Init(&sel);

    {
        Addr addr;
        Sock sock;

        if (Addr_Init(&addr, host, PORT) != MI_RESULT_OK)
            Err("Addr_Init() failed");

        if (Sock_CreateListener(&sock, &addr) != MI_RESULT_OK)
            Err("Sock_Create() failed");

        Handler* h = (Handler*)PAL_Calloc(1, sizeof(Handler));
        h->sock = sock;
        h->mask = SELECTOR_READ;
        h->callback = MListenCallback;

        if (Selector_AddHandler(&sel, h) != MI_RESULT_OK)
            Err("Selector_AddHandler() failed");
    }

    for (;;)
    {
        MI_Result r = Selector_Run(&sel, 1000 * 10000);

        if (r != MI_RESULT_OK && r != MI_RESULT_TIME_OUT)
            Err("Selector_Run() failed");
    }

    Selector_Destroy(&sel);
    Sock_Stop();
}

int main(int argc, char** argv)
{
    if (argc != 2)
    {
        fprintf(stderr, "Usage: %s [client|server]\n", argv[0]);
        exit(1);
    }

    if (strcmp(argv[1], "client") == 0)
    {
        RunClient("127.0.0.1");
    }
    else if (strcmp(argv[1], "mclient") == 0)
    {
        RunMClient("127.0.0.1");
    }
    else if (strcmp(argv[1], "server") == 0)
    {
        RunServer("127.0.0.1");
    }
    else if (strcmp(argv[1], "mserver") == 0)
    {
        RunMServer("127.0.0.1");
    }
    else
    {
        fprintf(stderr, "%s: invalid first argument\n", argv[0]);
        exit(1);
    }

    return 0;
}

ViewCVS 0.9.2