(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

   1 karl  1.103 //%2006////////////////////////////////////////////////////////////////////////
   2 mike  1.2   //
   3 karl  1.79  // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
   4             // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
   5             // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
   6 karl  1.64  // IBM Corp.; EMC Corporation, The Open Group.
   7 karl  1.79  // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
   8             // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   9 karl  1.90  // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  10             // EMC Corporation; VERITAS Software Corporation; The Open Group.
  11 karl  1.103 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  12             // EMC Corporation; Symantec Corporation; The Open Group.
  13 mike  1.2   //
  14             // Permission is hereby granted, free of charge, to any person obtaining a copy
  15 kumpf 1.17  // of this software and associated documentation files (the "Software"), to
  16             // deal in the Software without restriction, including without limitation the
  17             // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  18 mike  1.2   // sell copies of the Software, and to permit persons to whom the Software is
  19             // furnished to do so, subject to the following conditions:
  20 karl  1.103 // 
  21 kumpf 1.17  // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  22 mike  1.2   // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  23             // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  24 kumpf 1.17  // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  25             // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  26             // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  27 mike  1.2   // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  28             // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  29             //
  30             //==============================================================================
  31             //
  32             // Author: Mike Brasher (mbrasher@bmc.com)
  33             //
  34 r.kieninger 1.83  // Modified By: Mike Day (monitor_2) mdday@us.ibm.com
  35 a.arora     1.71  //              Amit K Arora (Bug#1153) amita@in.ibm.com
  36 alagaraja   1.75  //              Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090
  37 sushma.fernandes 1.78  //              Sushma Fernandes (sushma@hp.com) for Bug#2057
  38 joyce.j          1.84  //              Josephine Eskaline Joyce (jojustin@in.ibm.com) for PEP#101
  39 kumpf            1.85  //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
  40 mike             1.2   //
  41                        //%/////////////////////////////////////////////////////////////////////////////
  42                        
  43                        #include <Pegasus/Common/Config.h>
  44 mday             1.40  
  45 mike             1.2   #include <cstring>
  46                        #include "Monitor.h"
  47                        #include "MessageQueue.h"
  48                        #include "Socket.h"
  49 kumpf            1.4   #include <Pegasus/Common/Tracer.h>
  50 mday             1.7   #include <Pegasus/Common/HTTPConnection.h>
  51 kumpf            1.69  #include <Pegasus/Common/MessageQueueService.h>
  52 a.arora          1.73  #include <Pegasus/Common/Exception.h>
  53 mike             1.100 #include "ArrayIterator.h"
  54 mike             1.2   
  55 w.white          1.103.10.4 
  56                             
  57                             const static DWORD MAX_BUFFER_SIZE = 4096;  // 4 kilobytes
  58                             
  59 mike             1.2        #ifdef PEGASUS_OS_TYPE_WINDOWS
  60                             # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
  61                             #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
  62                             of <winsock.h>. It may have been indirectly included (e.g., by including \
  63 david.dillard    1.95       <windows.h>). Find inclusion of that header which is visible to this \
  64 mike             1.2        compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \
  65                             otherwise, less than 64 clients (the default) will be able to connect to the \
  66                             CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
  67 mday             1.5        
  68 mike             1.2        # endif
  69                             # define FD_SETSIZE 1024
  70 mday             1.5        # include <windows.h>
  71 mike             1.2        #else
  72                             # include <sys/types.h>
  73                             # include <sys/socket.h>
  74                             # include <sys/time.h>
  75                             # include <netinet/in.h>
  76                             # include <netdb.h>
  77                             # include <arpa/inet.h>
  78                             #endif
  79                             
  80                             PEGASUS_USING_STD;
  81                             
  82                             PEGASUS_NAMESPACE_BEGIN
  83                             
  84 mike             1.96       static AtomicInt _connections(0);
  85 mreddy           1.103.10.9 Mutex Monitor::_cout_mut;
  86 w.white          1.103.10.1 
  87 j.alex           1.103.10.2 #ifdef PEGASUS_OS_TYPE_WINDOWS
  88                              #define PIPE_INCREMENT 1
  89                             #endif
  90 w.white          1.103.10.1 
  91 mike             1.2        ////////////////////////////////////////////////////////////////////////////////
  92                             //
  93                             // Monitor
  94                             //
  95                             ////////////////////////////////////////////////////////////////////////////////
  96                             
  97 kumpf            1.54       #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
  98 mike             1.2        Monitor::Monitor()
  99 kumpf            1.87          : _stopConnections(0),
 100 a.arora          1.73            _stopConnectionsSem(0),
 101 a.dunfey         1.76            _solicitSocketCount(0),
 102 a.dunfey         1.77            _tickle_client_socket(-1),
 103                                  _tickle_server_socket(-1),
 104                                  _tickle_peer_socket(-1)
 105 mike             1.2        {
 106 mreddy           1.103.10.9     {
 107                                     AutoMutex automut(Monitor::_cout_mut);
 108                                     PEGASUS_STD(cout) << "Entering: Monitor::Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
 109                                 }
 110 kumpf            1.54           int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
 111 mike             1.2            Socket::initializeInterface();
 112 kumpf            1.54           _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
 113 a.arora          1.73       
 114                                 // setup the tickler
 115                                 initializeTickler();
 116                             
 117 r.kieninger      1.83           // Start the count at 1 because initilizeTickler()
 118 a.arora          1.73           // has added an entry in the first position of the
 119                                 // _entries array
 120                                 for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
 121 mday             1.37           {
 122                                    _MonitorEntry entry(0, 0, 0);
 123                                    _entries.append(entry);
 124                                 }
 125 mreddy           1.103.10.9     {
 126                                     AutoMutex automut(Monitor::_cout_mut);
 127                                     PEGASUS_STD(cout) << "Exiting:  Monitor::Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
 128                                 }
 129 mday             1.18       }
 130 mday             1.20       
 131 mike             1.2        Monitor::~Monitor()
 132                             {
 133 mreddy           1.103.10.9     {
 134                                     AutoMutex automut(Monitor::_cout_mut);
 135                                     PEGASUS_STD(cout) << "Entering: Monitor::~Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
 136                                 }
 137 kumpf            1.11           Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
 138 a.dunfey         1.76       
 139                                 try{
 140 a.dunfey         1.77               if(_tickle_peer_socket >= 0)
 141 a.dunfey         1.76               {
 142                                         Socket::close(_tickle_peer_socket);
 143                                     }
 144 a.dunfey         1.77               if(_tickle_client_socket >= 0)
 145 a.dunfey         1.76               {
 146                                         Socket::close(_tickle_client_socket);
 147                                     }
 148 a.dunfey         1.77               if(_tickle_server_socket >= 0)
 149 a.dunfey         1.76               {
 150                                         Socket::close(_tickle_server_socket);
 151                                     }
 152                                 }
 153                                 catch(...)
 154                                 {
 155                                     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 156                                               "Failed to close tickle sockets");
 157                                 }
 158                             
 159 mike             1.2            Socket::uninitializeInterface();
 160 kumpf            1.11           Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 161                                               "returning from monitor destructor");
 162 mreddy           1.103.10.9     {
 163                                     AutoMutex automut(Monitor::_cout_mut);
 164                                     PEGASUS_STD(cout) << "Exiting:  Monitor::~Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
 165                                 }
 166 mday             1.18       }
 167                             
 168 a.arora          1.73       void Monitor::initializeTickler(){
 169 mreddy           1.103.10.9     {
 170                                     AutoMutex automut(Monitor::_cout_mut);
 171                                     PEGASUS_STD(cout) << "Entering: Monitor::initializeTickler(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
 172                                 }
 173 r.kieninger      1.83           /*
 174                                    NOTE: On any errors trying to
 175                                          setup out tickle connection,
 176 a.arora          1.73                    throw an exception/end the server
 177                                 */
 178                             
 179                                 /* setup the tickle server/listener */
 180                             
 181                                 // get a socket for the server side
 182 david.dillard    1.95           if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
 183 a.arora          1.73       	//handle error
 184                             	MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
 185                             				 "Received error number $0 while creating the internal socket.",
 186                             #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 187                             				 errno);
 188                             #else
 189                             				 WSAGetLastError());
 190                             #endif
 191                             	throw Exception(parms);
 192                                 }
 193                             
 194                                 // initialize the address
 195                                 memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
 196                             #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 197                             #pragma convert(37)
 198                             #endif
 199                                 _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
 200                             #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 201                             #pragma convert(0)
 202                             #endif
 203                                 _tickle_server_addr.sin_family = PF_INET;
 204 a.arora          1.73           _tickle_server_addr.sin_port = 0;
 205                             
 206 kumpf            1.86           PEGASUS_SOCKLEN_T _addr_size = sizeof(_tickle_server_addr);
 207 a.arora          1.73       
 208                                 // bind server side to socket
 209                                 if((::bind(_tickle_server_socket,
 210 kumpf            1.88                      reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
 211 a.arora          1.73       	       sizeof(_tickle_server_addr))) < 0){
 212                             	// handle error
 213 r.kieninger      1.83       #ifdef PEGASUS_OS_ZOS
 214                                 MessageLoaderParms parms("Common.Monitor.TICKLE_BIND_LONG",
 215                             				 "Received error:$0 while binding the internal socket.",strerror(errno));
 216                             #else
 217 a.arora          1.73       	MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
 218                             				 "Received error number $0 while binding the internal socket.",
 219                             #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 220                             				 errno);
 221                             #else
 222                             				 WSAGetLastError());
 223                             #endif
 224 r.kieninger      1.83       #endif
 225 a.arora          1.73               throw Exception(parms);
 226                                 }
 227                             
 228                                 // tell the kernel we are a server
 229                                 if((::listen(_tickle_server_socket,3)) < 0){
 230                             	// handle error
 231                             	MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",
 232                             			 "Received error number $0 while listening to the internal socket.",
 233                             #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 234                             				 errno);
 235                             #else
 236                             				 WSAGetLastError());
 237                             #endif
 238                             	throw Exception(parms);
 239                                 }
 240 r.kieninger      1.83       
 241 a.arora          1.73           // make sure we have the correct socket for our server
 242                                 int sock = ::getsockname(_tickle_server_socket,
 243 kumpf            1.88                          reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
 244                                                &_addr_size);
 245 a.arora          1.73           if(sock < 0){
 246                             	// handle error
 247                             	MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",
 248                             			 "Received error number $0 while getting the internal socket name.",
 249                             #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 250                             				 errno);
 251                             #else
 252                             				 WSAGetLastError());
 253                             #endif
 254                             	throw Exception(parms);
 255                                 }
 256                             
 257                                 /* set up the tickle client/connector */
 258 r.kieninger      1.83       
 259 a.arora          1.73           // get a socket for our tickle client
 260 david.dillard    1.95           if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
 261 a.arora          1.73       	// handle error
 262                             	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
 263                             			 "Received error number $0 while creating the internal client socket.",
 264                             #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 265                             				 errno);
 266                             #else
 267                             				 WSAGetLastError());
 268                             #endif
 269                             	throw Exception(parms);
 270                                 }
 271                             
 272                                 // setup the address of the client
 273                                 memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
 274                             #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 275                             #pragma convert(37)
 276                             #endif
 277                                 _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
 278                             #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 279                             #pragma convert(0)
 280                             #endif
 281                                 _tickle_client_addr.sin_family = PF_INET;
 282 a.arora          1.73           _tickle_client_addr.sin_port = 0;
 283                             
 284                                 // bind socket to client side
 285                                 if((::bind(_tickle_client_socket,
 286 kumpf            1.88                      reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
 287 a.arora          1.73       	       sizeof(_tickle_client_addr))) < 0){
 288                             	// handle error
 289                             	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
 290                             			 "Received error number $0 while binding the internal client socket.",
 291                             #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 292                             				 errno);
 293                             #else
 294                             				 WSAGetLastError());
 295                             #endif
 296                             	throw Exception(parms);
 297                                 }
 298                             
 299                                 // connect to server side
 300                                 if((::connect(_tickle_client_socket,
 301 kumpf            1.88                         reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
 302 a.arora          1.73       		  sizeof(_tickle_server_addr))) < 0){
 303                             	// handle error
 304                             	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
 305                             			 "Received error number $0 while connecting the internal client socket.",
 306                             #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 307                             				 errno);
 308                             #else
 309                             				 WSAGetLastError());
 310                             #endif
 311                             	throw Exception(parms);
 312                                 }
 313                             
 314                                 /* set up the slave connection */
 315                                 memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
 316 kumpf            1.86           PEGASUS_SOCKLEN_T peer_size = sizeof(_tickle_peer_addr);
 317 r.kieninger      1.83           pegasus_sleep(1);
 318 a.arora          1.73       
 319                                 // this call may fail, we will try a max of 20 times to establish this peer connection
 320                                 if((_tickle_peer_socket = ::accept(_tickle_server_socket,
 321 kumpf            1.88                   reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
 322                                         &peer_size)) < 0){
 323 a.arora          1.73       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 324                                     // Only retry on non-windows platforms.
 325                                     if(_tickle_peer_socket == -1 && errno == EAGAIN)
 326                                     {
 327 r.kieninger      1.83                 int retries = 0;
 328 a.arora          1.73                 do
 329                                       {
 330                                         pegasus_sleep(1);
 331                                         _tickle_peer_socket = ::accept(_tickle_server_socket,
 332 kumpf            1.88                       reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
 333                                             &peer_size);
 334 a.arora          1.73                   retries++;
 335                                       } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
 336                                     }
 337                             #endif
 338                                 }
 339                                 if(_tickle_peer_socket == -1){
 340                             	// handle error
 341                             	MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",
 342                             			 "Received error number $0 while accepting the internal socket connection.",
 343                             #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 344                             				 errno);
 345                             #else
 346                             				 WSAGetLastError());
 347                             #endif
 348                             	throw Exception(parms);
 349                                 }
 350                                 // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
 351                                 // checks entries with IDLE state for events
 352                                 _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
 353                                 entry._status = _MonitorEntry::IDLE;
 354                                 _entries.append(entry);
 355 mreddy           1.103.10.9     {
 356                                     AutoMutex automut(Monitor::_cout_mut);
 357                                     PEGASUS_STD(cout) << "Exiting:  Monitor::initializeTickler(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
 358                                 }
 359 a.arora          1.73       }
 360                             
 361                             void Monitor::tickle(void)
 362                             {
 363 mreddy           1.103.10.9     {
 364                                     AutoMutex automut(Monitor::_cout_mut);
 365                                     PEGASUS_STD(cout) << "Entering: Monitor::tickle(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
 366                                 }
 367 sushma.fernandes 1.78           static char _buffer[] =
 368 a.arora          1.73           {
 369                                   '0','0'
 370                                 };
 371 r.kieninger      1.83       
 372 sushma.fernandes 1.78           AutoMutex autoMutex(_tickle_mutex);
 373 r.kieninger      1.83           Socket::disableBlocking(_tickle_client_socket);
 374 sushma.fernandes 1.78           Socket::write(_tickle_client_socket,&_buffer, 2);
 375 r.kieninger      1.83           Socket::enableBlocking(_tickle_client_socket);
 376 mreddy           1.103.10.9     {
 377                                     AutoMutex automut(Monitor::_cout_mut);
 378                                     PEGASUS_STD(cout) << "Exiting:  Monitor::tickle(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
 379                                 }
 380 sushma.fernandes 1.78       }
 381                             
 382                             void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
 383                             {
 384                                 // Set the state to requested state
 385                                 _entries[index]._status = status;
 386 a.arora          1.73       }
 387                             
 388 mike             1.2        Boolean Monitor::run(Uint32 milliseconds)
 389                             {
 390 mreddy           1.103.10.9     {
 391                                     AutoMutex automut(Monitor::_cout_mut);
 392                                     PEGASUS_STD(cout) << "Entering: Monitor::run(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
 393                                 }
 394 mday             1.18       
 395 mday             1.25           Boolean handled_events = false;
 396 a.arora          1.73           int i = 0;
 397 r.kieninger      1.83       
 398 kumpf            1.36           struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
 399 a.arora          1.73       
 400 mday             1.25           fd_set fdread;
 401                                 FD_ZERO(&fdread);
 402 a.arora          1.73       
 403 kumpf            1.94           AutoMutex autoEntryMutex(_entry_mut);
 404 r.kieninger      1.83       
 405 mike             1.100          ArrayIterator<_MonitorEntry> entries(_entries);
 406                             
 407 r.kieninger      1.83           // Check the stopConnections flag.  If set, clear the Acceptor monitor entries
 408 mike             1.96           if (_stopConnections.get() == 1)
 409 kumpf            1.48           {
 410 mike             1.100              for ( int indx = 0; indx < (int)entries.size(); indx++)
 411 kumpf            1.48               {
 412 mike             1.100                  if (entries[indx]._type == Monitor::ACCEPTOR)
 413 kumpf            1.48                   {
 414 mike             1.100                      if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)
 415 kumpf            1.48                       {
 416 mike             1.100                         if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||
 417                                                     entries[indx]._status.get() == _MonitorEntry::DYING )
 418 kumpf            1.48                          {
 419                                                    // remove the entry
 420 mike             1.100      		       entries[indx]._status = _MonitorEntry::EMPTY;
 421 kumpf            1.48                          }
 422                                                else
 423                                                {
 424                                                    // set status to DYING
 425 mike             1.100                            entries[indx]._status = _MonitorEntry::DYING;
 426 kumpf            1.48                          }
 427                                            }
 428                                        }
 429                                     }
 430                                     _stopConnections = 0;
 431 a.arora          1.73       	_stopConnectionsSem.signal();
 432 kumpf            1.48           }
 433 kumpf            1.51       
 434 mike             1.100          for( int indx = 0; indx < (int)entries.size(); indx++)
 435 kumpf            1.68           {
 436 mike             1.100      			 const _MonitorEntry &entry = entries[indx];
 437 mike             1.96              if ((entry._status.get() == _MonitorEntry::DYING) &&
 438 brian.campbell   1.80       					 (entry._type == Monitor::CONNECTION))
 439 kumpf            1.68              {
 440 brian.campbell   1.80                 MessageQueue *q = MessageQueue::lookup(entry.queueId);
 441 kumpf            1.68                 PEGASUS_ASSERT(q != 0);
 442 brian.campbell   1.80                 HTTPConnection &h = *static_cast<HTTPConnection *>(q);
 443 r.kieninger      1.83       
 444 brian.campbell   1.80       					if (h._connectionClosePending == false)
 445                             						continue;
 446                             
 447                             					// NOTE: do not attempt to delete while there are pending responses
 448 r.kieninger      1.83       					// coming thru. The last response to come thru after a
 449 brian.campbell   1.80       					// _connectionClosePending will reset _responsePending to false
 450                             					// and then cause the monitor to rerun this code and clean up.
 451                             					// (see HTTPConnection.cpp)
 452                             
 453                             					if (h._responsePending == true)
 454                             					{
 455                             						Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "
 456                             													"Ignoring connection delete request because "
 457                             													"responses are still pending. "
 458 r.kieninger      1.83       													"connection=0x%p, socket=%d\n",
 459 brian.campbell   1.81       													(void *)&h, h.getSocket());
 460 brian.campbell   1.80       						continue;
 461                             					}
 462                             					h._connectionClosePending = false;
 463                                       MessageQueue &o = h.get_owner();
 464                                       Message* message= new CloseConnectionMessage(entry.socket);
 465 kumpf            1.68                 message->dest = o.getQueueId();
 466                             
 467 r.kieninger      1.83                 // HTTPAcceptor is responsible for closing the connection.
 468 kumpf            1.68                 // The lock is released to allow HTTPAcceptor to call
 469 r.kieninger      1.83                 // unsolicitSocketMessages to free the entry.
 470 kumpf            1.68                 // Once HTTPAcceptor completes processing of the close
 471                                       // connection, the lock is re-requested and processing of
 472                                       // the for loop continues.  This is safe with the current
 473 mike             1.100                // implementation of the entries object.  Note that the
 474                                       // loop condition accesses the entries.size() on each
 475 kumpf            1.68                 // iteration, so that a change in size while the mutex is
 476                                       // unlocked will not result in an ArrayIndexOutOfBounds
 477                                       // exception.
 478                             
 479 kumpf            1.94                 autoEntryMutex.unlock();
 480 kumpf            1.68                 o.enqueue(message);
 481 kumpf            1.94                 autoEntryMutex.lock();
 482 r.kieninger      1.102                // After enqueue a message and the autoEntryMutex has been released and locked again,
 483                                       // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries.
 484                                       entries.reset(_entries);
 485 kumpf            1.68              }
 486                                 }
 487                             
 488 kumpf            1.51           Uint32 _idleEntries = 0;
 489 r.kieninger      1.83       
 490 a.arora          1.73           /*
 491 david.dillard    1.95               We will keep track of the maximum socket number and pass this value
 492                                     to the kernel as a parameter to SELECT.  This loop seems like a good
 493                                     place to calculate the max file descriptor (maximum socket number)
 494                                     because we have to traverse the entire array.
 495 r.kieninger      1.83           */
 496 w.white          1.103.10.1     //Array<HANDLE> pipeEventArray;
 497 j.alex           1.103.10.2         PEGASUS_SOCKET maxSocketCurrentPass = 0;
 498 w.white          1.103.10.1     int indx;
 499                             
 500 j.alex           1.103.10.2 
 501                             #ifdef PEGASUS_OS_TYPE_WINDOWS
 502                             
 503 w.white          1.103.10.1     //This array associates named pipe connections to their place in [indx]
 504                                 //in the entries array. The value in poition zero of the array is the
 505                                 //index of the fist named pipe connection in the entries array
 506                                 Array <Uint32> indexPipeCountAssociator;
 507 j.alex           1.103.10.2     int pipeEntryCount=0; 
 508                                 int MaxPipes = PIPE_INCREMENT;
 509                                 HANDLE* hEvents = new HANDLE[PIPE_INCREMENT]; 
 510                             
 511                             #endif
 512 w.white          1.103.10.1 
 513                                 for( indx = 0; indx < (int)entries.size(); indx++)
 514 mike             1.2            {
 515 a.arora          1.73       
 516 j.alex           1.103.10.2 
 517                             #ifdef PEGASUS_OS_TYPE_WINDOWS
 518                                    if(entries[indx].isNamedPipeConnection())
 519                                    {
 520 w.white          1.103.10.5 
 521 j.alex           1.103.10.2            //entering this clause mean that a Named Pipe connection is at entries[indx]
 522 w.white          1.103.10.1            //cout << "In Monitor::run in clause to to create array of for WaitformultipuleObjects" << endl;
 523 j.alex           1.103.10.2 
 524 w.white          1.103.10.6            //cout << "In Monitor::run - pipe being added to array is " << entries[indx].namedPipe.getName() << endl;
 525 w.white          1.103.10.5           
 526 j.alex           1.103.10.2             entries[indx].pipeSet = false;
 527 w.white          1.103.10.5 
 528 j.alex           1.103.10.2            // We can Keep a counter in the Monitor class for the number of named pipes ...
 529                                        //  Which can be used here to create the array size for hEvents..( obviously before this for loop.:-) )
 530                                         if (pipeEntryCount >= MaxPipes)
 531                                         {
 532 w.white          1.103.10.6                // cout << "Monitor::run 'if (pipeEntryCount >= MaxPipes)' begining - pipeEntryCount=" <<
 533                                                // pipeEntryCount << " MaxPipes=" << MaxPipes << endl;
 534 j.alex           1.103.10.2                  MaxPipes += PIPE_INCREMENT;
 535 w.white          1.103.10.5                  HANDLE* temp_hEvents = new HANDLE[MaxPipes];
 536                             
 537 j.alex           1.103.10.2                  for (Uint32 i =0;i<pipeEntryCount;i++)
 538                                              {
 539                                                  temp_hEvents[i] = hEvents[i];
 540                                              }
 541 w.white          1.103.10.5 
 542 j.alex           1.103.10.2                  delete [] hEvents;
 543 w.white          1.103.10.5 
 544 j.alex           1.103.10.2                  hEvents = temp_hEvents;
 545 w.white          1.103.10.6                 // cout << "Monitor::run 'if (pipeEntryCount >= MaxPipes)' ending"<< endl;
 546 w.white          1.103.10.5 
 547 j.alex           1.103.10.2             }         
 548                             
 549 w.white          1.103.10.1            //pipeEventArray.append((entries[indx].namedPipe.getOverlap()).hEvent);
 550 j.alex           1.103.10.2            hEvents[pipeEntryCount] = entries[indx].namedPipe.getOverlap().hEvent;
 551 w.white          1.103.10.4            
 552 w.white          1.103.10.1            indexPipeCountAssociator.append(indx);
 553 w.white          1.103.10.4     
 554                                 pipeEntryCount++;
 555                             
 556                             
 557                             
 558 w.white          1.103.10.7           // cout << "Monitor::run pipeEntrycount is " << pipeEntryCount <<
 559                                       // " this is the type " << entries[indx]._type << " this is index " << indx << endl; 
 560 w.white          1.103.10.1 
 561 mday             1.25              }
 562 j.alex           1.103.10.2        else
 563                                   
 564                             #endif
 565                                    {
 566                                    
 567                                        if(maxSocketCurrentPass < entries[indx].socket)
 568                                         maxSocketCurrentPass = entries[indx].socket;
 569                             
 570                                        if(entries[indx]._status.get() == _MonitorEntry::IDLE)
 571                                        {
 572                                            _idleEntries++;
 573                                            FD_SET(entries[indx].socket, &fdread);
 574                                        }
 575                             
 576                                    }
 577                               }
 578 s.hills          1.62       
 579 a.arora          1.73           /*
 580 david.dillard    1.95               Add 1 then assign maxSocket accordingly. We add 1 to account for
 581                                     descriptors starting at 0.
 582 a.arora          1.73           */
 583                                 maxSocketCurrentPass++;
 584                             
 585 kumpf            1.94           autoEntryMutex.unlock();
 586 david.dillard    1.95       
 587                                 //
 588                                 // The first argument to select() is ignored on Windows and it is not
 589                                 // a socket value.  The original code assumed that the number of sockets
 590                                 // and a socket value have the same type.  On Windows they do not.
 591                                 //
 592                             #ifdef PEGASUS_OS_TYPE_WINDOWS
 593 w.white          1.103.10.1     //int events = select(0, &fdread, NULL, NULL, &tv);
 594 j.alex           1.103.10.2      int events = 0;
 595                                     DWORD dwWait=NULL;
 596                                 int pEvents = 0;
 597 w.white          1.103.10.1 
 598 w.white          1.103.10.7  //       cout << "events after select" << events << endl;
 599 w.white          1.103.10.1     cout << "Calling WaitForMultipleObjects\n";
 600                             
 601                                 //this should be in a try block
 602                             
 603                                 dwWait = WaitForMultipleObjects(MaxPipes,    
 604                                                                hEvents,      //ABB:- array of event objects
 605                                                                FALSE,        // ABB:-does not wait for all
 606                                                                20000);        //ABB:- timeout value
 607                             
 608                                 if(dwWait == WAIT_TIMEOUT)
 609                                     {
 610                                     cout << "Wait WAIT_TIMEOUT\n";
 611 j.alex           1.103.10.2 
 612                                                // Sleep(2000);
 613 w.white          1.103.10.1             //continue; 
 614 j.alex           1.103.10.2              
 615                                          //return false;  // I think we do nothing.... Mybe there is a socket connection... so 
 616                                          // cant return.
 617 w.white          1.103.10.1         }
 618                                     else if (dwWait == WAIT_FAILED)
 619                                     {
 620                                         cout << "Wait Failed returned\n";
 621                                         cout << "failed with " << GetLastError() << "." << endl;
 622 j.alex           1.103.10.2             pEvents = -1;
 623 w.white          1.103.10.3             return false;
 624 w.white          1.103.10.1         }
 625                                     else
 626                                     {
 627                                         int pCount = dwWait - WAIT_OBJECT_0;  // determines which pipe
 628 w.white          1.103.10.7            // cout << " WaitForMultiPleObject returned activity on server pipe: "<< 
 629                                        //     pCount<< endl;
 630 w.white          1.103.10.1 
 631 j.alex           1.103.10.2             pEvents = 1;
 632 w.white          1.103.10.1 
 633                                         //this statment gets the pipe entry that was trigered
 634                                         entries[indexPipeCountAssociator[pCount]].pipeSet = true;
 635                                         
 636                                         if (pCount > 0) //this means activity on pipe is CIMOperation reques
 637                                         {
 638 w.white          1.103.10.7        //         cout << "In Monitor::run got Operation request" << endl;
 639 w.white          1.103.10.1                 //entries[indx]._type = Monitor::CONNECTION;
 640                                         }
 641                                         else //this clause my not be needed in production but is used for testing
 642                                         {
 643 w.white          1.103.10.7          //     cout << "In Monitor::run got Connection request" << endl;
 644 w.white          1.103.10.1               
 645                                         }
 646                             
 647                                     }
 648                                             //
 649                             
 650 j.alex           1.103.10.2     
 651 w.white          1.103.10.1    // Sleep(2000);
 652                             
 653                                 //int events = 1;
 654                                 /*if (dwWait)
 655                                 {
 656                                     cout << "in Monitor::run about to call handlePipeConnectionEvent" << endl;
 657                                     _handlePipeConnectionEvent(dwWait);
 658                                 }*/
 659 david.dillard    1.95       #else
 660 a.arora          1.73           int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
 661 david.dillard    1.95       #endif
 662 kumpf            1.94           autoEntryMutex.lock();
 663 r.kieninger      1.102          // After enqueue a message and the autoEntryMutex has been released and locked again,
 664                                 // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries
 665                                 entries.reset(_entries);
 666 w.white          1.103.10.1 
 667 mike             1.2        #ifdef PEGASUS_OS_TYPE_WINDOWS
 668 j.alex           1.103.10.2     if(pEvents == -1)
 669                                 {
 670                                     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 671                                       "Monitor::run - errorno = %d has occurred on select.",GetLastError() );
 672                                    // The EBADF error indicates that one or more or the file
 673                                    // descriptions was not valid. This could indicate that
 674                                    // the entries structure has been corrupted or that
 675                                    // we have a synchronization error.
 676                             
 677                                     // We need to generate an assert  here...
 678                                    PEGASUS_ASSERT(GetLastError()!= EBADF);
 679                             
 680                             
 681                                 }
 682                                
 683 kumpf            1.50           if(events == SOCKET_ERROR)
 684 mike             1.2        #else
 685 kumpf            1.50           if(events == -1)
 686 mike             1.2        #endif
 687 mday             1.13           {
 688 j.alex           1.103.10.2 
 689                                     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 690 kumpf            1.50                 "Monitor::run - errorno = %d has occurred on select.", errno);
 691                                    // The EBADF error indicates that one or more or the file
 692                                    // descriptions was not valid. This could indicate that
 693 mike             1.100             // the entries structure has been corrupted or that
 694 kumpf            1.50              // we have a synchronization error.
 695                             
 696                                    PEGASUS_ASSERT(errno != EBADF);
 697                                 }
 698 j.alex           1.103.10.2     else if ((events)||(pEvents))
 699 kumpf            1.50           {
 700 w.white          1.103.10.1 
 701 w.white          1.103.10.7      //  cout << "IN Monior::run 'else if (events)' clause - array size is " <<
 702                                  //       (int)entries.size() << endl;
 703 kumpf            1.51              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 704 r.kieninger      1.83                 "Monitor::run select event received events = %d, monitoring %d idle entries",
 705 david.dillard    1.95                  events, _idleEntries);
 706 mike             1.100             for( int indx = 0; indx < (int)entries.size(); indx++)
 707 mday             1.25              {
 708 w.white          1.103.10.5            //cout << "Monitor::run at start of 'for( int indx = 0; indx ' - index = " << indx << endl; 
 709 kumpf            1.53                 // The Monitor should only look at entries in the table that are IDLE (i.e.,
 710                                       // owned by the Monitor).
 711 w.white          1.103.10.1           if(((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
 712 j.alex           1.103.10.2              FD_ISSET(entries[indx].socket, &fdread)&& (events)) ||
 713                                          (entries[indx].isNamedPipeConnection() && entries[indx].pipeSet && (pEvents)))
 714 david.dillard    1.95                 {
 715 w.white          1.103.10.4               MessageQueue *q;
 716                                           cout << "IN Monior::run inside - for int indx = " <<indx <<
 717                                               "and queue ID is " << entries[indx].queueId << endl;
 718                                           try{
 719                                         
 720                                              q = MessageQueue::lookup(entries[indx].queueId);
 721                                           }
 722                                          catch (Exception e)
 723                                          {
 724                                              cout << " this is what lookup gives - " << e.getMessage() << endl;
 725                                              exit(1);
 726                                          }
 727                                          catch(...)
 728                                          {
 729                                              cout << "MessageQueue::lookup gives strange exception " << endl;
 730                                              exit(1);
 731                                          }
 732                             
 733                             
 734                             
 735                                          cout << "Monitor::run after MessageQueue::lookup(entries[indx].queueId)" << endl;
 736 w.white          1.103.10.4               Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 737 kumpf            1.53                         "Monitor::run indx = %d, queueId =  %d, q = %p",
 738 mike             1.100                        indx, entries[indx].queueId, q);
 739 w.white          1.103.10.4              cout << "Monitor::run before PEGASUS_ASSerT(q !=0) " << endl;
 740 kumpf            1.53                    PEGASUS_ASSERT(q !=0);
 741 mday             1.37       
 742 david.dillard    1.95                    try
 743                                          {
 744 w.white          1.103.10.1                   cout <<" this is the type " << entries[indx]._type <<
 745                                                   "for index " << indx << endl;
 746                                            cout << "IN Monior::run right before entries[indx]._type == Monitor::CONNECTION" << endl;
 747 mike             1.100                      if(entries[indx]._type == Monitor::CONNECTION)
 748 david.dillard    1.95                       {
 749 w.white          1.103.10.4                     cout << "In Monitor::run Monitor::CONNECTION clause" << endl; 
 750                             
 751                                                                   Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 752 mike             1.100                           "entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
 753 david.dillard    1.95                          static_cast<HTTPConnection *>(q)->_entry_index = indx;
 754 sushma.fernandes 1.78       
 755                                                // Do not update the entry just yet. The entry gets updated once
 756 r.kieninger      1.83                          // the request has been read.
 757 mike             1.100                         //entries[indx]._status = _MonitorEntry::BUSY;
 758 sushma.fernandes 1.78       
 759 kumpf            1.66                          // If allocate_and_awaken failure, retry on next iteration
 760 a.arora          1.73       /* Removed for PEP 183.
 761 kumpf            1.69                          if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(
 762                                                        (void *)q, _dispatch))
 763 kumpf            1.67                          {
 764                                                   Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 765                                                       "Monitor::run: Insufficient resources to process request.");
 766 mike             1.100                            entries[indx]._status = _MonitorEntry::IDLE;
 767 kumpf            1.67                             return true;
 768                                                }
 769 a.arora          1.73       */
 770                             // Added for PEP 183
 771 david.dillard    1.95                          HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
 772                                                Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 773 a.arora          1.73                                "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
 774                                                dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
 775 w.white          1.103.10.6 
 776                                                /*In the case of named Pipes, the request has already been read from the pipe
 777                                                therefor this section passed the request data to the HTTPConnection
 778                                                NOTE: not sure if this would be better suited in a sparate private method
 779                                                */
 780 w.white          1.103.10.7                    dst->setNamedPipe(entries[indx].namedPipe); //this step shouldn't be needd
 781 w.white          1.103.10.8                    cout << "In Monitor::run after dst->setNamedPipe string read is " <<
 782                                                    entries[indx].namedPipe.raw << endl;
 783 w.white          1.103.10.6 
 784 a.arora          1.73                          try
 785                                                {
 786 w.white          1.103.10.3                        cout << "In Monitor::run about to call 'dst->run(1)' "  << endl;
 787 a.arora          1.73                              dst->run(1);
 788                                                }
 789 david.dillard    1.95                          catch (...)
 790                                                {
 791                                                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 792                                                    "Monitor::_dispatch: exception received");
 793                                                }
 794                                                Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 795 a.arora          1.73                          "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
 796                             
 797 sushma.fernandes 1.78                          // It is possible the entry status may not be set to busy.
 798 r.kieninger      1.83                          // The following will fail in that case.
 799 mike             1.96          		   // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
 800 a.arora          1.73       		   // Once the HTTPConnection thread has set the status value to either
 801                             		   // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
 802                             		   // to the Monitor.  It is no longer permissible to access the connection
 803                             		   // or the entry in the _entries table.
 804 sushma.fernandes 1.78       
 805                                                // The following is not relevant as the worker thread or the
 806                                                // reader thread will update the status of the entry.
 807                             		   //if (dst->_connectionClosePending)
 808 r.kieninger      1.83       		   //{
 809 sushma.fernandes 1.78       		   //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
 810                             		   //}
 811                             		   //else
 812                             		   //{
 813                             		   //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
 814 r.kieninger      1.83       		   //}
 815                             // end Added for PEP 183
 816 a.arora          1.73       		}
 817 mike             1.100      	        else if( entries[indx]._type == Monitor::INTERNAL){
 818 r.kieninger      1.83       			// set ourself to BUSY,
 819                                                     // read the data
 820 a.arora          1.73                               // and set ourself back to IDLE
 821 w.white          1.103.10.1             cout << " in - entries[indx]._type == Monitor::INTERNAL- " << endl; 
 822 r.kieninger      1.83       
 823 mike             1.100      		   	entries[indx]._status = _MonitorEntry::BUSY;
 824 a.arora          1.73       			static char buffer[2];
 825 mike             1.100            			Socket::disableBlocking(entries[indx].socket);
 826                                   			Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2);
 827                                   			Socket::enableBlocking(entries[indx].socket);
 828                             			entries[indx]._status = _MonitorEntry::IDLE;
 829 mday             1.37       		}
 830                             		else
 831 mday             1.25       		{
 832 w.white          1.103.10.1             cout << "In Monitor::run else clause of CONNECTION if statments" << endl;
 833 kumpf            1.51                          Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 834                                                  "Non-connection entry, indx = %d, has been received.", indx);
 835 mday             1.37       		   int events = 0;
 836 w.white          1.103.10.1            Message *msg;
 837 j.alex           1.103.10.2            cout << " In Monitor::run Just before checking if NamedPipeConnection" << "for Index "<<indx<< endl;
 838 w.white          1.103.10.1 
 839                                        if (entries[indx].isNamedPipeConnection())
 840                                        {
 841 w.white          1.103.10.4                if(!entries[indx].namedPipe.isConnectionPipe)
 842                                            { /*if we enter this clasue it means that the named pipe that we are 
 843                                                looking at has recived a connection but is not the pipe we get connection requests over. 
 844                                                therefore we neew to change the _type to CONNECTION and wait for a CIM Operations request*/
 845                                                entries[indx]._type = Monitor::CONNECTION;
 846                             
 847                             
 848 w.white          1.103.10.8      /* This is a test  - this shows that the read file needs to be done
 849 w.white          1.103.10.4      before we call wiatForMultipleObjects*/
 850                                 /******************************************************
 851                                 ********************************************************/
 852                                
 853 w.white          1.103.10.7                 //DWORD size = 0;
 854 w.white          1.103.10.4 
 855                                     BOOL rc = ::ReadFile(
 856                                             entries[indx].namedPipe.getPipe(),
 857 w.white          1.103.10.6                 &entries[indx].namedPipe.raw,
 858                                             MAX_BUFFER_SIZE,
 859 w.white          1.103.10.7                 &entries[indx].namedPipe.bytesRead,
 860 w.white          1.103.10.4                 &entries[indx].namedPipe.getOverlap());
 861 w.white          1.103.10.6 
 862                                     cout << "just called read on index " << indx << endl;
 863                             
 864 w.white          1.103.10.7          //&entries[indx].namedPipe.bytesRead = &size; 
 865 w.white          1.103.10.4         if(!rc)
 866                                     {
 867                             
 868                                        cout << "ReadFile failed for : "  << GetLastError() << "."<< endl;
 869                             
 870                                     }
 871                             
 872                               
 873                             
 874                                 /******************************************************
 875                                 ********************************************************/
 876                             
 877                             
 878                             
 879                             
 880                                                continue;
 881                             
 882                                            }
 883 w.white          1.103.10.1                cout << " In Monitor::run about to create a Pipe message" << endl;
 884                                            events |= NamedPipeMessage::READ;
 885                                            msg = new NamedPipeMessage(entries[indx].namedPipe, events);
 886                                        }
 887                                        else
 888                                        {
 889 j.alex           1.103.10.2                cout << " In Monitor::run ..its a socket message" << endl;
 890 w.white          1.103.10.1                events |= SocketMessage::READ;
 891                             		       msg = new SocketMessage(entries[indx].socket, events);
 892                                        }
 893                             
 894 mike             1.100      		   entries[indx]._status = _MonitorEntry::BUSY;
 895 kumpf            1.94                          autoEntryMutex.unlock();
 896 mday             1.37       		   q->enqueue(msg);
 897 kumpf            1.94                          autoEntryMutex.lock();
 898 r.kieninger      1.102                 // After enqueue a message and the autoEntryMutex has been released and locked again,
 899                                        // the array of entries can be changed. The ArrayIterator has be reset with the original _entries
 900                                        entries.reset(_entries);
 901 mike             1.100      		   entries[indx]._status = _MonitorEntry::IDLE;
 902 kumpf            1.94       
 903 mreddy           1.103.10.9                    {
 904                                                    AutoMutex automut(Monitor::_cout_mut);
 905                                                    PEGASUS_STD(cout) << "Exiting:  Monitor::run(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
 906                                                }
 907 mday             1.25       		   return true;
 908                             		}
 909                             	     }
 910 mday             1.37       	     catch(...)
 911 mday             1.25       	     {
 912                             	     }
 913                             	     handled_events = true;
 914                             	  }
 915                                    }
 916 mday             1.24           }
 917 kumpf            1.94       
 918 mreddy           1.103.10.9     {
 919                                     AutoMutex automut(Monitor::_cout_mut);
 920                                     PEGASUS_STD(cout) << "Exiting:  Monitor::run(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
 921                                 }
 922 mday             1.13           return(handled_events);
 923 mike             1.2        }
 924                             
 925 chuck            1.74       void Monitor::stopListeningForConnections(Boolean wait)
 926 kumpf            1.48       {
 927 mreddy           1.103.10.9     {
 928                                     AutoMutex automut(Monitor::_cout_mut);
 929                                     PEGASUS_STD(cout) << "Entering: Monitor::stopListeningForConnections(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
 930                                 }
 931 kumpf            1.48           PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
 932 r.kieninger      1.83           // set boolean then tickle the server to recognize _stopConnections
 933 kumpf            1.48           _stopConnections = 1;
 934 a.arora          1.73           tickle();
 935 kumpf            1.48       
 936 chuck            1.74           if (wait)
 937 a.arora          1.73           {
 938 chuck            1.74             // Wait for the monitor to notice _stopConnections.  Otherwise the
 939                                   // caller of this function may unbind the ports while the monitor
 940                                   // is still accepting connections on them.
 941 kumpf            1.101            _stopConnectionsSem.wait();
 942 a.arora          1.73           }
 943 r.kieninger      1.83       
 944 kumpf            1.48           PEG_METHOD_EXIT();
 945 mreddy           1.103.10.9     {
 946                                     AutoMutex automut(Monitor::_cout_mut);
 947                                     PEGASUS_STD(cout) << "Exiting:  Monitor::stopListeningForConnections(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
 948                                 }
 949 kumpf            1.48       }
 950 mday             1.25       
 951 mday             1.37       
 952 mday             1.25       int  Monitor::solicitSocketMessages(
 953 david.dillard    1.95           PEGASUS_SOCKET socket,
 954 mike             1.2            Uint32 events,
 955 r.kieninger      1.83           Uint32 queueId,
 956 mday             1.8            int type)
 957 mike             1.2        {
 958 mreddy           1.103.10.9     {
 959                                     AutoMutex automut(Monitor::_cout_mut);
 960                                     PEGASUS_STD(cout) << "Entering: Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
 961                                 }
 962 r.kieninger      1.83          PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
 963 alagaraja        1.75          AutoMutex autoMut(_entry_mut);
 964 a.arora          1.73          // Check to see if we need to dynamically grow the _entries array
 965                                // We always want the _entries array to 2 bigger than the
 966                                // current connections requested
 967                                _solicitSocketCount++;  // bump the count
 968                                int size = (int)_entries.size();
 969 w.otsuka         1.89          if((int)_solicitSocketCount >= (size-1)){
 970                                     for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
 971 a.arora          1.73                       _MonitorEntry entry(0, 0, 0);
 972                                             _entries.append(entry);
 973                                     }
 974                                }
 975 kumpf            1.4        
 976 a.arora          1.73          int index;
 977                                for(index = 1; index < (int)_entries.size(); index++)
 978 mday             1.25          {
 979 a.arora          1.73             try
 980 mday             1.37             {
 981 mike             1.96                if(_entries[index]._status.get() == _MonitorEntry::EMPTY)
 982 a.arora          1.73                {
 983                                         _entries[index].socket = socket;
 984                                         _entries[index].queueId  = queueId;
 985                                         _entries[index]._type = type;
 986                                         _entries[index]._status = _MonitorEntry::IDLE;
 987 r.kieninger      1.83       
 988 mreddy           1.103.10.9             {
 989                                             AutoMutex automut(Monitor::_cout_mut);
 990                                             PEGASUS_STD(cout) << "Exiting:  Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
 991                                         }
 992 a.arora          1.73                   return index;
 993                                      }
 994 mday             1.37             }
 995                                   catch(...)
 996 mday             1.25             {
 997                                   }
 998                                }
 999 a.arora          1.73          _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful
1000 mday             1.25          PEG_METHOD_EXIT();
1001 mreddy           1.103.10.9    {
1002                                    AutoMutex automut(Monitor::_cout_mut);
1003                                    PEGASUS_STD(cout) << "Exiting:  Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
1004                                }
1005 kumpf            1.50          return -1;
1006 a.arora          1.73       
1007 mike             1.2        }
1008                             
1009 david.dillard    1.95       void Monitor::unsolicitSocketMessages(PEGASUS_SOCKET socket)
1010 mike             1.2        {
1011 mreddy           1.103.10.9     {
1012                                     AutoMutex automut(Monitor::_cout_mut);
1013                                     PEGASUS_STD(cout) << "Entering: Monitor::unsolicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
1014                                 }
1015 kumpf            1.50       
1016 mday             1.25           PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
1017 alagaraja        1.75           AutoMutex autoMut(_entry_mut);
1018 a.arora          1.73       
1019                                 /*
1020                                     Start at index = 1 because _entries[0] is the tickle entry which never needs
1021                                     to be EMPTY;
1022                                 */
1023 w.otsuka         1.89           unsigned int index;
1024 a.arora          1.73           for(index = 1; index < _entries.size(); index++)
1025 mike             1.2            {
1026 mday             1.25              if(_entries[index].socket == socket)
1027                                    {
1028 a.arora          1.73                 _entries[index]._status = _MonitorEntry::EMPTY;
1029 david.dillard    1.95                 _entries[index].socket = PEGASUS_INVALID_SOCKET;
1030 a.arora          1.73                 _solicitSocketCount--;
1031                                       break;
1032 mday             1.25              }
1033 mike             1.2            }
1034 a.arora          1.73       
1035                                 /*
1036                             	Dynamic Contraction:
1037                             	To remove excess entries we will start from the end of the _entries array
1038                             	and remove all entries with EMPTY status until we find the first NON EMPTY.
1039                             	This prevents the positions, of the NON EMPTY entries, from being changed.
1040 r.kieninger      1.83           */
1041 a.arora          1.73           index = _entries.size() - 1;
1042 mike             1.96           while(_entries[index]._status.get() == _MonitorEntry::EMPTY){
1043 a.arora          1.73       	if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
1044                                             _entries.remove(index);
1045                             	index--;
1046                                 }
1047 kumpf            1.4            PEG_METHOD_EXIT();
1048 mreddy           1.103.10.9     {
1049                                     AutoMutex automut(Monitor::_cout_mut);
1050                                     PEGASUS_STD(cout) << "Exiting:  Monitor::unsolicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
1051                                 }
1052 mike             1.2        }
1053                             
1054 a.arora          1.73       // Note: this is no longer called with PEP 183.
1055 mday             1.7        PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
1056                             {
1057 mreddy           1.103.10.9     {
1058                                     AutoMutex automut(Monitor::_cout_mut);
1059                                     PEGASUS_STD(cout) << "Entering: Monitor::_dispatch(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
1060                                 }
1061 mday             1.8           HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
1062 kumpf            1.51          Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1063 kumpf            1.53               "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
1064                                     dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
1065 kumpf            1.51          try
1066                                {
1067                                   dst->run(1);
1068                                }
1069                                catch (...)
1070                                {
1071                                   Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1072                                       "Monitor::_dispatch: exception received");
1073                                }
1074                                Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1075                                       "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
1076 r.kieninger      1.83       
1077 mike             1.96          PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
1078 kumpf            1.68       
1079                                // Once the HTTPConnection thread has set the status value to either
1080                                // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
1081                                // to the Monitor.  It is no longer permissible to access the connection
1082                                // or the entry in the _entries table.
1083 kumpf            1.50          if (dst->_connectionClosePending)
1084                                {
1085 kumpf            1.68             dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
1086                                }
1087                                else
1088                                {
1089                                   dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
1090 kumpf            1.50          }
1091 mday             1.8           return 0;
1092 mday             1.40       }
1093                             
1094 w.white          1.103.10.1 
1095                             //This method is anlogsu to solicitSocketMessages. It does the same thing for named Pipes
1096                             int  Monitor::solicitPipeMessages(
1097                                 NamedPipe namedPipe,
1098                                 Uint32 events,  //not sure what has to change for this enum
1099                                 Uint32 queueId,
1100                                 int type)
1101                             {
1102                                PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitPipeMessages");
1103                                AutoMutex autoMut(_entry_mut);
1104                                // Check to see if we need to dynamically grow the _entries array
1105                                // We always want the _entries array to 2 bigger than the
1106                                // current connections requested
1107                                PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages at the begining" << PEGASUS_STD(endl);
1108                             
1109                             
1110                                
1111                                _solicitSocketCount++;  // bump the count
1112                                int size = (int)_entries.size();
1113                                if((int)_solicitSocketCount >= (size-1)){
1114                                     for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
1115 w.white          1.103.10.1                 _MonitorEntry entry(0, 0, 0);
1116                                             _entries.append(entry);
1117                                     }
1118                                }
1119                             
1120                                int index;
1121                                for(index = 1; index < (int)_entries.size(); index++)
1122                                {
1123                                   try
1124                                   {
1125                                      if(_entries[index]._status.get() == _MonitorEntry::EMPTY)
1126                                      {
1127                                         _entries[index].socket = NULL;
1128                                         _entries[index].namedPipe = namedPipe;
1129                                         _entries[index].namedPipeConnection = true;
1130                                         _entries[index].queueId  = queueId;
1131                                         _entries[index]._type = type;
1132                                         _entries[index]._status = _MonitorEntry::IDLE;
1133                             
1134                                         PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages after seting up  _entries[index] index = " << index << PEGASUS_STD(endl);
1135                             
1136 w.white          1.103.10.1             return index;
1137                                      }
1138                                   }
1139                                   catch(...)
1140                                   {
1141                                   }
1142                             
1143                                }
1144                                _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful
1145                                PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages nothing happed - it didn't work" << PEGASUS_STD(endl);
1146                             
1147                                PEG_METHOD_EXIT();
1148                                return -1;
1149                             
1150                             }
1151                             
1152                             
1153 mike             1.2        PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2