(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/Monitor.cpp between version 1.103.10.16 and 1.103.10.27

version 1.103.10.16, 2006/07/05 14:28:20 version 1.103.10.27, 2006/10/18 04:24:42
Line 29 
Line 29 
 // //
 //============================================================================== //==============================================================================
 // //
 // Author: Mike Brasher (mbrasher@bmc.com)  
 //  
 // Modified By: Mike Day (monitor_2) mdday@us.ibm.com  
 //              Amit K Arora (Bug#1153) amita@in.ibm.com  
 //              Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090  
 //              Sushma Fernandes (sushma@hp.com) for Bug#2057  
 //              Josephine Eskaline Joyce (jojustin@in.ibm.com) for PEP#101  
 //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)  
 // //
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
  
 #include <Pegasus/Common/Config.h> #include <Pegasus/Common/Config.h>
  
 #include <cstring> #include <cstring>
 #include "Monitor.h"  #include <Pegasus/Common/Monitor.h>
 #include "MessageQueue.h"  #include <Pegasus/Common/MessageQueue.h>
 #include "Socket.h"  #include <Pegasus/Common/Socket.h>
 #include <Pegasus/Common/Tracer.h> #include <Pegasus/Common/Tracer.h>
 #include <Pegasus/Common/HTTPConnection.h> #include <Pegasus/Common/HTTPConnection.h>
 #include <Pegasus/Common/MessageQueueService.h> #include <Pegasus/Common/MessageQueueService.h>
 #include <Pegasus/Common/Exception.h> #include <Pegasus/Common/Exception.h>
 #include "ArrayIterator.h"  #include <Pegasus/Common/ArrayIterator.h>
  
   //const static DWORD MAX_BUFFER_SIZE = 4096;  // 4 kilobytes
  
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
   // Maximum iterations of Pipe processing in Monitor::run
   const Uint32 maxIterations = 3;
  
 const static DWORD MAX_BUFFER_SIZE = 4096;  // 4 kilobytes  #endif
  
 #ifdef PEGASUS_OS_TYPE_WINDOWS #ifdef PEGASUS_OS_TYPE_WINDOWS
 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
Line 82 
Line 78 
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
 static AtomicInt _connections(0); static AtomicInt _connections(0);
   
 Mutex Monitor::_cout_mut; Mutex Monitor::_cout_mut;
  
 #ifdef PEGASUS_OS_TYPE_WINDOWS  
   // Added for NamedPipe implementation for windows
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
  #define PIPE_INCREMENT 1  #define PIPE_INCREMENT 1
 #endif #endif
  
Line 95 
Line 94 
 //////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
  
 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
   
 Monitor::Monitor() Monitor::Monitor()
    : _stopConnections(0),    : _stopConnections(0),
      _stopConnectionsSem(0),      _stopConnectionsSem(0),
Line 103 
Line 103 
      _tickle_server_socket(-1),      _tickle_server_socket(-1),
      _tickle_peer_socket(-1)      _tickle_peer_socket(-1)
 { {
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
     {     {
         AutoMutex automut(Monitor::_cout_mut);         AutoMutex automut(Monitor::_cout_mut);
         PEGASUS_STD(cout) << "Entering: Monitor::Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);         PEGASUS_STD(cout) << "Entering: Monitor::Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
     }     }
   #endif
     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
     Socket::initializeInterface();     Socket::initializeInterface();
     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
Line 122 
Line 124 
        _MonitorEntry entry(0, 0, 0);        _MonitorEntry entry(0, 0, 0);
        _entries.append(entry);        _entries.append(entry);
     }     }
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
     {     {
         AutoMutex automut(Monitor::_cout_mut);         AutoMutex automut(Monitor::_cout_mut);
         PEGASUS_STD(cout) << "Exiting:  Monitor::Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);         PEGASUS_STD(cout) << "Exiting:  Monitor::Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
     }     }
   #endif
 } }
  
 Monitor::~Monitor() Monitor::~Monitor()
 { {
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
     {     {
         AutoMutex automut(Monitor::_cout_mut);         AutoMutex automut(Monitor::_cout_mut);
         PEGASUS_STD(cout) << "Entering: Monitor::~Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);         PEGASUS_STD(cout) << "Entering: Monitor::~Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
     }     }
   #endif
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
  
     try{     try{
Line 159 
Line 165 
     Socket::uninitializeInterface();     Socket::uninitializeInterface();
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                   "returning from monitor destructor");                   "returning from monitor destructor");
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
     {     {
         AutoMutex automut(Monitor::_cout_mut);         AutoMutex automut(Monitor::_cout_mut);
         PEGASUS_STD(cout) << "Exiting:  Monitor::~Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);         PEGASUS_STD(cout) << "Exiting:  Monitor::~Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
     }     }
   #endif
 } }
  
 void Monitor::initializeTickler(){ void Monitor::initializeTickler(){
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
     {     {
         AutoMutex automut(Monitor::_cout_mut);         AutoMutex automut(Monitor::_cout_mut);
         PEGASUS_STD(cout) << "Entering: Monitor::initializeTickler(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);         PEGASUS_STD(cout) << "Entering: Monitor::initializeTickler(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
     }     }
   #endif
     /*     /*
        NOTE: On any errors trying to        NOTE: On any errors trying to
              setup out tickle connection,              setup out tickle connection,
Line 350 
Line 360 
     // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only     // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
     // checks entries with IDLE state for events     // checks entries with IDLE state for events
     _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);     _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
           Tracer::trace(TRC_HTTP,Tracer::LEVEL2,"!!!!!!!! TICKLE SOCKET-ID = %u",_tickle_peer_socket);
     entry._status = _MonitorEntry::IDLE;     entry._status = _MonitorEntry::IDLE;
     _entries.append(entry);     _entries.append(entry);
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
     {     {
         AutoMutex automut(Monitor::_cout_mut);         AutoMutex automut(Monitor::_cout_mut);
         PEGASUS_STD(cout) << "Exiting:  Monitor::initializeTickler(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);         PEGASUS_STD(cout) << "Exiting:  Monitor::initializeTickler(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
     }     }
   #endif
 } }
  
 void Monitor::tickle(void) void Monitor::tickle(void)
 { {
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
     {     {
         AutoMutex automut(Monitor::_cout_mut);         AutoMutex automut(Monitor::_cout_mut);
         PEGASUS_STD(cout) << "Entering: Monitor::tickle(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);         PEGASUS_STD(cout) << "Entering: Monitor::tickle(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
     }     }
   #endif
     static char _buffer[] =     static char _buffer[] =
     {     {
       '0','0'       '0','0'
     };     };
                  Tracer::trace (TRC_HTTP, Tracer::LEVEL2,
                                      "Now Monitor::Tickle ");
     AutoMutex autoMutex(_tickle_mutex);     AutoMutex autoMutex(_tickle_mutex);
     Socket::disableBlocking(_tickle_client_socket);     Socket::disableBlocking(_tickle_client_socket);
                          Tracer::trace (TRC_HTTP, Tracer::LEVEL2,
                                              "Now Monitor::Tickle::Write() ");
   
     Socket::write(_tickle_client_socket,&_buffer, 2);     Socket::write(_tickle_client_socket,&_buffer, 2);
     Socket::enableBlocking(_tickle_client_socket);     Socket::enableBlocking(_tickle_client_socket);
                          Tracer::trace (TRC_HTTP, Tracer::LEVEL2,
                                      "Now Monitor::Tickled ");
   
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
     {     {
         AutoMutex automut(Monitor::_cout_mut);         AutoMutex automut(Monitor::_cout_mut);
         PEGASUS_STD(cout) << "Exiting:  Monitor::tickle(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);         PEGASUS_STD(cout) << "Exiting:  Monitor::tickle(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
     }     }
   #endif
 } }
  
 void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status ) void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
Line 387 
Line 411 
  
 Boolean Monitor::run(Uint32 milliseconds) Boolean Monitor::run(Uint32 milliseconds)
 { {
     {  
         AutoMutex automut(Monitor::_cout_mut);  
         PEGASUS_STD(cout) << "Entering: Monitor::run(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
     }  
  
     Boolean handled_events = false;     Boolean handled_events = false;
     int i = 0;     int i = 0;
  
     struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};  
  
     fd_set fdread;     fd_set fdread;
     FD_ZERO(&fdread);     FD_ZERO(&fdread);
Line 437 
Line 456 
        if ((entry._status.get() == _MonitorEntry::DYING) &&        if ((entry._status.get() == _MonitorEntry::DYING) &&
                                          (entry._type == Monitor::CONNECTION))                                          (entry._type == Monitor::CONNECTION))
        {        {
   
           MessageQueue *q = MessageQueue::lookup(entry.queueId);           MessageQueue *q = MessageQueue::lookup(entry.queueId);
           PEGASUS_ASSERT(q != 0);           PEGASUS_ASSERT(q != 0);
           HTTPConnection &h = *static_cast<HTTPConnection *>(q);           HTTPConnection &h = *static_cast<HTTPConnection *>(q);
  
                                         if (h._connectionClosePending == false)                                         if (h._connectionClosePending == false)
                           {
                                                 continue;                                                 continue;
                           }
   
  
                                         // NOTE: do not attempt to delete while there are pending responses                                         // NOTE: do not attempt to delete while there are pending responses
                                         // coming thru. The last response to come thru after a                                         // coming thru. The last response to come thru after a
Line 452 
Line 475 
  
                                         if (h._responsePending == true)                                         if (h._responsePending == true)
                                         {                                         {
   // Added for NamedPipe implementation for windows
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
                         if (!entry.namedPipeConnection)                         if (!entry.namedPipeConnection)
                         {                         {
   #endif
                             Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "                             Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "
                                                                                                         "Ignoring connection delete request because "                                                                                                         "Ignoring connection delete request because "
                                                                                                         "responses are still pending. "                                                                                                         "responses are still pending. "
                                                                                                         "connection=0x%p, socket=%d\n",                                                                                                         "connection=0x%p, socket=%d\n",
                                                                                                         (void *)&h, h.getSocket());                                                                                                         (void *)&h, h.getSocket());
   
   // Added for NamedPipe implementation for windows
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
                         }                         }
                         else                         else
                         {                         {
Line 468 
Line 497 
                                                                                                         "connection=0x%p, NamedPipe=%d\n",                                                                                                         "connection=0x%p, NamedPipe=%d\n",
                                                                                                         (void *)&h, h.getNamedPipe().getPipe());                                                                                                         (void *)&h, h.getNamedPipe().getPipe());
                         }                         }
   #endif
                                                 continue;                                                 continue;
                                         }                                         }
                                         h._connectionClosePending = false;                                         h._connectionClosePending = false;
           MessageQueue &o = h.get_owner();           MessageQueue &o = h.get_owner();
           Message* message;                      Message* message = 0;
   
   // Added for NamedPipe implementation for windows
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
           if (!entry.namedPipeConnection)           if (!entry.namedPipeConnection)
           {           {
   #endif
               message= new CloseConnectionMessage(entry.socket);               message= new CloseConnectionMessage(entry.socket);
   
   // Added for NamedPipe implementation for windows
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
           }           }
           else           else
           {           {
   
               message= new CloseConnectionMessage(entry.namedPipe);               message= new CloseConnectionMessage(entry.namedPipe);
  
           }           }
   #endif
           message->dest = o.getQueueId();           message->dest = o.getQueueId();
  
           // HTTPAcceptor is responsible for closing the connection.           // HTTPAcceptor is responsible for closing the connection.
Line 501 
Line 540 
           autoEntryMutex.lock();           autoEntryMutex.lock();
           // After enqueue a message and the autoEntryMutex has been released and locked again,           // After enqueue a message and the autoEntryMutex has been released and locked again,
           // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries.           // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries.
   
           entries.reset(_entries);           entries.reset(_entries);
        }        }
     }     }
Line 513 
Line 553 
         place to calculate the max file descriptor (maximum socket number)         place to calculate the max file descriptor (maximum socket number)
         because we have to traverse the entire array.         because we have to traverse the entire array.
     */     */
     //Array<HANDLE> pipeEventArray;  
         PEGASUS_SOCKET maxSocketCurrentPass = 0;         PEGASUS_SOCKET maxSocketCurrentPass = 0;
     int indx;     int indx;
  
           // Record the indexes at which Sockets are available
           Array <Uint32> socketCountAssociator;
       int socketEntryCount=0;
  
 #ifdef PEGASUS_OS_TYPE_WINDOWS       // Added for NamedPipe implementation for windows
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
     //This array associates named pipe connections to their place in [indx]     //This array associates named pipe connections to their place in [indx]
     //in the entries array. The value in poition zero of the array is the      //in the entries array. The value in portion zero of the array is the
     //index of the fist named pipe connection in the entries array     //index of the fist named pipe connection in the entries array
   
           // Record the indexes at which Pipes are available
     Array <Uint32> indexPipeCountAssociator;     Array <Uint32> indexPipeCountAssociator;
     int pipeEntryCount=0;     int pipeEntryCount=0;
     int MaxPipes = PIPE_INCREMENT;     int MaxPipes = PIPE_INCREMENT;
     HANDLE* hEvents = new HANDLE[PIPE_INCREMENT];      // List of Pipe Handlers
       HANDLE * hPipeList = new HANDLE[PIPE_INCREMENT];
 #endif #endif
  
     for( indx = 0; indx < (int)entries.size(); indx++)      // This loop takes care of setting the namedpipe which has to be used from the list....
       for ( indx = 0,socketEntryCount=0 ;
                                indx < (int)entries.size(); indx++)
     {     {
  
   // Added for NamedPipe implementation for windows
 #ifdef PEGASUS_OS_TYPE_WINDOWS  #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
        if(entries[indx].isNamedPipeConnection())                  if (!entries[indx].namedPipeConnection)
        {        {
   #endif
               if (maxSocketCurrentPass < entries[indx].socket)
                           {
                                   maxSocketCurrentPass = entries[indx].socket;
                           }
               if(entries[indx]._status.get() == _MonitorEntry::IDLE)
               {
                   _idleEntries++;
                   FD_SET(entries[indx].socket, &fdread);
                   socketCountAssociator.append(indx);
                                   socketEntryCount++;
               }
  
            //entering this clause mean that a Named Pipe connection is at entries[indx]  // Added for NamedPipe implementation for windows
            //cout << "In Monitor::run in clause to to create array of for WaitformultipuleObjects" << endl;  #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
           }
            //cout << "In Monitor::run - pipe being added to array is " << entries[indx].namedPipe.getName() << endl;                  else
                   {
             entries[indx].pipeSet = false;             entries[indx].pipeSet = false;
   
            // We can Keep a counter in the Monitor class for the number of named pipes ...  
            //  Which can be used here to create the array size for hEvents..( obviously before this for loop.:-) )  
             if (pipeEntryCount >= MaxPipes)             if (pipeEntryCount >= MaxPipes)
             {             {
                // cout << "Monitor::run 'if (pipeEntryCount >= MaxPipes)' begining - pipeEntryCount=" <<  
                    // pipeEntryCount << " MaxPipes=" << MaxPipes << endl;  
                  MaxPipes += PIPE_INCREMENT;                  MaxPipes += PIPE_INCREMENT;
                  HANDLE* temp_hEvents = new HANDLE[MaxPipes];                                  HANDLE* temp_pList = new HANDLE[MaxPipes];
   
                  for (Uint32 i =0;i<pipeEntryCount;i++)                  for (Uint32 i =0;i<pipeEntryCount;i++)
                  {                  {
                      temp_hEvents[i] = hEvents[i];                                      temp_pList[i] = hPipeList[i];
                  }                  }
                                   delete [] hPipeList;
                  delete [] hEvents;                                  hPipeList = temp_pList;
   
                  hEvents = temp_hEvents;  
                 // cout << "Monitor::run 'if (pipeEntryCount >= MaxPipes)' ending"<< endl;  
   
             }             }
                           hPipeList[pipeEntryCount] = entries[indx].namedPipe.getPipe();
            //pipeEventArray.append((entries[indx].namedPipe.getOverlap()).hEvent);  
            hEvents[pipeEntryCount] = entries[indx].namedPipe.getOverlap()->hEvent;  
   
            indexPipeCountAssociator.append(indx);            indexPipeCountAssociator.append(indx);
            /*  
            if(!entries[indx].namedPipe.isConnectionPipe)  
            {  
                BOOL rc = ::ReadFile(  
                        entries[indx].namedPipe.getPipe(),  
                        &entries[indx].namedPipe.raw,  
                        MAX_BUFFER_SIZE,  
                        &entries[indx].namedPipe.bytesRead,  
                        &entries[indx].namedPipe.getOverlap());  
   
                cout << "just called read on index " << indx << endl;  
   
                 //&entries[indx].namedPipe.bytesRead = &size;  
                if(!rc)  
                {  
   
                   cout << "ReadFile failed for : "  << GetLastError() << "."<< endl;  
   
                }  
   
            }  
           */  
   
     pipeEntryCount++;     pipeEntryCount++;
   
   
   
           // cout << "Monitor::run pipeEntrycount is " << pipeEntryCount <<  
           // " this is the type " << entries[indx]._type << " this is index " << indx << endl;  
   
        }        }
        else  
  
 #endif #endif
        {  
   
            if(maxSocketCurrentPass < entries[indx].socket)  
             maxSocketCurrentPass = entries[indx].socket;  
   
            if(entries[indx]._status.get() == _MonitorEntry::IDLE)  
            {  
                _idleEntries++;  
                FD_SET(entries[indx].socket, &fdread);  
            }  
   
        }  
   }   }
  
     /*     /*
Line 626 
Line 630 
  
     autoEntryMutex.unlock();     autoEntryMutex.unlock();
  
     //      int events = -1;
     // The first argument to select() is ignored on Windows and it is not          // Since the pipes have been introduced, the ratio of procesing
     // a socket value.  The original code assumed that the number of sockets          // time Socket:Pipe :: 3/4:1/4 respectively
     // and a socket value have the same type.  On Windows they do not.  
     //  
  
     int events;          Uint32 newMilliseconds = milliseconds;
     int pEvents;          #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
  
 #ifdef PEGASUS_OS_TYPE_WINDOWS          newMilliseconds = (milliseconds * 3)/4 ;
  
    // events = select(0, &fdread, NULL, NULL, &tv);      #endif
   
     //if (events == NULL)  
     //{  // This connection uses namedPipes  
   
         events = 0;  
         DWORD dwWait=NULL;  
         pEvents = 0;  
   
   
         cout << "Calling WaitForMultipleObjects\n";  
   
         //this should be in a try block  
   
         dwWait = WaitForMultipleObjects(MaxPipes,  
                                    hEvents,      //ABB:- array of event objects  
                                    FALSE,        // ABB:-does not wait for all  
                                    2000);        //ABB:- timeout value   //WW this may need be shorter  
   
     if(dwWait == WAIT_TIMEOUT)  
         {  
         cout << "Wait WAIT_TIMEOUT\n";  
   
         events = select(0, &fdread, NULL, NULL, &tv);  
  
           struct timeval tv = {newMilliseconds/1000, newMilliseconds%1000*1000};
  
                    // Sleep(2000);  
             //continue;  
  
              //return false;  // I think we do nothing.... Mybe there is a socket connection... so          #ifdef PEGASUS_OS_TYPE_WINDOWS
              // cant return.  
         }  
         else if (dwWait == WAIT_FAILED)  
         {  
             if (GetLastError() == 6) //WW this may be too specific  
             {  
                 AutoMutex automut(Monitor::_cout_mut);  
                 cout << "Monitor::run about to call 'select since waitForMultipleObjects failed\n";  
                 events = select(0, &fdread, NULL, NULL, &tv);                 events = select(0, &fdread, NULL, NULL, &tv);
   
             }  
             else  
             {  
                 AutoMutex automut(Monitor::_cout_mut);  
                 cout << "Wait Failed returned\n";  
                 cout << "failed with " << GetLastError() << "." << endl;  
                 pEvents = -1;  
                 return false;  
             }  
         }  
         else  
         {  
             int pCount = dwWait - WAIT_OBJECT_0;  // determines which pipe  
             {  
                AutoMutex automut(Monitor::_cout_mut);  
                cout << " WaitForMultiPleObject returned activity on server pipe: "<<  
                 pCount<< endl;  
             }  
   
             pEvents = 1;  
   
             //this statment gets the pipe entry that was trigered  
             entries[indexPipeCountAssociator[pCount]].pipeSet = true;  
   
             if (pCount > 0) //this means activity on pipe is CIMOperation reques  
             {  
        //         cout << "In Monitor::run got Operation request" << endl;  
                 //entries[indx]._type = Monitor::CONNECTION;  
             }  
             else //this clause my not be needed in production but is used for testing  
             {  
          //     cout << "In Monitor::run got Connection request" << endl;  
   
             }  
   
         }  
                 //  
   
   
    // Sleep(2000);  
   
     //int events = 1;  
     /*if (dwWait)  
     {  
         cout << "in Monitor::run about to call handlePipeConnectionEvent" << endl;  
         _handlePipeConnectionEvent(dwWait);  
     }*/  
    // }  
 #else #else
     events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);     events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
 #endif #endif
   
     autoEntryMutex.lock();     autoEntryMutex.lock();
     // After enqueue a message and the autoEntryMutex has been released and locked again,     // After enqueue a message and the autoEntryMutex has been released and locked again,
     // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries     // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries
     entries.reset(_entries);     entries.reset(_entries);
  
 #ifdef PEGASUS_OS_TYPE_WINDOWS #ifdef PEGASUS_OS_TYPE_WINDOWS
     if(pEvents == -1)  
     {  
         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
           "Monitor::run - errorno = %d has occurred on select.",GetLastError() );  
        // The EBADF error indicates that one or more or the file  
        // descriptions was not valid. This could indicate that  
        // the entries structure has been corrupted or that  
        // we have a synchronization error.  
   
         // We need to generate an assert  here...  
        PEGASUS_ASSERT(GetLastError()!= EBADF);  
   
   
     }  
   
     if(events == SOCKET_ERROR)     if(events == SOCKET_ERROR)
 #else #else
     if(events == -1)     if(events == -1)
 #endif #endif
     {     {
   
         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
           "Monitor::run - errorno = %d has occurred on select.", errno);           "Monitor::run - errorno = %d has occurred on select.", errno);
        // The EBADF error indicates that one or more or the file        // The EBADF error indicates that one or more or the file
Line 764 
Line 670 
  
        PEGASUS_ASSERT(errno != EBADF);        PEGASUS_ASSERT(errno != EBADF);
     }     }
     else if ((events)||(pEvents))      else if (events)
     {     {
   
      //  cout << "IN Monior::run 'else if (events)' clause - array size is " <<  
      //       (int)entries.size() << endl;  
        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
           "Monitor::run select event received events = %d, monitoring %d idle entries",           "Monitor::run select event received events = %d, monitoring %d idle entries",
            events, _idleEntries);            events, _idleEntries);
        for( int indx = 0; indx < (int)entries.size(); indx++)           for ( int sindx = 0; sindx < socketEntryCount; sindx++)
        {        {
            //cout << "Monitor::run at start of 'for( int indx = 0; indx ' - index = " << indx << endl;  
           // The Monitor should only look at entries in the table that are IDLE (i.e.,           // The Monitor should only look at entries in the table that are IDLE (i.e.,
           // owned by the Monitor).           // owned by the Monitor).
           if(((entries[indx]._status.get() == _MonitorEntry::IDLE) &&                       indx = socketCountAssociator[sindx];
              FD_ISSET(entries[indx].socket, &fdread)&& (events)) ||  
              (entries[indx].isNamedPipeConnection() && entries[indx].pipeSet && (pEvents)))  
           {  
               MessageQueue *q;  
            try{  
  
                  q = MessageQueue::lookup(entries[indx].queueId);               if ((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
               }                    (FD_ISSET(entries[indx].socket, &fdread)))
              catch (Exception e)  
              {  
                  cout << " this is what lookup gives - " << e.getMessage() << endl;  
                  exit(1);  
              }  
              catch(...)  
              {              {
                  cout << "MessageQueue::lookup gives strange exception " << endl;                   MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
                  exit(1);  
              }  
   
   
   
   
               Tracer::trace(TRC_HTTP, Tracer::LEVEL4,               Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                   "Monitor::run indx = %d, queueId =  %d, q = %p",                   "Monitor::run indx = %d, queueId =  %d, q = %p",
                   indx, entries[indx].queueId, q);                   indx, entries[indx].queueId, q);
              cout << "Monitor::run before PEGASUS_ASSerT(q !=0) " << endl;  
              PEGASUS_ASSERT(q !=0);              PEGASUS_ASSERT(q !=0);
  
              try              try
              {              {
                  {  
                  AutoMutex automut(Monitor::_cout_mut);  
                   cout <<" this is the type " << entries[indx]._type <<  
                       "for index " << indx << endl;  
                cout << "IN Monior::run right before entries[indx]._type == Monitor::CONNECTION" << endl;  
                  }  
                if(entries[indx]._type == Monitor::CONNECTION)                if(entries[indx]._type == Monitor::CONNECTION)
                 {                 {
                     {  
                     cout << "In Monitor::run Monitor::CONNECTION clause" << endl;  
                     AutoMutex automut(Monitor::_cout_mut);  
                     }  
   
                                       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                      "entries[indx].type for indx = %d is Monitor::CONNECTION", indx);  
                    static_cast<HTTPConnection *>(q)->_entry_index = indx;                    static_cast<HTTPConnection *>(q)->_entry_index = indx;
  
                    // Do not update the entry just yet. The entry gets updated once                    // Do not update the entry just yet. The entry gets updated once
Line 830 
Line 702 
                    //entries[indx]._status = _MonitorEntry::BUSY;                    //entries[indx]._status = _MonitorEntry::BUSY;
  
                    // If allocate_and_awaken failure, retry on next iteration                    // If allocate_and_awaken failure, retry on next iteration
 /* Removed for PEP 183.  
                    if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(  
                            (void *)q, _dispatch))  
                    {  
                       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,  
                           "Monitor::run: Insufficient resources to process request.");  
                       entries[indx]._status = _MonitorEntry::IDLE;  
                       return true;  
                    }  
 */  
 // Added for PEP 183  
                    HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);                    HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                          "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",                          "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
                    dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);                    dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
   
                    /*In the case of named Pipes, the request has already been read from the pipe  
                    therefor this section passed the request data to the HTTPConnection  
                    NOTE: not sure if this would be better suited in a sparate private method  
                    */  
                    dst->setNamedPipe(entries[indx].namedPipe); //this step shouldn't be needd  
                    //cout << "In Monitor::run after dst->setNamedPipe string read is " <<  entries[indx].namedPipe.raw << endl;  
   
                    try                    try
                    {                    {
                        {  
                        AutoMutex automut(Monitor::_cout_mut);  
                        cout << "In Monitor::run about to call 'dst->run(1)' "  << endl;  
                        }  
                        dst->run(1);                        dst->run(1);
                    }                    }
                    catch (...)                    catch (...)
Line 867 
Line 716 
                        "Monitor::_dispatch: exception received");                        "Monitor::_dispatch: exception received");
                    }                    }
                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                    "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);                              "Monitor::_dispatch: exited run() for index %d",
                                                       dst->_entry_index);
  
                    if (entries[indx].isNamedPipeConnection())  
                    {  
                        entries[indx]._type = Monitor::ACCEPTOR;  
                    }                    }
                                           else if (entries[indx]._type == Monitor::INTERNAL)
                    // It is possible the entry status may not be set to busy.                                          {
                    // The following will fail in that case.  
                    // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);  
                    // Once the HTTPConnection thread has set the status value to either  
                    // Monitor::DYING or Monitor::IDLE, it has returned control of the connection  
                    // to the Monitor.  It is no longer permissible to access the connection  
                    // or the entry in the _entries table.  
   
                    // The following is not relevant as the worker thread or the  
                    // reader thread will update the status of the entry.  
                    //if (dst->_connectionClosePending)  
                    //{  
                    //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;  
                    //}  
                    //else  
                    //{  
                    //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;  
                    //}  
 // end Added for PEP 183  
                 }  
                 else if( entries[indx]._type == Monitor::INTERNAL){  
                         // set ourself to BUSY,                         // set ourself to BUSY,
                         // read the data                         // read the data
                         // and set ourself back to IDLE                         // and set ourself back to IDLE
             cout << " in - entries[indx]._type == Monitor::INTERNAL- " << endl;  
   
                         entries[indx]._status = _MonitorEntry::BUSY;  
                         static char buffer[2];                         static char buffer[2];
                         Socket::disableBlocking(entries[indx].socket);                         Socket::disableBlocking(entries[indx].socket);
   
                         Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2);                         Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2);
                         Socket::enableBlocking(entries[indx].socket);                         Socket::enableBlocking(entries[indx].socket);
                         entries[indx]._status = _MonitorEntry::IDLE;                         entries[indx]._status = _MonitorEntry::IDLE;
                 }                 }
                 else                 else
                 {                 {
             {  
   
             AutoMutex automut(Monitor::_cout_mut);  
             cout << "In Monitor::run else clause of CONNECTION if statments" << endl;  
             }  
                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                      "Non-connection entry, indx = %d, has been received.", indx);                      "Non-connection entry, indx = %d, has been received.", indx);
   
                    int events = 0;                    int events = 0;
            Message *msg;                                                  events |= SocketMessage::READ;
            {                                                  Message *msg = new SocketMessage(entries[indx].socket, events);
                                                   entries[indx]._status = _MonitorEntry::BUSY;
                                                   autoEntryMutex.unlock();
                                                   q->enqueue(msg);
                                                   autoEntryMutex.lock();
                                                   // After enqueue a message and the autoEntryMutex has been released and locked again,
                                                   // the array of entries can be changed. The ArrayIterator has be reset with the original _entries
                                                   entries.reset(_entries);
                                                   entries[indx]._status = _MonitorEntry::IDLE;
                                                   handled_events = true;
                                                   delete [] hPipeList;
                                                   return handled_events;
  
            AutoMutex automut(Monitor::_cout_mut);  
            cout << " In Monitor::run Just before checking if NamedPipeConnection" << "for Index "<<indx<< endl;  
            }            }
            if (entries[indx].isNamedPipeConnection())                                  }
            {                                  catch(...)
                if(!entries[indx].namedPipe.isConnectionPipe)  
                { /*if we enter this clasue it means that the named pipe that we are  
                    looking at has recived a connection but is not the pipe we get connection requests over.  
                    therefore we need to change the _type to CONNECTION and wait for a CIM Operations request*/  
                    entries[indx]._type = Monitor::CONNECTION;  
   
   
      /* This is a test  - this shows that the read file needs to be done  
      before we call wiatForMultipleObjects*/  
     /******************************************************  
     ********************************************************/  
         //memset is to clear the 'entries[indx].namedPipe.raw', which avoids junk characters.  
         memset(entries[indx].namedPipe.raw,'\0',4096);  
         BOOL rc = ::ReadFile(  
                 entries[indx].namedPipe.getPipe(),  
                 &entries[indx].namedPipe.raw,  
                 MAX_BUFFER_SIZE,  
                 &entries[indx].namedPipe.bytesRead,  
                 entries[indx].namedPipe.getOverlap());  
   
         {         {
          AutoMutex automut(Monitor::_cout_mut);  
          cout << "Monitor::run just called read on index " << indx << endl;  
         }         }
                                   handled_events = true;
                           }
           }
                   delete [] hPipeList;
                   return handled_events;
       }
   
   
   #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
   
           //if no pipes are registered return immediately
   
           int pEvents = -1;
           int pCount = -1;
           BOOL bPeekPipe = 0;
           DWORD dwBytesAvail=0;
           // The pipe is sniffed and check if there are any data. If available, the
           // message is picked from the Queue and appropriate methods are invoked.
  
  
          //&entries[indx].namedPipe.bytesRead = &size;          // pipeProcessCount records the number of requests that are processed.
         if(!rc)          // At the end of loop this is verified against the count of request
           // on local connection . If there are any pipes which needs to be
           // processed we would apply delay and then proceed to iterate.
   
       Uint32 pipeProcessCount =0;
   
       for (int counter = 1; counter < maxIterations ; counter ++)
         {         {
             AutoMutex automut(Monitor::_cout_mut);  
            cout << "ReadFile failed for : "  << GetLastError() << "."<< endl;  
  
         }  
  
                   // pipeIndex is used to index into indexPipeCountAssociator to fetch
                   // index of the _MonitorEntry of Monitor
           for (int pipeIndex = 0; pipeIndex < pipeEntryCount; pipeIndex++)
               {
               dwBytesAvail = 0;
                       bPeekPipe = ::PeekNamedPipe(hPipeList[pipeIndex],
                                                       NULL,
                                                                       NULL,
                                                                           NULL,
                                           &dwBytesAvail,
                                                                           NULL
                                                                          );
  
                           // If peek on NamedPipe was successfull and data is available
               if (bPeekPipe && dwBytesAvail)
                   {
  
     /******************************************************                              Tracer::trace(TRC_HTTP,Tracer::LEVEL4," PIPE_PEEKING FOUND = %u BYTES", dwBytesAvail);
     ********************************************************/  
  
                               pEvents = 1;
                       entries[indexPipeCountAssociator[pipeIndex]].pipeSet = true;
                               Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                       "Monitor::run select event received events = %d, \
                                           monitoring %d idle entries",
                       pEvents,
                                           _idleEntries);
  
                                   int pIndx = indexPipeCountAssociator[pipeIndex];
  
                                   if ((entries[pIndx]._status.get() == _MonitorEntry::IDLE) &&
                                            entries[pIndx].namedPipe.isConnected() &&
                                            (pEvents))
                           {
  
                    continue;                                  MessageQueue *q = 0;
  
                }                      try
                {                {
                    AutoMutex automut(Monitor::_cout_mut);  
                     cout << " In Monitor::run about to create a Pipe message" << endl;  
  
                                           q = MessageQueue::lookup (entries[pIndx].queueId);
                }                }
                       catch (Exception e)
                events |= NamedPipeMessage::READ;                      {
                msg = new NamedPipeMessage(entries[indx].namedPipe, events);                                          e.getMessage();
            }            }
            else                      catch(...)
                       {
                                   }
   
                                           Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                                     "Monitor::run indx = %d, queueId =  %d,\
                                                                     q = %p",pIndx, entries[pIndx].queueId, q);
                       try
            {            {
                                           if (entries[pIndx]._type == Monitor::CONNECTION)
                {                {
                AutoMutex automut(Monitor::_cout_mut);  
                cout << " In Monitor::run ..its a socket message" << endl;                                                      Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                                                                         "entries[indx].type for indx = \
                                                                                 %d is Monitor::CONNECTION",
                                                                                     pIndx);
                                                       static_cast<HTTPConnection *>(q)->_entry_index = pIndx;
                                                   HTTPConnection *dst = reinterpret_cast \
                                                                                              <HTTPConnection *>(q);
                                                   Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                                                                         "Monitor::_dispatch: entering run() \
                                                                                     for indx  = %d, queueId = %d, \
                                                                                     q = %p",\
                                                                 dst->_entry_index,
                                                                                     dst->_monitor->_entries\
                                                                                     [dst->_entry_index].queueId, dst);
   
                                                   try
                                                   {
   
                                                           dst->run(1);
   
                                                           // Record that the requested data is read/Written
                                                           pipeProcessCount++;
   
                }                }
                events |= SocketMessage::READ;                                                  catch (...)
                        msg = new SocketMessage(entries[indx].socket, events);                                                  {
                                                           Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                                                                                 "Monitor::_dispatch: \
                                                                                              exception received");
            }            }
  
                    entries[indx]._status = _MonitorEntry::BUSY;                                                  Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                                                                 "Monitor::_dispatch: exited \
                                                                          \run() index %d",
                                                                                      dst->_entry_index);
   
   
                                           }
                                           else
                                           {
                                                   /* The condition
                                                              entries[indx]._type == Monitor::INTERNAL can be
                                                              ignored for pipes as the tickler is of
                                                              Monitor::INTERNAL type. The tickler is
                                                              a socket.
                                                   */
   
                                                   Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                                                                                     "Non-connection entry, indx = %d,\
                                                                                             has been received.", pIndx);
                                                   int events = 0;
                                                   Message *msg = 0;
   
                                                       pEvents |= NamedPipeMessage::READ;
                                                       msg = new NamedPipeMessage(entries[pIndx].namedPipe, pEvents);
                                       entries[pIndx]._status = _MonitorEntry::BUSY;
                    autoEntryMutex.unlock();                    autoEntryMutex.unlock();
                    q->enqueue(msg);                    q->enqueue(msg);
                    autoEntryMutex.lock();                    autoEntryMutex.lock();
            // After enqueue a message and the autoEntryMutex has been released and locked again,  
            // the array of entries can be changed. The ArrayIterator has be reset with the original _entries  
            entries.reset(_entries);            entries.reset(_entries);
                    entries[indx]._status = _MonitorEntry::IDLE;                                      entries[pIndx]._status = _MonitorEntry::IDLE;
                                                           delete [] hPipeList;
                                   return(handled_events);
  
                    {  
                        AutoMutex automut(Monitor::_cout_mut);  
                        PEGASUS_STD(cout) << "Exiting:  Monitor::run(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);  
                    }  
                    return true;  
                 }                 }
   
   
              }              }
              catch(...)              catch(...)
              {              {
   
              }              }
              handled_events = true;  
           }           }
   
        }        }
     }     }
  
                   //Check if all the pipes had recieved the data, If no then try again
           if (pipeEntryCount == pipeProcessCount)
     {     {
         AutoMutex automut(Monitor::_cout_mut);                      break;
         PEGASUS_STD(cout) << "Exiting:  Monitor::run(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);                  }
   
   
     }     }
   
           delete [] hPipeList;
   
   #endif
   
     return(handled_events);     return(handled_events);
 } }
  
 void Monitor::stopListeningForConnections(Boolean wait) void Monitor::stopListeningForConnections(Boolean wait)
 { {
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
     {     {
         AutoMutex automut(Monitor::_cout_mut);         AutoMutex automut(Monitor::_cout_mut);
         PEGASUS_STD(cout) << "Entering: Monitor::stopListeningForConnections(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);         PEGASUS_STD(cout) << "Entering: Monitor::stopListeningForConnections(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
     }     }
   #endif
     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
     // set boolean then tickle the server to recognize _stopConnections     // set boolean then tickle the server to recognize _stopConnections
     _stopConnections = 1;     _stopConnections = 1;
Line 1040 
Line 960 
     }     }
  
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
     {     {
         AutoMutex automut(Monitor::_cout_mut);         AutoMutex automut(Monitor::_cout_mut);
         PEGASUS_STD(cout) << "Exiting:  Monitor::stopListeningForConnections(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);         PEGASUS_STD(cout) << "Exiting:  Monitor::stopListeningForConnections(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
     }     }
   #endif
 } }
  
  
Line 1053 
Line 975 
     Uint32 queueId,     Uint32 queueId,
     int type)     int type)
 { {
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
     {     {
         AutoMutex automut(Monitor::_cout_mut);         AutoMutex automut(Monitor::_cout_mut);
         PEGASUS_STD(cout) << "Entering: Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);         PEGASUS_STD(cout) << "Entering: Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
     }     }
   #endif
    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
    AutoMutex autoMut(_entry_mut);    AutoMutex autoMut(_entry_mut);
    // Check to see if we need to dynamically grow the _entries array    // Check to see if we need to dynamically grow the _entries array
Line 1083 
Line 1007 
             _entries[index]._type = type;             _entries[index]._type = type;
             _entries[index]._status = _MonitorEntry::IDLE;             _entries[index]._status = _MonitorEntry::IDLE;
  
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
             {             {
                 AutoMutex automut(Monitor::_cout_mut);                 AutoMutex automut(Monitor::_cout_mut);
                 PEGASUS_STD(cout) << "Exiting:  Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);                 PEGASUS_STD(cout) << "Exiting:  Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
             }             }
   #endif
             return index;             return index;
          }          }
       }       }
Line 1096 
Line 1022 
    }    }
    _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful    _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful
    PEG_METHOD_EXIT();    PEG_METHOD_EXIT();
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
    {    {
        AutoMutex automut(Monitor::_cout_mut);        AutoMutex automut(Monitor::_cout_mut);
        PEGASUS_STD(cout) << "Exiting:  Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);        PEGASUS_STD(cout) << "Exiting:  Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
    }    }
   #endif
    return -1;    return -1;
  
 } }
  
 void Monitor::unsolicitSocketMessages(PEGASUS_SOCKET socket) void Monitor::unsolicitSocketMessages(PEGASUS_SOCKET socket)
 { {
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
     {     {
         AutoMutex automut(Monitor::_cout_mut);         AutoMutex automut(Monitor::_cout_mut);
         PEGASUS_STD(cout) << "Entering: Monitor::unsolicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);         PEGASUS_STD(cout) << "Entering: Monitor::unsolicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
     }     }
   #endif
  
     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
     AutoMutex autoMut(_entry_mut);     AutoMutex autoMut(_entry_mut);
Line 1143 
Line 1073 
         index--;         index--;
     }     }
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
     {     {
         AutoMutex automut(Monitor::_cout_mut);         AutoMutex automut(Monitor::_cout_mut);
         PEGASUS_STD(cout) << "Exiting:  Monitor::unsolicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);         PEGASUS_STD(cout) << "Exiting:  Monitor::unsolicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
     }     }
   #endif
 } }
  
 // Note: this is no longer called with PEP 183. // Note: this is no longer called with PEP 183.
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm) PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
 { {
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
     {     {
         AutoMutex automut(Monitor::_cout_mut);         AutoMutex automut(Monitor::_cout_mut);
         PEGASUS_STD(cout) << "Entering: Monitor::_dispatch(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);         PEGASUS_STD(cout) << "Entering: Monitor::_dispatch(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
     }     }
   #endif
    HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);    HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
         "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",         "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
Line 1189 
Line 1123 
    return 0;    return 0;
 } }
  
   // Added for NamedPipe implementation for windows
 //This method is anlogsu to solicitSocketMessages. It does the same thing for named Pipes  #if defined PEGASUS_OS_TYPE_WINDOWS && !defined(PEGASUS_DISABLE_LOCAL_DOMAIN_SOCKET)
   //This method is anlogus to solicitSocketMessages. It does the same thing for named Pipes
 int  Monitor::solicitPipeMessages( int  Monitor::solicitPipeMessages(
     NamedPipe namedPipe,     NamedPipe namedPipe,
     Uint32 events,  //not sure what has to change for this enum     Uint32 events,  //not sure what has to change for this enum
Line 1198 
Line 1133 
     int type)     int type)
 { {
    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitPipeMessages");    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitPipeMessages");
   
    AutoMutex autoMut(_entry_mut);    AutoMutex autoMut(_entry_mut);
    // Check to see if we need to dynamically grow the _entries array    // Check to see if we need to dynamically grow the _entries array
    // We always want the _entries array to 2 bigger than the    // We always want the _entries array to 2 bigger than the
    // current connections requested    // current connections requested
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
   {
      AutoMutex automut(Monitor::_cout_mut);
    PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages at the begining" << PEGASUS_STD(endl);    PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages at the begining" << PEGASUS_STD(endl);
   }
   #endif
  
    _solicitSocketCount++;  // bump the count    _solicitSocketCount++;  // bump the count
    int size = (int)_entries.size();    int size = (int)_entries.size();
Line 1228 
Line 1167 
             _entries[index].queueId  = queueId;             _entries[index].queueId  = queueId;
             _entries[index]._type = type;             _entries[index]._type = type;
             _entries[index]._status = _MonitorEntry::IDLE;             _entries[index]._status = _MonitorEntry::IDLE;
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
     {
               AutoMutex automut(Monitor::_cout_mut);
             PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages after seting up  _entries[index] index = " << index << PEGASUS_STD(endl);             PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages after seting up  _entries[index] index = " << index << PEGASUS_STD(endl);
     }
   #endif
  
             return index;             return index;
          }          }
Line 1247 
Line 1190 
  
 } }
  
   //////////////////////////////////////////////////////////////////////////////
   // Method Name      : unsolicitPipeMessages
   // Input Parameter  : namedPipe  - type NamedPipe
   // Return Type      : void
   //============================================================================
   // This method is invoked from HTTPAcceptor::handleEnqueue for server
   // when the CLOSE_CONNECTION_MESSAGE is recieved. This method is also invoked
   // from HTTPAcceptor::destroyConnections method when the CIMServer is shutdown.
   // For the CIMClient, this is invoked from HTTPConnector::handleEnqueue when the
   // CLOSE_CONNECTION_MESSAGE is recieved. This method is also invoked from
   // HTTPConnector::disconnect when CIMClient requests a disconnect request.
   // The list of _MonitorEntry is searched for the matching pipe.
   // The Handle of the identified is closed and _MonitorEntry for the
   // requested pipe is removed.
   ///////////////////////////////////////////////////////////////////////////////
   
 void Monitor::unsolicitPipeMessages(NamedPipe namedPipe) void Monitor::unsolicitPipeMessages(NamedPipe namedPipe)
 { {
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
     {     {
         AutoMutex automut(Monitor::_cout_mut);         AutoMutex automut(Monitor::_cout_mut);
         PEGASUS_STD(cout) << "Entering: Monitor::unsolicitPipeMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);         PEGASUS_STD(cout) << "Entering: Monitor::unsolicitPipeMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
     }     }
   #endif
  
     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitPipeMessages");     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitPipeMessages");
     AutoMutex autoMut(_entry_mut);     AutoMutex autoMut(_entry_mut);
Line 1267 
Line 1228 
        if(_entries[index].namedPipe.getPipe() == namedPipe.getPipe())        if(_entries[index].namedPipe.getPipe() == namedPipe.getPipe())
        {        {
           _entries[index]._status = _MonitorEntry::EMPTY;           _entries[index]._status = _MonitorEntry::EMPTY;
           //_entries[index].namedPipe = PEGASUS_INVALID_SOCKET;              // Ensure that the client has read the data
                       ::FlushFileBuffers (namedPipe.getPipe());
                       //Disconnect to release the pipe. This doesn't release Pipe Handle
                       ::DisconnectNamedPipe (_entries[index].namedPipe.getPipe());
               // Must use CloseHandle to Close Pipe
                           ::CloseHandle(_entries[index].namedPipe.getPipe());
                       _entries[index].namedPipe.disconnect();
           _solicitSocketCount--;           _solicitSocketCount--;
           break;           break;
        }        }
Line 1280 
Line 1247 
         This prevents the positions, of the NON EMPTY entries, from being changed.         This prevents the positions, of the NON EMPTY entries, from being changed.
     */     */
     index = _entries.size() - 1;     index = _entries.size() - 1;
     while(_entries[index]._status.get() == _MonitorEntry::EMPTY){      while (_entries[index]._status.get() == _MonitorEntry::EMPTY)
         if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)          {
           if ((_entries[index].namedPipe.getPipe() == namedPipe.getPipe()) ||
               (_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES))
           {
                 _entries.remove(index);                 _entries.remove(index);
           }
         index--;         index--;
     }     }
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
   #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
     {     {
         AutoMutex automut(Monitor::_cout_mut);         AutoMutex automut(Monitor::_cout_mut);
         PEGASUS_STD(cout) << "Exiting:  Monitor::unsolicitPipeMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);         PEGASUS_STD(cout) << "Exiting:  Monitor::unsolicitPipeMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
     }     }
   #endif
 } }
  
   #endif
  
 PEGASUS_NAMESPACE_END PEGASUS_NAMESPACE_END


Legend:
Removed from v.1.103.10.16  
changed lines
  Added in v.1.103.10.27

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2