(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.103 //%2006////////////////////////////////////////////////////////////////////////
  2 mike  1.2   //
  3 karl  1.79  // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4             // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5             // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6 karl  1.64  // IBM Corp.; EMC Corporation, The Open Group.
  7 karl  1.79  // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8             // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9 karl  1.90  // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10             // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11 karl  1.103 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 12             // EMC Corporation; Symantec Corporation; The Open Group.
 13 mike  1.2   //
 14             // Permission is hereby granted, free of charge, to any person obtaining a copy
 15 kumpf 1.17  // of this software and associated documentation files (the "Software"), to
 16             // deal in the Software without restriction, including without limitation the
 17             // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 18 mike  1.2   // sell copies of the Software, and to permit persons to whom the Software is
 19             // furnished to do so, subject to the following conditions:
 20 karl  1.103 // 
 21 kumpf 1.17  // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 22 mike  1.2   // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 23             // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 24 kumpf 1.17  // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 25             // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 26             // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 27 mike  1.2   // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 28             // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 29             //
 30             //==============================================================================
 31             //
 32             // Author: Mike Brasher (mbrasher@bmc.com)
 33             //
 34 r.kieninger 1.83  // Modified By: Mike Day (monitor_2) mdday@us.ibm.com
 35 a.arora     1.71  //              Amit K Arora (Bug#1153) amita@in.ibm.com
 36 alagaraja   1.75  //              Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090
 37 sushma.fernandes 1.78  //              Sushma Fernandes (sushma@hp.com) for Bug#2057
 38 joyce.j          1.84  //              Josephine Eskaline Joyce (jojustin@in.ibm.com) for PEP#101
 39 kumpf            1.85  //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
 40 mike             1.2   //
 41                        //%/////////////////////////////////////////////////////////////////////////////
 42                        
 43                        #include <Pegasus/Common/Config.h>
 44 mday             1.40  
 45 mike             1.2   #include <cstring>
 46                        #include "Monitor.h"
 47                        #include "MessageQueue.h"
 48                        #include "Socket.h"
 49 kumpf            1.4   #include <Pegasus/Common/Tracer.h>
 50 mday             1.7   #include <Pegasus/Common/HTTPConnection.h>
 51 kumpf            1.69  #include <Pegasus/Common/MessageQueueService.h>
 52 a.arora          1.73  #include <Pegasus/Common/Exception.h>
 53 mike             1.100 #include "ArrayIterator.h"
 54 mike             1.2   
 55                        #ifdef PEGASUS_OS_TYPE_WINDOWS
 56                        # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
 57                        #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
 58                        of <winsock.h>. It may have been indirectly included (e.g., by including \
 59 david.dillard    1.95  <windows.h>). Find inclusion of that header which is visible to this \
 60 mike             1.2   compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \
 61                        otherwise, less than 64 clients (the default) will be able to connect to the \
 62                        CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
 63 mday             1.5   
 64 mike             1.2   # endif
 65                        # define FD_SETSIZE 1024
 66 mday             1.5   # include <windows.h>
 67 mike             1.2   #else
 68                        # include <sys/types.h>
 69                        # include <sys/socket.h>
 70                        # include <sys/time.h>
 71                        # include <netinet/in.h>
 72                        # include <netdb.h>
 73                        # include <arpa/inet.h>
 74                        #endif
 75                        
 76                        PEGASUS_USING_STD;
 77                        
 78                        PEGASUS_NAMESPACE_BEGIN
 79                        
 80 mike             1.96  static AtomicInt _connections(0);
 81 mday             1.25  
 82 mike             1.2   ////////////////////////////////////////////////////////////////////////////////
 83                        //
 84                        // Monitor
 85                        //
 86                        ////////////////////////////////////////////////////////////////////////////////
 87                        
 88 kumpf            1.54  #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
 89 mike             1.2   Monitor::Monitor()
 90 kumpf            1.87     : _stopConnections(0),
 91 a.arora          1.73       _stopConnectionsSem(0),
 92 a.dunfey         1.76       _solicitSocketCount(0),
 93 a.dunfey         1.77       _tickle_client_socket(-1),
 94                             _tickle_server_socket(-1),
 95                             _tickle_peer_socket(-1)
 96 mike             1.2   {
 97 kumpf            1.54      int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
 98 mike             1.2       Socket::initializeInterface();
 99 kumpf            1.54      _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
100 a.arora          1.73  
101                            // setup the tickler
102                            initializeTickler();
103                        
104 r.kieninger      1.83      // Start the count at 1 because initilizeTickler()
105 a.arora          1.73      // has added an entry in the first position of the
106                            // _entries array
107                            for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
108 mday             1.37      {
109                               _MonitorEntry entry(0, 0, 0);
110                               _entries.append(entry);
111                            }
112 mday             1.18  }
113 mday             1.20  
114 mike             1.2   Monitor::~Monitor()
115                        {
116 kumpf            1.11      Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
117 a.dunfey         1.76  
118                            try{
119 a.dunfey         1.77          if(_tickle_peer_socket >= 0)
120 a.dunfey         1.76          {
121                                    Socket::close(_tickle_peer_socket);
122                                }
123 a.dunfey         1.77          if(_tickle_client_socket >= 0)
124 a.dunfey         1.76          {
125                                    Socket::close(_tickle_client_socket);
126                                }
127 a.dunfey         1.77          if(_tickle_server_socket >= 0)
128 a.dunfey         1.76          {
129                                    Socket::close(_tickle_server_socket);
130                                }
131                            }
132                            catch(...)
133                            {
134                                Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
135                                          "Failed to close tickle sockets");
136                            }
137                        
138 mike             1.2       Socket::uninitializeInterface();
139 kumpf            1.11      Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
140                                          "returning from monitor destructor");
141 mday             1.18  }
142                        
143 a.arora          1.73  void Monitor::initializeTickler(){
144 r.kieninger      1.83      /*
145                               NOTE: On any errors trying to
146                                     setup out tickle connection,
147 a.arora          1.73               throw an exception/end the server
148                            */
149                        
150                            /* setup the tickle server/listener */
151                        
152                            // get a socket for the server side
153 david.dillard    1.95      if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
154 a.arora          1.73  	//handle error
155                        	MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
156                        				 "Received error number $0 while creating the internal socket.",
157                        #if !defined(PEGASUS_OS_TYPE_WINDOWS)
158                        				 errno);
159                        #else
160                        				 WSAGetLastError());
161                        #endif
162                        	throw Exception(parms);
163                            }
164                        
165                            // initialize the address
166                            memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
167                        #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
168                        #pragma convert(37)
169                        #endif
170                            _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
171                        #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
172                        #pragma convert(0)
173                        #endif
174                            _tickle_server_addr.sin_family = PF_INET;
175 a.arora          1.73      _tickle_server_addr.sin_port = 0;
176                        
177 kumpf            1.86      PEGASUS_SOCKLEN_T _addr_size = sizeof(_tickle_server_addr);
178 a.arora          1.73  
179                            // bind server side to socket
180                            if((::bind(_tickle_server_socket,
181 kumpf            1.88                 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
182 a.arora          1.73  	       sizeof(_tickle_server_addr))) < 0){
183                        	// handle error
184 r.kieninger      1.83  #ifdef PEGASUS_OS_ZOS
185                            MessageLoaderParms parms("Common.Monitor.TICKLE_BIND_LONG",
186                        				 "Received error:$0 while binding the internal socket.",strerror(errno));
187                        #else
188 a.arora          1.73  	MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
189                        				 "Received error number $0 while binding the internal socket.",
190                        #if !defined(PEGASUS_OS_TYPE_WINDOWS)
191                        				 errno);
192                        #else
193                        				 WSAGetLastError());
194                        #endif
195 r.kieninger      1.83  #endif
196 a.arora          1.73          throw Exception(parms);
197                            }
198                        
199                            // tell the kernel we are a server
200                            if((::listen(_tickle_server_socket,3)) < 0){
201                        	// handle error
202                        	MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",
203                        			 "Received error number $0 while listening to the internal socket.",
204                        #if !defined(PEGASUS_OS_TYPE_WINDOWS)
205                        				 errno);
206                        #else
207                        				 WSAGetLastError());
208                        #endif
209                        	throw Exception(parms);
210                            }
211 r.kieninger      1.83  
212 a.arora          1.73      // make sure we have the correct socket for our server
213                            int sock = ::getsockname(_tickle_server_socket,
214 kumpf            1.88                     reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
215                                           &_addr_size);
216 a.arora          1.73      if(sock < 0){
217                        	// handle error
218                        	MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",
219                        			 "Received error number $0 while getting the internal socket name.",
220                        #if !defined(PEGASUS_OS_TYPE_WINDOWS)
221                        				 errno);
222                        #else
223                        				 WSAGetLastError());
224                        #endif
225                        	throw Exception(parms);
226                            }
227                        
228                            /* set up the tickle client/connector */
229 r.kieninger      1.83  
230 a.arora          1.73      // get a socket for our tickle client
231 david.dillard    1.95      if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
232 a.arora          1.73  	// handle error
233                        	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
234                        			 "Received error number $0 while creating the internal client socket.",
235                        #if !defined(PEGASUS_OS_TYPE_WINDOWS)
236                        				 errno);
237                        #else
238                        				 WSAGetLastError());
239                        #endif
240                        	throw Exception(parms);
241                            }
242                        
243                            // setup the address of the client
244                            memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
245                        #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
246                        #pragma convert(37)
247                        #endif
248                            _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
249                        #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
250                        #pragma convert(0)
251                        #endif
252                            _tickle_client_addr.sin_family = PF_INET;
253 a.arora          1.73      _tickle_client_addr.sin_port = 0;
254                        
255                            // bind socket to client side
256                            if((::bind(_tickle_client_socket,
257 kumpf            1.88                 reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
258 a.arora          1.73  	       sizeof(_tickle_client_addr))) < 0){
259                        	// handle error
260                        	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
261                        			 "Received error number $0 while binding the internal client socket.",
262                        #if !defined(PEGASUS_OS_TYPE_WINDOWS)
263                        				 errno);
264                        #else
265                        				 WSAGetLastError());
266                        #endif
267                        	throw Exception(parms);
268                            }
269                        
270                            // connect to server side
271                            if((::connect(_tickle_client_socket,
272 kumpf            1.88                    reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
273 a.arora          1.73  		  sizeof(_tickle_server_addr))) < 0){
274                        	// handle error
275                        	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
276                        			 "Received error number $0 while connecting the internal client socket.",
277                        #if !defined(PEGASUS_OS_TYPE_WINDOWS)
278                        				 errno);
279                        #else
280                        				 WSAGetLastError());
281                        #endif
282                        	throw Exception(parms);
283                            }
284                        
285                            /* set up the slave connection */
286                            memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
287 kumpf            1.86      PEGASUS_SOCKLEN_T peer_size = sizeof(_tickle_peer_addr);
288 r.kieninger      1.83      pegasus_sleep(1);
289 a.arora          1.73  
290                            // this call may fail, we will try a max of 20 times to establish this peer connection
291                            if((_tickle_peer_socket = ::accept(_tickle_server_socket,
292 kumpf            1.88              reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
293                                    &peer_size)) < 0){
294 a.arora          1.73  #if !defined(PEGASUS_OS_TYPE_WINDOWS)
295                                // Only retry on non-windows platforms.
296                                if(_tickle_peer_socket == -1 && errno == EAGAIN)
297                                {
298 r.kieninger      1.83            int retries = 0;
299 a.arora          1.73            do
300                                  {
301                                    pegasus_sleep(1);
302                                    _tickle_peer_socket = ::accept(_tickle_server_socket,
303 kumpf            1.88                  reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
304                                        &peer_size);
305 a.arora          1.73              retries++;
306                                  } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
307                                }
308                        #endif
309                            }
310                            if(_tickle_peer_socket == -1){
311                        	// handle error
312                        	MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",
313                        			 "Received error number $0 while accepting the internal socket connection.",
314                        #if !defined(PEGASUS_OS_TYPE_WINDOWS)
315                        				 errno);
316                        #else
317                        				 WSAGetLastError());
318                        #endif
319                        	throw Exception(parms);
320                            }
321                            // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
322                            // checks entries with IDLE state for events
323                            _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
324                            entry._status = _MonitorEntry::IDLE;
325                            _entries.append(entry);
326 a.arora          1.73  }
327                        
328                        void Monitor::tickle(void)
329                        {
330 sushma.fernandes 1.78      static char _buffer[] =
331 a.arora          1.73      {
332                              '0','0'
333                            };
334 r.kieninger      1.83  
335 sushma.fernandes 1.78      AutoMutex autoMutex(_tickle_mutex);
336 r.kieninger      1.83      Socket::disableBlocking(_tickle_client_socket);
337 sushma.fernandes 1.78      Socket::write(_tickle_client_socket,&_buffer, 2);
338 r.kieninger      1.83      Socket::enableBlocking(_tickle_client_socket);
339 sushma.fernandes 1.78  }
340                        
341                        void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
342                        {
343                            // Set the state to requested state
344                            _entries[index]._status = status;
345 a.arora          1.73  }
346                        
347 mike             1.2   Boolean Monitor::run(Uint32 milliseconds)
348                        {
349 mday             1.18  
350 mday             1.25      Boolean handled_events = false;
351 a.arora          1.73      int i = 0;
352 r.kieninger      1.83  
353 kumpf            1.36      struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
354 a.arora          1.73  
355 mday             1.25      fd_set fdread;
356                            FD_ZERO(&fdread);
357 a.arora          1.73  
358 kumpf            1.94      AutoMutex autoEntryMutex(_entry_mut);
359 r.kieninger      1.83  
360 mike             1.100     ArrayIterator<_MonitorEntry> entries(_entries);
361                        
362 r.kieninger      1.83      // Check the stopConnections flag.  If set, clear the Acceptor monitor entries
363 mike             1.96      if (_stopConnections.get() == 1)
364 kumpf            1.48      {
365 mike             1.100         for ( int indx = 0; indx < (int)entries.size(); indx++)
366 kumpf            1.48          {
367 mike             1.100             if (entries[indx]._type == Monitor::ACCEPTOR)
368 kumpf            1.48              {
369 mike             1.100                 if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)
370 kumpf            1.48                  {
371 mike             1.100                    if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||
372                                                entries[indx]._status.get() == _MonitorEntry::DYING )
373 kumpf            1.48                     {
374                                               // remove the entry
375 mike             1.100 		       entries[indx]._status = _MonitorEntry::EMPTY;
376 kumpf            1.48                     }
377                                           else
378                                           {
379                                               // set status to DYING
380 mike             1.100                       entries[indx]._status = _MonitorEntry::DYING;
381 kumpf            1.48                     }
382                                       }
383                                   }
384                                }
385                                _stopConnections = 0;
386 a.arora          1.73  	_stopConnectionsSem.signal();
387 kumpf            1.48      }
388 kumpf            1.51  
389 mike             1.100     for( int indx = 0; indx < (int)entries.size(); indx++)
390 kumpf            1.68      {
391 mike             1.100 			 const _MonitorEntry &entry = entries[indx];
392 mike             1.96         if ((entry._status.get() == _MonitorEntry::DYING) &&
393 brian.campbell   1.80  					 (entry._type == Monitor::CONNECTION))
394 kumpf            1.68         {
395 brian.campbell   1.80            MessageQueue *q = MessageQueue::lookup(entry.queueId);
396 kumpf            1.68            PEGASUS_ASSERT(q != 0);
397 brian.campbell   1.80            HTTPConnection &h = *static_cast<HTTPConnection *>(q);
398 r.kieninger      1.83  
399 brian.campbell   1.80  					if (h._connectionClosePending == false)
400                        						continue;
401                        
402                        					// NOTE: do not attempt to delete while there are pending responses
403 r.kieninger      1.83  					// coming thru. The last response to come thru after a
404 brian.campbell   1.80  					// _connectionClosePending will reset _responsePending to false
405                        					// and then cause the monitor to rerun this code and clean up.
406                        					// (see HTTPConnection.cpp)
407                        
408                        					if (h._responsePending == true)
409                        					{
410                        						Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "
411                        													"Ignoring connection delete request because "
412                        													"responses are still pending. "
413 r.kieninger      1.83  													"connection=0x%p, socket=%d\n",
414 brian.campbell   1.81  													(void *)&h, h.getSocket());
415 brian.campbell   1.80  						continue;
416                        					}
417                        					h._connectionClosePending = false;
418                                  MessageQueue &o = h.get_owner();
419                                  Message* message= new CloseConnectionMessage(entry.socket);
420 kumpf            1.68            message->dest = o.getQueueId();
421                        
422 r.kieninger      1.83            // HTTPAcceptor is responsible for closing the connection.
423 kumpf            1.68            // The lock is released to allow HTTPAcceptor to call
424 r.kieninger      1.83            // unsolicitSocketMessages to free the entry.
425 kumpf            1.68            // Once HTTPAcceptor completes processing of the close
426                                  // connection, the lock is re-requested and processing of
427                                  // the for loop continues.  This is safe with the current
428 mike             1.100           // implementation of the entries object.  Note that the
429                                  // loop condition accesses the entries.size() on each
430 kumpf            1.68            // iteration, so that a change in size while the mutex is
431                                  // unlocked will not result in an ArrayIndexOutOfBounds
432                                  // exception.
433                        
434 kumpf            1.94            autoEntryMutex.unlock();
435 kumpf            1.68            o.enqueue(message);
436 kumpf            1.94            autoEntryMutex.lock();
437 r.kieninger      1.102           // After enqueue a message and the autoEntryMutex has been released and locked again,
438                                  // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries.
439                                  entries.reset(_entries);
440 kumpf            1.68         }
441                            }
442                        
443 kumpf            1.51      Uint32 _idleEntries = 0;
444 r.kieninger      1.83  
445 a.arora          1.73      /*
446 david.dillard    1.95          We will keep track of the maximum socket number and pass this value
447                                to the kernel as a parameter to SELECT.  This loop seems like a good
448                                place to calculate the max file descriptor (maximum socket number)
449                                because we have to traverse the entire array.
450 r.kieninger      1.83      */
451 david.dillard    1.95      PEGASUS_SOCKET maxSocketCurrentPass = 0;
452 mike             1.100     for( int indx = 0; indx < (int)entries.size(); indx++)
453 mike             1.2       {
454 mike             1.100        if(maxSocketCurrentPass < entries[indx].socket)
455                                    maxSocketCurrentPass = entries[indx].socket;
456 a.arora          1.73  
457 mike             1.100        if(entries[indx]._status.get() == _MonitorEntry::IDLE)
458 mday             1.25         {
459 david.dillard    1.95             _idleEntries++;
460 mike             1.100            FD_SET(entries[indx].socket, &fdread);
461 mday             1.25         }
462 mday             1.13      }
463 s.hills          1.62  
464 a.arora          1.73      /*
465 david.dillard    1.95          Add 1 then assign maxSocket accordingly. We add 1 to account for
466                                descriptors starting at 0.
467 a.arora          1.73      */
468                            maxSocketCurrentPass++;
469                        
470 kumpf            1.94      autoEntryMutex.unlock();
471 david.dillard    1.95  
472                            //
473                            // The first argument to select() is ignored on Windows and it is not
474                            // a socket value.  The original code assumed that the number of sockets
475                            // and a socket value have the same type.  On Windows they do not.
476                            //
477                        #ifdef PEGASUS_OS_TYPE_WINDOWS
478                            int events = select(0, &fdread, NULL, NULL, &tv);
479                        #else
480 a.arora          1.73      int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
481 david.dillard    1.95  #endif
482 kumpf            1.94      autoEntryMutex.lock();
483 r.kieninger      1.102     // After enqueue a message and the autoEntryMutex has been released and locked again,
484                            // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries
485                            entries.reset(_entries);
486 mike             1.2   #ifdef PEGASUS_OS_TYPE_WINDOWS
487 kumpf            1.50      if(events == SOCKET_ERROR)
488 mike             1.2   #else
489 kumpf            1.50      if(events == -1)
490 mike             1.2   #endif
491 mday             1.13      {
492 kumpf            1.50         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
493                                  "Monitor::run - errorno = %d has occurred on select.", errno);
494                               // The EBADF error indicates that one or more or the file
495                               // descriptions was not valid. This could indicate that
496 mike             1.100        // the entries structure has been corrupted or that
497 kumpf            1.50         // we have a synchronization error.
498                        
499                               PEGASUS_ASSERT(errno != EBADF);
500                            }
501                            else if (events)
502                            {
503 kumpf            1.51         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
504 r.kieninger      1.83            "Monitor::run select event received events = %d, monitoring %d idle entries",
505 david.dillard    1.95             events, _idleEntries);
506 mike             1.100        for( int indx = 0; indx < (int)entries.size(); indx++)
507 mday             1.25         {
508 kumpf            1.53            // The Monitor should only look at entries in the table that are IDLE (i.e.,
509                                  // owned by the Monitor).
510 mike             1.100           if((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
511                                     (FD_ISSET(entries[indx].socket, &fdread)))
512 david.dillard    1.95            {
513 mike             1.100              MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
514 kumpf            1.53               Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
515                                          "Monitor::run indx = %d, queueId =  %d, q = %p",
516 mike             1.100                   indx, entries[indx].queueId, q);
517 kumpf            1.53               PEGASUS_ASSERT(q !=0);
518 mday             1.37  
519 david.dillard    1.95               try
520                                     {
521 mike             1.100                 if(entries[indx]._type == Monitor::CONNECTION)
522 david.dillard    1.95                  {
523 kumpf            1.51                     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
524 mike             1.100                      "entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
525 david.dillard    1.95                     static_cast<HTTPConnection *>(q)->_entry_index = indx;
526 sushma.fernandes 1.78  
527                                           // Do not update the entry just yet. The entry gets updated once
528 r.kieninger      1.83                     // the request has been read.
529 mike             1.100                    //entries[indx]._status = _MonitorEntry::BUSY;
530 sushma.fernandes 1.78  
531 kumpf            1.66                     // If allocate_and_awaken failure, retry on next iteration
532 a.arora          1.73  /* Removed for PEP 183.
533 kumpf            1.69                     if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(
534                                                   (void *)q, _dispatch))
535 kumpf            1.67                     {
536                                              Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
537                                                  "Monitor::run: Insufficient resources to process request.");
538 mike             1.100                       entries[indx]._status = _MonitorEntry::IDLE;
539 kumpf            1.67                        return true;
540                                           }
541 a.arora          1.73  */
542                        // Added for PEP 183
543 david.dillard    1.95                     HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
544                                           Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
545 a.arora          1.73                           "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
546                                           dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
547                                           try
548                                           {
549                                               dst->run(1);
550                                           }
551 david.dillard    1.95                     catch (...)
552                                           {
553                                               Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
554                                               "Monitor::_dispatch: exception received");
555                                           }
556                                           Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
557 a.arora          1.73                     "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
558                        
559 sushma.fernandes 1.78                     // It is possible the entry status may not be set to busy.
560 r.kieninger      1.83                     // The following will fail in that case.
561 mike             1.96     		   // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
562 a.arora          1.73  		   // Once the HTTPConnection thread has set the status value to either
563                        		   // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
564                        		   // to the Monitor.  It is no longer permissible to access the connection
565                        		   // or the entry in the _entries table.
566 sushma.fernandes 1.78  
567                                           // The following is not relevant as the worker thread or the
568                                           // reader thread will update the status of the entry.
569                        		   //if (dst->_connectionClosePending)
570 r.kieninger      1.83  		   //{
571 sushma.fernandes 1.78  		   //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
572                        		   //}
573                        		   //else
574                        		   //{
575                        		   //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
576 r.kieninger      1.83  		   //}
577                        // end Added for PEP 183
578 a.arora          1.73  		}
579 mike             1.100 	        else if( entries[indx]._type == Monitor::INTERNAL){
580 r.kieninger      1.83  			// set ourself to BUSY,
581                                                // read the data
582 a.arora          1.73                          // and set ourself back to IDLE
583 r.kieninger      1.83  
584 mike             1.100 		   	entries[indx]._status = _MonitorEntry::BUSY;
585 a.arora          1.73  			static char buffer[2];
586 mike             1.100       			Socket::disableBlocking(entries[indx].socket);
587                              			Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2);
588                              			Socket::enableBlocking(entries[indx].socket);
589                        			entries[indx]._status = _MonitorEntry::IDLE;
590 mday             1.37  		}
591                        		else
592 mday             1.25  		{
593 kumpf            1.51                     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
594                                             "Non-connection entry, indx = %d, has been received.", indx);
595 mday             1.37  		   int events = 0;
596                        		   events |= SocketMessage::READ;
597 mike             1.100 		   Message *msg = new SocketMessage(entries[indx].socket, events);
598                        		   entries[indx]._status = _MonitorEntry::BUSY;
599 kumpf            1.94                     autoEntryMutex.unlock();
600 mday             1.37  		   q->enqueue(msg);
601 kumpf            1.94                     autoEntryMutex.lock();
602 r.kieninger      1.102            // After enqueue a message and the autoEntryMutex has been released and locked again,
603                                   // the array of entries can be changed. The ArrayIterator has be reset with the original _entries
604                                   entries.reset(_entries);
605 mike             1.100 		   entries[indx]._status = _MonitorEntry::IDLE;
606 kumpf            1.94  
607 mday             1.25  		   return true;
608                        		}
609                        	     }
610 mday             1.37  	     catch(...)
611 mday             1.25  	     {
612                        	     }
613                        	     handled_events = true;
614                        	  }
615                               }
616 mday             1.24      }
617 kumpf            1.94  
618 mday             1.13      return(handled_events);
619 mike             1.2   }
620                        
621 chuck            1.74  void Monitor::stopListeningForConnections(Boolean wait)
622 kumpf            1.48  {
623                            PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
624 r.kieninger      1.83      // set boolean then tickle the server to recognize _stopConnections
625 kumpf            1.48      _stopConnections = 1;
626 a.arora          1.73      tickle();
627 kumpf            1.48  
628 chuck            1.74      if (wait)
629 a.arora          1.73      {
630 chuck            1.74        // Wait for the monitor to notice _stopConnections.  Otherwise the
631                              // caller of this function may unbind the ports while the monitor
632                              // is still accepting connections on them.
633 kumpf            1.101       _stopConnectionsSem.wait();
634 a.arora          1.73      }
635 r.kieninger      1.83  
636 kumpf            1.48      PEG_METHOD_EXIT();
637                        }
638 mday             1.25  
639 mday             1.37  
640 mday             1.25  int  Monitor::solicitSocketMessages(
641 david.dillard    1.95      PEGASUS_SOCKET socket,
642 mike             1.2       Uint32 events,
643 r.kieninger      1.83      Uint32 queueId,
644 mday             1.8       int type)
645 mike             1.2   {
646 r.kieninger      1.83     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
647 alagaraja        1.75     AutoMutex autoMut(_entry_mut);
648 a.arora          1.73     // Check to see if we need to dynamically grow the _entries array
649                           // We always want the _entries array to 2 bigger than the
650                           // current connections requested
651                           _solicitSocketCount++;  // bump the count
652                           int size = (int)_entries.size();
653 w.otsuka         1.89     if((int)_solicitSocketCount >= (size-1)){
654                                for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
655 a.arora          1.73                  _MonitorEntry entry(0, 0, 0);
656                                        _entries.append(entry);
657                                }
658                           }
659 kumpf            1.4   
660 a.arora          1.73     int index;
661                           for(index = 1; index < (int)_entries.size(); index++)
662 mday             1.25     {
663 a.arora          1.73        try
664 mday             1.37        {
665 mike             1.96           if(_entries[index]._status.get() == _MonitorEntry::EMPTY)
666 a.arora          1.73           {
667                                    _entries[index].socket = socket;
668                                    _entries[index].queueId  = queueId;
669                                    _entries[index]._type = type;
670                                    _entries[index]._status = _MonitorEntry::IDLE;
671 r.kieninger      1.83  
672 a.arora          1.73              return index;
673                                 }
674 mday             1.37        }
675                              catch(...)
676 mday             1.25        {
677                              }
678                           }
679 a.arora          1.73     _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful
680 mday             1.25     PEG_METHOD_EXIT();
681 kumpf            1.50     return -1;
682 a.arora          1.73  
683 mike             1.2   }
684                        
685 david.dillard    1.95  void Monitor::unsolicitSocketMessages(PEGASUS_SOCKET socket)
686 mike             1.2   {
687 kumpf            1.50  
688 mday             1.25      PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
689 alagaraja        1.75      AutoMutex autoMut(_entry_mut);
690 a.arora          1.73  
691                            /*
692                                Start at index = 1 because _entries[0] is the tickle entry which never needs
693                                to be EMPTY;
694                            */
695 w.otsuka         1.89      unsigned int index;
696 a.arora          1.73      for(index = 1; index < _entries.size(); index++)
697 mike             1.2       {
698 mday             1.25         if(_entries[index].socket == socket)
699                               {
700 a.arora          1.73            _entries[index]._status = _MonitorEntry::EMPTY;
701 david.dillard    1.95            _entries[index].socket = PEGASUS_INVALID_SOCKET;
702 a.arora          1.73            _solicitSocketCount--;
703                                  break;
704 mday             1.25         }
705 mike             1.2       }
706 a.arora          1.73  
707                            /*
708                        	Dynamic Contraction:
709                        	To remove excess entries we will start from the end of the _entries array
710                        	and remove all entries with EMPTY status until we find the first NON EMPTY.
711                        	This prevents the positions, of the NON EMPTY entries, from being changed.
712 r.kieninger      1.83      */
713 a.arora          1.73      index = _entries.size() - 1;
714 mike             1.96      while(_entries[index]._status.get() == _MonitorEntry::EMPTY){
715 a.arora          1.73  	if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
716                                        _entries.remove(index);
717                        	index--;
718                            }
719 kumpf            1.4       PEG_METHOD_EXIT();
720 mike             1.2   }
721                        
722 a.arora          1.73  // Note: this is no longer called with PEP 183.
723 mday             1.7   PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
724                        {
725 mday             1.8      HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
726 kumpf            1.51     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
727 kumpf            1.53          "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
728                                dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
729 kumpf            1.51     try
730                           {
731                              dst->run(1);
732                           }
733                           catch (...)
734                           {
735                              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
736                                  "Monitor::_dispatch: exception received");
737                           }
738                           Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
739                                  "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
740 r.kieninger      1.83  
741 mike             1.96     PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
742 kumpf            1.68  
743                           // Once the HTTPConnection thread has set the status value to either
744                           // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
745                           // to the Monitor.  It is no longer permissible to access the connection
746                           // or the entry in the _entries table.
747 kumpf            1.50     if (dst->_connectionClosePending)
748                           {
749 kumpf            1.68        dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
750                           }
751                           else
752                           {
753                              dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
754 kumpf            1.50     }
755 mday             1.8      return 0;
756 mday             1.40  }
757                        
758 mike             1.2   PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2