(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

   1 karl  1.103 //%2006////////////////////////////////////////////////////////////////////////
   2 mike  1.2   //
   3 karl  1.79  // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
   4             // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
   5             // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
   6 karl  1.64  // IBM Corp.; EMC Corporation, The Open Group.
   7 karl  1.79  // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
   8             // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   9 karl  1.90  // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  10             // EMC Corporation; VERITAS Software Corporation; The Open Group.
  11 karl  1.103 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  12             // EMC Corporation; Symantec Corporation; The Open Group.
  13 mike  1.2   //
  14             // Permission is hereby granted, free of charge, to any person obtaining a copy
  15 kumpf 1.17  // of this software and associated documentation files (the "Software"), to
  16             // deal in the Software without restriction, including without limitation the
  17             // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  18 mike  1.2   // sell copies of the Software, and to permit persons to whom the Software is
  19             // furnished to do so, subject to the following conditions:
  20 karl  1.103 // 
  21 kumpf 1.17  // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  22 mike  1.2   // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  23             // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  24 kumpf 1.17  // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  25             // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  26             // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  27 mike  1.2   // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  28             // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  29             //
  30             //==============================================================================
  31             //
  32             // Author: Mike Brasher (mbrasher@bmc.com)
  33             //
  34 r.kieninger 1.83  // Modified By: Mike Day (monitor_2) mdday@us.ibm.com
  35 a.arora     1.71  //              Amit K Arora (Bug#1153) amita@in.ibm.com
  36 alagaraja   1.75  //              Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090
  37 sushma.fernandes 1.78  //              Sushma Fernandes (sushma@hp.com) for Bug#2057
  38 joyce.j          1.84  //              Josephine Eskaline Joyce (jojustin@in.ibm.com) for PEP#101
  39 kumpf            1.85  //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
  40 mike             1.2   //
  41                        //%/////////////////////////////////////////////////////////////////////////////
  42                        
  43                        #include <Pegasus/Common/Config.h>
  44 mday             1.40  
  45 mike             1.2   #include <cstring>
  46                        #include "Monitor.h"
  47                        #include "MessageQueue.h"
  48                        #include "Socket.h"
  49 kumpf            1.4   #include <Pegasus/Common/Tracer.h>
  50 mday             1.7   #include <Pegasus/Common/HTTPConnection.h>
  51 kumpf            1.69  #include <Pegasus/Common/MessageQueueService.h>
  52 a.arora          1.73  #include <Pegasus/Common/Exception.h>
  53 mike             1.100 #include "ArrayIterator.h"
  54 mike             1.2   
  55 w.white          1.103.10.4 
  56                             
  57                             const static DWORD MAX_BUFFER_SIZE = 4096;  // 4 kilobytes
  58                             
  59 mike             1.2        #ifdef PEGASUS_OS_TYPE_WINDOWS
  60                             # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
  61                             #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
  62                             of <winsock.h>. It may have been indirectly included (e.g., by including \
  63 david.dillard    1.95       <windows.h>). Find inclusion of that header which is visible to this \
  64 mike             1.2        compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \
  65                             otherwise, less than 64 clients (the default) will be able to connect to the \
  66                             CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
  67 mday             1.5        
  68 mike             1.2        # endif
  69                             # define FD_SETSIZE 1024
  70 mday             1.5        # include <windows.h>
  71 mike             1.2        #else
  72                             # include <sys/types.h>
  73                             # include <sys/socket.h>
  74                             # include <sys/time.h>
  75                             # include <netinet/in.h>
  76                             # include <netdb.h>
  77                             # include <arpa/inet.h>
  78                             #endif
  79                             
  80                             PEGASUS_USING_STD;
  81                             
  82                             PEGASUS_NAMESPACE_BEGIN
  83                             
  84 mike             1.96       static AtomicInt _connections(0);
  85 mday             1.25       
  86 w.white          1.103.10.1 
  87 j.alex           1.103.10.2 #ifdef PEGASUS_OS_TYPE_WINDOWS
  88                              #define PIPE_INCREMENT 1
  89                             #endif
  90 w.white          1.103.10.1 
  91 mike             1.2        ////////////////////////////////////////////////////////////////////////////////
  92                             //
  93                             // Monitor
  94                             //
  95                             ////////////////////////////////////////////////////////////////////////////////
  96                             
  97 kumpf            1.54       #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
  98 mike             1.2        Monitor::Monitor()
  99 kumpf            1.87          : _stopConnections(0),
 100 a.arora          1.73            _stopConnectionsSem(0),
 101 a.dunfey         1.76            _solicitSocketCount(0),
 102 a.dunfey         1.77            _tickle_client_socket(-1),
 103                                  _tickle_server_socket(-1),
 104                                  _tickle_peer_socket(-1)
 105 mike             1.2        {
 106 kumpf            1.54           int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
 107 mike             1.2            Socket::initializeInterface();
 108 kumpf            1.54           _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
 109 a.arora          1.73       
 110                                 // setup the tickler
 111                                 initializeTickler();
 112                             
 113 r.kieninger      1.83           // Start the count at 1 because initilizeTickler()
 114 a.arora          1.73           // has added an entry in the first position of the
 115                                 // _entries array
 116                                 for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
 117 mday             1.37           {
 118                                    _MonitorEntry entry(0, 0, 0);
 119                                    _entries.append(entry);
 120                                 }
 121 mday             1.18       }
 122 mday             1.20       
 123 mike             1.2        Monitor::~Monitor()
 124                             {
 125 kumpf            1.11           Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
 126 a.dunfey         1.76       
 127                                 try{
 128 a.dunfey         1.77               if(_tickle_peer_socket >= 0)
 129 a.dunfey         1.76               {
 130                                         Socket::close(_tickle_peer_socket);
 131                                     }
 132 a.dunfey         1.77               if(_tickle_client_socket >= 0)
 133 a.dunfey         1.76               {
 134                                         Socket::close(_tickle_client_socket);
 135                                     }
 136 a.dunfey         1.77               if(_tickle_server_socket >= 0)
 137 a.dunfey         1.76               {
 138                                         Socket::close(_tickle_server_socket);
 139                                     }
 140                                 }
 141                                 catch(...)
 142                                 {
 143                                     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 144                                               "Failed to close tickle sockets");
 145                                 }
 146                             
 147 mike             1.2            Socket::uninitializeInterface();
 148 kumpf            1.11           Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 149                                               "returning from monitor destructor");
 150 mday             1.18       }
 151                             
 152 a.arora          1.73       void Monitor::initializeTickler(){
 153 r.kieninger      1.83           /*
 154                                    NOTE: On any errors trying to
 155                                          setup out tickle connection,
 156 a.arora          1.73                    throw an exception/end the server
 157                                 */
 158                             
 159                                 /* setup the tickle server/listener */
 160                             
 161                                 // get a socket for the server side
 162 david.dillard    1.95           if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
 163 a.arora          1.73       	//handle error
 164                             	MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
 165                             				 "Received error number $0 while creating the internal socket.",
 166                             #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 167                             				 errno);
 168                             #else
 169                             				 WSAGetLastError());
 170                             #endif
 171                             	throw Exception(parms);
 172                                 }
 173                             
 174                                 // initialize the address
 175                                 memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
 176                             #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 177                             #pragma convert(37)
 178                             #endif
 179                                 _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
 180                             #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 181                             #pragma convert(0)
 182                             #endif
 183                                 _tickle_server_addr.sin_family = PF_INET;
 184 a.arora          1.73           _tickle_server_addr.sin_port = 0;
 185                             
 186 kumpf            1.86           PEGASUS_SOCKLEN_T _addr_size = sizeof(_tickle_server_addr);
 187 a.arora          1.73       
 188                                 // bind server side to socket
 189                                 if((::bind(_tickle_server_socket,
 190 kumpf            1.88                      reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
 191 a.arora          1.73       	       sizeof(_tickle_server_addr))) < 0){
 192                             	// handle error
 193 r.kieninger      1.83       #ifdef PEGASUS_OS_ZOS
 194                                 MessageLoaderParms parms("Common.Monitor.TICKLE_BIND_LONG",
 195                             				 "Received error:$0 while binding the internal socket.",strerror(errno));
 196                             #else
 197 a.arora          1.73       	MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
 198                             				 "Received error number $0 while binding the internal socket.",
 199                             #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 200                             				 errno);
 201                             #else
 202                             				 WSAGetLastError());
 203                             #endif
 204 r.kieninger      1.83       #endif
 205 a.arora          1.73               throw Exception(parms);
 206                                 }
 207                             
 208                                 // tell the kernel we are a server
 209                                 if((::listen(_tickle_server_socket,3)) < 0){
 210                             	// handle error
 211                             	MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",
 212                             			 "Received error number $0 while listening to the internal socket.",
 213                             #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 214                             				 errno);
 215                             #else
 216                             				 WSAGetLastError());
 217                             #endif
 218                             	throw Exception(parms);
 219                                 }
 220 r.kieninger      1.83       
 221 a.arora          1.73           // make sure we have the correct socket for our server
 222                                 int sock = ::getsockname(_tickle_server_socket,
 223 kumpf            1.88                          reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
 224                                                &_addr_size);
 225 a.arora          1.73           if(sock < 0){
 226                             	// handle error
 227                             	MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",
 228                             			 "Received error number $0 while getting the internal socket name.",
 229                             #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 230                             				 errno);
 231                             #else
 232                             				 WSAGetLastError());
 233                             #endif
 234                             	throw Exception(parms);
 235                                 }
 236                             
 237                                 /* set up the tickle client/connector */
 238 r.kieninger      1.83       
 239 a.arora          1.73           // get a socket for our tickle client
 240 david.dillard    1.95           if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
 241 a.arora          1.73       	// handle error
 242                             	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
 243                             			 "Received error number $0 while creating the internal client socket.",
 244                             #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 245                             				 errno);
 246                             #else
 247                             				 WSAGetLastError());
 248                             #endif
 249                             	throw Exception(parms);
 250                                 }
 251                             
 252                                 // setup the address of the client
 253                                 memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
 254                             #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 255                             #pragma convert(37)
 256                             #endif
 257                                 _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
 258                             #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 259                             #pragma convert(0)
 260                             #endif
 261                                 _tickle_client_addr.sin_family = PF_INET;
 262 a.arora          1.73           _tickle_client_addr.sin_port = 0;
 263                             
 264                                 // bind socket to client side
 265                                 if((::bind(_tickle_client_socket,
 266 kumpf            1.88                      reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
 267 a.arora          1.73       	       sizeof(_tickle_client_addr))) < 0){
 268                             	// handle error
 269                             	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
 270                             			 "Received error number $0 while binding the internal client socket.",
 271                             #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 272                             				 errno);
 273                             #else
 274                             				 WSAGetLastError());
 275                             #endif
 276                             	throw Exception(parms);
 277                                 }
 278                             
 279                                 // connect to server side
 280                                 if((::connect(_tickle_client_socket,
 281 kumpf            1.88                         reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
 282 a.arora          1.73       		  sizeof(_tickle_server_addr))) < 0){
 283                             	// handle error
 284                             	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
 285                             			 "Received error number $0 while connecting the internal client socket.",
 286                             #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 287                             				 errno);
 288                             #else
 289                             				 WSAGetLastError());
 290                             #endif
 291                             	throw Exception(parms);
 292                                 }
 293                             
 294                                 /* set up the slave connection */
 295                                 memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
 296 kumpf            1.86           PEGASUS_SOCKLEN_T peer_size = sizeof(_tickle_peer_addr);
 297 r.kieninger      1.83           pegasus_sleep(1);
 298 a.arora          1.73       
 299                                 // this call may fail, we will try a max of 20 times to establish this peer connection
 300                                 if((_tickle_peer_socket = ::accept(_tickle_server_socket,
 301 kumpf            1.88                   reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
 302                                         &peer_size)) < 0){
 303 a.arora          1.73       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 304                                     // Only retry on non-windows platforms.
 305                                     if(_tickle_peer_socket == -1 && errno == EAGAIN)
 306                                     {
 307 r.kieninger      1.83                 int retries = 0;
 308 a.arora          1.73                 do
 309                                       {
 310                                         pegasus_sleep(1);
 311                                         _tickle_peer_socket = ::accept(_tickle_server_socket,
 312 kumpf            1.88                       reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
 313                                             &peer_size);
 314 a.arora          1.73                   retries++;
 315                                       } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
 316                                     }
 317                             #endif
 318                                 }
 319                                 if(_tickle_peer_socket == -1){
 320                             	// handle error
 321                             	MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",
 322                             			 "Received error number $0 while accepting the internal socket connection.",
 323                             #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 324                             				 errno);
 325                             #else
 326                             				 WSAGetLastError());
 327                             #endif
 328                             	throw Exception(parms);
 329                                 }
 330                                 // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
 331                                 // checks entries with IDLE state for events
 332                                 _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
 333                                 entry._status = _MonitorEntry::IDLE;
 334                                 _entries.append(entry);
 335 a.arora          1.73       }
 336                             
 337                             void Monitor::tickle(void)
 338                             {
 339 sushma.fernandes 1.78           static char _buffer[] =
 340 a.arora          1.73           {
 341                                   '0','0'
 342                                 };
 343 r.kieninger      1.83       
 344 sushma.fernandes 1.78           AutoMutex autoMutex(_tickle_mutex);
 345 r.kieninger      1.83           Socket::disableBlocking(_tickle_client_socket);
 346 sushma.fernandes 1.78           Socket::write(_tickle_client_socket,&_buffer, 2);
 347 r.kieninger      1.83           Socket::enableBlocking(_tickle_client_socket);
 348 sushma.fernandes 1.78       }
 349                             
 350                             void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
 351                             {
 352                                 // Set the state to requested state
 353                                 _entries[index]._status = status;
 354 a.arora          1.73       }
 355                             
 356 mike             1.2        Boolean Monitor::run(Uint32 milliseconds)
 357                             {
 358 mday             1.18       
 359 mday             1.25           Boolean handled_events = false;
 360 a.arora          1.73           int i = 0;
 361 r.kieninger      1.83       
 362 kumpf            1.36           struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
 363 a.arora          1.73       
 364 mday             1.25           fd_set fdread;
 365                                 FD_ZERO(&fdread);
 366 a.arora          1.73       
 367 kumpf            1.94           AutoMutex autoEntryMutex(_entry_mut);
 368 r.kieninger      1.83       
 369 mike             1.100          ArrayIterator<_MonitorEntry> entries(_entries);
 370                             
 371 r.kieninger      1.83           // Check the stopConnections flag.  If set, clear the Acceptor monitor entries
 372 mike             1.96           if (_stopConnections.get() == 1)
 373 kumpf            1.48           {
 374 mike             1.100              for ( int indx = 0; indx < (int)entries.size(); indx++)
 375 kumpf            1.48               {
 376 mike             1.100                  if (entries[indx]._type == Monitor::ACCEPTOR)
 377 kumpf            1.48                   {
 378 mike             1.100                      if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)
 379 kumpf            1.48                       {
 380 mike             1.100                         if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||
 381                                                     entries[indx]._status.get() == _MonitorEntry::DYING )
 382 kumpf            1.48                          {
 383                                                    // remove the entry
 384 mike             1.100      		       entries[indx]._status = _MonitorEntry::EMPTY;
 385 kumpf            1.48                          }
 386                                                else
 387                                                {
 388                                                    // set status to DYING
 389 mike             1.100                            entries[indx]._status = _MonitorEntry::DYING;
 390 kumpf            1.48                          }
 391                                            }
 392                                        }
 393                                     }
 394                                     _stopConnections = 0;
 395 a.arora          1.73       	_stopConnectionsSem.signal();
 396 kumpf            1.48           }
 397 kumpf            1.51       
 398 mike             1.100          for( int indx = 0; indx < (int)entries.size(); indx++)
 399 kumpf            1.68           {
 400 mike             1.100      			 const _MonitorEntry &entry = entries[indx];
 401 mike             1.96              if ((entry._status.get() == _MonitorEntry::DYING) &&
 402 brian.campbell   1.80       					 (entry._type == Monitor::CONNECTION))
 403 kumpf            1.68              {
 404 brian.campbell   1.80                 MessageQueue *q = MessageQueue::lookup(entry.queueId);
 405 kumpf            1.68                 PEGASUS_ASSERT(q != 0);
 406 brian.campbell   1.80                 HTTPConnection &h = *static_cast<HTTPConnection *>(q);
 407 r.kieninger      1.83       
 408 brian.campbell   1.80       					if (h._connectionClosePending == false)
 409                             						continue;
 410                             
 411                             					// NOTE: do not attempt to delete while there are pending responses
 412 r.kieninger      1.83       					// coming thru. The last response to come thru after a
 413 brian.campbell   1.80       					// _connectionClosePending will reset _responsePending to false
 414                             					// and then cause the monitor to rerun this code and clean up.
 415                             					// (see HTTPConnection.cpp)
 416                             
 417                             					if (h._responsePending == true)
 418                             					{
 419                             						Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "
 420                             													"Ignoring connection delete request because "
 421                             													"responses are still pending. "
 422 r.kieninger      1.83       													"connection=0x%p, socket=%d\n",
 423 brian.campbell   1.81       													(void *)&h, h.getSocket());
 424 brian.campbell   1.80       						continue;
 425                             					}
 426                             					h._connectionClosePending = false;
 427                                       MessageQueue &o = h.get_owner();
 428                                       Message* message= new CloseConnectionMessage(entry.socket);
 429 kumpf            1.68                 message->dest = o.getQueueId();
 430                             
 431 r.kieninger      1.83                 // HTTPAcceptor is responsible for closing the connection.
 432 kumpf            1.68                 // The lock is released to allow HTTPAcceptor to call
 433 r.kieninger      1.83                 // unsolicitSocketMessages to free the entry.
 434 kumpf            1.68                 // Once HTTPAcceptor completes processing of the close
 435                                       // connection, the lock is re-requested and processing of
 436                                       // the for loop continues.  This is safe with the current
 437 mike             1.100                // implementation of the entries object.  Note that the
 438                                       // loop condition accesses the entries.size() on each
 439 kumpf            1.68                 // iteration, so that a change in size while the mutex is
 440                                       // unlocked will not result in an ArrayIndexOutOfBounds
 441                                       // exception.
 442                             
 443 kumpf            1.94                 autoEntryMutex.unlock();
 444 kumpf            1.68                 o.enqueue(message);
 445 kumpf            1.94                 autoEntryMutex.lock();
 446 r.kieninger      1.102                // After enqueue a message and the autoEntryMutex has been released and locked again,
 447                                       // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries.
 448                                       entries.reset(_entries);
 449 kumpf            1.68              }
 450                                 }
 451                             
 452 kumpf            1.51           Uint32 _idleEntries = 0;
 453 r.kieninger      1.83       
 454 a.arora          1.73           /*
 455 david.dillard    1.95               We will keep track of the maximum socket number and pass this value
 456                                     to the kernel as a parameter to SELECT.  This loop seems like a good
 457                                     place to calculate the max file descriptor (maximum socket number)
 458                                     because we have to traverse the entire array.
 459 r.kieninger      1.83           */
 460 w.white          1.103.10.1     //Array<HANDLE> pipeEventArray;
 461 j.alex           1.103.10.2         PEGASUS_SOCKET maxSocketCurrentPass = 0;
 462 w.white          1.103.10.1     int indx;
 463                             
 464 j.alex           1.103.10.2 
 465                             #ifdef PEGASUS_OS_TYPE_WINDOWS
 466                             
 467 w.white          1.103.10.1     //This array associates named pipe connections to their place in [indx]
 468                                 //in the entries array. The value in poition zero of the array is the
 469                                 //index of the fist named pipe connection in the entries array
 470                                 Array <Uint32> indexPipeCountAssociator;
 471 j.alex           1.103.10.2     int pipeEntryCount=0; 
 472                                 int MaxPipes = PIPE_INCREMENT;
 473                                 HANDLE* hEvents = new HANDLE[PIPE_INCREMENT]; 
 474                             
 475                             #endif
 476 w.white          1.103.10.1 
 477                                 for( indx = 0; indx < (int)entries.size(); indx++)
 478 mike             1.2            {
 479 a.arora          1.73       
 480 j.alex           1.103.10.2 
 481                             #ifdef PEGASUS_OS_TYPE_WINDOWS
 482                                    if(entries[indx].isNamedPipeConnection())
 483                                    {
 484                                        //entering this clause mean that a Named Pipe connection is at entries[indx]
 485 w.white          1.103.10.1            //cout << "In Monitor::run in clause to to create array of for WaitformultipuleObjects" << endl;
 486 j.alex           1.103.10.2 
 487 w.white          1.103.10.1            cout << "In Monitor::run - pipe being added to array is " <<
 488                                                entries[indx].namedPipe.getName() << endl;
 489                                        cout << " overlap event is "<<
 490                                             (int) (entries[indx].namedPipe.getOverlap()).hEvent << endl;
 491 j.alex           1.103.10.2 
 492                                         entries[indx].pipeSet = false;
 493                                        // We can Keep a counter in the Monitor class for the number of named pipes ...
 494                                        //  Which can be used here to create the array size for hEvents..( obviously before this for loop.:-) )
 495                                         if (pipeEntryCount >= MaxPipes)
 496                                         {
 497                                              MaxPipes += PIPE_INCREMENT;
 498                                              HANDLE* temp_hEvents = new HANDLE[MaxPipes]; 
 499                                              for (Uint32 i =0;i<pipeEntryCount;i++)
 500                                              {
 501                                                  temp_hEvents[i] = hEvents[i];
 502                                              }
 503                                              delete [] hEvents;
 504                                              hEvents = temp_hEvents;
 505                                         }         
 506                             
 507 w.white          1.103.10.1            //pipeEventArray.append((entries[indx].namedPipe.getOverlap()).hEvent);
 508 j.alex           1.103.10.2            hEvents[pipeEntryCount] = entries[indx].namedPipe.getOverlap().hEvent;
 509 w.white          1.103.10.4            
 510 w.white          1.103.10.1            indexPipeCountAssociator.append(indx);
 511 w.white          1.103.10.4     
 512                                 pipeEntryCount++;
 513                             
 514                             
 515                             
 516                                        cout << "Monitor::run pipeEntrycount is " << pipeEntryCount <<
 517                                        " this is the type " << entries[indx]._type << " this is index " << indx << endl; 
 518 w.white          1.103.10.1 
 519 mday             1.25              }
 520 j.alex           1.103.10.2        else
 521                                   
 522                             #endif
 523                                    {
 524                                    
 525                                        if(maxSocketCurrentPass < entries[indx].socket)
 526                                         maxSocketCurrentPass = entries[indx].socket;
 527                             
 528                                        if(entries[indx]._status.get() == _MonitorEntry::IDLE)
 529                                        {
 530                                            _idleEntries++;
 531                                            FD_SET(entries[indx].socket, &fdread);
 532                                        }
 533                             
 534                                    }
 535                               }
 536 s.hills          1.62       
 537 a.arora          1.73           /*
 538 david.dillard    1.95               Add 1 then assign maxSocket accordingly. We add 1 to account for
 539                                     descriptors starting at 0.
 540 a.arora          1.73           */
 541                                 maxSocketCurrentPass++;
 542                             
 543 kumpf            1.94           autoEntryMutex.unlock();
 544 david.dillard    1.95       
 545                                 //
 546                                 // The first argument to select() is ignored on Windows and it is not
 547                                 // a socket value.  The original code assumed that the number of sockets
 548                                 // and a socket value have the same type.  On Windows they do not.
 549                                 //
 550                             #ifdef PEGASUS_OS_TYPE_WINDOWS
 551 w.white          1.103.10.1     //int events = select(0, &fdread, NULL, NULL, &tv);
 552 j.alex           1.103.10.2      int events = 0;
 553                                     DWORD dwWait=NULL;
 554                                 int pEvents = 0;
 555 w.white          1.103.10.1 
 556 j.alex           1.103.10.2         cout << "events after select" << events << endl;
 557 w.white          1.103.10.1     cout << "Calling WaitForMultipleObjects\n";
 558                             
 559                                 //this should be in a try block
 560                             
 561                                 dwWait = WaitForMultipleObjects(MaxPipes,    
 562                                                                hEvents,      //ABB:- array of event objects
 563                                                                FALSE,        // ABB:-does not wait for all
 564                                                                20000);        //ABB:- timeout value
 565                             
 566                                 if(dwWait == WAIT_TIMEOUT)
 567                                     {
 568                                     cout << "Wait WAIT_TIMEOUT\n";
 569 j.alex           1.103.10.2 
 570                                                // Sleep(2000);
 571 w.white          1.103.10.1             //continue; 
 572 j.alex           1.103.10.2              
 573                                          //return false;  // I think we do nothing.... Mybe there is a socket connection... so 
 574                                          // cant return.
 575 w.white          1.103.10.1         }
 576                                     else if (dwWait == WAIT_FAILED)
 577                                     {
 578                                         cout << "Wait Failed returned\n";
 579                                         cout << "failed with " << GetLastError() << "." << endl;
 580 j.alex           1.103.10.2             pEvents = -1;
 581 w.white          1.103.10.3             return false;
 582 w.white          1.103.10.1         }
 583                                     else
 584                                     {
 585                                         int pCount = dwWait - WAIT_OBJECT_0;  // determines which pipe
 586                                         cout << " WaitForMultiPleObject returned activity on server pipe: "<< 
 587                                             pCount<< endl;
 588                             
 589 j.alex           1.103.10.2             pEvents = 1;
 590 w.white          1.103.10.1 
 591                                         //this statment gets the pipe entry that was trigered
 592                                         entries[indexPipeCountAssociator[pCount]].pipeSet = true;
 593                                         
 594                                         if (pCount > 0) //this means activity on pipe is CIMOperation reques
 595                                         {
 596                                             cout << "In Monitor::run got Operation request" << endl;
 597                                             //entries[indx]._type = Monitor::CONNECTION;
 598                                         }
 599                                         else //this clause my not be needed in production but is used for testing
 600                                         {
 601                                           cout << "In Monitor::run got Connection request" << endl;
 602                                           
 603                                         }
 604                             
 605                                     }
 606                                             //
 607                             
 608 j.alex           1.103.10.2     
 609 w.white          1.103.10.1    // Sleep(2000);
 610                             
 611                                 //int events = 1;
 612                                 /*if (dwWait)
 613                                 {
 614                                     cout << "in Monitor::run about to call handlePipeConnectionEvent" << endl;
 615                                     _handlePipeConnectionEvent(dwWait);
 616                                 }*/
 617 david.dillard    1.95       #else
 618 a.arora          1.73           int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
 619 david.dillard    1.95       #endif
 620 kumpf            1.94           autoEntryMutex.lock();
 621 r.kieninger      1.102          // After enqueue a message and the autoEntryMutex has been released and locked again,
 622                                 // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries
 623                                 entries.reset(_entries);
 624 w.white          1.103.10.1 
 625 mike             1.2        #ifdef PEGASUS_OS_TYPE_WINDOWS
 626 j.alex           1.103.10.2     if(pEvents == -1)
 627                                 {
 628                                     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 629                                       "Monitor::run - errorno = %d has occurred on select.",GetLastError() );
 630                                    // The EBADF error indicates that one or more or the file
 631                                    // descriptions was not valid. This could indicate that
 632                                    // the entries structure has been corrupted or that
 633                                    // we have a synchronization error.
 634                             
 635                                     // We need to generate an assert  here...
 636                                    PEGASUS_ASSERT(GetLastError()!= EBADF);
 637                             
 638                             
 639                                 }
 640                                
 641 kumpf            1.50           if(events == SOCKET_ERROR)
 642 mike             1.2        #else
 643 kumpf            1.50           if(events == -1)
 644 mike             1.2        #endif
 645 mday             1.13           {
 646 j.alex           1.103.10.2 
 647                                     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 648 kumpf            1.50                 "Monitor::run - errorno = %d has occurred on select.", errno);
 649                                    // The EBADF error indicates that one or more or the file
 650                                    // descriptions was not valid. This could indicate that
 651 mike             1.100             // the entries structure has been corrupted or that
 652 kumpf            1.50              // we have a synchronization error.
 653                             
 654                                    PEGASUS_ASSERT(errno != EBADF);
 655                                 }
 656 j.alex           1.103.10.2     else if ((events)||(pEvents))
 657 kumpf            1.50           {
 658 w.white          1.103.10.1 
 659                                    cout << "IN Monior::run 'else if (events)' clause - array size is " <<
 660                                         (int)entries.size() << endl;
 661 kumpf            1.51              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 662 r.kieninger      1.83                 "Monitor::run select event received events = %d, monitoring %d idle entries",
 663 david.dillard    1.95                  events, _idleEntries);
 664 mike             1.100             for( int indx = 0; indx < (int)entries.size(); indx++)
 665 mday             1.25              {
 666 w.white          1.103.10.4            cout << "Monitor::run at start of 'for( int indx = 0; indx ' - index = " << indx << endl; 
 667 kumpf            1.53                 // The Monitor should only look at entries in the table that are IDLE (i.e.,
 668                                       // owned by the Monitor).
 669 w.white          1.103.10.1           if(((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
 670 j.alex           1.103.10.2              FD_ISSET(entries[indx].socket, &fdread)&& (events)) ||
 671                                          (entries[indx].isNamedPipeConnection() && entries[indx].pipeSet && (pEvents)))
 672 david.dillard    1.95                 {
 673 w.white          1.103.10.4               MessageQueue *q;
 674                                           cout << "IN Monior::run inside - for int indx = " <<indx <<
 675                                               "and queue ID is " << entries[indx].queueId << endl;
 676                                           try{
 677                                         
 678                                              q = MessageQueue::lookup(entries[indx].queueId);
 679                                           }
 680                                          catch (Exception e)
 681                                          {
 682                                              cout << " this is what lookup gives - " << e.getMessage() << endl;
 683                                              exit(1);
 684                                          }
 685                                          catch(...)
 686                                          {
 687                                              cout << "MessageQueue::lookup gives strange exception " << endl;
 688                                              exit(1);
 689                                          }
 690                             
 691                             
 692                             
 693                                          cout << "Monitor::run after MessageQueue::lookup(entries[indx].queueId)" << endl;
 694 w.white          1.103.10.4               Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 695 kumpf            1.53                         "Monitor::run indx = %d, queueId =  %d, q = %p",
 696 mike             1.100                        indx, entries[indx].queueId, q);
 697 w.white          1.103.10.4              cout << "Monitor::run before PEGASUS_ASSerT(q !=0) " << endl;
 698 kumpf            1.53                    PEGASUS_ASSERT(q !=0);
 699 mday             1.37       
 700 david.dillard    1.95                    try
 701                                          {
 702 w.white          1.103.10.1                   cout <<" this is the type " << entries[indx]._type <<
 703                                                   "for index " << indx << endl;
 704                                            cout << "IN Monior::run right before entries[indx]._type == Monitor::CONNECTION" << endl;
 705 mike             1.100                      if(entries[indx]._type == Monitor::CONNECTION)
 706 david.dillard    1.95                       {
 707 w.white          1.103.10.4                     cout << "In Monitor::run Monitor::CONNECTION clause" << endl; 
 708                             
 709 w.white          1.103.10.3                     continue;
 710                             
 711 w.white          1.103.10.4                                       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 712 mike             1.100                           "entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
 713 david.dillard    1.95                          static_cast<HTTPConnection *>(q)->_entry_index = indx;
 714 sushma.fernandes 1.78       
 715                                                // Do not update the entry just yet. The entry gets updated once
 716 r.kieninger      1.83                          // the request has been read.
 717 mike             1.100                         //entries[indx]._status = _MonitorEntry::BUSY;
 718 sushma.fernandes 1.78       
 719 kumpf            1.66                          // If allocate_and_awaken failure, retry on next iteration
 720 a.arora          1.73       /* Removed for PEP 183.
 721 kumpf            1.69                          if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(
 722                                                        (void *)q, _dispatch))
 723 kumpf            1.67                          {
 724                                                   Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 725                                                       "Monitor::run: Insufficient resources to process request.");
 726 mike             1.100                            entries[indx]._status = _MonitorEntry::IDLE;
 727 kumpf            1.67                             return true;
 728                                                }
 729 a.arora          1.73       */
 730                             // Added for PEP 183
 731 david.dillard    1.95                          HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
 732                                                Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 733 a.arora          1.73                                "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
 734                                                dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
 735                                                try
 736                                                {
 737 w.white          1.103.10.3                        cout << "In Monitor::run about to call 'dst->run(1)' "  << endl;
 738 a.arora          1.73                              dst->run(1);
 739                                                }
 740 david.dillard    1.95                          catch (...)
 741                                                {
 742                                                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 743                                                    "Monitor::_dispatch: exception received");
 744                                                }
 745                                                Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 746 a.arora          1.73                          "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
 747                             
 748 sushma.fernandes 1.78                          // It is possible the entry status may not be set to busy.
 749 r.kieninger      1.83                          // The following will fail in that case.
 750 mike             1.96          		   // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
 751 a.arora          1.73       		   // Once the HTTPConnection thread has set the status value to either
 752                             		   // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
 753                             		   // to the Monitor.  It is no longer permissible to access the connection
 754                             		   // or the entry in the _entries table.
 755 sushma.fernandes 1.78       
 756                                                // The following is not relevant as the worker thread or the
 757                                                // reader thread will update the status of the entry.
 758                             		   //if (dst->_connectionClosePending)
 759 r.kieninger      1.83       		   //{
 760 sushma.fernandes 1.78       		   //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
 761                             		   //}
 762                             		   //else
 763                             		   //{
 764                             		   //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
 765 r.kieninger      1.83       		   //}
 766                             // end Added for PEP 183
 767 a.arora          1.73       		}
 768 mike             1.100      	        else if( entries[indx]._type == Monitor::INTERNAL){
 769 r.kieninger      1.83       			// set ourself to BUSY,
 770                                                     // read the data
 771 a.arora          1.73                               // and set ourself back to IDLE
 772 w.white          1.103.10.1             cout << " in - entries[indx]._type == Monitor::INTERNAL- " << endl; 
 773 r.kieninger      1.83       
 774 mike             1.100      		   	entries[indx]._status = _MonitorEntry::BUSY;
 775 a.arora          1.73       			static char buffer[2];
 776 mike             1.100            			Socket::disableBlocking(entries[indx].socket);
 777                                   			Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2);
 778                                   			Socket::enableBlocking(entries[indx].socket);
 779                             			entries[indx]._status = _MonitorEntry::IDLE;
 780 mday             1.37       		}
 781                             		else
 782 mday             1.25       		{
 783 w.white          1.103.10.1             cout << "In Monitor::run else clause of CONNECTION if statments" << endl;
 784 kumpf            1.51                          Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 785                                                  "Non-connection entry, indx = %d, has been received.", indx);
 786 mday             1.37       		   int events = 0;
 787 w.white          1.103.10.1            Message *msg;
 788 j.alex           1.103.10.2            cout << " In Monitor::run Just before checking if NamedPipeConnection" << "for Index "<<indx<< endl;
 789 w.white          1.103.10.1 
 790                                        if (entries[indx].isNamedPipeConnection())
 791                                        {
 792 w.white          1.103.10.4                if(!entries[indx].namedPipe.isConnectionPipe)
 793                                            { /*if we enter this clasue it means that the named pipe that we are 
 794                                                looking at has recived a connection but is not the pipe we get connection requests over. 
 795                                                therefore we neew to change the _type to CONNECTION and wait for a CIM Operations request*/
 796                                                entries[indx]._type = Monitor::CONNECTION;
 797                             
 798                             
 799                                                  /* This is a test  - this shows that the read file needs to be done
 800                                  before we call wiatForMultipleObjects*/
 801                                 /******************************************************
 802                                 ********************************************************/
 803                                
 804                                     string raw(MAX_BUFFER_SIZE, string::value_type(0));
 805                                     DWORD size = 0;
 806                             
 807                                     BOOL rc = ::ReadFile(
 808                                             entries[indx].namedPipe.getPipe(),
 809                                             (void *)raw.data(),
 810                                             raw.size(),
 811                                             &size,
 812                                             &entries[indx].namedPipe.getOverlap());
 813 w.white          1.103.10.4         if(!rc)
 814                                     {
 815                             
 816                                        cout << "ReadFile failed for : "  << GetLastError() << "."<< endl;
 817                             
 818                                     }
 819                             
 820                               
 821                             
 822                                 /******************************************************
 823                                 ********************************************************/
 824                             
 825                             
 826                             
 827                             
 828                                                continue;
 829                             
 830                                            }
 831 w.white          1.103.10.1                cout << " In Monitor::run about to create a Pipe message" << endl;
 832                                            events |= NamedPipeMessage::READ;
 833                                            msg = new NamedPipeMessage(entries[indx].namedPipe, events);
 834                                        }
 835                                        else
 836                                        {
 837 j.alex           1.103.10.2                cout << " In Monitor::run ..its a socket message" << endl;
 838 w.white          1.103.10.1                events |= SocketMessage::READ;
 839                             		       msg = new SocketMessage(entries[indx].socket, events);
 840                                        }
 841                             
 842 mike             1.100      		   entries[indx]._status = _MonitorEntry::BUSY;
 843 kumpf            1.94                          autoEntryMutex.unlock();
 844 mday             1.37       		   q->enqueue(msg);
 845 kumpf            1.94                          autoEntryMutex.lock();
 846 r.kieninger      1.102                 // After enqueue a message and the autoEntryMutex has been released and locked again,
 847                                        // the array of entries can be changed. The ArrayIterator has be reset with the original _entries
 848                                        entries.reset(_entries);
 849 mike             1.100      		   entries[indx]._status = _MonitorEntry::IDLE;
 850 kumpf            1.94       
 851 mday             1.25       		   return true;
 852                             		}
 853                             	     }
 854 mday             1.37       	     catch(...)
 855 mday             1.25       	     {
 856                             	     }
 857                             	     handled_events = true;
 858                             	  }
 859                                    }
 860 mday             1.24           }
 861 kumpf            1.94       
 862 mday             1.13           return(handled_events);
 863 mike             1.2        }
 864                             
 865 chuck            1.74       void Monitor::stopListeningForConnections(Boolean wait)
 866 kumpf            1.48       {
 867                                 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
 868 r.kieninger      1.83           // set boolean then tickle the server to recognize _stopConnections
 869 kumpf            1.48           _stopConnections = 1;
 870 a.arora          1.73           tickle();
 871 kumpf            1.48       
 872 chuck            1.74           if (wait)
 873 a.arora          1.73           {
 874 chuck            1.74             // Wait for the monitor to notice _stopConnections.  Otherwise the
 875                                   // caller of this function may unbind the ports while the monitor
 876                                   // is still accepting connections on them.
 877 kumpf            1.101            _stopConnectionsSem.wait();
 878 a.arora          1.73           }
 879 r.kieninger      1.83       
 880 kumpf            1.48           PEG_METHOD_EXIT();
 881                             }
 882 mday             1.25       
 883 mday             1.37       
 884 mday             1.25       int  Monitor::solicitSocketMessages(
 885 david.dillard    1.95           PEGASUS_SOCKET socket,
 886 mike             1.2            Uint32 events,
 887 r.kieninger      1.83           Uint32 queueId,
 888 mday             1.8            int type)
 889 mike             1.2        {
 890 r.kieninger      1.83          PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
 891 alagaraja        1.75          AutoMutex autoMut(_entry_mut);
 892 a.arora          1.73          // Check to see if we need to dynamically grow the _entries array
 893                                // We always want the _entries array to 2 bigger than the
 894                                // current connections requested
 895                                _solicitSocketCount++;  // bump the count
 896                                int size = (int)_entries.size();
 897 w.otsuka         1.89          if((int)_solicitSocketCount >= (size-1)){
 898                                     for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
 899 a.arora          1.73                       _MonitorEntry entry(0, 0, 0);
 900                                             _entries.append(entry);
 901                                     }
 902                                }
 903 kumpf            1.4        
 904 a.arora          1.73          int index;
 905                                for(index = 1; index < (int)_entries.size(); index++)
 906 mday             1.25          {
 907 a.arora          1.73             try
 908 mday             1.37             {
 909 mike             1.96                if(_entries[index]._status.get() == _MonitorEntry::EMPTY)
 910 a.arora          1.73                {
 911                                         _entries[index].socket = socket;
 912                                         _entries[index].queueId  = queueId;
 913                                         _entries[index]._type = type;
 914                                         _entries[index]._status = _MonitorEntry::IDLE;
 915 r.kieninger      1.83       
 916 a.arora          1.73                   return index;
 917                                      }
 918 mday             1.37             }
 919                                   catch(...)
 920 mday             1.25             {
 921                                   }
 922                                }
 923 a.arora          1.73          _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful
 924 mday             1.25          PEG_METHOD_EXIT();
 925 kumpf            1.50          return -1;
 926 a.arora          1.73       
 927 mike             1.2        }
 928                             
                                 // Stops monitoring the given socket.
                                 //
                                 // An AutoMutex on _entry_mut is constructed first (presumably a scoped
                                 // lock held until the function returns -- confirm).  _entries is then
                                 // scanned from index 1 for the first entry whose socket equals `socket`;
                                 // that entry is marked EMPTY, its socket is reset to
                                 // PEGASUS_INVALID_SOCKET and _solicitSocketCount is decremented.  Only
                                 // the first match is processed.  If nothing matches, the scan changes
                                 // nothing.  Finally, trailing EMPTY entries are trimmed from the end of
                                 // _entries, but only while the array holds more than
                                 // MAX_NUMBER_OF_MONITOR_ENTRIES entries.
 929 david.dillard    1.95       void Monitor::unsolicitSocketMessages(PEGASUS_SOCKET socket)
 930 mike             1.2        {
 931 kumpf            1.50       
 932 mday             1.25           PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
 933 alagaraja        1.75           AutoMutex autoMut(_entry_mut);
 934 a.arora          1.73       
 935                                 /*
 936                                     Start at index = 1 because _entries[0] is the tickle entry which never needs
 937                                     to be EMPTY;
 938                                 */
 939 w.otsuka         1.89           unsigned int index;
 940 a.arora          1.73           for(index = 1; index < _entries.size(); index++)
 941 mike             1.2            {
 942 mday             1.25              if(_entries[index].socket == socket)
 943                                    {
 944 a.arora          1.73                 _entries[index]._status = _MonitorEntry::EMPTY;
 945 david.dillard    1.95                 _entries[index].socket = PEGASUS_INVALID_SOCKET;
 946 a.arora          1.73                 _solicitSocketCount--;
 947                                       break;
 948 mday             1.25              }
 949 mike             1.2            }
 950 a.arora          1.73       
 951                                 /*
 952                             	Dynamic Contraction:
 953                             	To remove excess entries we will start from the end of the _entries array
 954                             	and remove all entries with EMPTY status until we find the first NON EMPTY.
 955                             	This prevents the positions, of the NON EMPTY entries, from being changed.
 956 r.kieninger      1.83           */
                                     // NOTE(review): index is unsigned and the loop below has no lower
                                     // bound of its own; it ends only when it reaches a non-EMPTY entry
                                     // (per the comment above, _entries[0] is expected never to be
                                     // EMPTY).  An entry is removed only while size() exceeds
                                     // MAX_NUMBER_OF_MONITOR_ENTRIES; otherwise the loop merely steps
                                     // past the trailing EMPTY entries without changing the array.
 957 a.arora          1.73           index = _entries.size() - 1;
 958 mike             1.96           while(_entries[index]._status.get() == _MonitorEntry::EMPTY){
 959 a.arora          1.73       	if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
 960                                             _entries.remove(index);
 961                             	index--;
 962                                 }
 963 kumpf            1.4            PEG_METHOD_EXIT();
 964 mike             1.2        }
 965                             
 966 a.arora          1.73       // Note: this is no longer called with PEP 183.
                                 //
                                 // Runs one HTTPConnection and then hands its monitor entry back to the
                                 // Monitor (presumably a thread start routine, given the
                                 // PEGASUS_THREAD_RETURN / PEGASUS_THREAD_CDECL signature -- confirm).
                                 //
                                 //   parm - reinterpret_cast to HTTPConnection*; it is dereferenced
                                 //          without a null check.
                                 //
                                 // Any exception escaping dst->run(1) is caught and only traced.  After
                                 // run() returns, the entry at dst->_entry_index is asserted to be BUSY
                                 // and is then set to DYING when dst->_connectionClosePending is set,
                                 // otherwise to IDLE.  That status assignment is the last access to the
                                 // connection and its entry.  Always returns 0.
 967 mday             1.7        PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
 968                             {
 969 mday             1.8           HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
 970 kumpf            1.51          Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 971 kumpf            1.53               "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
 972                                     dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
 973 kumpf            1.51          try
 974                                {
 975                                   dst->run(1);
 976                                }
 977                                catch (...)
 978                                {
                                       // The exception is not rethrown; the status hand-off below
                                       // still runs after a failure inside run().
 979                                   Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 980                                       "Monitor::_dispatch: exception received");
 981                                }
 982                                Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 983                                       "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
 984 r.kieninger      1.83       
 985 mike             1.96          PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
 986 kumpf            1.68       
 987                                // Once the HTTPConnection thread has set the status value to either
 988                                // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
 989                                // to the Monitor.  It is no longer permissible to access the connection
 990                                // or the entry in the _entries table.
 991 kumpf            1.50          if (dst->_connectionClosePending)
 992                                {
 993 kumpf            1.68             dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
 994                                }
 995                                else
 996                                {
 997                                   dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
 998 kumpf            1.50          }
 999 mday             1.8           return 0;
1000 mday             1.40       }
1001                             
1002 w.white          1.103.10.1 
1003                             //This method is analogous to solicitSocketMessages. It does the same thing for named Pipes
                                 //
                                 // Claims the first EMPTY slot in _entries (searching from index 1) for a
                                 // named-pipe connection and marks it IDLE.
                                 //
                                 //   namedPipe - stored in the claimed entry's namedPipe member
                                 //   events    - not referenced anywhere in this function
                                 //   queueId   - stored in the claimed entry's queueId member
                                 //   type      - stored in the claimed entry's _type member
                                 //
                                 // Returns the index of the claimed entry, or -1 when no EMPTY entry was
                                 // found (in that case the earlier _solicitSocketCount increment is
                                 // undone).  PEG_METHOD_EXIT is issued on both return paths so that the
                                 // method trace stays balanced with PEG_METHOD_ENTER.
1004                             int  Monitor::solicitPipeMessages(
1005                                 NamedPipe namedPipe,
1006                                 Uint32 events,  //not sure what has to change for this enum
1007                                 Uint32 queueId,
1008                                 int type)
1009                             {
1010                                PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitPipeMessages");
1011                                AutoMutex autoMut(_entry_mut);
1012                                // Check to see if we need to dynamically grow the _entries array
1013                                // We always want the _entries array to be 2 bigger than the
1014                                // current connections requested
                                    // NOTE(review): this and the other cout lines below write
                                    // unconditionally to stdout; presumably temporary debugging output.
1015                                PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages at the begining" << PEGASUS_STD(endl);
1016                             
1017                             
1018                                
1019                                _solicitSocketCount++;  // bump the count
1020                                int size = (int)_entries.size();
1021                                if((int)_solicitSocketCount >= (size-1)){
                                         // Append one fresh entry per missing slot (new entries are
                                         // presumably created EMPTY -- confirm in _MonitorEntry).
1022                                     for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
1023 w.white          1.103.10.1                 _MonitorEntry entry(0, 0, 0);
1024                                             _entries.append(entry);
1025                                     }
1026                                }
1027                             
1028                                int index;
1029                                for(index = 1; index < (int)_entries.size(); index++)
1030                                {
1031                                   try
1032                                   {
1033                                      if(_entries[index]._status.get() == _MonitorEntry::EMPTY)
1034                                      {
                                             // NOTE(review): unsolicitSocketMessages() resets socket to
                                             // PEGASUS_INVALID_SOCKET, whereas NULL is used here --
                                             // confirm this is intended for pipe entries.
1035                                         _entries[index].socket = NULL;
1036                                         _entries[index].namedPipe = namedPipe;
1037                                         _entries[index].namedPipeConnection = true;
1038                                         _entries[index].queueId  = queueId;
1039                                         _entries[index]._type = type;
1040                                         _entries[index]._status = _MonitorEntry::IDLE;
1041                             
1042                                         PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages after seting up  _entries[index] index = " << index << PEGASUS_STD(endl);
1043                             
                                             // Balance PEG_METHOD_ENTER on the success path as well.
                                             PEG_METHOD_EXIT();
1044 w.white          1.103.10.1             return index;
1045                                      }
1046                                   }
1047                                   catch(...)
1048                                   {
                                          // NOTE(review): any exception is silently ignored here and
                                          // the scan simply continues with the next entry.
1049                                   }
1050                             
1051                                }
1052                                _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful
1053                                PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages nothing happed - it didn't work" << PEGASUS_STD(endl);
1054                             
1055                                PEG_METHOD_EXIT();
1056                                return -1;
1057                             
1058                             }
1059                             
1060                             
1061 mike             1.2        PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2