(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/Monitor.cpp between version 1.103.10.20 and 1.139.8.1

version 1.103.10.20, 2006/07/19 16:58:52 version 1.139.8.1, 2012/02/15 17:47:07
Line 1 
Line 1 
 //%2006////////////////////////////////////////////////////////////////////////  //%LICENSE////////////////////////////////////////////////////////////////
 // //
 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development  // Licensed to The Open Group (TOG) under one or more contributor license
 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.  // agreements.  Refer to the OpenPegasusNOTICE.txt file distributed with
 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;  // this work for additional information regarding copyright ownership.
 // IBM Corp.; EMC Corporation, The Open Group.  // Each contributor licenses this file to you under the OpenPegasus Open
 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;  // Source License; you may not use this file except in compliance with the
 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.  // License.
 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;  //
 // EMC Corporation; VERITAS Software Corporation; The Open Group.  // Permission is hereby granted, free of charge, to any person obtaining a
 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;  // copy of this software and associated documentation files (the "Software"),
 // EMC Corporation; Symantec Corporation; The Open Group.  // to deal in the Software without restriction, including without limitation
 //  // the rights to use, copy, modify, merge, publish, distribute, sublicense,
 // Permission is hereby granted, free of charge, to any person obtaining a copy  // and/or sell copies of the Software, and to permit persons to whom the
 // of this software and associated documentation files (the "Software"), to  // Software is furnished to do so, subject to the following conditions:
 // deal in the Software without restriction, including without limitation the  //
 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or  // The above copyright notice and this permission notice shall be included
 // sell copies of the Software, and to permit persons to whom the Software is  // in all copies or substantial portions of the Software.
 // furnished to do so, subject to the following conditions:  //
 //  // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN  // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED  // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT  // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR  // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT  // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN  // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION  //
 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.  //////////////////////////////////////////////////////////////////////////
 //  
 //==============================================================================  
 //  
 // Author: Mike Brasher (mbrasher@bmc.com)  
 //  
 // Modified By: Mike Day (monitor_2) mdday@us.ibm.com  
 //              Amit K Arora (Bug#1153) amita@in.ibm.com  
 //              Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090  
 //              Sushma Fernandes (sushma@hp.com) for Bug#2057  
 //              Josephine Eskaline Joyce (jojustin@in.ibm.com) for PEP#101  
 //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)  
 // //
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
  
   #include "Network.h"
 #include <Pegasus/Common/Config.h> #include <Pegasus/Common/Config.h>
   
 #include <cstring> #include <cstring>
 #include "Monitor.h" #include "Monitor.h"
 #include "MessageQueue.h" #include "MessageQueue.h"
 #include "Socket.h" #include "Socket.h"
 #include <Pegasus/Common/Tracer.h> #include <Pegasus/Common/Tracer.h>
 #include <Pegasus/Common/HTTPConnection.h> #include <Pegasus/Common/HTTPConnection.h>
   #include <Pegasus/Common/HTTPAcceptor.h>
 #include <Pegasus/Common/MessageQueueService.h> #include <Pegasus/Common/MessageQueueService.h>
 #include <Pegasus/Common/Exception.h> #include <Pegasus/Common/Exception.h>
 #include "ArrayIterator.h" #include "ArrayIterator.h"
   #include "HostAddress.h"
   #include <errno.h>
   
 //const static DWORD MAX_BUFFER_SIZE = 4096;  // 4 kilobytes  
   
 #ifdef PEGASUS_OS_TYPE_WINDOWS  
 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024  
 #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \  
 of <winsock.h>. It may have been indirectly included (e.g., by including \  
 <windows.h>). Find inclusion of that header which is visible to this \  
 compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \  
 otherwise, less than 64 clients (the default) will be able to connect to the \  
 CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."  
   
 # endif  
 # define FD_SETSIZE 1024  
 # include <windows.h>  
 #else  
 # include <sys/types.h>  
 # include <sys/socket.h>  
 # include <sys/time.h>  
 # include <netinet/in.h>  
 # include <netdb.h>  
 # include <arpa/inet.h>  
 #endif  
  
 PEGASUS_USING_STD; PEGASUS_USING_STD;
  
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
 static AtomicInt _connections(0);  
 Mutex Monitor::_cout_mut;  
   
 #ifdef PEGASUS_OS_TYPE_WINDOWS  
  #define PIPE_INCREMENT 1  
 #endif  
   
 //////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
 // //
 // Monitor  // Tickler
 // //
 //////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
  
 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32  Tickler::Tickler()
 Monitor::Monitor()      : _listenSocket(PEGASUS_INVALID_SOCKET),
    : _stopConnections(0),        _clientSocket(PEGASUS_INVALID_SOCKET),
      _stopConnectionsSem(0),        _serverSocket(PEGASUS_INVALID_SOCKET)
      _solicitSocketCount(0),  
      _tickle_client_socket(-1),  
      _tickle_server_socket(-1),  
      _tickle_peer_socket(-1)  
 {  
     {     {
         AutoMutex automut(Monitor::_cout_mut);      try
         PEGASUS_STD(cout) << "Entering: Monitor::Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;  
     Socket::initializeInterface();  
     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);  
   
     // setup the tickler  
     initializeTickler();  
   
     // Start the count at 1 because initilizeTickler()  
     // has added an entry in the first position of the  
     // _entries array  
     for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )  
     {     {
        _MonitorEntry entry(0, 0, 0);          _initialize();
        _entries.append(entry);  
     }     }
       catch (...)
     {     {
         AutoMutex automut(Monitor::_cout_mut);          _uninitialize();
         PEGASUS_STD(cout) << "Exiting:  Monitor::Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);          throw;
     }     }
 } }
  
 Monitor::~Monitor()  Tickler::~Tickler()
 {  
     {     {
         AutoMutex automut(Monitor::_cout_mut);      _uninitialize();
         PEGASUS_STD(cout) << "Entering: Monitor::~Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }     }
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");  
  
     try{  void Tickler::notify()
         if(_tickle_peer_socket >= 0)  
         {         {
             Socket::close(_tickle_peer_socket);      Socket::write(_clientSocket, "\0", 1);
         }         }
         if(_tickle_client_socket >= 0)  
   void Tickler::reset()
         {         {
             Socket::close(_tickle_client_socket);      // Clear all bytes from the tickle socket
         }      char buffer[32];
         if(_tickle_server_socket >= 0)      while (Socket::read(_serverSocket, buffer, 32) > 0)
         {         {
             Socket::close(_tickle_server_socket);  
         }         }
     }     }
     catch(...)  
   #if defined(PEGASUS_OS_TYPE_UNIX)
   
   // Use an anonymous pipe for the tickle connection.
   
   void Tickler::_initialize()
     {     {
         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,      int fds[2];
                   "Failed to close tickle sockets");  
     }  
  
     Socket::uninitializeInterface();      if (pipe(fds) == -1)
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                   "returning from monitor destructor");  
     {     {
         AutoMutex automut(Monitor::_cout_mut);          MessageLoaderParms parms(
         PEGASUS_STD(cout) << "Exiting:  Monitor::~Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);              "Common.Monitor.TICKLE_CREATE",
               "Received error number $0 while creating the internal socket.",
               getSocketError());
           throw Exception(parms);
     }     }
   
       _serverSocket = fds[0];
       _clientSocket = fds[1];
   
       Socket::disableBlocking(_serverSocket);
 } }
  
 void Monitor::initializeTickler(){  #else
   
   // Use an external loopback socket connection to allow the tickle socket to
   // be included in the select() array on non-Unix platforms.
   
   void Tickler::_initialize()
     {     {
         AutoMutex automut(Monitor::_cout_mut);      //
         PEGASUS_STD(cout) << "Entering: Monitor::initializeTickler(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);      // Set up the addresses for the listen, client, and server sockets
     }      // based on whether IPv6 is enabled.
     /*      //
        NOTE: On any errors trying to  
              setup out tickle connection,  
              throw an exception/end the server  
     */  
  
     /* setup the tickle server/listener */      Socket::initializeInterface();
  
     // get a socket for the server side  # ifdef PEGASUS_ENABLE_IPV6
     if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){      struct sockaddr_storage listenAddress;
         //handle error      struct sockaddr_storage clientAddress;
         MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",      struct sockaddr_storage serverAddress;
                                  "Received error number $0 while creating the internal socket.",  # else
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)      struct sockaddr_in listenAddress;
                                  errno);      struct sockaddr_in clientAddress;
 #else      struct sockaddr_in serverAddress;
                                  WSAGetLastError());  # endif
   
       int addressFamily;
       SocketLength addressLength;
   
       memset(&listenAddress, 0, sizeof (listenAddress));
   
   # ifdef PEGASUS_ENABLE_IPV6
       if (System::isIPv6StackActive())
       {
           // Use the IPv6 loopback address for the listen sockets
           HostAddress::convertTextToBinary(
               HostAddress::AT_IPV6,
               "::1",
               &reinterpret_cast<struct sockaddr_in6*>(&listenAddress)->sin6_addr);
           listenAddress.ss_family = AF_INET6;
           reinterpret_cast<struct sockaddr_in6*>(&listenAddress)->sin6_port = 0;
   
           addressFamily = AF_INET6;
           addressLength = sizeof(struct sockaddr_in6);
       }
       else
 #endif #endif
         throw Exception(parms);      {
           // Use the IPv4 loopback address for the listen sockets
           HostAddress::convertTextToBinary(
               HostAddress::AT_IPV4,
               "127.0.0.1",
               &reinterpret_cast<struct sockaddr_in*>(
                   &listenAddress)->sin_addr.s_addr);
           reinterpret_cast<struct sockaddr_in*>(&listenAddress)->sin_family =
               AF_INET;
           reinterpret_cast<struct sockaddr_in*>(&listenAddress)->sin_port = 0;
   
           addressFamily = AF_INET;
           addressLength = sizeof(struct sockaddr_in);
     }     }
  
     // initialize the address      // Use the same address for the client socket as the listen socket
     memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));      clientAddress = listenAddress;
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM  
 #pragma convert(37)  
 #endif  
     _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");  
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM  
 #pragma convert(0)  
 #endif  
     _tickle_server_addr.sin_family = PF_INET;  
     _tickle_server_addr.sin_port = 0;  
  
     PEGASUS_SOCKLEN_T _addr_size = sizeof(_tickle_server_addr);      //
       // Set up a listen socket to allow the tickle client and server to connect
       //
  
     // bind server side to socket      // Create the listen socket
     if((::bind(_tickle_server_socket,      if ((_listenSocket = Socket::createSocket(addressFamily, SOCK_STREAM, 0)) ==
                reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),               PEGASUS_INVALID_SOCKET)
                sizeof(_tickle_server_addr))) < 0){      {
         // handle error          MessageLoaderParms parms(
 #ifdef PEGASUS_OS_ZOS              "Common.Monitor.TICKLE_CREATE",
     MessageLoaderParms parms("Common.Monitor.TICKLE_BIND_LONG",              "Received error number $0 while creating the internal socket.",
                                  "Received error:$0 while binding the internal socket.",strerror(errno));              getSocketError());
 #else          throw Exception(parms);
         MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",      }
   
       // Bind the listen socket to the loopback address
       if (::bind(
               _listenSocket,
               reinterpret_cast<struct sockaddr*>(&listenAddress),
               addressLength) < 0)
       {
           MessageLoaderParms parms(
               "Common.Monitor.TICKLE_BIND",
                                  "Received error number $0 while binding the internal socket.",                                  "Received error number $0 while binding the internal socket.",
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)              getSocketError());
                                  errno);  
 #else  
                                  WSAGetLastError());  
 #endif  
 #endif  
         throw Exception(parms);         throw Exception(parms);
     }     }
  
     // tell the kernel we are a server      // Listen for a connection from the tickle client
     if((::listen(_tickle_server_socket,3)) < 0){      if ((::listen(_listenSocket, 3)) < 0)
         // handle error      {
         MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",          MessageLoaderParms parms(
               "Common.Monitor.TICKLE_LISTEN",
                          "Received error number $0 while listening to the internal socket.",                          "Received error number $0 while listening to the internal socket.",
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)              getSocketError());
                                  errno);  
 #else  
                                  WSAGetLastError());  
 #endif  
         throw Exception(parms);         throw Exception(parms);
     }     }
  
     // make sure we have the correct socket for our server      // Verify we have the correct listen socket
     int sock = ::getsockname(_tickle_server_socket,      SocketLength tmpAddressLength = addressLength;
                    reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),      int sock = ::getsockname(
                    &_addr_size);          _listenSocket,
     if(sock < 0){          reinterpret_cast<struct sockaddr*>(&listenAddress),
         // handle error          &tmpAddressLength);
         MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",      if (sock < 0)
       {
           MessageLoaderParms parms(
               "Common.Monitor.TICKLE_SOCKNAME",
                          "Received error number $0 while getting the internal socket name.",                          "Received error number $0 while getting the internal socket name.",
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)              getSocketError());
                                  errno);  
 #else  
                                  WSAGetLastError());  
 #endif  
         throw Exception(parms);         throw Exception(parms);
     }     }
  
     /* set up the tickle client/connector */      //
       // Set up the client side of the tickle connection.
       //
  
     // get a socket for our tickle client      // Create the client socket
     if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){      if ((_clientSocket = Socket::createSocket(addressFamily, SOCK_STREAM, 0)) ==
         // handle error               PEGASUS_INVALID_SOCKET)
         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",      {
                          "Received error number $0 while creating the internal client socket.",          MessageLoaderParms parms(
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)              "Common.Monitor.TICKLE_CLIENT_CREATE",
                                  errno);              "Received error number $0 while creating the internal client "
 #else                  "socket.",
                                  WSAGetLastError());              getSocketError());
 #endif  
         throw Exception(parms);         throw Exception(parms);
     }     }
  
     // setup the address of the client      // Bind the client socket to the loopback address
     memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));      if (::bind(
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM              _clientSocket,
 #pragma convert(37)              reinterpret_cast<struct sockaddr*>(&clientAddress),
 #endif              addressLength) < 0)
     _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");      {
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM          MessageLoaderParms parms(
 #pragma convert(0)              "Common.Monitor.TICKLE_CLIENT_BIND",
 #endif              "Received error number $0 while binding the internal client "
     _tickle_client_addr.sin_family = PF_INET;                  "socket.",
     _tickle_client_addr.sin_port = 0;              getSocketError());
   
     // bind socket to client side  
     if((::bind(_tickle_client_socket,  
                reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),  
                sizeof(_tickle_client_addr))) < 0){  
         // handle error  
         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",  
                          "Received error number $0 while binding the internal client socket.",  
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)  
                                  errno);  
 #else  
                                  WSAGetLastError());  
 #endif  
         throw Exception(parms);         throw Exception(parms);
     }     }
  
     // connect to server side      // Connect the client socket to the listen socket address
     if((::connect(_tickle_client_socket,      if (::connect(
                   reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),              _clientSocket,
                   sizeof(_tickle_server_addr))) < 0){              reinterpret_cast<struct sockaddr*>(&listenAddress),
         // handle error              addressLength) < 0)
         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",      {
                          "Received error number $0 while connecting the internal client socket.",          MessageLoaderParms parms(
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)              "Common.Monitor.TICKLE_CLIENT_CONNECT",
                                  errno);              "Received error number $0 while connecting the internal client "
 #else                  "socket.",
                                  WSAGetLastError());              getSocketError());
 #endif  
         throw Exception(parms);         throw Exception(parms);
     }     }
  
     /* set up the slave connection */      //
     memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));      // Set up the server side of the tickle connection.
     PEGASUS_SOCKLEN_T peer_size = sizeof(_tickle_peer_addr);      //
     pegasus_sleep(1);  
       tmpAddressLength = addressLength;
     // this call may fail, we will try a max of 20 times to establish this peer connection  
     if((_tickle_peer_socket = ::accept(_tickle_server_socket,      // Accept the client socket connection.
             reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),      _serverSocket = ::accept(
             &peer_size)) < 0){          _listenSocket,
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)          reinterpret_cast<struct sockaddr*>(&serverAddress),
         // Only retry on non-windows platforms.          &tmpAddressLength);
         if(_tickle_peer_socket == -1 && errno == EAGAIN)  
         {      if (_serverSocket == PEGASUS_SOCKET_ERROR)
           int retries = 0;      {
           do          MessageLoaderParms parms(
           {              "Common.Monitor.TICKLE_ACCEPT",
             pegasus_sleep(1);              "Received error number $0 while accepting the internal socket "
             _tickle_peer_socket = ::accept(_tickle_server_socket,                  "connection.",
                 reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),              getSocketError());
                 &peer_size);          throw Exception(parms);
             retries++;  
           } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);  
         }         }
 #endif  
       //
       // Close the listen socket and make the other sockets non-blocking
       //
   
       Socket::close(_listenSocket);
       Socket::disableBlocking(_serverSocket);
       Socket::disableBlocking(_clientSocket);
     }     }
     if(_tickle_peer_socket == -1){  
         // handle error  
         MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",  
                          "Received error number $0 while accepting the internal socket connection.",  
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)  
                                  errno);  
 #else  
                                  WSAGetLastError());  
 #endif #endif
         throw Exception(parms);  
     }  void Tickler::_uninitialize()
     // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only  
     // checks entries with IDLE state for events  
     _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);  
     entry._status = _MonitorEntry::IDLE;  
     _entries.append(entry);  
     {     {
         AutoMutex automut(Monitor::_cout_mut);      PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
         PEGASUS_STD(cout) << "Exiting:  Monitor::initializeTickler(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
 }  
  
 void Monitor::tickle(void)      try
 { {
           Socket::close(_serverSocket);
           Socket::close(_clientSocket);
           Socket::close(_listenSocket);
       }
       catch (...)
     {     {
         AutoMutex automut(Monitor::_cout_mut);          PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL2,
         PEGASUS_STD(cout) << "Entering: Monitor::tickle(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);              "Failed to close tickle sockets");
       }
       Socket::uninitializeInterface();
     }     }
     static char _buffer[] =  
   
   ////////////////////////////////////////////////////////////////////////////////
   //
   // Monitor
   //
   ////////////////////////////////////////////////////////////////////////////////
   
   #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
   Monitor::Monitor()
      : _stopConnections(0),
        _stopConnectionsSem(0),
        _solicitSocketCount(0)
     {     {
       '0','0'      int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
     };      _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
  
     AutoMutex autoMutex(_tickle_mutex);      // Create a MonitorEntry for the Tickler and set its state to IDLE so the
     Socket::disableBlocking(_tickle_client_socket);      // Monitor will watch for its events.
     Socket::write(_tickle_client_socket,&_buffer, 2);      _entries.append(MonitorEntry(
     Socket::enableBlocking(_tickle_client_socket);          _tickler.getReadHandle(),
           1,
           MonitorEntry::STATUS_IDLE,
           MonitorEntry::TYPE_TICKLER));
   
       // Start the count at 1 because _entries[0] is the Tickler
       for (int i = 1; i < numberOfMonitorEntriesToAllocate; i++)
     {     {
         AutoMutex automut(Monitor::_cout_mut);          _entries.append(MonitorEntry());
         PEGASUS_STD(cout) << "Exiting:  Monitor::tickle(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }     }
 } }
  
 void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )  Monitor::~Monitor()
 { {
     // Set the state to requested state      PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
     _entries[index]._status = status;                    "returning from monitor destructor");
 } }
  
 Boolean Monitor::run(Uint32 milliseconds)  void Monitor::tickle()
 { {
     {      _tickler.notify();
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Entering: Monitor::run(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }     }
  
     Boolean handled_events = false;  void Monitor::setState(
     int i = 0;      Uint32 index,
       MonitorEntry::Status status)
   {
       AutoMutex autoEntryMutex(_entriesMutex);
       // Set the state to requested state
       _entries[index].status = status;
   }
  
   void Monitor::run(Uint32 milliseconds)
   {
     struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};     struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
  
     fd_set fdread;     fd_set fdread;
     FD_ZERO(&fdread);     FD_ZERO(&fdread);
  
     AutoMutex autoEntryMutex(_entry_mut);      AutoMutex autoEntryMutex(_entriesMutex);
  
     ArrayIterator<_MonitorEntry> entries(_entries);      ArrayIterator<MonitorEntry> entries(_entries);
  
     // Check the stopConnections flag.  If set, clear the Acceptor monitor entries      // Check the stopConnections flag.  If set, clear the Acceptor monitor
       // entries
     if (_stopConnections.get() == 1)     if (_stopConnections.get() == 1)
     {     {
         for ( int indx = 0; indx < (int)entries.size(); indx++)          for (Uint32 indx = 0; indx < entries.size(); indx++)
         {         {
             if (entries[indx]._type == Monitor::ACCEPTOR)              if (entries[indx].type == MonitorEntry::TYPE_ACCEPTOR)
             {             {
                 if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)                  if (entries[indx].status != MonitorEntry::STATUS_EMPTY)
                 {                 {
                    if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||                      if (entries[indx].status == MonitorEntry::STATUS_IDLE ||
                         entries[indx]._status.get() == _MonitorEntry::DYING )                          entries[indx].status == MonitorEntry::STATUS_DYING)
                    {                    {
                        // remove the entry                        // remove the entry
                                entries[indx]._status = _MonitorEntry::EMPTY;                          entries[indx].status = MonitorEntry::STATUS_EMPTY;
                    }                    }
                    else                    else
                    {                    {
                        // set status to DYING                        // set status to DYING
                       entries[indx]._status = _MonitorEntry::DYING;                          entries[indx].status = MonitorEntry::STATUS_DYING;
                    }                    }
                }                }
            }            }
Line 431 
Line 413 
         _stopConnectionsSem.signal();         _stopConnectionsSem.signal();
     }     }
  
     for( int indx = 0; indx < (int)entries.size(); indx++)      for (Uint32 indx = 0; indx < entries.size(); indx++)
     {     {
         const _MonitorEntry &entry = entries[indx];          const MonitorEntry& entry = entries[indx];
        if ((entry._status.get() == _MonitorEntry::DYING) &&  
            (entry._type == Monitor::CONNECTION))          if ((entry.status == MonitorEntry::STATUS_DYING) &&
               (entry.type == MonitorEntry::TYPE_CONNECTION))
        {        {
           MessageQueue *q = MessageQueue::lookup(entry.queueId);           MessageQueue *q = MessageQueue::lookup(entry.queueId);
           PEGASUS_ASSERT(q != 0);           PEGASUS_ASSERT(q != 0);
Line 452 
Line 435 
  
                                         if (h._responsePending == true)                                         if (h._responsePending == true)
                                         {                                         {
                         if (!entry.namedPipeConnection)                  PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                         {                      "Monitor::run - Ignoring connection delete request "
                             Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "                          "because responses are still pending. "
                                                                                                         "Ignoring connection delete request because "  
                                                                                                         "responses are still pending. "  
                                                                                                         "connection=0x%p, socket=%d\n",                                                                                                         "connection=0x%p, socket=%d\n",
                                                                                                         (void *)&h, h.getSocket());                      (void *)&h, h.getSocket()));
                         }  
                         else  
                         {  
                             Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "  
                                                                                                         "Ignoring connection delete request because "  
                                                                                                         "responses are still pending. "  
                                                                                                         "connection=0x%p, NamedPipe=%d\n",  
                                                                                                         (void *)&h, h.getNamedPipe().getPipe());  
                         }  
                                                 continue;                                                 continue;
                                         }                                         }
                                         h._connectionClosePending = false;                                         h._connectionClosePending = false;
           MessageQueue &o = h.get_owner();              HTTPAcceptor &o = h.getOwningAcceptor();
           Message* message;              Message* message= new CloseConnectionMessage(entry.socket);
           if (!entry.namedPipeConnection)  
           {  
               message= new CloseConnectionMessage(entry.socket);  
           }  
           else  
           {  
               message= new CloseConnectionMessage(entry.namedPipe);  
   
           }  
           message->dest = o.getQueueId();           message->dest = o.getQueueId();
  
           // HTTPAcceptor is responsible for closing the connection.           // HTTPAcceptor is responsible for closing the connection.
Line 496 
Line 459 
           // unlocked will not result in an ArrayIndexOutOfBounds           // unlocked will not result in an ArrayIndexOutOfBounds
           // exception.           // exception.
  
           autoEntryMutex.unlock();              _entriesMutex.unlock();
           o.enqueue(message);           o.enqueue(message);
           autoEntryMutex.lock();              _entriesMutex.lock();
           // After enqueue a message and the autoEntryMutex has been released and locked again,  
           // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries.              // After enqueue a message and the autoEntryMutex has been
               // released and locked again, the array of _entries can be
               // changed. The ArrayIterator has be reset with the original
               // _entries.
           entries.reset(_entries);           entries.reset(_entries);
        }        }
     }     }
Line 513 
Line 479 
         place to calculate the max file descriptor (maximum socket number)         place to calculate the max file descriptor (maximum socket number)
         because we have to traverse the entire array.         because we have to traverse the entire array.
     */     */
     //Array<HANDLE> pipeEventArray;      SocketHandle maxSocketCurrentPass = 0;
         PEGASUS_SOCKET maxSocketCurrentPass = 0;      for (Uint32 indx = 0; indx < entries.size(); indx++)
     int indx;  
   
   
 #ifdef PEGASUS_OS_TYPE_WINDOWS  
   
     //This array associates named pipe connections to their place in [indx]  
     //in the entries array. The value in poition zero of the array is the  
     //index of the fist named pipe connection in the entries array  
     Array <Uint32> indexPipeCountAssociator;  
     int pipeEntryCount=0;  
     int MaxPipes = PIPE_INCREMENT;  
     HANDLE* hEvents = new HANDLE[PIPE_INCREMENT];  
   
 #endif  
   
     for( indx = 0; indx < (int)entries.size(); indx++)  
     {     {
   
   
 #ifdef PEGASUS_OS_TYPE_WINDOWS  
        if(entries[indx].isNamedPipeConnection())  
        {  
   
            //entering this clause mean that a Named Pipe connection is at entries[indx]  
            //cout << "In Monitor::run in clause to to create array of for WaitformultipuleObjects" << endl;  
   
            //cout << "In Monitor::run - pipe being added to array is " << entries[indx].namedPipe.getName() << endl;  
   
             entries[indx].pipeSet = false;  
   
            // We can Keep a counter in the Monitor class for the number of named pipes ...  
            //  Which can be used here to create the array size for hEvents..( obviously before this for loop.:-) )  
             if (pipeEntryCount >= MaxPipes)  
             {  
                // cout << "Monitor::run 'if (pipeEntryCount >= MaxPipes)' begining - pipeEntryCount=" <<  
                    // pipeEntryCount << " MaxPipes=" << MaxPipes << endl;  
                  MaxPipes += PIPE_INCREMENT;  
                  HANDLE* temp_hEvents = new HANDLE[MaxPipes];  
   
                  for (Uint32 i =0;i<pipeEntryCount;i++)  
                  {  
                      temp_hEvents[i] = hEvents[i];  
                  }  
   
                  delete [] hEvents;  
   
                  hEvents = temp_hEvents;  
                 // cout << "Monitor::run 'if (pipeEntryCount >= MaxPipes)' ending"<< endl;  
   
             }  
   
            //pipeEventArray.append((entries[indx].namedPipe.getOverlap()).hEvent);  
            hEvents[pipeEntryCount] = entries[indx].namedPipe.getOverlap()->hEvent;  
   
            indexPipeCountAssociator.append(indx);  
   
     pipeEntryCount++;  
   
   
   
        }  
        else  
   
 #endif  
        {  
   
            if(maxSocketCurrentPass < entries[indx].socket)            if(maxSocketCurrentPass < entries[indx].socket)
             maxSocketCurrentPass = entries[indx].socket;             maxSocketCurrentPass = entries[indx].socket;
  
            if(entries[indx]._status.get() == _MonitorEntry::IDLE)          if (entries[indx].status == MonitorEntry::STATUS_IDLE)
            {            {
                _idleEntries++;                _idleEntries++;
                FD_SET(entries[indx].socket, &fdread);                FD_SET(entries[indx].socket, &fdread);
            }            }
   
        }  
   }   }
  
     /*     /*
Line 599 
Line 498 
     */     */
     maxSocketCurrentPass++;     maxSocketCurrentPass++;
  
     autoEntryMutex.unlock();      _entriesMutex.unlock();
  
     //     //
     // The first argument to select() is ignored on Windows and it is not     // The first argument to select() is ignored on Windows and it is not
     // a socket value.  The original code assumed that the number of sockets     // a socket value.  The original code assumed that the number of sockets
     // and a socket value have the same type.  On Windows they do not.     // and a socket value have the same type.  On Windows they do not.
     //     //
   
     int events;  
     int pEvents;  
   
 #ifdef PEGASUS_OS_TYPE_WINDOWS #ifdef PEGASUS_OS_TYPE_WINDOWS
       int events = select(0, &fdread, NULL, NULL, &tv);
    // events = select(0, &fdread, NULL, NULL, &tv);  
   
     //if (events == NULL)  
     //{  // This connection uses namedPipes  
   
         events = 0;  
         DWORD dwWait=NULL;  
         pEvents = 0;  
   
         {  
         AutoMutex automut(Monitor::_cout_mut);  
         cout << "Monitor::run - Calling WaitForMultipleObjects\n";  
         }  
   
         //this should be in a try block  
   
     dwWait = WaitForMultipleObjects(  
                  MaxPipes,  
                  hEvents,               //ABB:- array of event objects  
                  FALSE,                 // ABB:-does not wait for all  
                  milliseconds);        //ABB:- timeout value   //WW this may need be shorter  
   
     if(dwWait == WAIT_TIMEOUT)  
         {  
         {  
             AutoMutex automut(Monitor::_cout_mut);  
         cout << "Wait WAIT_TIMEOUT\n";  
         cout << "Monitor::run before the select in TIMEOUT clause events = " << events << endl;  
   
         events = select(0, &fdread, NULL, NULL, &tv);  
         cout << "Monitor::run after the select in TIMEOUT clause events = " << events << endl;  
         }  
   
                    // Sleep(2000);  
             //continue;  
   
              //return false;  // I think we do nothing.... Mybe there is a socket connection... so  
              // cant return.  
         }  
         else if (dwWait == WAIT_FAILED)  
         {  
             if (GetLastError() == 6) //WW this may be too specific  
             {  
                 AutoMutex automut(Monitor::_cout_mut);  
                 cout << "Monitor::run about to call 'select since waitForMultipleObjects failed\n";  
                 /********* NOTE  
                 this time (tv) combined with the waitForMulitpleObjects timeout is  
                 too long it will cause the client side to time out  
                 ******************/  
                 events = select(0, &fdread, NULL, NULL, &tv);  
   
             }  
             else  
             {  
                 AutoMutex automut(Monitor::_cout_mut);  
                 cout << "Wait Failed returned\n";  
                 cout << "failed with " << GetLastError() << "." << endl;  
                 pEvents = -1;  
                 return false;  
             }  
         }  
         else  
         {  
             int pCount = dwWait - WAIT_OBJECT_0;  // determines which pipe  
             {  
                  {  
                      AutoMutex automut(Monitor::_cout_mut);  
                      // cout << endl << "****************************" <<  
                      //  "Monitor::run WaitForMultiPleObject returned activity on server pipe: "<<  
                      //  pCount<< endl <<  endl;  
                      cout << "Monitor::run WaitForMultiPleObject returned activity pipeEntrycount is " <<  
                      pipeEntryCount <<  
                      " this is the type " << entries[indexPipeCountAssociator[pCount]]._type << " this is index " << indexPipeCountAssociator[pCount] << endl;  
                  }  
   
                /* There is a timeing problem here sometimes the wite in HTTPConnection i s  
              not all the way done (has not _monitor->setState (_entry_index, _MonitorEntry::IDLE) )  
              there for that should be done here if it is not done alread*/  
   
                if (entries[indexPipeCountAssociator[pCount]]._status.get() != _MonitorEntry::IDLE)  
                {  
                    this->setState(indexPipeCountAssociator[pCount], _MonitorEntry::IDLE);  
                    AutoMutex automut(Monitor::_cout_mut);  
                    cout << "setting state of index " << indexPipeCountAssociator[pCount]  << " to IDLE" << endl;  
                }  
   
   
             }  
   
             pEvents = 1;  
   
             //this statment gets the pipe entry that was trigered  
             entries[indexPipeCountAssociator[pCount]].pipeSet = true;  
   
         }  
 #else #else
     events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);      int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
 #endif #endif
     autoEntryMutex.lock();      int selectErrno = getSocketError();
     // After enqueue a message and the autoEntryMutex has been released and locked again,  
     // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries  
     entries.reset(_entries);  
   
 #ifdef PEGASUS_OS_TYPE_WINDOWS  
     if(pEvents == -1)  
     {  
         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
           "Monitor::run - errorno = %d has occurred on select.",GetLastError() );  
        // The EBADF error indicates that one or more or the file  
        // descriptions was not valid. This could indicate that  
        // the entries structure has been corrupted or that  
        // we have a synchronization error.  
  
         // We need to generate an assert  here...      _entriesMutex.lock();
        PEGASUS_ASSERT(GetLastError()!= EBADF);  
  
       struct timeval timeNow;
       Time::gettimeofday(&timeNow);
  
     }      // After enqueue a message and the autoEntryMutex has been released and
       // locked again, the array of _entries can be changed. The ArrayIterator
       // has be reset with the original _entries
       entries.reset(_entries);
  
     if(events == SOCKET_ERROR)      if (events == PEGASUS_SOCKET_ERROR)
 #else  
     if(events == -1)  
 #endif  
     {     {
           PEG_TRACE((TRC_HTTP, Tracer::LEVEL1,
         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,              "Monitor::run - select() returned error %d.", selectErrno));
           "Monitor::run - errorno = %d has occurred on select.", errno);  
        // The EBADF error indicates that one or more or the file        // The EBADF error indicates that one or more or the file
        // descriptions was not valid. This could indicate that        // descriptions was not valid. This could indicate that
        // the entries structure has been corrupted or that        // the entries structure has been corrupted or that
        // we have a synchronization error.        // we have a synchronization error.
  
        PEGASUS_ASSERT(errno != EBADF);          PEGASUS_ASSERT(selectErrno != EBADF);
     }  
     else if ((events)||(pEvents))  
     {  
   
         {  
                  AutoMutex automut(Monitor::_cout_mut);  
      cout << "IN Monior::run events= " << events << " pEvents= " << pEvents<< endl;  
         }         }
       else if (events)
   
      Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
           "Monitor::run select event received events = %d, monitoring %d idle entries",  
            events, _idleEntries);  
        for( int indx = 0; indx < (int)entries.size(); indx++)  
        {        {
            //cout << "Monitor::run at start of 'for( int indx = 0; indx ' - index = " << indx << endl;          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
           // The Monitor should only look at entries in the table that are IDLE (i.e.,              "Monitor::run select event received events = %d, monitoring %d "
           // owned by the Monitor).                  "idle entries",
         // cout << endl << " status of entry " << indx << " is " << entries[indx]._status.get() << endl;              events, _idleEntries));
           if((entries[indx]._status.get() == _MonitorEntry::IDLE) &&          for (Uint32 indx = 0; indx < entries.size(); indx++)
              ((FD_ISSET(entries[indx].socket, &fdread)&& (events)) ||          {
              (entries[indx].isNamedPipeConnection() && entries[indx].pipeSet && (pEvents))))              // The Monitor should only look at entries in the table that are
           {              // IDLE (i.e., owned by the Monitor).
               if ((entries[indx].status == MonitorEntry::STATUS_IDLE) &&
                   (FD_ISSET(entries[indx].socket, &fdread)))
               {               {
                  AutoMutex automut(Monitor::_cout_mut);                  MessageQueue* q = MessageQueue::lookup(entries[indx].queueId);
                  cout <<"Monitor::run - index  " << indx << " just got into 'if' statement" << endl;  
               }  
   
               MessageQueue *q;  
            try{  
   
                  q = MessageQueue::lookup(entries[indx].queueId);  
               }  
              catch (Exception e)  
              {  
                  AutoMutex automut(Monitor::_cout_mut);  
                  cout << " this is what lookup gives - " << e.getMessage() << endl;  
                  exit(1);  
              }  
              catch(...)  
              {  
                  AutoMutex automut(Monitor::_cout_mut);  
                  cout << "MessageQueue::lookup gives strange exception " << endl;  
                  exit(1);  
              }  
   
   
   
   
               Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                   "Monitor::run indx = %d, queueId =  %d, q = %p",  
                   indx, entries[indx].queueId, q);  
            //  printf("Monitor::run indx = %d, queueId =  %d, q = %p",  
              //     indx, entries[indx].queueId, q);  
              //cout << "Monitor::run before PEGASUS_ASSerT(q !=0) " << endl;  
              PEGASUS_ASSERT(q !=0);              PEGASUS_ASSERT(q !=0);
                   PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                       "Monitor::run indx = %d, queueId = %d, q = %p",
                       indx, entries[indx].queueId, q));
  
              try              try
              {              {
                 /* {                      if (entries[indx].type == MonitorEntry::TYPE_CONNECTION)
                  AutoMutex automut(Monitor::_cout_mut);  
                   cout <<" this is the type " << entries[indx]._type <<  
                       " for index " << indx << endl;  
                cout << "IN Monior::run right before entries[indx]._type == Monitor::CONNECTION" << endl;  
                  }*/  
                if(entries[indx]._type == Monitor::CONNECTION)  
                 {  
                     {  
                     cout << "In Monitor::run Monitor::CONNECTION clause" << endl;  
                     AutoMutex automut(Monitor::_cout_mut);  
                     }  
   
                                       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                      "entries[indx].type for indx = %d is Monitor::CONNECTION", indx);  
                    static_cast<HTTPConnection *>(q)->_entry_index = indx;  
   
                    // Do not update the entry just yet. The entry gets updated once  
                    // the request has been read.  
                    //entries[indx]._status = _MonitorEntry::BUSY;  
   
                    // If allocate_and_awaken failure, retry on next iteration  
 /* Removed for PEP 183.  
                    if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(  
                            (void *)q, _dispatch))  
                    {  
                       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,  
                           "Monitor::run: Insufficient resources to process request.");  
                       entries[indx]._status = _MonitorEntry::IDLE;  
                       return true;  
                    }  
 */  
 // Added for PEP 183  
                    HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);  
                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                          "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",  
                    dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);  
   
                    /*In the case of named Pipes, the request has already been read from the pipe  
                    therefor this section passed the request data to the HTTPConnection  
                    NOTE: not sure if this would be better suited in a sparate private method  
                    */  
   
                    dst->setNamedPipe(entries[indx].namedPipe); //this step shouldn't be needd  
                    {                    {
                        AutoMutex automut(Monitor::_cout_mut);                          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                    cout << "In Monitor::run after dst->setNamedPipe string read is " <<  entries[indx].namedPipe.raw << endl;                              "entries[%d].type is TYPE_CONNECTION",
                    }                              indx));
   
                           HTTPConnection *dst =
                               reinterpret_cast<HTTPConnection *>(q);
                           dst->_entry_index = indx;
   
                           // Update idle start time because we have received some
                           // data. Any data is good data at this point, and we'll
                           // keep the connection alive, even if we've exceeded
                           // the idleConnectionTimeout, which will be checked
                           // when we call closeConnectionOnTimeout() next.
                           Time::gettimeofday(&dst->_idleStartTime);
   
                           // Check for accept pending (ie. SSL handshake pending)
                           // or idle connection timeouts for sockets from which
                           // we received data (avoiding extra queue lookup below).
                           if (!dst->closeConnectionOnTimeout(&timeNow))
                           {
                               PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                                   "Entering HTTPConnection::run() for "
                                       "indx = %d, queueId = %d, q = %p",
                                   indx, entries[indx].queueId, q));
  
                    try                    try
                    {                    {
                        {  
                        AutoMutex automut(Monitor::_cout_mut);  
                        cout << "In Monitor::run about to call 'dst->run(1)' "  << endl;  
                        }  
                        dst->run(1);                        dst->run(1);
                    }                    }
                    catch (...)                    catch (...)
                    {                    {
                        AutoMutex automut(Monitor::_cout_mut);                                  PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL1,
                        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                                      "Caught exception from "
                        "Monitor::_dispatch: exception received");                                      "HTTPConnection::run()");
                    }  
                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                    "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);  
   
                    if (entries[indx].isNamedPipeConnection())  
                    {  
                        entries[indx]._type = Monitor::ACCEPTOR;  
                    }  
   
                    // It is possible the entry status may not be set to busy.  
                    // The following will fail in that case.  
                    // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);  
                    // Once the HTTPConnection thread has set the status value to either  
                    // Monitor::DYING or Monitor::IDLE, it has returned control of the connection  
                    // to the Monitor.  It is no longer permissible to access the connection  
                    // or the entry in the _entries table.  
   
                    // The following is not relevant as the worker thread or the  
                    // reader thread will update the status of the entry.  
                    //if (dst->_connectionClosePending)  
                    //{  
                    //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;  
                    //}  
                    //else  
                    //{  
                    //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;  
                    //}  
 // end Added for PEP 183  
                 }  
                 else if( entries[indx]._type == Monitor::INTERNAL){  
                         // set ourself to BUSY,  
                         // read the data  
                         // and set ourself back to IDLE  
             cout << endl << " in - entries[indx]._type == Monitor::INTERNAL- " << endl << endl;  
             if (!entries[indx].isNamedPipeConnection())  
             {  
                             entries[indx]._status = _MonitorEntry::BUSY;  
                             static char buffer[2];  
                         Socket::disableBlocking(entries[indx].socket);  
                         Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2);  
                         Socket::enableBlocking(entries[indx].socket);  
                             entries[indx]._status = _MonitorEntry::IDLE;  
             }             }
                               PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
                                   "Exited HTTPConnection::run()");
                 }                 }
                 else  
                 {  
             {  
   
             AutoMutex automut(Monitor::_cout_mut);  
             cout << "In Monitor::run else clause of CONNECTION if statments" << endl;  
             }             }
                       else if (entries[indx].type == MonitorEntry::TYPE_TICKLER)
                                Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                      "Non-connection entry, indx = %d, has been received.", indx);  
                    int events = 0;  
            Message *msg;  
            {  
   
            AutoMutex automut(Monitor::_cout_mut);  
            cout << " In Monitor::run Just before checking if NamedPipeConnection" << "for Index "<<indx<< endl;  
            }  
            if (entries[indx].isNamedPipeConnection())  
            {  
                if(!entries[indx].namedPipe.isConnectionPipe)  
                { /*if we enter this clasue it means that the named pipe that we are  
                    looking at has recived a connection but is not the pipe we get connection requests over.  
                    therefore we need to change the _type to CONNECTION and wait for a CIM Operations request*/  
                    entries[indx]._type = Monitor::CONNECTION;  
   
   
      /* This is a test  - this shows that the read file needs to be done  
      before we call wiatForMultipleObjects*/  
     /******************************************************  
     ********************************************************/  
   
   
   
         memset(entries[indx].namedPipe.raw,'\0',4096);  
         BOOL rc = ::ReadFile(  
                 entries[indx].namedPipe.getPipe(),  
                 &entries[indx].namedPipe.raw,  
                 NAMEDPIPE_MAX_BUFFER_SIZE,  
                 &entries[indx].namedPipe.bytesRead,  
                 entries[indx].namedPipe.getOverlap());  
   
         {  
          AutoMutex automut(Monitor::_cout_mut);  
          cout << "Monitor::run just called read on index " << indx << endl;  
         }  
   
   
          //&entries[indx].namedPipe.bytesRead = &size;  
         if(!rc)  
         {         {
             AutoMutex automut(Monitor::_cout_mut);                          _tickler.reset();
            cout << "ReadFile failed for : "  << GetLastError() << "."<< endl;  
   
         }  
   
   
   
     /******************************************************  
     ********************************************************/  
   
   
   
   
                  continue;  
   
   
                }                }
                       else
                {                {
                    AutoMutex automut(Monitor::_cout_mut);                          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                     cout << " In Monitor::run about to create a Pipe message" << endl;                              "Non-connection entry, indx = %d, has been "
                                   "received.",
                               indx));
                           Message* msg = new SocketMessage(
                               entries[indx].socket, SocketMessage::READ);
                           entries[indx].status = MonitorEntry::STATUS_BUSY;
                           _entriesMutex.unlock();
                           q->enqueue(msg);
                           _entriesMutex.lock();
  
                           // After enqueue a message and the autoEntryMutex has
                           // been released and locked again, the array of
                           // entries can be changed. The ArrayIterator has to be
                           // reset with the latest _entries.
                           entries.reset(_entries);
                           entries[indx].status = MonitorEntry::STATUS_IDLE;
                }                }
   
                events |= NamedPipeMessage::READ;  
                msg = new NamedPipeMessage(entries[indx].namedPipe, events);  
            }            }
            else                  catch (...)
            {  
                {                {
                AutoMutex automut(Monitor::_cout_mut);  
                cout << " In Monitor::run ..its a socket message" << endl;  
                }                }
                events |= SocketMessage::READ;  
                        msg = new SocketMessage(entries[indx].socket, events);  
            }            }
               // else check for accept pending (ie. SSL handshake pending) or
                    entries[indx]._status = _MonitorEntry::BUSY;              // idle connection timeouts for sockets from which we did not
                    autoEntryMutex.unlock();              // receive data.
                    q->enqueue(msg);              else if ((entries[indx].status == MonitorEntry::STATUS_IDLE) &&
                    autoEntryMutex.lock();                  entries[indx].type == MonitorEntry::TYPE_CONNECTION)
            // After enqueue a message and the autoEntryMutex has been released and locked again,  
            // the array of entries can be changed. The ArrayIterator has be reset with the original _entries  
            entries.reset(_entries);  
                    entries[indx]._status = _MonitorEntry::IDLE;  
  
                    {                    {
                        AutoMutex automut(Monitor::_cout_mut);                  MessageQueue* q = MessageQueue::lookup(entries[indx].queueId);
                        PEGASUS_STD(cout) << "Exiting:  Monitor::run(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);                  PEGASUS_ASSERT(q != 0);
                   HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
                   dst->_entry_index = indx;
                   dst->closeConnectionOnTimeout(&timeNow);
                    }                    }
                    return true;  
                 }                 }
              }              }
              catch(...)      // else if "events" is zero (ie. select timed out) then we still need
       // to check if there are any pending SSL handshakes that have timed out.
       else
              {              {
              }          for (Uint32 indx = 0; indx < entries.size(); indx++)
              handled_events = true;          {
           }              if ((entries[indx].status == MonitorEntry::STATUS_IDLE) &&
                   entries[indx].type == MonitorEntry::TYPE_CONNECTION)
               {
                   MessageQueue* q = MessageQueue::lookup(entries[indx].queueId);
                   PEGASUS_ASSERT(q != 0);
                   HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
                   dst->_entry_index = indx;
                   dst->closeConnectionOnTimeout(&timeNow);
        }        }
     }     }
   
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Exiting:  Monitor::run(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }     }
     return(handled_events);  
 } }
  
 void Monitor::stopListeningForConnections(Boolean wait) void Monitor::stopListeningForConnections(Boolean wait)
 { {
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Entering: Monitor::stopListeningForConnections(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
     // set boolean then tickle the server to recognize _stopConnections     // set boolean then tickle the server to recognize _stopConnections
     _stopConnections = 1;     _stopConnections = 1;
Line 1049 
Line 674 
     }     }
  
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Exiting:  Monitor::stopListeningForConnections(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
 } }
  
  
 int  Monitor::solicitSocketMessages( int  Monitor::solicitSocketMessages(
     PEGASUS_SOCKET socket,      SocketHandle socket,
     Uint32 events,     Uint32 events,
     Uint32 queueId,     Uint32 queueId,
     int type)      Uint32 type)
 { {
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Entering: Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
    AutoMutex autoMut(_entry_mut);      AutoMutex autoMut(_entriesMutex);
   
    // Check to see if we need to dynamically grow the _entries array    // Check to see if we need to dynamically grow the _entries array
    // We always want the _entries array to 2 bigger than the      // We always want the _entries array to be 2 bigger than the
    // current connections requested    // current connections requested
    _solicitSocketCount++;  // bump the count    _solicitSocketCount++;  // bump the count
    int size = (int)_entries.size();  
    if((int)_solicitSocketCount >= (size-1)){      for (Uint32 i = _entries.size(); i < _solicitSocketCount + 1; i++)
         for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){      {
                 _MonitorEntry entry(0, 0, 0);          _entries.append(MonitorEntry());
                 _entries.append(entry);  
         }  
    }    }
  
    int index;      for (Uint32 index = 1; index < _entries.size(); index++)
    for(index = 1; index < (int)_entries.size(); index++)  
    {    {
       try       try
       {       {
          if(_entries[index]._status.get() == _MonitorEntry::EMPTY)              if (_entries[index].status == MonitorEntry::STATUS_EMPTY)
          {          {
             _entries[index].socket = socket;             _entries[index].socket = socket;
             _entries[index].queueId  = queueId;             _entries[index].queueId  = queueId;
             _entries[index]._type = type;                  _entries[index].type = type;
             _entries[index]._status = _MonitorEntry::IDLE;                  _entries[index].status = MonitorEntry::STATUS_IDLE;
  
             {                  PEG_METHOD_EXIT();
                 AutoMutex automut(Monitor::_cout_mut);                  return (int)index;
                 PEGASUS_STD(cout) << "Exiting:  Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
             }  
             return index;  
          }          }
       }       }
       catch(...)       catch(...)
       {       {
       }       }
    }    }
    _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful      // decrease the count, if we are here we didn't do anything meaningful
       _solicitSocketCount--;
    PEG_METHOD_EXIT();    PEG_METHOD_EXIT();
    {  
        AutoMutex automut(Monitor::_cout_mut);  
        PEGASUS_STD(cout) << "Exiting:  Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
    }  
    return -1;    return -1;
   
 } }
  
 void Monitor::unsolicitSocketMessages(PEGASUS_SOCKET socket)  void Monitor::unsolicitSocketMessages(SocketHandle socket)
 { {
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Entering: Monitor::unsolicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
   
     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
     AutoMutex autoMut(_entry_mut);      AutoMutex autoMut(_entriesMutex);
  
     /*     /*
         Start at index = 1 because _entries[0] is the tickle entry which never needs          Start at index = 1 because _entries[0] is the tickle entry which
         to be EMPTY;          never needs to be reset to EMPTY;
     */     */
     unsigned int index;      for (Uint32 index = 1; index < _entries.size(); index++)
     for(index = 1; index < _entries.size(); index++)  
     {     {
        if(_entries[index].socket == socket)        if(_entries[index].socket == socket)
        {        {
           _entries[index]._status = _MonitorEntry::EMPTY;              _entries[index].reset();
           _entries[index].socket = PEGASUS_INVALID_SOCKET;  
           _solicitSocketCount--;  
           break;  
        }  
     }  
   
     /*  
         Dynamic Contraction:  
         To remove excess entries we will start from the end of the _entries array  
         and remove all entries with EMPTY status until we find the first NON EMPTY.  
         This prevents the positions, of the NON EMPTY entries, from being changed.  
     */  
     index = _entries.size() - 1;  
     while(_entries[index]._status.get() == _MonitorEntry::EMPTY){  
         if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)  
                 _entries.remove(index);  
         index--;  
     }  
     PEG_METHOD_EXIT();  
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Exiting:  Monitor::unsolicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
 }  
   
 // Note: this is no longer called with PEP 183.  
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)  
 {  
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Entering: Monitor::_dispatch(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
    HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);  
    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
         "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",  
         dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);  
    try  
    {  
       dst->run(1);  
    }  
    catch (...)  
    {  
       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
           "Monitor::_dispatch: exception received");  
    }  
    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
           "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);  
   
    PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);  
   
    // Once the HTTPConnection thread has set the status value to either  
    // Monitor::DYING or Monitor::IDLE, it has returned control of the connection  
    // to the Monitor.  It is no longer permissible to access the connection  
    // or the entry in the _entries table.  
    if (dst->_connectionClosePending)  
    {  
       dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;  
    }  
    else  
    {  
       dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;  
    }  
    return 0;  
 }  
   
   
 //This method is anlogsu to solicitSocketMessages. It does the same thing for named Pipes  
 int  Monitor::solicitPipeMessages(  
     NamedPipe namedPipe,  
     Uint32 events,  //not sure what has to change for this enum  
     Uint32 queueId,  
     int type)  
 {  
    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitPipeMessages");  
    AutoMutex autoMut(_entry_mut);  
    // Check to see if we need to dynamically grow the _entries array  
    // We always want the _entries array to 2 bigger than the  
    // current connections requested  
    PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages at the begining" << PEGASUS_STD(endl);  
   
   
   
    _solicitSocketCount++;  // bump the count  
    int size = (int)_entries.size();  
    if((int)_solicitSocketCount >= (size-1)){  
         for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){  
                 _MonitorEntry entry(0, 0, 0);  
                 _entries.append(entry);  
         }  
    }  
   
    int index;  
    for(index = 1; index < (int)_entries.size(); index++)  
    {  
       try  
       {  
          if(_entries[index]._status.get() == _MonitorEntry::EMPTY)  
          {  
             _entries[index].socket = NULL;  
             _entries[index].namedPipe = namedPipe;  
             _entries[index].namedPipeConnection = true;  
             _entries[index].queueId  = queueId;  
             _entries[index]._type = type;  
             _entries[index]._status = _MonitorEntry::IDLE;  
   
             PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages after seting up  _entries[index] index = " << index << PEGASUS_STD(endl);  
   
             return index;  
          }  
       }  
       catch(...)  
       {  
       }  
   
    }  
    _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful  
    PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages nothing happed - it didn't work" << PEGASUS_STD(endl);  
   
    PEG_METHOD_EXIT();  
    return -1;  
   
 }  
   
 void Monitor::unsolicitPipeMessages(NamedPipe namedPipe)  
 {  
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Entering: Monitor::unsolicitPipeMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
   
     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitPipeMessages");  
     AutoMutex autoMut(_entry_mut);  
   
     /*  
         Start at index = 1 because _entries[0] is the tickle entry which never needs  
         to be EMPTY;  
     */  
     unsigned int index;  
     for(index = 1; index < _entries.size(); index++)  
     {  
        if(_entries[index].namedPipe.getPipe() == namedPipe.getPipe())  
        {  
           _entries[index]._status = _MonitorEntry::EMPTY;  
           //_entries[index].namedPipe = PEGASUS_INVALID_SOCKET;  
           _solicitSocketCount--;           _solicitSocketCount--;
           break;           break;
        }        }
Line 1284 
Line 742 
  
     /*     /*
         Dynamic Contraction:         Dynamic Contraction:
         To remove excess entries we will start from the end of the _entries array          To remove excess entries we will start from the end of the _entries
         and remove all entries with EMPTY status until we find the first NON EMPTY.          array and remove all entries with EMPTY status until we find the
         This prevents the positions, of the NON EMPTY entries, from being changed.          first NON EMPTY.  This prevents the positions, of the NON EMPTY
           entries, from being changed.
     */     */
     index = _entries.size() - 1;      for (Uint32 index = _entries.size() - 1;
     while(_entries[index]._status.get() == _MonitorEntry::EMPTY){           (_entries[index].status == MonitorEntry::STATUS_EMPTY) &&
         if((_entries[index].namedPipe.getPipe() == namedPipe.getPipe()) ||               (index >= MAX_NUMBER_OF_MONITOR_ENTRIES);
             (_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES))           index--)
         {         {
             _entries.remove(index);             _entries.remove(index);
         }         }
         index--;  
     }  
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Exiting:  Monitor::unsolicitPipeMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }     }
 }  
   
   
  
 PEGASUS_NAMESPACE_END PEGASUS_NAMESPACE_END


Legend:
Removed from v.1.103.10.20  
changed lines
  Added in v.1.139.8.1

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2