(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

   1 karl  1.64 //%2003////////////////////////////////////////////////////////////////////////
   2 mike  1.2  //
   3 karl  1.64 // Copyright (c) 2000, 2001, 2002  BMC Software, Hewlett-Packard Development
   4            // Company, L. P., IBM Corp., The Open Group, Tivoli Systems.
   5            // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L. P.;
   6            // IBM Corp.; EMC Corporation, The Open Group.
   7 mike  1.2  //
   8            // Permission is hereby granted, free of charge, to any person obtaining a copy
   9 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
  10            // deal in the Software without restriction, including without limitation the
  11            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  12 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
  13            // furnished to do so, subject to the following conditions:
  14            // 
  15 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  16 mike  1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  17            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  18 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  19            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  20            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  21 mike  1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  22            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  23            //
  24            //==============================================================================
  25            //
  26            // Author: Mike Brasher (mbrasher@bmc.com)
  27            //
  28 mday  1.49 // Modified By: Mike Day (monitor_2) mdday@us.ibm.com 
  29 a.arora 1.71 //              Amit K Arora (Bug#1153) amita@in.ibm.com
  30 mike    1.2  //
  31              //%/////////////////////////////////////////////////////////////////////////////
  32              
  33              #include <Pegasus/Common/Config.h>
  34 mday    1.40 
  35 mike    1.2  #include <cstring>
  36              #include "Monitor.h"
  37              #include "MessageQueue.h"
  38              #include "Socket.h"
  39 kumpf   1.4  #include <Pegasus/Common/Tracer.h>
  40 mday    1.7  #include <Pegasus/Common/HTTPConnection.h>
  41 kumpf   1.69 #include <Pegasus/Common/MessageQueueService.h>
  42 a.arora 1.73 #include <Pegasus/Common/Exception.h>
  43 mike    1.2  
  44              #ifdef PEGASUS_OS_TYPE_WINDOWS
  45              # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
  46              #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
  47              of <winsock.h>. It may have been indirectly included (e.g., by including \
  48 mday    1.25 <windows.h>). Finthe inclusion of that header which is visible to this \
  49 mike    1.2  compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \
  50              otherwise, less than 64 clients (the default) will be able to connect to the \
  51              CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
  52 mday    1.5  
  53 mike    1.2  # endif
  54              # define FD_SETSIZE 1024
  55 mday    1.5  # include <windows.h>
  56 mike    1.2  #else
  57              # include <sys/types.h>
  58              # include <sys/socket.h>
  59              # include <sys/time.h>
  60              # include <netinet/in.h>
  61              # include <netdb.h>
  62              # include <arpa/inet.h>
  63              #endif
  64              
  65              PEGASUS_USING_STD;
  66              
  67              PEGASUS_NAMESPACE_BEGIN
  68              
  69 mday    1.18 
  70 mday    1.25 static AtomicInt _connections = 0;
  71              
  72              static struct timeval create_time = {0, 1};
  73 mday    1.38 static struct timeval destroy_time = {300, 0};
  74 mday    1.26 static struct timeval deadlock_time = {0, 0};
  75 mday    1.18 
  76 mike    1.2  ////////////////////////////////////////////////////////////////////////////////
  77              //
  78              // MonitorRep
  79              //
  80              ////////////////////////////////////////////////////////////////////////////////
  81              
  82              struct MonitorRep
  83              {
  84                  fd_set rd_fd_set;
  85                  fd_set wr_fd_set;
  86                  fd_set ex_fd_set;
  87                  fd_set active_rd_fd_set;
  88                  fd_set active_wr_fd_set;
  89                  fd_set active_ex_fd_set;
  90              };
  91              
  92              ////////////////////////////////////////////////////////////////////////////////
  93              //
  94              // Monitor
  95              //
  96              ////////////////////////////////////////////////////////////////////////////////
  97 mike    1.2  
  98 kumpf   1.54 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
  99 mike    1.2  Monitor::Monitor()
 100 a.arora 1.73    : _module_handle(0), 
 101                   _controller(0),
 102                   _async(false),
 103                   _stopConnections(0),
 104                   _stopConnectionsSem(0),
 105                   _solicitSocketCount(0)
 106 mike    1.2  {
 107 kumpf   1.54     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
 108 mike    1.2      Socket::initializeInterface();
 109 mday    1.25     _rep = 0;
 110 kumpf   1.54     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
 111 a.arora 1.73 
 112                  // setup the tickler
 113                  initializeTickler();
 114                  
 115                  // Start the count at 1 because initilizeTickler() 
 116                  // has added an entry in the first position of the 
 117                  // _entries array
 118                  for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
 119 mday    1.37     {
 120                     _MonitorEntry entry(0, 0, 0);
 121                     _entries.append(entry);
 122                  }
 123 mike    1.2  }
 124              
 125 mday    1.18 Monitor::Monitor(Boolean async)
 126 a.arora 1.73    : _module_handle(0),
 127                   _controller(0),
 128                   _async(async),
 129                   _stopConnections(0),
 130                   _stopConnectionsSem(0),
 131                   _solicitSocketCount(0)
 132 mday    1.18 {
 133 kumpf   1.54     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
 134 mday    1.18     Socket::initializeInterface();
 135 mday    1.25     _rep = 0;
 136 kumpf   1.54     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
 137 a.arora 1.73 
 138                  // setup the tickler
 139                  initializeTickler();
 140              
 141                  // Start the count at 1 because initilizeTickler() 
 142                  // has added an entry in the first position of the
 143                  // _entries array
 144                  for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
 145 mday    1.37     {
 146                     _MonitorEntry entry(0, 0, 0);
 147                     _entries.append(entry);
 148                  }
 149 mday    1.18 }
 150 mday    1.20 
 151 mike    1.2  Monitor::~Monitor()
 152              {
 153 kumpf   1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 154                                "deregistering with module controller");
 155 kumpf   1.10 
 156 kumpf   1.11     if(_module_handle != NULL)
 157 mday    1.8      {
 158                     _controller->deregister_module(PEGASUS_MODULENAME_MONITOR);
 159                     _controller = 0;
 160 kumpf   1.10        delete _module_handle;
 161 mday    1.8      }
 162 kumpf   1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep");
 163 kumpf   1.48 
 164 kumpf   1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
 165 mike    1.2      Socket::uninitializeInterface();
 166 kumpf   1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 167                                "returning from monitor destructor");
 168 mday    1.18 }
 169              
 170 a.arora 1.73 void Monitor::initializeTickler(){
 171                  /* 
 172                     NOTE: On any errors trying to 
 173                           setup out tickle connection, 
 174                           throw an exception/end the server
 175                  */
 176              
 177                  /* setup the tickle server/listener */
 178              
 179                  // get a socket for the server side
 180                  if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){
 181              	//handle error
 182              	MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
 183              				 "Received error number $0 while creating the internal socket.",
 184              #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 185              				 errno);
 186              #else
 187              				 WSAGetLastError());
 188              #endif
 189              	throw Exception(parms);
 190                  }
 191 a.arora 1.73 
 192                  // initialize the address
 193                  memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
 194              #ifdef PEGASUS_OS_ZOS
 195                  _tickle_server_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
 196              #else
 197              #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 198              #pragma convert(37)
 199              #endif
 200                  _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
 201              #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 202              #pragma convert(0)
 203              #endif
 204              #endif
 205                  _tickle_server_addr.sin_family = PF_INET;
 206                  _tickle_server_addr.sin_port = 0;
 207              
 208                  PEGASUS_SOCKLEN_SIZE _addr_size = sizeof(_tickle_server_addr);
 209              
 210                  // bind server side to socket
 211                  if((::bind(_tickle_server_socket,
 212 a.arora 1.73 	       (struct sockaddr *)&_tickle_server_addr, 
 213              	       sizeof(_tickle_server_addr))) < 0){
 214              	// handle error
 215              	MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
 216              				 "Received error number $0 while binding the internal socket.",
 217              #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 218              				 errno);
 219              #else
 220              				 WSAGetLastError());
 221              #endif
 222                      throw Exception(parms);
 223                  }
 224              
 225                  // tell the kernel we are a server
 226                  if((::listen(_tickle_server_socket,3)) < 0){
 227              	// handle error
 228              	MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",
 229              			 "Received error number $0 while listening to the internal socket.",
 230              #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 231              				 errno);
 232              #else
 233 a.arora 1.73 				 WSAGetLastError());
 234              #endif
 235              	throw Exception(parms);
 236                  }
 237                  
 238                  // make sure we have the correct socket for our server
 239                  int sock = ::getsockname(_tickle_server_socket,
 240              			     (struct sockaddr*)&_tickle_server_addr,
 241              			     &_addr_size); 
 242                  if(sock < 0){
 243              	// handle error
 244              	MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",
 245              			 "Received error number $0 while getting the internal socket name.",
 246              #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 247              				 errno);
 248              #else
 249              				 WSAGetLastError());
 250              #endif
 251              	throw Exception(parms);
 252                  }
 253              
 254 a.arora 1.73     /* set up the tickle client/connector */
 255                   
 256                  // get a socket for our tickle client
 257                  if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){
 258              	// handle error
 259              	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
 260              			 "Received error number $0 while creating the internal client socket.",
 261              #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 262              				 errno);
 263              #else
 264              				 WSAGetLastError());
 265              #endif
 266              	throw Exception(parms);
 267                  }
 268              
 269                  // setup the address of the client
 270                  memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
 271              #ifdef PEGASUS_OS_ZOS
 272                  _tickle_client_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
 273              #else
 274              #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 275 a.arora 1.73 #pragma convert(37)
 276              #endif
 277                  _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
 278              #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 279              #pragma convert(0)
 280              #endif
 281              #endif
 282                  _tickle_client_addr.sin_family = PF_INET;
 283                  _tickle_client_addr.sin_port = 0;
 284              
 285                  // bind socket to client side
 286                  if((::bind(_tickle_client_socket,
 287              	       (struct sockaddr*)&_tickle_client_addr,
 288              	       sizeof(_tickle_client_addr))) < 0){
 289              	// handle error
 290              	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
 291              			 "Received error number $0 while binding the internal client socket.",
 292              #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 293              				 errno);
 294              #else
 295              				 WSAGetLastError());
 296 a.arora 1.73 #endif
 297              	throw Exception(parms);
 298                  }
 299              
 300                  // connect to server side
 301                  if((::connect(_tickle_client_socket,
 302              		  (struct sockaddr*)&_tickle_server_addr,
 303              		  sizeof(_tickle_server_addr))) < 0){
 304              	// handle error
 305              	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
 306              			 "Received error number $0 while connecting the internal client socket.",
 307              #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 308              				 errno);
 309              #else
 310              				 WSAGetLastError());
 311              #endif
 312              	throw Exception(parms);
 313                  }
 314              
 315                  /* set up the slave connection */
 316                  memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
 317 a.arora 1.73     PEGASUS_SOCKLEN_SIZE peer_size = sizeof(_tickle_peer_addr);
 318                  pegasus_sleep(1); 
 319              
 320                  // this call may fail, we will try a max of 20 times to establish this peer connection
 321                  if((_tickle_peer_socket = ::accept(_tickle_server_socket,
 322              				       (struct sockaddr*)&_tickle_peer_addr,
 323              				       &peer_size)) < 0){
 324              #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 325                      // Only retry on non-windows platforms.
 326                      if(_tickle_peer_socket == -1 && errno == EAGAIN)
 327                      {
 328                        int retries = 0;                                                                        
 329                        do
 330                        {
 331                          pegasus_sleep(1);
 332                          _tickle_peer_socket = ::accept(_tickle_server_socket,
 333              					   (struct sockaddr*)&_tickle_peer_addr,
 334              					   &peer_size);
 335                          retries++;
 336                        } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
 337                      }
 338 a.arora 1.73 #endif
 339                  }
 340                  if(_tickle_peer_socket == -1){
 341              	// handle error
 342              	MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",
 343              			 "Received error number $0 while accepting the internal socket connection.",
 344              #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 345              				 errno);
 346              #else
 347              				 WSAGetLastError());
 348              #endif
 349              	throw Exception(parms);
 350                  }
 351                  // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
 352                  // checks entries with IDLE state for events
 353                  _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
 354                  entry._status = _MonitorEntry::IDLE;
 355                  _entries.append(entry);
 356              }
 357              
 358              void Monitor::tickle(void)
 359 a.arora 1.73 {
 360                static char _buffer[] =
 361                  {
 362                    '0','0'
 363                  };
 364                           
 365                Socket::disableBlocking(_tickle_client_socket);    
 366                Socket::write(_tickle_client_socket,&_buffer, 2);
 367                Socket::enableBlocking(_tickle_client_socket); 
 368              }
 369              
 370 mike    1.2  Boolean Monitor::run(Uint32 milliseconds)
 371              {
 372 mday    1.18 
 373 mday    1.25     Boolean handled_events = false;
 374 a.arora 1.73     int i = 0;
 375                   
 376 kumpf   1.36     struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
 377 a.arora 1.73 
 378 mday    1.25     fd_set fdread;
 379                  FD_ZERO(&fdread);
 380 a.arora 1.73 
 381 mday    1.37     _entry_mut.lock(pegasus_thread_self());
 382 mday    1.13     
 383 kumpf   1.48     // Check the stopConnections flag.  If set, clear the Acceptor monitor entries  
 384                  if (_stopConnections == 1) 
 385                  {
 386                      for ( int indx = 0; indx < (int)_entries.size(); indx++)
 387                      {
 388                          if (_entries[indx]._type == Monitor::ACCEPTOR)
 389                          {
 390                              if ( _entries[indx]._status.value() != _MonitorEntry::EMPTY)
 391                              {
 392                                 if ( _entries[indx]._status.value() == _MonitorEntry::IDLE ||
 393                                      _entries[indx]._status.value() == _MonitorEntry::DYING )
 394                                 {
 395                                     // remove the entry
 396              		       _entries[indx]._status = _MonitorEntry::EMPTY;
 397                                 }
 398                                 else
 399                                 {
 400                                     // set status to DYING
 401 kumpf   1.52                       _entries[indx]._status = _MonitorEntry::DYING;
 402 kumpf   1.48                    }
 403                             }
 404                         }
 405                      }
 406                      _stopConnections = 0;
 407 a.arora 1.73 	_stopConnectionsSem.signal();
 408 kumpf   1.48     }
 409 kumpf   1.51 
 410 kumpf   1.68     for( int indx = 0; indx < (int)_entries.size(); indx++)
 411                  {
 412                     if ((_entries[indx]._status.value() == _MonitorEntry::DYING) &&
 413                              (_entries[indx]._type == Monitor::CONNECTION))
 414                     {
 415                        MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);
 416                        PEGASUS_ASSERT(q != 0);
 417                        MessageQueue & o = static_cast<HTTPConnection *>(q)->get_owner();
 418                        Message* message= new CloseConnectionMessage(_entries[indx].socket);
 419                        message->dest = o.getQueueId();
 420              
 421                        // HTTPAcceptor is responsible for closing the connection. 
 422                        // The lock is released to allow HTTPAcceptor to call
 423                        // unsolicitSocketMessages to free the entry. 
 424                        // Once HTTPAcceptor completes processing of the close
 425                        // connection, the lock is re-requested and processing of
 426                        // the for loop continues.  This is safe with the current
 427                        // implementation of the _entries object.  Note that the
 428                        // loop condition accesses the _entries.size() on each
 429                        // iteration, so that a change in size while the mutex is
 430                        // unlocked will not result in an ArrayIndexOutOfBounds
 431 kumpf   1.68           // exception.
 432              
 433                        _entry_mut.unlock();
 434                        o.enqueue(message);
 435                        _entry_mut.lock(pegasus_thread_self());
 436                     }
 437                  }
 438              
 439 kumpf   1.51     Uint32 _idleEntries = 0;
 440 a.arora 1.73    
 441                  /*
 442              	We will keep track of the maximum socket number and pass this value
 443              	to the kernel as a parameter to SELECT.  This loop seems like a good 
 444              	place to calculate the max file descriptor (maximum socket number)
 445              	because we have to traverse the entire array.
 446                  */ 
 447                  int maxSocketCurrentPass = 0;
 448 mday    1.25     for( int indx = 0; indx < (int)_entries.size(); indx++)
 449 mike    1.2      {
 450 a.arora 1.73        if(maxSocketCurrentPass < _entries[indx].socket)
 451              	  maxSocketCurrentPass = _entries[indx].socket;
 452              
 453 mday    1.37        if(_entries[indx]._status.value() == _MonitorEntry::IDLE)
 454 mday    1.25        {
 455 kumpf   1.51 	  _idleEntries++;
 456 mday    1.25 	  FD_SET(_entries[indx].socket, &fdread);
 457                     }
 458 mday    1.13     }
 459 s.hills 1.62 
 460 a.arora 1.73     /*
 461              	Add 1 then assign maxSocket accordingly. We add 1 to account for
 462              	descriptors starting at 0.
 463                  */
 464                  maxSocketCurrentPass++;
 465              
 466 kumpf   1.51     _entry_mut.unlock(); 
 467 a.arora 1.73     int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
 468 kumpf   1.51    _entry_mut.lock(pegasus_thread_self());
 469 mday    1.25 
 470 mike    1.2  #ifdef PEGASUS_OS_TYPE_WINDOWS
 471 kumpf   1.50     if(events == SOCKET_ERROR)
 472 mike    1.2  #else
 473 kumpf   1.50     if(events == -1)
 474 mike    1.2  #endif
 475 mday    1.13     {
 476 kumpf   1.50        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 477                        "Monitor::run - errorno = %d has occurred on select.", errno);
 478                     // The EBADF error indicates that one or more or the file
 479                     // descriptions was not valid. This could indicate that
 480                     // the _entries structure has been corrupted or that
 481                     // we have a synchronization error.
 482              
 483                     PEGASUS_ASSERT(errno != EBADF);
 484                  }
 485                  else if (events)
 486                  {
 487 kumpf   1.51        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 488                        "Monitor::run select event received events = %d, monitoring %d idle entries", 
 489              	   events, _idleEntries);
 490 mday    1.25        for( int indx = 0; indx < (int)_entries.size(); indx++)
 491                     {
 492 kumpf   1.53           // The Monitor should only look at entries in the table that are IDLE (i.e.,
 493                        // owned by the Monitor).
 494              	  if((_entries[indx]._status.value() == _MonitorEntry::IDLE) && 
 495              	     (FD_ISSET(_entries[indx].socket, &fdread)))
 496 mday    1.25 	  {
 497              	     MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);
 498 kumpf   1.53              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 499                                "Monitor::run indx = %d, queueId =  %d, q = %p",
 500                                indx, _entries[indx].queueId, q);
 501                           PEGASUS_ASSERT(q !=0);
 502 mday    1.37 
 503              	     try 
 504 mday    1.25 	     {
 505 mday    1.37 		if(_entries[indx]._type == Monitor::CONNECTION)
 506              		{
 507 kumpf   1.51                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 508                                   "_entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
 509 mday    1.37 		   static_cast<HTTPConnection *>(q)->_entry_index = indx;
 510              		   _entries[indx]._status = _MonitorEntry::BUSY;
 511 kumpf   1.66                    // If allocate_and_awaken failure, retry on next iteration
 512 a.arora 1.73 /* Removed for PEP 183.
 513 kumpf   1.69                    if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(
 514                                         (void *)q, _dispatch))
 515 kumpf   1.67                    {
 516                                    Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 517                                        "Monitor::run: Insufficient resources to process request.");
 518                                    _entries[indx]._status = _MonitorEntry::IDLE;
 519                                    _entry_mut.unlock();
 520                                    return true;
 521                                 }
 522 a.arora 1.73 */
 523              // Added for PEP 183
 524              		   HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
 525                			 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 526                                       "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
 527                                 dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
 528                                 try
 529                                 {
 530                                     dst->run(1);
 531                                 }
 532                 		   catch (...)
 533                 		   {
 534                    			Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 535                        		"Monitor::_dispatch: exception received");
 536                 		   }
 537                 		   Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 538                                 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
 539              
 540                 		   PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);                                                         
 541              		   // Once the HTTPConnection thread has set the status value to either
 542              		   // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
 543 a.arora 1.73 		   // to the Monitor.  It is no longer permissible to access the connection
 544              		   // or the entry in the _entries table.
 545              		   if (dst->_connectionClosePending)
 546              		   {  
 547              		     dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
 548              		   }
 549              		   else
 550              		   {
 551              		     dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
 552              		   }	
 553              // end Added for PEP 183
 554              		}
 555              	        else if( _entries[indx]._type == Monitor::INTERNAL){
 556              			// set ourself to BUSY, 
 557                                      // read the data  
 558                                      // and set ourself back to IDLE
 559              		
 560              		   	_entries[indx]._status == _MonitorEntry::BUSY;
 561              			static char buffer[2];
 562                    			Socket::disableBlocking(_entries[indx].socket);
 563                    			Sint32 amt = Socket::read(_entries[indx].socket,&buffer, 2);
 564 a.arora 1.73       			Socket::enableBlocking(_entries[indx].socket);
 565              			_entries[indx]._status == _MonitorEntry::IDLE;
 566 mday    1.37 		}
 567              		else
 568 mday    1.25 		{
 569 kumpf   1.51                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 570                                   "Non-connection entry, indx = %d, has been received.", indx);
 571 mday    1.37 		   int events = 0;
 572              		   events |= SocketMessage::READ;
 573              		   Message *msg = new SocketMessage(_entries[indx].socket, events);
 574              		   _entries[indx]._status = _MonitorEntry::BUSY;
 575              		   _entry_mut.unlock();
 576 mday    1.27 
 577 mday    1.37 		   q->enqueue(msg);
 578              		   _entries[indx]._status = _MonitorEntry::IDLE;
 579 mday    1.25 		   return true;
 580              		}
 581              	     }
 582 mday    1.37 	     catch(...)
 583 mday    1.25 	     {
 584              	     }
 585              	     handled_events = true;
 586              	  }
 587                     }
 588 mday    1.24     }
 589 mday    1.37     _entry_mut.unlock();
 590 mday    1.13     return(handled_events);
 591 mike    1.2  }
 592              
 593 chuck   1.74 void Monitor::stopListeningForConnections(Boolean wait)
 594 kumpf   1.48 {
 595                  PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
 596 a.arora 1.73     // set boolean then tickle the server to recognize _stopConnections 
 597 kumpf   1.48     _stopConnections = 1;
 598 a.arora 1.73     tickle();
 599 kumpf   1.48 
 600 chuck   1.74     if (wait)
 601 a.arora 1.73     {
 602 chuck   1.74       // Wait for the monitor to notice _stopConnections.  Otherwise the
 603                    // caller of this function may unbind the ports while the monitor
 604                    // is still accepting connections on them.
 605                    try
 606              	{
 607              	  _stopConnectionsSem.time_wait(10000);
 608              	}
 609                    catch (TimeOut &)
 610              	{
 611              	  // The monitor is probably busy processng a very long request, and is
 612              	  // not accepting connections.  Let the caller unbind the ports.
 613              	}
 614 a.arora 1.73     }
 615                  
 616 kumpf   1.48     PEG_METHOD_EXIT();
 617              }
 618 mday    1.25 
 619 mday    1.37 
 620 mday    1.25 int  Monitor::solicitSocketMessages(
 621 mike    1.2      Sint32 socket, 
 622                  Uint32 events,
 623 mday    1.8      Uint32 queueId, 
 624                  int type)
 625 mike    1.2  {
 626 a.arora 1.73    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");                                        
 627                 _entry_mut.lock(pegasus_thread_self());
 628                 // Check to see if we need to dynamically grow the _entries array
 629                 // We always want the _entries array to 2 bigger than the
 630                 // current connections requested
 631                 _solicitSocketCount++;  // bump the count
 632                 int size = (int)_entries.size();
 633                 if(_solicitSocketCount >= (size-1)){
 634                      for(int i = 0; i < (_solicitSocketCount - (size-1)); i++){
 635                              _MonitorEntry entry(0, 0, 0);
 636                              _entries.append(entry);
 637                      }
 638                 }
 639 kumpf   1.4  
 640 a.arora 1.73    int index;
 641                 for(index = 1; index < (int)_entries.size(); index++)
 642 mday    1.25    {
 643 a.arora 1.73       try
 644 mday    1.37       {
 645 a.arora 1.73          if(_entries[index]._status.value() == _MonitorEntry::EMPTY)
 646                       {
 647                          _entries[index].socket = socket;
 648                          _entries[index].queueId  = queueId;
 649                          _entries[index]._type = type;
 650                          _entries[index]._status = _MonitorEntry::IDLE;
 651                          _entry_mut.unlock();
 652                                                  
 653                          return index;
 654                       }
 655 mday    1.37       }
 656                    catch(...)
 657 mday    1.25       {
 658                    }
 659                 }
 660 a.arora 1.73    _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful
 661 kumpf   1.50    _entry_mut.unlock();
 662 mday    1.25    PEG_METHOD_EXIT();
 663 kumpf   1.50    return -1;
 664 a.arora 1.73 
 665 mike    1.2  }
 666              
 667 mday    1.25 void Monitor::unsolicitSocketMessages(Sint32 socket)
 668 mike    1.2  {
 669 kumpf   1.50 
 670 mday    1.25     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
 671 mday    1.37     _entry_mut.lock(pegasus_thread_self());
 672 a.arora 1.73 
 673                  /*
 674                      Start at index = 1 because _entries[0] is the tickle entry which never needs
 675                      to be EMPTY;
 676                  */
 677                  int index;
 678                  for(index = 1; index < _entries.size(); index++)
 679 mike    1.2      {
 680 mday    1.25        if(_entries[index].socket == socket)
 681                     {
 682 a.arora 1.73           _entries[index]._status = _MonitorEntry::EMPTY;
 683                        _entries[index].socket = -1;
 684                        _solicitSocketCount--;
 685                        break;
 686 mday    1.25        }
 687 mike    1.2      }
 688 a.arora 1.73 
 689                  /*
 690              	Dynamic Contraction:
 691              	To remove excess entries we will start from the end of the _entries array
 692              	and remove all entries with EMPTY status until we find the first NON EMPTY.
 693              	This prevents the positions, of the NON EMPTY entries, from being changed.
 694                  */ 
 695                  index = _entries.size() - 1;
 696                  while(_entries[index]._status == _MonitorEntry::EMPTY){
 697              	if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
 698                              _entries.remove(index);
 699              	index--;
 700                  }
 701              
 702 mday    1.37     _entry_mut.unlock();
 703 kumpf   1.4      PEG_METHOD_EXIT();
 704 mike    1.2  }
 705              
 706 a.arora 1.73 // Note: this is no longer called with PEP 183.
 707 mday    1.7  PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
 708              {
 709 mday    1.8     HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
 710 kumpf   1.51    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 711 kumpf   1.53         "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
 712                      dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
 713 kumpf   1.51    try
 714                 {
 715                    dst->run(1);
 716                 }
 717                 catch (...)
 718                 {
 719                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 720                        "Monitor::_dispatch: exception received");
 721                 }
 722                 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 723                        "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
 724                 
 725 kumpf   1.53    PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);
 726 kumpf   1.68 
 727                 // Once the HTTPConnection thread has set the status value to either
 728                 // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
 729                 // to the Monitor.  It is no longer permissible to access the connection
 730                 // or the entry in the _entries table.
 731 kumpf   1.50    if (dst->_connectionClosePending)
 732                 {
 733 kumpf   1.68       dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
 734                 }
 735                 else
 736                 {
 737                    dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
 738 kumpf   1.50    }
 739 mday    1.8     return 0;
 740 mday    1.40 }
 741              
 742              
 743              
 744              ////************************* monitor 2 *****************************////
 745 mday    1.43 ////************************* monitor 2 *****************************////
 746              ////************************* monitor 2 *****************************////
 747              ////************************* monitor 2 *****************************////
 748              ////************************* monitor 2 *****************************////
 749              ////************************* monitor 2 *****************************////
 750              ////************************* monitor 2 *****************************////
 751 mday    1.40 
 752              
 753 mday    1.56 
 754              
 755              
 756 mday    1.42 m2e_rep::m2e_rep(void)
 757 mday    1.43   :Base(), state(IDLE)
 758              
 759 mday    1.42 {
 760              }
 761              
 762              m2e_rep::m2e_rep(monitor_2_entry_type _type, 
 763              		 pegasus_socket _sock, 
 764              		 void* _accept, 
 765              		 void* _dispatch)
 766 mday    1.43   : Base(), type(_type), state(IDLE), psock(_sock), 
 767 mday    1.42     accept_parm(_accept), dispatch_parm(_dispatch)
 768              {
 769                
 770              }
 771              
 772              m2e_rep::~m2e_rep(void)
 773              {
 774              }
 775              
 776              m2e_rep::m2e_rep(const m2e_rep& r)
 777                : Base()
 778              {
 779                if(this != &r){
 780                  type = r.type;
 781                  psock = r.psock;
 782                  accept_parm = r.accept_parm;
 783                  dispatch_parm = r.dispatch_parm;
 784 mday    1.43     state = IDLE;
 785                  
 786 mday    1.42   }
 787              }
 788              
 789              
 790              m2e_rep& m2e_rep::operator =(const m2e_rep& r)
 791              {
 792                if(this != &r) {
 793                  type = r.type;
 794                  psock = r.psock;
 795                  accept_parm = r.accept_parm;
 796                  dispatch_parm = r.dispatch_parm;
 797 mday    1.43     state = IDLE;
 798 mday    1.42   }
 799                return *this;
 800              }
 801              
 802              Boolean m2e_rep::operator ==(const m2e_rep& r)
 803              {
 804                if(this == &r)
 805                  return true;
 806                return false;
 807              }
 808              
 809              Boolean m2e_rep::operator ==(void* r)
 810              {
 811                if((void*)this == r)
 812                  return true;
 813                return false;
 814              }
 815              
 816              m2e_rep::operator pegasus_socket() const 
 817              {
 818                return psock;
 819 mday    1.42 }
 820              
 821              
 822 mday    1.40 monitor_2_entry::monitor_2_entry(void)
 823              {
 824 mday    1.42   _rep = new m2e_rep();
 825 mday    1.40 }
 826              
 827 mday    1.42 monitor_2_entry::monitor_2_entry(pegasus_socket& _psock, 
 828              				 monitor_2_entry_type _type, 
 829              				 void* _accept_parm, void* _dispatch_parm)
 830 mday    1.40 {
 831 mday    1.42   _rep = new m2e_rep(_type, _psock, _accept_parm, _dispatch_parm);
 832 mday    1.40 }
 833              
 834              monitor_2_entry::monitor_2_entry(const monitor_2_entry& e)
 835              {
 836                if(this != &e){
 837 mday    1.42     Inc(this->_rep = e._rep);
 838 mday    1.40   }
 839              }
 840              
 841              monitor_2_entry::~monitor_2_entry(void)
 842              {
 843 mday    1.60    
 844 mday    1.42   Dec(_rep);
 845 mday    1.40 }
 846              
 847              monitor_2_entry& monitor_2_entry::operator=(const monitor_2_entry& e)
 848              {
 849                if(this != &e){
 850 mday    1.42     Dec(_rep);
 851                  Inc(this->_rep = e._rep);
 852 mday    1.40   }
 853                return *this;
 854              }
 855              
 856 mday    1.42 Boolean monitor_2_entry::operator ==(const monitor_2_entry& me) const
 857 mday    1.40 {
 858                if(this == &me)
 859                  return true;
 860                return false;
 861              }
 862              
 863 mday    1.42 Boolean monitor_2_entry::operator ==(void* k) const
 864 mday    1.40 {
 865                if((void *)this == k)
 866                  return true;
 867                return false;
 868              }
 869              
 870              
 871 mday    1.42 monitor_2_entry_type monitor_2_entry::get_type(void) const
 872 mday    1.40 {
 873 mday    1.42   return _rep->type;
 874              }
 875              
 876              void monitor_2_entry::set_type(monitor_2_entry_type t)
 877              {
 878                _rep->type = t;
 879              }
 880              
 881              
 882 mday    1.43 monitor_2_entry_state  monitor_2_entry::get_state(void) const
 883              {
 884                return (monitor_2_entry_state) _rep->state.value();
 885              }
 886              
 887              void monitor_2_entry::set_state(monitor_2_entry_state t)
 888              {
 889                _rep->state = t;
 890              }
 891              
 892 mday    1.42 void* monitor_2_entry::get_accept(void) const
 893              {
 894                return _rep->accept_parm;
 895              }
 896              
 897              void monitor_2_entry::set_accept(void* a)
 898              {
 899                _rep->accept_parm = a;
 900              }
 901              
 902              
 903              void* monitor_2_entry::get_dispatch(void) const
 904              {
 905                return _rep->dispatch_parm;
 906              }
 907              
 908              void monitor_2_entry::set_dispatch(void* a)
 909              {
 910                _rep->dispatch_parm = a;
 911              }
 912              
 913 mday    1.42 pegasus_socket monitor_2_entry::get_sock(void) const
 914              {
 915                return _rep->psock;
 916              }
 917              
 918              
 919              void monitor_2_entry::set_sock(pegasus_socket& s)
 920              {
 921                _rep->psock = s;
 922                
 923 mday    1.40 }
 924              
 925 mday    1.59 //static monitor_2* _m2_instance;
 926 mday    1.40 
 927 mday    1.49 AsyncDQueue<HTTPConnection2> monitor_2::_connections(true, 0);
 928              
 929 mday    1.40 monitor_2::monitor_2(void)
 930 mday    1.42   : _session_dispatch(0), _accept_dispatch(0), _listeners(true, 0), 
 931 mday    1.49     _ready(true, 0), _die(0), _requestCount(0)
 932 mday    1.40 {
 933                try {
 934                  
 935                  bsd_socket_factory _factory;
 936              
 937                  // set up the listener/acceptor 
 938                  pegasus_socket temp = pegasus_socket(&_factory);
 939                  
 940                  temp.socket(PF_INET, SOCK_STREAM, 0);
 941                  // initialize the address
 942                  memset(&_tickle_addr, 0, sizeof(_tickle_addr));
 943 marek   1.47 #ifdef PEGASUS_OS_ZOS
 944                  _tickle_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
 945              #else
 946 chuck   1.55 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 947              #pragma convert(37)
 948              #endif
 949 mday    1.40     _tickle_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
 950 chuck   1.55 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 951              #pragma convert(0)
 952              #endif
 953 marek   1.47 #endif
 954 mday    1.40     _tickle_addr.sin_family = PF_INET;
 955                  _tickle_addr.sin_port = 0;
 956              
 957                  PEGASUS_SOCKLEN_SIZE _addr_size = sizeof(_tickle_addr);
 958                  
 959                  temp.bind((struct sockaddr *)&_tickle_addr, sizeof(_tickle_addr));
 960                  temp.listen(3);  
 961                  temp.getsockname((struct sockaddr*)&_tickle_addr, &_addr_size);
 962              
 963                  // set up the connector
 964              
 965                  pegasus_socket tickler = pegasus_socket(&_factory);
 966                  tickler.socket(PF_INET, SOCK_STREAM, 0);
 967                  struct sockaddr_in _addr;
 968                  memset(&_addr, 0, sizeof(_addr));
 969 kumpf   1.48 #ifdef PEGASUS_OS_ZOS
 970 marek   1.47     _addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
 971              #else
 972 mday    1.40     _addr.sin_addr.s_addr = inet_addr("127.0.0.1");
 973 marek   1.47 #endif
 974 mday    1.40     _addr.sin_family = PF_INET;
 975                  _addr.sin_port = 0;
 976                  tickler.bind((struct sockaddr*)&_addr, sizeof(_addr));
 977                  tickler.connect((struct sockaddr*)&_tickle_addr, sizeof(_tickle_addr));
 978              
 979 mday    1.42     _tickler.set_sock(tickler);
 980                  _tickler.set_type(INTERNAL);
 981 mday    1.43     _tickler.set_state(BUSY);
 982                  
 983 mday    1.40     struct sockaddr_in peer;
 984                  memset(&peer, 0, sizeof(peer));
 985                  PEGASUS_SOCKLEN_SIZE peer_size = sizeof(peer);
 986              
 987                  pegasus_socket accepted = temp.accept((struct sockaddr*)&peer, &peer_size);
 988 mday    1.57     
 989 mday    1.42     monitor_2_entry* _tickle = new monitor_2_entry(accepted, INTERNAL, 0, 0);
 990 a.arora 1.71 
 991              // No need to set _tickle's state as BUSY, since monitor_2::run() now
 992              // does a select only on sockets which are in IDLE (default) state.
 993              //  _tickle->set_state(BUSY);
 994 mday    1.43     
 995 mday    1.40     _listeners.insert_first(_tickle);
 996              
 997                }
 998                catch(...){  }
 999              }
1000              
1001              monitor_2::~monitor_2(void)
1002              {
1003 mday    1.60 
1004                 stop();
1005              
1006 mday    1.41   try {
1007                  monitor_2_entry* temp = _listeners.remove_first();
1008                  while(temp){
1009                    delete temp;
1010                    temp = _listeners.remove_first();
1011                  }
1012                }
1013 mday    1.60 
1014 mday    1.41   catch(...){  }
1015 mday    1.60   
1016              
1017                try 
1018                {
1019                   HTTPConnection2* temp = _connections.remove_first();
1020                   while(temp)
1021                   {
1022              	delete temp;
1023              	temp = _connections.remove_first();
1024                   }
1025                }
1026                catch(...)
1027                {
1028                }
1029                
1030              
1031 mday    1.40 }
1032              
1033              
1034              void monitor_2::run(void)
1035              {
1036                monitor_2_entry* temp;
1037 a.arora 1.71   int _nonIdle=0, _idleCount=0, events;
1038              
1039 mday    1.40   while(_die.value() == 0) {
1040 a.arora 1.71     _nonIdle=_idleCount=0;
1041 mday    1.49      
1042 mday    1.57      struct timeval tv_idle = { 60, 0 };
1043 mday    1.56      
1044 mday    1.40     // place all sockets in the select set 
1045                  FD_ZERO(&rd_fd_set);
1046                  try {
1047                    _listeners.lock(pegasus_thread_self());
1048                    temp = _listeners.next(0);
1049 a.arora 1.71       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1050                     "monitor_2::run:Creating New FD list for SELECT.");
1051 mday    1.40       while(temp != 0 ){
1052 mday    1.57 	if(temp->get_state() == CLOSED ) {
1053 mday    1.43 	  monitor_2_entry* closed = temp;
1054 a.arora 1.72       temp = _listeners.next(closed);
1055 mday    1.43 	  _listeners.remove_no_lock(closed);
1056 a.arora 1.71       
1057                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1058                     "monitor_2::run:Deleteing CLOSED socket fd=%d.",(Sint32)closed->get_sock());
1059 mday    1.60 	  
1060 mday    1.49 	  HTTPConnection2 *cn = monitor_2::remove_connection((Sint32)(closed->get_sock()));
1061              	  delete cn;
1062 mday    1.43 	  delete closed;
1063              	}
1064 mday    1.45 	if(temp == 0)
1065              	   break;
1066 a.arora 1.71 
1067              
1068                      //Count the number if IDLE sockets
1069                      if(temp->get_state() != IDLE ) _nonIdle++;
1070                       else _idleCount++;
1071              
1072 mday    1.46 	Sint32 fd = (Sint32) temp->get_sock();
1073 a.arora 1.71 
1074                      //Select should be called ONLY on the FDs which are in IDLE state
1075              	if((fd >= 0) && (temp->get_state() == IDLE))
1076                      {
1077                        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1078                         "monitor_2::run:Adding FD %d to the list for SELECT.",fd);
1079              	  FD_SET(fd , &rd_fd_set);
1080                      }
1081              	   temp = _listeners.next(temp);
1082 mday    1.40       }
1083                    _listeners.unlock();
1084                  } 
1085                  catch(...){
1086                    return;
1087                  }
1088 a.arora 1.71 
1089 mday    1.42     // important -  the dispatch routine has pointers to all the 
1090                  // entries that are readable. These entries can be changed but 
1091                  // the pointer must not be tampered with. 
1092 mday    1.56     if(_connections.count() )
1093 a.arora 1.71        events = select(FD_SETSIZE, &rd_fd_set, NULL, NULL, NULL);
1094 mday    1.56     else
1095 a.arora 1.71        events = select(FD_SETSIZE, &rd_fd_set, NULL, NULL, &tv_idle);
1096 mday    1.57     
1097                  if(_die.value())
1098                  {
1099                     break;
1100                  }
1101 a.arora 1.71 
1102              #ifdef PEGASUS_OS_TYPE_WINDOWS
1103                  if(events == SOCKET_ERROR)
1104              #else
1105                  if(events == -1)
1106              #endif
1107                  {
1108                     Tracer::trace(TRC_HTTP, Tracer::LEVEL2,
1109                        "monitor_2:run:INVALID FD. errorno = %d on select.", errno);
1110                     // The EBADF error indicates that one or more or the file
1111                     // descriptions was not valid. This could indicate that
1112                     // the _entries structure has been corrupted or that
1113                     // we have a synchronization error.
1114              
1115                   // Keeping the line below commented for time being.
1116                   //  PEGASUS_ASSERT(errno != EBADF);
1117                  }
1118                  else if (events)
1119                  {
1120                     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1121                        "monitor_2::run select event received events = %d, monitoring %d idle entries", events, _idleCount);
1122 a.arora 1.71    
1123 mday    1.57     
1124 mday    1.40     try {
1125                    _listeners.lock(pegasus_thread_self());
1126                    temp = _listeners.next(0);
1127                    while(temp != 0 ){
1128 a.arora 1.72 	  Sint32 fd = (Sint32) temp->get_sock();
1129              	  if(fd >= 0 && FD_ISSET(fd, &rd_fd_set)) {
1130              	  if(temp->get_type() != CLIENTSESSION) temp->set_state(BUSY);
1131 mday    1.42 	  FD_CLR(fd,  &rd_fd_set);
1132              	  monitor_2_entry* ready = new monitor_2_entry(*temp);
1133 mday    1.49 	  try 
1134              	  {
1135              	     _ready.insert_first(ready);
1136              	  }
1137              	  catch(...)
1138              	  {
1139              	  }
1140              	  
1141 mday    1.42 	  _requestCount++;
1142 mday    1.40 	}
1143              	temp = _listeners.next(temp);
1144                    }
1145                    _listeners.unlock();
1146                  } 
1147                  catch(...){
1148                    return;
1149                  }
1150                  // now handle the sockets that are ready to read 
1151 mday    1.56     if(_ready.count())
1152                     _dispatch();
1153                  else
1154                  {
1155                     if(_connections.count() == 0 )
1156              	  _idle_dispatch(_idle_parm);
1157                  }
1158 a.arora 1.71    }  // if events
1159 mday    1.40   } // while alive 
1160 a.arora 1.72   _die=0;
1161 mday    1.57 
1162 mday    1.40 }
1163              
1164 dj.gorey 1.70 int  monitor_2::solicitSocketMessages(
1165                   Sint32 socket,
1166                   Uint32 events,
1167                   Uint32 queueId,
1168                   int type)
1169               {
1170               
1171 a.arora  1.71    PEG_METHOD_ENTER(TRC_HTTP, "monitor_2::solicitSocketMessages");
1172 dj.gorey 1.70 
1173                  _entry_mut.lock(pegasus_thread_self());
1174               
1175                  for(int index = 0; index < (int)_entries.size(); index++)
1176                  {
1177                     try
1178                     {
1179               	 if(_entries[index]._status.value() == monitor_2_entry::EMPTY)
1180               	 {
1181               	    _entries[index].socket = socket;
1182               	    //_entries[index].queueId  = queueId;
1183               	    //_entries[index]._type = type;
1184 a.arora  1.71 	    _entries[index]._status = IDLE;
1185 dj.gorey 1.70 	    _entry_mut.unlock();
1186               
1187               	    return index;
1188               	 }
1189                     }
1190                     catch(...)
1191                     {
1192                     }
1193               
1194                  }
1195                  _entry_mut.unlock();
1196                  PEG_METHOD_EXIT();
1197                  return -1;
1198               }
1199               
1200               
1201               void monitor_2::unsolicitSocketMessages(Sint32 socket)
1202               {
1203               
1204                   PEG_METHOD_ENTER(TRC_HTTP, "monitor_2::unsolicitSocketMessages");
1205                   _entry2_mut.lock(pegasus_thread_self());
1206 dj.gorey 1.70 
1207                   for(int index = 0; index < (int)_entries2.size(); index++)
1208                   {
1209                      if(_entries2[index].socket == socket)
1210                      {
1211               	  _entries2[index]._status = monitor_2_entry::EMPTY; 
1212               	  _entries2[index].socket = -1;
1213               	  break;
1214                      }
1215                   }
1216                   _entry2_mut.unlock();
1217                   PEG_METHOD_EXIT();
1218               }
1219               
1220 mday     1.42 void* monitor_2::set_session_dispatch(void (*dp)(monitor_2_entry*))
1221 mday     1.40 {
1222 mday     1.42   void* old = (void *)_session_dispatch;
1223 mday     1.40   _session_dispatch = dp;
1224                 return old;
1225               }
1226               
1227 mday     1.42 void* monitor_2::set_accept_dispatch(void (*dp)(monitor_2_entry*))
1228               {
1229                 void* old = (void*)_accept_dispatch;
1230                 _accept_dispatch = dp;
1231                 return old;
1232 mday     1.56 }
1233               
1234               void* monitor_2::set_idle_dispatch(void (*dp)(void*))
1235               {
1236                  void* old = (void*)_idle_dispatch;
1237                  _idle_dispatch = dp;
1238                  return old;
1239               }
1240               
1241               void* monitor_2::set_idle_parm(void* parm)
1242               {
1243                  void* old = _idle_parm;
1244                  _idle_parm = parm;
1245                  return old;
1246 mday     1.42 }
1247               
1248 mday     1.40 
1249 mday     1.60 
1250               //-----------------------------------------------------------------
1251               // Note on deleting the monitor_2_entry nodes: 
1252               //  Each case: in the switch statement needs to handle the deletion 
1253               //  of the monitor_2_entry * node differently. A SESSION dispatch 
1254               //  routine MUST DELETE the entry during its dispatch handling. 
1255               //  All other dispatch routines MUST NOT delete the entry during the 
1256               //  dispatch handling, but must allow monitor_2::_dispatch to delete
1257               //   the entry. 
1258               //
1259               //  The reason is pretty obscure and it is debatable whether or not
1260               //  to even bother, but during cimserver shutdown the single monitor_2_entry* 
1261               //  will leak unless the _session_dispatch routine takes care of deleting it. 
1262               //
1263               //  The reason is that a shutdown messages completely stops everything and 
1264               //  the _session_dispatch routine never returns. So monitor_2::_dispatch is 
1265               //  never able to do its own cleanup. 
1266               //
1267               // << Mon Oct 13 09:33:33 2003 mdd >>
1268               //-----------------------------------------------------------------
1269               
1270 mday     1.40 void monitor_2::_dispatch(void)
1271               {
1272 mday     1.49    monitor_2_entry* entry;
1273                  
1274                  try 
1275                  {
1276               
1277               	 entry = _ready.remove_first();
1278                  }
1279                  catch(...)
1280                  {
1281                  }
1282                  
1283                 while(entry != 0 ) {
1284 mday     1.42     switch(entry->get_type()) {
1285 mday     1.40     case INTERNAL:
1286                     static char buffer[2];
1287 mday     1.49       entry->get_sock().disableBlocking();
1288 mday     1.42       entry->get_sock().read(&buffer, 2);
1289 mday     1.49       entry->get_sock().enableBlocking();
1290 a.arora  1.71       entry->set_state(IDLE);   // Set state of the socket to IDLE so that 
1291                                               // monitor_2::run can add to the list of FDs
1292                                               // on which select would be called.
1293               
1294                    
1295                
1296 mday     1.60       delete entry;
1297                     
1298 mday     1.40       break;
1299                   case LISTEN:
1300                     {
1301               	static struct sockaddr peer;
1302               	static PEGASUS_SOCKLEN_SIZE peer_size = sizeof(peer);
1303 mday     1.49 	entry->get_sock().disableBlocking();
1304 mday     1.42 	pegasus_socket connected = entry->get_sock().accept(&peer, &peer_size);
1305 a.arora  1.71         entry->set_state(IDLE);  // Set state of the LISTEN socket to IDLE
1306 mday     1.65 #ifdef PEGASUS_OS_TYPE_WINDOWS
1307                   if((Sint32)connected  == SOCKET_ERROR)
1308               #else
1309               	if((Sint32)connected == -1 )
1310               #endif
1311               	{
1312               	   delete entry;
1313               	   break;
1314               	}
1315               	
1316 mday     1.49 	entry->get_sock().enableBlocking();
1317 mday     1.42 	monitor_2_entry *temp = add_entry(connected, SESSION, entry->get_accept(), entry->get_dispatch());
1318               	if(temp && _accept_dispatch != 0)
1319 mday     1.49 	   _accept_dispatch(temp);
1320 mday     1.60 	delete entry;
1321               	
1322 mday     1.40       }
1323                     break;
1324                   case SESSION:
1325 a.arora  1.72     case CLIENTSESSION:
1326 mday     1.60        if(_session_dispatch != 0 )
1327                      {
1328               	  // NOTE: _session_dispatch will delete entry - do not do it here
1329 a.arora  1.72 	     unsigned client=0;
1330                        if(entry->get_type() == CLIENTSESSION) client = 1;
1331                        Sint32 sock=(Sint32)(entry->get_sock());
1332                    
1333               	     _session_dispatch(entry);
1334               
1335                        if(client)
1336                        {
1337                          HTTPConnection2 *cn = monitor_2::remove_connection(sock);
1338               	       if(cn) delete cn;
1339                          // stop();
1340                          _die=1;
1341                        }
1342 mday     1.60        }
1343                      
1344 mday     1.40       else {
1345               	static char buffer[4096];
1346 mday     1.42 	int bytes = entry->get_sock().read(&buffer, 4096);
1347 mday     1.60 	delete entry;
1348 mday     1.40       }
1349                   
1350                     break;
1351                   case UNTYPED:
1352                   default:
1353 mday     1.60            delete entry;
1354 mday     1.40       break;
1355                   }
1356 mday     1.42     _requestCount--;
1357 mday     1.49     
1358                   if(_ready.count() == 0 )
1359                      break;
1360                   
1361                   try 
1362                   {
1363                      entry = _ready.remove_first();
1364                   }
1365                   catch(...)
1366                   {
1367                   }
1368                   
1369 mday     1.40   }
1370               }
1371               
1372               void monitor_2::stop(void)
1373               {
1374                 _die = 1;
1375                 tickle();
1376                 // shut down the listener list, free the list nodes
1377 mday     1.42   _tickler.get_sock().close();
1378 mday     1.40   _listeners.shutdown_queue();
1379               }
1380               
1381               void monitor_2::tickle(void)
1382               {
1383                 static char _buffer[] = 
1384                   {
1385                     '0','0'
1386                   };
1387                 
1388 mday     1.57   _tickler.get_sock().disableBlocking();
1389                 
1390 mday     1.42   _tickler.get_sock().write(&_buffer, 2);
1391 mday     1.57   _tickler.get_sock().enableBlocking();
1392                 
1393 mday     1.40 }
1394               
1395               
1396 mday     1.42 monitor_2_entry*  monitor_2::add_entry(pegasus_socket& ps, 
1397               				       monitor_2_entry_type type,
1398               				       void* accept_parm, 
1399               				       void* dispatch_parm)
1400 mday     1.40 {
1401 a.arora  1.71   Sint32 fd1,fd2;
1402               
1403                 fd2=(Sint32) ps;
1404               
1405 mday     1.42   monitor_2_entry* m2e = new monitor_2_entry(ps, type, accept_parm, dispatch_parm);
1406 a.arora  1.71 
1407               // The purpose of the following piece of code is to avoid duplicate entries in
1408               // the _listeners list. Would it be too much of an overhead ?
1409               try {
1410               
1411                    monitor_2_entry* temp;
1412               
1413                     _listeners.lock(pegasus_thread_self());
1414                     temp = _listeners.next(0);
1415                     while(temp != 0 )
1416                     {
1417                       fd1=(Sint32) temp->get_sock();
1418               
1419                       if(fd1 == fd2)
1420                       {
1421               
1422                          Tracer::trace(TRC_HTTP, Tracer::LEVEL3,
1423                         "monitor_2::add_entry:Request for duplicate entry in _listeners for %d FD.", fd1);
1424                           if(temp->get_state() == CLOSED)
1425                           {
1426                             temp->set_state(IDLE);
1427 a.arora  1.71               Tracer::trace(TRC_HTTP, Tracer::LEVEL3,
1428                             "monitor_2::add_entry:CLOSED state changed to IDLE for %d.", fd1);
1429                            }
1430                            _listeners.unlock();
1431                           delete m2e;
1432                           return 0;
1433                       }
1434                      temp = _listeners.next(temp);
1435                     }
1436                  } 
1437                  catch(...) 
1438                  {
1439                     delete m2e;
1440                     return 0;
1441                  }
1442               
1443               
1444                 _listeners.unlock();
1445               
1446               
1447 mday     1.40   try{
1448                   _listeners.insert_first(m2e);
1449                 }
1450                 catch(...){
1451                   delete m2e;
1452 mday     1.42     return 0;
1453 mday     1.40   }
1454 a.arora  1.71       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1455                     "monitor_2::add_entry:SUCCESSFULLY added to _listeners list. FD = %d.", fd2);
1456 mday     1.40   tickle();
1457 mday     1.42   return m2e;
1458 mday     1.40 }
1459               
1460               Boolean monitor_2::remove_entry(Sint32 s)
1461               {
1462                 monitor_2_entry* temp;
1463                 try {
1464                   _listeners.try_lock(pegasus_thread_self());
1465                   temp = _listeners.next(0);
1466                   while(temp != 0){
1467 mday     1.42       if(s == (Sint32)temp->_rep->psock ){
1468 mday     1.40 	temp = _listeners.remove_no_lock(temp);
1469               	delete temp;
1470               	_listeners.unlock();
1471               	return true;
1472                     }
1473                     temp = _listeners.next(temp);
1474                   }
1475                   _listeners.unlock();
1476                 }
1477                 catch(...){
1478                 }
1479                 return false;
1480 mday     1.7  }
1481 mday     1.37 
1482 mday     1.42 Uint32 monitor_2::getOutstandingRequestCount(void)
1483               {
1484                 return _requestCount.value();
1485                 
1486 mday     1.49 }
1487               
1488               
1489               HTTPConnection2* monitor_2::remove_connection(Sint32 sock)
1490               {
1491               
1492                  HTTPConnection2* temp;
1493                  try 
1494                  {
1495                     monitor_2::_connections.lock(pegasus_thread_self());
1496                     temp = monitor_2::_connections.next(0);
1497                     while(temp != 0 )
1498                     {
1499               	 if(sock == temp->getSocket())
1500               	 {
1501               	    temp = monitor_2::_connections.remove_no_lock(temp);
1502               	    monitor_2::_connections.unlock();
1503               	    return temp;
1504               	 }
1505               	 temp = monitor_2::_connections.next(temp);
1506                     }
1507 mday     1.49       monitor_2::_connections.unlock();
1508                  }
1509                  catch(...)
1510                  {
1511                  }
1512                  return 0;
1513               }
1514               
1515               Boolean monitor_2::insert_connection(HTTPConnection2* connection)
1516               {
1517                  try 
1518                  {
1519                     monitor_2::_connections.insert_first(connection);
1520                  }
1521                  catch(...)
1522                  {
1523                     return false;
1524                  }
1525                  return true;
1526 mday     1.42 }
1527 mday     1.7  
1528 mike     1.2  
1529               PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2