(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

   1 karl  1.64 //%2003////////////////////////////////////////////////////////////////////////
   2 mike  1.2  //
   3 karl  1.64 // Copyright (c) 2000, 2001, 2002  BMC Software, Hewlett-Packard Development
   4            // Company, L. P., IBM Corp., The Open Group, Tivoli Systems.
   5            // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L. P.;
   6            // IBM Corp.; EMC Corporation, The Open Group.
   7 mike  1.2  //
   8            // Permission is hereby granted, free of charge, to any person obtaining a copy
   9 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
  10            // deal in the Software without restriction, including without limitation the
  11            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  12 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
  13            // furnished to do so, subject to the following conditions:
  14            // 
  15 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  16 mike  1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  17            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  18 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  19            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  20            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  21 mike  1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  22            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  23            //
  24            //==============================================================================
  25            //
  26            // Author: Mike Brasher (mbrasher@bmc.com)
  27            //
  28 mday  1.49 // Modified By: Mike Day (monitor_2) mdday@us.ibm.com 
  29 a.arora 1.71 //              Amit K Arora (Bug#1153) amita@in.ibm.com
  30 alagaraja 1.75 //              Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090
  31 mike      1.2  //
  32                //%/////////////////////////////////////////////////////////////////////////////
  33                
  34                #include <Pegasus/Common/Config.h>
  35 mday      1.40 
  36 mike      1.2  #include <cstring>
  37                #include "Monitor.h"
  38                #include "MessageQueue.h"
  39                #include "Socket.h"
  40 kumpf     1.4  #include <Pegasus/Common/Tracer.h>
  41 mday      1.7  #include <Pegasus/Common/HTTPConnection.h>
  42 kumpf     1.69 #include <Pegasus/Common/MessageQueueService.h>
  43 a.arora   1.73 #include <Pegasus/Common/Exception.h>
  44 mike      1.2  
  45                #ifdef PEGASUS_OS_TYPE_WINDOWS
  46                # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
  47                #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
  48                of <winsock.h>. It may have been indirectly included (e.g., by including \
  49 mday      1.25 <windows.h>). Finthe inclusion of that header which is visible to this \
  50 mike      1.2  compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \
  51                otherwise, less than 64 clients (the default) will be able to connect to the \
  52                CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
  53 mday      1.5  
  54 mike      1.2  # endif
  55                # define FD_SETSIZE 1024
  56 mday      1.5  # include <windows.h>
  57 mike      1.2  #else
  58                # include <sys/types.h>
  59                # include <sys/socket.h>
  60                # include <sys/time.h>
  61                # include <netinet/in.h>
  62                # include <netdb.h>
  63                # include <arpa/inet.h>
  64                #endif
  65                
  66                PEGASUS_USING_STD;
  67                
  68                PEGASUS_NAMESPACE_BEGIN
  69                
  70 mday      1.18 
                    // File-local state.
                    // NOTE(review): none of these four variables is referenced by the code
                    // visible in this part of the file; confirm whether they are still used.

                    // Atomic counter, starts at zero.
  71 mday      1.25 static AtomicInt _connections = 0;
  72                
                    // Time intervals as {seconds, microseconds}:
                    //   create_time   = 1 microsecond
                    //   destroy_time  = 300 seconds
                    //   deadlock_time = zero
                    // NOTE(review): the names suggest thread-pool style timeouts - TODO confirm.
  73                static struct timeval create_time = {0, 1};
  74 mday      1.38 static struct timeval destroy_time = {300, 0};
  75 mday      1.26 static struct timeval deadlock_time = {0, 0};
  76 mday      1.18 
  77 mike      1.2  ////////////////////////////////////////////////////////////////////////////////
  78                //
  79                // MonitorRep
  80                //
  81                ////////////////////////////////////////////////////////////////////////////////
  82                
                    // Plain aggregate of six select()-style descriptor sets.
                    // NOTE(review): the member names suggest read / write / exception sets
                    // plus an "active" copy of each - TODO confirm. The constructors visible
                    // in this file set _rep to 0 and Monitor::run() works on a local fd_set,
                    // so no visible code populates these members.
  83                struct MonitorRep
  84                {
  85                    fd_set rd_fd_set;
  86                    fd_set wr_fd_set;
  87                    fd_set ex_fd_set;
  88                    fd_set active_rd_fd_set;
  89                    fd_set active_wr_fd_set;
  90                    fd_set active_ex_fd_set;
  91                };
  92                
  93                ////////////////////////////////////////////////////////////////////////////////
  94                //
  95                // Monitor
  96                //
  97                ////////////////////////////////////////////////////////////////////////////////
  98 mike      1.2  
  99 kumpf     1.54 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
 100 mike      1.2  Monitor::Monitor()
 101 a.arora   1.73    : _module_handle(0), 
 102                     _controller(0),
 103                     _async(false),
 104                     _stopConnections(0),
 105                     _stopConnectionsSem(0),
 106 a.dunfey  1.76      _solicitSocketCount(0),
 107                     _tickle_client_socket(0),
 108                     _tickle_server_socket(0),
 109                     _tickle_peer_socket(0)
 110 mike      1.2  {
 111 kumpf     1.54     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
 112 mike      1.2      Socket::initializeInterface();
 113 mday      1.25     _rep = 0;
 114 kumpf     1.54     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
 115 a.arora   1.73 
 116                    // setup the tickler
 117                    initializeTickler();
 118                    
 119                    // Start the count at 1 because initilizeTickler() 
 120                    // has added an entry in the first position of the 
 121                    // _entries array
 122                    for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
 123 mday      1.37     {
 124                       _MonitorEntry entry(0, 0, 0);
 125                       _entries.append(entry);
 126                    }
 127 mike      1.2  }
 128                
 129 mday      1.18 Monitor::Monitor(Boolean async)
 130 a.arora   1.73    : _module_handle(0),
 131                     _controller(0),
 132                     _async(async),
 133                     _stopConnections(0),
 134                     _stopConnectionsSem(0),
 135 a.dunfey  1.76      _solicitSocketCount(0),
 136                     _tickle_client_socket(0),
 137                     _tickle_server_socket(0),
 138                     _tickle_peer_socket(0)
 139 mday      1.18 {
 140 kumpf     1.54     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
 141 mday      1.18     Socket::initializeInterface();
 142 mday      1.25     _rep = 0;
 143 kumpf     1.54     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
 144 a.arora   1.73 
 145                    // setup the tickler
 146                    initializeTickler();
 147                
 148                    // Start the count at 1 because initilizeTickler() 
 149                    // has added an entry in the first position of the
 150                    // _entries array
 151                    for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
 152 mday      1.37     {
 153                       _MonitorEntry entry(0, 0, 0);
 154                       _entries.append(entry);
 155                    }
 156 mday      1.18 }
 157 mday      1.20 
 158 mike      1.2  Monitor::~Monitor()
 159                {
 160 kumpf     1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 161                                  "deregistering with module controller");
 162 kumpf     1.10 
 163 kumpf     1.11     if(_module_handle != NULL)
 164 mday      1.8      {
 165                       _controller->deregister_module(PEGASUS_MODULENAME_MONITOR);
 166                       _controller = 0;
 167 kumpf     1.10        delete _module_handle;
 168 mday      1.8      }
 169 kumpf     1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep");
 170 kumpf     1.48 
 171 kumpf     1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
 172 a.dunfey  1.76 
 173                    try{
 174                        if(_tickle_peer_socket > 0)
 175                        {
 176                            Socket::close(_tickle_peer_socket);
 177                        }
 178                        if(_tickle_client_socket > 0)
 179                        {
 180                            Socket::close(_tickle_client_socket);
 181                        }
 182                        if(_tickle_server_socket > 0)
 183                        {
 184                            Socket::close(_tickle_server_socket);
 185                        }
 186                    }
 187                    catch(...)
 188                    {
 189                        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 190                                  "Failed to close tickle sockets");
 191                    }
 192                
 193 mike      1.2      Socket::uninitializeInterface();
 194 kumpf     1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 195                                  "returning from monitor destructor");
 196 mday      1.18 }
 197                
 198 a.arora   1.73 void Monitor::initializeTickler(){
 199                    /* 
 200                       NOTE: On any errors trying to 
 201                             setup out tickle connection, 
 202                             throw an exception/end the server
 203                    */
 204                
 205                    /* setup the tickle server/listener */
 206                
 207                    // get a socket for the server side
 208                    if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){
 209                	//handle error
 210                	MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
 211                				 "Received error number $0 while creating the internal socket.",
 212                #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 213                				 errno);
 214                #else
 215                				 WSAGetLastError());
 216                #endif
 217                	throw Exception(parms);
 218                    }
 219 a.arora   1.73 
 220                    // initialize the address
 221                    memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
 222                #ifdef PEGASUS_OS_ZOS
 223                    _tickle_server_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
 224                #else
 225                #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 226                #pragma convert(37)
 227                #endif
 228                    _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
 229                #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 230                #pragma convert(0)
 231                #endif
 232                #endif
 233                    _tickle_server_addr.sin_family = PF_INET;
 234                    _tickle_server_addr.sin_port = 0;
 235                
 236                    PEGASUS_SOCKLEN_SIZE _addr_size = sizeof(_tickle_server_addr);
 237                
 238                    // bind server side to socket
 239                    if((::bind(_tickle_server_socket,
 240 a.arora   1.73 	       (struct sockaddr *)&_tickle_server_addr, 
 241                	       sizeof(_tickle_server_addr))) < 0){
 242                	// handle error
 243                	MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
 244                				 "Received error number $0 while binding the internal socket.",
 245                #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 246                				 errno);
 247                #else
 248                				 WSAGetLastError());
 249                #endif
 250                        throw Exception(parms);
 251                    }
 252                
 253                    // tell the kernel we are a server
 254                    if((::listen(_tickle_server_socket,3)) < 0){
 255                	// handle error
 256                	MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",
 257                			 "Received error number $0 while listening to the internal socket.",
 258                #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 259                				 errno);
 260                #else
 261 a.arora   1.73 				 WSAGetLastError());
 262                #endif
 263                	throw Exception(parms);
 264                    }
 265                    
 266                    // make sure we have the correct socket for our server
 267                    int sock = ::getsockname(_tickle_server_socket,
 268                			     (struct sockaddr*)&_tickle_server_addr,
 269                			     &_addr_size); 
 270                    if(sock < 0){
 271                	// handle error
 272                	MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",
 273                			 "Received error number $0 while getting the internal socket name.",
 274                #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 275                				 errno);
 276                #else
 277                				 WSAGetLastError());
 278                #endif
 279                	throw Exception(parms);
 280                    }
 281                
 282 a.arora   1.73     /* set up the tickle client/connector */
 283                     
 284                    // get a socket for our tickle client
 285                    if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){
 286                	// handle error
 287                	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
 288                			 "Received error number $0 while creating the internal client socket.",
 289                #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 290                				 errno);
 291                #else
 292                				 WSAGetLastError());
 293                #endif
 294                	throw Exception(parms);
 295                    }
 296                
 297                    // setup the address of the client
 298                    memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
 299                #ifdef PEGASUS_OS_ZOS
 300                    _tickle_client_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
 301                #else
 302                #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 303 a.arora   1.73 #pragma convert(37)
 304                #endif
 305                    _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
 306                #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 307                #pragma convert(0)
 308                #endif
 309                #endif
 310                    _tickle_client_addr.sin_family = PF_INET;
 311                    _tickle_client_addr.sin_port = 0;
 312                
 313                    // bind socket to client side
 314                    if((::bind(_tickle_client_socket,
 315                	       (struct sockaddr*)&_tickle_client_addr,
 316                	       sizeof(_tickle_client_addr))) < 0){
 317                	// handle error
 318                	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
 319                			 "Received error number $0 while binding the internal client socket.",
 320                #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 321                				 errno);
 322                #else
 323                				 WSAGetLastError());
 324 a.arora   1.73 #endif
 325                	throw Exception(parms);
 326                    }
 327                
 328                    // connect to server side
 329                    if((::connect(_tickle_client_socket,
 330                		  (struct sockaddr*)&_tickle_server_addr,
 331                		  sizeof(_tickle_server_addr))) < 0){
 332                	// handle error
 333                	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
 334                			 "Received error number $0 while connecting the internal client socket.",
 335                #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 336                				 errno);
 337                #else
 338                				 WSAGetLastError());
 339                #endif
 340                	throw Exception(parms);
 341                    }
 342                
 343                    /* set up the slave connection */
 344                    memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
 345 a.arora   1.73     PEGASUS_SOCKLEN_SIZE peer_size = sizeof(_tickle_peer_addr);
 346                    pegasus_sleep(1); 
 347                
 348                    // this call may fail, we will try a max of 20 times to establish this peer connection
 349                    if((_tickle_peer_socket = ::accept(_tickle_server_socket,
 350                				       (struct sockaddr*)&_tickle_peer_addr,
 351                				       &peer_size)) < 0){
 352                #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 353                        // Only retry on non-windows platforms.
 354                        if(_tickle_peer_socket == -1 && errno == EAGAIN)
 355                        {
 356                          int retries = 0;                                                                        
 357                          do
 358                          {
 359                            pegasus_sleep(1);
 360                            _tickle_peer_socket = ::accept(_tickle_server_socket,
 361                					   (struct sockaddr*)&_tickle_peer_addr,
 362                					   &peer_size);
 363                            retries++;
 364                          } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
 365                        }
 366 a.arora   1.73 #endif
 367                    }
 368                    if(_tickle_peer_socket == -1){
 369                	// handle error
 370                	MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",
 371                			 "Received error number $0 while accepting the internal socket connection.",
 372                #if !defined(PEGASUS_OS_TYPE_WINDOWS)
 373                				 errno);
 374                #else
 375                				 WSAGetLastError());
 376                #endif
 377                	throw Exception(parms);
 378                    }
 379                    // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
 380                    // checks entries with IDLE state for events
 381                    _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
 382                    entry._status = _MonitorEntry::IDLE;
 383                    _entries.append(entry);
 384                }
 385                
 386                void Monitor::tickle(void)
 387 a.arora   1.73 {
 388                  static char _buffer[] =
 389                    {
 390                      '0','0'
 391                    };
 392                             
 393                  Socket::disableBlocking(_tickle_client_socket);    
 394                  Socket::write(_tickle_client_socket,&_buffer, 2);
 395                  Socket::enableBlocking(_tickle_client_socket); 
 396                }
 397                
 398 mike      1.2  Boolean Monitor::run(Uint32 milliseconds)
 399                {
 400 mday      1.18 
 401 mday      1.25     Boolean handled_events = false;
 402 a.arora   1.73     int i = 0;
 403                     
 404 kumpf     1.36     struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
 405 a.arora   1.73 
 406 mday      1.25     fd_set fdread;
 407                    FD_ZERO(&fdread);
 408 a.arora   1.73 
 409 mday      1.37     _entry_mut.lock(pegasus_thread_self());
 410 mday      1.13     
 411 kumpf     1.48     // Check the stopConnections flag.  If set, clear the Acceptor monitor entries  
 412                    if (_stopConnections == 1) 
 413                    {
 414                        for ( int indx = 0; indx < (int)_entries.size(); indx++)
 415                        {
 416                            if (_entries[indx]._type == Monitor::ACCEPTOR)
 417                            {
 418                                if ( _entries[indx]._status.value() != _MonitorEntry::EMPTY)
 419                                {
 420                                   if ( _entries[indx]._status.value() == _MonitorEntry::IDLE ||
 421                                        _entries[indx]._status.value() == _MonitorEntry::DYING )
 422                                   {
 423                                       // remove the entry
 424                		       _entries[indx]._status = _MonitorEntry::EMPTY;
 425                                   }
 426                                   else
 427                                   {
 428                                       // set status to DYING
 429 kumpf     1.52                       _entries[indx]._status = _MonitorEntry::DYING;
 430 kumpf     1.48                    }
 431                               }
 432                           }
 433                        }
 434                        _stopConnections = 0;
 435 a.arora   1.73 	_stopConnectionsSem.signal();
 436 kumpf     1.48     }
 437 kumpf     1.51 
 438 kumpf     1.68     for( int indx = 0; indx < (int)_entries.size(); indx++)
 439                    {
 440                       if ((_entries[indx]._status.value() == _MonitorEntry::DYING) &&
 441                                (_entries[indx]._type == Monitor::CONNECTION))
 442                       {
 443                          MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);
 444                          PEGASUS_ASSERT(q != 0);
 445                          MessageQueue & o = static_cast<HTTPConnection *>(q)->get_owner();
 446                          Message* message= new CloseConnectionMessage(_entries[indx].socket);
 447                          message->dest = o.getQueueId();
 448                
 449                          // HTTPAcceptor is responsible for closing the connection. 
 450                          // The lock is released to allow HTTPAcceptor to call
 451                          // unsolicitSocketMessages to free the entry. 
 452                          // Once HTTPAcceptor completes processing of the close
 453                          // connection, the lock is re-requested and processing of
 454                          // the for loop continues.  This is safe with the current
 455                          // implementation of the _entries object.  Note that the
 456                          // loop condition accesses the _entries.size() on each
 457                          // iteration, so that a change in size while the mutex is
 458                          // unlocked will not result in an ArrayIndexOutOfBounds
 459 kumpf     1.68           // exception.
 460                
 461                          _entry_mut.unlock();
 462                          o.enqueue(message);
 463                          _entry_mut.lock(pegasus_thread_self());
 464                       }
 465                    }
 466                
 467 kumpf     1.51     Uint32 _idleEntries = 0;
 468 a.arora   1.73    
 469                    /*
 470                	We will keep track of the maximum socket number and pass this value
 471                	to the kernel as a parameter to SELECT.  This loop seems like a good 
 472                	place to calculate the max file descriptor (maximum socket number)
 473                	because we have to traverse the entire array.
 474                    */ 
 475                    int maxSocketCurrentPass = 0;
 476 mday      1.25     for( int indx = 0; indx < (int)_entries.size(); indx++)
 477 mike      1.2      {
 478 a.arora   1.73        if(maxSocketCurrentPass < _entries[indx].socket)
 479                	  maxSocketCurrentPass = _entries[indx].socket;
 480                
 481 mday      1.37        if(_entries[indx]._status.value() == _MonitorEntry::IDLE)
 482 mday      1.25        {
 483 kumpf     1.51 	  _idleEntries++;
 484 mday      1.25 	  FD_SET(_entries[indx].socket, &fdread);
 485                       }
 486 mday      1.13     }
 487 s.hills   1.62 
 488 a.arora   1.73     /*
 489                	Add 1 then assign maxSocket accordingly. We add 1 to account for
 490                	descriptors starting at 0.
 491                    */
 492                    maxSocketCurrentPass++;
 493                
 494 kumpf     1.51     _entry_mut.unlock(); 
 495 a.arora   1.73     int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
 496 kumpf     1.51    _entry_mut.lock(pegasus_thread_self());
 497 mday      1.25 
 498 mike      1.2  #ifdef PEGASUS_OS_TYPE_WINDOWS
 499 kumpf     1.50     if(events == SOCKET_ERROR)
 500 mike      1.2  #else
 501 kumpf     1.50     if(events == -1)
 502 mike      1.2  #endif
 503 mday      1.13     {
 504 kumpf     1.50        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 505                          "Monitor::run - errorno = %d has occurred on select.", errno);
 506                       // The EBADF error indicates that one or more or the file
 507                       // descriptions was not valid. This could indicate that
 508                       // the _entries structure has been corrupted or that
 509                       // we have a synchronization error.
 510                
 511                       PEGASUS_ASSERT(errno != EBADF);
 512                    }
 513                    else if (events)
 514                    {
 515 kumpf     1.51        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 516                          "Monitor::run select event received events = %d, monitoring %d idle entries", 
 517                	   events, _idleEntries);
 518 mday      1.25        for( int indx = 0; indx < (int)_entries.size(); indx++)
 519                       {
 520 kumpf     1.53           // The Monitor should only look at entries in the table that are IDLE (i.e.,
 521                          // owned by the Monitor).
 522                	  if((_entries[indx]._status.value() == _MonitorEntry::IDLE) && 
 523                	     (FD_ISSET(_entries[indx].socket, &fdread)))
 524 mday      1.25 	  {
 525                	     MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);
 526 kumpf     1.53              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 527                                  "Monitor::run indx = %d, queueId =  %d, q = %p",
 528                                  indx, _entries[indx].queueId, q);
 529                             PEGASUS_ASSERT(q !=0);
 530 mday      1.37 
 531                	     try 
 532 mday      1.25 	     {
 533 mday      1.37 		if(_entries[indx]._type == Monitor::CONNECTION)
 534                		{
 535 kumpf     1.51                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 536                                     "_entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
 537 mday      1.37 		   static_cast<HTTPConnection *>(q)->_entry_index = indx;
 538                		   _entries[indx]._status = _MonitorEntry::BUSY;
 539 kumpf     1.66                    // If allocate_and_awaken failure, retry on next iteration
 540 a.arora   1.73 /* Removed for PEP 183.
 541 kumpf     1.69                    if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(
 542                                           (void *)q, _dispatch))
 543 kumpf     1.67                    {
 544                                      Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 545                                          "Monitor::run: Insufficient resources to process request.");
 546                                      _entries[indx]._status = _MonitorEntry::IDLE;
 547                                      _entry_mut.unlock();
 548                                      return true;
 549                                   }
 550 a.arora   1.73 */
 551                // Added for PEP 183
 552                		   HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
 553                  			 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 554                                         "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
 555                                   dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
 556                                   try
 557                                   {
 558                                       dst->run(1);
 559                                   }
 560                   		   catch (...)
 561                   		   {
 562                      			Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 563                          		"Monitor::_dispatch: exception received");
 564                   		   }
 565                   		   Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 566                                   "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
 567                
 568                   		   PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);                                                         
 569                		   // Once the HTTPConnection thread has set the status value to either
 570                		   // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
 571 a.arora   1.73 		   // to the Monitor.  It is no longer permissible to access the connection
 572                		   // or the entry in the _entries table.
 573                		   if (dst->_connectionClosePending)
 574                		   {  
 575                		     dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
 576                		   }
 577                		   else
 578                		   {
 579                		     dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
 580                		   }	
 581                // end Added for PEP 183
 582                		}
 583                	        else if( _entries[indx]._type == Monitor::INTERNAL){
 584                			// set ourself to BUSY, 
 585                                        // read the data  
 586                                        // and set ourself back to IDLE
 587                		
 588                		   	_entries[indx]._status == _MonitorEntry::BUSY;
 589                			static char buffer[2];
 590                      			Socket::disableBlocking(_entries[indx].socket);
 591                      			Sint32 amt = Socket::read(_entries[indx].socket,&buffer, 2);
 592 a.arora   1.73       			Socket::enableBlocking(_entries[indx].socket);
 593                			_entries[indx]._status == _MonitorEntry::IDLE;
 594 mday      1.37 		}
 595                		else
 596 mday      1.25 		{
 597 kumpf     1.51                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 598                                     "Non-connection entry, indx = %d, has been received.", indx);
 599 mday      1.37 		   int events = 0;
 600                		   events |= SocketMessage::READ;
 601                		   Message *msg = new SocketMessage(_entries[indx].socket, events);
 602                		   _entries[indx]._status = _MonitorEntry::BUSY;
 603                		   _entry_mut.unlock();
 604 mday      1.27 
 605 mday      1.37 		   q->enqueue(msg);
 606                		   _entries[indx]._status = _MonitorEntry::IDLE;
 607 mday      1.25 		   return true;
 608                		}
 609                	     }
 610 mday      1.37 	     catch(...)
 611 mday      1.25 	     {
 612                	     }
 613                	     handled_events = true;
 614                	  }
 615                       }
 616 mday      1.24     }
 617 mday      1.37     _entry_mut.unlock();
 618 mday      1.13     return(handled_events);
 619 mike      1.2  }
 620                
 621 chuck     1.74 void Monitor::stopListeningForConnections(Boolean wait)
 622 kumpf     1.48 {
 623                    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
 624 a.arora   1.73     // set boolean then tickle the server to recognize _stopConnections 
 625 kumpf     1.48     _stopConnections = 1;
 626 a.arora   1.73     tickle();
 627 kumpf     1.48 
 628 chuck     1.74     if (wait)
 629 a.arora   1.73     {
 630 chuck     1.74       // Wait for the monitor to notice _stopConnections.  Otherwise the
 631                      // caller of this function may unbind the ports while the monitor
 632                      // is still accepting connections on them.
 633                      try
 634                	{
 635                	  _stopConnectionsSem.time_wait(10000);
 636                	}
 637                      catch (TimeOut &)
 638                	{
 639                	  // The monitor is probably busy processng a very long request, and is
 640                	  // not accepting connections.  Let the caller unbind the ports.
 641                	}
 642 a.arora   1.73     }
 643                    
 644 kumpf     1.48     PEG_METHOD_EXIT();
 645                }
 646 mday      1.25 
 647 mday      1.37 
 648 mday      1.25 int  Monitor::solicitSocketMessages(
 649 mike      1.2      Sint32 socket, 
 650                    Uint32 events,
 651 mday      1.8      Uint32 queueId, 
 652                    int type)
 653 mike      1.2  {
 654 a.arora   1.73    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");                                        
 655 alagaraja 1.75    AutoMutex autoMut(_entry_mut);
 656 a.arora   1.73    // Check to see if we need to dynamically grow the _entries array
 657                   // We always want the _entries array to 2 bigger than the
 658                   // current connections requested
 659                   _solicitSocketCount++;  // bump the count
 660                   int size = (int)_entries.size();
 661                   if(_solicitSocketCount >= (size-1)){
 662                        for(int i = 0; i < (_solicitSocketCount - (size-1)); i++){
 663                                _MonitorEntry entry(0, 0, 0);
 664                                _entries.append(entry);
 665                        }
 666                   }
 667 kumpf     1.4  
 668 a.arora   1.73    int index;
 669                   for(index = 1; index < (int)_entries.size(); index++)
 670 mday      1.25    {
 671 a.arora   1.73       try
 672 mday      1.37       {
 673 a.arora   1.73          if(_entries[index]._status.value() == _MonitorEntry::EMPTY)
 674                         {
 675                            _entries[index].socket = socket;
 676                            _entries[index].queueId  = queueId;
 677                            _entries[index]._type = type;
 678                            _entries[index]._status = _MonitorEntry::IDLE;
 679 alagaraja 1.75                                        
 680 a.arora   1.73             return index;
 681                         }
 682 mday      1.37       }
 683                      catch(...)
 684 mday      1.25       {
 685                      }
 686                   }
 687 a.arora   1.73    _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful
 688 mday      1.25    PEG_METHOD_EXIT();
 689 kumpf     1.50    return -1;
 690 a.arora   1.73 
 691 mike      1.2  }
 692                
 693 mday      1.25 void Monitor::unsolicitSocketMessages(Sint32 socket)
 694 mike      1.2  {
 695 kumpf     1.50 
 696 mday      1.25     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
 697 alagaraja 1.75     AutoMutex autoMut(_entry_mut);
 698 a.arora   1.73 
 699                    /*
 700                        Start at index = 1 because _entries[0] is the tickle entry which never needs
 701                        to be EMPTY;
 702                    */
 703                    int index;
 704                    for(index = 1; index < _entries.size(); index++)
 705 mike      1.2      {
 706 mday      1.25        if(_entries[index].socket == socket)
 707                       {
 708 a.arora   1.73           _entries[index]._status = _MonitorEntry::EMPTY;
 709                          _entries[index].socket = -1;
 710                          _solicitSocketCount--;
 711                          break;
 712 mday      1.25        }
 713 mike      1.2      }
 714 a.arora   1.73 
 715                    /*
 716                	Dynamic Contraction:
 717                	To remove excess entries we will start from the end of the _entries array
 718                	and remove all entries with EMPTY status until we find the first NON EMPTY.
 719                	This prevents the positions, of the NON EMPTY entries, from being changed.
 720                    */ 
 721                    index = _entries.size() - 1;
 722                    while(_entries[index]._status == _MonitorEntry::EMPTY){
 723                	if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
 724                                _entries.remove(index);
 725                	index--;
 726                    }
 727                
 728 kumpf     1.4      PEG_METHOD_EXIT();
 729 mike      1.2  }
 730                
 731 a.arora   1.73 // Note: this is no longer called with PEP 183.
 732 mday      1.7  PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
 733                {
 734 mday      1.8     HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
 735 kumpf     1.51    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 736 kumpf     1.53         "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
 737                        dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
 738 kumpf     1.51    try
 739                   {
 740                      dst->run(1);
 741                   }
 742                   catch (...)
 743                   {
 744                      Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 745                          "Monitor::_dispatch: exception received");
 746                   }
 747                   Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
 748                          "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
 749                   
 750 kumpf     1.53    PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);
 751 kumpf     1.68 
 752                   // Once the HTTPConnection thread has set the status value to either
 753                   // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
 754                   // to the Monitor.  It is no longer permissible to access the connection
 755                   // or the entry in the _entries table.
 756 kumpf     1.50    if (dst->_connectionClosePending)
 757                   {
 758 kumpf     1.68       dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
 759                   }
 760                   else
 761                   {
 762                      dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
 763 kumpf     1.50    }
 764 mday      1.8     return 0;
 765 mday      1.40 }
 766                
 767                
 768                
 769                ////************************* monitor 2 *****************************////
 770 mday      1.43 ////************************* monitor 2 *****************************////
 771                ////************************* monitor 2 *****************************////
 772                ////************************* monitor 2 *****************************////
 773                ////************************* monitor 2 *****************************////
 774                ////************************* monitor 2 *****************************////
 775                ////************************* monitor 2 *****************************////
 776 mday      1.40 
 777                
 778 mday      1.56 
 779                
 780                
 781 mday      1.42 m2e_rep::m2e_rep(void)
 782 mday      1.43   :Base(), state(IDLE)
 783                
 784 mday      1.42 {
 785                }
 786                
 787                m2e_rep::m2e_rep(monitor_2_entry_type _type, 
 788                		 pegasus_socket _sock, 
 789                		 void* _accept, 
 790                		 void* _dispatch)
 791 mday      1.43   : Base(), type(_type), state(IDLE), psock(_sock), 
 792 mday      1.42     accept_parm(_accept), dispatch_parm(_dispatch)
 793                {
 794                  
 795                }
 796                
 797                m2e_rep::~m2e_rep(void)
 798                {
 799                }
 800                
 801                m2e_rep::m2e_rep(const m2e_rep& r)
 802                  : Base()
 803                {
 804                  if(this != &r){
 805                    type = r.type;
 806                    psock = r.psock;
 807                    accept_parm = r.accept_parm;
 808                    dispatch_parm = r.dispatch_parm;
 809 mday      1.43     state = IDLE;
 810                    
 811 mday      1.42   }
 812                }
 813                
 814                
 815                m2e_rep& m2e_rep::operator =(const m2e_rep& r)
 816                {
 817                  if(this != &r) {
 818                    type = r.type;
 819                    psock = r.psock;
 820                    accept_parm = r.accept_parm;
 821                    dispatch_parm = r.dispatch_parm;
 822 mday      1.43     state = IDLE;
 823 mday      1.42   }
 824                  return *this;
 825                }
 826                
 827                Boolean m2e_rep::operator ==(const m2e_rep& r)
 828                {
 829                  if(this == &r)
 830                    return true;
 831                  return false;
 832                }
 833                
 834                Boolean m2e_rep::operator ==(void* r)
 835                {
 836                  if((void*)this == r)
 837                    return true;
 838                  return false;
 839                }
 840                
 841                m2e_rep::operator pegasus_socket() const 
 842                {
 843                  return psock;
 844 mday      1.42 }
 845                
 846                
 847 mday      1.40 monitor_2_entry::monitor_2_entry(void)
 848                {
 849 mday      1.42   _rep = new m2e_rep();
 850 mday      1.40 }
 851                
 852 mday      1.42 monitor_2_entry::monitor_2_entry(pegasus_socket& _psock, 
 853                				 monitor_2_entry_type _type, 
 854                				 void* _accept_parm, void* _dispatch_parm)
 855 mday      1.40 {
 856 mday      1.42   _rep = new m2e_rep(_type, _psock, _accept_parm, _dispatch_parm);
 857 mday      1.40 }
 858                
 859                monitor_2_entry::monitor_2_entry(const monitor_2_entry& e)
 860                {
 861                  if(this != &e){
 862 mday      1.42     Inc(this->_rep = e._rep);
 863 mday      1.40   }
 864                }
 865                
 866                monitor_2_entry::~monitor_2_entry(void)
 867                {
 868 a.dunfey  1.76 
 869 mday      1.42   Dec(_rep);
 870 mday      1.40 }
 871                
 872                monitor_2_entry& monitor_2_entry::operator=(const monitor_2_entry& e)
 873                {
 874                  if(this != &e){
 875 mday      1.42     Dec(_rep);
 876                    Inc(this->_rep = e._rep);
 877 mday      1.40   }
 878                  return *this;
 879                }
 880                
 881 mday      1.42 Boolean monitor_2_entry::operator ==(const monitor_2_entry& me) const
 882 mday      1.40 {
 883                  if(this == &me)
 884                    return true;
 885                  return false;
 886                }
 887                
 888 mday      1.42 Boolean monitor_2_entry::operator ==(void* k) const
 889 mday      1.40 {
 890                  if((void *)this == k)
 891                    return true;
 892                  return false;
 893                }
 894                
 895                
 896 mday      1.42 monitor_2_entry_type monitor_2_entry::get_type(void) const
 897 mday      1.40 {
 898 mday      1.42   return _rep->type;
 899                }
 900                
 901                void monitor_2_entry::set_type(monitor_2_entry_type t)
 902                {
 903                  _rep->type = t;
 904                }
 905                
 906                
 907 mday      1.43 monitor_2_entry_state  monitor_2_entry::get_state(void) const
 908                {
 909                  return (monitor_2_entry_state) _rep->state.value();
 910                }
 911                
 912                void monitor_2_entry::set_state(monitor_2_entry_state t)
 913                {
 914                  _rep->state = t;
 915                }
 916                
 917 mday      1.42 void* monitor_2_entry::get_accept(void) const
 918                {
 919                  return _rep->accept_parm;
 920                }
 921                
 922                void monitor_2_entry::set_accept(void* a)
 923                {
 924                  _rep->accept_parm = a;
 925                }
 926                
 927                
 928                void* monitor_2_entry::get_dispatch(void) const
 929                {
 930                  return _rep->dispatch_parm;
 931                }
 932                
 933                void monitor_2_entry::set_dispatch(void* a)
 934                {
 935                  _rep->dispatch_parm = a;
 936                }
 937                
 938 mday      1.42 pegasus_socket monitor_2_entry::get_sock(void) const
 939                {
 940                  return _rep->psock;
 941                }
 942                
 943                
 944                void monitor_2_entry::set_sock(pegasus_socket& s)
 945                {
 946                  _rep->psock = s;
 947                  
 948 mday      1.40 }
 949                
 950 mday      1.59 //static monitor_2* _m2_instance;
 951 mday      1.40 
 // Definition of the static queue of HTTPConnection2 objects. The monitor_2
 // destructor drains it and deletes every element; run() consults its count().
 // NOTE(review): the meaning of the (true, 0) constructor arguments is not
 // visible here - see AsyncDQueue.
 952 mday      1.49 AsyncDQueue<HTTPConnection2> monitor_2::_connections(true, 0);
 953                
 954 mday      1.40 monitor_2::monitor_2(void)
 955 mday      1.42   : _session_dispatch(0), _accept_dispatch(0), _listeners(true, 0), 
 956 mday      1.49     _ready(true, 0), _die(0), _requestCount(0)
 957 mday      1.40 {
 958                  try {
 959                    
 960                    bsd_socket_factory _factory;
 961                
 962                    // set up the listener/acceptor 
 963                    pegasus_socket temp = pegasus_socket(&_factory);
 964                    
 965                    temp.socket(PF_INET, SOCK_STREAM, 0);
 966                    // initialize the address
 967                    memset(&_tickle_addr, 0, sizeof(_tickle_addr));
 968 marek     1.47 #ifdef PEGASUS_OS_ZOS
 969                    _tickle_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
 970                #else
 971 chuck     1.55 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 972                #pragma convert(37)
 973                #endif
 974 mday      1.40     _tickle_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
 975 chuck     1.55 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 976                #pragma convert(0)
 977                #endif
 978 marek     1.47 #endif
 979 mday      1.40     _tickle_addr.sin_family = PF_INET;
 980                    _tickle_addr.sin_port = 0;
 981                
 982                    PEGASUS_SOCKLEN_SIZE _addr_size = sizeof(_tickle_addr);
 983                    
 984                    temp.bind((struct sockaddr *)&_tickle_addr, sizeof(_tickle_addr));
 985                    temp.listen(3);  
 986                    temp.getsockname((struct sockaddr*)&_tickle_addr, &_addr_size);
 987                
 988                    // set up the connector
 989                
 990                    pegasus_socket tickler = pegasus_socket(&_factory);
 991                    tickler.socket(PF_INET, SOCK_STREAM, 0);
 992                    struct sockaddr_in _addr;
 993                    memset(&_addr, 0, sizeof(_addr));
 994 kumpf     1.48 #ifdef PEGASUS_OS_ZOS
 995 marek     1.47     _addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
 996                #else
 997 mday      1.40     _addr.sin_addr.s_addr = inet_addr("127.0.0.1");
 998 marek     1.47 #endif
 999 mday      1.40     _addr.sin_family = PF_INET;
1000                    _addr.sin_port = 0;
1001                    tickler.bind((struct sockaddr*)&_addr, sizeof(_addr));
1002                    tickler.connect((struct sockaddr*)&_tickle_addr, sizeof(_tickle_addr));
1003                
1004 mday      1.42     _tickler.set_sock(tickler);
1005                    _tickler.set_type(INTERNAL);
1006 mday      1.43     _tickler.set_state(BUSY);
1007                    
1008 mday      1.40     struct sockaddr_in peer;
1009                    memset(&peer, 0, sizeof(peer));
1010                    PEGASUS_SOCKLEN_SIZE peer_size = sizeof(peer);
1011                
1012                    pegasus_socket accepted = temp.accept((struct sockaddr*)&peer, &peer_size);
1013 mday      1.57     
1014 mday      1.42     monitor_2_entry* _tickle = new monitor_2_entry(accepted, INTERNAL, 0, 0);
1015 a.arora   1.71 
1016                // No need to set _tickle's state as BUSY, since monitor_2::run() now
1017                // does a select only on sockets which are in IDLE (default) state.
1018                //  _tickle->set_state(BUSY);
1019 mday      1.43     
1020 mday      1.40     _listeners.insert_first(_tickle);
1021                
1022                  }
1023                  catch(...){  }
1024                }
1025                
1026                monitor_2::~monitor_2(void)
1027                {
1028 mday      1.60 
1029                   stop();
1030                
1031 mday      1.41   try {
1032                    monitor_2_entry* temp = _listeners.remove_first();
1033                    while(temp){
1034                      delete temp;
1035                      temp = _listeners.remove_first();
1036                    }
1037                  }
1038 mday      1.60 
1039 mday      1.41   catch(...){  }
1040 mday      1.60   
1041                
1042                  try 
1043                  {
1044                     HTTPConnection2* temp = _connections.remove_first();
1045                     while(temp)
1046                     {
1047                	delete temp;
1048                	temp = _connections.remove_first();
1049                     }
1050                  }
1051                  catch(...)
1052                  {
1053                  }
1054                  
1055                
1056 mday      1.40 }
1057                
1058                
1059                void monitor_2::run(void)
1060                {
1061                  monitor_2_entry* temp;
1062 a.arora   1.71   int _nonIdle=0, _idleCount=0, events;
1063                
1064 mday      1.40   while(_die.value() == 0) {
1065 a.arora   1.71     _nonIdle=_idleCount=0;
1066 mday      1.49      
1067 mday      1.57      struct timeval tv_idle = { 60, 0 };
1068 mday      1.56      
1069 mday      1.40     // place all sockets in the select set 
1070                    FD_ZERO(&rd_fd_set);
1071                    try {
1072                      _listeners.lock(pegasus_thread_self());
1073                      temp = _listeners.next(0);
1074 a.arora   1.71       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1075                       "monitor_2::run:Creating New FD list for SELECT.");
1076 mday      1.40       while(temp != 0 ){
1077 mday      1.57 	if(temp->get_state() == CLOSED ) {
1078 mday      1.43 	  monitor_2_entry* closed = temp;
1079 a.arora   1.72       temp = _listeners.next(closed);
1080 mday      1.43 	  _listeners.remove_no_lock(closed);
1081 a.arora   1.71       
1082                      Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1083                       "monitor_2::run:Deleteing CLOSED socket fd=%d.",(Sint32)closed->get_sock());
1084 mday      1.60 	  
1085 mday      1.49 	  HTTPConnection2 *cn = monitor_2::remove_connection((Sint32)(closed->get_sock()));
1086                	  delete cn;
1087 mday      1.43 	  delete closed;
1088                	}
1089 mday      1.45 	if(temp == 0)
1090                	   break;
1091 a.arora   1.71 
1092                
1093                        //Count the number if IDLE sockets
1094                        if(temp->get_state() != IDLE ) _nonIdle++;
1095                         else _idleCount++;
1096                
1097 mday      1.46 	Sint32 fd = (Sint32) temp->get_sock();
1098 a.arora   1.71 
1099                        //Select should be called ONLY on the FDs which are in IDLE state
1100                	if((fd >= 0) && (temp->get_state() == IDLE))
1101                        {
1102                          Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1103                           "monitor_2::run:Adding FD %d to the list for SELECT.",fd);
1104                	  FD_SET(fd , &rd_fd_set);
1105                        }
1106                	   temp = _listeners.next(temp);
1107 mday      1.40       }
1108                      _listeners.unlock();
1109                    } 
1110                    catch(...){
1111                      return;
1112                    }
1113 a.arora   1.71 
1114 mday      1.42     // important -  the dispatch routine has pointers to all the 
1115                    // entries that are readable. These entries can be changed but 
1116                    // the pointer must not be tampered with. 
1117 mday      1.56     if(_connections.count() )
1118 a.arora   1.71        events = select(FD_SETSIZE, &rd_fd_set, NULL, NULL, NULL);
1119 mday      1.56     else
1120 a.arora   1.71        events = select(FD_SETSIZE, &rd_fd_set, NULL, NULL, &tv_idle);
1121 mday      1.57     
1122                    if(_die.value())
1123                    {
1124                       break;
1125                    }
1126 a.arora   1.71 
1127                #ifdef PEGASUS_OS_TYPE_WINDOWS
1128                    if(events == SOCKET_ERROR)
1129                #else
1130                    if(events == -1)
1131                #endif
1132                    {
1133                       Tracer::trace(TRC_HTTP, Tracer::LEVEL2,
1134                          "monitor_2:run:INVALID FD. errorno = %d on select.", errno);
1135                       // The EBADF error indicates that one or more or the file
1136                       // descriptions was not valid. This could indicate that
1137                       // the _entries structure has been corrupted or that
1138                       // we have a synchronization error.
1139                
1140                     // Keeping the line below commented for time being.
1141                     //  PEGASUS_ASSERT(errno != EBADF);
1142                    }
1143                    else if (events)
1144                    {
1145                       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1146                          "monitor_2::run select event received events = %d, monitoring %d idle entries", events, _idleCount);
1147 a.arora   1.71    
1148 mday      1.57     
1149 mday      1.40     try {
1150                      _listeners.lock(pegasus_thread_self());
1151                      temp = _listeners.next(0);
1152                      while(temp != 0 ){
1153 a.arora   1.72 	  Sint32 fd = (Sint32) temp->get_sock();
1154                	  if(fd >= 0 && FD_ISSET(fd, &rd_fd_set)) {
1155                	  if(temp->get_type() != CLIENTSESSION) temp->set_state(BUSY);
1156 mday      1.42 	  FD_CLR(fd,  &rd_fd_set);
1157                	  monitor_2_entry* ready = new monitor_2_entry(*temp);
1158 mday      1.49 	  try 
1159                	  {
1160                	     _ready.insert_first(ready);
1161                	  }
1162                	  catch(...)
1163                	  {
1164                	  }
1165                	  
1166 mday      1.42 	  _requestCount++;
1167 mday      1.40 	}
1168                	temp = _listeners.next(temp);
1169                      }
1170                      _listeners.unlock();
1171                    } 
1172                    catch(...){
1173                      return;
1174                    }
1175                    // now handle the sockets that are ready to read 
1176 mday      1.56     if(_ready.count())
1177                       _dispatch();
1178                    else
1179                    {
1180                       if(_connections.count() == 0 )
1181                	  _idle_dispatch(_idle_parm);
1182                    }
1183 a.arora   1.71    }  // if events
1184 mday      1.40   } // while alive 
1185 a.arora   1.72   _die=0;
1186 mday      1.57 
1187 mday      1.40 }
1188                
1189 dj.gorey  1.70 int  monitor_2::solicitSocketMessages(
1190                    Sint32 socket,
1191                    Uint32 events,
1192                    Uint32 queueId,
1193                    int type)
1194                {
1195                
1196 a.arora   1.71    PEG_METHOD_ENTER(TRC_HTTP, "monitor_2::solicitSocketMessages");
1197 dj.gorey  1.70 
1198 alagaraja 1.75    AutoMutex autoMut(_entry_mut);
1199 dj.gorey  1.70 
1200                   for(int index = 0; index < (int)_entries.size(); index++)
1201                   {
1202                      try
1203                      {
1204                	 if(_entries[index]._status.value() == monitor_2_entry::EMPTY)
1205                	 {
1206                	    _entries[index].socket = socket;
1207                	    //_entries[index].queueId  = queueId;
1208                	    //_entries[index]._type = type;
1209 a.arora   1.71 	    _entries[index]._status = IDLE;
1210 dj.gorey  1.70 
1211                	    return index;
1212                	 }
1213                      }
1214                      catch(...)
1215                      {
1216                      }
1217                
1218                   }
1219                   PEG_METHOD_EXIT();
1220                   return -1;
1221                }
1222                
1223                
1224                void monitor_2::unsolicitSocketMessages(Sint32 socket)
1225                {
1226                
1227                    PEG_METHOD_ENTER(TRC_HTTP, "monitor_2::unsolicitSocketMessages");
1228 alagaraja 1.75     AutoMutex autoMut(_entry2_mut);
1229 dj.gorey  1.70 
1230                    for(int index = 0; index < (int)_entries2.size(); index++)
1231                    {
1232                       if(_entries2[index].socket == socket)
1233                       {
1234                	  _entries2[index]._status = monitor_2_entry::EMPTY; 
1235                	  _entries2[index].socket = -1;
1236                	  break;
1237                       }
1238                    }
1239                    PEG_METHOD_EXIT();
1240                }
1241                
1242 mday      1.42 void* monitor_2::set_session_dispatch(void (*dp)(monitor_2_entry*))
1243 mday      1.40 {
1244 mday      1.42   void* old = (void *)_session_dispatch;
1245 mday      1.40   _session_dispatch = dp;
1246                  return old;
1247                }
1248                
1249 mday      1.42 void* monitor_2::set_accept_dispatch(void (*dp)(monitor_2_entry*))
1250                {
1251                  void* old = (void*)_accept_dispatch;
1252                  _accept_dispatch = dp;
1253                  return old;
1254 mday      1.56 }
1255                
1256                void* monitor_2::set_idle_dispatch(void (*dp)(void*))
1257                {
1258                   void* old = (void*)_idle_dispatch;
1259                   _idle_dispatch = dp;
1260                   return old;
1261                }
1262                
1263                void* monitor_2::set_idle_parm(void* parm)
1264                {
1265                   void* old = _idle_parm;
1266                   _idle_parm = parm;
1267                   return old;
1268 mday      1.42 }
1269                
1270 mday      1.40 
1271 mday      1.60 
1272                //-----------------------------------------------------------------
1273                // Note on deleting the monitor_2_entry nodes: 
1274                //  Each case: in the switch statement needs to handle the deletion 
1275                //  of the monitor_2_entry * node differently. A SESSION dispatch 
1276                //  routine MUST DELETE the entry during its dispatch handling. 
1277                //  All other dispatch routines MUST NOT delete the entry during the 
1278                //  dispatch handling, but must allow monitor_2::_dispatch to delete
1279                //   the entry. 
1280                //
1281                //  The reason is pretty obscure and it is debatable whether or not
1282                //  to even bother, but during cimserver shutdown the single monitor_2_entry* 
1283                //  will leak unless the _session_dispatch routine takes care of deleting it. 
1284                //
1285                //  The reason is that a shutdown message completely stops everything and 
1286                //  the _session_dispatch routine never returns. So monitor_2::_dispatch is 
1287                //  never able to do its own cleanup. 
1288                //
1289                // << Mon Oct 13 09:33:33 2003 mdd >>
1290                //-----------------------------------------------------------------
1291                
1292 mday      1.40 void monitor_2::_dispatch(void)
1293                {
1294 mday      1.49    monitor_2_entry* entry;
1295                   
1296                   try 
1297                   {
1298                
1299                	 entry = _ready.remove_first();
1300                   }
1301                   catch(...)
1302                   {
1303                   }
1304                   
1305                  while(entry != 0 ) {
1306 mday      1.42     switch(entry->get_type()) {
1307 mday      1.40     case INTERNAL:
1308                      static char buffer[2];
1309 mday      1.49       entry->get_sock().disableBlocking();
1310 mday      1.42       entry->get_sock().read(&buffer, 2);
1311 mday      1.49       entry->get_sock().enableBlocking();
1312 a.arora   1.71       entry->set_state(IDLE);   // Set state of the socket to IDLE so that 
1313                                                // monitor_2::run can add to the list of FDs
1314                                                // on which select would be called.
1315                
1316                     
1317                 
1318 mday      1.60       delete entry;
1319                      
1320 mday      1.40       break;
1321                    case LISTEN:
1322                      {
1323                	static struct sockaddr peer;
1324                	static PEGASUS_SOCKLEN_SIZE peer_size = sizeof(peer);
1325 mday      1.49 	entry->get_sock().disableBlocking();
1326 mday      1.42 	pegasus_socket connected = entry->get_sock().accept(&peer, &peer_size);
1327 a.arora   1.71         entry->set_state(IDLE);  // Set state of the LISTEN socket to IDLE
1328 mday      1.65 #ifdef PEGASUS_OS_TYPE_WINDOWS
1329                    if((Sint32)connected  == SOCKET_ERROR)
1330                #else
1331                	if((Sint32)connected == -1 )
1332                #endif
1333                	{
1334                	   delete entry;
1335                	   break;
1336                	}
1337                	
1338 mday      1.49 	entry->get_sock().enableBlocking();
1339 mday      1.42 	monitor_2_entry *temp = add_entry(connected, SESSION, entry->get_accept(), entry->get_dispatch());
1340                	if(temp && _accept_dispatch != 0)
1341 mday      1.49 	   _accept_dispatch(temp);
1342 mday      1.60 	delete entry;
1343                	
1344 mday      1.40       }
1345                      break;
1346                    case SESSION:
1347 a.arora   1.72     case CLIENTSESSION:
1348 mday      1.60        if(_session_dispatch != 0 )
1349                       {
1350                	  // NOTE: _session_dispatch will delete entry - do not do it here
1351 a.arora   1.72 	     unsigned client=0;
1352                         if(entry->get_type() == CLIENTSESSION) client = 1;
1353                         Sint32 sock=(Sint32)(entry->get_sock());
1354                     
1355                	     _session_dispatch(entry);
1356                
1357                         if(client)
1358                         {
1359                           HTTPConnection2 *cn = monitor_2::remove_connection(sock);
1360                	       if(cn) delete cn;
1361                           // stop();
1362                           _die=1;
1363                         }
1364 mday      1.60        }
1365                       
1366 mday      1.40       else {
1367                	static char buffer[4096];
1368 mday      1.42 	int bytes = entry->get_sock().read(&buffer, 4096);
1369 mday      1.60 	delete entry;
1370 mday      1.40       }
1371                    
1372                      break;
1373                    case UNTYPED:
1374                    default:
1375 mday      1.60            delete entry;
1376 mday      1.40       break;
1377                    }
1378 mday      1.42     _requestCount--;
1379 mday      1.49     
1380                    if(_ready.count() == 0 )
1381                       break;
1382                    
1383                    try 
1384                    {
1385                       entry = _ready.remove_first();
1386                    }
1387                    catch(...)
1388                    {
1389                    }
1390                    
1391 mday      1.40   }
1392                }
1393                
1394                void monitor_2::stop(void)
1395                {
1396                  _die = 1;
1397                  tickle();
1398                  // shut down the listener list, free the list nodes
1399 mday      1.42   _tickler.get_sock().close();
1400 mday      1.40   _listeners.shutdown_queue();
1401                }
1402                
1403                void monitor_2::tickle(void)
1404                {
1405                  static char _buffer[] = 
1406                    {
1407                      '0','0'
1408                    };
1409                  
1410 mday      1.57   _tickler.get_sock().disableBlocking();
1411                  
1412 mday      1.42   _tickler.get_sock().write(&_buffer, 2);
1413 mday      1.57   _tickler.get_sock().enableBlocking();
1414                  
1415 mday      1.40 }
1416                
1417                
1418 mday      1.42 monitor_2_entry*  monitor_2::add_entry(pegasus_socket& ps, 
1419                				       monitor_2_entry_type type,
1420                				       void* accept_parm, 
1421                				       void* dispatch_parm)
1422 mday      1.40 {
1423 a.arora   1.71   Sint32 fd1,fd2;
1424                
1425                  fd2=(Sint32) ps;
1426                
1427 mday      1.42   monitor_2_entry* m2e = new monitor_2_entry(ps, type, accept_parm, dispatch_parm);
1428 a.arora   1.71 
1429                // The purpose of the following piece of code is to avoid duplicate entries in
1430                // the _listeners list. Would it be too much of an overhead ?
1431                try {
1432                
1433                     monitor_2_entry* temp;
1434                
1435                      _listeners.lock(pegasus_thread_self());
1436                      temp = _listeners.next(0);
1437                      while(temp != 0 )
1438                      {
1439                        fd1=(Sint32) temp->get_sock();
1440                
1441                        if(fd1 == fd2)
1442                        {
1443                
1444                           Tracer::trace(TRC_HTTP, Tracer::LEVEL3,
1445                          "monitor_2::add_entry:Request for duplicate entry in _listeners for %d FD.", fd1);
1446                            if(temp->get_state() == CLOSED)
1447                            {
1448                              temp->set_state(IDLE);
1449 a.arora   1.71               Tracer::trace(TRC_HTTP, Tracer::LEVEL3,
1450                              "monitor_2::add_entry:CLOSED state changed to IDLE for %d.", fd1);
1451                             }
1452                             _listeners.unlock();
1453                            delete m2e;
1454                            return 0;
1455                        }
1456                       temp = _listeners.next(temp);
1457                      }
1458                   } 
1459                   catch(...) 
1460                   {
1461                      delete m2e;
1462                      return 0;
1463                   }
1464                
1465                
1466                  _listeners.unlock();
1467                
1468                
1469 mday      1.40   try{
1470                    _listeners.insert_first(m2e);
1471                  }
1472                  catch(...){
1473                    delete m2e;
1474 mday      1.42     return 0;
1475 mday      1.40   }
1476 a.arora   1.71       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1477                      "monitor_2::add_entry:SUCCESSFULLY added to _listeners list. FD = %d.", fd2);
1478 mday      1.40   tickle();
1479 mday      1.42   return m2e;
1480 mday      1.40 }
1481                
1482                Boolean monitor_2::remove_entry(Sint32 s)
1483                {
1484                  monitor_2_entry* temp;
1485                  try {
1486                    _listeners.try_lock(pegasus_thread_self());
1487                    temp = _listeners.next(0);
1488                    while(temp != 0){
1489 mday      1.42       if(s == (Sint32)temp->_rep->psock ){
1490 mday      1.40 	temp = _listeners.remove_no_lock(temp);
1491                	delete temp;
1492                	_listeners.unlock();
1493                	return true;
1494                      }
1495                      temp = _listeners.next(temp);
1496                    }
1497                    _listeners.unlock();
1498                  }
1499                  catch(...){
1500                  }
1501                  return false;
1502 mday      1.7  }
1503 mday      1.37 
1504 mday      1.42 Uint32 monitor_2::getOutstandingRequestCount(void)
1505                {
1506                  return _requestCount.value();
1507                  
1508 mday      1.49 }
1509                
1510                
1511                HTTPConnection2* monitor_2::remove_connection(Sint32 sock)
1512                {
1513                
1514                   HTTPConnection2* temp;
1515                   try 
1516                   {
1517                      monitor_2::_connections.lock(pegasus_thread_self());
1518                      temp = monitor_2::_connections.next(0);
1519                      while(temp != 0 )
1520                      {
1521                	 if(sock == temp->getSocket())
1522                	 {
1523                	    temp = monitor_2::_connections.remove_no_lock(temp);
1524                	    monitor_2::_connections.unlock();
1525                	    return temp;
1526                	 }
1527                	 temp = monitor_2::_connections.next(temp);
1528                      }
1529 mday      1.49       monitor_2::_connections.unlock();
1530                   }
1531                   catch(...)
1532                   {
1533                   }
1534                   return 0;
1535                }
1536                
1537                Boolean monitor_2::insert_connection(HTTPConnection2* connection)
1538                {
1539                   try 
1540                   {
1541                      monitor_2::_connections.insert_first(connection);
1542                   }
1543                   catch(...)
1544                   {
1545                      return false;
1546                   }
1547                   return true;
1548 mday      1.42 }
1549 mday      1.7  
1550 mike      1.2  
1551                PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2