(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/Monitor.cpp between version 1.82 and 1.103.10.24

version 1.82, 2004/12/15 14:12:42 version 1.103.10.24, 2006/09/12 09:17:16
Line 1 
Line 1 
 //%2004////////////////////////////////////////////////////////////////////////  //%2006////////////////////////////////////////////////////////////////////////
 // //
 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems. // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
Line 6 
Line 6 
 // IBM Corp.; EMC Corporation, The Open Group. // IBM Corp.; EMC Corporation, The Open Group.
 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.; // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group. // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
   // EMC Corporation; VERITAS Software Corporation; The Open Group.
   // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
   // EMC Corporation; Symantec Corporation; The Open Group.
 // //
 // Permission is hereby granted, free of charge, to any person obtaining a copy // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to // of this software and associated documentation files (the "Software"), to
Line 31 
Line 35 
 //              Amit K Arora (Bug#1153) amita@in.ibm.com //              Amit K Arora (Bug#1153) amita@in.ibm.com
 //              Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090 //              Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090
 //              Sushma Fernandes (sushma@hp.com) for Bug#2057 //              Sushma Fernandes (sushma@hp.com) for Bug#2057
   //              Josephine Eskaline Joyce (jojustin@in.ibm.com) for PEP#101
   //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
 // //
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
  
Line 44 
Line 50 
 #include <Pegasus/Common/HTTPConnection.h> #include <Pegasus/Common/HTTPConnection.h>
 #include <Pegasus/Common/MessageQueueService.h> #include <Pegasus/Common/MessageQueueService.h>
 #include <Pegasus/Common/Exception.h> #include <Pegasus/Common/Exception.h>
   #include "ArrayIterator.h"
   
   //const static DWORD MAX_BUFFER_SIZE = 4096;  // 4 kilobytes
  
 #ifdef PEGASUS_OS_TYPE_WINDOWS #ifdef PEGASUS_OS_TYPE_WINDOWS
 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
 #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \ #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
 of <winsock.h>. It may have been indirectly included (e.g., by including \ of <winsock.h>. It may have been indirectly included (e.g., by including \
 <windows.h>). Finthe inclusion of that header which is visible to this \  <windows.h>). Find inclusion of that header which is visible to this \
 compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \ compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \
 otherwise, less than 64 clients (the default) will be able to connect to the \ otherwise, less than 64 clients (the default) will be able to connect to the \
 CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM." CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
Line 70 
Line 79 
  
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
   static AtomicInt _connections(0);
  
 static AtomicInt _connections = 0;  #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
   Mutex Monitor::_cout_mut;
 static struct timeval create_time = {0, 1};  #endif
 static struct timeval destroy_time = {300, 0};  
 static struct timeval deadlock_time = {0, 0};  
   
 ////////////////////////////////////////////////////////////////////////////////  
 //  
 // MonitorRep  
 //  
 ////////////////////////////////////////////////////////////////////////////////  
  
 struct MonitorRep  // Added for NamedPipe implementation for windows
 {  #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
     fd_set rd_fd_set;   #define PIPE_INCREMENT 1
     fd_set wr_fd_set;  #endif
     fd_set ex_fd_set;  
     fd_set active_rd_fd_set;  
     fd_set active_wr_fd_set;  
     fd_set active_ex_fd_set;  
 };  
  
 //////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
 // //
Line 100 
Line 97 
 //////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
  
 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
   
 Monitor::Monitor() Monitor::Monitor()
    : _module_handle(0),     : _stopConnections(0),
      _controller(0),  
      _async(false),  
      _stopConnections(0),  
      _stopConnectionsSem(0),      _stopConnectionsSem(0),
      _solicitSocketCount(0),      _solicitSocketCount(0),
      _tickle_client_socket(-1),      _tickle_client_socket(-1),
      _tickle_server_socket(-1),      _tickle_server_socket(-1),
      _tickle_peer_socket(-1)      _tickle_peer_socket(-1)
 { {
     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;  #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
     Socket::initializeInterface();  
     _rep = 0;  
     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);  
   
     // setup the tickler  
     initializeTickler();  
   
     // Start the count at 1 because initilizeTickler()  
     // has added an entry in the first position of the  
     // _entries array  
     for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )  
     {     {
        _MonitorEntry entry(0, 0, 0);          AutoMutex automut(Monitor::_cout_mut);
        _entries.append(entry);          PEGASUS_STD(cout) << "Entering: Monitor::Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
     }  
 } }
   #endif
 Monitor::Monitor(Boolean async)  
    : _module_handle(0),  
      _controller(0),  
      _async(async),  
      _stopConnections(0),  
      _stopConnectionsSem(0),  
      _solicitSocketCount(0),  
      _tickle_client_socket(-1),  
      _tickle_server_socket(-1),  
      _tickle_peer_socket(-1)  
 {  
     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
     Socket::initializeInterface();     Socket::initializeInterface();
     _rep = 0;  
     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
  
     // setup the tickler     // setup the tickler
Line 156 
Line 127 
        _MonitorEntry entry(0, 0, 0);        _MonitorEntry entry(0, 0, 0);
        _entries.append(entry);        _entries.append(entry);
     }     }
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
       {
           AutoMutex automut(Monitor::_cout_mut);
           PEGASUS_STD(cout) << "Exiting:  Monitor::Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
       }
   #endif
 } }
  
 Monitor::~Monitor() Monitor::~Monitor()
 { {
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
                   "deregistering with module controller");  
   
     if(_module_handle != NULL)  
     {     {
        _controller->deregister_module(PEGASUS_MODULENAME_MONITOR);          AutoMutex automut(Monitor::_cout_mut);
        _controller = 0;          PEGASUS_STD(cout) << "Entering: Monitor::~Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
        delete _module_handle;  
     }     }
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep");  #endif
   
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
  
     try{     try{
Line 196 
Line 168 
     Socket::uninitializeInterface();     Socket::uninitializeInterface();
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                   "returning from monitor destructor");                   "returning from monitor destructor");
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
       {
           AutoMutex automut(Monitor::_cout_mut);
           PEGASUS_STD(cout) << "Exiting:  Monitor::~Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
       }
   #endif
 } }
  
 void Monitor::initializeTickler(){ void Monitor::initializeTickler(){
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
       {
           AutoMutex automut(Monitor::_cout_mut);
           PEGASUS_STD(cout) << "Entering: Monitor::initializeTickler(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
       }
   #endif
     /*     /*
        NOTE: On any errors trying to        NOTE: On any errors trying to
              setup out tickle connection,              setup out tickle connection,
Line 208 
Line 192 
     /* setup the tickle server/listener */     /* setup the tickle server/listener */
  
     // get a socket for the server side     // get a socket for the server side
     if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){      if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
         //handle error         //handle error
         MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",         MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
                                  "Received error number $0 while creating the internal socket.",                                  "Received error number $0 while creating the internal socket.",
Line 222 
Line 206 
  
     // initialize the address     // initialize the address
     memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));     memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
 #ifdef PEGASUS_OS_ZOS  
     _tickle_server_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");  
 #else  
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 #pragma convert(37) #pragma convert(37)
 #endif #endif
Line 232 
Line 213 
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 #pragma convert(0) #pragma convert(0)
 #endif #endif
 #endif  
     _tickle_server_addr.sin_family = PF_INET;     _tickle_server_addr.sin_family = PF_INET;
     _tickle_server_addr.sin_port = 0;     _tickle_server_addr.sin_port = 0;
  
     PEGASUS_SOCKLEN_SIZE _addr_size = sizeof(_tickle_server_addr);      PEGASUS_SOCKLEN_T _addr_size = sizeof(_tickle_server_addr);
  
     // bind server side to socket     // bind server side to socket
     if((::bind(_tickle_server_socket,     if((::bind(_tickle_server_socket,
                (struct sockaddr *)&_tickle_server_addr,                 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
                sizeof(_tickle_server_addr))) < 0){                sizeof(_tickle_server_addr))) < 0){
         // handle error         // handle error
   #ifdef PEGASUS_OS_ZOS
       MessageLoaderParms parms("Common.Monitor.TICKLE_BIND_LONG",
                                    "Received error:$0 while binding the internal socket.",strerror(errno));
   #else
         MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",         MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
                                  "Received error number $0 while binding the internal socket.",                                  "Received error number $0 while binding the internal socket.",
 #if !defined(PEGASUS_OS_TYPE_WINDOWS) #if !defined(PEGASUS_OS_TYPE_WINDOWS)
Line 250 
Line 234 
 #else #else
                                  WSAGetLastError());                                  WSAGetLastError());
 #endif #endif
   #endif
         throw Exception(parms);         throw Exception(parms);
     }     }
  
Line 268 
Line 253 
  
     // make sure we have the correct socket for our server     // make sure we have the correct socket for our server
     int sock = ::getsockname(_tickle_server_socket,     int sock = ::getsockname(_tickle_server_socket,
                              (struct sockaddr*)&_tickle_server_addr,                     reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
                              &_addr_size);                              &_addr_size);
     if(sock < 0){     if(sock < 0){
         // handle error         // handle error
Line 285 
Line 270 
     /* set up the tickle client/connector */     /* set up the tickle client/connector */
  
     // get a socket for our tickle client     // get a socket for our tickle client
     if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){      if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
         // handle error         // handle error
         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
                          "Received error number $0 while creating the internal client socket.",                          "Received error number $0 while creating the internal client socket.",
Line 299 
Line 284 
  
     // setup the address of the client     // setup the address of the client
     memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));     memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
 #ifdef PEGASUS_OS_ZOS  
     _tickle_client_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");  
 #else  
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 #pragma convert(37) #pragma convert(37)
 #endif #endif
Line 309 
Line 291 
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 #pragma convert(0) #pragma convert(0)
 #endif #endif
 #endif  
     _tickle_client_addr.sin_family = PF_INET;     _tickle_client_addr.sin_family = PF_INET;
     _tickle_client_addr.sin_port = 0;     _tickle_client_addr.sin_port = 0;
  
     // bind socket to client side     // bind socket to client side
     if((::bind(_tickle_client_socket,     if((::bind(_tickle_client_socket,
                (struct sockaddr*)&_tickle_client_addr,                 reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
                sizeof(_tickle_client_addr))) < 0){                sizeof(_tickle_client_addr))) < 0){
         // handle error         // handle error
         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
Line 330 
Line 311 
  
     // connect to server side     // connect to server side
     if((::connect(_tickle_client_socket,     if((::connect(_tickle_client_socket,
                   (struct sockaddr*)&_tickle_server_addr,                    reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
                   sizeof(_tickle_server_addr))) < 0){                   sizeof(_tickle_server_addr))) < 0){
         // handle error         // handle error
         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
Line 345 
Line 326 
  
     /* set up the slave connection */     /* set up the slave connection */
     memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));     memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
     PEGASUS_SOCKLEN_SIZE peer_size = sizeof(_tickle_peer_addr);      PEGASUS_SOCKLEN_T peer_size = sizeof(_tickle_peer_addr);
     pegasus_sleep(1);     pegasus_sleep(1);
  
     // this call may fail, we will try a max of 20 times to establish this peer connection     // this call may fail, we will try a max of 20 times to establish this peer connection
     if((_tickle_peer_socket = ::accept(_tickle_server_socket,     if((_tickle_peer_socket = ::accept(_tickle_server_socket,
                                        (struct sockaddr*)&_tickle_peer_addr,              reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
                                        &peer_size)) < 0){                                        &peer_size)) < 0){
 #if !defined(PEGASUS_OS_TYPE_WINDOWS) #if !defined(PEGASUS_OS_TYPE_WINDOWS)
         // Only retry on non-windows platforms.         // Only retry on non-windows platforms.
Line 361 
Line 342 
           {           {
             pegasus_sleep(1);             pegasus_sleep(1);
             _tickle_peer_socket = ::accept(_tickle_server_socket,             _tickle_peer_socket = ::accept(_tickle_server_socket,
                                            (struct sockaddr*)&_tickle_peer_addr,                  reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
                                            &peer_size);                                            &peer_size);
             retries++;             retries++;
           } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);           } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
Line 384 
Line 365 
     _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);     _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
     entry._status = _MonitorEntry::IDLE;     entry._status = _MonitorEntry::IDLE;
     _entries.append(entry);     _entries.append(entry);
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
       {
           AutoMutex automut(Monitor::_cout_mut);
           PEGASUS_STD(cout) << "Exiting:  Monitor::initializeTickler(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
       }
   #endif
 } }
  
 void Monitor::tickle(void) void Monitor::tickle(void)
 { {
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
       {
           AutoMutex automut(Monitor::_cout_mut);
           PEGASUS_STD(cout) << "Entering: Monitor::tickle(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
       }
   #endif
     static char _buffer[] =     static char _buffer[] =
     {     {
       '0','0'       '0','0'
Line 397 
Line 390 
     Socket::disableBlocking(_tickle_client_socket);     Socket::disableBlocking(_tickle_client_socket);
     Socket::write(_tickle_client_socket,&_buffer, 2);     Socket::write(_tickle_client_socket,&_buffer, 2);
     Socket::enableBlocking(_tickle_client_socket);     Socket::enableBlocking(_tickle_client_socket);
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
       {
           AutoMutex automut(Monitor::_cout_mut);
           PEGASUS_STD(cout) << "Exiting:  Monitor::tickle(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
       }
   #endif
 } }
  
 void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status ) void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
Line 407 
Line 406 
  
 Boolean Monitor::run(Uint32 milliseconds) Boolean Monitor::run(Uint32 milliseconds)
 { {
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
       {
           AutoMutex automut(Monitor::_cout_mut);
           PEGASUS_STD(cout) << "Entering: Monitor::run(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
       }
   #endif
  
     Boolean handled_events = false;     Boolean handled_events = false;
     int i = 0;     int i = 0;
Line 416 
Line 421 
     fd_set fdread;     fd_set fdread;
     FD_ZERO(&fdread);     FD_ZERO(&fdread);
  
     _entry_mut.lock(pegasus_thread_self());      AutoMutex autoEntryMutex(_entry_mut);
   
       ArrayIterator<_MonitorEntry> entries(_entries);
  
     // Check the stopConnections flag.  If set, clear the Acceptor monitor entries     // Check the stopConnections flag.  If set, clear the Acceptor monitor entries
     if (_stopConnections == 1)      if (_stopConnections.get() == 1)
     {     {
         for ( int indx = 0; indx < (int)_entries.size(); indx++)          for ( int indx = 0; indx < (int)entries.size(); indx++)
         {         {
             if (_entries[indx]._type == Monitor::ACCEPTOR)              if (entries[indx]._type == Monitor::ACCEPTOR)
             {             {
                 if ( _entries[indx]._status.value() != _MonitorEntry::EMPTY)                  if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)
                 {                 {
                    if ( _entries[indx]._status.value() == _MonitorEntry::IDLE ||                     if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||
                         _entries[indx]._status.value() == _MonitorEntry::DYING )                          entries[indx]._status.get() == _MonitorEntry::DYING )
                    {                    {
                        // remove the entry                        // remove the entry
                        _entries[indx]._status = _MonitorEntry::EMPTY;                         entries[indx]._status = _MonitorEntry::EMPTY;
                    }                    }
                    else                    else
                    {                    {
                        // set status to DYING                        // set status to DYING
                       _entries[indx]._status = _MonitorEntry::DYING;                        entries[indx]._status = _MonitorEntry::DYING;
                    }                    }
                }                }
            }            }
Line 445 
Line 452 
         _stopConnectionsSem.signal();         _stopConnectionsSem.signal();
     }     }
  
     for( int indx = 0; indx < (int)_entries.size(); indx++)      for( int indx = 0; indx < (int)entries.size(); indx++)
     {     {
                          const _MonitorEntry &entry = _entries[indx];          const _MonitorEntry &entry = entries[indx];
        if ((entry._status.value() == _MonitorEntry::DYING) &&         if ((entry._status.get() == _MonitorEntry::DYING) &&
                                          (entry._type == Monitor::CONNECTION))  #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
              (entry._type == Monitor::ACCEPTOR)
   #else
              (entry._type == Monitor::CONNECTION)
   #endif
              )
        {        {
           MessageQueue *q = MessageQueue::lookup(entry.queueId);           MessageQueue *q = MessageQueue::lookup(entry.queueId);
           PEGASUS_ASSERT(q != 0);           PEGASUS_ASSERT(q != 0);
Line 466 
Line 478 
  
                                         if (h._responsePending == true)                                         if (h._responsePending == true)
                                         {                                         {
   // Added for NamedPipe implementation for windows
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
                 if (!entry.namedPipeConnection)
                 {
   #endif
                                                 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "                                                 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "
                                                                                                         "Ignoring connection delete request because "                                                                                                         "Ignoring connection delete request because "
                                                                                                         "responses are still pending. "                                                                                                         "responses are still pending. "
                                                                                                         "connection=0x%p, socket=%d\n",                                                                                                         "connection=0x%p, socket=%d\n",
                                                                                                         (void *)&h, h.getSocket());                                                                                                         (void *)&h, h.getSocket());
   
   // Added for NamedPipe implementation for windows
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
                 }
                 else
                 {
                     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "
                         "Ignoring connection delete request because "
                         "responses are still pending. "
                         "connection=0x%p, NamedPipe=%d\n",
                     (void *)&h, h.getNamedPipe().getPipe());
                 }
   #endif
                                                 continue;                                                 continue;
                                         }                                         }
                                         h._connectionClosePending = false;                                         h._connectionClosePending = false;
           MessageQueue &o = h.get_owner();           MessageQueue &o = h.get_owner();
           Message* message= new CloseConnectionMessage(entry.socket);            Message* message;
   
   // Added for NamedPipe implementation for windows
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
             if (!entry.namedPipeConnection)
             {
   #endif
                 message= new CloseConnectionMessage(entry.socket);
   
   // Added for NamedPipe implementation for windows
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
             }
             else
             {
                 message= new CloseConnectionMessage(entry.namedPipe);
   
             }
   #endif
           message->dest = o.getQueueId();           message->dest = o.getQueueId();
  
           // HTTPAcceptor is responsible for closing the connection.           // HTTPAcceptor is responsible for closing the connection.
Line 484 
Line 531 
           // Once HTTPAcceptor completes processing of the close           // Once HTTPAcceptor completes processing of the close
           // connection, the lock is re-requested and processing of           // connection, the lock is re-requested and processing of
           // the for loop continues.  This is safe with the current           // the for loop continues.  This is safe with the current
           // implementation of the _entries object.  Note that the            // implementation of the entries object.  Note that the
           // loop condition accesses the _entries.size() on each            // loop condition accesses the entries.size() on each
           // iteration, so that a change in size while the mutex is           // iteration, so that a change in size while the mutex is
           // unlocked will not result in an ArrayIndexOutOfBounds           // unlocked will not result in an ArrayIndexOutOfBounds
           // exception.           // exception.
  
           _entry_mut.unlock();            autoEntryMutex.unlock();
           o.enqueue(message);           o.enqueue(message);
           _entry_mut.lock(pegasus_thread_self());            autoEntryMutex.lock();
             // After enqueue a message and the autoEntryMutex has been released and locked again,
             // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries.
             entries.reset(_entries);
        }        }
     }     }
  
Line 504 
Line 554 
         place to calculate the max file descriptor (maximum socket number)         place to calculate the max file descriptor (maximum socket number)
         because we have to traverse the entire array.         because we have to traverse the entire array.
     */     */
     int maxSocketCurrentPass = 0;      //Array<HANDLE> pipeEventArray;
     for( int indx = 0; indx < (int)_entries.size(); indx++)          PEGASUS_SOCKET maxSocketCurrentPass = 0;
       int indx;
   
   // Added for NamedPipe implementation for windows
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
       //This array associates named pipe connections to their place in [indx]
       //in the entries array. The value in portion zero of the array is the
       //index of the fist named pipe connection in the entries array
       Array <Uint32> indexPipeCountAssociator;
       int pipeEntryCount=0;
       int MaxPipes = PIPE_INCREMENT;
       HANDLE* hEvents = new HANDLE[PIPE_INCREMENT];
   #endif
   
       // This loop takes care of setting the namedpipe which has to be used from the list....
       for( indx = 0; indx < (int)entries.size(); indx++)
       {
   // Added for NamedPipe implementation for windows
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
           if(entries[indx].namedPipeConnection)
     {     {
        if(maxSocketCurrentPass < _entries[indx].socket)              //entering this clause mean that a Named Pipe connection is at entries[indx]
           maxSocketCurrentPass = _entries[indx].socket;              // Initalizing to false since there is no event triggerred at this point.
               //Which will be set true on selection of the pipe to be used.
               entries[indx].pipeSet = false;
  
        if(_entries[indx]._status.value() == _MonitorEntry::IDLE)              // We can Keep a counter in the Monitor class for the number of named pipes ...
               //  Which can be used here to create the array size for hEvents..( obviously before this for loop.:-) )
               if (pipeEntryCount >= MaxPipes)
               {
                   // MaxPipe is incremented by PIPE_INCREMENT and an event is created. This is to maintain
                   // the number of hEvents passed to WaitForMultipleObjects() through 2nd parameter same as
                   // the first parameter of the same call..
                   MaxPipes += PIPE_INCREMENT;
                   HANDLE* temp_hEvents = new HANDLE[MaxPipes];
   
                   for (Uint32 i =0;i<pipeEntryCount;i++)
                   {
                       temp_hEvents[i] = hEvents[i];
                   }
   
                   delete [] hEvents;
   
                   hEvents = temp_hEvents;
               }
               //pipeEventArray.append((entries[indx].namedPipe.getOverlap()).hEvent);
               hEvents[pipeEntryCount] = entries[indx].namedPipe.getOverlap()->hEvent;
               indexPipeCountAssociator.append(indx);
               pipeEntryCount++;
           }
           else
           {
   #endif
               if(maxSocketCurrentPass < entries[indx].socket)
               maxSocketCurrentPass = entries[indx].socket;
   
               if(entries[indx]._status.get() == _MonitorEntry::IDLE)
        {        {
           _idleEntries++;           _idleEntries++;
           FD_SET(_entries[indx].socket, &fdread);                  FD_SET(entries[indx].socket, &fdread);
        }        }
   
   // Added for NamedPipe implementation for windows
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
           }
   #endif
     }     }
  
     /*     /*
Line 523 
Line 629 
     */     */
     maxSocketCurrentPass++;     maxSocketCurrentPass++;
  
     _entry_mut.unlock();      autoEntryMutex.unlock();
     int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);  
    _entry_mut.lock(pegasus_thread_self());      int events;
       int pEvents;
   
   #ifdef PEGASUS_OS_TYPE_WINDOWS
   
       events = 0;
       DWORD dwWait=NULL;
       pEvents = 0;
   
   // Added for NamedPipe implementation for windows
   #ifndef PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET
   # ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
       {
           AutoMutex automut(Monitor::_cout_mut);
           cout << "Monitor::run - Calling WaitForMultipleObjects\n";
       }
   # endif
   
       //this should be in a try block
       dwWait = WaitForMultipleObjects(
                    MaxPipes,
                    hEvents,               //ABB:- array of event objects
                    FALSE,                 // ABB:-does not wait for all
                    milliseconds);        //ABB:- timeout value   //WW this may need be shorter
   
       if(dwWait == WAIT_TIMEOUT)
       {
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
           {
               AutoMutex automut(Monitor::_cout_mut);
               cout << "Wait WAIT_TIMEOUT\n";
               cout << "Monitor::run before the select in TIMEOUT clause events = " << events << endl;
           }
   #endif
   #endif
           /********* NOTE
            this time (tv) combined with the waitForMulitpleObjects timeout is
            too long it will cause the client side to time out
            ******************/
           events = select(0, &fdread, NULL, NULL, &tv);
   
   // Added for NamedPipe implementation for windows
   #ifndef PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET
   # ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
           {
               AutoMutex automut(Monitor::_cout_mut);
              cout << "Monitor::run after the select in TIMEOUT clause events = " << events << endl;
           }
   # endif
       }
       else if (dwWait == WAIT_FAILED)
       {
           if (GetLastError() == 6) //WW this may be too specific
           {
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
   {
               AutoMutex automut(Monitor::_cout_mut);
               cout << "Monitor::run about to call 'select since waitForMultipleObjects failed\n";
   }
   #endif
           /********* NOTE
            this time (tv) combined with the waitForMulitpleObjects timeout is
            too long it will cause the client side to time out
            ******************/
               events = select(0, &fdread, NULL, NULL, &tv);
           }
           else
           {
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
   {
               AutoMutex automut(Monitor::_cout_mut);
               cout << "Wait Failed returned\n";
               cout << "failed with " << GetLastError() << "." << endl;
   }
   #endif
               pEvents = -1;
               return false;
           }
       }
       else
       {
           int pCount = dwWait - WAIT_OBJECT_0;  // determines which pipe
           {
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
   {
               AutoMutex automut(Monitor::_cout_mut);
               cout << "Monitor::run WaitForMultiPleObject returned activity pipeEntrycount is "
                    << pipeEntryCount << " this is the type "
                    << entries[indexPipeCountAssociator[pCount]]._type
                    << " this is index " << indexPipeCountAssociator[pCount] << endl;
   }
   #endif
                  /* There is a timing problem here sometimes the write in HTTPConnection is
                not all the way done (has not _monitor->setState (_entry_index, _MonitorEntry::IDLE) )
                therefore that should be done here if it is not done already*/
   
               if (entries[indexPipeCountAssociator[pCount]]._status.get() != _MonitorEntry::IDLE)
               {
                   this->setState(indexPipeCountAssociator[pCount], _MonitorEntry::IDLE);
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
   {
                   AutoMutex automut(Monitor::_cout_mut);
                   cout << "setting state of index " << indexPipeCountAssociator[pCount]  << " to IDLE" << endl;
   }
   #endif
               }
           }
           pEvents = 1;
               //this statment gets the pipe entry that has triggered an event
           entries[indexPipeCountAssociator[pCount]].pipeSet = true;
       }
   #endif
   
   #else
       events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
   #endif
       autoEntryMutex.lock();
       // After enqueue a message and the autoEntryMutex has been released and locked again,
       // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries
       entries.reset(_entries);
  
   // Added for NamedPipe implementation for windows
 #ifdef PEGASUS_OS_TYPE_WINDOWS #ifdef PEGASUS_OS_TYPE_WINDOWS
       if(pEvents == -1)
       {
           Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
             "Monitor::run - errorno = %d has occurred on select.",GetLastError() );
          // The EBADF error indicates that one or more or the file
          // descriptions was not valid. This could indicate that
          // the entries structure has been corrupted or that
          // we have a synchronization error.
   
           // We need to generate an assert  here...
          PEGASUS_ASSERT(GetLastError()!= EBADF);
       }
     if(events == SOCKET_ERROR)     if(events == SOCKET_ERROR)
 #else #else
     if(events == -1)     if(events == -1)
Line 537 
Line 775 
           "Monitor::run - errorno = %d has occurred on select.", errno);           "Monitor::run - errorno = %d has occurred on select.", errno);
        // The EBADF error indicates that one or more or the file        // The EBADF error indicates that one or more or the file
        // descriptions was not valid. This could indicate that        // descriptions was not valid. This could indicate that
        // the _entries structure has been corrupted or that         // the entries structure has been corrupted or that
        // we have a synchronization error.        // we have a synchronization error.
  
        PEGASUS_ASSERT(errno != EBADF);        PEGASUS_ASSERT(errno != EBADF);
     }     }
     else if (events)      else if ((events)||(pEvents))
     {     {
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
           {
               AutoMutex automut(Monitor::_cout_mut);
               cout << "IN Monior::run events= " << events << " pEvents= " << pEvents<< endl;
           }
   #endif
   
        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
           "Monitor::run select event received events = %d, monitoring %d idle entries",           "Monitor::run select event received events = %d, monitoring %d idle entries",
            events, _idleEntries);            events, _idleEntries);
        for( int indx = 0; indx < (int)_entries.size(); indx++)          for( int indx = 0; indx < (int)entries.size(); indx++)
        {        {
               //cout << "Monitor::run at start of 'for( int indx = 0; indx ' - index = " << indx << endl;
           // The Monitor should only look at entries in the table that are IDLE (i.e.,           // The Monitor should only look at entries in the table that are IDLE (i.e.,
           // owned by the Monitor).           // owned by the Monitor).
           if((_entries[indx]._status.value() == _MonitorEntry::IDLE) &&              // cout << endl << " status of entry " << indx << " is " << entries[indx]._status.get() << endl;
              (FD_ISSET(_entries[indx].socket, &fdread)))              if((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
                  ((FD_ISSET(entries[indx].socket, &fdread)&& (events))
   // Added for NamedPipe implementation for windows
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
                  || (entries[indx].namedPipeConnection && entries[indx].pipeSet && (pEvents))
   #endif
                 ))
               {
   
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
                   {
                       AutoMutex automut(Monitor::_cout_mut);
                       cout <<"Monitor::run - index  " << indx << " just got into 'if' statement" << endl;
                   }
   #endif
                   MessageQueue *q;
                   try
                   {
                       q = MessageQueue::lookup(entries[indx].queueId);
                   }
                   catch (Exception e)
                   {
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
           {           {
              MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);                      AutoMutex automut(Monitor::_cout_mut);
                       cout << " this is what lookup gives - " << e.getMessage() << endl;
   }
   #endif
                       exit(1);
                   }
                   catch(...)
                   {
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
   {
                       AutoMutex automut(Monitor::_cout_mut);
                       cout << "MessageQueue::lookup gives strange exception " << endl;
   }
   #endif
                       exit(1);
                   }
   
              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                   "Monitor::run indx = %d, queueId =  %d, q = %p",                   "Monitor::run indx = %d, queueId =  %d, q = %p",
                   indx, _entries[indx].queueId, q);                    indx, entries[indx].queueId, q);
   
              PEGASUS_ASSERT(q !=0);              PEGASUS_ASSERT(q !=0);
  
              try              try
              {              {
                 if(_entries[indx]._type == Monitor::CONNECTION)  #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
                       {
                           AutoMutex automut(Monitor::_cout_mut);
                           cout <<" this is the type " << entries[indx]._type
                                <<" for index " << indx << endl;
                           cout << "IN Monior::run right before entries[indx]._type == Monitor::CONNECTION"
                                << endl;
                       }
   #endif
                       if(entries[indx]._type == Monitor::CONNECTION)
                 {                 {
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
   {
                           AutoMutex automut(Monitor::_cout_mut);
                           cout << "In Monitor::run Monitor::CONNECTION clause" << endl;
   }
   #endif
                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                      "_entries[indx].type for indx = %d is Monitor::CONNECTION", indx);                              "entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
                    static_cast<HTTPConnection *>(q)->_entry_index = indx;                    static_cast<HTTPConnection *>(q)->_entry_index = indx;
  
                    // Do not update the entry just yet. The entry gets updated once                    // Do not update the entry just yet. The entry gets updated once
                    // the request has been read.                    // the request has been read.
                    //_entries[indx]._status = _MonitorEntry::BUSY;                     //entries[indx]._status = _MonitorEntry::BUSY;
  
                    // If allocate_and_awaken failure, retry on next iteration                    // If allocate_and_awaken failure, retry on next iteration
 /* Removed for PEP 183. /* Removed for PEP 183.
Line 579 
Line 879 
                    {                    {
                       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,                       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
                           "Monitor::run: Insufficient resources to process request.");                           "Monitor::run: Insufficient resources to process request.");
                       _entries[indx]._status = _MonitorEntry::IDLE;                        entries[indx]._status = _MonitorEntry::IDLE;
                       _entry_mut.unlock();  
                       return true;                       return true;
                    }                    }
 */ */
Line 589 
Line 888 
                          Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                          Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                          "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",                          "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
                    dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);                    dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
   
   // Added for NamedPipe implementation for windows
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
                      /*In the case of named Pipes, the request has already been read from the pipe
                      therefor this section passed the request data to the HTTPConnection
                      NOTE: not sure if this would be better suited in a separate private method
                      */
                           dst->setNamedPipe(entries[indx].namedPipe); //this step shouldn't be needed
   #endif
   
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
                      {
                          AutoMutex automut(Monitor::_cout_mut);
                          cout << "In Monitor::run after dst->setNamedPipe string read is " <<  entries[indx].namedPipe.raw << endl;
                      }
   #endif
                    try                    try
                    {                    {
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
                               {
                                   AutoMutex automut(Monitor::_cout_mut);
                                   cout << "In Monitor::run about to call 'dst->run(1)' "  << endl;
                               }
   #endif
                        dst->run(1);                        dst->run(1);
                    }                    }
                    catch (...)                    catch (...)
Line 598 
Line 919 
                         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                         "Monitor::_dispatch: exception received");                         "Monitor::_dispatch: exception received");
                    }                    }
   
                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                    "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);                    "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
  
   // Added for NamedPipe implementation for windows
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
                           if (entries[indx].namedPipeConnection)
                           {
                               entries[indx]._type = Monitor::ACCEPTOR;
                           }
   #endif
                    // It is possible the entry status may not be set to busy.                    // It is possible the entry status may not be set to busy.
                    // The following will fail in that case.                    // The following will fail in that case.
                    // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);                     // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
                    // Once the HTTPConnection thread has set the status value to either                    // Once the HTTPConnection thread has set the status value to either
                    // Monitor::DYING or Monitor::IDLE, it has returned control of the connection                    // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
                    // to the Monitor.  It is no longer permissible to access the connection                    // to the Monitor.  It is no longer permissible to access the connection
Line 621 
Line 950 
                    //}                    //}
 // end Added for PEP 183 // end Added for PEP 183
                 }                 }
                 else if( _entries[indx]._type == Monitor::INTERNAL){                      else if( entries[indx]._type == Monitor::INTERNAL)
                       {
                         // set ourself to BUSY,                         // set ourself to BUSY,
                         // read the data                         // read the data
                         // and set ourself back to IDLE                         // and set ourself back to IDLE
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
                         _entries[indx]._status == _MonitorEntry::BUSY;  {
                           AutoMutex automut(Monitor::_cout_mut);
                           cout << endl << " in - entries[indx]._type == Monitor::INTERNAL- "
                                << endl << endl;
   }
   #endif
   // Added for NamedPipe implementation for windows
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
                           if (!entries[indx].namedPipeConnection)
                           {
   #endif
                               entries[indx]._status = _MonitorEntry::BUSY;
                         static char buffer[2];                         static char buffer[2];
                         Socket::disableBlocking(_entries[indx].socket);                              Socket::disableBlocking(entries[indx].socket);
                         Sint32 amt = Socket::read(_entries[indx].socket,&buffer, 2);                              Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2);
                         Socket::enableBlocking(_entries[indx].socket);                              Socket::enableBlocking(entries[indx].socket);
                         _entries[indx]._status == _MonitorEntry::IDLE;                              entries[indx]._status = _MonitorEntry::IDLE;
   // Added for NamedPipe implementation for windows
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
                           }
   #endif
                 }                 }
                 else                 else
                 {                 {
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
                           {
                               AutoMutex automut(Monitor::_cout_mut);
                               cout << "In Monitor::run else clause of CONNECTION if statments" << endl;
                           }
   #endif
                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                      "Non-connection entry, indx = %d, has been received.", indx);                      "Non-connection entry, indx = %d, has been received.", indx);
                    int events = 0;                    int events = 0;
                    events |= SocketMessage::READ;                          Message *msg;
                    Message *msg = new SocketMessage(_entries[indx].socket, events);  
                    _entries[indx]._status = _MonitorEntry::BUSY;  
                    _entry_mut.unlock();  
  
                    q->enqueue(msg);  #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
                    _entries[indx]._status = _MonitorEntry::IDLE;                          {
                    return true;                              AutoMutex automut(Monitor::_cout_mut);
                               cout << " In Monitor::run Just before checking if NamedPipeConnection"
                                    << "for Index "<<indx<< endl;
                 }                 }
   #endif
   // Added for NamedPipe implementation for windows
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
                           if (entries[indx].namedPipeConnection)
                           {
                               if(!entries[indx].namedPipe.isConnectionPipe)
                               {
                                   /*if we enter this clasue it means that the named pipe that we are
                                     looking at has recived a connection but is not the pipe we get
                                     connection requests over. Therefore we need to change the _type
                                                                     to CONNECTION and wait for a CIM Operations request
                                                                    */
                                   entries[indx]._type = Monitor::CONNECTION;
   
                                   /* This is a test  - this shows that the read file needs to be done
                                      before we call wiatForMultipleObjects*/
   
                                   memset(entries[indx].namedPipe.raw,'\0',NAMEDPIPE_MAX_BUFFER_SIZE);
                                   BOOL rc = ::ReadFile(
                                       entries[indx].namedPipe.getPipe(),
                                       &entries[indx].namedPipe.raw,
                                       NAMEDPIPE_MAX_BUFFER_SIZE,
                                       &entries[indx].namedPipe.bytesRead,
                                       entries[indx].namedPipe.getOverlap());
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
                                   {
                                       AutoMutex automut(Monitor::_cout_mut);
                                       cout << "Monitor::run just called read on index " << indx << endl;
              }              }
              catch(...)  #endif
                                   if(!rc)
              {              {
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
   {
              AutoMutex automut(Monitor::_cout_mut);
              cout << "ReadFile failed for : "  << GetLastError() << "."<< endl;
              }              }
              handled_events = true;  #endif
           }           }
                                   continue;
        }        }
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
                  {
                      AutoMutex automut(Monitor::_cout_mut);
                       cout << " In Monitor::run about to create a Pipe message" << endl;
     }     }
     _entry_mut.unlock();  #endif
     return(handled_events);                              events |= NamedPipeMessage::READ;
                               msg = new NamedPipeMessage(entries[indx].namedPipe, events);
 } }
                           else
 void Monitor::stopListeningForConnections(Boolean wait)  
 { {
     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");  #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
     // set boolean then tickle the server to recognize _stopConnections  
     _stopConnections = 1;  
     tickle();  
   
     if (wait)  
     {     {
       // Wait for the monitor to notice _stopConnections.  Otherwise the                 AutoMutex automut(Monitor::_cout_mut);
       // caller of this function may unbind the ports while the monitor                 cout << " In Monitor::run ..its a socket message" << endl;
       // is still accepting connections on them.                 }
       try  #endif
                               events |= SocketMessage::READ;
                               msg = new SocketMessage(entries[indx].socket, events);
   #endif
   // Added for NamedPipe implementation for windows
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
                           }
   #endif
                           entries[indx]._status = _MonitorEntry::BUSY;
                           autoEntryMutex.unlock();
                           q->enqueue(msg);
                           autoEntryMutex.lock();
              // After enqueue a message and the autoEntryMutex has been released and locked again,
              // the array of entries can be changed. The ArrayIterator has be reset with the original _entries
                           entries.reset(_entries);
                           entries[indx]._status = _MonitorEntry::IDLE;
   
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
                      {
                          AutoMutex automut(Monitor::_cout_mut);
                          PEGASUS_STD(cout) << "Exiting:  Monitor::run(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
                      }
   #endif
                           return true;
                       }
                   }
                   catch(...)
         {         {
           _stopConnectionsSem.time_wait(10000);  
         }         }
       catch (TimeOut &)                  handled_events = true;
               }
           }
       }
   
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
       {
           AutoMutex automut(Monitor::_cout_mut);
           PEGASUS_STD(cout) << "Exiting:  Monitor::run(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
       }
   #endif
       return(handled_events);
   }
   
   void Monitor::stopListeningForConnections(Boolean wait)
   {
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
         {         {
           // The monitor is probably busy processng a very long request, and is          AutoMutex automut(Monitor::_cout_mut);
           // not accepting connections.  Let the caller unbind the ports.          PEGASUS_STD(cout) << "Entering: Monitor::stopListeningForConnections(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
         }         }
   #endif
       PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
       // set boolean then tickle the server to recognize _stopConnections
       _stopConnections = 1;
       tickle();
   
       if (wait)
       {
         // Wait for the monitor to notice _stopConnections.  Otherwise the
         // caller of this function may unbind the ports while the monitor
         // is still accepting connections on them.
         _stopConnectionsSem.wait();
     }     }
  
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
       {
           AutoMutex automut(Monitor::_cout_mut);
           PEGASUS_STD(cout) << "Exiting:  Monitor::stopListeningForConnections(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
       }
   #endif
 } }
  
  
 int  Monitor::solicitSocketMessages( int  Monitor::solicitSocketMessages(
     Sint32 socket,      PEGASUS_SOCKET socket,
     Uint32 events,     Uint32 events,
     Uint32 queueId,     Uint32 queueId,
     int type)     int type)
 { {
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
       {
           AutoMutex automut(Monitor::_cout_mut);
           PEGASUS_STD(cout) << "Entering: Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
       }
   #endif
    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
    AutoMutex autoMut(_entry_mut);    AutoMutex autoMut(_entry_mut);
    // Check to see if we need to dynamically grow the _entries array    // Check to see if we need to dynamically grow the _entries array
Line 699 
Line 1147 
    // current connections requested    // current connections requested
    _solicitSocketCount++;  // bump the count    _solicitSocketCount++;  // bump the count
    int size = (int)_entries.size();    int size = (int)_entries.size();
    if(_solicitSocketCount >= (size-1)){     if((int)_solicitSocketCount >= (size-1)){
         for(int i = 0; i < (_solicitSocketCount - (size-1)); i++){          for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
                 _MonitorEntry entry(0, 0, 0);                 _MonitorEntry entry(0, 0, 0);
                 _entries.append(entry);                 _entries.append(entry);
         }         }
Line 711 
Line 1159 
    {    {
       try       try
       {       {
          if(_entries[index]._status.value() == _MonitorEntry::EMPTY)           if(_entries[index]._status.get() == _MonitorEntry::EMPTY)
          {          {
             _entries[index].socket = socket;             _entries[index].socket = socket;
             _entries[index].queueId  = queueId;             _entries[index].queueId  = queueId;
             _entries[index]._type = type;             _entries[index]._type = type;
             _entries[index]._status = _MonitorEntry::IDLE;             _entries[index]._status = _MonitorEntry::IDLE;
  
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
               {
                   AutoMutex automut(Monitor::_cout_mut);
                   PEGASUS_STD(cout) << "Exiting:  Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
               }
   #endif
             return index;             return index;
          }          }
       }       }
Line 727 
Line 1181 
    }    }
    _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful    _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful
    PEG_METHOD_EXIT();    PEG_METHOD_EXIT();
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
      {
          AutoMutex automut(Monitor::_cout_mut);
          PEGASUS_STD(cout) << "Exiting:  Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
      }
   #endif
    return -1;    return -1;
  
 } }
  
 void Monitor::unsolicitSocketMessages(Sint32 socket)  void Monitor::unsolicitSocketMessages(PEGASUS_SOCKET socket)
 { {
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
       {
           AutoMutex automut(Monitor::_cout_mut);
           PEGASUS_STD(cout) << "Entering: Monitor::unsolicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
       }
   #endif
  
     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
     AutoMutex autoMut(_entry_mut);     AutoMutex autoMut(_entry_mut);
Line 741 
Line 1207 
         Start at index = 1 because _entries[0] is the tickle entry which never needs         Start at index = 1 because _entries[0] is the tickle entry which never needs
         to be EMPTY;         to be EMPTY;
     */     */
     int index;      unsigned int index;
     for(index = 1; index < _entries.size(); index++)     for(index = 1; index < _entries.size(); index++)
     {     {
        if(_entries[index].socket == socket)        if(_entries[index].socket == socket)
        {        {
           _entries[index]._status = _MonitorEntry::EMPTY;           _entries[index]._status = _MonitorEntry::EMPTY;
           _entries[index].socket = -1;            _entries[index].socket = PEGASUS_INVALID_SOCKET;
           _solicitSocketCount--;           _solicitSocketCount--;
           break;           break;
        }        }
Line 760 
Line 1226 
         This prevents the positions, of the NON EMPTY entries, from being changed.         This prevents the positions, of the NON EMPTY entries, from being changed.
     */     */
     index = _entries.size() - 1;     index = _entries.size() - 1;
     while(_entries[index]._status == _MonitorEntry::EMPTY){      while(_entries[index]._status.get() == _MonitorEntry::EMPTY){
         if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)         if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
                 _entries.remove(index);                 _entries.remove(index);
         index--;         index--;
     }     }
   
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
       {
           AutoMutex automut(Monitor::_cout_mut);
           PEGASUS_STD(cout) << "Exiting:  Monitor::unsolicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
       }
   #endif
 } }
  
 // Note: this is no longer called with PEP 183. // Note: this is no longer called with PEP 183.
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm) PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
 { {
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
       {
           AutoMutex automut(Monitor::_cout_mut);
           PEGASUS_STD(cout) << "Entering: Monitor::_dispatch(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
       }
   #endif
    HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);    HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
         "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",         "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
Line 788 
Line 1265 
    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
           "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);           "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
  
    PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);     PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
  
    // Once the HTTPConnection thread has set the status value to either    // Once the HTTPConnection thread has set the status value to either
    // Monitor::DYING or Monitor::IDLE, it has returned control of the connection    // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
Line 805 
Line 1282 
    return 0;    return 0;
 } }
  
   // Added for NamedPipe implementation for windows
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
 ////************************* monitor 2 *****************************////  //This method is anlogus to solicitSocketMessages. It does the same thing for named Pipes
 ////************************* monitor 2 *****************************////  int  Monitor::solicitPipeMessages(
 ////************************* monitor 2 *****************************////      NamedPipe namedPipe,
 ////************************* monitor 2 *****************************////      Uint32 events,  //not sure what has to change for this enum
 ////************************* monitor 2 *****************************////      Uint32 queueId,
 ////************************* monitor 2 *****************************////      int type)
 ////************************* monitor 2 *****************************////  
   
   
   
   
   
 m2e_rep::m2e_rep(void)  
   :Base(), state(IDLE)  
   
 {  
 }  
   
 m2e_rep::m2e_rep(monitor_2_entry_type _type,  
                  pegasus_socket _sock,  
                  void* _accept,  
                  void* _dispatch)  
   : Base(), type(_type), state(IDLE), psock(_sock),  
     accept_parm(_accept), dispatch_parm(_dispatch)  
 {  
   
 }  
   
 m2e_rep::~m2e_rep(void)  
 {  
 }  
   
 m2e_rep::m2e_rep(const m2e_rep& r)  
   : Base()  
 {  
   if(this != &r){  
     type = r.type;  
     psock = r.psock;  
     accept_parm = r.accept_parm;  
     dispatch_parm = r.dispatch_parm;  
     state = IDLE;  
   
   }  
 }  
   
   
 m2e_rep& m2e_rep::operator =(const m2e_rep& r)  
 {  
   if(this != &r) {  
     type = r.type;  
     psock = r.psock;  
     accept_parm = r.accept_parm;  
     dispatch_parm = r.dispatch_parm;  
     state = IDLE;  
   }  
   return *this;  
 }  
   
 Boolean m2e_rep::operator ==(const m2e_rep& r)  
 {  
   if(this == &r)  
     return true;  
   return false;  
 }  
   
 Boolean m2e_rep::operator ==(void* r)  
 {  
   if((void*)this == r)  
     return true;  
   return false;  
 }  
   
 m2e_rep::operator pegasus_socket() const  
 {  
   return psock;  
 }  
   
   
 monitor_2_entry::monitor_2_entry(void)  
 {  
   _rep = new m2e_rep();  
 }  
   
 monitor_2_entry::monitor_2_entry(pegasus_socket& _psock,  
                                  monitor_2_entry_type _type,  
                                  void* _accept_parm, void* _dispatch_parm)  
 {  
   _rep = new m2e_rep(_type, _psock, _accept_parm, _dispatch_parm);  
 }  
   
 monitor_2_entry::monitor_2_entry(const monitor_2_entry& e)  
 {  
   if(this != &e){  
     Inc(this->_rep = e._rep);  
   }  
 }  
   
 monitor_2_entry::~monitor_2_entry(void)  
 {  
   
   Dec(_rep);  
 }  
   
 monitor_2_entry& monitor_2_entry::operator=(const monitor_2_entry& e)  
 {  
   if(this != &e){  
     Dec(_rep);  
     Inc(this->_rep = e._rep);  
   }  
   return *this;  
 }  
   
 Boolean monitor_2_entry::operator ==(const monitor_2_entry& me) const  
 {  
   if(this == &me)  
     return true;  
   return false;  
 }  
   
 Boolean monitor_2_entry::operator ==(void* k) const  
 {  
   if((void *)this == k)  
     return true;  
   return false;  
 }  
   
   
 monitor_2_entry_type monitor_2_entry::get_type(void) const  
 {  
   return _rep->type;  
 }  
   
 void monitor_2_entry::set_type(monitor_2_entry_type t)  
 {  
   _rep->type = t;  
 }  
   
   
 monitor_2_entry_state  monitor_2_entry::get_state(void) const  
 {  
   return (monitor_2_entry_state) _rep->state.value();  
 }  
   
 void monitor_2_entry::set_state(monitor_2_entry_state t)  
 {  
   _rep->state = t;  
 }  
   
 void* monitor_2_entry::get_accept(void) const  
 {  
   return _rep->accept_parm;  
 }  
   
 void monitor_2_entry::set_accept(void* a)  
 {  
   _rep->accept_parm = a;  
 }  
   
   
 void* monitor_2_entry::get_dispatch(void) const  
 {  
   return _rep->dispatch_parm;  
 }  
   
 void monitor_2_entry::set_dispatch(void* a)  
 {  
   _rep->dispatch_parm = a;  
 }  
   
 pegasus_socket monitor_2_entry::get_sock(void) const  
 { {
   return _rep->psock;     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitPipeMessages");
 }  
  
      AutoMutex autoMut(_entry_mut);
 void monitor_2_entry::set_sock(pegasus_socket& s)     // Check to see if we need to dynamically grow the _entries array
      // We always want the _entries array to 2 bigger than the
      // current connections requested
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
 { {
   _rep->psock = s;     AutoMutex automut(Monitor::_cout_mut);
      PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages at the begining" << PEGASUS_STD(endl);
 } }
   
 //static monitor_2* _m2_instance;  
   
 AsyncDQueue<HTTPConnection2> monitor_2::_connections(true, 0);  
   
 monitor_2::monitor_2(void)  
   : _session_dispatch(0), _accept_dispatch(0), _listeners(true, 0),  
     _ready(true, 0), _die(0), _requestCount(0)  
 {  
   try {  
   
     bsd_socket_factory _factory;  
   
     // set up the listener/acceptor  
     pegasus_socket temp = pegasus_socket(&_factory);  
   
     temp.socket(PF_INET, SOCK_STREAM, 0);  
     // initialize the address  
     memset(&_tickle_addr, 0, sizeof(_tickle_addr));  
 #ifdef PEGASUS_OS_ZOS  
     _tickle_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");  
 #else  
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM  
 #pragma convert(37)  
 #endif  
     _tickle_addr.sin_addr.s_addr = inet_addr("127.0.0.1");  
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM  
 #pragma convert(0)  
 #endif  
 #endif  
     _tickle_addr.sin_family = PF_INET;  
     _tickle_addr.sin_port = 0;  
   
     PEGASUS_SOCKLEN_SIZE _addr_size = sizeof(_tickle_addr);  
   
     temp.bind((struct sockaddr *)&_tickle_addr, sizeof(_tickle_addr));  
     temp.listen(3);  
     temp.getsockname((struct sockaddr*)&_tickle_addr, &_addr_size);  
   
     // set up the connector  
   
     pegasus_socket tickler = pegasus_socket(&_factory);  
     tickler.socket(PF_INET, SOCK_STREAM, 0);  
     struct sockaddr_in _addr;  
     memset(&_addr, 0, sizeof(_addr));  
 #ifdef PEGASUS_OS_ZOS  
     _addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");  
 #else  
     _addr.sin_addr.s_addr = inet_addr("127.0.0.1");  
 #endif #endif
     _addr.sin_family = PF_INET;  
     _addr.sin_port = 0;  
     tickler.bind((struct sockaddr*)&_addr, sizeof(_addr));  
     tickler.connect((struct sockaddr*)&_tickle_addr, sizeof(_tickle_addr));  
   
     _tickler.set_sock(tickler);  
     _tickler.set_type(INTERNAL);  
     _tickler.set_state(BUSY);  
   
     struct sockaddr_in peer;  
     memset(&peer, 0, sizeof(peer));  
     PEGASUS_SOCKLEN_SIZE peer_size = sizeof(peer);  
   
     pegasus_socket accepted = temp.accept((struct sockaddr*)&peer, &peer_size);  
   
     monitor_2_entry* _tickle = new monitor_2_entry(accepted, INTERNAL, 0, 0);  
   
 // No need to set _tickle's state as BUSY, since monitor_2::run() now  
 // does a select only on sockets which are in IDLE (default) state.  
 //  _tickle->set_state(BUSY);  
   
     _listeners.insert_first(_tickle);  
  
      _solicitSocketCount++;  // bump the count
      int size = (int)_entries.size();
      if((int)_solicitSocketCount >= (size-1)){
           for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
                   _MonitorEntry entry(0, 0, 0);
                   _entries.append(entry);
   }   }
   catch(...){  }  
 } }
  
 monitor_2::~monitor_2(void)     int index;
      for(index = 1; index < (int)_entries.size(); index++)
 { {
   
    stop();  
   
   try {  
     monitor_2_entry* temp = _listeners.remove_first();  
     while(temp){  
       delete temp;  
       temp = _listeners.remove_first();  
     }  
   }  
   
   catch(...){  }  
   
   
   try   try
   {   {
      HTTPConnection2* temp = _connections.remove_first();           if(_entries[index]._status.get() == _MonitorEntry::EMPTY)
      while(temp)  
      {      {
         delete temp;              _entries[index].socket = NULL;
         temp = _connections.remove_first();              _entries[index].namedPipe = namedPipe;
      }              _entries[index].namedPipeConnection = true;
   }              _entries[index].queueId  = queueId;
   catch(...)              _entries[index]._type = type;
   {              _entries[index]._status = _MonitorEntry::IDLE;
   }  #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
   
   
 }  
   
   
 void monitor_2::run(void)  
 {  
   monitor_2_entry* temp;  
   int _nonIdle=0, _idleCount=0, events;  
   
   while(_die.value() == 0) {  
     _nonIdle=_idleCount=0;  
   
      struct timeval tv_idle = { 60, 0 };  
   
     // place all sockets in the select set  
     FD_ZERO(&rd_fd_set);  
     try {  
       _listeners.lock(pegasus_thread_self());  
       temp = _listeners.next(0);  
       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
        "monitor_2::run:Creating New FD list for SELECT.");  
       while(temp != 0 ){  
         if(temp->get_state() == CLOSED ) {  
           monitor_2_entry* closed = temp;  
       temp = _listeners.next(closed);  
           _listeners.remove_no_lock(closed);  
   
       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
        "monitor_2::run:Deleteing CLOSED socket fd=%d.",(Sint32)closed->get_sock());  
   
           HTTPConnection2 *cn = monitor_2::remove_connection((Sint32)(closed->get_sock()));  
           delete cn;  
           delete closed;  
         }  
         if(temp == 0)  
            break;  
   
   
         //Count the number if IDLE sockets  
         if(temp->get_state() != IDLE ) _nonIdle++;  
          else _idleCount++;  
   
         Sint32 fd = (Sint32) temp->get_sock();  
   
         //Select should be called ONLY on the FDs which are in IDLE state  
         if((fd >= 0) && (temp->get_state() == IDLE))  
         {  
           Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
            "monitor_2::run:Adding FD %d to the list for SELECT.",fd);  
           FD_SET(fd , &rd_fd_set);  
         }  
            temp = _listeners.next(temp);  
       }  
       _listeners.unlock();  
     }  
     catch(...){  
       return;  
     }  
   
     // important -  the dispatch routine has pointers to all the  
     // entries that are readable. These entries can be changed but  
     // the pointer must not be tampered with.  
     if(_connections.count() )  
        events = select(FD_SETSIZE, &rd_fd_set, NULL, NULL, NULL);  
     else  
        events = select(FD_SETSIZE, &rd_fd_set, NULL, NULL, &tv_idle);  
   
     if(_die.value())  
     {     {
        break;              AutoMutex automut(Monitor::_cout_mut);
               PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages after seting up  _entries[index] index = " << index << PEGASUS_STD(endl);
     }     }
   
 #ifdef PEGASUS_OS_TYPE_WINDOWS  
     if(events == SOCKET_ERROR)  
 #else  
     if(events == -1)  
 #endif #endif
     {  
        Tracer::trace(TRC_HTTP, Tracer::LEVEL2,  
           "monitor_2:run:INVALID FD. errorno = %d on select.", errno);  
        // The EBADF error indicates that one or more or the file  
        // descriptions was not valid. This could indicate that  
        // the _entries structure has been corrupted or that  
        // we have a synchronization error.  
   
      // Keeping the line below commented for time being.  
      //  PEGASUS_ASSERT(errno != EBADF);  
     }  
     else if (events)  
     {  
        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
           "monitor_2::run select event received events = %d, monitoring %d idle entries", events, _idleCount);  
   
   
     try {  
       _listeners.lock(pegasus_thread_self());  
       temp = _listeners.next(0);  
       while(temp != 0 ){  
           Sint32 fd = (Sint32) temp->get_sock();  
           if(fd >= 0 && FD_ISSET(fd, &rd_fd_set)) {  
           if(temp->get_type() != CLIENTSESSION) temp->set_state(BUSY);  
           FD_CLR(fd,  &rd_fd_set);  
           monitor_2_entry* ready = new monitor_2_entry(*temp);  
           try  
           {  
              _ready.insert_first(ready);  
           }  
           catch(...)  
           {  
           }  
   
           _requestCount++;  
         }  
         temp = _listeners.next(temp);  
       }  
       _listeners.unlock();  
     }  
     catch(...){  
       return;  
     }  
     // now handle the sockets that are ready to read  
     if(_ready.count())  
        _dispatch();  
     else  
     {  
        if(_connections.count() == 0 )  
           _idle_dispatch(_idle_parm);  
     }  
    }  // if events  
   } // while alive  
   _die=0;  
   
 }  
   
 int  monitor_2::solicitSocketMessages(  
     Sint32 socket,  
     Uint32 events,  
     Uint32 queueId,  
     int type)  
 {  
   
    PEG_METHOD_ENTER(TRC_HTTP, "monitor_2::solicitSocketMessages");  
   
    AutoMutex autoMut(_entry_mut);  
   
    for(int index = 0; index < (int)_entries.size(); index++)  
    {  
       try  
       {  
          if(_entries[index]._status.value() == monitor_2_entry::EMPTY)  
          {  
             _entries[index].socket = socket;  
             //_entries[index].queueId  = queueId;  
             //_entries[index]._type = type;  
             _entries[index]._status = IDLE;  
  
             return index;             return index;
          }          }
Line 1257 
Line 1341 
       }       }
  
    }    }
    PEG_METHOD_EXIT();     _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful
    return -1;     PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages nothing happed - it didn't work" << PEGASUS_STD(endl);
 }  
   
   
 void monitor_2::unsolicitSocketMessages(Sint32 socket)  
 {  
   
     PEG_METHOD_ENTER(TRC_HTTP, "monitor_2::unsolicitSocketMessages");  
     AutoMutex autoMut(_entry2_mut);  
  
     for(int index = 0; index < (int)_entries2.size(); index++)  
     {  
        if(_entries2[index].socket == socket)  
        {  
           _entries2[index]._status = monitor_2_entry::EMPTY;  
           _entries2[index].socket = -1;  
           break;  
        }  
     }  
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
 }     return -1;
  
 void* monitor_2::set_session_dispatch(void (*dp)(monitor_2_entry*))  
 {  
   void* old = (void *)_session_dispatch;  
   _session_dispatch = dp;  
   return old;  
 } }
  
 void* monitor_2::set_accept_dispatch(void (*dp)(monitor_2_entry*))  void Monitor::unsolicitPipeMessages(NamedPipe namedPipe)
 { {
   void* old = (void*)_accept_dispatch;  #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
   _accept_dispatch = dp;  
   return old;  
 }  
   
 void* monitor_2::set_idle_dispatch(void (*dp)(void*))  
 { {
    void* old = (void*)_idle_dispatch;          AutoMutex automut(Monitor::_cout_mut);
    _idle_dispatch = dp;          PEGASUS_STD(cout) << "Entering: Monitor::unsolicitPipeMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
    return old;  
 } }
   
 void* monitor_2::set_idle_parm(void* parm)  
 {  
    void* old = _idle_parm;  
    _idle_parm = parm;  
    return old;  
 }  
   
   
   
 //-----------------------------------------------------------------  
 // Note on deleting the monitor_2_entry nodes:  
 //  Each case: in the switch statement needs to handle the deletion  
 //  of the monitor_2_entry * node differently. A SESSION dispatch  
 //  routine MUST DELETE the entry during its dispatch handling.  
 //  All other dispatch routines MUST NOT delete the entry during the  
 //  dispatch handling, but must allow monitor_2::_dispatch to delete  
 //   the entry.  
 //  
 //  The reason is pretty obscure and it is debatable whether or not  
 //  to even bother, but during cimserver shutdown the single monitor_2_entry*  
 //  will leak unless the _session_dispatch routine takes care of deleting it.  
 //  
 //  The reason is that a shutdown messages completely stops everything and  
 //  the _session_dispatch routine never returns. So monitor_2::_dispatch is  
 //  never able to do its own cleanup.  
 //  
 // << Mon Oct 13 09:33:33 2003 mdd >>  
 //-----------------------------------------------------------------  
   
 void monitor_2::_dispatch(void)  
 {  
    monitor_2_entry* entry;  
   
    try  
    {  
   
          entry = _ready.remove_first();  
    }  
    catch(...)  
    {  
    }  
   
   while(entry != 0 ) {  
     switch(entry->get_type()) {  
     case INTERNAL:  
       static char buffer[2];  
       entry->get_sock().disableBlocking();  
       entry->get_sock().read(&buffer, 2);  
       entry->get_sock().enableBlocking();  
       entry->set_state(IDLE);   // Set state of the socket to IDLE so that  
                                 // monitor_2::run can add to the list of FDs  
                                 // on which select would be called.  
   
   
   
       delete entry;  
   
       break;  
     case LISTEN:  
       {  
         static struct sockaddr peer;  
         static PEGASUS_SOCKLEN_SIZE peer_size = sizeof(peer);  
         entry->get_sock().disableBlocking();  
         pegasus_socket connected = entry->get_sock().accept(&peer, &peer_size);  
         entry->set_state(IDLE);  // Set state of the LISTEN socket to IDLE  
 #ifdef PEGASUS_OS_TYPE_WINDOWS  
     if((Sint32)connected  == SOCKET_ERROR)  
 #else  
         if((Sint32)connected == -1 )  
 #endif #endif
         {  
            delete entry;  
            break;  
         }  
  
         entry->get_sock().enableBlocking();      PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitPipeMessages");
         monitor_2_entry *temp = add_entry(connected, SESSION, entry->get_accept(), entry->get_dispatch());      AutoMutex autoMut(_entry_mut);
         if(temp && _accept_dispatch != 0)  
            _accept_dispatch(temp);  
         delete entry;  
  
       }      /*
       break;          Start at index = 1 because _entries[0] is the tickle entry which never needs
     case SESSION:          to be EMPTY;
     case CLIENTSESSION:      */
        if(_session_dispatch != 0 )      unsigned int index;
       for(index = 1; index < _entries.size(); index++)
        {        {
           // NOTE: _session_dispatch will delete entry - do not do it here         if(_entries[index].namedPipe.getPipe() == namedPipe.getPipe())
              unsigned client=0;  
          if(entry->get_type() == CLIENTSESSION) client = 1;  
          Sint32 sock=(Sint32)(entry->get_sock());  
   
              _session_dispatch(entry);  
   
          if(client)  
          {          {
            HTTPConnection2 *cn = monitor_2::remove_connection(sock);            _entries[index]._status = _MonitorEntry::EMPTY;
                if(cn) delete cn;            //_entries[index].namedPipe = PEGASUS_INVALID_SOCKET;
            // stop();            _solicitSocketCount--;
            _die=1;  
          }  
        }  
   
       else {  
         static char buffer[4096];  
         int bytes = entry->get_sock().read(&buffer, 4096);  
         delete entry;  
       }  
   
       break;  
     case UNTYPED:  
     default:  
            delete entry;  
       break;  
     }  
     _requestCount--;  
   
     if(_ready.count() == 0 )  
        break;        break;
   
     try  
     {  
        entry = _ready.remove_first();  
     }  
     catch(...)  
     {  
     }  
   
   }  
 }  
   
 void monitor_2::stop(void)  
 {  
   _die = 1;  
   tickle();  
   // shut down the listener list, free the list nodes  
   _tickler.get_sock().close();  
   _listeners.shutdown_queue();  
 }  
   
 void monitor_2::tickle(void)  
 {  
   static char _buffer[] =  
     {  
       '0','0'  
     };  
   
   _tickler.get_sock().disableBlocking();  
   
   _tickler.get_sock().write(&_buffer, 2);  
   _tickler.get_sock().enableBlocking();  
   
 }  
   
   
 monitor_2_entry*  monitor_2::add_entry(pegasus_socket& ps,  
                                        monitor_2_entry_type type,  
                                        void* accept_parm,  
                                        void* dispatch_parm)  
 {  
   Sint32 fd1,fd2;  
   
   fd2=(Sint32) ps;  
   
   monitor_2_entry* m2e = new monitor_2_entry(ps, type, accept_parm, dispatch_parm);  
   
 // The purpose of the following piece of code is to avoid duplicate entries in  
 // the _listeners list. Would it be too much of an overhead ?  
 try {  
   
      monitor_2_entry* temp;  
   
       _listeners.lock(pegasus_thread_self());  
       temp = _listeners.next(0);  
       while(temp != 0 )  
       {  
         fd1=(Sint32) temp->get_sock();  
   
         if(fd1 == fd2)  
         {  
   
            Tracer::trace(TRC_HTTP, Tracer::LEVEL3,  
           "monitor_2::add_entry:Request for duplicate entry in _listeners for %d FD.", fd1);  
             if(temp->get_state() == CLOSED)  
             {  
               temp->set_state(IDLE);  
               Tracer::trace(TRC_HTTP, Tracer::LEVEL3,  
               "monitor_2::add_entry:CLOSED state changed to IDLE for %d.", fd1);  
              }  
              _listeners.unlock();  
             delete m2e;  
             return 0;  
         }  
        temp = _listeners.next(temp);  
       }  
    }  
    catch(...)  
    {  
       delete m2e;  
       return 0;  
    }  
   
   
   _listeners.unlock();  
   
   
   try{  
     _listeners.insert_first(m2e);  
   }  
   catch(...){  
     delete m2e;  
     return 0;  
   }  
       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
       "monitor_2::add_entry:SUCCESSFULLY added to _listeners list. FD = %d.", fd2);  
   tickle();  
   return m2e;  
 }  
   
 Boolean monitor_2::remove_entry(Sint32 s)  
 {  
   monitor_2_entry* temp;  
   try {  
     _listeners.try_lock(pegasus_thread_self());  
     temp = _listeners.next(0);  
     while(temp != 0){  
       if(s == (Sint32)temp->_rep->psock ){  
         temp = _listeners.remove_no_lock(temp);  
         delete temp;  
         _listeners.unlock();  
         return true;  
       }       }
       temp = _listeners.next(temp);  
     }  
     _listeners.unlock();  
   }  
   catch(...){  
   }  
   return false;  
 } }
  
 Uint32 monitor_2::getOutstandingRequestCount(void)      /*
 {          Dynamic Contraction:
   return _requestCount.value();          To remove excess entries we will start from the end of the _entries array
           and remove all entries with EMPTY status until we find the first NON EMPTY.
 }          This prevents the positions, of the NON EMPTY entries, from being changed.
       */
       index = _entries.size() - 1;
 HTTPConnection2* monitor_2::remove_connection(Sint32 sock)      while(_entries[index]._status.get() == _MonitorEntry::EMPTY){
 {          if((_entries[index].namedPipe.getPipe() == namedPipe.getPipe()) ||
               (_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES))
    HTTPConnection2* temp;  
    try  
    {  
       monitor_2::_connections.lock(pegasus_thread_self());  
       temp = monitor_2::_connections.next(0);  
       while(temp != 0 )  
       {  
          if(sock == temp->getSocket())  
          {  
             temp = monitor_2::_connections.remove_no_lock(temp);  
             monitor_2::_connections.unlock();  
             return temp;  
          }  
          temp = monitor_2::_connections.next(temp);  
       }  
       monitor_2::_connections.unlock();  
    }  
    catch(...)  
    {    {
               _entries.remove(index);
    }    }
    return 0;          index--;
 }  
   
 Boolean monitor_2::insert_connection(HTTPConnection2* connection)  
 {  
    try  
    {  
       monitor_2::_connections.insert_first(connection);  
    }    }
    catch(...)      PEG_METHOD_EXIT();
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
    {    {
       return false;          AutoMutex automut(Monitor::_cout_mut);
           PEGASUS_STD(cout) << "Exiting:  Monitor::unsolicitPipeMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
    }    }
    return true;  #endif
 } }
  
   #endif
  
 PEGASUS_NAMESPACE_END PEGASUS_NAMESPACE_END


Legend:
Removed from v.1.82  
changed lines
  Added in v.1.103.10.24

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2