(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.90 //%2005////////////////////////////////////////////////////////////////////////
  2 mike  1.2  //
  3 karl  1.79 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4            // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5            // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6 karl  1.64 // IBM Corp.; EMC Corporation, The Open Group.
  7 karl  1.79 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8            // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9 karl  1.90 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10            // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11 mike  1.2  //
 12            // Permission is hereby granted, free of charge, to any person obtaining a copy
 13 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
 14            // deal in the Software without restriction, including without limitation the
 15            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 16 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
 17            // furnished to do so, subject to the following conditions:
 18 david.dillard 1.95 //
 19 kumpf         1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 20 mike          1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 21                    // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 22 kumpf         1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 23                    // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 24                    // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 25 mike          1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 26                    // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 27                    //
 28                    //==============================================================================
 29                    //
 30                    // Author: Mike Brasher (mbrasher@bmc.com)
 31                    //
 32 r.kieninger   1.83 // Modified By: Mike Day (monitor_2) mdday@us.ibm.com
 33 a.arora       1.71 //              Amit K Arora (Bug#1153) amita@in.ibm.com
 34 alagaraja     1.75 //              Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090
 35 sushma.fernandes 1.78 //              Sushma Fernandes (sushma@hp.com) for Bug#2057
 36 joyce.j          1.84 //              Josephine Eskaline Joyce (jojustin@in.ibm.com) for PEP#101
 37 kumpf            1.85 //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
 38 mike             1.2  //
 39                       //%/////////////////////////////////////////////////////////////////////////////
 40                       
 41                       #include <Pegasus/Common/Config.h>
 42 mday             1.40 
 43 mike             1.2  #include <cstring>
 44                       #include "Monitor.h"
 45                       #include "MessageQueue.h"
 46                       #include "Socket.h"
 47 kumpf            1.4  #include <Pegasus/Common/Tracer.h>
 48 mday             1.7  #include <Pegasus/Common/HTTPConnection.h>
 49 kumpf            1.69 #include <Pegasus/Common/MessageQueueService.h>
 50 a.arora          1.73 #include <Pegasus/Common/Exception.h>
 51 mike             1.100 #include "ArrayIterator.h"
 52 mike             1.2   
 53                        #ifdef PEGASUS_OS_TYPE_WINDOWS
 54                        # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
 55                        #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
 56                        of <winsock.h>. It may have been indirectly included (e.g., by including \
 57 david.dillard    1.95  <windows.h>). Find inclusion of that header which is visible to this \
  58 mike             1.2   compilation unit and #define FD_SETSIZE to 1024 prior to that inclusion; \
 59                        otherwise, less than 64 clients (the default) will be able to connect to the \
 60                        CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
 61 mday             1.5   
 62 mike             1.2   # endif
 63                        # define FD_SETSIZE 1024
 64 mday             1.5   # include <windows.h>
 65 mike             1.2   #else
 66                        # include <sys/types.h>
 67                        # include <sys/socket.h>
 68                        # include <sys/time.h>
 69                        # include <netinet/in.h>
 70                        # include <netdb.h>
 71                        # include <arpa/inet.h>
 72                        #endif
 73                        
 74                        PEGASUS_USING_STD;
 75                        
 76                        PEGASUS_NAMESPACE_BEGIN
 77                        
 78 mike             1.96  static AtomicInt _connections(0);
 79 mday             1.25  
 80 mike             1.2   ////////////////////////////////////////////////////////////////////////////////
 81                        //
 82                        // Monitor
 83                        //
 84                        ////////////////////////////////////////////////////////////////////////////////
 85                        
 86 kumpf            1.54  #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
 87 mike             1.2   Monitor::Monitor()
 88 kumpf            1.87     : _stopConnections(0),
 89 a.arora          1.73       _stopConnectionsSem(0),
 90 a.dunfey         1.76       _solicitSocketCount(0),
 91 a.dunfey         1.77       _tickle_client_socket(-1),
 92                             _tickle_server_socket(-1),
 93                             _tickle_peer_socket(-1)
 94 mike             1.2   {
 95 kumpf            1.54      int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
 96 mike             1.2       Socket::initializeInterface();
 97 kumpf            1.54      _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
 98 a.arora          1.73  
 99                            // setup the tickler
100                            initializeTickler();
101                        
102 r.kieninger      1.83      // Start the count at 1 because initilizeTickler()
103 a.arora          1.73      // has added an entry in the first position of the
104                            // _entries array
105                            for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
106 mday             1.37      {
107                               _MonitorEntry entry(0, 0, 0);
108                               _entries.append(entry);
109                            }
110 mday             1.18  }
111 mday             1.20  
112 mike             1.2   Monitor::~Monitor()
113                        {
114 kumpf            1.11      Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
115 a.dunfey         1.76  
116                            try{
117 a.dunfey         1.77          if(_tickle_peer_socket >= 0)
118 a.dunfey         1.76          {
119                                    Socket::close(_tickle_peer_socket);
120                                }
121 a.dunfey         1.77          if(_tickle_client_socket >= 0)
122 a.dunfey         1.76          {
123                                    Socket::close(_tickle_client_socket);
124                                }
125 a.dunfey         1.77          if(_tickle_server_socket >= 0)
126 a.dunfey         1.76          {
127                                    Socket::close(_tickle_server_socket);
128                                }
129                            }
130                            catch(...)
131                            {
132                                Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
133                                          "Failed to close tickle sockets");
134                            }
135                        
136 mike             1.2       Socket::uninitializeInterface();
137 kumpf            1.11      Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
138                                          "returning from monitor destructor");
139 mday             1.18  }
140                        
141 a.arora          1.73  void Monitor::initializeTickler(){
142 r.kieninger      1.83      /*
143                               NOTE: On any errors trying to
144                                     setup out tickle connection,
145 a.arora          1.73               throw an exception/end the server
146                            */
147                        
148                            /* setup the tickle server/listener */
149                        
150                            // get a socket for the server side
151 david.dillard    1.95      if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
152 a.arora          1.73  	//handle error
153                        	MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
154                        				 "Received error number $0 while creating the internal socket.",
155                        #if !defined(PEGASUS_OS_TYPE_WINDOWS)
156                        				 errno);
157                        #else
158                        				 WSAGetLastError());
159                        #endif
160                        	throw Exception(parms);
161                            }
162                        
163                            // initialize the address
164                            memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
165                        #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
166                        #pragma convert(37)
167                        #endif
168                            _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
169                        #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
170                        #pragma convert(0)
171                        #endif
172                            _tickle_server_addr.sin_family = PF_INET;
173 a.arora          1.73      _tickle_server_addr.sin_port = 0;
174                        
175 kumpf            1.86      PEGASUS_SOCKLEN_T _addr_size = sizeof(_tickle_server_addr);
176 a.arora          1.73  
177                            // bind server side to socket
178                            if((::bind(_tickle_server_socket,
179 kumpf            1.88                 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
180 a.arora          1.73  	       sizeof(_tickle_server_addr))) < 0){
181                        	// handle error
182 r.kieninger      1.83  #ifdef PEGASUS_OS_ZOS
183                            MessageLoaderParms parms("Common.Monitor.TICKLE_BIND_LONG",
184                        				 "Received error:$0 while binding the internal socket.",strerror(errno));
185                        #else
186 a.arora          1.73  	MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
187                        				 "Received error number $0 while binding the internal socket.",
188                        #if !defined(PEGASUS_OS_TYPE_WINDOWS)
189                        				 errno);
190                        #else
191                        				 WSAGetLastError());
192                        #endif
193 r.kieninger      1.83  #endif
194 a.arora          1.73          throw Exception(parms);
195                            }
196                        
197                            // tell the kernel we are a server
198                            if((::listen(_tickle_server_socket,3)) < 0){
199                        	// handle error
200                        	MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",
201                        			 "Received error number $0 while listening to the internal socket.",
202                        #if !defined(PEGASUS_OS_TYPE_WINDOWS)
203                        				 errno);
204                        #else
205                        				 WSAGetLastError());
206                        #endif
207                        	throw Exception(parms);
208                            }
209 r.kieninger      1.83  
210 a.arora          1.73      // make sure we have the correct socket for our server
211                            int sock = ::getsockname(_tickle_server_socket,
212 kumpf            1.88                     reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
213                                           &_addr_size);
214 a.arora          1.73      if(sock < 0){
215                        	// handle error
216                        	MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",
217                        			 "Received error number $0 while getting the internal socket name.",
218                        #if !defined(PEGASUS_OS_TYPE_WINDOWS)
219                        				 errno);
220                        #else
221                        				 WSAGetLastError());
222                        #endif
223                        	throw Exception(parms);
224                            }
225                        
226                            /* set up the tickle client/connector */
227 r.kieninger      1.83  
228 a.arora          1.73      // get a socket for our tickle client
229 david.dillard    1.95      if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
230 a.arora          1.73  	// handle error
231                        	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
232                        			 "Received error number $0 while creating the internal client socket.",
233                        #if !defined(PEGASUS_OS_TYPE_WINDOWS)
234                        				 errno);
235                        #else
236                        				 WSAGetLastError());
237                        #endif
238                        	throw Exception(parms);
239                            }
240                        
241                            // setup the address of the client
242                            memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
243                        #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
244                        #pragma convert(37)
245                        #endif
246                            _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
247                        #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
248                        #pragma convert(0)
249                        #endif
250                            _tickle_client_addr.sin_family = PF_INET;
251 a.arora          1.73      _tickle_client_addr.sin_port = 0;
252                        
253                            // bind socket to client side
254                            if((::bind(_tickle_client_socket,
255 kumpf            1.88                 reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
256 a.arora          1.73  	       sizeof(_tickle_client_addr))) < 0){
257                        	// handle error
258                        	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
259                        			 "Received error number $0 while binding the internal client socket.",
260                        #if !defined(PEGASUS_OS_TYPE_WINDOWS)
261                        				 errno);
262                        #else
263                        				 WSAGetLastError());
264                        #endif
265                        	throw Exception(parms);
266                            }
267                        
268                            // connect to server side
269                            if((::connect(_tickle_client_socket,
270 kumpf            1.88                    reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
271 a.arora          1.73  		  sizeof(_tickle_server_addr))) < 0){
272                        	// handle error
273                        	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
274                        			 "Received error number $0 while connecting the internal client socket.",
275                        #if !defined(PEGASUS_OS_TYPE_WINDOWS)
276                        				 errno);
277                        #else
278                        				 WSAGetLastError());
279                        #endif
280                        	throw Exception(parms);
281                            }
282                        
283                            /* set up the slave connection */
284                            memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
285 kumpf            1.86      PEGASUS_SOCKLEN_T peer_size = sizeof(_tickle_peer_addr);
286 r.kieninger      1.83      pegasus_sleep(1);
287 a.arora          1.73  
288                            // this call may fail, we will try a max of 20 times to establish this peer connection
289                            if((_tickle_peer_socket = ::accept(_tickle_server_socket,
290 kumpf            1.88              reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
291                                    &peer_size)) < 0){
292 a.arora          1.73  #if !defined(PEGASUS_OS_TYPE_WINDOWS)
293                                // Only retry on non-windows platforms.
294                                if(_tickle_peer_socket == -1 && errno == EAGAIN)
295                                {
296 r.kieninger      1.83            int retries = 0;
297 a.arora          1.73            do
298                                  {
299                                    pegasus_sleep(1);
300                                    _tickle_peer_socket = ::accept(_tickle_server_socket,
301 kumpf            1.88                  reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
302                                        &peer_size);
303 a.arora          1.73              retries++;
304                                  } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
305                                }
306                        #endif
307                            }
308                            if(_tickle_peer_socket == -1){
309                        	// handle error
310                        	MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",
311                        			 "Received error number $0 while accepting the internal socket connection.",
312                        #if !defined(PEGASUS_OS_TYPE_WINDOWS)
313                        				 errno);
314                        #else
315                        				 WSAGetLastError());
316                        #endif
317                        	throw Exception(parms);
318                            }
319                            // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
320                            // checks entries with IDLE state for events
321                            _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
322                            entry._status = _MonitorEntry::IDLE;
323                            _entries.append(entry);
324 a.arora          1.73  }
325                        
326                        void Monitor::tickle(void)
327                        {
328 sushma.fernandes 1.78      static char _buffer[] =
329 a.arora          1.73      {
330                              '0','0'
331                            };
332 r.kieninger      1.83  
333 sushma.fernandes 1.78      AutoMutex autoMutex(_tickle_mutex);
334 r.kieninger      1.83      Socket::disableBlocking(_tickle_client_socket);
335 sushma.fernandes 1.78      Socket::write(_tickle_client_socket,&_buffer, 2);
336 r.kieninger      1.83      Socket::enableBlocking(_tickle_client_socket);
337 sushma.fernandes 1.78  }
338                        
339                        void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
340                        {
341                            // Set the state to requested state
342                            _entries[index]._status = status;
343 a.arora          1.73  }
344                        
345 mike             1.2   Boolean Monitor::run(Uint32 milliseconds)
346                        {
347 mday             1.18  
348 mday             1.25      Boolean handled_events = false;
349 a.arora          1.73      int i = 0;
350 r.kieninger      1.83  
351 kumpf            1.36      struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
352 a.arora          1.73  
353 mday             1.25      fd_set fdread;
354                            FD_ZERO(&fdread);
355 a.arora          1.73  
356 kumpf            1.94      AutoMutex autoEntryMutex(_entry_mut);
357 r.kieninger      1.83  
358 mike             1.100     ArrayIterator<_MonitorEntry> entries(_entries);
359                        
360 r.kieninger      1.83      // Check the stopConnections flag.  If set, clear the Acceptor monitor entries
361 mike             1.96      if (_stopConnections.get() == 1)
362 kumpf            1.48      {
363 mike             1.100         for ( int indx = 0; indx < (int)entries.size(); indx++)
364 kumpf            1.48          {
365 mike             1.100             if (entries[indx]._type == Monitor::ACCEPTOR)
366 kumpf            1.48              {
367 mike             1.100                 if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)
368 kumpf            1.48                  {
369 mike             1.100                    if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||
370                                                entries[indx]._status.get() == _MonitorEntry::DYING )
371 kumpf            1.48                     {
372                                               // remove the entry
373 mike             1.100 		       entries[indx]._status = _MonitorEntry::EMPTY;
374 kumpf            1.48                     }
375                                           else
376                                           {
377                                               // set status to DYING
378 mike             1.100                       entries[indx]._status = _MonitorEntry::DYING;
379 kumpf            1.48                     }
380                                       }
381                                   }
382                                }
383                                _stopConnections = 0;
384 a.arora          1.73  	_stopConnectionsSem.signal();
385 kumpf            1.48      }
386 kumpf            1.51  
387 mike             1.100     for( int indx = 0; indx < (int)entries.size(); indx++)
388 kumpf            1.68      {
389 mike             1.100 			 const _MonitorEntry &entry = entries[indx];
390 mike             1.96         if ((entry._status.get() == _MonitorEntry::DYING) &&
391 brian.campbell   1.80  					 (entry._type == Monitor::CONNECTION))
392 kumpf            1.68         {
393 brian.campbell   1.80            MessageQueue *q = MessageQueue::lookup(entry.queueId);
394 kumpf            1.68            PEGASUS_ASSERT(q != 0);
395 brian.campbell   1.80            HTTPConnection &h = *static_cast<HTTPConnection *>(q);
396 r.kieninger      1.83  
397 brian.campbell   1.80  					if (h._connectionClosePending == false)
398                        						continue;
399                        
400                        					// NOTE: do not attempt to delete while there are pending responses
401 r.kieninger      1.83  					// coming thru. The last response to come thru after a
402 brian.campbell   1.80  					// _connectionClosePending will reset _responsePending to false
403                        					// and then cause the monitor to rerun this code and clean up.
404                        					// (see HTTPConnection.cpp)
405                        
406                        					if (h._responsePending == true)
407                        					{
408                        						Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "
409                        													"Ignoring connection delete request because "
410                        													"responses are still pending. "
411 r.kieninger      1.83  													"connection=0x%p, socket=%d\n",
412 brian.campbell   1.81  													(void *)&h, h.getSocket());
413 brian.campbell   1.80  						continue;
414                        					}
415                        					h._connectionClosePending = false;
416                                  MessageQueue &o = h.get_owner();
417                                  Message* message= new CloseConnectionMessage(entry.socket);
418 kumpf            1.68            message->dest = o.getQueueId();
419                        
420 r.kieninger      1.83            // HTTPAcceptor is responsible for closing the connection.
421 kumpf            1.68            // The lock is released to allow HTTPAcceptor to call
422 r.kieninger      1.83            // unsolicitSocketMessages to free the entry.
423 kumpf            1.68            // Once HTTPAcceptor completes processing of the close
424                                  // connection, the lock is re-requested and processing of
425                                  // the for loop continues.  This is safe with the current
426 mike             1.100           // implementation of the entries object.  Note that the
427                                  // loop condition accesses the entries.size() on each
428 kumpf            1.68            // iteration, so that a change in size while the mutex is
429                                  // unlocked will not result in an ArrayIndexOutOfBounds
430                                  // exception.
431                        
432 kumpf            1.94            autoEntryMutex.unlock();
433 kumpf            1.68            o.enqueue(message);
434 kumpf            1.94            autoEntryMutex.lock();
435 r.kieninger      1.102           // After enqueue a message and the autoEntryMutex has been released and locked again,
436                                  // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries.
437                                  entries.reset(_entries);
438 kumpf            1.68         }
439                            }
440                        
441 kumpf            1.51      Uint32 _idleEntries = 0;
442 r.kieninger      1.83  
443 a.arora          1.73      /*
444 david.dillard    1.95          We will keep track of the maximum socket number and pass this value
445                                to the kernel as a parameter to SELECT.  This loop seems like a good
446                                place to calculate the max file descriptor (maximum socket number)
447                                because we have to traverse the entire array.
448 r.kieninger      1.83      */
449 david.dillard    1.95      PEGASUS_SOCKET maxSocketCurrentPass = 0;
450 mike             1.100     for( int indx = 0; indx < (int)entries.size(); indx++)
451 mike             1.2       {
452 mike             1.100        if(maxSocketCurrentPass < entries[indx].socket)
453                                    maxSocketCurrentPass = entries[indx].socket;
454 a.arora          1.73  
455 mike             1.100        if(entries[indx]._status.get() == _MonitorEntry::IDLE)
456 mday             1.25         {
457 david.dillard    1.95             _idleEntries++;
458 mike             1.100            FD_SET(entries[indx].socket, &fdread);
459 mday             1.25         }
460 mday             1.13      }
461 s.hills          1.62  
462 a.arora          1.73      /*
463 david.dillard    1.95          Add 1 then assign maxSocket accordingly. We add 1 to account for
464                                descriptors starting at 0.
465 a.arora          1.73      */
466                            maxSocketCurrentPass++;
467                        
468 kumpf            1.94      autoEntryMutex.unlock();
469 david.dillard    1.95  
470                            //
471                            // The first argument to select() is ignored on Windows and it is not
472                            // a socket value.  The original code assumed that the number of sockets
473                            // and a socket value have the same type.  On Windows they do not.
474                            //
475                        #ifdef PEGASUS_OS_TYPE_WINDOWS
476                            int events = select(0, &fdread, NULL, NULL, &tv);
477                        #else
478 a.arora          1.73      int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
479 david.dillard    1.95  #endif
480 kumpf            1.94      autoEntryMutex.lock();
481 r.kieninger      1.102     // After enqueue a message and the autoEntryMutex has been released and locked again,
482                            // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries
483                            entries.reset(_entries);
484 mike             1.2   #ifdef PEGASUS_OS_TYPE_WINDOWS
485 kumpf            1.50      if(events == SOCKET_ERROR)
486 mike             1.2   #else
487 kumpf            1.50      if(events == -1)
488 mike             1.2   #endif
489 mday             1.13      {
490 kumpf            1.50         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
491                                  "Monitor::run - errorno = %d has occurred on select.", errno);
492                               // The EBADF error indicates that one or more or the file
493                               // descriptions was not valid. This could indicate that
494 mike             1.100        // the entries structure has been corrupted or that
495 kumpf            1.50         // we have a synchronization error.
496                        
497                               PEGASUS_ASSERT(errno != EBADF);
498                            }
499                            else if (events)
500                            {
501 kumpf            1.51         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
502 r.kieninger      1.83            "Monitor::run select event received events = %d, monitoring %d idle entries",
503 david.dillard    1.95             events, _idleEntries);
504 mike             1.100        for( int indx = 0; indx < (int)entries.size(); indx++)
505 mday             1.25         {
506 kumpf            1.53            // The Monitor should only look at entries in the table that are IDLE (i.e.,
507                                  // owned by the Monitor).
508 mike             1.100           if((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
509                                     (FD_ISSET(entries[indx].socket, &fdread)))
510 david.dillard    1.95            {
511 mike             1.100              MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
512 kumpf            1.53               Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
513                                          "Monitor::run indx = %d, queueId =  %d, q = %p",
514 mike             1.100                   indx, entries[indx].queueId, q);
515 kumpf            1.53               PEGASUS_ASSERT(q !=0);
516 mday             1.37  
517 david.dillard    1.95               try
518                                     {
519 mike             1.100                 if(entries[indx]._type == Monitor::CONNECTION)
520 david.dillard    1.95                  {
521 kumpf            1.51                     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
522 mike             1.100                      "entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
523 david.dillard    1.95                     static_cast<HTTPConnection *>(q)->_entry_index = indx;
524 sushma.fernandes 1.78  
525                                           // Do not update the entry just yet. The entry gets updated once
526 r.kieninger      1.83                     // the request has been read.
527 mike             1.100                    //entries[indx]._status = _MonitorEntry::BUSY;
528 sushma.fernandes 1.78  
529 kumpf            1.66                     // If allocate_and_awaken failure, retry on next iteration
530 a.arora          1.73  /* Removed for PEP 183.
531 kumpf            1.69                     if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(
532                                                   (void *)q, _dispatch))
533 kumpf            1.67                     {
534                                              Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
535                                                  "Monitor::run: Insufficient resources to process request.");
536 mike             1.100                       entries[indx]._status = _MonitorEntry::IDLE;
537 kumpf            1.67                        return true;
538                                           }
539 a.arora          1.73  */
540                        // Added for PEP 183
541 david.dillard    1.95                     HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
542                                           Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
543 a.arora          1.73                           "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
544                                           dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
545                                           try
546                                           {
547                                               dst->run(1);
548                                           }
549 david.dillard    1.95                     catch (...)
550                                           {
551                                               Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
552                                               "Monitor::_dispatch: exception received");
553                                           }
554                                           Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
555 a.arora          1.73                     "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
556                        
557 sushma.fernandes 1.78                     // It is possible the entry status may not be set to busy.
558 r.kieninger      1.83                     // The following will fail in that case.
559 mike             1.96     		   // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
560 a.arora          1.73  		   // Once the HTTPConnection thread has set the status value to either
561                        		   // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
562                        		   // to the Monitor.  It is no longer permissible to access the connection
563                        		   // or the entry in the _entries table.
564 sushma.fernandes 1.78  
565                                           // The following is not relevant as the worker thread or the
566                                           // reader thread will update the status of the entry.
567                        		   //if (dst->_connectionClosePending)
568 r.kieninger      1.83  		   //{
569 sushma.fernandes 1.78  		   //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
570                        		   //}
571                        		   //else
572                        		   //{
573                        		   //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
574 r.kieninger      1.83  		   //}
575                        // end Added for PEP 183
576 a.arora          1.73  		}
577 mike             1.100 	        else if( entries[indx]._type == Monitor::INTERNAL){
578 r.kieninger      1.83  			// set ourself to BUSY,
579                                                // read the data
580 a.arora          1.73                          // and set ourself back to IDLE
581 r.kieninger      1.83  
582 mike             1.100 		   	entries[indx]._status = _MonitorEntry::BUSY;
583 a.arora          1.73  			static char buffer[2];
584 mike             1.100       			Socket::disableBlocking(entries[indx].socket);
585                              			Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2);
586                              			Socket::enableBlocking(entries[indx].socket);
587                        			entries[indx]._status = _MonitorEntry::IDLE;
588 mday             1.37  		}
589                        		else
590 mday             1.25  		{
591 kumpf            1.51                     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
592                                             "Non-connection entry, indx = %d, has been received.", indx);
593 mday             1.37  		   int events = 0;
594                        		   events |= SocketMessage::READ;
595 mike             1.100 		   Message *msg = new SocketMessage(entries[indx].socket, events);
596                        		   entries[indx]._status = _MonitorEntry::BUSY;
597 kumpf            1.94                     autoEntryMutex.unlock();
598 mday             1.37  		   q->enqueue(msg);
599 kumpf            1.94                     autoEntryMutex.lock();
600 r.kieninger      1.102            // After enqueue a message and the autoEntryMutex has been released and locked again,
601                                   // the array of entries can be changed. The ArrayIterator has be reset with the original _entries
602                                   entries.reset(_entries);
603 mike             1.100 		   entries[indx]._status = _MonitorEntry::IDLE;
604 kumpf            1.94  
605 mday             1.25  		   return true;
606                        		}
607                        	     }
608 mday             1.37  	     catch(...)
609 mday             1.25  	     {
610                        	     }
611                        	     handled_events = true;
612                        	  }
613                               }
614 mday             1.24      }
615 kumpf            1.94  
616 mday             1.13      return(handled_events);
617 mike             1.2   }
618                        
619 chuck            1.74  void Monitor::stopListeningForConnections(Boolean wait)
620 kumpf            1.48  {
621                            PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
622 r.kieninger      1.83      // set boolean then tickle the server to recognize _stopConnections
623 kumpf            1.48      _stopConnections = 1;
624 a.arora          1.73      tickle();
625 kumpf            1.48  
626 chuck            1.74      if (wait)
627 a.arora          1.73      {
628 chuck            1.74        // Wait for the monitor to notice _stopConnections.  Otherwise the
629                              // caller of this function may unbind the ports while the monitor
630                              // is still accepting connections on them.
631 kumpf            1.101       _stopConnectionsSem.wait();
632 a.arora          1.73      }
633 r.kieninger      1.83  
634 kumpf            1.48      PEG_METHOD_EXIT();
635                        }
636 mday             1.25  
637 mday             1.37  
638 mday             1.25  int  Monitor::solicitSocketMessages(
639 david.dillard    1.95      PEGASUS_SOCKET socket,
640 mike             1.2       Uint32 events,
641 r.kieninger      1.83      Uint32 queueId,
642 mday             1.8       int type)
643 mike             1.2   {
644 r.kieninger      1.83     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
645 alagaraja        1.75     AutoMutex autoMut(_entry_mut);
646 a.arora          1.73     // Check to see if we need to dynamically grow the _entries array
647                           // We always want the _entries array to 2 bigger than the
648                           // current connections requested
649                           _solicitSocketCount++;  // bump the count
650                           int size = (int)_entries.size();
651 w.otsuka         1.89     if((int)_solicitSocketCount >= (size-1)){
652                                for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
653 a.arora          1.73                  _MonitorEntry entry(0, 0, 0);
654                                        _entries.append(entry);
655                                }
656                           }
657 kumpf            1.4   
658 a.arora          1.73     int index;
659                           for(index = 1; index < (int)_entries.size(); index++)
660 mday             1.25     {
661 a.arora          1.73        try
662 mday             1.37        {
663 mike             1.96           if(_entries[index]._status.get() == _MonitorEntry::EMPTY)
664 a.arora          1.73           {
665                                    _entries[index].socket = socket;
666                                    _entries[index].queueId  = queueId;
667                                    _entries[index]._type = type;
668                                    _entries[index]._status = _MonitorEntry::IDLE;
669 r.kieninger      1.83  
670 a.arora          1.73              return index;
671                                 }
672 mday             1.37        }
673                              catch(...)
674 mday             1.25        {
675                              }
676                           }
677 a.arora          1.73     _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful
678 mday             1.25     PEG_METHOD_EXIT();
679 kumpf            1.50     return -1;
680 a.arora          1.73  
681 mike             1.2   }
682                        
683 david.dillard    1.95  void Monitor::unsolicitSocketMessages(PEGASUS_SOCKET socket)
684 mike             1.2   {
685 kumpf            1.50  
686 mday             1.25      PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
687 alagaraja        1.75      AutoMutex autoMut(_entry_mut);
688 a.arora          1.73  
689                            /*
690                                Start at index = 1 because _entries[0] is the tickle entry which never needs
691                                to be EMPTY;
692                            */
693 w.otsuka         1.89      unsigned int index;
694 a.arora          1.73      for(index = 1; index < _entries.size(); index++)
695 mike             1.2       {
696 mday             1.25         if(_entries[index].socket == socket)
697                               {
698 a.arora          1.73            _entries[index]._status = _MonitorEntry::EMPTY;
699 david.dillard    1.95            _entries[index].socket = PEGASUS_INVALID_SOCKET;
700 a.arora          1.73            _solicitSocketCount--;
701                                  break;
702 mday             1.25         }
703 mike             1.2       }
704 a.arora          1.73  
705                            /*
706                        	Dynamic Contraction:
707                        	To remove excess entries we will start from the end of the _entries array
708                        	and remove all entries with EMPTY status until we find the first NON EMPTY.
709                        	This prevents the positions, of the NON EMPTY entries, from being changed.
710 r.kieninger      1.83      */
711 a.arora          1.73      index = _entries.size() - 1;
712 mike             1.96      while(_entries[index]._status.get() == _MonitorEntry::EMPTY){
713 a.arora          1.73  	if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
714                                        _entries.remove(index);
715                        	index--;
716                            }
717 kumpf            1.4       PEG_METHOD_EXIT();
718 mike             1.2   }
719                        
720 a.arora          1.73  // Note: this is no longer called with PEP 183.
721 mday             1.7   PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
722                        {
723 mday             1.8      HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
724 kumpf            1.51     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
725 kumpf            1.53          "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
726                                dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
727 kumpf            1.51     try
728                           {
729                              dst->run(1);
730                           }
731                           catch (...)
732                           {
733                              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
734                                  "Monitor::_dispatch: exception received");
735                           }
736                           Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
737                                  "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
738 r.kieninger      1.83  
739 mike             1.96     PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
740 kumpf            1.68  
741                           // Once the HTTPConnection thread has set the status value to either
742                           // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
743                           // to the Monitor.  It is no longer permissible to access the connection
744                           // or the entry in the _entries table.
745 kumpf            1.50     if (dst->_connectionClosePending)
746                           {
747 kumpf            1.68        dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
748                           }
749                           else
750                           {
751                              dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
752 kumpf            1.50     }
753 mday             1.8      return 0;
754 mday             1.40  }
755                        
756 mike             1.2   PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2