(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/Monitor.cpp between version 1.115.14.3 and 1.116.4.2

version 1.115.14.3, 2007/01/24 04:31:02 version 1.116.4.2, 2007/12/14 20:47:55
Line 29 
Line 29 
 // //
 //============================================================================== //==============================================================================
 // //
 //  
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
  
 #include "Network.h" #include "Network.h"
Line 44 
Line 43 
 #include <Pegasus/Common/Exception.h> #include <Pegasus/Common/Exception.h>
 #include "ArrayIterator.h" #include "ArrayIterator.h"
 #include <errno.h> #include <errno.h>
   
 PEGASUS_USING_STD; PEGASUS_USING_STD;
  
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
 static AtomicInt _connections(0); static AtomicInt _connections(0);
  
 // Added for NamedPipe implementation for windows  
 #if defined (PEGASUS_OS_TYPE_WINDOWS) &&\  
     !defined (PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
 Mutex Monitor::_cout_mut;  
 #define PIPE_INCREMENT 1  
 #endif  
 //////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
 // //
 // Monitor // Monitor
Line 67 
Line 61 
    : _stopConnections(0),    : _stopConnections(0),
      _stopConnectionsSem(0),      _stopConnectionsSem(0),
      _solicitSocketCount(0),      _solicitSocketCount(0),
 #if defined (PEGASUS_OS_TYPE_WINDOWS) &&\  
     !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
      _solicitPipeCount(0),  
 #endif  
      _tickle_client_socket(-1),      _tickle_client_socket(-1),
      _tickle_server_socket(-1),      _tickle_server_socket(-1),
      _tickle_peer_socket(-1)      _tickle_peer_socket(-1)
 { {
     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
     Socket::initializeInterface();     Socket::initializeInterface();
     _entries_socket.reserveCapacity(numberOfMonitorEntriesToAllocate);      _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
  
 #if defined (PEGASUS_OS_TYPE_WINDOWS) &&\  
     !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
     _entries_pipe.reserveCapacity(numberOfMonitorEntriesToAllocate);  
 #endif  
     // setup the tickler     // setup the tickler
     initializeTickler();     initializeTickler();
  
Line 92 
Line 78 
     for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )     for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
     {     {
        _MonitorEntry entry(0, 0, 0);        _MonitorEntry entry(0, 0, 0);
        _entries_socket.append(entry);         _entries.append(entry);
     }     }
 #if defined (PEGASUS_OS_TYPE_WINDOWS) &&\  
     !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
     for ( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )  
     {  
        _MonitorEntry entry(0, 0, 0);  
        _entries_pipe.append(entry);  
     }  
 #endif  
 } }
  
 Monitor::~Monitor() Monitor::~Monitor()
 { {
     uninitializeTickler();     uninitializeTickler();
     Socket::uninitializeInterface();     Socket::uninitializeInterface();
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,      PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
                   "returning from monitor destructor");                   "returning from monitor destructor");
 } }
 void Monitor::uninitializeTickler(){  void Monitor::uninitializeTickler()
   {
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");      PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
  
     try{      try
       {
         if(_tickle_peer_socket >= 0)         if(_tickle_peer_socket >= 0)
         {         {
             Socket::close(_tickle_peer_socket);             Socket::close(_tickle_peer_socket);
Line 131 
Line 110 
     }     }
     catch(...)     catch(...)
     {     {
         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,          PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
                   "Failed to close tickle sockets");                   "Failed to close tickle sockets");
     }     }
  
 } }
  
 void Monitor::initializeTickler(){  void Monitor::initializeTickler()
   {
     /*     /*
        NOTE: On any errors trying to        NOTE: On any errors trying to
              setup out tickle connection,              setup out tickle connection,
Line 148 
Line 128 
     // try until the tcpip is restarted     // try until the tcpip is restarted
     do     do
     {     {
   
         // get a socket for the server side         // get a socket for the server side
         if((_tickle_server_socket = Socket::createSocket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET)          if ((_tickle_server_socket =
                    Socket::createSocket(PF_INET, SOCK_STREAM, 0)) ==
                PEGASUS_INVALID_SOCKET)
         {         {
             //handle error              MessageLoaderParms parms(
             MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",                  "Common.Monitor.TICKLE_CREATE",
                                      "Received error number $0 while creating the internal socket.",                                      "Received error number $0 while creating the internal socket.",
                                      getSocketError());                                      getSocketError());
             throw Exception(parms);             throw Exception(parms);
Line 178 
Line 159 
                    reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),                    reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
                    sizeof(_tickle_server_addr))) < 0)                    sizeof(_tickle_server_addr))) < 0)
         {         {
             // handle error  
 #ifdef PEGASUS_OS_ZOS #ifdef PEGASUS_OS_ZOS
             MessageLoaderParms parms("Common.Monitor.TICKLE_BIND_LONG",              MessageLoaderParms parms(
                                 "Received error:$0 while binding the internal socket."                  "Common.Monitor.TICKLE_BIND_LONG",
                                 ,strerror(errno));                  "Received error:$0 while binding the internal socket.",
                   strerror(errno));
 #else #else
             MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",              MessageLoaderParms parms(
                   "Common.Monitor.TICKLE_BIND",
                                 "Received error number $0 while binding the internal socket.",                                 "Received error number $0 while binding the internal socket.",
                                 getSocketError());                                 getSocketError());
 #endif #endif
Line 194 
Line 176 
         // tell the kernel we are a server         // tell the kernel we are a server
         if((::listen(_tickle_server_socket,3)) < 0)         if((::listen(_tickle_server_socket,3)) < 0)
         {         {
             // handle error              MessageLoaderParms parms(
             MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",                  "Common.Monitor.TICKLE_LISTEN",
                                 "Received error number $0 while listening to the internal socket.",                  "Received error number $0 while listening to the internal "
                       "socket.",
                                 getSocketError());                                 getSocketError());
             throw Exception(parms);             throw Exception(parms);
         }         }
  
         // make sure we have the correct socket for our server         // make sure we have the correct socket for our server
         int sock = ::getsockname(_tickle_server_socket,          int sock = ::getsockname(
               _tickle_server_socket,
                             reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),                             reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
                             &_addr_size);                             &_addr_size);
         if(sock < 0)         if(sock < 0)
         {         {
             // handle error              MessageLoaderParms parms(
             MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",                  "Common.Monitor.TICKLE_SOCKNAME",
                                 "Received error number $0 while getting the internal socket name.",                  "Received error number $0 while getting the internal socket "
                       "name.",
                                 getSocketError());                                 getSocketError());
             throw Exception(parms);             throw Exception(parms);
         }         }
Line 217 
Line 202 
         /* set up the tickle client/connector */         /* set up the tickle client/connector */
  
         // get a socket for our tickle client         // get a socket for our tickle client
         if((_tickle_client_socket = Socket::createSocket(PF_INET, SOCK_STREAM, 0))          if ((_tickle_client_socket =
            == PEGASUS_INVALID_SOCKET)                   Socket::createSocket(PF_INET, SOCK_STREAM, 0)) ==
         {              PEGASUS_INVALID_SOCKET)
             // handle error          {
             MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",              MessageLoaderParms parms(
                                 "Received error number $0 while creating the internal client socket.",                  "Common.Monitor.TICKLE_CLIENT_CREATE",
                   "Received error number $0 while creating the internal client "
                       "socket.",
                                 getSocketError());                                 getSocketError());
             throw Exception(parms);             throw Exception(parms);
         }         }
Line 244 
Line 231 
                    reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),                    reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
                    sizeof(_tickle_client_addr))) < 0)                    sizeof(_tickle_client_addr))) < 0)
         {         {
             // handle error              MessageLoaderParms parms(
             MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",                  "Common.Monitor.TICKLE_CLIENT_BIND",
                                      "Received error number $0 while binding the internal client socket.",                  "Received error number $0 while binding the internal client "
                       "socket.",
                                      getSocketError());                                      getSocketError());
             throw Exception(parms);             throw Exception(parms);
         }         }
Line 256 
Line 244 
                       reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),                       reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
                       sizeof(_tickle_server_addr))) < 0)                       sizeof(_tickle_server_addr))) < 0)
         {         {
             // handle error              MessageLoaderParms parms(
             MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",                  "Common.Monitor.TICKLE_CLIENT_CONNECT",
                                      "Received error number $0 while connecting the internal client socket.",                  "Received error number $0 while connecting the internal "
                       "client socket.",
                                      getSocketError());                                      getSocketError());
             throw Exception(parms);             throw Exception(parms);
         }         }
Line 268 
Line 257 
         SocketLength peer_size = sizeof(_tickle_peer_addr);         SocketLength peer_size = sizeof(_tickle_peer_addr);
         Threads::sleep(1);         Threads::sleep(1);
  
         // this call may fail, we will try a max of 20 times to establish this peer connection          // this call may fail, we will try a max of 20 times to establish
           // this peer connection
         if((_tickle_peer_socket = ::accept(_tickle_server_socket,         if((_tickle_peer_socket = ::accept(_tickle_server_socket,
                                            reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),                                            reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
                                            &peer_size)) < 0)                                            &peer_size)) < 0)
         {         {
               if (_tickle_peer_socket == PEGASUS_SOCKET_ERROR &&
             if(_tickle_peer_socket == PEGASUS_SOCKET_ERROR                  getSocketError() == PEGASUS_NETWORK_TRYAGAIN)
                && getSocketError() == PEGASUS_NETWORK_TRYAGAIN)  
             {             {
                 int retries = 0;                 int retries = 0;
                 do                 do
                 {                 {
                     Threads::sleep(1);                     Threads::sleep(1);
                     _tickle_peer_socket = ::accept(_tickle_server_socket,                      _tickle_peer_socket = ::accept(
                           _tickle_server_socket,
                                                    reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),                                                    reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
                                                    &peer_size);                                                    &peer_size);
                     retries++;                     retries++;
                 } while(_tickle_peer_socket == PEGASUS_SOCKET_ERROR                  } while (_tickle_peer_socket == PEGASUS_SOCKET_ERROR &&
                         && getSocketError() == PEGASUS_NETWORK_TRYAGAIN                           getSocketError() == PEGASUS_NETWORK_TRYAGAIN &&
                         && retries < 20);                           retries < 20);
             }             }
             // TCP/IP is down, destroy sockets and retry again.             // TCP/IP is down, destroy sockets and retry again.
             if(_tickle_peer_socket == PEGASUS_SOCKET_ERROR &&             if(_tickle_peer_socket == PEGASUS_SOCKET_ERROR &&
Line 299 
Line 289 
                 continue;                 continue;
             }             }
         }         }
   
   
         if(_tickle_peer_socket == PEGASUS_SOCKET_ERROR)         if(_tickle_peer_socket == PEGASUS_SOCKET_ERROR)
         {         {
             // handle error              MessageLoaderParms parms(
             MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",                  "Common.Monitor.TICKLE_ACCEPT",
                                      "Received error number $0 while accepting the internal socket connection.",                  "Received error number $0 while accepting the internal "
                       "socket connection.",
                                      getSocketError());                                      getSocketError());
             throw Exception(parms);             throw Exception(parms);
         } else          }
           else
         {         {
             // socket is ok             // socket is ok
             break;             break;
Line 318 
Line 308 
     Socket::disableBlocking(_tickle_peer_socket);     Socket::disableBlocking(_tickle_peer_socket);
     Socket::disableBlocking(_tickle_client_socket);     Socket::disableBlocking(_tickle_client_socket);
  
     // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only      // add the tickler to the list of entries to be monitored and set to
       // IDLE because Monitor only
     // checks entries with IDLE state for events     // checks entries with IDLE state for events
     _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);     _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
     entry._status = _MonitorEntry::IDLE;     entry._status = _MonitorEntry::IDLE;
  
     // is the tickler initalized as first socket on startup ?     // is the tickler initalized as first socket on startup ?
     if (_entries_socket.size()==0)      if (_entries.size()==0)
     {     {
        // if yes, append a new entry        // if yes, append a new entry
        _entries_socket.append(entry);         _entries.append(entry);
     }     }
     else     else
     {     {
        // if not, overwrite the tickler entry with new socket        // if not, overwrite the tickler entry with new socket
        _entries_socket[0]=entry;         _entries[0]=entry;
     }     }
   
 } }
  
 void Monitor::tickle(void)  void Monitor::tickle()
 { {
     static char _buffer[] =     static char _buffer[] =
     {     {
Line 348 
Line 338 
     Socket::write(_tickle_client_socket,&_buffer, 2);     Socket::write(_tickle_client_socket,&_buffer, 2);
 } }
  
 void Monitor::setSocketState( Uint32 index, _MonitorEntry::entry_status status )  void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
 { {
     // Set the state to requested state     // Set the state to requested state
     _entries_socket[index]._status = status;      _entries[index]._status = status;
 }  
   
 #if defined (PEGASUS_OS_TYPE_WINDOWS) &&\  
     !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
 void Monitor::setPipeState( Uint32 index, _MonitorEntry::entry_status status )  
 {  
     // Set the state to requested state  
     _entries_pipe[index]._status = status;  
 }  
   
 void  Monitor::handlePipe()  
 {  
   
     AutoMutex autoEntryMutex(_entry_pipe_mut);  
   
     ArrayIterator<_MonitorEntry> entries(_entries_pipe);  
   
     // Check the stopConnections flag.  If set, clear the Acceptor monitor entries  
   
     if (_stopConnections.get() > 0)  
     {  
         for ( int indx = 0; indx < (int)entries.size(); indx++)  
         {  
             if (entries[indx]._type == Monitor::ACCEPTOR)  
             {  
                 if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)  
                 {  
                    if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||  
                         entries[indx]._status.get() == _MonitorEntry::DYING )  
                    {  
                        // remove the entry  
                        entries[indx]._status = _MonitorEntry::EMPTY;  
                    }  
                    else  
                    {  
                        // set status to DYING  
                        entries[indx]._status = _MonitorEntry::DYING;  
                    }  
                }  
            }  
         }  
         _stopConnections--;  
   
         if (_stopConnections.get() == 0)  
         {  
             _stopConnectionsSem.signal();  
         }  
     }     }
  
     for( int indx = 0; indx < (int)entries.size(); indx++)  
     {  
        const _MonitorEntry &entry = entries[indx];  
        if ((entry._status.get() == _MonitorEntry::DYING) &&  
                      (entry._type == Monitor::CONNECTION))  
         {  
             MessageQueue *q = MessageQueue::lookup(entry.queueId);  
             PEGASUS_ASSERT(q != 0);  
             HTTPConnection &h = *static_cast<HTTPConnection *>(q);  
   
             if (h._connectionClosePending == false)  
                 continue;  
   
             // NOTE: do not attempt to delete while there are pending responses  
             // coming thru. The last response to come thru after a  
             // _connectionClosePending will reset _responsePending to false  
             // and then cause the monitor to rerun this code and clean up.  
             // (see HTTPConnection.cpp)  
   
             if (h._responsePending == true)  
             {  
                 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                     "Monitor::handlePipe - "  
                     "Ignoring connection delete request because "  
                     "responses are still pending. "  
                     "connection=0x%p, NamedPipe=%d\n",  
                     (void *)&h, h.getNamedPipe().getPipe());  
                 continue;  
             }  
             h._connectionClosePending = false;  
             MessageQueue &o = h.get_owner();  
             Message* message = 0;  
   
             message= new CloseConnectionMessage(entry.namedPipe);  
   
             message->dest = o.getQueueId();  
   
             // HTTPAcceptor is responsible for closing the connection.  
             // The lock is released to allow HTTPAcceptor to call  
             // unsolicitPipeMessages to free the entry.  
             // Once HTTPAcceptor completes processing of the close  
             // connection, the lock is re-requested and processing of  
             // the for loop continues.  This is safe with the current  
             // implementation of the entries object.  Note that the  
             // loop condition accesses the entries.size() on each  
             // iteration, so that a change in size while the mutex is  
             // unlocked will not result in an ArrayIndexOutOfBounds  
             // exception.  
   
             _entry_pipe_mut.unlock();  
             o.enqueue(message);  
             _entry_pipe_mut.lock();  
             // After enqueue a message and the autoEntryMutex has been released and locked again,  
             // the array of _entries_socket can be changed. The ArrayIterator has be reset with the original _entries_socket.  
             entries.reset(_entries_pipe);  
         }  
     }  
   
     Uint32 _idleEntries = 0;  
     //Records the list of index in entries_pipe that are valid  
     Array <Uint32> indexPipeCountAssociator;  
     int pipeEntryCount=0;  
     int MaxPipes = PIPE_INCREMENT;  
     // List of Pipe Handlers  
     HANDLE * hPipeList = new HANDLE[PIPE_INCREMENT];  
   
     for( int indx = 0; indx < (int)entries.size()  ; indx++)  
     {  
         if (!entries[indx].namedPipeConnection)  
         {  
             continue;  
         }  
   
         entries[indx].pipeSet = false;  
         if (pipeEntryCount >= MaxPipes)  
         {  
             MaxPipes += PIPE_INCREMENT;  
             HANDLE* temp_pList = new HANDLE[MaxPipes];  
             for (Uint32 i =0;i<pipeEntryCount;i++)  
             {  
                 temp_pList[i] = hPipeList[i];  
             }  
             delete [] hPipeList;  
             hPipeList = temp_pList;  
         }  
         hPipeList[pipeEntryCount] = entries[indx].namedPipe.getPipe();  
         indexPipeCountAssociator.append(indx);  
         pipeEntryCount++;  
   
     }  
   
   
     int pEvents = -1;  
     int pCount = -1;  
     BOOL bPeekPipe = 0;  
     DWORD dwBytesAvail=0;  
     // The pipe is sniffed and check if there are any data. If available, the  
     // message is picked from the Queue and appropriate methods are invoked.  
   
   
     // pipeProcessCount records the number of requests that are processed.  
     // At the end of loop this is verified against the count of request  
     // on local connection . If there are any pipes which needs to be  
     // processed we would apply delay and then proceed to iterate.  
   
     Uint32 pipeProcessCount =0;  
   
     // pipeIndex is used to index into indexPipeCountAssociator to fetch  
     // index of the _MonitorEntry of Monitor  
     for (int pipeIndex = 0; pipeIndex < pipeEntryCount; pipeIndex++)  
     {  
         dwBytesAvail = 0;  
   
         bPeekPipe = ::PeekNamedPipe(hPipeList[pipeIndex],  
                                     NULL,  
                                     NULL,  
                                     NULL,  
                                     &dwBytesAvail,  
                                     NULL  
                                     );  
   
         // If peek on NamedPipe was successfull and data is available  
         if (bPeekPipe && dwBytesAvail)  
         {  
   
             pEvents = 1;  
             Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "EVENT TRIGGERED in Pipe = %u ",entries[indexPipeCountAssociator[pipeIndex]].namedPipe.getPipe());  
             entries[indexPipeCountAssociator[pipeIndex]].pipeSet = true;  
             int pIndx = indexPipeCountAssociator[pipeIndex];  
   
             if ((entries[pIndx]._status.get() == _MonitorEntry::IDLE) &&  
                     entries[pIndx].namedPipe.isConnected())  
             {  
   
                 MessageQueue *q = 0;  
   
                 try  
                 {  
   
                     q = MessageQueue::lookup (entries[pIndx].queueId);  
                     PEGASUS_ASSERT(q !=0);  
                 }  
                 catch (Exception e)  
                 {  
                     e.getMessage();  
                 }  
                 catch(...)  
                 {  
                 }  
   
                 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                                 "Monitor::handlePipe indx = %d, queueId =  %d,\  
                                 q = %p",pIndx, entries[pIndx].queueId, q);  
                 try  
                 {  
                     if (entries[pIndx]._type == Monitor::CONNECTION)  
                     {  
   
                         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                                         "entries[indx].type for indx = \  
                                         %d is Monitor::CONNECTION",  
                                         pIndx);  
                         static_cast<HTTPConnection *>(q)->_entry_index = pIndx;  
                         HTTPConnection *dst = reinterpret_cast \  
                                                     <HTTPConnection *>(q);  
                         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                                         "Monitor::_dispatch: entering handlePipe() \  
                                         for indx  = %d, queueId = %d, \  
                                         q = %p",\  
                                         dst->_entry_index,  
                                         dst->_monitor->_entries_pipe\  
                                         [dst->_entry_index].queueId, dst);  
   
                         try  
                         {  
   
                             dst->run(1);  
   
                             // Record that the requested data is read/Written  
                             pipeProcessCount++;  
   
                         }  
                         catch (...)  
                         {  
                             Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                                             "Monitor::_dispatch: \  
                                             exception received");  
                         }  
   
                         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                                         "Monitor::_dispatch: exited \  
                                         handlePipe() index %d",  
                                         dst->_entry_index);  
   
   
                     }  
                     else  
                     {  
                         /* The condition  
                             entries[indx]._type == Monitor::INTERNAL can be  
                             ignored for pipes as the tickler is of  
                             Monitor::INTERNAL type. The tickler is  
                             a socket.  
                         */  
                         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                                             "Non-connection entry, indx = %d,\  
                                             has been received.", pIndx);  
   
                         Message *msg = 0;  
   
                         pEvents |= NamedPipeMessage::READ;  
                         msg = new NamedPipeMessage(entries[pIndx].namedPipe, pEvents);  
                         entries[pIndx]._status = _MonitorEntry::BUSY;  
                         _entry_pipe_mut.unlock();  
                         q->enqueue(msg);  
                         _entry_pipe_mut.lock();  
                         entries.reset(_entries_pipe);  
                         entries[pIndx]._status = _MonitorEntry::IDLE;  
                         delete [] hPipeList;  
                         return;  
                     }  
   
   
                 }  
                 catch(...)  
                 {  
   
                 }  
             }  
   
         }  
   
   
     }  
     delete [] hPipeList;  
   
     return;  
   
 }  
 #endif  
   
 void Monitor::run(Uint32 milliseconds) void Monitor::run(Uint32 milliseconds)
 { {
  
Line 654 
Line 356 
  
     AutoMutex autoEntryMutex(_entry_mut);     AutoMutex autoEntryMutex(_entry_mut);
  
     ArrayIterator<_MonitorEntry> entries(_entries_socket);      ArrayIterator<_MonitorEntry> entries(_entries);
  
     // Check the stopConnections flag.  If set, clear the Acceptor monitor entries      // Check the stopConnections flag.  If set, clear the Acceptor monitor
 #if defined (PEGASUS_OS_TYPE_WINDOWS) &&\      // entries
     !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
     if (_stopConnections.get() > 0)  
 #else  
     if (_stopConnections.get() == 1)     if (_stopConnections.get() == 1)
 #endif  
     {     {
         for ( int indx = 0; indx < (int)entries.size(); indx++)         for ( int indx = 0; indx < (int)entries.size(); indx++)
         {         {
Line 684 
Line 382 
                }                }
            }            }
         }         }
         _stopConnections--;          _stopConnections = 0;
         if (_stopConnections.get() == 0)  
         {  
             _stopConnectionsSem.signal();             _stopConnectionsSem.signal();
         }         }
     }  
  
     for( int indx = 0; indx < (int)entries.size(); indx++)     for( int indx = 0; indx < (int)entries.size(); indx++)
     {     {
Line 712 
Line 407 
  
                     if (h._responsePending == true)                     if (h._responsePending == true)
                     {                     {
                         Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "                  PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                                                     "Ignoring connection delete request because "                      "Monitor::run - Ignoring connection delete request "
                                                     "responses are still pending. "                          "because responses are still pending. "
                                                     "connection=0x%p, socket=%d\n",                                                     "connection=0x%p, socket=%d\n",
                                                     (void *)&h, h.getSocket());                      (void *)&h, h.getSocket()));
                         continue;                         continue;
                     }                     }
                     h._connectionClosePending = false;                     h._connectionClosePending = false;
Line 739 
Line 434 
           _entry_mut.unlock();           _entry_mut.unlock();
           o.enqueue(message);           o.enqueue(message);
           _entry_mut.lock();           _entry_mut.lock();
           // After enqueue a message and the autoEntryMutex has been released and locked again,  
           // the array of _entries_socket can be changed. The ArrayIterator has be reset with the original _entries_socket.              // After enqueue a message and the autoEntryMutex has been
           entries.reset(_entries_socket);              // released and locked again, the array of _entries can be
               // changed. The ArrayIterator has be reset with the original
               // _entries.
               entries.reset(_entries);
        }        }
     }     }
  
Line 785 
Line 483 
     int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);     int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
 #endif #endif
     _entry_mut.lock();     _entry_mut.lock();
     // After enqueue a message and the autoEntryMutex has been released and locked again,  
     // the array of _entries_socket can be changed. The ArrayIterator has be reset with the original _entries_socket      struct timeval timeNow;
     entries.reset(_entries_socket);      Time::gettimeofday(&timeNow);
   
       // After enqueue a message and the autoEntryMutex has been released and
       // locked again, the array of _entries can be changed. The ArrayIterator
       // has be reset with the original _entries
       entries.reset(_entries);
  
     if (events == PEGASUS_SOCKET_ERROR)     if (events == PEGASUS_SOCKET_ERROR)
     {     {
        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
           "Monitor::run - errorno = %d has occurred on select.", errno);              "Monitor::run - errorno = %d has occurred on select.", errno));
        // The EBADF error indicates that one or more or the file        // The EBADF error indicates that one or more or the file
        // descriptions was not valid. This could indicate that        // descriptions was not valid. This could indicate that
        // the entries structure has been corrupted or that        // the entries structure has been corrupted or that
Line 802 
Line 505 
     }     }
     else if (events)     else if (events)
     {     {
        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
           "Monitor::run select event received events = %d, monitoring %d idle entries",              "Monitor::run select event received events = %d, monitoring %d "
            events, _idleEntries);                  "idle entries",
               events, _idleEntries));
        for( int indx = 0; indx < (int)entries.size(); indx++)        for( int indx = 0; indx < (int)entries.size(); indx++)
        {        {
           // The Monitor should only look at entries in the table that are IDLE (i.e.,              // The Monitor should only look at entries in the table that are
           // owned by the Monitor).              // IDLE (i.e., owned by the Monitor).
           if((entries[indx]._status.get() == _MonitorEntry::IDLE) &&           if((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
              (FD_ISSET(entries[indx].socket, &fdread)))              (FD_ISSET(entries[indx].socket, &fdread)))
           {           {
              MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);              MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                  PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                   "Monitor::run indx = %d, queueId =  %d, q = %p",                   "Monitor::run indx = %d, queueId =  %d, q = %p",
                   indx, entries[indx].queueId, q);                      indx, entries[indx].queueId, q));
              PEGASUS_ASSERT(q !=0);              PEGASUS_ASSERT(q !=0);
  
              try              try
              {              {
                 if(entries[indx]._type == Monitor::CONNECTION)                 if(entries[indx]._type == Monitor::CONNECTION)
                 {                 {
                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                      "entries[indx].type for indx = %d is Monitor::CONNECTION", indx);                              "entries[indx].type for indx = %d is "
                    static_cast<HTTPConnection *>(q)->_entry_index = indx;                                  "Monitor::CONNECTION",
                               indx));
                    // Do not update the entry just yet. The entry gets updated once                          HTTPConnection *dst =
                    // the request has been read.                              reinterpret_cast<HTTPConnection *>(q);
                    //entries[indx]._status = _MonitorEntry::BUSY;                          dst->_entry_index = indx;
   
                    // If allocate_and_awaken failure, retry on next iteration                          // Update idle start time because we have received some
 /* Removed for PEP 183.                          // data. Any data is good data at this point, and we'll
                    if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(                          // keep the connection alive, even if we've exceeded
                            (void *)q, _dispatch))                          // the idleConnectionTimeout, which will be checked
                    {                          // when we call closeConnectionOnTimeout() next.
                       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,                          Time::gettimeofday(&dst->_idleStartTime);
                           "Monitor::run: Insufficient resources to process request.");  
                       entries[indx]._status = _MonitorEntry::IDLE;                          // Check for accept pending (ie. SSL handshake pending)
                       return true;                          // or idle connection timeouts for sockets from which
                    }                          // we received data (avoiding extra queue lookup below).
 */                          if (!dst->closeConnectionOnTimeout(&timeNow))
 // Added for PEP 183                          {
                    HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);                          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                              "Monitor::_dispatch: entering run() for "
                          "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",                                  "indx = %d, queueId = %d, q = %p",
                    dst->_entry_index, dst->_monitor->_entries_socket[dst->_entry_index].queueId, dst);                              dst->_entry_index,
                                   dst->_monitor->
                                       _entries[dst->_entry_index].queueId,
                               dst));
   
                    try                    try
                    {                    {
                        dst->run(1);                        dst->run(1);
                    }                    }
                    catch (...)                    catch (...)
                    {                    {
                        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                              PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
                        "Monitor::_dispatch: exception received");                        "Monitor::_dispatch: exception received");
                    }                    }
                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                    "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);                              "Monitor::_dispatch: exited run() for index %d",
                               dst->_entry_index));
                    // It is possible the entry status may not be set to busy.                          }
                    // The following will fail in that case.  
               // PEGASUS_ASSERT(dst->_monitor->_entries_socket[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);  
            // Once the HTTPConnection thread has set the status value to either  
            // Monitor::DYING or Monitor::IDLE, it has returned control of the connection  
            // to the Monitor.  It is no longer permissible to access the connection  
            // or the entry in the _entries_socket table.  
   
                    // The following is not relevant as the worker thread or the  
                    // reader thread will update the status of the entry.  
            //if (dst->_connectionClosePending)  
            //{  
            //  dst->_monitor->_entries_socket[dst->_entry_index]._status = _MonitorEntry::DYING;  
            //}  
            //else  
            //{  
            //  dst->_monitor->_entries_socket[dst->_entry_index]._status = _MonitorEntry::IDLE;  
            //}  
 // end Added for PEP 183  
         }         }
             else if( entries[indx]._type == Monitor::INTERNAL){                      else if (entries[indx]._type == Monitor::INTERNAL)
                       {
             // set ourself to BUSY,             // set ourself to BUSY,
                         // read the data                         // read the data
                         // and set ourself back to IDLE                         // and set ourself back to IDLE
  
                entries[indx]._status = _MonitorEntry::BUSY;                entries[indx]._status = _MonitorEntry::BUSY;
             static char buffer[2];             static char buffer[2];
             Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2);                          Sint32 amt =
                               Socket::read(entries[indx].socket,&buffer, 2);
  
             if(amt == PEGASUS_SOCKET_ERROR &&             if(amt == PEGASUS_SOCKET_ERROR &&
                getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED )                getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED )
             {             {
                 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                              PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
                       "Monitor::run: Tickler socket got an IO error. "                       "Monitor::run: Tickler socket got an IO error. "
                       "Going to re-create Socket and wait for TCP/IP restart.");                                      "Going to re-create Socket and wait for "
                                       "TCP/IP restart.");
                 uninitializeTickler();                 uninitializeTickler();
                 initializeTickler();                 initializeTickler();
                           }
             } else                          else
             {             {
                 entries[indx]._status = _MonitorEntry::IDLE;                 entries[indx]._status = _MonitorEntry::IDLE;
             }             }
   
         }         }
         else         else
         {         {
                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                      "Non-connection entry, indx = %d, has been received.", indx);                              "Non-connection entry, indx = %d, has been "
                                   "received.",
                               indx));
            int events = 0;            int events = 0;
            events |= SocketMessage::READ;            events |= SocketMessage::READ;
            Message *msg = new SocketMessage(entries[indx].socket, events);                          Message* msg = new SocketMessage(
                               entries[indx].socket, events);
            entries[indx]._status = _MonitorEntry::BUSY;            entries[indx]._status = _MonitorEntry::BUSY;
                    _entry_mut.unlock();                    _entry_mut.unlock();
            q->enqueue(msg);            q->enqueue(msg);
                    _entry_mut.lock();                    _entry_mut.lock();
            // After enqueue a message and the autoEntryMutex has been released and locked again,  
            // the array of entries can be changed. The ArrayIterator has be reset with the original _entries_socket                          // After enqueue a message and the autoEntryMutex has
            entries.reset(_entries_socket);                          // been released and locked again, the array of
                           // entries can be changed. The ArrayIterator has be
                           // reset with the original _entries
                           entries.reset(_entries);
            entries[indx]._status = _MonitorEntry::IDLE;            entries[indx]._status = _MonitorEntry::IDLE;
         }         }
          }          }
Line 923 
Line 621 
          {          {
          }          }
       }       }
               // else check for accept pending (ie. SSL handshake pending) or
               // idle connection timeouts for sockets from which we did not
               // receive data.
               else if ((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
                   entries[indx]._type == Monitor::CONNECTION)
               {
                   MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
                   HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
                   dst->_entry_index = indx;
                   dst->closeConnectionOnTimeout(&timeNow);
               }
           }
       }
       // else if "events" is zero (ie. select timed out) then we still need
       // to check if there are any pending SSL handshakes that have timed out.
       else
       {
           for (int indx = 0; indx < (int)entries.size(); indx++)
           {
               if ((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
                   entries[indx]._type == Monitor::CONNECTION)
               {
                   MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
                   HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
                   dst->_entry_index = indx;
                   dst->closeConnectionOnTimeout(&timeNow);
               }
        }        }
     }     }
 } }
Line 931 
Line 656 
 { {
     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
     // set boolean then tickle the server to recognize _stopConnections     // set boolean then tickle the server to recognize _stopConnections
     #if defined (PEGASUS_OS_TYPE_WINDOWS) &&\  
         !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
     _stopConnections = 2;  
     #else  
     _stopConnections = 1;     _stopConnections = 1;
     #endif  
   
     tickle();     tickle();
  
     if (wait)     if (wait)
Line 952 
Line 671 
 } }
  
  
   
 int  Monitor::solicitSocketMessages( int  Monitor::solicitSocketMessages(
     SocketHandle socket,     SocketHandle socket,
     Uint32 events,     Uint32 events,
Line 961 
Line 679 
 { {
    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
    AutoMutex autoMut(_entry_mut);    AutoMutex autoMut(_entry_mut);
    // Check to see if we need to dynamically grow the _entries_socket array      // Check to see if we need to dynamically grow the _entries array
    // We always want the _entries_socket array to 2 bigger than the      // We always want the _entries array to 2 bigger than the
    // current connections requested    // current connections requested
    _solicitSocketCount++;  // bump the count    _solicitSocketCount++;  // bump the count
    int size = (int)_entries_socket.size();      int size = (int)_entries.size();
    if((int)_solicitSocketCount >= (size-1)){      if ((int)_solicitSocketCount >= (size-1))
         for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){      {
           for (int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++)
           {
                 _MonitorEntry entry(0, 0, 0);                 _MonitorEntry entry(0, 0, 0);
                 _entries_socket.append(entry);              _entries.append(entry);
         }         }
    }    }
  
    int index;    int index;
    for(index = 1; index < (int)_entries_socket.size(); index++)      for (index = 1; index < (int)_entries.size(); index++)
    {    {
       try       try
       {       {
          if(_entries_socket[index]._status.get() == _MonitorEntry::EMPTY)              if (_entries[index]._status.get() == _MonitorEntry::EMPTY)
          {          {
             _entries_socket[index].socket = socket;                  _entries[index].socket = socket;
             _entries_socket[index].queueId  = queueId;                  _entries[index].queueId  = queueId;
             _entries_socket[index]._type = type;                  _entries[index]._type = type;
             _entries_socket[index]._status = _MonitorEntry::IDLE;                  _entries[index]._status = _MonitorEntry::IDLE;
  
             return index;             return index;
          }          }
Line 992 
Line 712 
       {       {
       }       }
    }    }
    _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful      // decrease the count, if we are here we didn't do anything meaningful
       _solicitSocketCount--;
    PEG_METHOD_EXIT();    PEG_METHOD_EXIT();
    return -1;    return -1;
   
 } }
  
 void Monitor::unsolicitSocketMessages(SocketHandle socket) void Monitor::unsolicitSocketMessages(SocketHandle socket)
 { {
   
     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
     AutoMutex autoMut(_entry_mut);     AutoMutex autoMut(_entry_mut);
  
     /*     /*
         Start at index = 1 because _entries_socket[0] is the tickle entry which never needs          Start at index = 1 because _entries[0] is the tickle entry which
         to be EMPTY;          never needs to be EMPTY;
     */     */
     unsigned int index;     unsigned int index;
     for(index = 1; index < _entries_socket.size(); index++)      for (index = 1; index < _entries.size(); index++)
     {     {
        if(_entries_socket[index].socket == socket)          if (_entries[index].socket == socket)
        {        {
           _entries_socket[index]._status = _MonitorEntry::EMPTY;              _entries[index]._status = _MonitorEntry::EMPTY;
           _entries_socket[index].socket = PEGASUS_INVALID_SOCKET;              _entries[index].socket = PEGASUS_INVALID_SOCKET;
           _solicitSocketCount--;           _solicitSocketCount--;
           break;           break;
        }        }
Line 1022 
Line 741 
  
     /*     /*
     Dynamic Contraction:     Dynamic Contraction:
     To remove excess entries we will start from the end of the _entries_socket array          To remove excess entries we will start from the end of the _entries
     and remove all entries with EMPTY status until we find the first NON EMPTY.          array and remove all entries with EMPTY status until we find the
     This prevents the positions, of the NON EMPTY entries, from being changed.          first NON EMPTY.  This prevents the positions, of the NON EMPTY
           entries, from being changed.
     */     */
     index = _entries_socket.size() - 1;      index = _entries.size() - 1;
     while(_entries_socket[index]._status.get() == _MonitorEntry::EMPTY){      while (_entries[index]._status.get() == _MonitorEntry::EMPTY)
     if(_entries_socket.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)      {
                 _entries_socket.remove(index);          if (_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
                   _entries.remove(index);
     index--;     index--;
     }     }
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
Line 1039 
Line 760 
 ThreadReturnType PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm) ThreadReturnType PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
 { {
    HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);    HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
 // Added for NamedPipe implementation for windows      PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
 #if defined (PEGASUS_OS_TYPE_WINDOWS) &&\          "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, "
     !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)              "q = %p",
    Array<_MonitorEntry>& entry = (dst->_namedPipeConnection)? dst->_monitor->_entries_pipe:dst->_monitor->_entries_socket;          dst->_entry_index,
 #else          dst->_monitor->_entries[dst->_entry_index].queueId,
    Array<_MonitorEntry>& entry = dst->_entries_socket;          dst));
 #endif  
    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
        "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",  
        dst->_entry_index, entry[dst->_entry_index].queueId, dst);  
    try    try
    {    {
       dst->run(1);       dst->run(1);
    }    }
    catch (...)    catch (...)
    {    {
       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,          PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
           "Monitor::_dispatch: exception received");           "Monitor::_dispatch: exception received");
    }    }
    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,      PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
           "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);          "Monitor::_dispatch: exited run() for index %d", dst->_entry_index));
  
    PEGASUS_ASSERT(entry[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);      PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() ==
           _MonitorEntry::BUSY);
  
    // Once the HTTPConnection thread has set the status value to either    // Once the HTTPConnection thread has set the status value to either
    // Monitor::DYING or Monitor::IDLE, it has returned control of the connection      // Monitor::DYING or Monitor::IDLE, it has returned control of the
    // to the Monitor.  It is no longer permissible to access the connection      // connection to the Monitor.  It is no longer permissible to access the
    // or the entry in the _entries table.      // connection or the entry in the _entries table.
    if (dst->_connectionClosePending)    if (dst->_connectionClosePending)
    {    {
       entry[dst->_entry_index]._status = _MonitorEntry::DYING;          dst->_monitor->_entries[dst->_entry_index]._status =
               _MonitorEntry::DYING;
    }    }
    else    else
    {    {
       entry[dst->_entry_index]._status = _MonitorEntry::IDLE;          dst->_monitor->_entries[dst->_entry_index]._status =
               _MonitorEntry::IDLE;
    }    }
    return 0;    return 0;
 } }
 // Added for NamedPipe implementation for windows  
 #if defined (PEGASUS_OS_TYPE_WINDOWS) &&\  
     !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)  
 //This method is anlogus to solicitSocketMessages. It does the same thing for named Pipes  
 int  Monitor::solicitPipeMessages(  
     NamedPipe namedPipe,  
     Uint32 events,  //not sure what has to change for this enum  
     Uint32 queueId,  
     int type)  
 {  
    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitPipeMessages");  
    AutoMutex autoMut(_entry_pipe_mut);  
    // Check to see if we need to dynamically grow the _entries array  
    // We always want the _entries array to 2 bigger than the  
    // current connections requested  
   
    _solicitPipeCount++;  // bump the count  
    int size = (int)_entries_pipe.size();  
    if((int)_solicitPipeCount >= (size-1)){  
         for(int i = 0; i < ((int)_solicitPipeCount - (size-1)); i++){  
                 _MonitorEntry entry(0, 0, 0);  
                 _entries_pipe.append(entry);  
         }  
    }  
   
    int index;  
    for(index = 1; index < (int)_entries_pipe.size(); index++)  
    {  
       try  
       {  
          if(_entries_pipe[index]._status.get() == _MonitorEntry::EMPTY)  
          {  
             _entries_pipe[index].socket = NULL;  
             _entries_pipe[index].namedPipe = namedPipe;  
             _entries_pipe[index].namedPipeConnection = true;  
             _entries_pipe[index].queueId  = queueId;  
             _entries_pipe[index]._type = type;  
             _entries_pipe[index]._status = _MonitorEntry::IDLE;  
             return index;  
          }  
       }  
       catch(...)  
       {  
       }  
   
    }  
    _solicitPipeCount--;  // decrease the count, if we are here we didnt do anything meaningful  
    PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages nothing happed - it didn't work" << PEGASUS_STD(endl);  
   
    PEG_METHOD_EXIT();  
    return -1;  
   
 }  
   
 //////////////////////////////////////////////////////////////////////////////  
 // Method Name      : unsolicitPipeMessages  
 // Input Parameter  : namedPipe  - type NamedPipe  
 // Return Type      : void  
 //============================================================================  
 // This method is invoked from HTTPAcceptor::handleEnqueue for server  
 // when the CLOSE_CONNECTION_MESSAGE is recieved. This method is also invoked  
 // from HTTPAcceptor::destroyConnections method when the CIMServer is shutdown.  
 // For the CIMClient, this is invoked from HTTPConnector::handleEnqueue when the  
 // CLOSE_CONNECTION_MESSAGE is recieved. This method is also invoked from  
 // HTTPConnector::disconnect when CIMClient requests a disconnect request.  
 // The list of _MonitorEntry is searched for the matching pipe.  
 // The Handle of the identified is closed and _MonitorEntry for the  
 // requested pipe is removed.  
 ///////////////////////////////////////////////////////////////////////////////  
   
 void Monitor::unsolicitPipeMessages(NamedPipe namedPipe)  
 {  
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Entering: Monitor::unsolicitPipeMessages():"  
                           <<"(tid:" << Uint32(pegasus_thread_self()) << ")"  
                           << PEGASUS_STD(endl);  
     }  
 #endif  
   
     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitPipeMessages");  
     AutoMutex autoMut(_entry_pipe_mut);  
   
     /*  
         Start at index = 1 because _entries[0] is the tickle entry which never needs  
         to be EMPTY;  
     */  
     unsigned int index;  
     for (index = 1; index < _entries_pipe.size(); index++)  
     {  
         if (_entries_pipe[index].namedPipe.getPipe() == namedPipe.getPipe())  
         {  
             _entries_pipe[index]._status = _MonitorEntry::EMPTY;  
             // Ensure that the client has read the data  
             ::FlushFileBuffers (namedPipe.getPipe());  
             //Disconnect to release the pipe. This doesn't release Pipe Handle  
             ::DisconnectNamedPipe (_entries_pipe[index].namedPipe.getPipe());  
             // Must use CloseHandle to Close Pipe  
             ::CloseHandle(_entries_pipe[index].namedPipe.getPipe());  
             _entries_pipe[index].namedPipe.disconnect();  
             _solicitPipeCount--;  
             break;  
         }  
     }  
   
     /*  
         Dynamic Contraction:  
         To remove excess entries we will start from the end of the _entries array  
         and remove all entries with EMPTY status until we find the first NON EMPTY.  
         This prevents the positions, of the NON EMPTY entries, from being changed.  
     */  
     index = _entries_pipe.size() - 1;  
     while (_entries_pipe[index]._status.get() == _MonitorEntry::EMPTY  
         && index > 0)  
     {  
         if ((_entries_pipe[index].namedPipe.getPipe() == namedPipe.getPipe()) ||  
             (_entries_pipe.size() > MAX_NUMBER_OF_MONITOR_ENTRIES))  
         {  
             _entries_pipe.remove(index);  
         }  
         index--;  
     }  
     PEG_METHOD_EXIT();  
 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG  
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Exiting:  Monitor::unsolicitPipeMessages():"  
                           <<(tid:" << Uint32(pegasus_thread_self()) << ")"  
                           << PEGASUS_STD(endl);  
     }  
 #endif  
 }  
   
 #endif  
   
   
  
 PEGASUS_NAMESPACE_END PEGASUS_NAMESPACE_END


Legend:
Removed from v.1.115.14.3  
changed lines
  Added in v.1.116.4.2

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2