(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/Monitor.cpp between version 1.80 and 1.120

version 1.80, 2004/11/05 15:44:05 version 1.120, 2007/06/05 09:36:36
Line 1 
Line 1 
 //%2004////////////////////////////////////////////////////////////////////////  //%2006////////////////////////////////////////////////////////////////////////
 // //
 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems. // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
Line 6 
Line 6 
 // IBM Corp.; EMC Corporation, The Open Group. // IBM Corp.; EMC Corporation, The Open Group.
 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.; // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group. // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
   // EMC Corporation; VERITAS Software Corporation; The Open Group.
   // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
   // EMC Corporation; Symantec Corporation; The Open Group.
 // //
 // Permission is hereby granted, free of charge, to any person obtaining a copy // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to // of this software and associated documentation files (the "Software"), to
Line 25 
Line 29 
 // //
 //============================================================================== //==============================================================================
 // //
 // Author: Mike Brasher (mbrasher@bmc.com)  
 //  
 // Modified By: Mike Day (monitor_2) mdday@us.ibm.com  
 //              Amit K Arora (Bug#1153) amita@in.ibm.com  
 //              Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090  
 //              Sushma Fernandes (sushma@hp.com) for Bug#2057  
 //  
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
  
   #include "Network.h"
 #include <Pegasus/Common/Config.h> #include <Pegasus/Common/Config.h>
   
 #include <cstring> #include <cstring>
 #include "Monitor.h" #include "Monitor.h"
 #include "MessageQueue.h" #include "MessageQueue.h"
Line 44 
Line 41 
 #include <Pegasus/Common/HTTPConnection.h> #include <Pegasus/Common/HTTPConnection.h>
 #include <Pegasus/Common/MessageQueueService.h> #include <Pegasus/Common/MessageQueueService.h>
 #include <Pegasus/Common/Exception.h> #include <Pegasus/Common/Exception.h>
   #include "ArrayIterator.h"
 #ifdef PEGASUS_OS_TYPE_WINDOWS  #include <errno.h>
 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024  
 #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \  
 of <winsock.h>. It may have been indirectly included (e.g., by including \  
 <windows.h>). Finthe inclusion of that header which is visible to this \  
 compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \  
 otherwise, less than 64 clients (the default) will be able to connect to the \  
 CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."  
   
 # endif  
 # define FD_SETSIZE 1024  
 # include <windows.h>  
 #else  
 # include <sys/types.h>  
 # include <sys/socket.h>  
 # include <sys/time.h>  
 # include <netinet/in.h>  
 # include <netdb.h>  
 # include <arpa/inet.h>  
 #endif  
  
 PEGASUS_USING_STD; PEGASUS_USING_STD;
  
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
   static AtomicInt _connections(0);
 static AtomicInt _connections = 0;  
   
 static struct timeval create_time = {0, 1};  
 static struct timeval destroy_time = {300, 0};  
 static struct timeval deadlock_time = {0, 0};  
   
 ////////////////////////////////////////////////////////////////////////////////  
 //  
 // MonitorRep  
 //  
 ////////////////////////////////////////////////////////////////////////////////  
   
 struct MonitorRep  
 {  
     fd_set rd_fd_set;  
     fd_set wr_fd_set;  
     fd_set ex_fd_set;  
     fd_set active_rd_fd_set;  
     fd_set active_wr_fd_set;  
     fd_set active_ex_fd_set;  
 };  
  
 //////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
 // //
Line 101 
Line 58 
  
 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
 Monitor::Monitor() Monitor::Monitor()
    : _module_handle(0),     : _stopConnections(0),
      _controller(0),  
      _async(false),  
      _stopConnections(0),  
      _stopConnectionsSem(0),      _stopConnectionsSem(0),
      _solicitSocketCount(0),      _solicitSocketCount(0),
      _tickle_client_socket(-1),      _tickle_client_socket(-1),
Line 113 
Line 67 
 { {
     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
     Socket::initializeInterface();     Socket::initializeInterface();
     _rep = 0;  
     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
  
     // setup the tickler     // setup the tickler
Line 129 
Line 82 
     }     }
 } }
  
 Monitor::Monitor(Boolean async)  Monitor::~Monitor()
    : _module_handle(0),  
      _controller(0),  
      _async(async),  
      _stopConnections(0),  
      _stopConnectionsSem(0),  
      _solicitSocketCount(0),  
      _tickle_client_socket(-1),  
      _tickle_server_socket(-1),  
      _tickle_peer_socket(-1)  
 {  
     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;  
     Socket::initializeInterface();  
     _rep = 0;  
     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);  
   
     // setup the tickler  
     initializeTickler();  
   
     // Start the count at 1 because initilizeTickler()  
     // has added an entry in the first position of the  
     // _entries array  
     for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )  
     {     {
        _MonitorEntry entry(0, 0, 0);      uninitializeTickler();
        _entries.append(entry);      Socket::uninitializeInterface();
     }      PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
                     "returning from monitor destructor");
 } }
   void Monitor::uninitializeTickler()
 Monitor::~Monitor()  
 { {
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,      PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
                   "deregistering with module controller");  
  
     if(_module_handle != NULL)      try
     {     {
        _controller->deregister_module(PEGASUS_MODULENAME_MONITOR);  
        _controller = 0;  
        delete _module_handle;  
     }  
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep");  
   
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");  
   
     try{  
         if(_tickle_peer_socket >= 0)         if(_tickle_peer_socket >= 0)
         {         {
             Socket::close(_tickle_peer_socket);             Socket::close(_tickle_peer_socket);
Line 189 
Line 110 
     }     }
     catch(...)     catch(...)
     {     {
         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,          PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
                   "Failed to close tickle sockets");                   "Failed to close tickle sockets");
     }     }
  
     Socket::uninitializeInterface();  
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                   "returning from monitor destructor");  
 } }
  
 void Monitor::initializeTickler(){  void Monitor::initializeTickler()
   {
     /*     /*
        NOTE: On any errors trying to        NOTE: On any errors trying to
              setup out tickle connection,              setup out tickle connection,
Line 206 
Line 125 
     */     */
  
     /* setup the tickle server/listener */     /* setup the tickle server/listener */
       // try until the tcpip is restarted
       do
       {
     // get a socket for the server side     // get a socket for the server side
     if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){          if ((_tickle_server_socket =
         //handle error                   Socket::createSocket(PF_INET, SOCK_STREAM, 0)) ==
         MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",               PEGASUS_INVALID_SOCKET)
           {
               MessageLoaderParms parms(
                   "Common.Monitor.TICKLE_CREATE",
                                  "Received error number $0 while creating the internal socket.",                                  "Received error number $0 while creating the internal socket.",
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)                  getSocketError());
                                  errno);  
 #else  
                                  WSAGetLastError());  
 #endif  
         throw Exception(parms);         throw Exception(parms);
     }     }
  
     // initialize the address     // initialize the address
     memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));     memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
 #ifdef PEGASUS_OS_ZOS  
     _tickle_server_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");  
 #else  
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM  
 #pragma convert(37)  
 #endif  
     _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");     _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM  
 #pragma convert(0)  
 #endif  
 #endif  
     _tickle_server_addr.sin_family = PF_INET;     _tickle_server_addr.sin_family = PF_INET;
     _tickle_server_addr.sin_port = 0;     _tickle_server_addr.sin_port = 0;
  
     PEGASUS_SOCKLEN_SIZE _addr_size = sizeof(_tickle_server_addr);          SocketLength _addr_size = sizeof(_tickle_server_addr);
  
     // bind server side to socket     // bind server side to socket
     if((::bind(_tickle_server_socket,     if((::bind(_tickle_server_socket,
                (struct sockaddr *)&_tickle_server_addr,                   reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
                sizeof(_tickle_server_addr))) < 0){                   sizeof(_tickle_server_addr))) < 0)
         // handle error          {
         MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",  #ifdef PEGASUS_OS_ZOS
                                  "Received error number $0 while binding the internal socket.",              MessageLoaderParms parms(
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)                  "Common.Monitor.TICKLE_BIND_LONG",
                                  errno);                  "Received error:$0 while binding the internal socket.",
                   strerror(errno));
 #else #else
                                  WSAGetLastError());              MessageLoaderParms parms(
                   "Common.Monitor.TICKLE_BIND",
                   "Received error number $0 while binding the internal socket.",
                   getSocketError());
 #endif #endif
         throw Exception(parms);         throw Exception(parms);
     }     }
  
     // tell the kernel we are a server     // tell the kernel we are a server
     if((::listen(_tickle_server_socket,3)) < 0){          if ((::listen(_tickle_server_socket, 3)) < 0)
         // handle error          {
         MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",              MessageLoaderParms parms(
                          "Received error number $0 while listening to the internal socket.",                  "Common.Monitor.TICKLE_LISTEN",
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)                  "Received error number $0 while listening to the internal "
                                  errno);                      "socket.",
 #else                  getSocketError());
                                  WSAGetLastError());  
 #endif  
         throw Exception(parms);         throw Exception(parms);
     }     }
  
     // make sure we have the correct socket for our server     // make sure we have the correct socket for our server
     int sock = ::getsockname(_tickle_server_socket,          int sock = ::getsockname(
                              (struct sockaddr*)&_tickle_server_addr,              _tickle_server_socket,
               reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
                              &_addr_size);                              &_addr_size);
     if(sock < 0){          if (sock < 0)
         // handle error          {
         MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",              MessageLoaderParms parms(
                          "Received error number $0 while getting the internal socket name.",                  "Common.Monitor.TICKLE_SOCKNAME",
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)                  "Received error number $0 while getting the internal socket "
                                  errno);                      "name.",
 #else                  getSocketError());
                                  WSAGetLastError());  
 #endif  
         throw Exception(parms);         throw Exception(parms);
     }     }
  
     /* set up the tickle client/connector */     /* set up the tickle client/connector */
  
     // get a socket for our tickle client     // get a socket for our tickle client
     if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){          if ((_tickle_client_socket =
         // handle error                   Socket::createSocket(PF_INET, SOCK_STREAM, 0)) ==
         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",              PEGASUS_INVALID_SOCKET)
                          "Received error number $0 while creating the internal client socket.",          {
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)              MessageLoaderParms parms(
                                  errno);                  "Common.Monitor.TICKLE_CLIENT_CREATE",
 #else                  "Received error number $0 while creating the internal client "
                                  WSAGetLastError());                      "socket.",
 #endif                  getSocketError());
         throw Exception(parms);         throw Exception(parms);
     }     }
  
     // setup the address of the client     // setup the address of the client
     memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));     memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
 #ifdef PEGASUS_OS_ZOS  
     _tickle_client_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");  
 #else  
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM  
 #pragma convert(37)  
 #endif  
     _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");     _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM  
 #pragma convert(0)  
 #endif  
 #endif  
     _tickle_client_addr.sin_family = PF_INET;     _tickle_client_addr.sin_family = PF_INET;
     _tickle_client_addr.sin_port = 0;     _tickle_client_addr.sin_port = 0;
  
     // bind socket to client side     // bind socket to client side
     if((::bind(_tickle_client_socket,     if((::bind(_tickle_client_socket,
                (struct sockaddr*)&_tickle_client_addr,                   reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
                sizeof(_tickle_client_addr))) < 0){                   sizeof(_tickle_client_addr))) < 0)
         // handle error          {
         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",              MessageLoaderParms parms(
                          "Received error number $0 while binding the internal client socket.",                  "Common.Monitor.TICKLE_CLIENT_BIND",
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)                  "Received error number $0 while binding the internal client "
                                  errno);                      "socket.",
 #else                  getSocketError());
                                  WSAGetLastError());  
 #endif  
         throw Exception(parms);         throw Exception(parms);
     }     }
  
     // connect to server side     // connect to server side
     if((::connect(_tickle_client_socket,     if((::connect(_tickle_client_socket,
                   (struct sockaddr*)&_tickle_server_addr,                   reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
                   sizeof(_tickle_server_addr))) < 0){                   sizeof(_tickle_server_addr))) < 0)
         // handle error          {
         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",              MessageLoaderParms parms(
                          "Received error number $0 while connecting the internal client socket.",                  "Common.Monitor.TICKLE_CLIENT_CONNECT",
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)                  "Received error number $0 while connecting the internal "
                                  errno);                      "client socket.",
 #else                  getSocketError());
                                  WSAGetLastError());  
 #endif  
         throw Exception(parms);         throw Exception(parms);
     }     }
  
     /* set up the slave connection */     /* set up the slave connection */
     memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));     memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
     PEGASUS_SOCKLEN_SIZE peer_size = sizeof(_tickle_peer_addr);          SocketLength peer_size = sizeof(_tickle_peer_addr);
     pegasus_sleep(1);          Threads::sleep(1);
  
     // this call may fail, we will try a max of 20 times to establish this peer connection          // this call may fail, we will try a max of 20 times to establish
           // this peer connection
     if((_tickle_peer_socket = ::accept(_tickle_server_socket,     if((_tickle_peer_socket = ::accept(_tickle_server_socket,
                                        (struct sockaddr*)&_tickle_peer_addr,                   reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
                                        &peer_size)) < 0){                   &peer_size)) < 0)
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)          {
         // Only retry on non-windows platforms.              if (_tickle_peer_socket == PEGASUS_SOCKET_ERROR &&
         if(_tickle_peer_socket == -1 && errno == EAGAIN)                  getSocketError() == PEGASUS_NETWORK_TRYAGAIN)
         {         {
           int retries = 0;           int retries = 0;
           do           do
           {           {
             pegasus_sleep(1);                      Threads::sleep(1);
             _tickle_peer_socket = ::accept(_tickle_server_socket,                      _tickle_peer_socket = ::accept(
                                            (struct sockaddr*)&_tickle_peer_addr,                          _tickle_server_socket,
                           reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
                                            &peer_size);                                            &peer_size);
             retries++;             retries++;
           } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);                  } while (_tickle_peer_socket == PEGASUS_SOCKET_ERROR &&
                            getSocketError() == PEGASUS_NETWORK_TRYAGAIN &&
                            retries < 20);
               }
               // TCP/IP is down, destroy sockets and retry again.
               if (_tickle_peer_socket == PEGASUS_SOCKET_ERROR &&
                   getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED)
               {
                   // destroy everything
                   uninitializeTickler();
                   // retry again.
                   continue;
         }         }
 #endif  
     }     }
     if(_tickle_peer_socket == -1){          if (_tickle_peer_socket == PEGASUS_SOCKET_ERROR)
         // handle error          {
         MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",              MessageLoaderParms parms(
                          "Received error number $0 while accepting the internal socket connection.",                  "Common.Monitor.TICKLE_ACCEPT",
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)                  "Received error number $0 while accepting the internal "
                                  errno);                      "socket connection.",
 #else                  getSocketError());
                                  WSAGetLastError());  
 #endif  
         throw Exception(parms);         throw Exception(parms);
     }     }
     // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only          else
           {
               // socket is ok
               break;
           }
       } while (1); // try until TCP/IP is restarted
   
       Socket::disableBlocking(_tickle_peer_socket);
       Socket::disableBlocking(_tickle_client_socket);
   
       // add the tickler to the list of entries to be monitored and set to
       // IDLE because Monitor only
     // checks entries with IDLE state for events     // checks entries with IDLE state for events
     _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);     _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
     entry._status = _MonitorEntry::IDLE;     entry._status = _MonitorEntry::IDLE;
   
       // is the tickler initalized as first socket on startup ?
       if (_entries.size()==0)
       {
          // if yes, append a new entry
     _entries.append(entry);     _entries.append(entry);
 } }
       else
       {
          // if not, overwrite the tickler entry with new socket
          _entries[0]=entry;
       }
   }
  
 void Monitor::tickle(void)  void Monitor::tickle()
 { {
     static char _buffer[] =     static char _buffer[] =
     {     {
Line 394 
Line 325 
     };     };
  
     AutoMutex autoMutex(_tickle_mutex);     AutoMutex autoMutex(_tickle_mutex);
     Socket::disableBlocking(_tickle_client_socket);  
     Socket::write(_tickle_client_socket,&_buffer, 2);     Socket::write(_tickle_client_socket,&_buffer, 2);
     Socket::enableBlocking(_tickle_client_socket);  
 } }
  
 void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status ) void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
Line 405 
Line 334 
     _entries[index]._status = status;     _entries[index]._status = status;
 } }
  
 Boolean Monitor::run(Uint32 milliseconds)  void Monitor::run(Uint32 milliseconds)
 { {
  
     Boolean handled_events = false;  
     int i = 0;  
  
     struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};     struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
  
     fd_set fdread;     fd_set fdread;
     FD_ZERO(&fdread);     FD_ZERO(&fdread);
  
     _entry_mut.lock(pegasus_thread_self());      AutoMutex autoEntryMutex(_entry_mut);
   
       ArrayIterator<_MonitorEntry> entries(_entries);
  
     // Check the stopConnections flag.  If set, clear the Acceptor monitor entries      // Check the stopConnections flag.  If set, clear the Acceptor monitor
     if (_stopConnections == 1)      // entries
       if (_stopConnections.get() == 1)
     {     {
         for ( int indx = 0; indx < (int)_entries.size(); indx++)          for ( int indx = 0; indx < (int)entries.size(); indx++)
         {         {
             if (_entries[indx]._type == Monitor::ACCEPTOR)              if (entries[indx]._type == Monitor::ACCEPTOR)
             {             {
                 if ( _entries[indx]._status.value() != _MonitorEntry::EMPTY)                  if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)
                 {                 {
                    if ( _entries[indx]._status.value() == _MonitorEntry::IDLE ||                     if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||
                         _entries[indx]._status.value() == _MonitorEntry::DYING )                          entries[indx]._status.get() == _MonitorEntry::DYING )
                    {                    {
                        // remove the entry                        // remove the entry
                        _entries[indx]._status = _MonitorEntry::EMPTY;                         entries[indx]._status = _MonitorEntry::EMPTY;
                    }                    }
                    else                    else
                    {                    {
                        // set status to DYING                        // set status to DYING
                       _entries[indx]._status = _MonitorEntry::DYING;                        entries[indx]._status = _MonitorEntry::DYING;
                    }                    }
                }                }
            }            }
Line 445 
Line 375 
         _stopConnectionsSem.signal();         _stopConnectionsSem.signal();
     }     }
  
     for( int indx = 0; indx < (int)_entries.size(); indx++)      for (int indx = 0; indx < (int)entries.size(); indx++)
     {     {
                          const _MonitorEntry &entry = _entries[indx];          const _MonitorEntry &entry = entries[indx];
        if ((entry._status.value() == _MonitorEntry::DYING) &&          if ((entry._status.get() == _MonitorEntry::DYING) &&
                                          (entry._type == Monitor::CONNECTION))                                          (entry._type == Monitor::CONNECTION))
        {        {
           MessageQueue *q = MessageQueue::lookup(entry.queueId);           MessageQueue *q = MessageQueue::lookup(entry.queueId);
Line 466 
Line 396 
  
                                         if (h._responsePending == true)                                         if (h._responsePending == true)
                                         {                                         {
                                                 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "                  PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                                                                                                         "Ignoring connection delete request because "                      "Monitor::run - Ignoring connection delete request "
                                                                                                         "responses are still pending. "                          "because responses are still pending. "
                                                                                                         "connection=0x%x, socket=%d\n",                          "connection=0x%p, socket=%d\n",
                                                                                                         (int)&h, h.getSocket());                      (void *)&h, h.getSocket()));
                                                 continue;                                                 continue;
                                         }                                         }
                                         h._connectionClosePending = false;                                         h._connectionClosePending = false;
Line 484 
Line 414 
           // Once HTTPAcceptor completes processing of the close           // Once HTTPAcceptor completes processing of the close
           // connection, the lock is re-requested and processing of           // connection, the lock is re-requested and processing of
           // the for loop continues.  This is safe with the current           // the for loop continues.  This is safe with the current
           // implementation of the _entries object.  Note that the              // implementation of the entries object.  Note that the
           // loop condition accesses the _entries.size() on each              // loop condition accesses the entries.size() on each
           // iteration, so that a change in size while the mutex is           // iteration, so that a change in size while the mutex is
           // unlocked will not result in an ArrayIndexOutOfBounds           // unlocked will not result in an ArrayIndexOutOfBounds
           // exception.           // exception.
  
           _entry_mut.unlock();           _entry_mut.unlock();
           o.enqueue(message);           o.enqueue(message);
           _entry_mut.lock(pegasus_thread_self());              _entry_mut.lock();
   
               // After enqueue a message and the autoEntryMutex has been
               // released and locked again, the array of _entries can be
               // changed. The ArrayIterator has be reset with the original
               // _entries.
               entries.reset(_entries);
        }        }
     }     }
  
Line 504 
Line 440 
         place to calculate the max file descriptor (maximum socket number)         place to calculate the max file descriptor (maximum socket number)
         because we have to traverse the entire array.         because we have to traverse the entire array.
     */     */
     int maxSocketCurrentPass = 0;      SocketHandle maxSocketCurrentPass = 0;
     for( int indx = 0; indx < (int)_entries.size(); indx++)      for (int indx = 0; indx < (int)entries.size(); indx++)
     {     {
        if(maxSocketCurrentPass < _entries[indx].socket)         if (maxSocketCurrentPass < entries[indx].socket)
           maxSocketCurrentPass = _entries[indx].socket;             maxSocketCurrentPass = entries[indx].socket;
  
        if(_entries[indx]._status.value() == _MonitorEntry::IDLE)         if (entries[indx]._status.get() == _MonitorEntry::IDLE)
        {        {
           _idleEntries++;           _idleEntries++;
           FD_SET(_entries[indx].socket, &fdread);             FD_SET(entries[indx].socket, &fdread);
        }        }
     }     }
  
Line 524 
Line 460 
     maxSocketCurrentPass++;     maxSocketCurrentPass++;
  
     _entry_mut.unlock();     _entry_mut.unlock();
     int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);  
    _entry_mut.lock(pegasus_thread_self());  
  
       //
       // The first argument to select() is ignored on Windows and it is not
       // a socket value.  The original code assumed that the number of sockets
       // and a socket value have the same type.  On Windows they do not.
       //
 #ifdef PEGASUS_OS_TYPE_WINDOWS #ifdef PEGASUS_OS_TYPE_WINDOWS
     if(events == SOCKET_ERROR)      int events = select(0, &fdread, NULL, NULL, &tv);
 #else #else
     if(events == -1)      int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
 #endif #endif
       _entry_mut.lock();
   
       // After enqueue a message and the autoEntryMutex has been released and
       // locked again, the array of _entries can be changed. The ArrayIterator
       // has be reset with the original _entries
       entries.reset(_entries);
   
       if (events == PEGASUS_SOCKET_ERROR)
     {     {
        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
           "Monitor::run - errorno = %d has occurred on select.", errno);              "Monitor::run - errorno = %d has occurred on select.", errno));
        // The EBADF error indicates that one or more or the file        // The EBADF error indicates that one or more or the file
        // descriptions was not valid. This could indicate that        // descriptions was not valid. This could indicate that
        // the _entries structure has been corrupted or that          // the entries structure has been corrupted or that
        // we have a synchronization error.        // we have a synchronization error.
  
        PEGASUS_ASSERT(errno != EBADF);        PEGASUS_ASSERT(errno != EBADF);
     }     }
     else if (events)     else if (events)
     {     {
        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
           "Monitor::run select event received events = %d, monitoring %d idle entries",              "Monitor::run select event received events = %d, monitoring %d "
            events, _idleEntries);                  "idle entries",
        for( int indx = 0; indx < (int)_entries.size(); indx++)              events, _idleEntries));
        {          for (int indx = 0; indx < (int)entries.size(); indx++)
           // The Monitor should only look at entries in the table that are IDLE (i.e.,          {
           // owned by the Monitor).              // The Monitor should only look at entries in the table that are
           if((_entries[indx]._status.value() == _MonitorEntry::IDLE) &&              // IDLE (i.e., owned by the Monitor).
              (FD_ISSET(_entries[indx].socket, &fdread)))              if ((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
                   (FD_ISSET(entries[indx].socket, &fdread)))
           {           {
              MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);                  MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                  PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                   "Monitor::run indx = %d, queueId =  %d, q = %p",                   "Monitor::run indx = %d, queueId =  %d, q = %p",
                   indx, _entries[indx].queueId, q);                      indx, entries[indx].queueId, q));
              PEGASUS_ASSERT(q !=0);              PEGASUS_ASSERT(q !=0);
  
              try              try
              {              {
                 if(_entries[indx]._type == Monitor::CONNECTION)                      if (entries[indx]._type == Monitor::CONNECTION)
                 {                 {
                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                      "_entries[indx].type for indx = %d is Monitor::CONNECTION", indx);                              "entries[indx].type for indx = %d is "
                                   "Monitor::CONNECTION",
                               indx));
                    static_cast<HTTPConnection *>(q)->_entry_index = indx;                    static_cast<HTTPConnection *>(q)->_entry_index = indx;
  
                    // Do not update the entry just yet. The entry gets updated once                          // Do not update the entry just yet. The entry gets
                    // the request has been read.                          // updated once the request has been read.
                    //_entries[indx]._status = _MonitorEntry::BUSY;                          //entries[indx]._status = _MonitorEntry::BUSY;
  
                    // If allocate_and_awaken failure, retry on next iteration                          // If allocate_and_awaken failure, retry on next
                           // iteration
 /* Removed for PEP 183. /* Removed for PEP 183.
                    if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(                          if (!MessageQueueService::get_thread_pool()->
                            (void *)q, _dispatch))                                  allocate_and_awaken((void *)q, _dispatch))
                    {                    {
                       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,                              PEG_TRACE_CSTRING(TRC_DISCARDED_DATA,
                           "Monitor::run: Insufficient resources to process request.");                                  Tracer::LEVEL2,
                       _entries[indx]._status = _MonitorEntry::IDLE;                                  "Monitor::run: Insufficient resources to "
                       _entry_mut.unlock();                                      "process request.");
                               entries[indx]._status = _MonitorEntry::IDLE;
                       return true;                       return true;
                    }                    }
 */ */
 // Added for PEP 183 // Added for PEP 183
                    HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);                          HTTPConnection *dst =
                          Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                              reinterpret_cast<HTTPConnection *>(q);
                          "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",                          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                    dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);                              "Monitor::_dispatch: entering run() for "
                                   "indx = %d, queueId = %d, q = %p",
                               dst->_entry_index,
                               dst->_monitor->_entries[dst->_entry_index].queueId,
                               dst));
   
                    try                    try
                    {                    {
                        dst->run(1);                        dst->run(1);
                    }                    }
                    catch (...)                    catch (...)
                    {                    {
                         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                              PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
                         "Monitor::_dispatch: exception received");                         "Monitor::_dispatch: exception received");
                    }                    }
                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                    "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);                              "Monitor::_dispatch: exited run() for index %d",
                               dst->_entry_index));
                    // It is possible the entry status may not be set to busy.  
                    // The following will fail in that case.                          // It is possible the entry status may not be set to
                    // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);                          // busy.  The following will fail in that case.
                    // Once the HTTPConnection thread has set the status value to either                          // PEGASUS_ASSERT(dst->_monitor->_entries[
                    // Monitor::DYING or Monitor::IDLE, it has returned control of the connection                          //     dst->_entry_index]._status.get() ==
                    // to the Monitor.  It is no longer permissible to access the connection                          //    _MonitorEntry::BUSY);
                    // or the entry in the _entries table.                          // Once the HTTPConnection thread has set the status
                           // value to either Monitor::DYING or Monitor::IDLE,
                    // The following is not relevant as the worker thread or the                          // it has returned control of the connection to the
                    // reader thread will update the status of the entry.                          // Monitor.  It is no longer permissible to access
                           // the connection or the entry in the _entries table.
   
                           // The following is not relevant as the worker thread
                           // or the reader thread will update the status of the
                           // entry.
                    //if (dst->_connectionClosePending)                    //if (dst->_connectionClosePending)
                    //{                    //{
                    //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;                          //  dst->_monitor->_entries[dst->_entry_index]._status =
                           //    _MonitorEntry::DYING;
                    //}                    //}
                    //else                    //else
                    //{                    //{
                    //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;                          //  dst->_monitor->_entries[dst->_entry_index]._status =
                           //    _MonitorEntry::IDLE;
                    //}                    //}
 // end Added for PEP 183 // end Added for PEP 183
                 }                 }
                 else if( _entries[indx]._type == Monitor::INTERNAL){                      else if (entries[indx]._type == Monitor::INTERNAL)
                       {
                         // set ourself to BUSY,                         // set ourself to BUSY,
                         // read the data                         // read the data
                         // and set ourself back to IDLE                         // and set ourself back to IDLE
  
                         _entries[indx]._status == _MonitorEntry::BUSY;                          entries[indx]._status = _MonitorEntry::BUSY;
                         static char buffer[2];                         static char buffer[2];
                         Socket::disableBlocking(_entries[indx].socket);                          Sint32 amt =
                         Sint32 amt = Socket::read(_entries[indx].socket,&buffer, 2);                              Socket::read(entries[indx].socket,&buffer, 2);
                         Socket::enableBlocking(_entries[indx].socket);  
                         _entries[indx]._status == _MonitorEntry::IDLE;                          if (amt == PEGASUS_SOCKET_ERROR &&
                               getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED)
                           {
                               PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
                                   "Monitor::run: Tickler socket got an IO error. "
                                       "Going to re-create Socket and wait for "
                                       "TCP/IP restart.");
                               uninitializeTickler();
                               initializeTickler();
                           }
                           else
                           {
                               entries[indx]._status = _MonitorEntry::IDLE;
                           }
                 }                 }
                 else                 else
                 {                 {
                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                      "Non-connection entry, indx = %d, has been received.", indx);                              "Non-connection entry, indx = %d, has been "
                                   "received.",
                               indx));
                    int events = 0;                    int events = 0;
                    events |= SocketMessage::READ;                    events |= SocketMessage::READ;
                    Message *msg = new SocketMessage(_entries[indx].socket, events);                          Message* msg = new SocketMessage(
                    _entries[indx]._status = _MonitorEntry::BUSY;                              entries[indx].socket, events);
                           entries[indx]._status = _MonitorEntry::BUSY;
                    _entry_mut.unlock();                    _entry_mut.unlock();
   
                    q->enqueue(msg);                    q->enqueue(msg);
                    _entries[indx]._status = _MonitorEntry::IDLE;                          _entry_mut.lock();
                    return true;  
                           // After enqueue a message and the autoEntryMutex has
                           // been released and locked again, the array of
                           // entries can be changed. The ArrayIterator has be
                           // reset with the original _entries
                           entries.reset(_entries);
                           entries[indx]._status = _MonitorEntry::IDLE;
                 }                 }
              }              }
              catch(...)              catch(...)
              {              {
              }              }
              handled_events = true;  
           }           }
        }        }
     }     }
     _entry_mut.unlock();  
     return(handled_events);  
 } }
  
 void Monitor::stopListeningForConnections(Boolean wait) void Monitor::stopListeningForConnections(Boolean wait)
Line 671 
Line 654 
       // Wait for the monitor to notice _stopConnections.  Otherwise the       // Wait for the monitor to notice _stopConnections.  Otherwise the
       // caller of this function may unbind the ports while the monitor       // caller of this function may unbind the ports while the monitor
       // is still accepting connections on them.       // is still accepting connections on them.
       try        _stopConnectionsSem.wait();
         {  
           _stopConnectionsSem.time_wait(10000);  
         }  
       catch (TimeOut &)  
         {  
           // The monitor is probably busy processng a very long request, and is  
           // not accepting connections.  Let the caller unbind the ports.  
         }  
     }     }
  
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
Line 687 
Line 662 
  
  
 int  Monitor::solicitSocketMessages( int  Monitor::solicitSocketMessages(
     Sint32 socket,      SocketHandle socket,
     Uint32 events,     Uint32 events,
     Uint32 queueId,     Uint32 queueId,
     int type)     int type)
Line 699 
Line 674 
    // current connections requested    // current connections requested
    _solicitSocketCount++;  // bump the count    _solicitSocketCount++;  // bump the count
    int size = (int)_entries.size();    int size = (int)_entries.size();
    if(_solicitSocketCount >= (size-1)){      if ((int)_solicitSocketCount >= (size-1))
         for(int i = 0; i < (_solicitSocketCount - (size-1)); i++){      {
           for (int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++)
           {
                 _MonitorEntry entry(0, 0, 0);                 _MonitorEntry entry(0, 0, 0);
                 _entries.append(entry);                 _entries.append(entry);
         }         }
Line 711 
Line 688 
    {    {
       try       try
       {       {
          if(_entries[index]._status.value() == _MonitorEntry::EMPTY)              if (_entries[index]._status.get() == _MonitorEntry::EMPTY)
          {          {
             _entries[index].socket = socket;             _entries[index].socket = socket;
             _entries[index].queueId  = queueId;             _entries[index].queueId  = queueId;
Line 725 
Line 702 
       {       {
       }       }
    }    }
    _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful      // decrease the count, if we are here we didn't do anything meaningful
       _solicitSocketCount--;
    PEG_METHOD_EXIT();    PEG_METHOD_EXIT();
    return -1;    return -1;
   
 } }
  
 void Monitor::unsolicitSocketMessages(Sint32 socket)  void Monitor::unsolicitSocketMessages(SocketHandle socket)
 { {
   
     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
     AutoMutex autoMut(_entry_mut);     AutoMutex autoMut(_entry_mut);
  
     /*     /*
         Start at index = 1 because _entries[0] is the tickle entry which never needs          Start at index = 1 because _entries[0] is the tickle entry which
         to be EMPTY;          never needs to be EMPTY;
     */     */
     int index;      unsigned int index;
     for(index = 1; index < _entries.size(); index++)     for(index = 1; index < _entries.size(); index++)
     {     {
        if(_entries[index].socket == socket)        if(_entries[index].socket == socket)
        {        {
           _entries[index]._status = _MonitorEntry::EMPTY;           _entries[index]._status = _MonitorEntry::EMPTY;
           _entries[index].socket = -1;              _entries[index].socket = PEGASUS_INVALID_SOCKET;
           _solicitSocketCount--;           _solicitSocketCount--;
           break;           break;
        }        }
Line 755 
Line 731 
  
     /*     /*
         Dynamic Contraction:         Dynamic Contraction:
         To remove excess entries we will start from the end of the _entries array          To remove excess entries we will start from the end of the _entries
         and remove all entries with EMPTY status until we find the first NON EMPTY.          array and remove all entries with EMPTY status until we find the
         This prevents the positions, of the NON EMPTY entries, from being changed.          first NON EMPTY.  This prevents the positions, of the NON EMPTY
           entries, from being changed.
     */     */
     index = _entries.size() - 1;     index = _entries.size() - 1;
     while(_entries[index]._status == _MonitorEntry::EMPTY){      while (_entries[index]._status.get() == _MonitorEntry::EMPTY)
       {
         if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)         if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
                 _entries.remove(index);                 _entries.remove(index);
         index--;         index--;
     }     }
   
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
 } }
  
 // Note: this is no longer called with PEP 183. // Note: this is no longer called with PEP 183.
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)  ThreadReturnType PEGASUS_THREAD_CDECL Monitor::_dispatch(void* parm)
 { {
    HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);    HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,      PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
         "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",          "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, "
         dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);              "q = %p",
           dst->_entry_index,
           dst->_monitor->_entries[dst->_entry_index].queueId,
           dst));
   
    try    try
    {    {
       dst->run(1);       dst->run(1);
    }    }
    catch (...)    catch (...)
    {    {
       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,          PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
           "Monitor::_dispatch: exception received");           "Monitor::_dispatch: exception received");
    }    }
    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,      PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
           "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);          "Monitor::_dispatch: exited run() for index %d", dst->_entry_index));
  
    PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);      PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() ==
           _MonitorEntry::BUSY);
  
    // Once the HTTPConnection thread has set the status value to either    // Once the HTTPConnection thread has set the status value to either
    // Monitor::DYING or Monitor::IDLE, it has returned control of the connection      // Monitor::DYING or Monitor::IDLE, it has returned control of the
    // to the Monitor.  It is no longer permissible to access the connection      // connection to the Monitor.  It is no longer permissible to access the
    // or the entry in the _entries table.      // connection or the entry in the _entries table.
    if (dst->_connectionClosePending)    if (dst->_connectionClosePending)
    {    {
       dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;          dst->_monitor->_entries[dst->_entry_index]._status =
               _MonitorEntry::DYING;
    }    }
    else    else
    {    {
       dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;          dst->_monitor->_entries[dst->_entry_index]._status =
               _MonitorEntry::IDLE;
    }    }
    return 0;    return 0;
 } }
  
   
   
 ////************************* monitor 2 *****************************////  
 ////************************* monitor 2 *****************************////  
 ////************************* monitor 2 *****************************////  
 ////************************* monitor 2 *****************************////  
 ////************************* monitor 2 *****************************////  
 ////************************* monitor 2 *****************************////  
 ////************************* monitor 2 *****************************////  
   
   
   
   
   
 m2e_rep::m2e_rep(void)  
   :Base(), state(IDLE)  
   
 {  
 }  
   
 m2e_rep::m2e_rep(monitor_2_entry_type _type,  
                  pegasus_socket _sock,  
                  void* _accept,  
                  void* _dispatch)  
   : Base(), type(_type), state(IDLE), psock(_sock),  
     accept_parm(_accept), dispatch_parm(_dispatch)  
 {  
   
 }  
   
 m2e_rep::~m2e_rep(void)  
 {  
 }  
   
 m2e_rep::m2e_rep(const m2e_rep& r)  
   : Base()  
 {  
   if(this != &r){  
     type = r.type;  
     psock = r.psock;  
     accept_parm = r.accept_parm;  
     dispatch_parm = r.dispatch_parm;  
     state = IDLE;  
   
   }  
 }  
   
   
 m2e_rep& m2e_rep::operator =(const m2e_rep& r)  
 {  
   if(this != &r) {  
     type = r.type;  
     psock = r.psock;  
     accept_parm = r.accept_parm;  
     dispatch_parm = r.dispatch_parm;  
     state = IDLE;  
   }  
   return *this;  
 }  
   
 Boolean m2e_rep::operator ==(const m2e_rep& r)  
 {  
   if(this == &r)  
     return true;  
   return false;  
 }  
   
 Boolean m2e_rep::operator ==(void* r)  
 {  
   if((void*)this == r)  
     return true;  
   return false;  
 }  
   
 m2e_rep::operator pegasus_socket() const  
 {  
   return psock;  
 }  
   
   
 monitor_2_entry::monitor_2_entry(void)  
 {  
   _rep = new m2e_rep();  
 }  
   
 monitor_2_entry::monitor_2_entry(pegasus_socket& _psock,  
                                  monitor_2_entry_type _type,  
                                  void* _accept_parm, void* _dispatch_parm)  
 {  
   _rep = new m2e_rep(_type, _psock, _accept_parm, _dispatch_parm);  
 }  
   
 monitor_2_entry::monitor_2_entry(const monitor_2_entry& e)  
 {  
   if(this != &e){  
     Inc(this->_rep = e._rep);  
   }  
 }  
   
 monitor_2_entry::~monitor_2_entry(void)  
 {  
   
   Dec(_rep);  
 }  
   
 monitor_2_entry& monitor_2_entry::operator=(const monitor_2_entry& e)  
 {  
   if(this != &e){  
     Dec(_rep);  
     Inc(this->_rep = e._rep);  
   }  
   return *this;  
 }  
   
 Boolean monitor_2_entry::operator ==(const monitor_2_entry& me) const  
 {  
   if(this == &me)  
     return true;  
   return false;  
 }  
   
 Boolean monitor_2_entry::operator ==(void* k) const  
 {  
   if((void *)this == k)  
     return true;  
   return false;  
 }  
   
   
 monitor_2_entry_type monitor_2_entry::get_type(void) const  
 {  
   return _rep->type;  
 }  
   
 void monitor_2_entry::set_type(monitor_2_entry_type t)  
 {  
   _rep->type = t;  
 }  
   
   
 monitor_2_entry_state  monitor_2_entry::get_state(void) const  
 {  
   return (monitor_2_entry_state) _rep->state.value();  
 }  
   
 void monitor_2_entry::set_state(monitor_2_entry_state t)  
 {  
   _rep->state = t;  
 }  
   
 void* monitor_2_entry::get_accept(void) const  
 {  
   return _rep->accept_parm;  
 }  
   
 void monitor_2_entry::set_accept(void* a)  
 {  
   _rep->accept_parm = a;  
 }  
   
   
 void* monitor_2_entry::get_dispatch(void) const  
 {  
   return _rep->dispatch_parm;  
 }  
   
 void monitor_2_entry::set_dispatch(void* a)  
 {  
   _rep->dispatch_parm = a;  
 }  
   
 pegasus_socket monitor_2_entry::get_sock(void) const  
 {  
   return _rep->psock;  
 }  
   
   
 void monitor_2_entry::set_sock(pegasus_socket& s)  
 {  
   _rep->psock = s;  
   
 }  
   
 //static monitor_2* _m2_instance;  
   
 AsyncDQueue<HTTPConnection2> monitor_2::_connections(true, 0);  
   
 monitor_2::monitor_2(void)  
   : _session_dispatch(0), _accept_dispatch(0), _listeners(true, 0),  
     _ready(true, 0), _die(0), _requestCount(0)  
 {  
   try {  
   
     bsd_socket_factory _factory;  
   
     // set up the listener/acceptor  
     pegasus_socket temp = pegasus_socket(&_factory);  
   
     temp.socket(PF_INET, SOCK_STREAM, 0);  
     // initialize the address  
     memset(&_tickle_addr, 0, sizeof(_tickle_addr));  
 #ifdef PEGASUS_OS_ZOS  
     _tickle_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");  
 #else  
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM  
 #pragma convert(37)  
 #endif  
     _tickle_addr.sin_addr.s_addr = inet_addr("127.0.0.1");  
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM  
 #pragma convert(0)  
 #endif  
 #endif  
     _tickle_addr.sin_family = PF_INET;  
     _tickle_addr.sin_port = 0;  
   
     PEGASUS_SOCKLEN_SIZE _addr_size = sizeof(_tickle_addr);  
   
     temp.bind((struct sockaddr *)&_tickle_addr, sizeof(_tickle_addr));  
     temp.listen(3);  
     temp.getsockname((struct sockaddr*)&_tickle_addr, &_addr_size);  
   
     // set up the connector  
   
     pegasus_socket tickler = pegasus_socket(&_factory);  
     tickler.socket(PF_INET, SOCK_STREAM, 0);  
     struct sockaddr_in _addr;  
     memset(&_addr, 0, sizeof(_addr));  
 #ifdef PEGASUS_OS_ZOS  
     _addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");  
 #else  
     _addr.sin_addr.s_addr = inet_addr("127.0.0.1");  
 #endif  
     _addr.sin_family = PF_INET;  
     _addr.sin_port = 0;  
     tickler.bind((struct sockaddr*)&_addr, sizeof(_addr));  
     tickler.connect((struct sockaddr*)&_tickle_addr, sizeof(_tickle_addr));  
   
     _tickler.set_sock(tickler);  
     _tickler.set_type(INTERNAL);  
     _tickler.set_state(BUSY);  
   
     struct sockaddr_in peer;  
     memset(&peer, 0, sizeof(peer));  
     PEGASUS_SOCKLEN_SIZE peer_size = sizeof(peer);  
   
     pegasus_socket accepted = temp.accept((struct sockaddr*)&peer, &peer_size);  
   
     monitor_2_entry* _tickle = new monitor_2_entry(accepted, INTERNAL, 0, 0);  
   
 // No need to set _tickle's state as BUSY, since monitor_2::run() now  
 // does a select only on sockets which are in IDLE (default) state.  
 //  _tickle->set_state(BUSY);  
   
     _listeners.insert_first(_tickle);  
   
   }  
   catch(...){  }  
 }  
   
 monitor_2::~monitor_2(void)  
 {  
   
    stop();  
   
   try {  
     monitor_2_entry* temp = _listeners.remove_first();  
     while(temp){  
       delete temp;  
       temp = _listeners.remove_first();  
     }  
   }  
   
   catch(...){  }  
   
   
   try  
   {  
      HTTPConnection2* temp = _connections.remove_first();  
      while(temp)  
      {  
         delete temp;  
         temp = _connections.remove_first();  
      }  
   }  
   catch(...)  
   {  
   }  
   
   
 }  
   
   
 void monitor_2::run(void)  
 {  
   monitor_2_entry* temp;  
   int _nonIdle=0, _idleCount=0, events;  
   
   while(_die.value() == 0) {  
     _nonIdle=_idleCount=0;  
   
      struct timeval tv_idle = { 60, 0 };  
   
     // place all sockets in the select set  
     FD_ZERO(&rd_fd_set);  
     try {  
       _listeners.lock(pegasus_thread_self());  
       temp = _listeners.next(0);  
       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
        "monitor_2::run:Creating New FD list for SELECT.");  
       while(temp != 0 ){  
         if(temp->get_state() == CLOSED ) {  
           monitor_2_entry* closed = temp;  
       temp = _listeners.next(closed);  
           _listeners.remove_no_lock(closed);  
   
       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
        "monitor_2::run:Deleteing CLOSED socket fd=%d.",(Sint32)closed->get_sock());  
   
           HTTPConnection2 *cn = monitor_2::remove_connection((Sint32)(closed->get_sock()));  
           delete cn;  
           delete closed;  
         }  
         if(temp == 0)  
            break;  
   
   
         //Count the number if IDLE sockets  
         if(temp->get_state() != IDLE ) _nonIdle++;  
          else _idleCount++;  
   
         Sint32 fd = (Sint32) temp->get_sock();  
   
         //Select should be called ONLY on the FDs which are in IDLE state  
         if((fd >= 0) && (temp->get_state() == IDLE))  
         {  
           Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
            "monitor_2::run:Adding FD %d to the list for SELECT.",fd);  
           FD_SET(fd , &rd_fd_set);  
         }  
            temp = _listeners.next(temp);  
       }  
       _listeners.unlock();  
     }  
     catch(...){  
       return;  
     }  
   
     // important -  the dispatch routine has pointers to all the  
     // entries that are readable. These entries can be changed but  
     // the pointer must not be tampered with.  
     if(_connections.count() )  
        events = select(FD_SETSIZE, &rd_fd_set, NULL, NULL, NULL);  
     else  
        events = select(FD_SETSIZE, &rd_fd_set, NULL, NULL, &tv_idle);  
   
     if(_die.value())  
     {  
        break;  
     }  
   
 #ifdef PEGASUS_OS_TYPE_WINDOWS  
     if(events == SOCKET_ERROR)  
 #else  
     if(events == -1)  
 #endif  
     {  
        Tracer::trace(TRC_HTTP, Tracer::LEVEL2,  
           "monitor_2:run:INVALID FD. errorno = %d on select.", errno);  
        // The EBADF error indicates that one or more or the file  
        // descriptions was not valid. This could indicate that  
        // the _entries structure has been corrupted or that  
        // we have a synchronization error.  
   
      // Keeping the line below commented for time being.  
      //  PEGASUS_ASSERT(errno != EBADF);  
     }  
     else if (events)  
     {  
        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
           "monitor_2::run select event received events = %d, monitoring %d idle entries", events, _idleCount);  
   
   
     try {  
       _listeners.lock(pegasus_thread_self());  
       temp = _listeners.next(0);  
       while(temp != 0 ){  
           Sint32 fd = (Sint32) temp->get_sock();  
           if(fd >= 0 && FD_ISSET(fd, &rd_fd_set)) {  
           if(temp->get_type() != CLIENTSESSION) temp->set_state(BUSY);  
           FD_CLR(fd,  &rd_fd_set);  
           monitor_2_entry* ready = new monitor_2_entry(*temp);  
           try  
           {  
              _ready.insert_first(ready);  
           }  
           catch(...)  
           {  
           }  
   
           _requestCount++;  
         }  
         temp = _listeners.next(temp);  
       }  
       _listeners.unlock();  
     }  
     catch(...){  
       return;  
     }  
     // now handle the sockets that are ready to read  
     if(_ready.count())  
        _dispatch();  
     else  
     {  
        if(_connections.count() == 0 )  
           _idle_dispatch(_idle_parm);  
     }  
    }  // if events  
   } // while alive  
   _die=0;  
   
 }  
   
 int  monitor_2::solicitSocketMessages(  
     Sint32 socket,  
     Uint32 events,  
     Uint32 queueId,  
     int type)  
 {  
   
    PEG_METHOD_ENTER(TRC_HTTP, "monitor_2::solicitSocketMessages");  
   
    AutoMutex autoMut(_entry_mut);  
   
    for(int index = 0; index < (int)_entries.size(); index++)  
    {  
       try  
       {  
          if(_entries[index]._status.value() == monitor_2_entry::EMPTY)  
          {  
             _entries[index].socket = socket;  
             //_entries[index].queueId  = queueId;  
             //_entries[index]._type = type;  
             _entries[index]._status = IDLE;  
   
             return index;  
          }  
       }  
       catch(...)  
       {  
       }  
   
    }  
    PEG_METHOD_EXIT();  
    return -1;  
 }  
   
   
 void monitor_2::unsolicitSocketMessages(Sint32 socket)  
 {  
   
     PEG_METHOD_ENTER(TRC_HTTP, "monitor_2::unsolicitSocketMessages");  
     AutoMutex autoMut(_entry2_mut);  
   
     for(int index = 0; index < (int)_entries2.size(); index++)  
     {  
        if(_entries2[index].socket == socket)  
        {  
           _entries2[index]._status = monitor_2_entry::EMPTY;  
           _entries2[index].socket = -1;  
           break;  
        }  
     }  
     PEG_METHOD_EXIT();  
 }  
   
 void* monitor_2::set_session_dispatch(void (*dp)(monitor_2_entry*))  
 {  
   void* old = (void *)_session_dispatch;  
   _session_dispatch = dp;  
   return old;  
 }  
   
 void* monitor_2::set_accept_dispatch(void (*dp)(monitor_2_entry*))  
 {  
   void* old = (void*)_accept_dispatch;  
   _accept_dispatch = dp;  
   return old;  
 }  
   
 void* monitor_2::set_idle_dispatch(void (*dp)(void*))  
 {  
    void* old = (void*)_idle_dispatch;  
    _idle_dispatch = dp;  
    return old;  
 }  
   
 void* monitor_2::set_idle_parm(void* parm)  
 {  
    void* old = _idle_parm;  
    _idle_parm = parm;  
    return old;  
 }  
   
   
   
 //-----------------------------------------------------------------  
 // Note on deleting the monitor_2_entry nodes:  
 //  Each case: in the switch statement needs to handle the deletion  
 //  of the monitor_2_entry * node differently. A SESSION dispatch  
 //  routine MUST DELETE the entry during its dispatch handling.  
 //  All other dispatch routines MUST NOT delete the entry during the  
 //  dispatch handling, but must allow monitor_2::_dispatch to delete  
 //   the entry.  
 //  
 //  The reason is pretty obscure and it is debatable whether or not  
 //  to even bother, but during cimserver shutdown the single monitor_2_entry*  
 //  will leak unless the _session_dispatch routine takes care of deleting it.  
 //  
 //  The reason is that a shutdown messages completely stops everything and  
 //  the _session_dispatch routine never returns. So monitor_2::_dispatch is  
 //  never able to do its own cleanup.  
 //  
 // << Mon Oct 13 09:33:33 2003 mdd >>  
 //-----------------------------------------------------------------  
   
 void monitor_2::_dispatch(void)  
 {  
    monitor_2_entry* entry;  
   
    try  
    {  
   
          entry = _ready.remove_first();  
    }  
    catch(...)  
    {  
    }  
   
   while(entry != 0 ) {  
     switch(entry->get_type()) {  
     case INTERNAL:  
       static char buffer[2];  
       entry->get_sock().disableBlocking();  
       entry->get_sock().read(&buffer, 2);  
       entry->get_sock().enableBlocking();  
       entry->set_state(IDLE);   // Set state of the socket to IDLE so that  
                                 // monitor_2::run can add to the list of FDs  
                                 // on which select would be called.  
   
   
   
       delete entry;  
   
       break;  
     case LISTEN:  
       {  
         static struct sockaddr peer;  
         static PEGASUS_SOCKLEN_SIZE peer_size = sizeof(peer);  
         entry->get_sock().disableBlocking();  
         pegasus_socket connected = entry->get_sock().accept(&peer, &peer_size);  
         entry->set_state(IDLE);  // Set state of the LISTEN socket to IDLE  
 #ifdef PEGASUS_OS_TYPE_WINDOWS  
     if((Sint32)connected  == SOCKET_ERROR)  
 #else  
         if((Sint32)connected == -1 )  
 #endif  
         {  
            delete entry;  
            break;  
         }  
   
         entry->get_sock().enableBlocking();  
         monitor_2_entry *temp = add_entry(connected, SESSION, entry->get_accept(), entry->get_dispatch());  
         if(temp && _accept_dispatch != 0)  
            _accept_dispatch(temp);  
         delete entry;  
   
       }  
       break;  
     case SESSION:  
     case CLIENTSESSION:  
        if(_session_dispatch != 0 )  
        {  
           // NOTE: _session_dispatch will delete entry - do not do it here  
              unsigned client=0;  
          if(entry->get_type() == CLIENTSESSION) client = 1;  
          Sint32 sock=(Sint32)(entry->get_sock());  
   
              _session_dispatch(entry);  
   
          if(client)  
          {  
            HTTPConnection2 *cn = monitor_2::remove_connection(sock);  
                if(cn) delete cn;  
            // stop();  
            _die=1;  
          }  
        }  
   
       else {  
         static char buffer[4096];  
         int bytes = entry->get_sock().read(&buffer, 4096);  
         delete entry;  
       }  
   
       break;  
     case UNTYPED:  
     default:  
            delete entry;  
       break;  
     }  
     _requestCount--;  
   
     if(_ready.count() == 0 )  
        break;  
   
     try  
     {  
        entry = _ready.remove_first();  
     }  
     catch(...)  
     {  
     }  
   
   }  
 }  
   
 void monitor_2::stop(void)  
 {  
   _die = 1;  
   tickle();  
   // shut down the listener list, free the list nodes  
   _tickler.get_sock().close();  
   _listeners.shutdown_queue();  
 }  
   
 void monitor_2::tickle(void)  
 {  
   static char _buffer[] =  
     {  
       '0','0'  
     };  
   
   _tickler.get_sock().disableBlocking();  
   
   _tickler.get_sock().write(&_buffer, 2);  
   _tickler.get_sock().enableBlocking();  
   
 }  
   
   
 monitor_2_entry*  monitor_2::add_entry(pegasus_socket& ps,  
                                        monitor_2_entry_type type,  
                                        void* accept_parm,  
                                        void* dispatch_parm)  
 {  
   Sint32 fd1,fd2;  
   
   fd2=(Sint32) ps;  
   
   monitor_2_entry* m2e = new monitor_2_entry(ps, type, accept_parm, dispatch_parm);  
   
 // The purpose of the following piece of code is to avoid duplicate entries in  
 // the _listeners list. Would it be too much of an overhead ?  
 try {  
   
      monitor_2_entry* temp;  
   
       _listeners.lock(pegasus_thread_self());  
       temp = _listeners.next(0);  
       while(temp != 0 )  
       {  
         fd1=(Sint32) temp->get_sock();  
   
         if(fd1 == fd2)  
         {  
   
            Tracer::trace(TRC_HTTP, Tracer::LEVEL3,  
           "monitor_2::add_entry:Request for duplicate entry in _listeners for %d FD.", fd1);  
             if(temp->get_state() == CLOSED)  
             {  
               temp->set_state(IDLE);  
               Tracer::trace(TRC_HTTP, Tracer::LEVEL3,  
               "monitor_2::add_entry:CLOSED state changed to IDLE for %d.", fd1);  
              }  
              _listeners.unlock();  
             delete m2e;  
             return 0;  
         }  
        temp = _listeners.next(temp);  
       }  
    }  
    catch(...)  
    {  
       delete m2e;  
       return 0;  
    }  
   
   
   _listeners.unlock();  
   
   
   try{  
     _listeners.insert_first(m2e);  
   }  
   catch(...){  
     delete m2e;  
     return 0;  
   }  
       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
       "monitor_2::add_entry:SUCCESSFULLY added to _listeners list. FD = %d.", fd2);  
   tickle();  
   return m2e;  
 }  
   
 Boolean monitor_2::remove_entry(Sint32 s)  
 {  
   monitor_2_entry* temp;  
   try {  
     _listeners.try_lock(pegasus_thread_self());  
     temp = _listeners.next(0);  
     while(temp != 0){  
       if(s == (Sint32)temp->_rep->psock ){  
         temp = _listeners.remove_no_lock(temp);  
         delete temp;  
         _listeners.unlock();  
         return true;  
       }  
       temp = _listeners.next(temp);  
     }  
     _listeners.unlock();  
   }  
   catch(...){  
   }  
   return false;  
 }  
   
 Uint32 monitor_2::getOutstandingRequestCount(void)  
 {  
   return _requestCount.value();  
   
 }  
   
   
 HTTPConnection2* monitor_2::remove_connection(Sint32 sock)  
 {  
   
    HTTPConnection2* temp;  
    try  
    {  
       monitor_2::_connections.lock(pegasus_thread_self());  
       temp = monitor_2::_connections.next(0);  
       while(temp != 0 )  
       {  
          if(sock == temp->getSocket())  
          {  
             temp = monitor_2::_connections.remove_no_lock(temp);  
             monitor_2::_connections.unlock();  
             return temp;  
          }  
          temp = monitor_2::_connections.next(temp);  
       }  
       monitor_2::_connections.unlock();  
    }  
    catch(...)  
    {  
    }  
    return 0;  
 }  
   
 Boolean monitor_2::insert_connection(HTTPConnection2* connection)  
 {  
    try  
    {  
       monitor_2::_connections.insert_first(connection);  
    }  
    catch(...)  
    {  
       return false;  
    }  
    return true;  
 }  
   
   
 PEGASUS_NAMESPACE_END PEGASUS_NAMESPACE_END


Legend:
Removed from v.1.80  
changed lines
  Added in v.1.120

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2