(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/Monitor.cpp between version 1.103.10.27 and 1.109

version 1.103.10.27, 2006/10/18 04:24:42 version 1.109, 2006/06/28 00:15:41
Line 29 
Line 29 
 // //
 //============================================================================== //==============================================================================
 // //
   // Author: Mike Brasher (mbrasher@bmc.com)
   //
   // Modified By: Mike Day (monitor_2) mdday@us.ibm.com
   //              Amit K Arora (Bug#1153) amita@in.ibm.com
   //              Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090
   //              Sushma Fernandes (sushma@hp.com) for Bug#2057
   //              Josephine Eskaline Joyce (jojustin@in.ibm.com) for PEP#101
   //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
 // //
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
  
   #include "Network.h"
 #include <Pegasus/Common/Config.h> #include <Pegasus/Common/Config.h>
   
 #include <cstring> #include <cstring>
 #include <Pegasus/Common/Monitor.h>  #include "Monitor.h"
 #include <Pegasus/Common/MessageQueue.h>  #include "MessageQueue.h"
 #include <Pegasus/Common/Socket.h>  #include "Socket.h"
 #include <Pegasus/Common/Tracer.h> #include <Pegasus/Common/Tracer.h>
 #include <Pegasus/Common/HTTPConnection.h> #include <Pegasus/Common/HTTPConnection.h>
 #include <Pegasus/Common/MessageQueueService.h> #include <Pegasus/Common/MessageQueueService.h>
 #include <Pegasus/Common/Exception.h> #include <Pegasus/Common/Exception.h>
 #include <Pegasus/Common/ArrayIterator.h>  #include "ArrayIterator.h"
   
 //const static DWORD MAX_BUFFER_SIZE = 4096;  // 4 kilobytes  
   
 #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
 // Maximum iterations of Pipe processing in Monitor::run  
 const Uint32 maxIterations = 3;  
   
 #endif  
   
 #ifdef PEGASUS_OS_TYPE_WINDOWS  
 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024  
 #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \  
 of <winsock.h>. It may have been indirectly included (e.g., by including \  
 <windows.h>). Find inclusion of that header which is visible to this \  
 compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \  
 otherwise, less than 64 clients (the default) will be able to connect to the \  
 CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."  
   
 # endif  
 # define FD_SETSIZE 1024  
 # include <windows.h>  
 #else  
 # include <sys/types.h>  
 # include <sys/socket.h>  
 # include <sys/time.h>  
 # include <netinet/in.h>  
 # include <netdb.h>  
 # include <arpa/inet.h>  
 #endif  
  
 PEGASUS_USING_STD; PEGASUS_USING_STD;
  
Line 79 
Line 58 
  
 static AtomicInt _connections(0); static AtomicInt _connections(0);
  
 Mutex Monitor::_cout_mut;  ////////////////////////////////////////////////////////////////////////////////
   //
   // _getError()
   //
   ////////////////////////////////////////////////////////////////////////////////
  
 // Added for NamedPipe implementation for windows  static inline int _getError()
 #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  {
  #define PIPE_INCREMENT 1  #ifdef PEGASUS_OS_TYPE_WINDOWS
       return WSAGetLastError();
   #else
       return errno;
 #endif #endif
   }
  
 //////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
 // //
Line 94 
Line 80 
 //////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
  
 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
   
 Monitor::Monitor() Monitor::Monitor()
    : _stopConnections(0),    : _stopConnections(0),
      _stopConnectionsSem(0),      _stopConnectionsSem(0),
Line 103 
Line 88 
      _tickle_server_socket(-1),      _tickle_server_socket(-1),
      _tickle_peer_socket(-1)      _tickle_peer_socket(-1)
 { {
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Entering: Monitor::Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
 #endif  
     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
     Socket::initializeInterface();     Socket::initializeInterface();
     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
Line 124 
Line 103 
        _MonitorEntry entry(0, 0, 0);        _MonitorEntry entry(0, 0, 0);
        _entries.append(entry);        _entries.append(entry);
     }     }
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Exiting:  Monitor::Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
 #endif  
 } }
  
 Monitor::~Monitor() Monitor::~Monitor()
 { {
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Entering: Monitor::~Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
 #endif  
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
  
     try{     try{
Line 165 
Line 132 
     Socket::uninitializeInterface();     Socket::uninitializeInterface();
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                   "returning from monitor destructor");                   "returning from monitor destructor");
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Exiting:  Monitor::~Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
 #endif  
 } }
  
 void Monitor::initializeTickler(){ void Monitor::initializeTickler(){
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Entering: Monitor::initializeTickler(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
 #endif  
     /*     /*
        NOTE: On any errors trying to        NOTE: On any errors trying to
              setup out tickle connection,              setup out tickle connection,
Line 193 
Line 148 
         //handle error         //handle error
         MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",         MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
                                  "Received error number $0 while creating the internal socket.",                                  "Received error number $0 while creating the internal socket.",
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)                                   _getError());
                                  errno);  
 #else  
                                  WSAGetLastError());  
 #endif  
         throw Exception(parms);         throw Exception(parms);
     }     }
  
       // set TCP_NODELAY
       int opt = 1;
       setsockopt(_tickle_server_socket, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(opt));
   
     // initialize the address     // initialize the address
     memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));     memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
Line 213 
Line 168 
     _tickle_server_addr.sin_family = PF_INET;     _tickle_server_addr.sin_family = PF_INET;
     _tickle_server_addr.sin_port = 0;     _tickle_server_addr.sin_port = 0;
  
     PEGASUS_SOCKLEN_T _addr_size = sizeof(_tickle_server_addr);      SocketLength _addr_size = sizeof(_tickle_server_addr);
  
     // bind server side to socket     // bind server side to socket
     if((::bind(_tickle_server_socket,     if((::bind(_tickle_server_socket,
Line 226 
Line 181 
 #else #else
         MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",         MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
                                  "Received error number $0 while binding the internal socket.",                                  "Received error number $0 while binding the internal socket.",
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)                                   _getError());
                                  errno);  
 #else  
                                  WSAGetLastError());  
 #endif  
 #endif #endif
         throw Exception(parms);         throw Exception(parms);
     }     }
Line 240 
Line 191 
         // handle error         // handle error
         MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",         MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",
                          "Received error number $0 while listening to the internal socket.",                          "Received error number $0 while listening to the internal socket.",
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)                                   _getError());
                                  errno);  
 #else  
                                  WSAGetLastError());  
 #endif  
         throw Exception(parms);         throw Exception(parms);
     }     }
  
Line 256 
Line 203 
         // handle error         // handle error
         MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",         MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",
                          "Received error number $0 while getting the internal socket name.",                          "Received error number $0 while getting the internal socket name.",
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)                                   _getError());
                                  errno);  
 #else  
                                  WSAGetLastError());  
 #endif  
         throw Exception(parms);         throw Exception(parms);
     }     }
  
Line 271 
Line 214 
         // handle error         // handle error
         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
                          "Received error number $0 while creating the internal client socket.",                          "Received error number $0 while creating the internal client socket.",
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)                                   _getError());
                                  errno);  
 #else  
                                  WSAGetLastError());  
 #endif  
         throw Exception(parms);         throw Exception(parms);
     }     }
  
       // set TCP_NODELAY
       setsockopt(_tickle_client_socket, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(opt));
   
     // setup the address of the client     // setup the address of the client
     memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));     memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
Line 298 
Line 240 
         // handle error         // handle error
         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
                          "Received error number $0 while binding the internal client socket.",                          "Received error number $0 while binding the internal client socket.",
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)                                   _getError());
                                  errno);  
 #else  
                                  WSAGetLastError());  
 #endif  
         throw Exception(parms);         throw Exception(parms);
     }     }
  
Line 313 
Line 251 
         // handle error         // handle error
         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
                          "Received error number $0 while connecting the internal client socket.",                          "Received error number $0 while connecting the internal client socket.",
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)                                   _getError());
                                  errno);  
 #else  
                                  WSAGetLastError());  
 #endif  
         throw Exception(parms);         throw Exception(parms);
     }     }
  
     /* set up the slave connection */     /* set up the slave connection */
     memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));     memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
     PEGASUS_SOCKLEN_T peer_size = sizeof(_tickle_peer_addr);      SocketLength peer_size = sizeof(_tickle_peer_addr);
     pegasus_sleep(1);     pegasus_sleep(1);
  
     // this call may fail, we will try a max of 20 times to establish this peer connection     // this call may fail, we will try a max of 20 times to establish this peer connection
Line 350 
Line 284 
         // handle error         // handle error
         MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",         MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",
                          "Received error number $0 while accepting the internal socket connection.",                          "Received error number $0 while accepting the internal socket connection.",
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)                                   _getError());
                                  errno);  
 #else  
                                  WSAGetLastError());  
 #endif  
         throw Exception(parms);         throw Exception(parms);
     }     }
     // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only     // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
     // checks entries with IDLE state for events     // checks entries with IDLE state for events
     _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);     _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
         Tracer::trace(TRC_HTTP,Tracer::LEVEL2,"!!!!!!!! TICKLE SOCKET-ID = %u",_tickle_peer_socket);  
     entry._status = _MonitorEntry::IDLE;     entry._status = _MonitorEntry::IDLE;
     _entries.append(entry);     _entries.append(entry);
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Exiting:  Monitor::initializeTickler(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
 #endif  
 } }
  
 void Monitor::tickle(void) void Monitor::tickle(void)
 { {
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Entering: Monitor::tickle(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
 #endif  
     static char _buffer[] =     static char _buffer[] =
     {     {
       '0','0'       '0','0'
     };     };
                Tracer::trace (TRC_HTTP, Tracer::LEVEL2,  
                                    "Now Monitor::Tickle ");  
     AutoMutex autoMutex(_tickle_mutex);     AutoMutex autoMutex(_tickle_mutex);
     Socket::disableBlocking(_tickle_client_socket);     Socket::disableBlocking(_tickle_client_socket);
                        Tracer::trace (TRC_HTTP, Tracer::LEVEL2,  
                                            "Now Monitor::Tickle::Write() ");  
   
     Socket::write(_tickle_client_socket,&_buffer, 2);     Socket::write(_tickle_client_socket,&_buffer, 2);
     Socket::enableBlocking(_tickle_client_socket);     Socket::enableBlocking(_tickle_client_socket);
                        Tracer::trace (TRC_HTTP, Tracer::LEVEL2,  
                                    "Now Monitor::Tickled ");  
   
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Exiting:  Monitor::tickle(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
 #endif  
 } }
  
 void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status ) void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
Line 415 
Line 319 
     Boolean handled_events = false;     Boolean handled_events = false;
     int i = 0;     int i = 0;
  
       struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
  
     fd_set fdread;     fd_set fdread;
     FD_ZERO(&fdread);     FD_ZERO(&fdread);
Line 456 
Line 361 
         if ((entry._status.get() == _MonitorEntry::DYING) &&         if ((entry._status.get() == _MonitorEntry::DYING) &&
            (entry._type == Monitor::CONNECTION))            (entry._type == Monitor::CONNECTION))
         {         {
   
             MessageQueue *q = MessageQueue::lookup(entry.queueId);             MessageQueue *q = MessageQueue::lookup(entry.queueId);
             PEGASUS_ASSERT(q != 0);             PEGASUS_ASSERT(q != 0);
             HTTPConnection &h = *static_cast<HTTPConnection *>(q);             HTTPConnection &h = *static_cast<HTTPConnection *>(q);
  
             if (h._connectionClosePending == false)             if (h._connectionClosePending == false)
                         {  
                             continue;                             continue;
                         }  
   
  
             // NOTE: do not attempt to delete while there are pending responses             // NOTE: do not attempt to delete while there are pending responses
             // coming thru. The last response to come thru after a             // coming thru. The last response to come thru after a
Line 475 
Line 376 
  
             if (h._responsePending == true)             if (h._responsePending == true)
             {             {
 // Added for NamedPipe implementation for windows  
 #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
                 if  (!entry.namedPipeConnection)  
                 {  
 #endif  
                     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "                     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "
                         "Ignoring connection delete request because "                         "Ignoring connection delete request because "
                         "responses are still pending. "                         "responses are still pending. "
                         "connection=0x%p, socket=%d\n",                         "connection=0x%p, socket=%d\n",
                         (void *)&h, h.getSocket());                         (void *)&h, h.getSocket());
   
 // Added for NamedPipe implementation for windows  
 #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
                 }  
                 else  
                 {  
                     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "  
                         "Ignoring connection delete request because "  
                         "responses are still pending. "  
                         "connection=0x%p, NamedPipe=%d\n",  
                         (void *)&h, h.getNamedPipe().getPipe());  
                 }  
 #endif  
                 continue;                 continue;
             }             }
             h._connectionClosePending = false;             h._connectionClosePending = false;
             MessageQueue &o = h.get_owner();             MessageQueue &o = h.get_owner();
                     Message* message = 0;            Message* message= new CloseConnectionMessage(entry.socket);
   
 // Added for NamedPipe implementation for windows  
 #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
             if (!entry.namedPipeConnection)  
             {  
 #endif  
                 message= new CloseConnectionMessage(entry.socket);  
   
 // Added for NamedPipe implementation for windows  
 #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
             }  
             else  
             {  
   
                             message= new CloseConnectionMessage(entry.namedPipe);  
   
             }  
 #endif  
             message->dest = o.getQueueId();             message->dest = o.getQueueId();
  
             // HTTPAcceptor is responsible for closing the connection.             // HTTPAcceptor is responsible for closing the connection.
Line 540 
Line 405 
             autoEntryMutex.lock();             autoEntryMutex.lock();
             // After enqueue a message and the autoEntryMutex has been released and locked again,             // After enqueue a message and the autoEntryMutex has been released and locked again,
             // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries.             // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries.
   
             entries.reset(_entries);             entries.reset(_entries);
         }         }
     }     }
Line 553 
Line 417 
         place to calculate the max file descriptor (maximum socket number)         place to calculate the max file descriptor (maximum socket number)
         because we have to traverse the entire array.         because we have to traverse the entire array.
     */     */
       SocketHandle maxSocketCurrentPass = 0;
     PEGASUS_SOCKET maxSocketCurrentPass = 0;      for( int indx = 0; indx < (int)entries.size(); indx++)
     int indx;  
   
         // Record the indexes at which Sockets are available  
         Array <Uint32> socketCountAssociator;  
     int socketEntryCount=0;  
   
      // Added for NamedPipe implementation for windows  
 #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
     //This array associates named pipe connections to their place in [indx]  
     //in the entries array. The value in portion zero of the array is the  
     //index of the fist named pipe connection in the entries array  
   
         // Record the indexes at which Pipes are available  
         Array <Uint32> indexPipeCountAssociator;  
     int pipeEntryCount=0;  
     int MaxPipes = PIPE_INCREMENT;  
     // List of Pipe Handlers  
     HANDLE * hPipeList = new HANDLE[PIPE_INCREMENT];  
 #endif  
   
     // This loop takes care of setting the namedpipe which has to be used from the list....  
     for ( indx = 0,socketEntryCount=0 ;  
                              indx < (int)entries.size(); indx++)  
     {  
   
 // Added for NamedPipe implementation for windows  
 #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
                 if (!entries[indx].namedPipeConnection)  
         {         {
 #endif  
             if (maxSocketCurrentPass < entries[indx].socket)             if (maxSocketCurrentPass < entries[indx].socket)
                         {  
                                 maxSocketCurrentPass = entries[indx].socket;                                 maxSocketCurrentPass = entries[indx].socket;
                         }  
             if(entries[indx]._status.get() == _MonitorEntry::IDLE)             if(entries[indx]._status.get() == _MonitorEntry::IDLE)
             {             {
                 _idleEntries++;                 _idleEntries++;
                 FD_SET(entries[indx].socket, &fdread);                 FD_SET(entries[indx].socket, &fdread);
                 socketCountAssociator.append(indx);  
                                 socketEntryCount++;  
             }  
   
 // Added for NamedPipe implementation for windows  
 #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
         }         }
                 else  
                 {  
                     entries[indx].pipeSet = false;  
                         if (pipeEntryCount >= MaxPipes)  
                         {  
                             MaxPipes += PIPE_INCREMENT;  
                                 HANDLE* temp_pList = new HANDLE[MaxPipes];  
                                 for (Uint32 i =0;i<pipeEntryCount;i++)  
                                 {  
                                     temp_pList[i] = hPipeList[i];  
                                 }  
                                 delete [] hPipeList;  
                                 hPipeList = temp_pList;  
                     }  
                         hPipeList[pipeEntryCount] = entries[indx].namedPipe.getPipe();  
                         indexPipeCountAssociator.append(indx);  
                         pipeEntryCount++;  
                 }  
   
 #endif  
     }     }
  
     /*     /*
Line 630 
Line 438 
  
     autoEntryMutex.unlock();     autoEntryMutex.unlock();
  
     int events = -1;      //
         // Since the pipes have been introduced, the ratio of procesing      // The first argument to select() is ignored on Windows and it is not
         // time Socket:Pipe :: 3/4:1/4 respectively      // a socket value.  The original code assumed that the number of sockets
       // and a socket value have the same type.  On Windows they do not.
         Uint32 newMilliseconds = milliseconds;      //
         #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
   
         newMilliseconds = (milliseconds * 3)/4 ;  
   
     #endif  
   
         struct timeval tv = {newMilliseconds/1000, newMilliseconds%1000*1000};  
   
   
         #ifdef PEGASUS_OS_TYPE_WINDOWS         #ifdef PEGASUS_OS_TYPE_WINDOWS
                 events = select(0, &fdread, NULL, NULL, &tv);      int events = select(0, &fdread, NULL, NULL, &tv);
         #else         #else
                 events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);      int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
         #endif         #endif
   
     autoEntryMutex.lock();     autoEntryMutex.lock();
     // After enqueue a message and the autoEntryMutex has been released and locked again,     // After enqueue a message and the autoEntryMutex has been released and locked again,
     // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries     // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries
     entries.reset(_entries);     entries.reset(_entries);
  
 #ifdef PEGASUS_OS_TYPE_WINDOWS      if (events == PEGASUS_SOCKET_ERROR)
     if(events == SOCKET_ERROR)  
 #else  
     if(events == -1)  
 #endif  
     {     {
          Tracer::trace(TRC_HTTP, Tracer::LEVEL4,          Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
          "Monitor::run - errorno = %d has occurred on select.", errno);          "Monitor::run - errorno = %d has occurred on select.", errno);
Line 675 
Line 469 
          Tracer::trace(TRC_HTTP, Tracer::LEVEL4,          Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
          "Monitor::run select event received events = %d, monitoring %d idle entries",          "Monitor::run select event received events = %d, monitoring %d idle entries",
          events, _idleEntries);          events, _idleEntries);
          for ( int sindx = 0; sindx < socketEntryCount; sindx++)         for( int indx = 0; indx < (int)entries.size(); indx++)
          {          {
              // The Monitor should only look at entries in the table that are IDLE (i.e.,              // The Monitor should only look at entries in the table that are IDLE (i.e.,
              // owned by the Monitor).              // owned by the Monitor).
                      indx = socketCountAssociator[sindx];  
   
              if ((entries[indx]._status.get() == _MonitorEntry::IDLE) &&              if ((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
                   (FD_ISSET(entries[indx].socket, &fdread)))                   (FD_ISSET(entries[indx].socket, &fdread)))
              {              {
Line 688 
Line 480 
                  Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                  Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                                "Monitor::run indx = %d, queueId =  %d, q = %p",                                "Monitor::run indx = %d, queueId =  %d, q = %p",
                                indx, entries[indx].queueId, q);                                indx, entries[indx].queueId, q);
   
                  PEGASUS_ASSERT(q !=0);                  PEGASUS_ASSERT(q !=0);
  
                  try                  try
                  {                  {
                      if (entries[indx]._type == Monitor::CONNECTION)                      if (entries[indx]._type == Monitor::CONNECTION)
                      {                      {
                      Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                        "entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
                          static_cast<HTTPConnection *>(q)->_entry_index = indx;                          static_cast<HTTPConnection *>(q)->_entry_index = indx;
  
                                                  // Do not update the entry just yet. The entry gets updated once                                                  // Do not update the entry just yet. The entry gets updated once
Line 702 
Line 495 
                                                  //entries[indx]._status = _MonitorEntry::BUSY;                                                  //entries[indx]._status = _MonitorEntry::BUSY;
  
                                                  // If allocate_and_awaken failure, retry on next iteration                                                  // If allocate_and_awaken failure, retry on next iteration
   /* Removed for PEP 183.
                      if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(
                              (void *)q, _dispatch))
                      {
                         Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
                             "Monitor::run: Insufficient resources to process request.");
                         entries[indx]._status = _MonitorEntry::IDLE;
                         return true;
                      }
   */
   // Added for PEP 183
                                                  HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);                                                  HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
                                                  Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                                                  Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                                                                 "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",                                                                 "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
Line 716 
Line 520 
                                                           "Monitor::_dispatch: exception received");                                                           "Monitor::_dispatch: exception received");
                                                  }                                                  }
                          Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                          Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                             "Monitor::_dispatch: exited run() for index %d",                     "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
                                                     dst->_entry_index);  
  
                      // It is possible the entry status may not be set to busy.
                      // The following will fail in that case.
                      // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
                      // Once the HTTPConnection thread has set the status value to either
                      // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
                      // to the Monitor.  It is no longer permissible to access the connection
                      // or the entry in the _entries table.
   
                      // The following is not relevant as the worker thread or the
                      // reader thread will update the status of the entry.
                      //if (dst->_connectionClosePending)
                      //{
                      //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
                      //}
                      //else
                      //{
                      //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
                      //}
   // end Added for PEP 183
                                         }                                         }
                                         else if (entries[indx]._type == Monitor::INTERNAL)                  else if( entries[indx]._type == Monitor::INTERNAL){
                                         {  
                                                 // set ourself to BUSY,                                                 // set ourself to BUSY,
                                                 // read the data                                                 // read the data
                                             // and set ourself back to IDLE                                             // and set ourself back to IDLE
   
                           entries[indx]._status = _MonitorEntry::BUSY;
                                             static char buffer[2];                                             static char buffer[2];
                                         Socket::disableBlocking(entries[indx].socket);                                         Socket::disableBlocking(entries[indx].socket);
   
                                         Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2);                                         Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2);
                                                 Socket::enableBlocking(entries[indx].socket);                                                 Socket::enableBlocking(entries[indx].socket);
                                                 entries[indx]._status = _MonitorEntry::IDLE;                                                 entries[indx]._status = _MonitorEntry::IDLE;
Line 736 
Line 558 
                                         {                                         {
                                             Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                                             Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                                                  "Non-connection entry, indx = %d, has been received.", indx);                                                  "Non-connection entry, indx = %d, has been received.", indx);
   
                                                 int events = 0;                                                 int events = 0;
                                                 events |= SocketMessage::READ;                                                 events |= SocketMessage::READ;
                                                 Message *msg = new SocketMessage(entries[indx].socket, events);                                                 Message *msg = new SocketMessage(entries[indx].socket, events);
Line 748 
Line 569 
                                                 // the array of entries can be changed. The ArrayIterator has be reset with the original _entries                                                 // the array of entries can be changed. The ArrayIterator has be reset with the original _entries
                                                 entries.reset(_entries);                                                 entries.reset(_entries);
                                                 entries[indx]._status = _MonitorEntry::IDLE;                                                 entries[indx]._status = _MonitorEntry::IDLE;
                                                 handled_events = true;  
                                                 delete [] hPipeList;  
                                                 return handled_events;  
  
                      return true;
                                         }                                         }
                                 }                                 }
                                 catch(...)                                 catch(...)
Line 760 
Line 579 
                                 handled_events = true;                                 handled_events = true;
                         }                         }
         }         }
                 delete [] hPipeList;  
                 return handled_events;  
     }     }
  
   
 #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
   
         //if no pipes are registered return immediately  
   
         int pEvents = -1;  
         int pCount = -1;  
         BOOL bPeekPipe = 0;  
         DWORD dwBytesAvail=0;  
         // The pipe is sniffed and check if there are any data. If available, the  
         // message is picked from the Queue and appropriate methods are invoked.  
   
   
         // pipeProcessCount records the number of requests that are processed.  
         // At the end of loop this is verified against the count of request  
         // on local connection . If there are any pipes which needs to be  
         // processed we would apply delay and then proceed to iterate.  
   
     Uint32 pipeProcessCount =0;  
   
     for (int counter = 1; counter < maxIterations ; counter ++)  
     {  
   
   
                 // pipeIndex is used to index into indexPipeCountAssociator to fetch  
                 // index of the _MonitorEntry of Monitor  
         for (int pipeIndex = 0; pipeIndex < pipeEntryCount; pipeIndex++)  
             {  
             dwBytesAvail = 0;  
                     bPeekPipe = ::PeekNamedPipe(hPipeList[pipeIndex],  
                                                     NULL,  
                                                                     NULL,  
                                                                         NULL,  
                                         &dwBytesAvail,  
                                                                         NULL  
                                                                        );  
   
                         // If peek on NamedPipe was successfull and data is available  
             if (bPeekPipe && dwBytesAvail)  
                 {  
   
                             Tracer::trace(TRC_HTTP,Tracer::LEVEL4," PIPE_PEEKING FOUND = %u BYTES", dwBytesAvail);  
   
                             pEvents = 1;  
                     entries[indexPipeCountAssociator[pipeIndex]].pipeSet = true;  
                             Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                     "Monitor::run select event received events = %d, \  
                                         monitoring %d idle entries",  
                     pEvents,  
                                         _idleEntries);  
   
                                 int pIndx = indexPipeCountAssociator[pipeIndex];  
   
                                 if ((entries[pIndx]._status.get() == _MonitorEntry::IDLE) &&  
                                          entries[pIndx].namedPipe.isConnected() &&  
                                          (pEvents))  
                         {  
   
                                 MessageQueue *q = 0;  
   
                     try  
                                         {  
   
                                         q = MessageQueue::lookup (entries[pIndx].queueId);  
                     }  
                     catch (Exception e)  
                     {  
                                         e.getMessage();  
                                 }  
                     catch(...)  
                     {  
                                 }  
   
                                         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                                   "Monitor::run indx = %d, queueId =  %d,\  
                                                                   q = %p",pIndx, entries[pIndx].queueId, q);  
                     try  
                     {  
                                         if (entries[pIndx]._type == Monitor::CONNECTION)  
                         {  
   
                                                     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                                                                       "entries[indx].type for indx = \  
                                                                               %d is Monitor::CONNECTION",  
                                                                                   pIndx);  
                                                     static_cast<HTTPConnection *>(q)->_entry_index = pIndx;  
                                                 HTTPConnection *dst = reinterpret_cast \  
                                                                                            <HTTPConnection *>(q);  
                                                 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                                                                       "Monitor::_dispatch: entering run() \  
                                                                                   for indx  = %d, queueId = %d, \  
                                                                                   q = %p",\  
                                                               dst->_entry_index,  
                                                                                   dst->_monitor->_entries\  
                                                                                   [dst->_entry_index].queueId, dst);  
   
                                                 try  
                                                 {  
   
                                                         dst->run(1);  
   
                                                         // Record that the requested data is read/Written  
                                                         pipeProcessCount++;  
   
                                                 }  
                                                 catch (...)  
                                                 {  
                                                         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                                                                               "Monitor::_dispatch: \  
                                                                                            exception received");  
                                                 }  
   
                                                 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                                                               "Monitor::_dispatch: exited \  
                                                                        \run() index %d",  
                                                                                    dst->_entry_index);  
   
   
                                         }  
                                         else  
                                         {  
                                                 /* The condition  
                                                            entries[indx]._type == Monitor::INTERNAL can be  
                                                            ignored for pipes as the tickler is of  
                                                            Monitor::INTERNAL type. The tickler is  
                                                            a socket.  
                                                 */  
   
                                                 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                                                                                   "Non-connection entry, indx = %d,\  
                                                                                           has been received.", pIndx);  
                                                 int events = 0;  
                                                 Message *msg = 0;  
   
                                                     pEvents |= NamedPipeMessage::READ;  
                                                     msg = new NamedPipeMessage(entries[pIndx].namedPipe, pEvents);  
                                     entries[pIndx]._status = _MonitorEntry::BUSY;  
                                     autoEntryMutex.unlock();  
                                             q->enqueue(msg);  
                                                 autoEntryMutex.lock();  
                                     entries.reset(_entries);  
                                     entries[pIndx]._status = _MonitorEntry::IDLE;  
                                                         delete [] hPipeList;  
                                 return(handled_events);  
   
                                         }  
   
   
                                 }  
                                 catch(...)  
                                 {  
   
                         }  
                     }  
   
                 }  
         }  
   
                 //Check if all the pipes had recieved the data, If no then try again  
         if (pipeEntryCount == pipeProcessCount)  
                 {  
                     break;  
                 }  
   
   
     }  
   
         delete [] hPipeList;  
   
 #endif  
   
     return(handled_events);     return(handled_events);
 } }
  
 void Monitor::stopListeningForConnections(Boolean wait) void Monitor::stopListeningForConnections(Boolean wait)
 { {
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Entering: Monitor::stopListeningForConnections(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
 #endif  
     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
     // set boolean then tickle the server to recognize _stopConnections     // set boolean then tickle the server to recognize _stopConnections
     _stopConnections = 1;     _stopConnections = 1;
Line 960 
Line 600 
     }     }
  
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Exiting:  Monitor::stopListeningForConnections(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
 #endif  
 } }
  
  
 int  Monitor::solicitSocketMessages( int  Monitor::solicitSocketMessages(
     PEGASUS_SOCKET socket,      SocketHandle socket,
     Uint32 events,     Uint32 events,
     Uint32 queueId,     Uint32 queueId,
     int type)     int type)
 { {
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Entering: Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
 #endif  
    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
    AutoMutex autoMut(_entry_mut);    AutoMutex autoMut(_entry_mut);
    // Check to see if we need to dynamically grow the _entries array    // Check to see if we need to dynamically grow the _entries array
Line 1007 
Line 635 
             _entries[index]._type = type;             _entries[index]._type = type;
             _entries[index]._status = _MonitorEntry::IDLE;             _entries[index]._status = _MonitorEntry::IDLE;
  
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
             {  
                 AutoMutex automut(Monitor::_cout_mut);  
                 PEGASUS_STD(cout) << "Exiting:  Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
             }  
 #endif  
             return index;             return index;
          }          }
       }       }
Line 1022 
Line 644 
    }    }
    _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful    _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful
    PEG_METHOD_EXIT();    PEG_METHOD_EXIT();
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
    {  
        AutoMutex automut(Monitor::_cout_mut);  
        PEGASUS_STD(cout) << "Exiting:  Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
    }  
 #endif  
    return -1;    return -1;
  
 } }
  
 void Monitor::unsolicitSocketMessages(PEGASUS_SOCKET socket)  void Monitor::unsolicitSocketMessages(SocketHandle socket)
 { {
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Entering: Monitor::unsolicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
 #endif  
  
     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
     AutoMutex autoMut(_entry_mut);     AutoMutex autoMut(_entry_mut);
Line 1073 
Line 683 
         index--;         index--;
     }     }
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Exiting:  Monitor::unsolicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
 #endif  
 } }
  
 // Note: this is no longer called with PEP 183. // Note: this is no longer called with PEP 183.
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm) PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
 { {
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Entering: Monitor::_dispatch(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
 #endif  
    HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);    HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
         "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",         "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
Line 1123 
Line 721 
    return 0;    return 0;
 } }
  
 // Added for NamedPipe implementation for windows  
 #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
 //This method is anlogus to solicitSocketMessages. It does the same thing for named Pipes  
 int  Monitor::solicitPipeMessages(  
     NamedPipe namedPipe,  
     Uint32 events,  //not sure what has to change for this enum  
     Uint32 queueId,  
     int type)  
 {  
    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitPipeMessages");  
   
    AutoMutex autoMut(_entry_mut);  
    // Check to see if we need to dynamically grow the _entries array  
    // We always want the _entries array to 2 bigger than the  
    // current connections requested  
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
 {  
    AutoMutex automut(Monitor::_cout_mut);  
    PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages at the begining" << PEGASUS_STD(endl);  
 }  
 #endif  
   
    _solicitSocketCount++;  // bump the count  
    int size = (int)_entries.size();  
    if((int)_solicitSocketCount >= (size-1)){  
         for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){  
                 _MonitorEntry entry(0, 0, 0);  
                 _entries.append(entry);  
         }  
    }  
   
    int index;  
    for(index = 1; index < (int)_entries.size(); index++)  
    {  
       try  
       {  
          if(_entries[index]._status.get() == _MonitorEntry::EMPTY)  
          {  
             _entries[index].socket = NULL;  
             _entries[index].namedPipe = namedPipe;  
             _entries[index].namedPipeConnection = true;  
             _entries[index].queueId  = queueId;  
             _entries[index]._type = type;  
             _entries[index]._status = _MonitorEntry::IDLE;  
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
   {  
             AutoMutex automut(Monitor::_cout_mut);  
             PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages after seting up  _entries[index] index = " << index << PEGASUS_STD(endl);  
   }  
 #endif  
   
             return index;  
          }  
       }  
       catch(...)  
       {  
       }  
   
    }  
    _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful  
    PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages nothing happed - it didn't work" << PEGASUS_STD(endl);  
   
    PEG_METHOD_EXIT();  
    return -1;  
   
 }  
   
 //////////////////////////////////////////////////////////////////////////////  
 // Method Name      : unsolicitPipeMessages  
 // Input Parameter  : namedPipe  - type NamedPipe  
 // Return Type      : void  
 //============================================================================  
 // This method is invoked from HTTPAcceptor::handleEnqueue for server  
 // when the CLOSE_CONNECTION_MESSAGE is recieved. This method is also invoked  
 // from HTTPAcceptor::destroyConnections method when the CIMServer is shutdown.  
 // For the CIMClient, this is invoked from HTTPConnector::handleEnqueue when the  
 // CLOSE_CONNECTION_MESSAGE is recieved. This method is also invoked from  
 // HTTPConnector::disconnect when CIMClient requests a disconnect request.  
 // The list of _MonitorEntry is searched for the matching pipe.  
 // The Handle of the identified is closed and _MonitorEntry for the  
 // requested pipe is removed.  
 ///////////////////////////////////////////////////////////////////////////////  
   
 void Monitor::unsolicitPipeMessages(NamedPipe namedPipe)  
 {  
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Entering: Monitor::unsolicitPipeMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
 #endif  
   
     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitPipeMessages");  
     AutoMutex autoMut(_entry_mut);  
   
     /*  
         Start at index = 1 because _entries[0] is the tickle entry which never needs  
         to be EMPTY;  
     */  
     unsigned int index;  
     for (index = 1; index < _entries.size(); index++)  
     {  
         if (_entries[index].namedPipe.getPipe() == namedPipe.getPipe())  
         {  
             _entries[index]._status = _MonitorEntry::EMPTY;  
             // Ensure that the client has read the data  
                     ::FlushFileBuffers (namedPipe.getPipe());  
                     //Disconnect to release the pipe. This doesn't release Pipe Handle  
                     ::DisconnectNamedPipe (_entries[index].namedPipe.getPipe());  
             // Must use CloseHandle to Close Pipe  
                         ::CloseHandle(_entries[index].namedPipe.getPipe());  
                     _entries[index].namedPipe.disconnect();  
             _solicitSocketCount--;  
             break;  
         }  
     }  
   
     /*  
         Dynamic Contraction:  
         To remove excess entries we will start from the end of the _entries array  
         and remove all entries with EMPTY status until we find the first NON EMPTY.  
         This prevents the positions, of the NON EMPTY entries, from being changed.  
     */  
     index = _entries.size() - 1;  
     while (_entries[index]._status.get() == _MonitorEntry::EMPTY)  
         {  
         if ((_entries[index].namedPipe.getPipe() == namedPipe.getPipe()) ||  
             (_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES))  
         {  
                     _entries.remove(index);  
         }  
         index--;  
     }  
     PEG_METHOD_EXIT();  
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Exiting:  Monitor::unsolicitPipeMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
 #endif  
 }  
   
 #endif  
   
 PEGASUS_NAMESPACE_END PEGASUS_NAMESPACE_END


Legend:
Removed from v.1.103.10.27  
changed lines
  Added in v.1.109

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2