(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/Monitor.cpp between version 1.100 and 1.103.10.7

version 1.100, 2005/12/02 18:25:52 version 1.103.10.7, 2006/06/12 20:12:09
Line 1 
Line 1 
 //%2005////////////////////////////////////////////////////////////////////////  //%2006////////////////////////////////////////////////////////////////////////
 // //
 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems. // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
Line 8 
Line 8 
 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group. // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.; // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 // EMC Corporation; VERITAS Software Corporation; The Open Group. // EMC Corporation; VERITAS Software Corporation; The Open Group.
   // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
   // EMC Corporation; Symantec Corporation; The Open Group.
 // //
 // Permission is hereby granted, free of charge, to any person obtaining a copy // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to // of this software and associated documentation files (the "Software"), to
Line 50 
Line 52 
 #include <Pegasus/Common/Exception.h> #include <Pegasus/Common/Exception.h>
 #include "ArrayIterator.h" #include "ArrayIterator.h"
  
   
   
   const static DWORD MAX_BUFFER_SIZE = 4096;  // 4 kilobytes
   
 #ifdef PEGASUS_OS_TYPE_WINDOWS #ifdef PEGASUS_OS_TYPE_WINDOWS
 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
 #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \ #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
Line 77 
Line 83 
  
 static AtomicInt _connections(0); static AtomicInt _connections(0);
  
   
   #ifdef PEGASUS_OS_TYPE_WINDOWS
    #define PIPE_INCREMENT 1
   #endif
   
 //////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
 // //
 // Monitor // Monitor
Line 432 
Line 443 
           autoEntryMutex.unlock();           autoEntryMutex.unlock();
           o.enqueue(message);           o.enqueue(message);
           autoEntryMutex.lock();           autoEntryMutex.lock();
             // After enqueue a message and the autoEntryMutex has been released and locked again,
             // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries.
             entries.reset(_entries);
        }        }
     }     }
  
Line 443 
Line 457 
         place to calculate the max file descriptor (maximum socket number)         place to calculate the max file descriptor (maximum socket number)
         because we have to traverse the entire array.         because we have to traverse the entire array.
     */     */
       //Array<HANDLE> pipeEventArray;
     PEGASUS_SOCKET maxSocketCurrentPass = 0;     PEGASUS_SOCKET maxSocketCurrentPass = 0;
     for( int indx = 0; indx < (int)entries.size(); indx++)      int indx;
   
   
   #ifdef PEGASUS_OS_TYPE_WINDOWS
   
       //This array associates named pipe connections to their place in [indx]
       //in the entries array. The value in poition zero of the array is the
       //index of the fist named pipe connection in the entries array
       Array <Uint32> indexPipeCountAssociator;
       int pipeEntryCount=0;
       int MaxPipes = PIPE_INCREMENT;
       HANDLE* hEvents = new HANDLE[PIPE_INCREMENT];
   
   #endif
   
       for( indx = 0; indx < (int)entries.size(); indx++)
       {
   
   
   #ifdef PEGASUS_OS_TYPE_WINDOWS
          if(entries[indx].isNamedPipeConnection())
          {
   
              //entering this clause mean that a Named Pipe connection is at entries[indx]
              //cout << "In Monitor::run in clause to to create array of for WaitformultipuleObjects" << endl;
   
              //cout << "In Monitor::run - pipe being added to array is " << entries[indx].namedPipe.getName() << endl;
   
               entries[indx].pipeSet = false;
   
              // We can Keep a counter in the Monitor class for the number of named pipes ...
              //  Which can be used here to create the array size for hEvents..( obviously before this for loop.:-) )
               if (pipeEntryCount >= MaxPipes)
               {
                  // cout << "Monitor::run 'if (pipeEntryCount >= MaxPipes)' begining - pipeEntryCount=" <<
                      // pipeEntryCount << " MaxPipes=" << MaxPipes << endl;
                    MaxPipes += PIPE_INCREMENT;
                    HANDLE* temp_hEvents = new HANDLE[MaxPipes];
   
                    for (Uint32 i =0;i<pipeEntryCount;i++)
     {     {
                        temp_hEvents[i] = hEvents[i];
                    }
   
                    delete [] hEvents;
   
                    hEvents = temp_hEvents;
                   // cout << "Monitor::run 'if (pipeEntryCount >= MaxPipes)' ending"<< endl;
   
               }
   
              //pipeEventArray.append((entries[indx].namedPipe.getOverlap()).hEvent);
              hEvents[pipeEntryCount] = entries[indx].namedPipe.getOverlap().hEvent;
   
              indexPipeCountAssociator.append(indx);
   
       pipeEntryCount++;
   
   
   
             // cout << "Monitor::run pipeEntrycount is " << pipeEntryCount <<
             // " this is the type " << entries[indx]._type << " this is index " << indx << endl;
   
          }
          else
   
   #endif
          {
   
        if(maxSocketCurrentPass < entries[indx].socket)        if(maxSocketCurrentPass < entries[indx].socket)
             maxSocketCurrentPass = entries[indx].socket;             maxSocketCurrentPass = entries[indx].socket;
  
Line 454 
Line 536 
            _idleEntries++;            _idleEntries++;
            FD_SET(entries[indx].socket, &fdread);            FD_SET(entries[indx].socket, &fdread);
        }        }
   
          }
     }     }
  
     /*     /*
Line 470 
Line 554 
     // and a socket value have the same type.  On Windows they do not.     // and a socket value have the same type.  On Windows they do not.
     //     //
 #ifdef PEGASUS_OS_TYPE_WINDOWS #ifdef PEGASUS_OS_TYPE_WINDOWS
     int events = select(0, &fdread, NULL, NULL, &tv);      //int events = select(0, &fdread, NULL, NULL, &tv);
        int events = 0;
           DWORD dwWait=NULL;
       int pEvents = 0;
   
    //       cout << "events after select" << events << endl;
       cout << "Calling WaitForMultipleObjects\n";
   
       //this should be in a try block
   
       dwWait = WaitForMultipleObjects(MaxPipes,
                                      hEvents,      //ABB:- array of event objects
                                      FALSE,        // ABB:-does not wait for all
                                      20000);        //ABB:- timeout value
   
       if(dwWait == WAIT_TIMEOUT)
           {
           cout << "Wait WAIT_TIMEOUT\n";
   
                      // Sleep(2000);
               //continue;
   
                //return false;  // I think we do nothing.... Mybe there is a socket connection... so
                // cant return.
           }
           else if (dwWait == WAIT_FAILED)
           {
               cout << "Wait Failed returned\n";
               cout << "failed with " << GetLastError() << "." << endl;
               pEvents = -1;
               return false;
           }
           else
           {
               int pCount = dwWait - WAIT_OBJECT_0;  // determines which pipe
              // cout << " WaitForMultiPleObject returned activity on server pipe: "<<
              //     pCount<< endl;
   
               pEvents = 1;
   
               //this statment gets the pipe entry that was trigered
               entries[indexPipeCountAssociator[pCount]].pipeSet = true;
   
               if (pCount > 0) //this means activity on pipe is CIMOperation reques
               {
          //         cout << "In Monitor::run got Operation request" << endl;
                   //entries[indx]._type = Monitor::CONNECTION;
               }
               else //this clause my not be needed in production but is used for testing
               {
            //     cout << "In Monitor::run got Connection request" << endl;
   
               }
   
           }
                   //
   
   
      // Sleep(2000);
   
       //int events = 1;
       /*if (dwWait)
       {
           cout << "in Monitor::run about to call handlePipeConnectionEvent" << endl;
           _handlePipeConnectionEvent(dwWait);
       }*/
 #else #else
     int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);     int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
 #endif #endif
     autoEntryMutex.lock();     autoEntryMutex.lock();
       // After enqueue a message and the autoEntryMutex has been released and locked again,
       // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries
       entries.reset(_entries);
  
 #ifdef PEGASUS_OS_TYPE_WINDOWS #ifdef PEGASUS_OS_TYPE_WINDOWS
       if(pEvents == -1)
       {
           Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
             "Monitor::run - errorno = %d has occurred on select.",GetLastError() );
          // The EBADF error indicates that one or more or the file
          // descriptions was not valid. This could indicate that
          // the entries structure has been corrupted or that
          // we have a synchronization error.
   
           // We need to generate an assert  here...
          PEGASUS_ASSERT(GetLastError()!= EBADF);
   
   
       }
   
     if(events == SOCKET_ERROR)     if(events == SOCKET_ERROR)
 #else #else
     if(events == -1)     if(events == -1)
 #endif #endif
     {     {
   
        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
           "Monitor::run - errorno = %d has occurred on select.", errno);           "Monitor::run - errorno = %d has occurred on select.", errno);
        // The EBADF error indicates that one or more or the file        // The EBADF error indicates that one or more or the file
Line 491 
Line 659 
  
        PEGASUS_ASSERT(errno != EBADF);        PEGASUS_ASSERT(errno != EBADF);
     }     }
     else if (events)      else if ((events)||(pEvents))
     {     {
   
        //  cout << "IN Monior::run 'else if (events)' clause - array size is " <<
        //       (int)entries.size() << endl;
        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
           "Monitor::run select event received events = %d, monitoring %d idle entries",           "Monitor::run select event received events = %d, monitoring %d idle entries",
            events, _idleEntries);            events, _idleEntries);
        for( int indx = 0; indx < (int)entries.size(); indx++)        for( int indx = 0; indx < (int)entries.size(); indx++)
        {        {
              //cout << "Monitor::run at start of 'for( int indx = 0; indx ' - index = " << indx << endl;
           // The Monitor should only look at entries in the table that are IDLE (i.e.,           // The Monitor should only look at entries in the table that are IDLE (i.e.,
           // owned by the Monitor).           // owned by the Monitor).
           if((entries[indx]._status.get() == _MonitorEntry::IDLE) &&            if(((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
              (FD_ISSET(entries[indx].socket, &fdread)))               FD_ISSET(entries[indx].socket, &fdread)&& (events)) ||
                (entries[indx].isNamedPipeConnection() && entries[indx].pipeSet && (pEvents)))
             {
                 MessageQueue *q;
                 cout << "IN Monior::run inside - for int indx = " <<indx <<
                     "and queue ID is " << entries[indx].queueId << endl;
                 try{
   
                    q = MessageQueue::lookup(entries[indx].queueId);
                 }
                catch (Exception e)
           {           {
              MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);                   cout << " this is what lookup gives - " << e.getMessage() << endl;
                    exit(1);
                }
                catch(...)
                {
                    cout << "MessageQueue::lookup gives strange exception " << endl;
                    exit(1);
                }
   
   
   
                cout << "Monitor::run after MessageQueue::lookup(entries[indx].queueId)" << endl;
              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                   "Monitor::run indx = %d, queueId =  %d, q = %p",                   "Monitor::run indx = %d, queueId =  %d, q = %p",
                   indx, entries[indx].queueId, q);                   indx, entries[indx].queueId, q);
                cout << "Monitor::run before PEGASUS_ASSerT(q !=0) " << endl;
              PEGASUS_ASSERT(q !=0);              PEGASUS_ASSERT(q !=0);
  
              try              try
              {              {
                     cout <<" this is the type " << entries[indx]._type <<
                         "for index " << indx << endl;
                  cout << "IN Monior::run right before entries[indx]._type == Monitor::CONNECTION" << endl;
                 if(entries[indx]._type == Monitor::CONNECTION)                 if(entries[indx]._type == Monitor::CONNECTION)
                 {                 {
                       cout << "In Monitor::run Monitor::CONNECTION clause" << endl;
   
                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                      "entries[indx].type for indx = %d is Monitor::CONNECTION", indx);                      "entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
                    static_cast<HTTPConnection *>(q)->_entry_index = indx;                    static_cast<HTTPConnection *>(q)->_entry_index = indx;
Line 537 
Line 736 
                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                          "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",                          "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
                    dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);                    dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
   
                      /*In the case of named Pipes, the request has already been read from the pipe
                      therefor this section passed the request data to the HTTPConnection
                      NOTE: not sure if this would be better suited in a sparate private method
                      */
                      dst->setNamedPipe(entries[indx].namedPipe); //this step shouldn't be needd
                      cout << "In Monitor::run before dst->run number of bytes read is " <<
                          entries[indx].namedPipe.bytesRead << endl;
   
                    try                    try
                    {                    {
                          cout << "In Monitor::run about to call 'dst->run(1)' "  << endl;
                        dst->run(1);                        dst->run(1);
                    }                    }
                    catch (...)                    catch (...)
Line 573 
Line 782 
                         // set ourself to BUSY,                         // set ourself to BUSY,
                         // read the data                         // read the data
                         // and set ourself back to IDLE                         // and set ourself back to IDLE
               cout << " in - entries[indx]._type == Monitor::INTERNAL- " << endl;
  
                         entries[indx]._status = _MonitorEntry::BUSY;                         entries[indx]._status = _MonitorEntry::BUSY;
                         static char buffer[2];                         static char buffer[2];
Line 583 
Line 793 
                 }                 }
                 else                 else
                 {                 {
               cout << "In Monitor::run else clause of CONNECTION if statments" << endl;
                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                      "Non-connection entry, indx = %d, has been received.", indx);                      "Non-connection entry, indx = %d, has been received.", indx);
                    int events = 0;                    int events = 0;
              Message *msg;
              cout << " In Monitor::run Just before checking if NamedPipeConnection" << "for Index "<<indx<< endl;
   
              if (entries[indx].isNamedPipeConnection())
              {
                  if(!entries[indx].namedPipe.isConnectionPipe)
                  { /*if we enter this clasue it means that the named pipe that we are
                      looking at has recived a connection but is not the pipe we get connection requests over.
                      therefore we neew to change the _type to CONNECTION and wait for a CIM Operations request*/
                      entries[indx]._type = Monitor::CONNECTION;
   
   
                        /* This is a test  - this shows that the read file needs to be done
        before we call wiatForMultipleObjects*/
       /******************************************************
       ********************************************************/
   
                   //DWORD size = 0;
   
           BOOL rc = ::ReadFile(
                   entries[indx].namedPipe.getPipe(),
                   &entries[indx].namedPipe.raw,
                   MAX_BUFFER_SIZE,
                   &entries[indx].namedPipe.bytesRead,
                   &entries[indx].namedPipe.getOverlap());
   
           cout << "just called read on index " << indx << endl;
   
            //&entries[indx].namedPipe.bytesRead = &size;
           if(!rc)
           {
   
              cout << "ReadFile failed for : "  << GetLastError() << "."<< endl;
   
           }
   
   
   
       /******************************************************
       ********************************************************/
   
   
   
   
                      continue;
   
                  }
                  cout << " In Monitor::run about to create a Pipe message" << endl;
                  events |= NamedPipeMessage::READ;
                  msg = new NamedPipeMessage(entries[indx].namedPipe, events);
              }
              else
              {
                  cout << " In Monitor::run ..its a socket message" << endl;
                    events |= SocketMessage::READ;                    events |= SocketMessage::READ;
                    Message *msg = new SocketMessage(entries[indx].socket, events);                         msg = new SocketMessage(entries[indx].socket, events);
              }
   
                    entries[indx]._status = _MonitorEntry::BUSY;                    entries[indx]._status = _MonitorEntry::BUSY;
                    autoEntryMutex.unlock();                    autoEntryMutex.unlock();
                    q->enqueue(msg);                    q->enqueue(msg);
                    autoEntryMutex.lock();                    autoEntryMutex.lock();
              // After enqueue a message and the autoEntryMutex has been released and locked again,
              // the array of entries can be changed. The ArrayIterator has be reset with the original _entries
              entries.reset(_entries);
                    entries[indx]._status = _MonitorEntry::IDLE;                    entries[indx]._status = _MonitorEntry::IDLE;
  
                    return true;                    return true;
Line 620 
Line 890 
       // Wait for the monitor to notice _stopConnections.  Otherwise the       // Wait for the monitor to notice _stopConnections.  Otherwise the
       // caller of this function may unbind the ports while the monitor       // caller of this function may unbind the ports while the monitor
       // is still accepting connections on them.       // is still accepting connections on them.
       try        _stopConnectionsSem.wait();
         {  
           _stopConnectionsSem.time_wait(10000);  
         }  
       catch (TimeOut &)  
         {  
           // The monitor is probably busy processng a very long request, and is  
           // not accepting connections.  Let the caller unbind the ports.  
         }  
     }     }
  
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
Line 753 
Line 1015 
    return 0;    return 0;
 } }
  
   
   //This method is anlogsu to solicitSocketMessages. It does the same thing for named Pipes
   int  Monitor::solicitPipeMessages(
       NamedPipe namedPipe,
       Uint32 events,  //not sure what has to change for this enum
       Uint32 queueId,
       int type)
   {
      PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitPipeMessages");
      AutoMutex autoMut(_entry_mut);
      // Check to see if we need to dynamically grow the _entries array
      // We always want the _entries array to 2 bigger than the
      // current connections requested
      PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages at the begining" << PEGASUS_STD(endl);
   
   
   
      _solicitSocketCount++;  // bump the count
      int size = (int)_entries.size();
      if((int)_solicitSocketCount >= (size-1)){
           for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
                   _MonitorEntry entry(0, 0, 0);
                   _entries.append(entry);
           }
      }
   
      int index;
      for(index = 1; index < (int)_entries.size(); index++)
      {
         try
         {
            if(_entries[index]._status.get() == _MonitorEntry::EMPTY)
            {
               _entries[index].socket = NULL;
               _entries[index].namedPipe = namedPipe;
               _entries[index].namedPipeConnection = true;
               _entries[index].queueId  = queueId;
               _entries[index]._type = type;
               _entries[index]._status = _MonitorEntry::IDLE;
   
               PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages after seting up  _entries[index] index = " << index << PEGASUS_STD(endl);
   
               return index;
            }
         }
         catch(...)
         {
         }
   
      }
      _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful
      PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages nothing happed - it didn't work" << PEGASUS_STD(endl);
   
      PEG_METHOD_EXIT();
      return -1;
   
   }
   
   
 PEGASUS_NAMESPACE_END PEGASUS_NAMESPACE_END


Legend:
Removed from v.1.100  
changed lines
  Added in v.1.103.10.7

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2