(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/Monitor.cpp between version 1.103.10.27 and 1.130

version 1.103.10.27, 2006/10/18 04:24:42 version 1.130, 2008/02/27 20:21:17
Line 29 
Line 29 
 // //
 //============================================================================== //==============================================================================
 // //
 //  
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
  
   #include "Network.h"
 #include <Pegasus/Common/Config.h> #include <Pegasus/Common/Config.h>
   
 #include <cstring> #include <cstring>
 #include <Pegasus/Common/Monitor.h>  #include "Monitor.h"
 #include <Pegasus/Common/MessageQueue.h>  #include "MessageQueue.h"
 #include <Pegasus/Common/Socket.h>  #include "Socket.h"
 #include <Pegasus/Common/Tracer.h> #include <Pegasus/Common/Tracer.h>
 #include <Pegasus/Common/HTTPConnection.h> #include <Pegasus/Common/HTTPConnection.h>
 #include <Pegasus/Common/MessageQueueService.h> #include <Pegasus/Common/MessageQueueService.h>
 #include <Pegasus/Common/Exception.h> #include <Pegasus/Common/Exception.h>
 #include <Pegasus/Common/ArrayIterator.h>  #include "ArrayIterator.h"
   #include "HostAddress.h"
 //const static DWORD MAX_BUFFER_SIZE = 4096;  // 4 kilobytes  
   
 #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
 // Maximum iterations of Pipe processing in Monitor::run  
 const Uint32 maxIterations = 3;  
   
 #endif  
   
 #ifdef PEGASUS_OS_TYPE_WINDOWS  
 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024  
 #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \  
 of <winsock.h>. It may have been indirectly included (e.g., by including \  
 <windows.h>). Find inclusion of that header which is visible to this \  
 compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \  
 otherwise, less than 64 clients (the default) will be able to connect to the \  
 CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."  
   
 # endif  
 # define FD_SETSIZE 1024  
 # include <windows.h>  
 #else  
 # include <sys/types.h>  
 # include <sys/socket.h>  
 # include <sys/time.h>  
 # include <netinet/in.h>  
 # include <netdb.h>  
 # include <arpa/inet.h>  
 #endif  
  
 PEGASUS_USING_STD; PEGASUS_USING_STD;
  
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
 static AtomicInt _connections(0);  
   
 Mutex Monitor::_cout_mut;  
   
   
 // Added for NamedPipe implementation for windows  
 #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
  #define PIPE_INCREMENT 1  
 #endif  
   
 //////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
 // //
 // Monitor  // Tickler
 // //
 //////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
  
 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32  Tickler::Tickler()
       : _listenSocket(PEGASUS_INVALID_SOCKET),
 Monitor::Monitor()        _clientSocket(PEGASUS_INVALID_SOCKET),
    : _stopConnections(0),        _serverSocket(PEGASUS_INVALID_SOCKET)
      _stopConnectionsSem(0),  
      _solicitSocketCount(0),  
      _tickle_client_socket(-1),  
      _tickle_server_socket(-1),  
      _tickle_peer_socket(-1)  
 { {
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG      try
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Entering: Monitor::Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
 #endif  
     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;  
     Socket::initializeInterface();  
     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);  
   
     // setup the tickler  
     initializeTickler();  
   
     // Start the count at 1 because initilizeTickler()  
     // has added an entry in the first position of the  
     // _entries array  
     for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )  
     {     {
        _MonitorEntry entry(0, 0, 0);          _initialize();
        _entries.append(entry);  
     }     }
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG      catch (...)
     {     {
         AutoMutex automut(Monitor::_cout_mut);          _uninitialize();
         PEGASUS_STD(cout) << "Exiting:  Monitor::Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);          throw;
     }     }
 #endif  
 } }
  
 Monitor::~Monitor()  Tickler::~Tickler()
 { {
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG      _uninitialize();
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Entering: Monitor::~Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }     }
 #endif  
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");  
  
     try{  #if defined(PEGASUS_OS_TYPE_UNIX)
         if(_tickle_peer_socket >= 0)  
         {  // Use an anonymous pipe for the tickle connection.
             Socket::close(_tickle_peer_socket);  
         }  void Tickler::_initialize()
         if(_tickle_client_socket >= 0)  
         {         {
             Socket::close(_tickle_client_socket);      int fds[2];
         }  
         if(_tickle_server_socket >= 0)      if (pipe(fds) == -1)
         {         {
             Socket::close(_tickle_server_socket);          MessageLoaderParms parms(
         }              "Common.Monitor.TICKLE_CREATE",
               "Received error number $0 while creating the internal socket.",
               getSocketError());
           throw Exception(parms);
     }     }
     catch(...)  
     {      _serverSocket = fds[0];
         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,      _clientSocket = fds[1];
                   "Failed to close tickle sockets");  
     }     }
  
     Socket::uninitializeInterface();  #else
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                   "returning from monitor destructor");  // Use an external loopback socket connection to allow the tickle socket to
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  // be included in the select() array on non-Unix platforms.
   
   void Tickler::_initialize()
     {     {
         AutoMutex automut(Monitor::_cout_mut);      //
         PEGASUS_STD(cout) << "Exiting:  Monitor::~Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);      // Set up the addresses for the listen, client, and server sockets
       // based on whether IPv6 is enabled.
       //
   
       Socket::initializeInterface();
   
   # ifdef PEGASUS_ENABLE_IPV6
       struct sockaddr_storage listenAddress;
       struct sockaddr_storage clientAddress;
       struct sockaddr_storage serverAddress;
   # else
       struct sockaddr_in listenAddress;
       struct sockaddr_in clientAddress;
       struct sockaddr_in serverAddress;
   # endif
   
       int addressFamily;
       SocketLength addressLength;
   
       memset(&listenAddress, 0, sizeof (listenAddress));
   
   # ifdef PEGASUS_ENABLE_IPV6
       if (System::isIPv6StackActive())
       {
           // Use the IPv6 loopback address for the listen sockets
           HostAddress::convertTextToBinary(
               HostAddress::AT_IPV6,
               "::1",
               &reinterpret_cast<struct sockaddr_in6*>(&listenAddress)->sin6_addr);
           listenAddress.ss_family = AF_INET6;
           reinterpret_cast<struct sockaddr_in6*>(&listenAddress)->sin6_port = 0;
   
           addressFamily = AF_INET6;
           addressLength = sizeof(struct sockaddr_in6);
     }     }
       else
 #endif #endif
 }  
   
 void Monitor::initializeTickler(){  
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
     {     {
         AutoMutex automut(Monitor::_cout_mut);          // Use the IPv4 loopback address for the listen sockets
         PEGASUS_STD(cout) << "Entering: Monitor::initializeTickler(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);          HostAddress::convertTextToBinary(
               HostAddress::AT_IPV4,
               "127.0.0.1",
               &reinterpret_cast<struct sockaddr_in*>(
                   &listenAddress)->sin_addr.s_addr);
           reinterpret_cast<struct sockaddr_in*>(&listenAddress)->sin_family =
               AF_INET;
           reinterpret_cast<struct sockaddr_in*>(&listenAddress)->sin_port = 0;
   
           addressFamily = AF_INET;
           addressLength = sizeof(struct sockaddr_in);
     }     }
 #endif  
     /*  
        NOTE: On any errors trying to  
              setup out tickle connection,  
              throw an exception/end the server  
     */  
  
     /* setup the tickle server/listener */      // Use the same address for the client socket as the listen socket
       clientAddress = listenAddress;
   
       //
       // Set up a listen socket to allow the tickle client and server to connect
       //
  
     // get a socket for the server side      // Create the listen socket
     if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){      if ((_listenSocket = Socket::createSocket(addressFamily, SOCK_STREAM, 0)) ==
         //handle error               PEGASUS_INVALID_SOCKET)
         MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",      {
           MessageLoaderParms parms(
               "Common.Monitor.TICKLE_CREATE",
                                  "Received error number $0 while creating the internal socket.",                                  "Received error number $0 while creating the internal socket.",
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)              getSocketError());
                                  errno);  
 #else  
                                  WSAGetLastError());  
 #endif  
         throw Exception(parms);         throw Exception(parms);
     }     }
  
     // initialize the address      // Bind the listen socket to the loopback address
     memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));      if (::bind(
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM              _listenSocket,
 #pragma convert(37)              reinterpret_cast<struct sockaddr*>(&listenAddress),
 #endif              addressLength) < 0)
     _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");      {
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM          MessageLoaderParms parms(
 #pragma convert(0)              "Common.Monitor.TICKLE_BIND",
 #endif  
     _tickle_server_addr.sin_family = PF_INET;  
     _tickle_server_addr.sin_port = 0;  
   
     PEGASUS_SOCKLEN_T _addr_size = sizeof(_tickle_server_addr);  
   
     // bind server side to socket  
     if((::bind(_tickle_server_socket,  
                reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),  
                sizeof(_tickle_server_addr))) < 0){  
         // handle error  
 #ifdef PEGASUS_OS_ZOS  
     MessageLoaderParms parms("Common.Monitor.TICKLE_BIND_LONG",  
                                  "Received error:$0 while binding the internal socket.",strerror(errno));  
 #else  
         MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",  
                                  "Received error number $0 while binding the internal socket.",                                  "Received error number $0 while binding the internal socket.",
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)              getSocketError());
                                  errno);  
 #else  
                                  WSAGetLastError());  
 #endif  
 #endif  
         throw Exception(parms);         throw Exception(parms);
     }     }
  
     // tell the kernel we are a server      // Listen for a connection from the tickle client
     if((::listen(_tickle_server_socket,3)) < 0){      if ((::listen(_listenSocket, 3)) < 0)
         // handle error      {
         MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",          MessageLoaderParms parms(
               "Common.Monitor.TICKLE_LISTEN",
                          "Received error number $0 while listening to the internal socket.",                          "Received error number $0 while listening to the internal socket.",
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)              getSocketError());
                                  errno);  
 #else  
                                  WSAGetLastError());  
 #endif  
         throw Exception(parms);         throw Exception(parms);
     }     }
  
     // make sure we have the correct socket for our server      // Verify we have the correct listen socket
     int sock = ::getsockname(_tickle_server_socket,      SocketLength tmpAddressLength = addressLength;
                    reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),      int sock = ::getsockname(
                    &_addr_size);          _listenSocket,
     if(sock < 0){          reinterpret_cast<struct sockaddr*>(&listenAddress),
         // handle error          &tmpAddressLength);
         MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",      if (sock < 0)
       {
           MessageLoaderParms parms(
               "Common.Monitor.TICKLE_SOCKNAME",
                          "Received error number $0 while getting the internal socket name.",                          "Received error number $0 while getting the internal socket name.",
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)              getSocketError());
                                  errno);  
 #else  
                                  WSAGetLastError());  
 #endif  
         throw Exception(parms);         throw Exception(parms);
     }     }
  
     /* set up the tickle client/connector */      //
       // Set up the client side of the tickle connection.
       //
  
     // get a socket for our tickle client      // Create the client socket
     if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){      if ((_clientSocket = Socket::createSocket(addressFamily, SOCK_STREAM, 0)) ==
         // handle error               PEGASUS_INVALID_SOCKET)
         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",      {
                          "Received error number $0 while creating the internal client socket.",          MessageLoaderParms parms(
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)              "Common.Monitor.TICKLE_CLIENT_CREATE",
                                  errno);              "Received error number $0 while creating the internal client "
 #else                  "socket.",
                                  WSAGetLastError());              getSocketError());
 #endif  
         throw Exception(parms);         throw Exception(parms);
     }     }
  
     // setup the address of the client      // Bind the client socket to the loopback address
     memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));      if (::bind(
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM              _clientSocket,
 #pragma convert(37)              reinterpret_cast<struct sockaddr*>(&clientAddress),
 #endif              addressLength) < 0)
     _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");      {
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM          MessageLoaderParms parms(
 #pragma convert(0)              "Common.Monitor.TICKLE_CLIENT_BIND",
 #endif              "Received error number $0 while binding the internal client "
     _tickle_client_addr.sin_family = PF_INET;                  "socket.",
     _tickle_client_addr.sin_port = 0;              getSocketError());
   
     // bind socket to client side  
     if((::bind(_tickle_client_socket,  
                reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),  
                sizeof(_tickle_client_addr))) < 0){  
         // handle error  
         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",  
                          "Received error number $0 while binding the internal client socket.",  
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)  
                                  errno);  
 #else  
                                  WSAGetLastError());  
 #endif  
         throw Exception(parms);         throw Exception(parms);
     }     }
  
     // connect to server side      // Connect the client socket to the listen socket address
     if((::connect(_tickle_client_socket,      if (::connect(
                   reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),              _clientSocket,
                   sizeof(_tickle_server_addr))) < 0){              reinterpret_cast<struct sockaddr*>(&listenAddress),
         // handle error              addressLength) < 0)
         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",      {
                          "Received error number $0 while connecting the internal client socket.",          MessageLoaderParms parms(
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)              "Common.Monitor.TICKLE_CLIENT_CONNECT",
                                  errno);              "Received error number $0 while connecting the internal client "
 #else                  "socket.",
                                  WSAGetLastError());              getSocketError());
 #endif  
         throw Exception(parms);         throw Exception(parms);
     }     }
  
     /* set up the slave connection */      //
     memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));      // Set up the server side of the tickle connection.
     PEGASUS_SOCKLEN_T peer_size = sizeof(_tickle_peer_addr);      //
     pegasus_sleep(1);  
       tmpAddressLength = addressLength;
     // this call may fail, we will try a max of 20 times to establish this peer connection  
     if((_tickle_peer_socket = ::accept(_tickle_server_socket,      // Accept the client socket connection.
             reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),      _serverSocket = ::accept(
             &peer_size)) < 0){          _listenSocket,
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)          reinterpret_cast<struct sockaddr*>(&serverAddress),
         // Only retry on non-windows platforms.          &tmpAddressLength);
         if(_tickle_peer_socket == -1 && errno == EAGAIN)  
         {      if (_serverSocket == PEGASUS_SOCKET_ERROR)
           int retries = 0;      {
           do          MessageLoaderParms parms(
           {              "Common.Monitor.TICKLE_ACCEPT",
             pegasus_sleep(1);              "Received error number $0 while accepting the internal socket "
             _tickle_peer_socket = ::accept(_tickle_server_socket,                  "connection.",
                 reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),              getSocketError());
                 &peer_size);          throw Exception(parms);
             retries++;  
           } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);  
         }         }
 #endif  
       //
       // Close the listen socket and make the other sockets non-blocking
       //
   
       Socket::close(_listenSocket);
       Socket::disableBlocking(_serverSocket);
       Socket::disableBlocking(_clientSocket);
     }     }
     if(_tickle_peer_socket == -1){  
         // handle error  
         MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",  
                          "Received error number $0 while accepting the internal socket connection.",  
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)  
                                  errno);  
 #else  
                                  WSAGetLastError());  
 #endif #endif
         throw Exception(parms);  
   void Tickler::_uninitialize()
   {
       PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
   
       try
       {
           Socket::close(_serverSocket);
           Socket::close(_clientSocket);
           Socket::close(_listenSocket);
     }     }
     // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only      catch (...)
     // checks entries with IDLE state for events  
     _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);  
         Tracer::trace(TRC_HTTP,Tracer::LEVEL2,"!!!!!!!! TICKLE SOCKET-ID = %u",_tickle_peer_socket);  
     entry._status = _MonitorEntry::IDLE;  
     _entries.append(entry);  
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
     {     {
         AutoMutex automut(Monitor::_cout_mut);          PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
         PEGASUS_STD(cout) << "Exiting:  Monitor::initializeTickler(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);              "Failed to close tickle sockets");
     }     }
 #endif      Socket::uninitializeInterface();
 } }
  
 void Monitor::tickle(void)  
   ////////////////////////////////////////////////////////////////////////////////
   //
   // Monitor
   //
   ////////////////////////////////////////////////////////////////////////////////
   
   #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
   Monitor::Monitor()
      : _stopConnections(0),
        _stopConnectionsSem(0),
        _solicitSocketCount(0)
 { {
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG      int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
       _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
   
       // Create a MonitorEntry for the Tickler and set its state to IDLE so the
       // Monitor will watch for its events.
       _entries.append(MonitorEntry(
           _tickler.getServerSocket(),
           1,
           MonitorEntry::STATUS_IDLE,
           MonitorEntry::TYPE_INTERNAL));
   
       // Start the count at 1 because _entries[0] is the Tickler
       for (int i = 1; i < numberOfMonitorEntriesToAllocate; i++)
     {     {
         AutoMutex automut(Monitor::_cout_mut);          _entries.append(MonitorEntry());
         PEGASUS_STD(cout) << "Entering: Monitor::tickle(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);      }
     }     }
 #endif  
     static char _buffer[] =  
     {  
       '0','0'  
     };  
                Tracer::trace (TRC_HTTP, Tracer::LEVEL2,  
                                    "Now Monitor::Tickle ");  
     AutoMutex autoMutex(_tickle_mutex);  
     Socket::disableBlocking(_tickle_client_socket);  
                        Tracer::trace (TRC_HTTP, Tracer::LEVEL2,  
                                            "Now Monitor::Tickle::Write() ");  
   
     Socket::write(_tickle_client_socket,&_buffer, 2);  
     Socket::enableBlocking(_tickle_client_socket);  
                        Tracer::trace (TRC_HTTP, Tracer::LEVEL2,  
                                    "Now Monitor::Tickled ");  
  
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  Monitor::~Monitor()
     {     {
         AutoMutex automut(Monitor::_cout_mut);      PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
         PEGASUS_STD(cout) << "Exiting:  Monitor::tickle(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);                    "returning from monitor destructor");
     }     }
 #endif  
   void Monitor::tickle()
   {
       Socket::write(_tickler.getClientSocket(), "\0", 1);
 } }
  
 void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )  void Monitor::setState(
       Uint32 index,
       MonitorEntry::Status status)
 { {
       AutoMutex autoEntryMutex(_entriesMutex);
     // Set the state to requested state     // Set the state to requested state
     _entries[index]._status = status;      _entries[index].status = status;
 } }
  
 Boolean Monitor::run(Uint32 milliseconds)  void Monitor::run(Uint32 milliseconds)
 { {
       struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
     Boolean handled_events = false;  
     int i = 0;  
   
  
     fd_set fdread;     fd_set fdread;
     FD_ZERO(&fdread);     FD_ZERO(&fdread);
  
     AutoMutex autoEntryMutex(_entry_mut);      AutoMutex autoEntryMutex(_entriesMutex);
  
     ArrayIterator<_MonitorEntry> entries(_entries);      ArrayIterator<MonitorEntry> entries(_entries);
  
     // Check the stopConnections flag.  If set, clear the Acceptor monitor entries      // Check the stopConnections flag.  If set, clear the Acceptor monitor
       // entries
     if (_stopConnections.get() == 1)     if (_stopConnections.get() == 1)
     {     {
         for ( int indx = 0; indx < (int)entries.size(); indx++)          for (Uint32 indx = 0; indx < entries.size(); indx++)
         {         {
             if (entries[indx]._type == Monitor::ACCEPTOR)              if (entries[indx].type == MonitorEntry::TYPE_ACCEPTOR)
             {             {
                 if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)                  if (entries[indx].status != MonitorEntry::STATUS_EMPTY)
                 {                 {
                                         if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||                      if (entries[indx].status == MonitorEntry::STATUS_IDLE ||
                         entries[indx]._status.get() == _MonitorEntry::DYING )                          entries[indx].status == MonitorEntry::STATUS_DYING)
                    {                    {
                        // remove the entry                        // remove the entry
                        entries[indx]._status = _MonitorEntry::EMPTY;                          entries[indx].status = MonitorEntry::STATUS_EMPTY;
                    }                    }
                    else                    else
                    {                    {
                        // set status to DYING                        // set status to DYING
                       entries[indx]._status = _MonitorEntry::DYING;                          entries[indx].status = MonitorEntry::STATUS_DYING;
                    }                    }
                }                }
            }            }
Line 450 
Line 397 
         _stopConnectionsSem.signal();         _stopConnectionsSem.signal();
     }     }
  
     for( int indx = 0; indx < (int)entries.size(); indx++)      for (Uint32 indx = 0; indx < entries.size(); indx++)
     {  
         const _MonitorEntry &entry = entries[indx];  
         if ((entry._status.get() == _MonitorEntry::DYING) &&  
            (entry._type == Monitor::CONNECTION))  
         {         {
           const MonitorEntry& entry = entries[indx];
  
           if ((entry.status == MonitorEntry::STATUS_DYING) &&
               (entry.type == MonitorEntry::TYPE_CONNECTION))
           {
             MessageQueue *q = MessageQueue::lookup(entry.queueId);             MessageQueue *q = MessageQueue::lookup(entry.queueId);
             PEGASUS_ASSERT(q != 0);             PEGASUS_ASSERT(q != 0);
             HTTPConnection &h = *static_cast<HTTPConnection *>(q);             HTTPConnection &h = *static_cast<HTTPConnection *>(q);
  
             if (h._connectionClosePending == false)             if (h._connectionClosePending == false)
                         {  
                             continue;                             continue;
                         }  
   
  
             // NOTE: do not attempt to delete while there are pending responses             // NOTE: do not attempt to delete while there are pending responses
             // coming thru. The last response to come thru after a             // coming thru. The last response to come thru after a
Line 475 
Line 419 
  
             if (h._responsePending == true)             if (h._responsePending == true)
             {             {
 // Added for NamedPipe implementation for windows                  PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
 #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)                      "Monitor::run - Ignoring connection delete request "
                 if  (!entry.namedPipeConnection)                          "because responses are still pending. "
                 {  
 #endif  
                     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "  
                         "Ignoring connection delete request because "  
                         "responses are still pending. "  
                         "connection=0x%p, socket=%d\n",                         "connection=0x%p, socket=%d\n",
                         (void *)&h, h.getSocket());                      (void *)&h, h.getSocket()));
   
 // Added for NamedPipe implementation for windows  
 #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
                 }  
                 else  
                 {  
                     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "  
                         "Ignoring connection delete request because "  
                         "responses are still pending. "  
                         "connection=0x%p, NamedPipe=%d\n",  
                         (void *)&h, h.getNamedPipe().getPipe());  
                 }  
 #endif  
                 continue;                 continue;
             }             }
             h._connectionClosePending = false;             h._connectionClosePending = false;
             MessageQueue &o = h.get_owner();             MessageQueue &o = h.get_owner();
                     Message* message = 0;              Message* message= new CloseConnectionMessage(entry.socket);
   
 // Added for NamedPipe implementation for windows  
 #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
             if (!entry.namedPipeConnection)  
             {  
 #endif  
                 message= new CloseConnectionMessage(entry.socket);  
   
 // Added for NamedPipe implementation for windows  
 #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
             }  
             else  
             {  
   
                             message= new CloseConnectionMessage(entry.namedPipe);  
   
             }  
 #endif  
             message->dest = o.getQueueId();             message->dest = o.getQueueId();
  
             // HTTPAcceptor is responsible for closing the connection.             // HTTPAcceptor is responsible for closing the connection.
Line 535 
Line 443 
             // unlocked will not result in an ArrayIndexOutOfBounds             // unlocked will not result in an ArrayIndexOutOfBounds
             // exception.             // exception.
  
             autoEntryMutex.unlock();              _entriesMutex.unlock();
             o.enqueue(message);             o.enqueue(message);
             autoEntryMutex.lock();              _entriesMutex.lock();
             // After enqueue a message and the autoEntryMutex has been released and locked again,  
             // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries.  
  
               // After enqueue a message and the autoEntryMutex has been
               // released and locked again, the array of _entries can be
               // changed. The ArrayIterator has be reset with the original
               // _entries.
             entries.reset(_entries);             entries.reset(_entries);
         }         }
     }     }
Line 553 
Line 463 
         place to calculate the max file descriptor (maximum socket number)         place to calculate the max file descriptor (maximum socket number)
         because we have to traverse the entire array.         because we have to traverse the entire array.
     */     */
       SocketHandle maxSocketCurrentPass = 0;
     PEGASUS_SOCKET maxSocketCurrentPass = 0;      for (Uint32 indx = 0; indx < entries.size(); indx++)
     int indx;  
   
         // Record the indexes at which Sockets are available  
         Array <Uint32> socketCountAssociator;  
     int socketEntryCount=0;  
   
      // Added for NamedPipe implementation for windows  
 #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
     //This array associates named pipe connections to their place in [indx]  
     //in the entries array. The value in portion zero of the array is the  
     //index of the fist named pipe connection in the entries array  
   
         // Record the indexes at which Pipes are available  
         Array <Uint32> indexPipeCountAssociator;  
     int pipeEntryCount=0;  
     int MaxPipes = PIPE_INCREMENT;  
     // List of Pipe Handlers  
     HANDLE * hPipeList = new HANDLE[PIPE_INCREMENT];  
 #endif  
   
     // This loop takes care of setting the namedpipe which has to be used from the list....  
     for ( indx = 0,socketEntryCount=0 ;  
                              indx < (int)entries.size(); indx++)  
     {  
   
 // Added for NamedPipe implementation for windows  
 #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
                 if (!entries[indx].namedPipeConnection)  
         {         {
 #endif  
             if (maxSocketCurrentPass < entries[indx].socket)             if (maxSocketCurrentPass < entries[indx].socket)
                         {  
                                 maxSocketCurrentPass = entries[indx].socket;                                 maxSocketCurrentPass = entries[indx].socket;
                         }  
             if(entries[indx]._status.get() == _MonitorEntry::IDLE)          if (entries[indx].status == MonitorEntry::STATUS_IDLE)
             {             {
                 _idleEntries++;                 _idleEntries++;
                 FD_SET(entries[indx].socket, &fdread);                 FD_SET(entries[indx].socket, &fdread);
                 socketCountAssociator.append(indx);  
                                 socketEntryCount++;  
             }  
   
 // Added for NamedPipe implementation for windows  
 #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
         }         }
                 else  
                 {  
                     entries[indx].pipeSet = false;  
                         if (pipeEntryCount >= MaxPipes)  
                         {  
                             MaxPipes += PIPE_INCREMENT;  
                                 HANDLE* temp_pList = new HANDLE[MaxPipes];  
                                 for (Uint32 i =0;i<pipeEntryCount;i++)  
                                 {  
                                     temp_pList[i] = hPipeList[i];  
                                 }  
                                 delete [] hPipeList;  
                                 hPipeList = temp_pList;  
                     }  
                         hPipeList[pipeEntryCount] = entries[indx].namedPipe.getPipe();  
                         indexPipeCountAssociator.append(indx);  
                         pipeEntryCount++;  
                 }  
   
 #endif  
     }     }
  
     /*     /*
Line 628 
Line 482 
     */     */
     maxSocketCurrentPass++;     maxSocketCurrentPass++;
  
     autoEntryMutex.unlock();      _entriesMutex.unlock();
   
     int events = -1;  
         // Since the pipes have been introduced, the ratio of procesing  
         // time Socket:Pipe :: 3/4:1/4 respectively  
   
         Uint32 newMilliseconds = milliseconds;  
         #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
   
         newMilliseconds = (milliseconds * 3)/4 ;  
   
     #endif  
   
         struct timeval tv = {newMilliseconds/1000, newMilliseconds%1000*1000};  
   
  
       //
       // The first argument to select() is ignored on Windows and it is not
       // a socket value.  The original code assumed that the number of sockets
       // and a socket value have the same type.  On Windows they do not.
       //
         #ifdef PEGASUS_OS_TYPE_WINDOWS         #ifdef PEGASUS_OS_TYPE_WINDOWS
                 events = select(0, &fdread, NULL, NULL, &tv);      int events = select(0, &fdread, NULL, NULL, &tv);
         #else         #else
                 events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);      int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
         #endif         #endif
       int selectErrno = getSocketError();
   
       _entriesMutex.lock();
  
     autoEntryMutex.lock();      struct timeval timeNow;
     // After enqueue a message and the autoEntryMutex has been released and locked again,      Time::gettimeofday(&timeNow);
     // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries  
       // After enqueue a message and the autoEntryMutex has been released and
       // locked again, the array of _entries can be changed. The ArrayIterator
       // has be reset with the original _entries
     entries.reset(_entries);     entries.reset(_entries);
  
 #ifdef PEGASUS_OS_TYPE_WINDOWS      if (events == PEGASUS_SOCKET_ERROR)
     if(events == SOCKET_ERROR)  
 #else  
     if(events == -1)  
 #endif  
     {     {
          Tracer::trace(TRC_HTTP, Tracer::LEVEL4,          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
          "Monitor::run - errorno = %d has occurred on select.", errno);              "Monitor::run - select() returned error %d.", selectErrno));
        // The EBADF error indicates that one or more or the file        // The EBADF error indicates that one or more or the file
        // descriptions was not valid. This could indicate that        // descriptions was not valid. This could indicate that
        // the entries structure has been corrupted or that        // the entries structure has been corrupted or that
        // we have a synchronization error.        // we have a synchronization error.
  
          PEGASUS_ASSERT(errno != EBADF);          PEGASUS_ASSERT(selectErrno != EBADF);
     }     }
     else if (events)     else if (events)
     {     {
          Tracer::trace(TRC_HTTP, Tracer::LEVEL4,          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
          "Monitor::run select event received events = %d, monitoring %d idle entries",              "Monitor::run select event received events = %d, monitoring %d "
          events, _idleEntries);                  "idle entries",
          for ( int sindx = 0; sindx < socketEntryCount; sindx++)              events, _idleEntries));
          {          for (Uint32 indx = 0; indx < entries.size(); indx++)
              // The Monitor should only look at entries in the table that are IDLE (i.e.,          {
              // owned by the Monitor).              // The Monitor should only look at entries in the table that are
                      indx = socketCountAssociator[sindx];              // IDLE (i.e., owned by the Monitor).
               if ((entries[indx].status == MonitorEntry::STATUS_IDLE) &&
              if ((entries[indx]._status.get() == _MonitorEntry::IDLE) &&  
                   (FD_ISSET(entries[indx].socket, &fdread)))                   (FD_ISSET(entries[indx].socket, &fdread)))
              {              {
                  MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);                  MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
                  Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                                "Monitor::run indx = %d, queueId =  %d, q = %p",  
                                indx, entries[indx].queueId, q);  
   
                  PEGASUS_ASSERT(q !=0);                  PEGASUS_ASSERT(q !=0);
                   PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                       "Monitor::run indx = %d, queueId = %d, q = %p",
                       indx, entries[indx].queueId, q));
  
                  try                  try
                  {                  {
                      if (entries[indx]._type == Monitor::CONNECTION)                      if (entries[indx].type == MonitorEntry::TYPE_CONNECTION)
                      {                      {
                          static_cast<HTTPConnection *>(q)->_entry_index = indx;                          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                               "entries[%d].type is TYPE_CONNECTION",
                                                  // Do not update the entry just yet. The entry gets updated once                              indx));
                                                  // the request has been read.  
                                                  //entries[indx]._status = _MonitorEntry::BUSY;                          HTTPConnection *dst =
                               reinterpret_cast<HTTPConnection *>(q);
                           dst->_entry_index = indx;
   
                           // Update idle start time because we have received some
                           // data. Any data is good data at this point, and we'll
                           // keep the connection alive, even if we've exceeded
                           // the idleConnectionTimeout, which will be checked
                           // when we call closeConnectionOnTimeout() next.
                           Time::gettimeofday(&dst->_idleStartTime);
   
                           // Check for accept pending (ie. SSL handshake pending)
                           // or idle connection timeouts for sockets from which
                           // we received data (avoiding extra queue lookup below).
                           if (!dst->closeConnectionOnTimeout(&timeNow))
                           {
                               PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                                   "Entering HTTPConnection::run() for "
                                       "indx = %d, queueId = %d, q = %p",
                                   indx, entries[indx].queueId, q));
  
                                                  // If allocate_and_awaken failure, retry on next iteration  
                                                  HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);  
                                                  Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                                                                 "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",  
                                                  dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);  
                                                  try                                                  try
                                                  {                                                  {
                                                      dst->run(1);                                                      dst->run(1);
                                                  }                                                  }
                                                  catch (...)                                                  catch (...)
                                                  {                                                  {
                                                      Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                                  PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL2,
                                                           "Monitor::_dispatch: exception received");                                      "Caught exception from "
                                       "HTTPConnection::run()");
                                                  }                                                  }
                          Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                              PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
                             "Monitor::_dispatch: exited run() for index %d",                                  "Exited HTTPConnection::run()");
                                                     dst->_entry_index);                              }
   
  
                                         }                                         }
                                         else if (entries[indx]._type == Monitor::INTERNAL)                      else if (entries[indx].type == MonitorEntry::TYPE_INTERNAL)
                                         {                                         {
                                                 // set ourself to BUSY,                          char buffer;
                                                 // read the data                          Sint32 ignored =
                                             // and set ourself back to IDLE                              Socket::read(entries[indx].socket, &buffer, 1);
                                             static char buffer[2];  
                                         Socket::disableBlocking(entries[indx].socket);  
   
                                         Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2);  
                                                 Socket::enableBlocking(entries[indx].socket);  
                                                 entries[indx]._status = _MonitorEntry::IDLE;  
                                     }                                     }
                                         else                                         else
                                         {                                         {
                                             Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                                                  "Non-connection entry, indx = %d, has been received.", indx);                              "Non-connection entry, indx = %d, has been "
                                   "received.",
                               indx));
                                                 int events = 0;                                                 int events = 0;
                                                 events |= SocketMessage::READ;                                                 events |= SocketMessage::READ;
                                                 Message *msg = new SocketMessage(entries[indx].socket, events);                          Message* msg = new SocketMessage(
                                                 entries[indx]._status = _MonitorEntry::BUSY;                              entries[indx].socket, events);
                                                 autoEntryMutex.unlock();                          entries[indx].status = MonitorEntry::STATUS_BUSY;
                           _entriesMutex.unlock();
                                                 q->enqueue(msg);                                                 q->enqueue(msg);
                                                 autoEntryMutex.lock();                          _entriesMutex.lock();
                                                 // After enqueue a message and the autoEntryMutex has been released and locked again,  
                                                 // the array of entries can be changed. The ArrayIterator has be reset with the original _entries  
                                                 entries.reset(_entries);  
                                                 entries[indx]._status = _MonitorEntry::IDLE;  
                                                 handled_events = true;  
                                                 delete [] hPipeList;  
                                                 return handled_events;  
  
                           // After enqueue a message and the autoEntryMutex has
                           // been released and locked again, the array of
                           // entries can be changed. The ArrayIterator has to be
                           // reset with the latest _entries.
                           entries.reset(_entries);
                           entries[indx].status = MonitorEntry::STATUS_IDLE;
                                         }                                         }
                                 }                                 }
                                 catch(...)                                 catch(...)
                                 {                                 {
                                 }                                 }
                                 handled_events = true;  
                         }  
         }  
                 delete [] hPipeList;  
                 return handled_events;  
     }  
   
   
 #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
   
         //if no pipes are registered return immediately  
   
         int pEvents = -1;  
         int pCount = -1;  
         BOOL bPeekPipe = 0;  
         DWORD dwBytesAvail=0;  
         // The pipe is sniffed and check if there are any data. If available, the  
         // message is picked from the Queue and appropriate methods are invoked.  
   
   
         // pipeProcessCount records the number of requests that are processed.  
         // At the end of loop this is verified against the count of request  
         // on local connection . If there are any pipes which needs to be  
         // processed we would apply delay and then proceed to iterate.  
   
     Uint32 pipeProcessCount =0;  
   
     for (int counter = 1; counter < maxIterations ; counter ++)  
     {  
   
   
                 // pipeIndex is used to index into indexPipeCountAssociator to fetch  
                 // index of the _MonitorEntry of Monitor  
         for (int pipeIndex = 0; pipeIndex < pipeEntryCount; pipeIndex++)  
             {  
             dwBytesAvail = 0;  
                     bPeekPipe = ::PeekNamedPipe(hPipeList[pipeIndex],  
                                                     NULL,  
                                                                     NULL,  
                                                                         NULL,  
                                         &dwBytesAvail,  
                                                                         NULL  
                                                                        );  
   
                         // If peek on NamedPipe was successfull and data is available  
             if (bPeekPipe && dwBytesAvail)  
                 {  
   
                             Tracer::trace(TRC_HTTP,Tracer::LEVEL4," PIPE_PEEKING FOUND = %u BYTES", dwBytesAvail);  
   
                             pEvents = 1;  
                     entries[indexPipeCountAssociator[pipeIndex]].pipeSet = true;  
                             Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                     "Monitor::run select event received events = %d, \  
                                         monitoring %d idle entries",  
                     pEvents,  
                                         _idleEntries);  
   
                                 int pIndx = indexPipeCountAssociator[pipeIndex];  
   
                                 if ((entries[pIndx]._status.get() == _MonitorEntry::IDLE) &&  
                                          entries[pIndx].namedPipe.isConnected() &&  
                                          (pEvents))  
                         {  
   
                                 MessageQueue *q = 0;  
   
                     try  
                                         {  
   
                                         q = MessageQueue::lookup (entries[pIndx].queueId);  
                     }  
                     catch (Exception e)  
                     {  
                                         e.getMessage();  
                                 }  
                     catch(...)  
                     {  
                                 }                                 }
               // else check for accept pending (ie. SSL handshake pending) or
               // idle connection timeouts for sockets from which we did not
               // receive data.
               else if ((entries[indx].status == MonitorEntry::STATUS_IDLE) &&
                   entries[indx].type == MonitorEntry::TYPE_CONNECTION)
  
                                         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                                   "Monitor::run indx = %d, queueId =  %d,\  
                                                                   q = %p",pIndx, entries[pIndx].queueId, q);  
                     try  
                     {                     {
                                         if (entries[pIndx]._type == Monitor::CONNECTION)                  MessageQueue* q = MessageQueue::lookup(entries[indx].queueId);
                         {                  HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
                   dst->_entry_index = indx;
                                                     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                  dst->closeConnectionOnTimeout(&timeNow);
                                                                       "entries[indx].type for indx = \  
                                                                               %d is Monitor::CONNECTION",  
                                                                                   pIndx);  
                                                     static_cast<HTTPConnection *>(q)->_entry_index = pIndx;  
                                                 HTTPConnection *dst = reinterpret_cast \  
                                                                                            <HTTPConnection *>(q);  
                                                 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                                                                       "Monitor::_dispatch: entering run() \  
                                                                                   for indx  = %d, queueId = %d, \  
                                                                                   q = %p",\  
                                                               dst->_entry_index,  
                                                                                   dst->_monitor->_entries\  
                                                                                   [dst->_entry_index].queueId, dst);  
   
                                                 try  
                                                 {  
   
                                                         dst->run(1);  
   
                                                         // Record that the requested data is read/Written  
                                                         pipeProcessCount++;  
   
                                                 }                                                 }
                                                 catch (...)  
                                                 {  
                                                         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                                                                               "Monitor::_dispatch: \  
                                                                                            exception received");  
                                                 }                                                 }
   
                                                 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                                                               "Monitor::_dispatch: exited \  
                                                                        \run() index %d",  
                                                                                    dst->_entry_index);  
   
   
                                         }                                         }
       // else if "events" is zero (ie. select timed out) then we still need
       // to check if there are any pending SSL handshakes that have timed out.
                                         else                                         else
                                         {                                         {
                                                 /* The condition          for (Uint32 indx = 0; indx < entries.size(); indx++)
                                                            entries[indx]._type == Monitor::INTERNAL can be  
                                                            ignored for pipes as the tickler is of  
                                                            Monitor::INTERNAL type. The tickler is  
                                                            a socket.  
                                                 */  
   
                                                 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                                                                                   "Non-connection entry, indx = %d,\  
                                                                                           has been received.", pIndx);  
                                                 int events = 0;  
                                                 Message *msg = 0;  
   
                                                     pEvents |= NamedPipeMessage::READ;  
                                                     msg = new NamedPipeMessage(entries[pIndx].namedPipe, pEvents);  
                                     entries[pIndx]._status = _MonitorEntry::BUSY;  
                                     autoEntryMutex.unlock();  
                                             q->enqueue(msg);  
                                                 autoEntryMutex.lock();  
                                     entries.reset(_entries);  
                                     entries[pIndx]._status = _MonitorEntry::IDLE;  
                                                         delete [] hPipeList;  
                                 return(handled_events);  
   
                                         }  
   
   
                                 }  
                                 catch(...)  
                                 {                                 {
               if ((entries[indx].status == MonitorEntry::STATUS_IDLE) &&
                         }                  entries[indx].type == MonitorEntry::TYPE_CONNECTION)
                     }  
   
                 }  
         }  
   
                 //Check if all the pipes had recieved the data, If no then try again  
         if (pipeEntryCount == pipeProcessCount)  
                 {                 {
                     break;                  MessageQueue* q = MessageQueue::lookup(entries[indx].queueId);
                   HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
                   dst->_entry_index = indx;
                   dst->closeConnectionOnTimeout(&timeNow);
               }
                 }                 }
   
   
     }     }
   
         delete [] hPipeList;  
   
 #endif  
   
     return(handled_events);  
 } }
  
 void Monitor::stopListeningForConnections(Boolean wait) void Monitor::stopListeningForConnections(Boolean wait)
 { {
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Entering: Monitor::stopListeningForConnections(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
 #endif  
     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
     // set boolean then tickle the server to recognize _stopConnections     // set boolean then tickle the server to recognize _stopConnections
     _stopConnections = 1;     _stopConnections = 1;
Line 960 
Line 662 
     }     }
  
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Exiting:  Monitor::stopListeningForConnections(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
 #endif  
 } }
  
  
 int  Monitor::solicitSocketMessages( int  Monitor::solicitSocketMessages(
     PEGASUS_SOCKET socket,      SocketHandle socket,
     Uint32 events,     Uint32 events,
     Uint32 queueId,     Uint32 queueId,
     int type)      Uint32 type)
 { {
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Entering: Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
 #endif  
    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
    AutoMutex autoMut(_entry_mut);      AutoMutex autoMut(_entriesMutex);
   
    // Check to see if we need to dynamically grow the _entries array    // Check to see if we need to dynamically grow the _entries array
    // We always want the _entries array to 2 bigger than the      // We always want the _entries array to be 2 bigger than the
    // current connections requested    // current connections requested
    _solicitSocketCount++;  // bump the count    _solicitSocketCount++;  // bump the count
    int size = (int)_entries.size();  
    if((int)_solicitSocketCount >= (size-1)){      for (Uint32 i = _entries.size(); i < _solicitSocketCount + 1; i++)
         for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){      {
                 _MonitorEntry entry(0, 0, 0);          _entries.append(MonitorEntry());
                 _entries.append(entry);  
         }  
    }    }
  
    int index;      for (Uint32 index = 1; index < _entries.size(); index++)
    for(index = 1; index < (int)_entries.size(); index++)  
    {    {
       try       try
       {       {
          if(_entries[index]._status.get() == _MonitorEntry::EMPTY)              if (_entries[index].status == MonitorEntry::STATUS_EMPTY)
          {          {
             _entries[index].socket = socket;             _entries[index].socket = socket;
             _entries[index].queueId  = queueId;             _entries[index].queueId  = queueId;
             _entries[index]._type = type;                  _entries[index].type = type;
             _entries[index]._status = _MonitorEntry::IDLE;                  _entries[index].status = MonitorEntry::STATUS_IDLE;
  
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG                  return (int)index;
             {  
                 AutoMutex automut(Monitor::_cout_mut);  
                 PEGASUS_STD(cout) << "Exiting:  Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
             }  
 #endif  
             return index;  
          }          }
       }       }
       catch(...)       catch(...)
       {       {
       }       }
    }    }
    _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful      // decrease the count, if we are here we didn't do anything meaningful
       _solicitSocketCount--;
    PEG_METHOD_EXIT();    PEG_METHOD_EXIT();
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
    {  
        AutoMutex automut(Monitor::_cout_mut);  
        PEGASUS_STD(cout) << "Exiting:  Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
    }  
 #endif  
    return -1;    return -1;
   
 } }
  
 void Monitor::unsolicitSocketMessages(PEGASUS_SOCKET socket)  void Monitor::unsolicitSocketMessages(SocketHandle socket)
 {  
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
     {     {
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Entering: Monitor::unsolicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
 #endif  
   
     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
     AutoMutex autoMut(_entry_mut);      AutoMutex autoMut(_entriesMutex);
  
     /*     /*
         Start at index = 1 because _entries[0] is the tickle entry which never needs          Start at index = 1 because _entries[0] is the tickle entry which
         to be EMPTY;          never needs to be reset to EMPTY;
     */     */
     unsigned int index;      for (Uint32 index = 1; index < _entries.size(); index++)
     for(index = 1; index < _entries.size(); index++)  
     {     {
        if(_entries[index].socket == socket)        if(_entries[index].socket == socket)
        {        {
           _entries[index]._status = _MonitorEntry::EMPTY;              _entries[index].reset();
           _entries[index].socket = PEGASUS_INVALID_SOCKET;  
           _solicitSocketCount--;           _solicitSocketCount--;
           break;           break;
        }        }
Line 1062 
Line 729 
  
     /*     /*
         Dynamic Contraction:         Dynamic Contraction:
         To remove excess entries we will start from the end of the _entries array          To remove excess entries we will start from the end of the _entries
         and remove all entries with EMPTY status until we find the first NON EMPTY.          array and remove all entries with EMPTY status until we find the
         This prevents the positions, of the NON EMPTY entries, from being changed.          first NON EMPTY.  This prevents the positions, of the NON EMPTY
           entries, from being changed.
     */     */
     index = _entries.size() - 1;      for (Uint32 index = _entries.size() - 1;
     while(_entries[index]._status.get() == _MonitorEntry::EMPTY){           (_entries[index].status == MonitorEntry::STATUS_EMPTY) &&
         if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)               (index >= MAX_NUMBER_OF_MONITOR_ENTRIES);
                 _entries.remove(index);           index--)
         index--;  
     }  
     PEG_METHOD_EXIT();  
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Exiting:  Monitor::unsolicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
 #endif  
 }  
   
 // Note: this is no longer called with PEP 183.  
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)  
 {  
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Entering: Monitor::_dispatch(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
 #endif  
    HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);  
    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
         "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",  
         dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);  
    try  
    {  
       dst->run(1);  
    }  
    catch (...)  
    {  
       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
           "Monitor::_dispatch: exception received");  
    }  
    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
           "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);  
   
    PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);  
   
    // Once the HTTPConnection thread has set the status value to either  
    // Monitor::DYING or Monitor::IDLE, it has returned control of the connection  
    // to the Monitor.  It is no longer permissible to access the connection  
    // or the entry in the _entries table.  
    if (dst->_connectionClosePending)  
    {  
       dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;  
    }  
    else  
    {  
       dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;  
    }  
    return 0;  
 }  
   
 // Added for NamedPipe implementation for windows  
 #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
 //This method is anlogus to solicitSocketMessages. It does the same thing for named Pipes  
 int  Monitor::solicitPipeMessages(  
     NamedPipe namedPipe,  
     Uint32 events,  //not sure what has to change for this enum  
     Uint32 queueId,  
     int type)  
 {  
    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitPipeMessages");  
   
    AutoMutex autoMut(_entry_mut);  
    // Check to see if we need to dynamically grow the _entries array  
    // We always want the _entries array to 2 bigger than the  
    // current connections requested  
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
 {  
    AutoMutex automut(Monitor::_cout_mut);  
    PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages at the begining" << PEGASUS_STD(endl);  
 }  
 #endif  
   
    _solicitSocketCount++;  // bump the count  
    int size = (int)_entries.size();  
    if((int)_solicitSocketCount >= (size-1)){  
         for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){  
                 _MonitorEntry entry(0, 0, 0);  
                 _entries.append(entry);  
         }  
    }  
   
    int index;  
    for(index = 1; index < (int)_entries.size(); index++)  
    {  
       try  
       {  
          if(_entries[index]._status.get() == _MonitorEntry::EMPTY)  
          {  
             _entries[index].socket = NULL;  
             _entries[index].namedPipe = namedPipe;  
             _entries[index].namedPipeConnection = true;  
             _entries[index].queueId  = queueId;  
             _entries[index]._type = type;  
             _entries[index]._status = _MonitorEntry::IDLE;  
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
   {  
             AutoMutex automut(Monitor::_cout_mut);  
             PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages after seting up  _entries[index] index = " << index << PEGASUS_STD(endl);  
   }  
 #endif  
   
             return index;  
          }  
       }  
       catch(...)  
       {  
       }  
   
    }  
    _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful  
    PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages nothing happed - it didn't work" << PEGASUS_STD(endl);  
   
    PEG_METHOD_EXIT();  
    return -1;  
   
 }  
   
 //////////////////////////////////////////////////////////////////////////////  
 // Method Name      : unsolicitPipeMessages  
 // Input Parameter  : namedPipe  - type NamedPipe  
 // Return Type      : void  
 //============================================================================  
 // This method is invoked from HTTPAcceptor::handleEnqueue for server  
 // when the CLOSE_CONNECTION_MESSAGE is recieved. This method is also invoked  
 // from HTTPAcceptor::destroyConnections method when the CIMServer is shutdown.  
 // For the CIMClient, this is invoked from HTTPConnector::handleEnqueue when the  
 // CLOSE_CONNECTION_MESSAGE is recieved. This method is also invoked from  
 // HTTPConnector::disconnect when CIMClient requests a disconnect request.  
 // The list of _MonitorEntry is searched for the matching pipe.  
 // The Handle of the identified is closed and _MonitorEntry for the  
 // requested pipe is removed.  
 ///////////////////////////////////////////////////////////////////////////////  
   
 void Monitor::unsolicitPipeMessages(NamedPipe namedPipe)  
 {  
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Entering: Monitor::unsolicitPipeMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
 #endif  
   
     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitPipeMessages");  
     AutoMutex autoMut(_entry_mut);  
   
     /*  
         Start at index = 1 because _entries[0] is the tickle entry which never needs  
         to be EMPTY;  
     */  
     unsigned int index;  
     for (index = 1; index < _entries.size(); index++)  
     {  
         if (_entries[index].namedPipe.getPipe() == namedPipe.getPipe())  
         {  
             _entries[index]._status = _MonitorEntry::EMPTY;  
             // Ensure that the client has read the data  
                     ::FlushFileBuffers (namedPipe.getPipe());  
                     //Disconnect to release the pipe. This doesn't release Pipe Handle  
                     ::DisconnectNamedPipe (_entries[index].namedPipe.getPipe());  
             // Must use CloseHandle to Close Pipe  
                         ::CloseHandle(_entries[index].namedPipe.getPipe());  
                     _entries[index].namedPipe.disconnect();  
             _solicitSocketCount--;  
             break;  
         }  
     }  
   
     /*  
         Dynamic Contraction:  
         To remove excess entries we will start from the end of the _entries array  
         and remove all entries with EMPTY status until we find the first NON EMPTY.  
         This prevents the positions, of the NON EMPTY entries, from being changed.  
     */  
     index = _entries.size() - 1;  
     while (_entries[index]._status.get() == _MonitorEntry::EMPTY)  
         {  
         if ((_entries[index].namedPipe.getPipe() == namedPipe.getPipe()) ||  
             (_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES))  
         {         {
                     _entries.remove(index);                     _entries.remove(index);
         }         }
         index--;  
     }  
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Exiting:  Monitor::unsolicitPipeMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
 #endif  
 } }
  
 #endif  
   
 PEGASUS_NAMESPACE_END PEGASUS_NAMESPACE_END


Legend:
Removed from v.1.103.10.27  
changed lines
  Added in v.1.130

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2