(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

   1 karl  1.79 //%2004////////////////////////////////////////////////////////////////////////
   2 mike  1.2  //
   3 karl  1.79 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
   4            // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
   5            // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
   6 karl  1.64 // IBM Corp.; EMC Corporation, The Open Group.
   7 karl  1.79 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
   8            // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   9 mike  1.2  //
  10            // Permission is hereby granted, free of charge, to any person obtaining a copy
  11 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
  12            // deal in the Software without restriction, including without limitation the
  13            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  14 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
  15            // furnished to do so, subject to the following conditions:
  16            // 
  17 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  18 mike  1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  19            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  20 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  21            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  22            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  23 mike  1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  24            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  25            //
  26            //==============================================================================
  27            //
  28            // Author: Mike Brasher (mbrasher@bmc.com)
  29            //
  30 mday  1.49 // Modified By: Mike Day (monitor_2) mdday@us.ibm.com 
  31 a.arora 1.71 //              Amit K Arora (Bug#1153) amita@in.ibm.com
  32 alagaraja 1.75 //              Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090
  33 sushma.fernandes 1.78 //              Sushma Fernandes (sushma@hp.com) for Bug#2057
  34 mike             1.2  //
  35                       //%/////////////////////////////////////////////////////////////////////////////
  36                       
  37                       #include <Pegasus/Common/Config.h>
  38 mday             1.40 
  39 mike             1.2  #include <cstring>
  40                       #include "Monitor.h"
  41                       #include "MessageQueue.h"
  42                       #include "Socket.h"
  43 kumpf            1.4  #include <Pegasus/Common/Tracer.h>
  44 mday             1.7  #include <Pegasus/Common/HTTPConnection.h>
  45 kumpf            1.69 #include <Pegasus/Common/MessageQueueService.h>
  46 a.arora          1.73 #include <Pegasus/Common/Exception.h>
  47 mike             1.2  
  48                       #ifdef PEGASUS_OS_TYPE_WINDOWS
  49                       # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
  50                       #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
  51                       of <winsock.h>. It may have been indirectly included (e.g., by including \
  52 mday             1.25 <windows.h>). Find the inclusion of that header which is visible to this \
  53 mike             1.2  compilation unit and #define FD_SETSIZE to 1024 prior to that inclusion; \
  54                       otherwise, less than 64 clients (the default) will be able to connect to the \
  55                       CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
  56 mday             1.5  
  57 mike             1.2  # endif
  58                       # define FD_SETSIZE 1024
  59 mday             1.5  # include <windows.h>
  60 mike             1.2  #else
  61                       # include <sys/types.h>
  62                       # include <sys/socket.h>
  63                       # include <sys/time.h>
  64                       # include <netinet/in.h>
  65                       # include <netdb.h>
  66                       # include <arpa/inet.h>
  67                       #endif
  68                       
  69                       PEGASUS_USING_STD;
  70                       
  71                       PEGASUS_NAMESPACE_BEGIN
  72                       
  73 mday             1.18 
  74 mday             1.25 static AtomicInt _connections = 0; // file-local atomic counter, starts at 0
                           // NOTE(review): _connections is not referenced in the part of the
                           // file visible here -- confirm it is still used.
  75                       
                           // File-local timeval constants. Assuming the usual {tv_sec, tv_usec}
                           // member order these are 0 s + 1 us, 300 s, and 0 s respectively.
                           // NOTE(review): their consumers are not visible in this chunk;
                           // presumably timing parameters for a thread pool -- confirm.
  76                       static struct timeval create_time = {0, 1};
  77 mday             1.38 static struct timeval destroy_time = {300, 0};
  78 mday             1.26 static struct timeval deadlock_time = {0, 0};
  79 mday             1.18 
  80 mike             1.2  ////////////////////////////////////////////////////////////////////////////////
  81                       //
  82                       // MonitorRep
  83                       //
  84                       ////////////////////////////////////////////////////////////////////////////////
  85                       
  86                       struct MonitorRep // bundle of six fd_set descriptor sets
  87                       {
                               // NOTE(review): judging by the names, rd/wr/ex are presumably the
                               // read / write / exception sets handed to select(), and the
                               // active_* members the corresponding result sets -- confirm.
                               // The constructors shown in this file only set Monitor::_rep to 0.
  88                           fd_set rd_fd_set;
  89                           fd_set wr_fd_set;
  90                           fd_set ex_fd_set;
  91                           fd_set active_rd_fd_set;
  92                           fd_set active_wr_fd_set;
  93                           fd_set active_ex_fd_set;
  94                       };
  95                       
  96                       ////////////////////////////////////////////////////////////////////////////////
  97                       //
  98                       // Monitor
  99                       //
 100                       ////////////////////////////////////////////////////////////////////////////////
 101 mike             1.2  
 102 kumpf            1.54 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
 103 mike             1.2  Monitor::Monitor()
 104 a.arora          1.73    : _module_handle(0), 
 105                            _controller(0),
 106                            _async(false),
 107                            _stopConnections(0),
 108                            _stopConnectionsSem(0),
 109 a.dunfey         1.76      _solicitSocketCount(0),
 110 a.dunfey         1.77      _tickle_client_socket(-1),
 111                            _tickle_server_socket(-1),
 112                            _tickle_peer_socket(-1)
 113 mike             1.2  {
 114 kumpf            1.54     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
 115 mike             1.2      Socket::initializeInterface();
 116 mday             1.25     _rep = 0;
 117 kumpf            1.54     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
 118 a.arora          1.73 
 119                           // setup the tickler
 120                           initializeTickler();
 121                           
 122                           // Start the count at 1 because initilizeTickler() 
 123                           // has added an entry in the first position of the 
 124                           // _entries array
 125                           for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
 126 mday             1.37     {
 127                              _MonitorEntry entry(0, 0, 0);
 128                              _entries.append(entry);
 129                           }
 130 mike             1.2  }
 131                       
 132 mday             1.18 Monitor::Monitor(Boolean async)
 133 a.arora          1.73    : _module_handle(0),
 134                            _controller(0),
 135                            _async(async),
 136                            _stopConnections(0),
 137                            _stopConnectionsSem(0),
 138 a.dunfey         1.76      _solicitSocketCount(0),
 139 a.dunfey         1.77      _tickle_client_socket(-1),
 140                            _tickle_server_socket(-1),
 141                            _tickle_peer_socket(-1)
 142 mday             1.18 {
 143 kumpf            1.54     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
 144 mday             1.18     Socket::initializeInterface();
 145 mday             1.25     _rep = 0;
 146 kumpf            1.54     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
 147 a.arora          1.73 
 148                           // setup the tickler
 149                           initializeTickler();
 150                       
 151                           // Start the count at 1 because initilizeTickler() 
 152                           // has added an entry in the first position of the
 153                           // _entries array
 154                           for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
 155 mday             1.37     {
 156                              _MonitorEntry entry(0, 0, 0);
 157                              _entries.append(entry);
 158                           }
 159 mday             1.18 }
 160 mday             1.20 
 161 mike             1.2  Monitor::~Monitor()
 162                       {
 163 kumpf            1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 164                                         "deregistering with module controller");
 165 kumpf            1.10 
 166 kumpf            1.11     if(_module_handle != NULL)
 167 mday             1.8      {
 168                              _controller->deregister_module(PEGASUS_MODULENAME_MONITOR);
 169                              _controller = 0;
 170 kumpf            1.10        delete _module_handle;
 171 mday             1.8      }
 172 kumpf            1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep");
 173 kumpf            1.48 
 174 kumpf            1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
 175 a.dunfey         1.76 
 176                           try{
 177 a.dunfey         1.77         if(_tickle_peer_socket >= 0)
 178 a.dunfey         1.76         {
 179                                   Socket::close(_tickle_peer_socket);
 180                               }
 181 a.dunfey         1.77         if(_tickle_client_socket >= 0)
 182 a.dunfey         1.76         {
 183                                   Socket::close(_tickle_client_socket);
 184                               }
 185 a.dunfey         1.77         if(_tickle_server_socket >= 0)
 186 a.dunfey         1.76         {
 187                                   Socket::close(_tickle_server_socket);
 188                               }
 189                           }
 190                           catch(...)
 191                           {
 192                               Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 193                                         "Failed to close tickle sockets");
 194                           }
 195                       
 196 mike             1.2      Socket::uninitializeInterface();
 197 kumpf            1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 198                                         "returning from monitor destructor");
 199 mday             1.18 }
 200                       
 201 a.arora          1.73 void Monitor::initializeTickler(){
 202                           /* 
 203                              NOTE: On any errors trying to 
 204                                    setup out tickle connection, 
 205                                    throw an exception/end the server
 206                           */
 207                       
 208                           /* setup the tickle server/listener */
 209                       
 210                           // get a socket for the server side
 211                           if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){
 212                       	//handle error
 213                       	MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
 214                       				 "Received error number $0 while creating the internal socket.",
 215                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 216                       				 errno);
 217                       #else
 218                       				 WSAGetLastError());
 219                       #endif
 220                       	throw Exception(parms);
 221                           }
 222 a.arora          1.73 
 223                           // initialize the address
 224                           memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
 225                       #ifdef PEGASUS_OS_ZOS
 226                           _tickle_server_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
 227                       #else
 228                       #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 229                       #pragma convert(37)
 230                       #endif
 231                           _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
 232                       #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 233                       #pragma convert(0)
 234                       #endif
 235                       #endif
 236                           _tickle_server_addr.sin_family = PF_INET;
 237                           _tickle_server_addr.sin_port = 0;
 238                       
 239                           PEGASUS_SOCKLEN_SIZE _addr_size = sizeof(_tickle_server_addr);
 240                       
 241                           // bind server side to socket
 242                           if((::bind(_tickle_server_socket,
 243 a.arora          1.73 	       (struct sockaddr *)&_tickle_server_addr, 
 244                       	       sizeof(_tickle_server_addr))) < 0){
 245                       	// handle error
 246                       	MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
 247                       				 "Received error number $0 while binding the internal socket.",
 248                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 249                       				 errno);
 250                       #else
 251                       				 WSAGetLastError());
 252                       #endif
 253                               throw Exception(parms);
 254                           }
 255                       
 256                           // tell the kernel we are a server
 257                           if((::listen(_tickle_server_socket,3)) < 0){
 258                       	// handle error
 259                       	MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",
 260                       			 "Received error number $0 while listening to the internal socket.",
 261                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 262                       				 errno);
 263                       #else
 264 a.arora          1.73 				 WSAGetLastError());
 265                       #endif
 266                       	throw Exception(parms);
 267                           }
 268                           
 269                           // make sure we have the correct socket for our server
 270                           int sock = ::getsockname(_tickle_server_socket,
 271                       			     (struct sockaddr*)&_tickle_server_addr,
 272                       			     &_addr_size); 
 273                           if(sock < 0){
 274                       	// handle error
 275                       	MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",
 276                       			 "Received error number $0 while getting the internal socket name.",
 277                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 278                       				 errno);
 279                       #else
 280                       				 WSAGetLastError());
 281                       #endif
 282                       	throw Exception(parms);
 283                           }
 284                       
 285 a.arora          1.73     /* set up the tickle client/connector */
 286                            
 287                           // get a socket for our tickle client
 288                           if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){
 289                       	// handle error
 290                       	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
 291                       			 "Received error number $0 while creating the internal client socket.",
 292                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 293                       				 errno);
 294                       #else
 295                       				 WSAGetLastError());
 296                       #endif
 297                       	throw Exception(parms);
 298                           }
 299                       
 300                           // setup the address of the client
 301                           memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
 302                       #ifdef PEGASUS_OS_ZOS
 303                           _tickle_client_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
 304                       #else
 305                       #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 306 a.arora          1.73 #pragma convert(37)
 307                       #endif
 308                           _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
 309                       #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 310                       #pragma convert(0)
 311                       #endif
 312                       #endif
 313                           _tickle_client_addr.sin_family = PF_INET;
 314                           _tickle_client_addr.sin_port = 0;
 315                       
 316                           // bind socket to client side
 317                           if((::bind(_tickle_client_socket,
 318                       	       (struct sockaddr*)&_tickle_client_addr,
 319                       	       sizeof(_tickle_client_addr))) < 0){
 320                       	// handle error
 321                       	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
 322                       			 "Received error number $0 while binding the internal client socket.",
 323                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 324                       				 errno);
 325                       #else
 326                       				 WSAGetLastError());
 327 a.arora          1.73 #endif
 328                       	throw Exception(parms);
 329                           }
 330                       
 331                           // connect to server side
 332                           if((::connect(_tickle_client_socket,
 333                       		  (struct sockaddr*)&_tickle_server_addr,
 334                       		  sizeof(_tickle_server_addr))) < 0){
 335                       	// handle error
 336                       	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
 337                       			 "Received error number $0 while connecting the internal client socket.",
 338                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 339                       				 errno);
 340                       #else
 341                       				 WSAGetLastError());
 342                       #endif
 343                       	throw Exception(parms);
 344                           }
 345                       
 346                           /* set up the slave connection */
 347                           memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
 348 a.arora          1.73     PEGASUS_SOCKLEN_SIZE peer_size = sizeof(_tickle_peer_addr);
 349                           pegasus_sleep(1); 
 350                       
 351                           // this call may fail, we will try a max of 20 times to establish this peer connection
 352                           if((_tickle_peer_socket = ::accept(_tickle_server_socket,
 353                       				       (struct sockaddr*)&_tickle_peer_addr,
 354                       				       &peer_size)) < 0){
 355                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 356                               // Only retry on non-windows platforms.
 357                               if(_tickle_peer_socket == -1 && errno == EAGAIN)
 358                               {
 359                                 int retries = 0;                                                                        
 360                                 do
 361                                 {
 362                                   pegasus_sleep(1);
 363                                   _tickle_peer_socket = ::accept(_tickle_server_socket,
 364                       					   (struct sockaddr*)&_tickle_peer_addr,
 365                       					   &peer_size);
 366                                   retries++;
 367                                 } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
 368                               }
 369 a.arora          1.73 #endif
 370                           }
 371                           if(_tickle_peer_socket == -1){
 372                       	// handle error
 373                       	MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",
 374                       			 "Received error number $0 while accepting the internal socket connection.",
 375                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 376                       				 errno);
 377                       #else
 378                       				 WSAGetLastError());
 379                       #endif
 380                       	throw Exception(parms);
 381                           }
 382                           // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
 383                           // checks entries with IDLE state for events
 384                           _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
 385                           entry._status = _MonitorEntry::IDLE;
 386                           _entries.append(entry);
 387                       }
 388                       
 389                       void Monitor::tickle(void)
 390 a.arora          1.73 {
 391 sushma.fernandes 1.78     static char _buffer[] =
 392 a.arora          1.73     {
 393                             '0','0'
 394                           };
 395                                    
 396 sushma.fernandes 1.78     AutoMutex autoMutex(_tickle_mutex);
 397                           Socket::disableBlocking(_tickle_client_socket);    
 398                           Socket::write(_tickle_client_socket,&_buffer, 2);
 399                           Socket::enableBlocking(_tickle_client_socket); 
 400                       }
 401                       
 402                       void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
 403                       {
 404                           // Set the state to requested state
 405                           _entries[index]._status = status;
 406 a.arora          1.73 }
 407                       
 408 mike             1.2  Boolean Monitor::run(Uint32 milliseconds)
 409                       {
 410 mday             1.18 
 411 mday             1.25     Boolean handled_events = false;
 412 a.arora          1.73     int i = 0;
 413                            
 414 kumpf            1.36     struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
 415 a.arora          1.73 
 416 mday             1.25     fd_set fdread;
 417                           FD_ZERO(&fdread);
 418 a.arora          1.73 
 419 mday             1.37     _entry_mut.lock(pegasus_thread_self());
 420 mday             1.13     
 421 kumpf            1.48     // Check the stopConnections flag.  If set, clear the Acceptor monitor entries  
 422                           if (_stopConnections == 1) 
 423                           {
 424                               for ( int indx = 0; indx < (int)_entries.size(); indx++)
 425                               {
 426                                   if (_entries[indx]._type == Monitor::ACCEPTOR)
 427                                   {
 428                                       if ( _entries[indx]._status.value() != _MonitorEntry::EMPTY)
 429                                       {
 430                                          if ( _entries[indx]._status.value() == _MonitorEntry::IDLE ||
 431                                               _entries[indx]._status.value() == _MonitorEntry::DYING )
 432                                          {
 433                                              // remove the entry
 434                       		       _entries[indx]._status = _MonitorEntry::EMPTY;
 435                                          }
 436                                          else
 437                                          {
 438                                              // set status to DYING
 439 kumpf            1.52                       _entries[indx]._status = _MonitorEntry::DYING;
 440 kumpf            1.48                    }
 441                                      }
 442                                  }
 443                               }
 444                               _stopConnections = 0;
 445 a.arora          1.73 	_stopConnectionsSem.signal();
 446 kumpf            1.48     }
 447 kumpf            1.51 
 448 kumpf            1.68     for( int indx = 0; indx < (int)_entries.size(); indx++)
 449                           {
 450                              if ((_entries[indx]._status.value() == _MonitorEntry::DYING) &&
 451                                       (_entries[indx]._type == Monitor::CONNECTION))
 452                              {
 453                                 MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);
 454                                 PEGASUS_ASSERT(q != 0);
 455                                 MessageQueue & o = static_cast<HTTPConnection *>(q)->get_owner();
 456                                 Message* message= new CloseConnectionMessage(_entries[indx].socket);
 457                                 message->dest = o.getQueueId();
 458                       
 459                                 // HTTPAcceptor is responsible for closing the connection. 
 460                                 // The lock is released to allow HTTPAcceptor to call
 461                                 // unsolicitSocketMessages to free the entry. 
 462                                 // Once HTTPAcceptor completes processing of the close
 463                                 // connection, the lock is re-requested and processing of
 464                                 // the for loop continues.  This is safe with the current
 465                                 // implementation of the _entries object.  Note that the
 466                                 // loop condition accesses the _entries.size() on each
 467                                 // iteration, so that a change in size while the mutex is
 468                                 // unlocked will not result in an ArrayIndexOutOfBounds
 469 kumpf            1.68           // exception.
 470                       
 471                                 _entry_mut.unlock();
 472                                 o.enqueue(message);
 473                                 _entry_mut.lock(pegasus_thread_self());
 474                              }
 475                           }
 476                       
 477 kumpf            1.51     Uint32 _idleEntries = 0;
 478 a.arora          1.73    
 479                           /*
 480                       	We will keep track of the maximum socket number and pass this value
 481                       	to the kernel as a parameter to SELECT.  This loop seems like a good 
 482                       	place to calculate the max file descriptor (maximum socket number)
 483                       	because we have to traverse the entire array.
 484                           */ 
 485                           int maxSocketCurrentPass = 0;
 486 mday             1.25     for( int indx = 0; indx < (int)_entries.size(); indx++)
 487 mike             1.2      {
 488 a.arora          1.73        if(maxSocketCurrentPass < _entries[indx].socket)
 489                       	  maxSocketCurrentPass = _entries[indx].socket;
 490                       
 491 mday             1.37        if(_entries[indx]._status.value() == _MonitorEntry::IDLE)
 492 mday             1.25        {
 493 kumpf            1.51 	  _idleEntries++;
 494 mday             1.25 	  FD_SET(_entries[indx].socket, &fdread);
 495                              }
 496 mday             1.13     }
 497 s.hills          1.62 
 498 a.arora          1.73     /*
 499                       	Add 1 then assign maxSocket accordingly. We add 1 to account for
 500                       	descriptors starting at 0.
 501                           */
 502                           maxSocketCurrentPass++;
 503                       
 504 kumpf            1.51     _entry_mut.unlock(); 
 505 a.arora          1.73     int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
 506 kumpf            1.51    _entry_mut.lock(pegasus_thread_self());
 507 mday             1.25 
 508 mike             1.2  #ifdef PEGASUS_OS_TYPE_WINDOWS
 509 kumpf            1.50     if(events == SOCKET_ERROR)
 510 mike             1.2  #else
 511 kumpf            1.50     if(events == -1)
 512 mike             1.2  #endif
 513 mday             1.13     {
 514 kumpf            1.50        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 515                                 "Monitor::run - errorno = %d has occurred on select.", errno);
 516                              // The EBADF error indicates that one or more or the file
 517                              // descriptions was not valid. This could indicate that
 518                              // the _entries structure has been corrupted or that
 519                              // we have a synchronization error.
 520                       
 521                              PEGASUS_ASSERT(errno != EBADF);
 522                           }
 523                           else if (events)
 524                           {
 525 kumpf            1.51        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 526                                 "Monitor::run select event received events = %d, monitoring %d idle entries", 
 527                       	   events, _idleEntries);
 528 mday             1.25        for( int indx = 0; indx < (int)_entries.size(); indx++)
 529                              {
 530 kumpf            1.53           // The Monitor should only look at entries in the table that are IDLE (i.e.,
 531                                 // owned by the Monitor).
 532                       	  if((_entries[indx]._status.value() == _MonitorEntry::IDLE) && 
 533                       	     (FD_ISSET(_entries[indx].socket, &fdread)))
 534 mday             1.25 	  {
 535                       	     MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);
 536 kumpf            1.53              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 537                                         "Monitor::run indx = %d, queueId =  %d, q = %p",
 538                                         indx, _entries[indx].queueId, q);
 539                                    PEGASUS_ASSERT(q !=0);
 540 mday             1.37 
 541                       	     try 
 542 mday             1.25 	     {
 543 mday             1.37 		if(_entries[indx]._type == Monitor::CONNECTION)
 544                       		{
 545 kumpf            1.51                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 546                                            "_entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
 547 mday             1.37 		   static_cast<HTTPConnection *>(q)->_entry_index = indx;
 548 sushma.fernandes 1.78 
 549                                          // Do not update the entry just yet. The entry gets updated once
 550                                          // the request has been read. 
 551                       		   //_entries[indx]._status = _MonitorEntry::BUSY;
 552                       
 553 kumpf            1.66                    // If allocate_and_awaken failure, retry on next iteration
 554 a.arora          1.73 /* Removed for PEP 183.
 555 kumpf            1.69                    if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(
 556                                                  (void *)q, _dispatch))
 557 kumpf            1.67                    {
 558                                             Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 559                                                 "Monitor::run: Insufficient resources to process request.");
 560                                             _entries[indx]._status = _MonitorEntry::IDLE;
 561                                             _entry_mut.unlock();
 562                                             return true;
 563                                          }
 564 a.arora          1.73 */
 565                       // Added for PEP 183
 566                       		   HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
 567                         			 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 568                                                "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
 569                                          dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
 570                                          try
 571                                          {
 572                                              dst->run(1);
 573                                          }
 574                          		   catch (...)
 575                          		   {
 576                             			Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 577                                 		"Monitor::_dispatch: exception received");
 578                          		   }
 579                          		   Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 580                                          "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
 581                       
 582 sushma.fernandes 1.78                    // It is possible the entry status may not be set to busy.
 583                                          // The following will fail in that case. 
 584                          		   // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);                                                         
 585 a.arora          1.73 		   // Once the HTTPConnection thread has set the status value to either
 586                       		   // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
 587                       		   // to the Monitor.  It is no longer permissible to access the connection
 588                       		   // or the entry in the _entries table.
 589 sushma.fernandes 1.78 
 590                                          // The following is not relevant as the worker thread or the
 591                                          // reader thread will update the status of the entry.
 592                       		   //if (dst->_connectionClosePending)
 593                       		   //{  
 594                       		   //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
 595                       		   //}
 596                       		   //else
 597                       		   //{
 598                       		   //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
 599                       		   //}	
 600                       // end Added for PEP 183 
 601 a.arora          1.73 		}
 602                       	        else if( _entries[indx]._type == Monitor::INTERNAL){
 603                       			// set ourself to BUSY, 
 604                                               // read the data  
 605                                               // and set ourself back to IDLE
 606                       		
 607                       		   	_entries[indx]._status == _MonitorEntry::BUSY;
 608                       			static char buffer[2];
 609                             			Socket::disableBlocking(_entries[indx].socket);
 610                             			Sint32 amt = Socket::read(_entries[indx].socket,&buffer, 2);
 611                             			Socket::enableBlocking(_entries[indx].socket);
 612                       			_entries[indx]._status == _MonitorEntry::IDLE;
 613 mday             1.37 		}
 614                       		else
 615 mday             1.25 		{
 616 kumpf            1.51                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 617                                            "Non-connection entry, indx = %d, has been received.", indx);
 618 mday             1.37 		   int events = 0;
 619                       		   events |= SocketMessage::READ;
 620                       		   Message *msg = new SocketMessage(_entries[indx].socket, events);
 621                       		   _entries[indx]._status = _MonitorEntry::BUSY;
 622                       		   _entry_mut.unlock();
 623 mday             1.27 
 624 mday             1.37 		   q->enqueue(msg);
 625                       		   _entries[indx]._status = _MonitorEntry::IDLE;
 626 mday             1.25 		   return true;
 627                       		}
 628                       	     }
 629 mday             1.37 	     catch(...)
 630 mday             1.25 	     {
 631                       	     }
 632                       	     handled_events = true;
 633                       	  }
 634                              }
 635 mday             1.24     }
 636 mday             1.37     _entry_mut.unlock();
 637 mday             1.13     return(handled_events);
 638 mike             1.2  }
 639                       
 640 chuck            1.74 void Monitor::stopListeningForConnections(Boolean wait)
 641 kumpf            1.48 {
 642                           PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
 643 a.arora          1.73     // set boolean then tickle the server to recognize _stopConnections 
 644 kumpf            1.48     _stopConnections = 1;
 645 a.arora          1.73     tickle();
 646 kumpf            1.48 
 647 chuck            1.74     if (wait)
 648 a.arora          1.73     {
 649 chuck            1.74       // Wait for the monitor to notice _stopConnections.  Otherwise the
 650                             // caller of this function may unbind the ports while the monitor
 651                             // is still accepting connections on them.
 652                             try
 653                       	{
 654                       	  _stopConnectionsSem.time_wait(10000);
 655                       	}
 656                             catch (TimeOut &)
 657                       	{
 658                       	  // The monitor is probably busy processng a very long request, and is
 659                       	  // not accepting connections.  Let the caller unbind the ports.
 660                       	}
 661 a.arora          1.73     }
 662                           
 663 kumpf            1.48     PEG_METHOD_EXIT();
 664                       }
 665 mday             1.25 
 666 mday             1.37 
 667 mday             1.25 int  Monitor::solicitSocketMessages(
 668 mike             1.2      Sint32 socket, 
 669                           Uint32 events,
 670 mday             1.8      Uint32 queueId, 
 671                           int type)
 672 mike             1.2  {
 673 a.arora          1.73    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");                                        
 674 alagaraja        1.75    AutoMutex autoMut(_entry_mut);
 675 a.arora          1.73    // Check to see if we need to dynamically grow the _entries array
 676                          // We always want the _entries array to 2 bigger than the
 677                          // current connections requested
 678                          _solicitSocketCount++;  // bump the count
 679                          int size = (int)_entries.size();
 680                          if(_solicitSocketCount >= (size-1)){
 681                               for(int i = 0; i < (_solicitSocketCount - (size-1)); i++){
 682                                       _MonitorEntry entry(0, 0, 0);
 683                                       _entries.append(entry);
 684                               }
 685                          }
 686 kumpf            1.4  
 687 a.arora          1.73    int index;
 688                          for(index = 1; index < (int)_entries.size(); index++)
 689 mday             1.25    {
 690 a.arora          1.73       try
 691 mday             1.37       {
 692 a.arora          1.73          if(_entries[index]._status.value() == _MonitorEntry::EMPTY)
 693                                {
 694                                   _entries[index].socket = socket;
 695                                   _entries[index].queueId  = queueId;
 696                                   _entries[index]._type = type;
 697                                   _entries[index]._status = _MonitorEntry::IDLE;
 698 alagaraja        1.75                                        
 699 a.arora          1.73             return index;
 700                                }
 701 mday             1.37       }
 702                             catch(...)
 703 mday             1.25       {
 704                             }
 705                          }
 706 a.arora          1.73    _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful
 707 mday             1.25    PEG_METHOD_EXIT();
 708 kumpf            1.50    return -1;
 709 a.arora          1.73 
 710 mike             1.2  }
 711                       
 712 mday             1.25 void Monitor::unsolicitSocketMessages(Sint32 socket)
 713 mike             1.2  {
 714 kumpf            1.50 
 715 mday             1.25     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
 716 alagaraja        1.75     AutoMutex autoMut(_entry_mut);
 717 a.arora          1.73 
 718                           /*
 719                               Start at index = 1 because _entries[0] is the tickle entry which never needs
 720                               to be EMPTY;
 721                           */
 722                           int index;
 723                           for(index = 1; index < _entries.size(); index++)
 724 mike             1.2      {
 725 mday             1.25        if(_entries[index].socket == socket)
 726                              {
 727 a.arora          1.73           _entries[index]._status = _MonitorEntry::EMPTY;
 728                                 _entries[index].socket = -1;
 729                                 _solicitSocketCount--;
 730                                 break;
 731 mday             1.25        }
 732 mike             1.2      }
 733 a.arora          1.73 
 734                           /*
 735                       	Dynamic Contraction:
 736                       	To remove excess entries we will start from the end of the _entries array
 737                       	and remove all entries with EMPTY status until we find the first NON EMPTY.
 738                       	This prevents the positions, of the NON EMPTY entries, from being changed.
 739                           */ 
 740                           index = _entries.size() - 1;
 741                           while(_entries[index]._status == _MonitorEntry::EMPTY){
 742                       	if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
 743                                       _entries.remove(index);
 744                       	index--;
 745                           }
 746                       
 747 kumpf            1.4      PEG_METHOD_EXIT();
 748 mike             1.2  }
 749                       
 750 a.arora          1.73 // Note: this is no longer called with PEP 183.
 751 mday             1.7  PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
 752                       {
 753 mday             1.8     HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
 754 kumpf            1.51    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 755 kumpf            1.53         "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
 756                               dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
 757 kumpf            1.51    try
 758                          {
 759                             dst->run(1);
 760                          }
 761                          catch (...)
 762                          {
 763                             Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 764                                 "Monitor::_dispatch: exception received");
 765                          }
 766                          Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 767                                 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
 768                          
 769 kumpf            1.53    PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);
 770 kumpf            1.68 
 771                          // Once the HTTPConnection thread has set the status value to either
 772                          // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
 773                          // to the Monitor.  It is no longer permissible to access the connection
 774                          // or the entry in the _entries table.
 775 kumpf            1.50    if (dst->_connectionClosePending)
 776                          {
 777 kumpf            1.68       dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
 778                          }
 779                          else
 780                          {
 781                             dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
 782 kumpf            1.50    }
 783 mday             1.8     return 0;
 784 mday             1.40 }
 785                       
 786                       
 787                       
 788                       ////************************* monitor 2 *****************************////
 789 mday             1.43 ////************************* monitor 2 *****************************////
 790                       ////************************* monitor 2 *****************************////
 791                       ////************************* monitor 2 *****************************////
 792                       ////************************* monitor 2 *****************************////
 793                       ////************************* monitor 2 *****************************////
 794                       ////************************* monitor 2 *****************************////
 795 mday             1.40 
 796                       
 797 mday             1.56 
 798                       
 799                       
 800 mday             1.42 m2e_rep::m2e_rep(void)
 801 mday             1.43   :Base(), state(IDLE)
 802                       
 803 mday             1.42 {
 804                       }
 805                       
 806                       m2e_rep::m2e_rep(monitor_2_entry_type _type, 
 807                       		 pegasus_socket _sock, 
 808                       		 void* _accept, 
 809                       		 void* _dispatch)
 810 mday             1.43   : Base(), type(_type), state(IDLE), psock(_sock), 
 811 mday             1.42     accept_parm(_accept), dispatch_parm(_dispatch)
 812                       {
 813                         
 814                       }
 815                       
 816                       m2e_rep::~m2e_rep(void)
 817                       {
 818                       }
 819                       
 820                       m2e_rep::m2e_rep(const m2e_rep& r)
 821                         : Base()
 822                       {
 823                         if(this != &r){
 824                           type = r.type;
 825                           psock = r.psock;
 826                           accept_parm = r.accept_parm;
 827                           dispatch_parm = r.dispatch_parm;
 828 mday             1.43     state = IDLE;
 829                           
 830 mday             1.42   }
 831                       }
 832                       
 833                       
 834                       m2e_rep& m2e_rep::operator =(const m2e_rep& r)
 835                       {
 836                         if(this != &r) {
 837                           type = r.type;
 838                           psock = r.psock;
 839                           accept_parm = r.accept_parm;
 840                           dispatch_parm = r.dispatch_parm;
 841 mday             1.43     state = IDLE;
 842 mday             1.42   }
 843                         return *this;
 844                       }
 845                       
 846                       Boolean m2e_rep::operator ==(const m2e_rep& r)
 847                       {
 848                         if(this == &r)
 849                           return true;
 850                         return false;
 851                       }
 852                       
 853                       Boolean m2e_rep::operator ==(void* r)
 854                       {
 855                         if((void*)this == r)
 856                           return true;
 857                         return false;
 858                       }
 859                       
 860                       m2e_rep::operator pegasus_socket() const 
 861                       {
 862                         return psock;
 863 mday             1.42 }
 864                       
 865                       
 866 mday             1.40 monitor_2_entry::monitor_2_entry(void)
 867                       {
 868 mday             1.42   _rep = new m2e_rep();
 869 mday             1.40 }
 870                       
 871 mday             1.42 monitor_2_entry::monitor_2_entry(pegasus_socket& _psock, 
 872                       				 monitor_2_entry_type _type, 
 873                       				 void* _accept_parm, void* _dispatch_parm)
 874 mday             1.40 {
 875 mday             1.42   _rep = new m2e_rep(_type, _psock, _accept_parm, _dispatch_parm);
 876 mday             1.40 }
 877                       
 878                       monitor_2_entry::monitor_2_entry(const monitor_2_entry& e)
 879                       {
 880                         if(this != &e){
 881 mday             1.42     Inc(this->_rep = e._rep);
 882 mday             1.40   }
 883                       }
 884                       
 885                       monitor_2_entry::~monitor_2_entry(void)
 886                       {
 887 a.dunfey         1.76 
 888 mday             1.42   Dec(_rep);
 889 mday             1.40 }
 890                       
 891                       monitor_2_entry& monitor_2_entry::operator=(const monitor_2_entry& e)
 892                       {
 893                         if(this != &e){
 894 mday             1.42     Dec(_rep);
 895                           Inc(this->_rep = e._rep);
 896 mday             1.40   }
 897                         return *this;
 898                       }
 899                       
 900 mday             1.42 Boolean monitor_2_entry::operator ==(const monitor_2_entry& me) const
 901 mday             1.40 {
 902                         if(this == &me)
 903                           return true;
 904                         return false;
 905                       }
 906                       
 907 mday             1.42 Boolean monitor_2_entry::operator ==(void* k) const
 908 mday             1.40 {
 909                         if((void *)this == k)
 910                           return true;
 911                         return false;
 912                       }
 913                       
 914                       
 915 mday             1.42 monitor_2_entry_type monitor_2_entry::get_type(void) const
 916 mday             1.40 {
 917 mday             1.42   return _rep->type;
 918                       }
 919                       
 920                       void monitor_2_entry::set_type(monitor_2_entry_type t)
 921                       {
 922                         _rep->type = t;
 923                       }
 924                       
 925                       
 926 mday             1.43 monitor_2_entry_state  monitor_2_entry::get_state(void) const
 927                       {
 928                         return (monitor_2_entry_state) _rep->state.value();
 929                       }
 930                       
 931                       void monitor_2_entry::set_state(monitor_2_entry_state t)
 932                       {
 933                         _rep->state = t;
 934                       }
 935                       
 936 mday             1.42 void* monitor_2_entry::get_accept(void) const
 937                       {
 938                         return _rep->accept_parm;
 939                       }
 940                       
 941                       void monitor_2_entry::set_accept(void* a)
 942                       {
 943                         _rep->accept_parm = a;
 944                       }
 945                       
 946                       
 947                       void* monitor_2_entry::get_dispatch(void) const
 948                       {
 949                         return _rep->dispatch_parm;
 950                       }
 951                       
 952                       void monitor_2_entry::set_dispatch(void* a)
 953                       {
 954                         _rep->dispatch_parm = a;
 955                       }
 956                       
 957 mday             1.42 pegasus_socket monitor_2_entry::get_sock(void) const
 958                       {
 959                         return _rep->psock;
 960                       }
 961                       
 962                       
 963                       void monitor_2_entry::set_sock(pegasus_socket& s)
 964                       {
 965                         _rep->psock = s;
 966                         
 967 mday             1.40 }
 968                       
 969 mday             1.59 //static monitor_2* _m2_instance;
 970 mday             1.40 
// Definition of the static monitor_2::_connections queue of HTTPConnection2
// objects; the monitor_2 destructor drains and deletes its contents.
// NOTE(review): the meaning of the (true, 0) constructor arguments is defined
// by AsyncDQueue and is not visible here -- confirm against that class.
AsyncDQueue<HTTPConnection2> monitor_2::_connections(true, 0);
 972                       
 973 mday             1.40 monitor_2::monitor_2(void)
 974 mday             1.42   : _session_dispatch(0), _accept_dispatch(0), _listeners(true, 0), 
 975 mday             1.49     _ready(true, 0), _die(0), _requestCount(0)
 976 mday             1.40 {
 977                         try {
 978                           
 979                           bsd_socket_factory _factory;
 980                       
 981                           // set up the listener/acceptor 
 982                           pegasus_socket temp = pegasus_socket(&_factory);
 983                           
 984                           temp.socket(PF_INET, SOCK_STREAM, 0);
 985                           // initialize the address
 986                           memset(&_tickle_addr, 0, sizeof(_tickle_addr));
 987 marek            1.47 #ifdef PEGASUS_OS_ZOS
 988                           _tickle_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
 989                       #else
 990 chuck            1.55 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 991                       #pragma convert(37)
 992                       #endif
 993 mday             1.40     _tickle_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
 994 chuck            1.55 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 995                       #pragma convert(0)
 996                       #endif
 997 marek            1.47 #endif
 998 mday             1.40     _tickle_addr.sin_family = PF_INET;
 999                           _tickle_addr.sin_port = 0;
1000                       
1001                           PEGASUS_SOCKLEN_SIZE _addr_size = sizeof(_tickle_addr);
1002                           
1003                           temp.bind((struct sockaddr *)&_tickle_addr, sizeof(_tickle_addr));
1004                           temp.listen(3);  
1005                           temp.getsockname((struct sockaddr*)&_tickle_addr, &_addr_size);
1006                       
1007                           // set up the connector
1008                       
1009                           pegasus_socket tickler = pegasus_socket(&_factory);
1010                           tickler.socket(PF_INET, SOCK_STREAM, 0);
1011                           struct sockaddr_in _addr;
1012                           memset(&_addr, 0, sizeof(_addr));
1013 kumpf            1.48 #ifdef PEGASUS_OS_ZOS
1014 marek            1.47     _addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
1015                       #else
1016 mday             1.40     _addr.sin_addr.s_addr = inet_addr("127.0.0.1");
1017 marek            1.47 #endif
1018 mday             1.40     _addr.sin_family = PF_INET;
1019                           _addr.sin_port = 0;
1020                           tickler.bind((struct sockaddr*)&_addr, sizeof(_addr));
1021                           tickler.connect((struct sockaddr*)&_tickle_addr, sizeof(_tickle_addr));
1022                       
1023 mday             1.42     _tickler.set_sock(tickler);
1024                           _tickler.set_type(INTERNAL);
1025 mday             1.43     _tickler.set_state(BUSY);
1026                           
1027 mday             1.40     struct sockaddr_in peer;
1028                           memset(&peer, 0, sizeof(peer));
1029                           PEGASUS_SOCKLEN_SIZE peer_size = sizeof(peer);
1030                       
1031                           pegasus_socket accepted = temp.accept((struct sockaddr*)&peer, &peer_size);
1032 mday             1.57     
1033 mday             1.42     monitor_2_entry* _tickle = new monitor_2_entry(accepted, INTERNAL, 0, 0);
1034 a.arora          1.71 
1035                       // No need to set _tickle's state as BUSY, since monitor_2::run() now
1036                       // does a select only on sockets which are in IDLE (default) state.
1037                       //  _tickle->set_state(BUSY);
1038 mday             1.43     
1039 mday             1.40     _listeners.insert_first(_tickle);
1040                       
1041                         }
1042                         catch(...){  }
1043                       }
1044                       
1045                       monitor_2::~monitor_2(void)
1046                       {
1047 mday             1.60 
1048                          stop();
1049                       
1050 mday             1.41   try {
1051                           monitor_2_entry* temp = _listeners.remove_first();
1052                           while(temp){
1053                             delete temp;
1054                             temp = _listeners.remove_first();
1055                           }
1056                         }
1057 mday             1.60 
1058 mday             1.41   catch(...){  }
1059 mday             1.60   
1060                       
1061                         try 
1062                         {
1063                            HTTPConnection2* temp = _connections.remove_first();
1064                            while(temp)
1065                            {
1066                       	delete temp;
1067                       	temp = _connections.remove_first();
1068                            }
1069                         }
1070                         catch(...)
1071                         {
1072                         }
1073                         
1074                       
1075 mday             1.40 }
1076                       
1077                       
1078                       void monitor_2::run(void)
1079                       {
1080                         monitor_2_entry* temp;
1081 a.arora          1.71   int _nonIdle=0, _idleCount=0, events;
1082                       
1083 mday             1.40   while(_die.value() == 0) {
1084 a.arora          1.71     _nonIdle=_idleCount=0;
1085 mday             1.49      
1086 mday             1.57      struct timeval tv_idle = { 60, 0 };
1087 mday             1.56      
1088 mday             1.40     // place all sockets in the select set 
1089                           FD_ZERO(&rd_fd_set);
1090                           try {
1091                             _listeners.lock(pegasus_thread_self());
1092                             temp = _listeners.next(0);
1093 a.arora          1.71       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1094                              "monitor_2::run:Creating New FD list for SELECT.");
1095 mday             1.40       while(temp != 0 ){
1096 mday             1.57 	if(temp->get_state() == CLOSED ) {
1097 mday             1.43 	  monitor_2_entry* closed = temp;
1098 a.arora          1.72       temp = _listeners.next(closed);
1099 mday             1.43 	  _listeners.remove_no_lock(closed);
1100 a.arora          1.71       
1101                             Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1102                              "monitor_2::run:Deleteing CLOSED socket fd=%d.",(Sint32)closed->get_sock());
1103 mday             1.60 	  
1104 mday             1.49 	  HTTPConnection2 *cn = monitor_2::remove_connection((Sint32)(closed->get_sock()));
1105                       	  delete cn;
1106 mday             1.43 	  delete closed;
1107                       	}
1108 mday             1.45 	if(temp == 0)
1109                       	   break;
1110 a.arora          1.71 
1111                       
1112                               //Count the number if IDLE sockets
1113                               if(temp->get_state() != IDLE ) _nonIdle++;
1114                                else _idleCount++;
1115                       
1116 mday             1.46 	Sint32 fd = (Sint32) temp->get_sock();
1117 a.arora          1.71 
1118                               //Select should be called ONLY on the FDs which are in IDLE state
1119                       	if((fd >= 0) && (temp->get_state() == IDLE))
1120                               {
1121                                 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1122                                  "monitor_2::run:Adding FD %d to the list for SELECT.",fd);
1123                       	  FD_SET(fd , &rd_fd_set);
1124                               }
1125                       	   temp = _listeners.next(temp);
1126 mday             1.40       }
1127                             _listeners.unlock();
1128                           } 
1129                           catch(...){
1130                             return;
1131                           }
1132 a.arora          1.71 
1133 mday             1.42     // important -  the dispatch routine has pointers to all the 
1134                           // entries that are readable. These entries can be changed but 
1135                           // the pointer must not be tampered with. 
1136 mday             1.56     if(_connections.count() )
1137 a.arora          1.71        events = select(FD_SETSIZE, &rd_fd_set, NULL, NULL, NULL);
1138 mday             1.56     else
1139 a.arora          1.71        events = select(FD_SETSIZE, &rd_fd_set, NULL, NULL, &tv_idle);
1140 mday             1.57     
1141                           if(_die.value())
1142                           {
1143                              break;
1144                           }
1145 a.arora          1.71 
1146                       #ifdef PEGASUS_OS_TYPE_WINDOWS
1147                           if(events == SOCKET_ERROR)
1148                       #else
1149                           if(events == -1)
1150                       #endif
1151                           {
1152                              Tracer::trace(TRC_HTTP, Tracer::LEVEL2,
1153                                 "monitor_2:run:INVALID FD. errorno = %d on select.", errno);
1154                              // The EBADF error indicates that one or more or the file
1155                              // descriptions was not valid. This could indicate that
1156                              // the _entries structure has been corrupted or that
1157                              // we have a synchronization error.
1158                       
1159                            // Keeping the line below commented for time being.
1160                            //  PEGASUS_ASSERT(errno != EBADF);
1161                           }
1162                           else if (events)
1163                           {
1164                              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1165                                 "monitor_2::run select event received events = %d, monitoring %d idle entries", events, _idleCount);
1166 a.arora          1.71    
1167 mday             1.57     
1168 mday             1.40     try {
1169                             _listeners.lock(pegasus_thread_self());
1170                             temp = _listeners.next(0);
1171                             while(temp != 0 ){
1172 a.arora          1.72 	  Sint32 fd = (Sint32) temp->get_sock();
1173                       	  if(fd >= 0 && FD_ISSET(fd, &rd_fd_set)) {
1174                       	  if(temp->get_type() != CLIENTSESSION) temp->set_state(BUSY);
1175 mday             1.42 	  FD_CLR(fd,  &rd_fd_set);
1176                       	  monitor_2_entry* ready = new monitor_2_entry(*temp);
1177 mday             1.49 	  try 
1178                       	  {
1179                       	     _ready.insert_first(ready);
1180                       	  }
1181                       	  catch(...)
1182                       	  {
1183                       	  }
1184                       	  
1185 mday             1.42 	  _requestCount++;
1186 mday             1.40 	}
1187                       	temp = _listeners.next(temp);
1188                             }
1189                             _listeners.unlock();
1190                           } 
1191                           catch(...){
1192                             return;
1193                           }
1194                           // now handle the sockets that are ready to read 
1195 mday             1.56     if(_ready.count())
1196                              _dispatch();
1197                           else
1198                           {
1199                              if(_connections.count() == 0 )
1200                       	  _idle_dispatch(_idle_parm);
1201                           }
1202 a.arora          1.71    }  // if events
1203 mday             1.40   } // while alive 
1204 a.arora          1.72   _die=0;
1205 mday             1.57 
1206 mday             1.40 }
1207                       
1208 dj.gorey         1.70 int  monitor_2::solicitSocketMessages(
1209                           Sint32 socket,
1210                           Uint32 events,
1211                           Uint32 queueId,
1212                           int type)
1213                       {
1214                       
1215 a.arora          1.71    PEG_METHOD_ENTER(TRC_HTTP, "monitor_2::solicitSocketMessages");
1216 dj.gorey         1.70 
1217 alagaraja        1.75    AutoMutex autoMut(_entry_mut);
1218 dj.gorey         1.70 
1219                          for(int index = 0; index < (int)_entries.size(); index++)
1220                          {
1221                             try
1222                             {
1223                       	 if(_entries[index]._status.value() == monitor_2_entry::EMPTY)
1224                       	 {
1225                       	    _entries[index].socket = socket;
1226                       	    //_entries[index].queueId  = queueId;
1227                       	    //_entries[index]._type = type;
1228 a.arora          1.71 	    _entries[index]._status = IDLE;
1229 dj.gorey         1.70 
1230                       	    return index;
1231                       	 }
1232                             }
1233                             catch(...)
1234                             {
1235                             }
1236                       
1237                          }
1238                          PEG_METHOD_EXIT();
1239                          return -1;
1240                       }
1241                       
1242                       
1243                       void monitor_2::unsolicitSocketMessages(Sint32 socket)
1244                       {
1245                       
1246                           PEG_METHOD_ENTER(TRC_HTTP, "monitor_2::unsolicitSocketMessages");
1247 alagaraja        1.75     AutoMutex autoMut(_entry2_mut);
1248 dj.gorey         1.70 
1249                           for(int index = 0; index < (int)_entries2.size(); index++)
1250                           {
1251                              if(_entries2[index].socket == socket)
1252                              {
1253                       	  _entries2[index]._status = monitor_2_entry::EMPTY; 
1254                       	  _entries2[index].socket = -1;
1255                       	  break;
1256                              }
1257                           }
1258                           PEG_METHOD_EXIT();
1259                       }
1260                       
1261 mday             1.42 void* monitor_2::set_session_dispatch(void (*dp)(monitor_2_entry*))
1262 mday             1.40 {
1263 mday             1.42   void* old = (void *)_session_dispatch;
1264 mday             1.40   _session_dispatch = dp;
1265                         return old;
1266                       }
1267                       
1268 mday             1.42 void* monitor_2::set_accept_dispatch(void (*dp)(monitor_2_entry*))
1269                       {
1270                         void* old = (void*)_accept_dispatch;
1271                         _accept_dispatch = dp;
1272                         return old;
1273 mday             1.56 }
1274                       
1275                       void* monitor_2::set_idle_dispatch(void (*dp)(void*))
1276                       {
1277                          void* old = (void*)_idle_dispatch;
1278                          _idle_dispatch = dp;
1279                          return old;
1280                       }
1281                       
1282                       void* monitor_2::set_idle_parm(void* parm)
1283                       {
1284                          void* old = _idle_parm;
1285                          _idle_parm = parm;
1286                          return old;
1287 mday             1.42 }
1288                       
1289 mday             1.40 
1290 mday             1.60 
1291                       //-----------------------------------------------------------------
1292                       // Note on deleting the monitor_2_entry nodes: 
1293                       //  Each case: in the switch statement needs to handle the deletion 
1294                       //  of the monitor_2_entry * node differently. A SESSION dispatch 
1295                       //  routine MUST DELETE the entry during its dispatch handling. 
1296                       //  All other dispatch routines MUST NOT delete the entry during the 
1297                       //  dispatch handling, but must allow monitor_2::_dispatch to delete
1298                       //   the entry. 
1299                       //
1300                       //  The reason is pretty obscure and it is debatable whether or not
1301                       //  to even bother, but during cimserver shutdown the single monitor_2_entry* 
1302                       //  will leak unless the _session_dispatch routine takes care of deleting it. 
1303                       //
1304                       //  The reason is that a shutdown messages completely stops everything and 
1305                       //  the _session_dispatch routine never returns. So monitor_2::_dispatch is 
1306                       //  never able to do its own cleanup. 
1307                       //
1308                       // << Mon Oct 13 09:33:33 2003 mdd >>
1309                       //-----------------------------------------------------------------
1310                       
1311 mday             1.40 void monitor_2::_dispatch(void)
1312                       {
1313 mday             1.49    monitor_2_entry* entry;
1314                          
1315                          try 
1316                          {
1317                       
1318                       	 entry = _ready.remove_first();
1319                          }
1320                          catch(...)
1321                          {
1322                          }
1323                          
1324                         while(entry != 0 ) {
1325 mday             1.42     switch(entry->get_type()) {
1326 mday             1.40     case INTERNAL:
1327                             static char buffer[2];
1328 mday             1.49       entry->get_sock().disableBlocking();
1329 mday             1.42       entry->get_sock().read(&buffer, 2);
1330 mday             1.49       entry->get_sock().enableBlocking();
1331 a.arora          1.71       entry->set_state(IDLE);   // Set state of the socket to IDLE so that 
1332                                                       // monitor_2::run can add to the list of FDs
1333                                                       // on which select would be called.
1334                       
1335                            
1336                        
1337 mday             1.60       delete entry;
1338                             
1339 mday             1.40       break;
1340                           case LISTEN:
1341                             {
1342                       	static struct sockaddr peer;
1343                       	static PEGASUS_SOCKLEN_SIZE peer_size = sizeof(peer);
1344 mday             1.49 	entry->get_sock().disableBlocking();
1345 mday             1.42 	pegasus_socket connected = entry->get_sock().accept(&peer, &peer_size);
1346 a.arora          1.71         entry->set_state(IDLE);  // Set state of the LISTEN socket to IDLE
1347 mday             1.65 #ifdef PEGASUS_OS_TYPE_WINDOWS
1348                           if((Sint32)connected  == SOCKET_ERROR)
1349                       #else
1350                       	if((Sint32)connected == -1 )
1351                       #endif
1352                       	{
1353                       	   delete entry;
1354                       	   break;
1355                       	}
1356                       	
1357 mday             1.49 	entry->get_sock().enableBlocking();
1358 mday             1.42 	monitor_2_entry *temp = add_entry(connected, SESSION, entry->get_accept(), entry->get_dispatch());
1359                       	if(temp && _accept_dispatch != 0)
1360 mday             1.49 	   _accept_dispatch(temp);
1361 mday             1.60 	delete entry;
1362                       	
1363 mday             1.40       }
1364                             break;
1365                           case SESSION:
1366 a.arora          1.72     case CLIENTSESSION:
1367 mday             1.60        if(_session_dispatch != 0 )
1368                              {
1369                       	  // NOTE: _session_dispatch will delete entry - do not do it here
1370 a.arora          1.72 	     unsigned client=0;
1371                                if(entry->get_type() == CLIENTSESSION) client = 1;
1372                                Sint32 sock=(Sint32)(entry->get_sock());
1373                            
1374                       	     _session_dispatch(entry);
1375                       
1376                                if(client)
1377                                {
1378                                  HTTPConnection2 *cn = monitor_2::remove_connection(sock);
1379                       	       if(cn) delete cn;
1380                                  // stop();
1381                                  _die=1;
1382                                }
1383 mday             1.60        }
1384                              
1385 mday             1.40       else {
1386                       	static char buffer[4096];
1387 mday             1.42 	int bytes = entry->get_sock().read(&buffer, 4096);
1388 mday             1.60 	delete entry;
1389 mday             1.40       }
1390                           
1391                             break;
1392                           case UNTYPED:
1393                           default:
1394 mday             1.60            delete entry;
1395 mday             1.40       break;
1396                           }
1397 mday             1.42     _requestCount--;
1398 mday             1.49     
1399                           if(_ready.count() == 0 )
1400                              break;
1401                           
1402                           try 
1403                           {
1404                              entry = _ready.remove_first();
1405                           }
1406                           catch(...)
1407                           {
1408                           }
1409                           
1410 mday             1.40   }
1411                       }
1412                       
1413                       void monitor_2::stop(void)
1414                       {
1415                         _die = 1;
1416                         tickle();
1417                         // shut down the listener list, free the list nodes
1418 mday             1.42   _tickler.get_sock().close();
1419 mday             1.40   _listeners.shutdown_queue();
1420                       }
1421                       
1422                       void monitor_2::tickle(void)
1423                       {
1424                         static char _buffer[] = 
1425                           {
1426                             '0','0'
1427                           };
1428                         
1429 mday             1.57   _tickler.get_sock().disableBlocking();
1430                         
1431 mday             1.42   _tickler.get_sock().write(&_buffer, 2);
1432 mday             1.57   _tickler.get_sock().enableBlocking();
1433                         
1434 mday             1.40 }
1435                       
1436                       
1437 mday             1.42 monitor_2_entry*  monitor_2::add_entry(pegasus_socket& ps, 
1438                       				       monitor_2_entry_type type,
1439                       				       void* accept_parm, 
1440                       				       void* dispatch_parm)
1441 mday             1.40 {
1442 a.arora          1.71   Sint32 fd1,fd2;
1443                       
1444                         fd2=(Sint32) ps;
1445                       
1446 mday             1.42   monitor_2_entry* m2e = new monitor_2_entry(ps, type, accept_parm, dispatch_parm);
1447 a.arora          1.71 
1448                       // The purpose of the following piece of code is to avoid duplicate entries in
1449                       // the _listeners list. Would it be too much of an overhead ?
1450                       try {
1451                       
1452                            monitor_2_entry* temp;
1453                       
1454                             _listeners.lock(pegasus_thread_self());
1455                             temp = _listeners.next(0);
1456                             while(temp != 0 )
1457                             {
1458                               fd1=(Sint32) temp->get_sock();
1459                       
1460                               if(fd1 == fd2)
1461                               {
1462                       
1463                                  Tracer::trace(TRC_HTTP, Tracer::LEVEL3,
1464                                 "monitor_2::add_entry:Request for duplicate entry in _listeners for %d FD.", fd1);
1465                                   if(temp->get_state() == CLOSED)
1466                                   {
1467                                     temp->set_state(IDLE);
1468 a.arora          1.71               Tracer::trace(TRC_HTTP, Tracer::LEVEL3,
1469                                     "monitor_2::add_entry:CLOSED state changed to IDLE for %d.", fd1);
1470                                    }
1471                                    _listeners.unlock();
1472                                   delete m2e;
1473                                   return 0;
1474                               }
1475                              temp = _listeners.next(temp);
1476                             }
1477                          } 
1478                          catch(...) 
1479                          {
1480                             delete m2e;
1481                             return 0;
1482                          }
1483                       
1484                       
1485                         _listeners.unlock();
1486                       
1487                       
1488 mday             1.40   try{
1489                           _listeners.insert_first(m2e);
1490                         }
1491                         catch(...){
1492                           delete m2e;
1493 mday             1.42     return 0;
1494 mday             1.40   }
1495 a.arora          1.71       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1496                             "monitor_2::add_entry:SUCCESSFULLY added to _listeners list. FD = %d.", fd2);
1497 mday             1.40   tickle();
1498 mday             1.42   return m2e;
1499 mday             1.40 }
1500                       
1501                       Boolean monitor_2::remove_entry(Sint32 s)
1502                       {
1503                         monitor_2_entry* temp;
1504                         try {
1505                           _listeners.try_lock(pegasus_thread_self());
1506                           temp = _listeners.next(0);
1507                           while(temp != 0){
1508 mday             1.42       if(s == (Sint32)temp->_rep->psock ){
1509 mday             1.40 	temp = _listeners.remove_no_lock(temp);
1510                       	delete temp;
1511                       	_listeners.unlock();
1512                       	return true;
1513                             }
1514                             temp = _listeners.next(temp);
1515                           }
1516                           _listeners.unlock();
1517                         }
1518                         catch(...){
1519                         }
1520                         return false;
1521 mday             1.7  }
1522 mday             1.37 
1523 mday             1.42 Uint32 monitor_2::getOutstandingRequestCount(void)
1524                       {
1525                         return _requestCount.value();
1526                         
1527 mday             1.49 }
1528                       
1529                       
1530                       HTTPConnection2* monitor_2::remove_connection(Sint32 sock)
1531                       {
1532                       
1533                          HTTPConnection2* temp;
1534                          try 
1535                          {
1536                             monitor_2::_connections.lock(pegasus_thread_self());
1537                             temp = monitor_2::_connections.next(0);
1538                             while(temp != 0 )
1539                             {
1540                       	 if(sock == temp->getSocket())
1541                       	 {
1542                       	    temp = monitor_2::_connections.remove_no_lock(temp);
1543                       	    monitor_2::_connections.unlock();
1544                       	    return temp;
1545                       	 }
1546                       	 temp = monitor_2::_connections.next(temp);
1547                             }
1548 mday             1.49       monitor_2::_connections.unlock();
1549                          }
1550                          catch(...)
1551                          {
1552                          }
1553                          return 0;
1554                       }
1555                       
1556                       Boolean monitor_2::insert_connection(HTTPConnection2* connection)
1557                       {
1558                          try 
1559                          {
1560                             monitor_2::_connections.insert_first(connection);
1561                          }
1562                          catch(...)
1563                          {
1564                             return false;
1565                          }
1566                          return true;
1567 mday             1.42 }
1568 mday             1.7  
1569 mike             1.2  
1570                       PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2