(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/Monitor.cpp between version 1.72.2.1 and 1.103.10.27

version 1.72.2.1, 2004/07/26 18:09:08 version 1.103.10.27, 2006/10/18 04:24:42
Line 1 
Line 1 
 //%2003////////////////////////////////////////////////////////////////////////  //%2006////////////////////////////////////////////////////////////////////////
 // //
 // Copyright (c) 2000, 2001, 2002  BMC Software, Hewlett-Packard Development  // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
 // Company, L. P., IBM Corp., The Open Group, Tivoli Systems.  // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L. P.; // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L. P.;
 // IBM Corp.; EMC Corporation, The Open Group. // IBM Corp.; EMC Corporation, The Open Group.
   // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
   // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
   // EMC Corporation; VERITAS Software Corporation; The Open Group.
   // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
   // EMC Corporation; Symantec Corporation; The Open Group.
 // //
 // Permission is hereby granted, free of charge, to any person obtaining a copy // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to // of this software and associated documentation files (the "Software"), to
Line 23 
Line 29 
 // //
 //============================================================================== //==============================================================================
 // //
 // Author: Mike Brasher (mbrasher@bmc.com)  
 //  
 // Modified By: Mike Day (monitor_2) mdday@us.ibm.com  
 //              Amit K Arora (Bug#1153) amita@in.ibm.com  
 // //
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
  
 #include <Pegasus/Common/Config.h> #include <Pegasus/Common/Config.h>
  
 #include <cstring> #include <cstring>
 #include "Monitor.h"  #include <Pegasus/Common/Monitor.h>
 #include "MessageQueue.h"  #include <Pegasus/Common/MessageQueue.h>
 #include "Socket.h"  #include <Pegasus/Common/Socket.h>
 #include <Pegasus/Common/Tracer.h> #include <Pegasus/Common/Tracer.h>
 #include <Pegasus/Common/HTTPConnection.h> #include <Pegasus/Common/HTTPConnection.h>
 #include <Pegasus/Common/MessageQueueService.h> #include <Pegasus/Common/MessageQueueService.h>
 #include <Pegasus/Common/Exception.h> #include <Pegasus/Common/Exception.h>
   #include <Pegasus/Common/ArrayIterator.h>
   
   //const static DWORD MAX_BUFFER_SIZE = 4096;  // 4 kilobytes
   
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
   // Maximum iterations of Pipe processing in Monitor::run
   const Uint32 maxIterations = 3;
   
   #endif
  
 #ifdef PEGASUS_OS_TYPE_WINDOWS #ifdef PEGASUS_OS_TYPE_WINDOWS
 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
 #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \ #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
 of <winsock.h>. It may have been indirectly included (e.g., by including \ of <winsock.h>. It may have been indirectly included (e.g., by including \
 <windows.h>). Finthe inclusion of that header which is visible to this \  <windows.h>). Find inclusion of that header which is visible to this \
 compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \ compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \
 otherwise, less than 64 clients (the default) will be able to connect to the \ otherwise, less than 64 clients (the default) will be able to connect to the \
 CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM." CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
Line 66 
Line 77 
  
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
   static AtomicInt _connections(0);
  
 static AtomicInt _connections = 0;  Mutex Monitor::_cout_mut;
   
 static struct timeval create_time = {0, 1};  
 static struct timeval destroy_time = {300, 0};  
 static struct timeval deadlock_time = {0, 0};  
  
 ////////////////////////////////////////////////////////////////////////////////  
 //  
 // MonitorRep  
 //  
 ////////////////////////////////////////////////////////////////////////////////  
  
 struct MonitorRep  // Added for NamedPipe implementation for windows
 {  #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
     fd_set rd_fd_set;   #define PIPE_INCREMENT 1
     fd_set wr_fd_set;  #endif
     fd_set ex_fd_set;  
     fd_set active_rd_fd_set;  
     fd_set active_wr_fd_set;  
     fd_set active_ex_fd_set;  
 };  
  
 //////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
 // //
Line 96 
Line 94 
 //////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
  
 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
   
 Monitor::Monitor() Monitor::Monitor()
    : _module_handle(0), _controller(0), _async(false), _stopConnections(0), _solicitSocketCount(0)     : _stopConnections(0),
        _stopConnectionsSem(0),
        _solicitSocketCount(0),
        _tickle_client_socket(-1),
        _tickle_server_socket(-1),
        _tickle_peer_socket(-1)
 { {
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
       {
           AutoMutex automut(Monitor::_cout_mut);
           PEGASUS_STD(cout) << "Entering: Monitor::Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
       }
   #endif
     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
     Socket::initializeInterface();     Socket::initializeInterface();
     _rep = 0;  
     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
  
     // setup the tickler     // setup the tickler
Line 115 
Line 124 
        _MonitorEntry entry(0, 0, 0);        _MonitorEntry entry(0, 0, 0);
        _entries.append(entry);        _entries.append(entry);
     }     }
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
       {
           AutoMutex automut(Monitor::_cout_mut);
           PEGASUS_STD(cout) << "Exiting:  Monitor::Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
       }
   #endif
 } }
  
 Monitor::Monitor(Boolean async)  Monitor::~Monitor()
    : _module_handle(0), _controller(0), _async(async), _stopConnections(0), _solicitSocketCount(0)  
 { {
     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;  #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
     Socket::initializeInterface();      {
     _rep = 0;          AutoMutex automut(Monitor::_cout_mut);
     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);          PEGASUS_STD(cout) << "Entering: Monitor::~Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
       }
     // setup the tickler  #endif
     initializeTickler();      Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
  
     // Start the count at 1 because initilizeTickler()      try{
     // has added an entry in the first position of the          if(_tickle_peer_socket >= 0)
     // _entries array  
     for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )  
     {     {
        _MonitorEntry entry(0, 0, 0);              Socket::close(_tickle_peer_socket);
        _entries.append(entry);  
     }     }
           if(_tickle_client_socket >= 0)
           {
               Socket::close(_tickle_client_socket);
 } }
           if(_tickle_server_socket >= 0)
 Monitor::~Monitor()  
 { {
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,              Socket::close(_tickle_server_socket);
                   "deregistering with module controller");          }
       }
     if(_module_handle != NULL)      catch(...)
     {     {
        _controller->deregister_module(PEGASUS_MODULENAME_MONITOR);          Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
        _controller = 0;                    "Failed to close tickle sockets");
        delete _module_handle;  
     }     }
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep");  
  
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");  
     Socket::uninitializeInterface();     Socket::uninitializeInterface();
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                   "returning from monitor destructor");                   "returning from monitor destructor");
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
       {
           AutoMutex automut(Monitor::_cout_mut);
           PEGASUS_STD(cout) << "Exiting:  Monitor::~Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
       }
   #endif
 } }
  
 void Monitor::initializeTickler(){ void Monitor::initializeTickler(){
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
       {
           AutoMutex automut(Monitor::_cout_mut);
           PEGASUS_STD(cout) << "Entering: Monitor::initializeTickler(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
       }
   #endif
     /*     /*
        NOTE: On any errors trying to        NOTE: On any errors trying to
              setup out tickle connection,              setup out tickle connection,
Line 167 
Line 189 
     /* setup the tickle server/listener */     /* setup the tickle server/listener */
  
     // get a socket for the server side     // get a socket for the server side
     if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){      if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
         //handle error         //handle error
         throw Exception("Monitor::initializeTickler(), create socket failed on tickle server.");          MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
                                    "Received error number $0 while creating the internal socket.",
   #if !defined(PEGASUS_OS_TYPE_WINDOWS)
                                    errno);
   #else
                                    WSAGetLastError());
   #endif
           throw Exception(parms);
     }     }
  
     // initialize the address     // initialize the address
     memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));     memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
 #ifdef PEGASUS_OS_ZOS  
     _tickle_server_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");  
 #else  
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 #pragma convert(37) #pragma convert(37)
 #endif #endif
Line 184 
Line 210 
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 #pragma convert(0) #pragma convert(0)
 #endif #endif
 #endif  
     _tickle_server_addr.sin_family = PF_INET;     _tickle_server_addr.sin_family = PF_INET;
     _tickle_server_addr.sin_port = 0;     _tickle_server_addr.sin_port = 0;
  
     PEGASUS_SOCKLEN_SIZE _addr_size = sizeof(_tickle_server_addr);      PEGASUS_SOCKLEN_T _addr_size = sizeof(_tickle_server_addr);
  
     // bind server side to socket     // bind server side to socket
     if((::bind(_tickle_server_socket,(struct sockaddr *)&_tickle_server_addr, sizeof(_tickle_server_addr))) < 0){      if((::bind(_tickle_server_socket,
                  reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
                  sizeof(_tickle_server_addr))) < 0){
         // handle error         // handle error
         throw Exception("Monitor::initializeTickler(), bind failed on tickle server socket.");  #ifdef PEGASUS_OS_ZOS
       MessageLoaderParms parms("Common.Monitor.TICKLE_BIND_LONG",
                                    "Received error:$0 while binding the internal socket.",strerror(errno));
   #else
           MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
                                    "Received error number $0 while binding the internal socket.",
   #if !defined(PEGASUS_OS_TYPE_WINDOWS)
                                    errno);
   #else
                                    WSAGetLastError());
   #endif
   #endif
           throw Exception(parms);
     }     }
  
     // tell the kernel we are a server     // tell the kernel we are a server
     if((::listen(_tickle_server_socket,3)) < 0){     if((::listen(_tickle_server_socket,3)) < 0){
         // handle error         // handle error
         throw Exception("Monitor::initializeTickler(), listen failed on tickle server socket");          MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",
                            "Received error number $0 while listening to the internal socket.",
   #if !defined(PEGASUS_OS_TYPE_WINDOWS)
                                    errno);
   #else
                                    WSAGetLastError());
   #endif
           throw Exception(parms);
     }     }
  
     // make sure we have the correct socket for our server     // make sure we have the correct socket for our server
     int sock = ::getsockname(_tickle_server_socket,(struct sockaddr*)&_tickle_server_addr, &_addr_size);      int sock = ::getsockname(_tickle_server_socket,
                      reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
                      &_addr_size);
     if(sock < 0){     if(sock < 0){
         // handle error         // handle error
         throw Exception("Monitor::initializeTickler(), getsockname failed on tickle server socket");          MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",
                            "Received error number $0 while getting the internal socket name.",
   #if !defined(PEGASUS_OS_TYPE_WINDOWS)
                                    errno);
   #else
                                    WSAGetLastError());
   #endif
           throw Exception(parms);
     }     }
  
     /* set up the tickle client/connector */     /* set up the tickle client/connector */
  
     // get a socket for our tickle client     // get a socket for our tickle client
     if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){      if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
         // handle error         // handle error
         throw Exception("Monitor::initializeTickler(), create socket failed on tickle client.");          MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
                            "Received error number $0 while creating the internal client socket.",
   #if !defined(PEGASUS_OS_TYPE_WINDOWS)
                                    errno);
   #else
                                    WSAGetLastError());
   #endif
           throw Exception(parms);
     }     }
  
     // setup the address of the client     // setup the address of the client
     memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));     memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
 #ifdef PEGASUS_OS_ZOS  
     _tickle_client_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");  
 #else  
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 #pragma convert(37) #pragma convert(37)
 #endif #endif
Line 229 
Line 288 
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 #pragma convert(0) #pragma convert(0)
 #endif #endif
 #endif  
     _tickle_client_addr.sin_family = PF_INET;     _tickle_client_addr.sin_family = PF_INET;
     _tickle_client_addr.sin_port = 0;     _tickle_client_addr.sin_port = 0;
  
     // bind socket to client side     // bind socket to client side
     if((::bind(_tickle_client_socket,(struct sockaddr*)&_tickle_client_addr, sizeof(_tickle_client_addr))) < 0){      if((::bind(_tickle_client_socket,
                  reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
                  sizeof(_tickle_client_addr))) < 0){
         // handle error         // handle error
         throw Exception("Monitor::initializeTickler(), bind failed on tickle client socket.");          MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
                            "Received error number $0 while binding the internal client socket.",
   #if !defined(PEGASUS_OS_TYPE_WINDOWS)
                                    errno);
   #else
                                    WSAGetLastError());
   #endif
           throw Exception(parms);
     }     }
  
     // connect to server side     // connect to server side
     if((::connect(_tickle_client_socket,(struct sockaddr*)&_tickle_server_addr, sizeof(_tickle_server_addr))) < 0){      if((::connect(_tickle_client_socket,
                     reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
                     sizeof(_tickle_server_addr))) < 0){
         // handle error         // handle error
         throw Exception("Monitor::initializeTickler(), connect failed between tickle client and tickle server.");          MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
                            "Received error number $0 while connecting the internal client socket.",
   #if !defined(PEGASUS_OS_TYPE_WINDOWS)
                                    errno);
   #else
                                    WSAGetLastError());
   #endif
           throw Exception(parms);
     }     }
  
     /* set up the slave connection */     /* set up the slave connection */
     memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));     memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
     PEGASUS_SOCKLEN_SIZE peer_size = sizeof(_tickle_peer_addr);      PEGASUS_SOCKLEN_T peer_size = sizeof(_tickle_peer_addr);
     pegasus_sleep(1);     pegasus_sleep(1);
  
     // this call may fail, we will try a max of 20 times to establish this peer connection     // this call may fail, we will try a max of 20 times to establish this peer connection
     if((_tickle_peer_socket = ::accept(_tickle_server_socket,(struct sockaddr*)&_tickle_peer_addr, &peer_size)) < 0){      if((_tickle_peer_socket = ::accept(_tickle_server_socket,
               reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
               &peer_size)) < 0){
   #if !defined(PEGASUS_OS_TYPE_WINDOWS)
           // Only retry on non-windows platforms.
         if(_tickle_peer_socket == -1 && errno == EAGAIN)         if(_tickle_peer_socket == -1 && errno == EAGAIN)
         {         {
           int retries = 0;           int retries = 0;
           do           do
           {           {
             pegasus_sleep(1);             pegasus_sleep(1);
             _tickle_peer_socket = ::accept(_tickle_server_socket,(struct sockaddr*)&_tickle_peer_addr, &peer_size);              _tickle_peer_socket = ::accept(_tickle_server_socket,
                   reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
                   &peer_size);
             retries++;             retries++;
           } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);           } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
         }         }
   #endif
     }     }
     if(_tickle_peer_socket == -1){     if(_tickle_peer_socket == -1){
         // handle error         // handle error
         throw Exception("Monitor::initializeTickler(), accept failed, peer socket connection not established.");          MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",
                            "Received error number $0 while accepting the internal socket connection.",
   #if !defined(PEGASUS_OS_TYPE_WINDOWS)
                                    errno);
   #else
                                    WSAGetLastError());
   #endif
           throw Exception(parms);
     }     }
     // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only     // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
     // checks entries with IDLE state for events     // checks entries with IDLE state for events
     _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);     _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
           Tracer::trace(TRC_HTTP,Tracer::LEVEL2,"!!!!!!!! TICKLE SOCKET-ID = %u",_tickle_peer_socket);
     entry._status = _MonitorEntry::IDLE;     entry._status = _MonitorEntry::IDLE;
     _entries.append(entry);     _entries.append(entry);
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
       {
           AutoMutex automut(Monitor::_cout_mut);
           PEGASUS_STD(cout) << "Exiting:  Monitor::initializeTickler(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
       }
   #endif
 } }
  
 void Monitor::tickle(void) void Monitor::tickle(void)
 { {
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
       {
           AutoMutex automut(Monitor::_cout_mut);
           PEGASUS_STD(cout) << "Entering: Monitor::tickle(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
       }
   #endif
   static char _buffer[] =   static char _buffer[] =
     {     {
       '0','0'       '0','0'
     };     };
                  Tracer::trace (TRC_HTTP, Tracer::LEVEL2,
                                      "Now Monitor::Tickle ");
       AutoMutex autoMutex(_tickle_mutex);
   Socket::disableBlocking(_tickle_client_socket);   Socket::disableBlocking(_tickle_client_socket);
                          Tracer::trace (TRC_HTTP, Tracer::LEVEL2,
                                              "Now Monitor::Tickle::Write() ");
   
   Socket::write(_tickle_client_socket,&_buffer, 2);   Socket::write(_tickle_client_socket,&_buffer, 2);
   Socket::enableBlocking(_tickle_client_socket);   Socket::enableBlocking(_tickle_client_socket);
                          Tracer::trace (TRC_HTTP, Tracer::LEVEL2,
                                      "Now Monitor::Tickled ");
   
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
       {
           AutoMutex automut(Monitor::_cout_mut);
           PEGASUS_STD(cout) << "Exiting:  Monitor::tickle(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
       }
   #endif
   }
   
   void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
   {
       // Set the state to requested state
       _entries[index]._status = status;
 } }
  
 Boolean Monitor::run(Uint32 milliseconds) Boolean Monitor::run(Uint32 milliseconds)
Line 291 
Line 414 
  
     Boolean handled_events = false;     Boolean handled_events = false;
      int i = 0;      int i = 0;
    // #if defined(PEGASUS_OS_OS400) || defined(PEGASUS_OS_HPUX)  
     struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};  
 //#else  
   //  struct timeval tv = {0, 1};  
 //#endif  
     fd_set fdread;     fd_set fdread;
     FD_ZERO(&fdread);     FD_ZERO(&fdread);
     _entry_mut.lock(pegasus_thread_self());  
       AutoMutex autoEntryMutex(_entry_mut);
   
       ArrayIterator<_MonitorEntry> entries(_entries);
  
     // Check the stopConnections flag.  If set, clear the Acceptor monitor entries     // Check the stopConnections flag.  If set, clear the Acceptor monitor entries
     if (_stopConnections == 1)      if (_stopConnections.get() == 1)
     {     {
         for ( int indx = 0; indx < (int)_entries.size(); indx++)          for ( int indx = 0; indx < (int)entries.size(); indx++)
         {         {
             if (_entries[indx]._type == Monitor::ACCEPTOR)              if (entries[indx]._type == Monitor::ACCEPTOR)
             {             {
                 if ( _entries[indx]._status.value() != _MonitorEntry::EMPTY)                  if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)
                 {                 {
                    if ( _entries[indx]._status.value() == _MonitorEntry::IDLE ||                                          if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||
                         _entries[indx]._status.value() == _MonitorEntry::DYING )                          entries[indx]._status.get() == _MonitorEntry::DYING )
                    {                    {
                        // remove the entry                        // remove the entry
                        _entries[indx]._status = _MonitorEntry::EMPTY;                         entries[indx]._status = _MonitorEntry::EMPTY;
                    }                    }
                    else                    else
                    {                    {
                        // set status to DYING                        // set status to DYING
                       _entries[indx]._status = _MonitorEntry::DYING;                        entries[indx]._status = _MonitorEntry::DYING;
                    }                    }
                }                }
            }            }
         }         }
         _stopConnections = 0;         _stopConnections = 0;
           _stopConnectionsSem.signal();
     }     }
  
     for( int indx = 0; indx < (int)_entries.size(); indx++)      for( int indx = 0; indx < (int)entries.size(); indx++)
     {     {
        if ((_entries[indx]._status.value() == _MonitorEntry::DYING) &&          const _MonitorEntry &entry = entries[indx];
                 (_entries[indx]._type == Monitor::CONNECTION))          if ((entry._status.get() == _MonitorEntry::DYING) &&
              (entry._type == Monitor::CONNECTION))
        {        {
           MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);  
               MessageQueue *q = MessageQueue::lookup(entry.queueId);
           PEGASUS_ASSERT(q != 0);           PEGASUS_ASSERT(q != 0);
           MessageQueue & o = static_cast<HTTPConnection *>(q)->get_owner();              HTTPConnection &h = *static_cast<HTTPConnection *>(q);
           Message* message= new CloseConnectionMessage(_entries[indx].socket);  
               if (h._connectionClosePending == false)
                           {
                               continue;
                           }
   
   
               // NOTE: do not attempt to delete while there are pending responses
               // coming thru. The last response to come thru after a
               // _connectionClosePending will reset _responsePending to false
               // and then cause the monitor to rerun this code and clean up.
               // (see HTTPConnection.cpp)
   
               if (h._responsePending == true)
               {
   // Added for NamedPipe implementation for windows
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
                   if  (!entry.namedPipeConnection)
                   {
   #endif
                       Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "
                           "Ignoring connection delete request because "
                           "responses are still pending. "
                           "connection=0x%p, socket=%d\n",
                           (void *)&h, h.getSocket());
   
   // Added for NamedPipe implementation for windows
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
                   }
                   else
                   {
                       Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "
                           "Ignoring connection delete request because "
                           "responses are still pending. "
                           "connection=0x%p, NamedPipe=%d\n",
                           (void *)&h, h.getNamedPipe().getPipe());
                   }
   #endif
                   continue;
               }
               h._connectionClosePending = false;
               MessageQueue &o = h.get_owner();
                       Message* message = 0;
   
   // Added for NamedPipe implementation for windows
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
               if (!entry.namedPipeConnection)
               {
   #endif
                   message= new CloseConnectionMessage(entry.socket);
   
   // Added for NamedPipe implementation for windows
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
               }
               else
               {
   
                               message= new CloseConnectionMessage(entry.namedPipe);
   
               }
   #endif
           message->dest = o.getQueueId();           message->dest = o.getQueueId();
  
           // HTTPAcceptor is responsible for closing the connection.           // HTTPAcceptor is responsible for closing the connection.
Line 343 
Line 529 
           // Once HTTPAcceptor completes processing of the close           // Once HTTPAcceptor completes processing of the close
           // connection, the lock is re-requested and processing of           // connection, the lock is re-requested and processing of
           // the for loop continues.  This is safe with the current           // the for loop continues.  This is safe with the current
           // implementation of the _entries object.  Note that the              // implementation of the entries object.  Note that the
           // loop condition accesses the _entries.size() on each              // loop condition accesses the entries.size() on each
           // iteration, so that a change in size while the mutex is           // iteration, so that a change in size while the mutex is
           // unlocked will not result in an ArrayIndexOutOfBounds           // unlocked will not result in an ArrayIndexOutOfBounds
           // exception.           // exception.
  
           _entry_mut.unlock();              autoEntryMutex.unlock();
           o.enqueue(message);           o.enqueue(message);
           _entry_mut.lock(pegasus_thread_self());              autoEntryMutex.lock();
               // After enqueue a message and the autoEntryMutex has been released and locked again,
               // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries.
   
               entries.reset(_entries);
        }        }
     }     }
  
Line 363 
Line 553 
         place to calculate the max file descriptor (maximum socket number)         place to calculate the max file descriptor (maximum socket number)
         because we have to traverse the entire array.         because we have to traverse the entire array.
     */     */
     int maxSocketCurrentPass = 0;  
     for( int indx = 0; indx < (int)_entries.size(); indx++)      PEGASUS_SOCKET maxSocketCurrentPass = 0;
       int indx;
   
           // Record the indexes at which Sockets are available
           Array <Uint32> socketCountAssociator;
       int socketEntryCount=0;
   
        // Added for NamedPipe implementation for windows
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
       //This array associates named pipe connections to their place in [indx]
       //in the entries array. The value in portion zero of the array is the
       //index of the fist named pipe connection in the entries array
   
           // Record the indexes at which Pipes are available
           Array <Uint32> indexPipeCountAssociator;
       int pipeEntryCount=0;
       int MaxPipes = PIPE_INCREMENT;
       // List of Pipe Handlers
       HANDLE * hPipeList = new HANDLE[PIPE_INCREMENT];
   #endif
   
       // This loop takes care of setting the namedpipe which has to be used from the list....
       for ( indx = 0,socketEntryCount=0 ;
                                indx < (int)entries.size(); indx++)
     {     {
        if(maxSocketCurrentPass < _entries[indx].socket)  
           maxSocketCurrentPass = _entries[indx].socket;  
  
        if(_entries[indx]._status.value() == _MonitorEntry::IDLE)  // Added for NamedPipe implementation for windows
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
                   if (!entries[indx].namedPipeConnection)
           {
   #endif
               if (maxSocketCurrentPass < entries[indx].socket)
                           {
                                   maxSocketCurrentPass = entries[indx].socket;
                           }
               if(entries[indx]._status.get() == _MonitorEntry::IDLE)
        {        {
           _idleEntries++;           _idleEntries++;
           FD_SET(_entries[indx].socket, &fdread);                  FD_SET(entries[indx].socket, &fdread);
                   socketCountAssociator.append(indx);
                                   socketEntryCount++;
               }
   
   // Added for NamedPipe implementation for windows
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
           }
                   else
                   {
                       entries[indx].pipeSet = false;
                           if (pipeEntryCount >= MaxPipes)
                           {
                               MaxPipes += PIPE_INCREMENT;
                                   HANDLE* temp_pList = new HANDLE[MaxPipes];
                                   for (Uint32 i =0;i<pipeEntryCount;i++)
                                   {
                                       temp_pList[i] = hPipeList[i];
                                   }
                                   delete [] hPipeList;
                                   hPipeList = temp_pList;
        }        }
                           hPipeList[pipeEntryCount] = entries[indx].namedPipe.getPipe();
                           indexPipeCountAssociator.append(indx);
                           pipeEntryCount++;
                   }
   
   #endif
     }     }
  
     /*     /*
Line 382 
Line 628 
     */     */
     maxSocketCurrentPass++;     maxSocketCurrentPass++;
  
     // Fixed in monitor_2 but added because Monitor is still the default monitor.      autoEntryMutex.unlock();
     // When _idleEntries is 0 don't immediately return, otherwise this loops out of control  
     // kicking off kill idle thread threads.  E.g. There is nothing to select on when the cimserver      int events = -1;
     // is shutting down.          // Since the pipes have been introduced, the ratio of procesing
     /*if( _idleEntries == 0 )          // time Socket:Pipe :: 3/4:1/4 respectively
     {  
         Thread::sleep( milliseconds );          Uint32 newMilliseconds = milliseconds;
         _entry_mut.unlock();          #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
         return false;  
     }*/          newMilliseconds = (milliseconds * 3)/4 ;
   
     _entry_mut.unlock();      #endif
     //int events = select(FD_SETSIZE, &fdread, NULL, NULL, &tv);  
     int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);          struct timeval tv = {newMilliseconds/1000, newMilliseconds%1000*1000};
    _entry_mut.lock(pegasus_thread_self());  
   
           #ifdef PEGASUS_OS_TYPE_WINDOWS
                   events = select(0, &fdread, NULL, NULL, &tv);
           #else
                   events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
           #endif
   
       autoEntryMutex.lock();
       // After enqueue a message and the autoEntryMutex has been released and locked again,
       // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries
       entries.reset(_entries);
  
 #ifdef PEGASUS_OS_TYPE_WINDOWS #ifdef PEGASUS_OS_TYPE_WINDOWS
     if(events == SOCKET_ERROR)     if(events == SOCKET_ERROR)
Line 408 
Line 665 
           "Monitor::run - errorno = %d has occurred on select.", errno);           "Monitor::run - errorno = %d has occurred on select.", errno);
        // The EBADF error indicates that one or more or the file        // The EBADF error indicates that one or more or the file
        // descriptions was not valid. This could indicate that        // descriptions was not valid. This could indicate that
        // the _entries structure has been corrupted or that         // the entries structure has been corrupted or that
        // we have a synchronization error.        // we have a synchronization error.
  
        PEGASUS_ASSERT(errno != EBADF);        PEGASUS_ASSERT(errno != EBADF);
Line 418 
Line 675 
        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
           "Monitor::run select event received events = %d, monitoring %d idle entries",           "Monitor::run select event received events = %d, monitoring %d idle entries",
            events, _idleEntries);            events, _idleEntries);
        for( int indx = 0; indx < (int)_entries.size(); indx++)           for ( int sindx = 0; sindx < socketEntryCount; sindx++)
        {        {
           // The Monitor should only look at entries in the table that are IDLE (i.e.,           // The Monitor should only look at entries in the table that are IDLE (i.e.,
           // owned by the Monitor).           // owned by the Monitor).
           if((_entries[indx]._status.value() == _MonitorEntry::IDLE) &&                       indx = socketCountAssociator[sindx];
              (FD_ISSET(_entries[indx].socket, &fdread)))  
                if ((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
                     (FD_ISSET(entries[indx].socket, &fdread)))
           {           {
              MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);                   MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                   "Monitor::run indx = %d, queueId =  %d, q = %p",                   "Monitor::run indx = %d, queueId =  %d, q = %p",
                   indx, _entries[indx].queueId, q);                                 indx, entries[indx].queueId, q);
   
              PEGASUS_ASSERT(q !=0);              PEGASUS_ASSERT(q !=0);
  
              try              try
              {              {
                 if(_entries[indx]._type == Monitor::CONNECTION)                       if (entries[indx]._type == Monitor::CONNECTION)
                 {                 {
                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                      "_entries[indx].type for indx = %d is Monitor::CONNECTION", indx);  
                    static_cast<HTTPConnection *>(q)->_entry_index = indx;                    static_cast<HTTPConnection *>(q)->_entry_index = indx;
                    _entries[indx]._status = _MonitorEntry::BUSY;  
                                                    // Do not update the entry just yet. The entry gets updated once
                                                    // the request has been read.
                                                    //entries[indx]._status = _MonitorEntry::BUSY;
   
                    // If allocate_and_awaken failure, retry on next iteration                    // If allocate_and_awaken failure, retry on next iteration
 /*  
                    if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(  
                            (void *)q, _dispatch))  
                    {  
                       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,  
                           "Monitor::run: Insufficient resources to process request.");  
                       _entries[indx]._status = _MonitorEntry::IDLE;  
                       _entry_mut.unlock();  
                       return true;  
                    }  
 */  
 // begin hack  
                    HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);                    HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
                          Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                          Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                          "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",                          "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
Line 466 
Line 716 
                         "Monitor::_dispatch: exception received");                         "Monitor::_dispatch: exception received");
                    }                    }
                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                    "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);                              "Monitor::_dispatch: exited run() for index %d",
                                                       dst->_entry_index);
                    PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);  
  
    // Once the HTTPConnection thread has set the status value to either  
    // Monitor::DYING or Monitor::IDLE, it has returned control of the connection  
    // to the Monitor.  It is no longer permissible to access the connection  
    // or the entry in the _entries table.  
                 if (dst->_connectionClosePending)  
                 {  
                         dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;  
                 }                 }
                 else                                          else if (entries[indx]._type == Monitor::INTERNAL)
                 {                 {
                         dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;  
                 }  
   
 // end hack  
                 }  
                 else if( _entries[indx]._type == Monitor::INTERNAL){  
                         // set ourself to BUSY,                         // set ourself to BUSY,
                         // read the data                         // read the data
                         // and set ourself back to IDLE                         // and set ourself back to IDLE
   
                         _entries[indx]._status == _MonitorEntry::BUSY;  
                         static char buffer[2];                         static char buffer[2];
                         Socket::disableBlocking(_tickle_server_socket);                                          Socket::disableBlocking(entries[indx].socket);
                         Socket::read(_tickle_server_socket,&buffer, 2);  
                         Socket::enableBlocking(_tickle_server_socket);                                          Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2);
                         _entries[indx]._status == _MonitorEntry::IDLE;                                                  Socket::enableBlocking(entries[indx].socket);
                                                   entries[indx]._status = _MonitorEntry::IDLE;
                 }                 }
                 else                 else
                 {                 {
                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                      "Non-connection entry, indx = %d, has been received.", indx);                      "Non-connection entry, indx = %d, has been received.", indx);
   
                    int events = 0;                    int events = 0;
                    events |= SocketMessage::READ;                    events |= SocketMessage::READ;
                    Message *msg = new SocketMessage(_entries[indx].socket, events);                                                  Message *msg = new SocketMessage(entries[indx].socket, events);
                    _entries[indx]._status = _MonitorEntry::BUSY;                                                  entries[indx]._status = _MonitorEntry::BUSY;
                    _entry_mut.unlock();                                                  autoEntryMutex.unlock();
   
                    q->enqueue(msg);                    q->enqueue(msg);
                    _entries[indx]._status = _MonitorEntry::IDLE;                                                  autoEntryMutex.lock();
                    return true;                                                  // After enqueue a message and the autoEntryMutex has been released and locked again,
                                                   // the array of entries can be changed. The ArrayIterator has be reset with the original _entries
                                                   entries.reset(_entries);
                                                   entries[indx]._status = _MonitorEntry::IDLE;
                                                   handled_events = true;
                                                   delete [] hPipeList;
                                                   return handled_events;
   
                 }                 }
              }              }
              catch(...)              catch(...)
Line 518 
Line 760 
              handled_events = true;              handled_events = true;
           }           }
        }        }
     }                  delete [] hPipeList;
     _entry_mut.unlock();                  return handled_events;
     return(handled_events);  
 } }
  
 void Monitor::stopListeningForConnections()  
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
   
           //if no pipes are registered return immediately
   
           int pEvents = -1;
           int pCount = -1;
           BOOL bPeekPipe = 0;
           DWORD dwBytesAvail=0;
           // The pipe is sniffed and check if there are any data. If available, the
           // message is picked from the Queue and appropriate methods are invoked.
   
   
           // pipeProcessCount records the number of requests that are processed.
           // At the end of loop this is verified against the count of request
           // on local connection . If there are any pipes which needs to be
           // processed we would apply delay and then proceed to iterate.
   
       Uint32 pipeProcessCount =0;
   
       for (int counter = 1; counter < maxIterations ; counter ++)
 { {
     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");  
     // set boolean then tickle the server to recognize _stopConnections  
     _stopConnections = 1;  
     tickle();  
     PEG_METHOD_EXIT();  
 }  
  
  
 int  Monitor::solicitSocketMessages(                  // pipeIndex is used to index into indexPipeCountAssociator to fetch
     Sint32 socket,                  // index of the _MonitorEntry of Monitor
     Uint32 events,          for (int pipeIndex = 0; pipeIndex < pipeEntryCount; pipeIndex++)
     Uint32 queueId,  
     int type)  
 { {
      PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");              dwBytesAvail = 0;
                       bPeekPipe = ::PeekNamedPipe(hPipeList[pipeIndex],
                                                       NULL,
                                                                       NULL,
                                                                           NULL,
                                           &dwBytesAvail,
                                                                           NULL
                                                                          );
  
    _entry_mut.lock(pegasus_thread_self());                          // If peek on NamedPipe was successfull and data is available
    // Check to see if we need to dynamically grow the _entries array              if (bPeekPipe && dwBytesAvail)
    // We always want the _entries array to 2 bigger than the                  {
    // current connections requested  
    _solicitSocketCount++;  // bump the count  
    int size = (int)_entries.size();  
    if(_solicitSocketCount >= (size-1)){  
         for(int i = 0; i < (_solicitSocketCount - (size-1)); i++){  
                 _MonitorEntry entry(0, 0, 0);  
                 _entries.append(entry);  
         }  
    }  
  
    int index;                              Tracer::trace(TRC_HTTP,Tracer::LEVEL4," PIPE_PEEKING FOUND = %u BYTES", dwBytesAvail);
    for(index = 1; index < (int)_entries.size(); index++)  
                               pEvents = 1;
                       entries[indexPipeCountAssociator[pipeIndex]].pipeSet = true;
                               Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                       "Monitor::run select event received events = %d, \
                                           monitoring %d idle entries",
                       pEvents,
                                           _idleEntries);
   
                                   int pIndx = indexPipeCountAssociator[pipeIndex];
   
                                   if ((entries[pIndx]._status.get() == _MonitorEntry::IDLE) &&
                                            entries[pIndx].namedPipe.isConnected() &&
                                            (pEvents))
    {    {
   
                                   MessageQueue *q = 0;
   
       try       try
       {       {
          if(_entries[index]._status.value() == _MonitorEntry::EMPTY)  
          {  
             _entries[index].socket = socket;  
             _entries[index].queueId  = queueId;  
             _entries[index]._type = type;  
             _entries[index]._status = _MonitorEntry::IDLE;  
             _entry_mut.unlock();  
  
             return index;                                          q = MessageQueue::lookup (entries[pIndx].queueId);
          }          }
                       catch (Exception e)
                       {
                                           e.getMessage();
       }       }
       catch(...)       catch(...)
       {       {
       }       }
  
    }                                          Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
    _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful                                    "Monitor::run indx = %d, queueId =  %d,\
    _entry_mut.unlock();                                                                    q = %p",pIndx, entries[pIndx].queueId, q);
    PEG_METHOD_EXIT();                      try
    return -1;                      {
                                           if (entries[pIndx]._type == Monitor::CONNECTION)
                           {
  
 }                                                      Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                                                                         "entries[indx].type for indx = \
                                                                                 %d is Monitor::CONNECTION",
                                                                                     pIndx);
                                                       static_cast<HTTPConnection *>(q)->_entry_index = pIndx;
                                                   HTTPConnection *dst = reinterpret_cast \
                                                                                              <HTTPConnection *>(q);
                                                   Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                                                                         "Monitor::_dispatch: entering run() \
                                                                                     for indx  = %d, queueId = %d, \
                                                                                     q = %p",\
                                                                 dst->_entry_index,
                                                                                     dst->_monitor->_entries\
                                                                                     [dst->_entry_index].queueId, dst);
  
 void Monitor::unsolicitSocketMessages(Sint32 socket)                                                  try
 { {
  
     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");                                                          dst->run(1);
     _entry_mut.lock(pegasus_thread_self());  
  
     /*                                                          // Record that the requested data is read/Written
         Start at index = 1 because _entries[0] is the tickle entry which never needs                                                          pipeProcessCount++;
         to be EMPTY;  
     */  
     int index;  
     for(index = 1; index < _entries.size(); index++)  
     {  
        if(_entries[index].socket == socket)  
        {  
           _entries[index]._status = _MonitorEntry::EMPTY;  
           _entries[index].socket = -1;  
           _solicitSocketCount--;  
           break;  
        }  
     }  
  
     /*  
         Dynamic Contraction:  
         To remove excess entries we will start from the end of the _entries array  
         and remove all entries with EMPTY status until we find the first NON EMPTY.  
         This prevents the positions, of the NON EMPTY entries, from being changed.  
     */  
     index = _entries.size() - 1;  
     while(_entries[index]._status == _MonitorEntry::EMPTY){  
         if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)  
                 _entries.remove(index);  
         index--;  
     }  
   
     _entry_mut.unlock();  
     PEG_METHOD_EXIT();  
 }  
   
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)  
 {  
    HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);  
    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
         "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",  
         dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);  
    try  
    {  
       dst->run(1);  
    }    }
    catch (...)    catch (...)
    {    {
       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
           "Monitor::_dispatch: exception received");                                                                                "Monitor::_dispatch: \
                                                                                              exception received");
    }    }
   
    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
           "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);                                                                "Monitor::_dispatch: exited \
                                                                          \run() index %d",
                                                                                      dst->_entry_index);
  
    PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);  
  
    // Once the HTTPConnection thread has set the status value to either  
    // Monitor::DYING or Monitor::IDLE, it has returned control of the connection  
    // to the Monitor.  It is no longer permissible to access the connection  
    // or the entry in the _entries table.  
    if (dst->_connectionClosePending)  
    {  
       dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;  
    }    }
    else    else
    {    {
       dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;                                                  /* The condition
    }                                                             entries[indx]._type == Monitor::INTERNAL can be
    return 0;                                                             ignored for pipes as the tickler is of
 }                                                             Monitor::INTERNAL type. The tickler is
                                                              a socket.
                                                   */
   
 ////************************* monitor 2 *****************************////  
 ////************************* monitor 2 *****************************////  
 ////************************* monitor 2 *****************************////  
 ////************************* monitor 2 *****************************////  
 ////************************* monitor 2 *****************************////  
 ////************************* monitor 2 *****************************////  
 ////************************* monitor 2 *****************************////  
   
   
   
   
   
 m2e_rep::m2e_rep(void)  
   :Base(), state(IDLE)  
   
 {  
 }  
   
 m2e_rep::m2e_rep(monitor_2_entry_type _type,  
                  pegasus_socket _sock,  
                  void* _accept,  
                  void* _dispatch)  
   : Base(), type(_type), state(IDLE), psock(_sock),  
     accept_parm(_accept), dispatch_parm(_dispatch)  
 {  
   
 }  
   
 m2e_rep::~m2e_rep(void)  
 {  
 }  
   
 m2e_rep::m2e_rep(const m2e_rep& r)  
   : Base()  
 {  
   if(this != &r){  
     type = r.type;  
     psock = r.psock;  
     accept_parm = r.accept_parm;  
     dispatch_parm = r.dispatch_parm;  
     state = IDLE;  
  
   }                                                  Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 }                                                                                    "Non-connection entry, indx = %d,\
                                                                                             has been received.", pIndx);
                                                   int events = 0;
                                                   Message *msg = 0;
  
                                                       pEvents |= NamedPipeMessage::READ;
                                                       msg = new NamedPipeMessage(entries[pIndx].namedPipe, pEvents);
                                       entries[pIndx]._status = _MonitorEntry::BUSY;
                                       autoEntryMutex.unlock();
                                               q->enqueue(msg);
                                                   autoEntryMutex.lock();
                                       entries.reset(_entries);
                                       entries[pIndx]._status = _MonitorEntry::IDLE;
                                                           delete [] hPipeList;
                                   return(handled_events);
  
 m2e_rep& m2e_rep::operator =(const m2e_rep& r)  
 {  
   if(this != &r) {  
     type = r.type;  
     psock = r.psock;  
     accept_parm = r.accept_parm;  
     dispatch_parm = r.dispatch_parm;  
     state = IDLE;  
   }  
   return *this;  
 } }
  
 Boolean m2e_rep::operator ==(const m2e_rep& r)  
 {  
   if(this == &r)  
     return true;  
   return false;  
 }  
  
 Boolean m2e_rep::operator ==(void* r)  
 {  
   if((void*)this == r)  
     return true;  
   return false;  
 } }
                                   catch(...)
 m2e_rep::operator pegasus_socket() const  
 { {
   return psock;  
 }  
  
   
 monitor_2_entry::monitor_2_entry(void)  
 {  
   _rep = new m2e_rep();  
 } }
   
 monitor_2_entry::monitor_2_entry(pegasus_socket& _psock,  
                                  monitor_2_entry_type _type,  
                                  void* _accept_parm, void* _dispatch_parm)  
 {  
   _rep = new m2e_rep(_type, _psock, _accept_parm, _dispatch_parm);  
 } }
  
 monitor_2_entry::monitor_2_entry(const monitor_2_entry& e)  
 {  
   if(this != &e){  
     Inc(this->_rep = e._rep);  
   }   }
 } }
  
 monitor_2_entry::~monitor_2_entry(void)                  //Check if all the pipes had recieved the data, If no then try again
           if (pipeEntryCount == pipeProcessCount)
 { {
                       break;
   Dec(_rep);  
 } }
  
 monitor_2_entry& monitor_2_entry::operator=(const monitor_2_entry& e)  
 {  
   if(this != &e){  
     Dec(_rep);  
     Inc(this->_rep = e._rep);  
   }  
   return *this;  
 }  
  
 Boolean monitor_2_entry::operator ==(const monitor_2_entry& me) const  
 {  
   if(this == &me)  
     return true;  
   return false;  
 } }
  
 Boolean monitor_2_entry::operator ==(void* k) const          delete [] hPipeList;
 {  
   if((void *)this == k)  
     return true;  
   return false;  
 }  
  
   #endif
  
 monitor_2_entry_type monitor_2_entry::get_type(void) const      return(handled_events);
 {  
   return _rep->type;  
 } }
  
 void monitor_2_entry::set_type(monitor_2_entry_type t)  void Monitor::stopListeningForConnections(Boolean wait)
 { {
   _rep->type = t;  #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
 }  
   
   
 monitor_2_entry_state  monitor_2_entry::get_state(void) const  
 { {
   return (monitor_2_entry_state) _rep->state.value();          AutoMutex automut(Monitor::_cout_mut);
           PEGASUS_STD(cout) << "Entering: Monitor::stopListeningForConnections(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
 } }
   #endif
       PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
       // set boolean then tickle the server to recognize _stopConnections
       _stopConnections = 1;
       tickle();
  
 void monitor_2_entry::set_state(monitor_2_entry_state t)      if (wait)
 { {
   _rep->state = t;        // Wait for the monitor to notice _stopConnections.  Otherwise the
         // caller of this function may unbind the ports while the monitor
         // is still accepting connections on them.
         _stopConnectionsSem.wait();
 } }
  
 void* monitor_2_entry::get_accept(void) const      PEG_METHOD_EXIT();
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
 { {
   return _rep->accept_parm;          AutoMutex automut(Monitor::_cout_mut);
           PEGASUS_STD(cout) << "Exiting:  Monitor::stopListeningForConnections(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
 } }
   #endif
 void monitor_2_entry::set_accept(void* a)  
 {  
   _rep->accept_parm = a;  
 } }
  
  
 void* monitor_2_entry::get_dispatch(void) const  int  Monitor::solicitSocketMessages(
 {      PEGASUS_SOCKET socket,
   return _rep->dispatch_parm;      Uint32 events,
 }      Uint32 queueId,
       int type)
 void monitor_2_entry::set_dispatch(void* a)  
 {  
   _rep->dispatch_parm = a;  
 }  
   
 pegasus_socket monitor_2_entry::get_sock(void) const  
 { {
   return _rep->psock;  #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
 }  
   
   
 void monitor_2_entry::set_sock(pegasus_socket& s)  
 { {
   _rep->psock = s;          AutoMutex automut(Monitor::_cout_mut);
           PEGASUS_STD(cout) << "Entering: Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
 } }
   
 //static monitor_2* _m2_instance;  
   
 AsyncDQueue<HTTPConnection2> monitor_2::_connections(true, 0);  
   
 monitor_2::monitor_2(void)  
   : _session_dispatch(0), _accept_dispatch(0), _listeners(true, 0),  
     _ready(true, 0), _die(0), _requestCount(0)  
 {  
   try {  
   
     bsd_socket_factory _factory;  
   
     // set up the listener/acceptor  
     pegasus_socket temp = pegasus_socket(&_factory);  
   
     temp.socket(PF_INET, SOCK_STREAM, 0);  
     // initialize the address  
     memset(&_tickle_addr, 0, sizeof(_tickle_addr));  
 #ifdef PEGASUS_OS_ZOS  
     _tickle_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");  
 #else  
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM  
 #pragma convert(37)  
 #endif  
     _tickle_addr.sin_addr.s_addr = inet_addr("127.0.0.1");  
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM  
 #pragma convert(0)  
 #endif  
 #endif  
     _tickle_addr.sin_family = PF_INET;  
     _tickle_addr.sin_port = 0;  
   
     PEGASUS_SOCKLEN_SIZE _addr_size = sizeof(_tickle_addr);  
   
     temp.bind((struct sockaddr *)&_tickle_addr, sizeof(_tickle_addr));  
     temp.listen(3);  
     temp.getsockname((struct sockaddr*)&_tickle_addr, &_addr_size);  
   
     // set up the connector  
   
     pegasus_socket tickler = pegasus_socket(&_factory);  
     tickler.socket(PF_INET, SOCK_STREAM, 0);  
     struct sockaddr_in _addr;  
     memset(&_addr, 0, sizeof(_addr));  
 #ifdef PEGASUS_OS_ZOS  
     _addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");  
 #else  
     _addr.sin_addr.s_addr = inet_addr("127.0.0.1");  
 #endif #endif
     _addr.sin_family = PF_INET;     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
     _addr.sin_port = 0;     AutoMutex autoMut(_entry_mut);
     tickler.bind((struct sockaddr*)&_addr, sizeof(_addr));     // Check to see if we need to dynamically grow the _entries array
     tickler.connect((struct sockaddr*)&_tickle_addr, sizeof(_tickle_addr));     // We always want the _entries array to 2 bigger than the
      // current connections requested
     _tickler.set_sock(tickler);     _solicitSocketCount++;  // bump the count
     _tickler.set_type(INTERNAL);     int size = (int)_entries.size();
     _tickler.set_state(BUSY);     if((int)_solicitSocketCount >= (size-1)){
           for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
     struct sockaddr_in peer;                  _MonitorEntry entry(0, 0, 0);
     memset(&peer, 0, sizeof(peer));                  _entries.append(entry);
     PEGASUS_SOCKLEN_SIZE peer_size = sizeof(peer);  
   
     pegasus_socket accepted = temp.accept((struct sockaddr*)&peer, &peer_size);  
   
     monitor_2_entry* _tickle = new monitor_2_entry(accepted, INTERNAL, 0, 0);  
   
 // No need to set _tickle's state as BUSY, since monitor_2::run() now  
 // does a select only on sockets which are in IDLE (default) state.  
 //  _tickle->set_state(BUSY);  
   
     _listeners.insert_first(_tickle);  
   
   }   }
   catch(...){  }  
 } }
  
 monitor_2::~monitor_2(void)     int index;
      for(index = 1; index < (int)_entries.size(); index++)
 { {
   
    stop();  
   
   try {  
     monitor_2_entry* temp = _listeners.remove_first();  
     while(temp){  
       delete temp;  
       temp = _listeners.remove_first();  
     }  
   }  
   
   catch(...){  }  
   
   
   try   try
   {   {
      HTTPConnection2* temp = _connections.remove_first();           if(_entries[index]._status.get() == _MonitorEntry::EMPTY)
      while(temp)  
      {  
         delete temp;  
         temp = _connections.remove_first();  
      }  
   }  
   catch(...)  
   {  
   }  
   
   
 }  
   
   
 void monitor_2::run(void)  
 {  
   monitor_2_entry* temp;  
   int _nonIdle=0, _idleCount=0, events;  
   
   while(_die.value() == 0) {  
     _nonIdle=_idleCount=0;  
   
      struct timeval tv_idle = { 60, 0 };  
   
     // place all sockets in the select set  
     FD_ZERO(&rd_fd_set);  
     try {  
       _listeners.lock(pegasus_thread_self());  
       temp = _listeners.next(0);  
       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
        "monitor_2::run:Creating New FD list for SELECT.");  
       while(temp != 0 ){  
         if(temp->get_state() == CLOSED ) {  
           monitor_2_entry* closed = temp;  
       temp = _listeners.next(closed);  
           _listeners.remove_no_lock(closed);  
   
       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
        "monitor_2::run:Deleteing CLOSED socket fd=%d.",(Sint32)closed->get_sock());  
   
           HTTPConnection2 *cn = monitor_2::remove_connection((Sint32)(closed->get_sock()));  
           delete cn;  
           delete closed;  
         }  
         if(temp == 0)  
            break;  
   
   
         //Count the number if IDLE sockets  
         if(temp->get_state() != IDLE ) _nonIdle++;  
          else _idleCount++;  
   
         Sint32 fd = (Sint32) temp->get_sock();  
   
         //Select should be called ONLY on the FDs which are in IDLE state  
         if((fd >= 0) && (temp->get_state() == IDLE))  
         {         {
           Tracer::trace(TRC_HTTP, Tracer::LEVEL4,              _entries[index].socket = socket;
            "monitor_2::run:Adding FD %d to the list for SELECT.",fd);              _entries[index].queueId  = queueId;
           FD_SET(fd , &rd_fd_set);              _entries[index]._type = type;
         }              _entries[index]._status = _MonitorEntry::IDLE;
            temp = _listeners.next(temp);  
       }  
       _listeners.unlock();  
     }  
     catch(...){  
       return;  
     }  
   
     // important -  the dispatch routine has pointers to all the  
     // entries that are readable. These entries can be changed but  
     // the pointer must not be tampered with.  
     if(_connections.count() )  
        events = select(FD_SETSIZE, &rd_fd_set, NULL, NULL, NULL);  
     else  
        events = select(FD_SETSIZE, &rd_fd_set, NULL, NULL, &tv_idle);  
  
     if(_die.value())  #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
     {     {
        break;                  AutoMutex automut(Monitor::_cout_mut);
                   PEGASUS_STD(cout) << "Exiting:  Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
     }     }
   
 #ifdef PEGASUS_OS_TYPE_WINDOWS  
     if(events == SOCKET_ERROR)  
 #else  
     if(events == -1)  
 #endif #endif
     {              return index;
        Tracer::trace(TRC_HTTP, Tracer::LEVEL2,  
           "monitor_2:run:INVALID FD. errorno = %d on select.", errno);  
        // The EBADF error indicates that one or more or the file  
        // descriptions was not valid. This could indicate that  
        // the _entries structure has been corrupted or that  
        // we have a synchronization error.  
   
      // Keeping the line below commented for time being.  
      //  PEGASUS_ASSERT(errno != EBADF);  
     }     }
     else if (events)  
     {  
        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
           "monitor_2::run select event received events = %d, monitoring %d idle entries", events, _idleCount);  
   
   
     try {  
       _listeners.lock(pegasus_thread_self());  
       temp = _listeners.next(0);  
       while(temp != 0 ){  
           Sint32 fd = (Sint32) temp->get_sock();  
           if(fd >= 0 && FD_ISSET(fd, &rd_fd_set)) {  
           if(temp->get_type() != CLIENTSESSION) temp->set_state(BUSY);  
           FD_CLR(fd,  &rd_fd_set);  
           monitor_2_entry* ready = new monitor_2_entry(*temp);  
           try  
           {  
              _ready.insert_first(ready);  
           }           }
           catch(...)           catch(...)
           {           {
           }           }
   
           _requestCount++;  
         }  
         temp = _listeners.next(temp);  
       }  
       _listeners.unlock();  
     }     }
     catch(...){     _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful
       return;     PEG_METHOD_EXIT();
     }  #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
     // now handle the sockets that are ready to read  
     if(_ready.count())  
        _dispatch();  
     else  
     {     {
        if(_connections.count() == 0 )         AutoMutex automut(Monitor::_cout_mut);
           _idle_dispatch(_idle_parm);         PEGASUS_STD(cout) << "Exiting:  Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
     }     }
    }  // if events  #endif
   } // while alive     return -1;
   _die=0;  
  
 } }
  
 int  monitor_2::solicitSocketMessages(  void Monitor::unsolicitSocketMessages(PEGASUS_SOCKET socket)
     Sint32 socket,  
     Uint32 events,  
     Uint32 queueId,  
     int type)  
 {  
   
    PEG_METHOD_ENTER(TRC_HTTP, "monitor_2::solicitSocketMessages");  
   
    _entry_mut.lock(pegasus_thread_self());  
   
    for(int index = 0; index < (int)_entries.size(); index++)  
    {  
       try  
       {  
          if(_entries[index]._status.value() == monitor_2_entry::EMPTY)  
          {          {
             _entries[index].socket = socket;  #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
             //_entries[index].queueId  = queueId;  
             //_entries[index]._type = type;  
             _entries[index]._status = IDLE;  
             _entry_mut.unlock();  
   
             return index;  
          }  
       }  
       catch(...)  
       {       {
           AutoMutex automut(Monitor::_cout_mut);
           PEGASUS_STD(cout) << "Entering: Monitor::unsolicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
       }       }
   #endif
  
    }      PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
    _entry_mut.unlock();      AutoMutex autoMut(_entry_mut);
    PEG_METHOD_EXIT();  
    return -1;  
 }  
   
   
 void monitor_2::unsolicitSocketMessages(Sint32 socket)  
 {  
   
     PEG_METHOD_ENTER(TRC_HTTP, "monitor_2::unsolicitSocketMessages");  
     _entry2_mut.lock(pegasus_thread_self());  
  
     for(int index = 0; index < (int)_entries2.size(); index++)      /*
           Start at index = 1 because _entries[0] is the tickle entry which never needs
           to be EMPTY;
       */
       unsigned int index;
       for(index = 1; index < _entries.size(); index++)
     {     {
        if(_entries2[index].socket == socket)         if(_entries[index].socket == socket)
        {        {
           _entries2[index]._status = monitor_2_entry::EMPTY;            _entries[index]._status = _MonitorEntry::EMPTY;
           _entries2[index].socket = -1;            _entries[index].socket = PEGASUS_INVALID_SOCKET;
             _solicitSocketCount--;
           break;           break;
        }        }
     }     }
     _entry2_mut.unlock();  
     PEG_METHOD_EXIT();  
 }  
  
 void* monitor_2::set_session_dispatch(void (*dp)(monitor_2_entry*))      /*
 {          Dynamic Contraction:
   void* old = (void *)_session_dispatch;          To remove excess entries we will start from the end of the _entries array
   _session_dispatch = dp;          and remove all entries with EMPTY status until we find the first NON EMPTY.
   return old;          This prevents the positions, of the NON EMPTY entries, from being changed.
       */
       index = _entries.size() - 1;
       while(_entries[index]._status.get() == _MonitorEntry::EMPTY){
           if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
                   _entries.remove(index);
           index--;
 } }
       PEG_METHOD_EXIT();
 void* monitor_2::set_accept_dispatch(void (*dp)(monitor_2_entry*))  #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
 { {
   void* old = (void*)_accept_dispatch;          AutoMutex automut(Monitor::_cout_mut);
   _accept_dispatch = dp;          PEGASUS_STD(cout) << "Exiting:  Monitor::unsolicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
   return old;  
 } }
   #endif
 void* monitor_2::set_idle_dispatch(void (*dp)(void*))  
 {  
    void* old = (void*)_idle_dispatch;  
    _idle_dispatch = dp;  
    return old;  
 } }
  
 void* monitor_2::set_idle_parm(void* parm)  // Note: this is no longer called with PEP 183.
   PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
 { {
    void* old = _idle_parm;  #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
    _idle_parm = parm;  
    return old;  
 }  
   
   
   
 //-----------------------------------------------------------------  
 // Note on deleting the monitor_2_entry nodes:  
 //  Each case: in the switch statement needs to handle the deletion  
 //  of the monitor_2_entry * node differently. A SESSION dispatch  
 //  routine MUST DELETE the entry during its dispatch handling.  
 //  All other dispatch routines MUST NOT delete the entry during the  
 //  dispatch handling, but must allow monitor_2::_dispatch to delete  
 //   the entry.  
 //  
 //  The reason is pretty obscure and it is debatable whether or not  
 //  to even bother, but during cimserver shutdown the single monitor_2_entry*  
 //  will leak unless the _session_dispatch routine takes care of deleting it.  
 //  
 //  The reason is that a shutdown messages completely stops everything and  
 //  the _session_dispatch routine never returns. So monitor_2::_dispatch is  
 //  never able to do its own cleanup.  
 //  
 // << Mon Oct 13 09:33:33 2003 mdd >>  
 //-----------------------------------------------------------------  
   
 void monitor_2::_dispatch(void)  
 { {
    monitor_2_entry* entry;          AutoMutex automut(Monitor::_cout_mut);
           PEGASUS_STD(cout) << "Entering: Monitor::_dispatch(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
       }
   #endif
      HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
      Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
           "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
           dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
    try    try
    {    {
         dst->run(1);
          entry = _ready.remove_first();  
    }    }
    catch(...)    catch(...)
    {    {
         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
             "Monitor::_dispatch: exception received");
    }    }
      Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
             "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
  
   while(entry != 0 ) {     PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
     switch(entry->get_type()) {  
     case INTERNAL:  
       static char buffer[2];  
       entry->get_sock().disableBlocking();  
       entry->get_sock().read(&buffer, 2);  
       entry->get_sock().enableBlocking();  
       entry->set_state(IDLE);   // Set state of the socket to IDLE so that  
                                 // monitor_2::run can add to the list of FDs  
                                 // on which select would be called.  
   
   
   
       delete entry;  
  
       break;     // Once the HTTPConnection thread has set the status value to either
     case LISTEN:     // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
       {     // to the Monitor.  It is no longer permissible to access the connection
         static struct sockaddr peer;     // or the entry in the _entries table.
         static PEGASUS_SOCKLEN_SIZE peer_size = sizeof(peer);     if (dst->_connectionClosePending)
         entry->get_sock().disableBlocking();  
         pegasus_socket connected = entry->get_sock().accept(&peer, &peer_size);  
         entry->set_state(IDLE);  // Set state of the LISTEN socket to IDLE  
 #ifdef PEGASUS_OS_TYPE_WINDOWS  
     if((Sint32)connected  == SOCKET_ERROR)  
 #else  
         if((Sint32)connected == -1 )  
 #endif  
         {         {
            delete entry;        dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
            break;  
         }  
   
         entry->get_sock().enableBlocking();  
         monitor_2_entry *temp = add_entry(connected, SESSION, entry->get_accept(), entry->get_dispatch());  
         if(temp && _accept_dispatch != 0)  
            _accept_dispatch(temp);  
         delete entry;  
   
       }       }
       break;     else
     case SESSION:  
     case CLIENTSESSION:  
        if(_session_dispatch != 0 )  
        {  
           // NOTE: _session_dispatch will delete entry - do not do it here  
              unsigned client=0;  
          if(entry->get_type() == CLIENTSESSION) client = 1;  
          Sint32 sock=(Sint32)(entry->get_sock());  
   
              _session_dispatch(entry);  
   
          if(client)  
          {          {
            HTTPConnection2 *cn = monitor_2::remove_connection(sock);        dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
                if(cn) delete cn;  
            // stop();  
            _die=1;  
          }  
        }  
   
       else {  
         static char buffer[4096];  
         int bytes = entry->get_sock().read(&buffer, 4096);  
         delete entry;  
       }       }
      return 0;
       break;  
     case UNTYPED:  
     default:  
            delete entry;  
       break;  
     }     }
     _requestCount--;  
  
     if(_ready.count() == 0 )  // Added for NamedPipe implementation for windows
        break;  #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
   //This method is anlogus to solicitSocketMessages. It does the same thing for named Pipes
     try  int  Monitor::solicitPipeMessages(
     {      NamedPipe namedPipe,
        entry = _ready.remove_first();      Uint32 events,  //not sure what has to change for this enum
     }      Uint32 queueId,
     catch(...)      int type)
     {     {
     }     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitPipeMessages");
  
   }     AutoMutex autoMut(_entry_mut);
 }     // Check to see if we need to dynamically grow the _entries array
      // We always want the _entries array to 2 bigger than the
 void monitor_2::stop(void)     // current connections requested
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
 { {
   _die = 1;     AutoMutex automut(Monitor::_cout_mut);
   tickle();     PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages at the begining" << PEGASUS_STD(endl);
   // shut down the listener list, free the list nodes  
   _tickler.get_sock().close();  
   _listeners.shutdown_queue();  
 } }
   #endif
  
 void monitor_2::tickle(void)     _solicitSocketCount++;  // bump the count
 {     int size = (int)_entries.size();
   static char _buffer[] =     if((int)_solicitSocketCount >= (size-1)){
     {          for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
       '0','0'                  _MonitorEntry entry(0, 0, 0);
     };                  _entries.append(entry);
           }
   _tickler.get_sock().disableBlocking();  
   
   _tickler.get_sock().write(&_buffer, 2);  
   _tickler.get_sock().enableBlocking();  
   
 } }
  
      int index;
 monitor_2_entry*  monitor_2::add_entry(pegasus_socket& ps,     for(index = 1; index < (int)_entries.size(); index++)
                                        monitor_2_entry_type type,  
                                        void* accept_parm,  
                                        void* dispatch_parm)  
 { {
   Sint32 fd1,fd2;        try
   
   fd2=(Sint32) ps;  
   
   monitor_2_entry* m2e = new monitor_2_entry(ps, type, accept_parm, dispatch_parm);  
   
 // The purpose of the following piece of code is to avoid duplicate entries in  
 // the _listeners list. Would it be too much of an overhead ?  
 try {  
   
      monitor_2_entry* temp;  
   
       _listeners.lock(pegasus_thread_self());  
       temp = _listeners.next(0);  
       while(temp != 0 )  
       {       {
         fd1=(Sint32) temp->get_sock();           if(_entries[index]._status.get() == _MonitorEntry::EMPTY)
   
         if(fd1 == fd2)  
         {         {
               _entries[index].socket = NULL;
            Tracer::trace(TRC_HTTP, Tracer::LEVEL3,              _entries[index].namedPipe = namedPipe;
           "monitor_2::add_entry:Request for duplicate entry in _listeners for %d FD.", fd1);              _entries[index].namedPipeConnection = true;
             if(temp->get_state() == CLOSED)              _entries[index].queueId  = queueId;
               _entries[index]._type = type;
               _entries[index]._status = _MonitorEntry::IDLE;
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
             {             {
               temp->set_state(IDLE);              AutoMutex automut(Monitor::_cout_mut);
               Tracer::trace(TRC_HTTP, Tracer::LEVEL3,              PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages after seting up  _entries[index] index = " << index << PEGASUS_STD(endl);
               "monitor_2::add_entry:CLOSED state changed to IDLE for %d.", fd1);  
              }              }
              _listeners.unlock();  #endif
             delete m2e;  
             return 0;              return index;
         }  
        temp = _listeners.next(temp);  
       }       }
    }    }
    catch(...)    catch(...)
    {    {
       delete m2e;  
       return 0;  
    }    }
  
      }
      _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful
      PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages nothing happed - it didn't work" << PEGASUS_STD(endl);
  
   _listeners.unlock();     PEG_METHOD_EXIT();
      return -1;
  
   try{  
     _listeners.insert_first(m2e);  
   }  
   catch(...){  
     delete m2e;  
     return 0;  
   }  
       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
       "monitor_2::add_entry:SUCCESSFULLY added to _listeners list. FD = %d.", fd2);  
   tickle();  
   return m2e;  
 } }
  
 Boolean monitor_2::remove_entry(Sint32 s)  //////////////////////////////////////////////////////////////////////////////
 {  // Method Name      : unsolicitPipeMessages
   monitor_2_entry* temp;  // Input Parameter  : namedPipe  - type NamedPipe
   try {  // Return Type      : void
     _listeners.try_lock(pegasus_thread_self());  //============================================================================
     temp = _listeners.next(0);  // This method is invoked from HTTPAcceptor::handleEnqueue for server
     while(temp != 0){  // when the CLOSE_CONNECTION_MESSAGE is recieved. This method is also invoked
       if(s == (Sint32)temp->_rep->psock ){  // from HTTPAcceptor::destroyConnections method when the CIMServer is shutdown.
         temp = _listeners.remove_no_lock(temp);  // For the CIMClient, this is invoked from HTTPConnector::handleEnqueue when the
         delete temp;  // CLOSE_CONNECTION_MESSAGE is recieved. This method is also invoked from
         _listeners.unlock();  // HTTPConnector::disconnect when CIMClient requests a disconnect request.
         return true;  // The list of _MonitorEntry is searched for the matching pipe.
       }  // The Handle of the identified is closed and _MonitorEntry for the
       temp = _listeners.next(temp);  // requested pipe is removed.
     }  ///////////////////////////////////////////////////////////////////////////////
     _listeners.unlock();  
   }  
   catch(...){  
   }  
   return false;  
 }  
  
 Uint32 monitor_2::getOutstandingRequestCount(void)  void Monitor::unsolicitPipeMessages(NamedPipe namedPipe)
 { {
   return _requestCount.value();  #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
       {
           AutoMutex automut(Monitor::_cout_mut);
           PEGASUS_STD(cout) << "Entering: Monitor::unsolicitPipeMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
 } }
   #endif
  
       PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitPipeMessages");
       AutoMutex autoMut(_entry_mut);
  
 HTTPConnection2* monitor_2::remove_connection(Sint32 sock)      /*
 {          Start at index = 1 because _entries[0] is the tickle entry which never needs
           to be EMPTY;
    HTTPConnection2* temp;      */
    try      unsigned int index;
    {      for (index = 1; index < _entries.size(); index++)
       monitor_2::_connections.lock(pegasus_thread_self());  
       temp = monitor_2::_connections.next(0);  
       while(temp != 0 )  
       {  
          if(sock == temp->getSocket())  
          {          {
             temp = monitor_2::_connections.remove_no_lock(temp);          if (_entries[index].namedPipe.getPipe() == namedPipe.getPipe())
             monitor_2::_connections.unlock();  
             return temp;  
          }  
          temp = monitor_2::_connections.next(temp);  
       }  
       monitor_2::_connections.unlock();  
    }  
    catch(...)  
    {    {
               _entries[index]._status = _MonitorEntry::EMPTY;
               // Ensure that the client has read the data
                       ::FlushFileBuffers (namedPipe.getPipe());
                       //Disconnect to release the pipe. This doesn't release Pipe Handle
                       ::DisconnectNamedPipe (_entries[index].namedPipe.getPipe());
               // Must use CloseHandle to Close Pipe
                           ::CloseHandle(_entries[index].namedPipe.getPipe());
                       _entries[index].namedPipe.disconnect();
               _solicitSocketCount--;
               break;
    }    }
    return 0;  
 } }
  
 Boolean monitor_2::insert_connection(HTTPConnection2* connection)      /*
           Dynamic Contraction:
           To remove excess entries we will start from the end of the _entries array
           and remove all entries with EMPTY status until we find the first NON EMPTY.
           This prevents the positions, of the NON EMPTY entries, from being changed.
       */
       index = _entries.size() - 1;
       while (_entries[index]._status.get() == _MonitorEntry::EMPTY)
 { {
    try          if ((_entries[index].namedPipe.getPipe() == namedPipe.getPipe()) ||
               (_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES))
    {    {
       monitor_2::_connections.insert_first(connection);                      _entries.remove(index);
    }    }
    catch(...)          index--;
       }
       PEG_METHOD_EXIT();
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
    {    {
       return false;          AutoMutex automut(Monitor::_cout_mut);
           PEGASUS_STD(cout) << "Exiting:  Monitor::unsolicitPipeMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
    }    }
    return true;  #endif
 } }
  
   #endif
  
 PEGASUS_NAMESPACE_END PEGASUS_NAMESPACE_END


Legend:
Removed from v.1.72.2.1  
changed lines
  Added in v.1.103.10.27

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2