(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.103 //%2006////////////////////////////////////////////////////////////////////////
  2 mike  1.2   //
  3 karl  1.79  // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4             // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5             // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6 karl  1.64  // IBM Corp.; EMC Corporation, The Open Group.
  7 karl  1.79  // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8             // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9 karl  1.90  // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10             // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11 karl  1.103 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 12             // EMC Corporation; Symantec Corporation; The Open Group.
 13 mike  1.2   //
 14             // Permission is hereby granted, free of charge, to any person obtaining a copy
 15 kumpf 1.17  // of this software and associated documentation files (the "Software"), to
 16             // deal in the Software without restriction, including without limitation the
 17             // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 18 mike  1.2   // sell copies of the Software, and to permit persons to whom the Software is
 19             // furnished to do so, subject to the following conditions:
 20 karl  1.103 // 
 21 kumpf 1.17  // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 22 mike  1.2   // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 23             // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 24 kumpf 1.17  // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 25             // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 26             // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 27 mike  1.2   // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 28             // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 29             //
 30             //==============================================================================
 31             //
 32             // Author: Mike Brasher (mbrasher@bmc.com)
 33             //
 34 r.kieninger 1.83  // Modified By: Mike Day (monitor_2) mdday@us.ibm.com
 35 a.arora     1.71  //              Amit K Arora (Bug#1153) amita@in.ibm.com
 36 alagaraja   1.75  //              Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090
 37 sushma.fernandes 1.78  //              Sushma Fernandes (sushma@hp.com) for Bug#2057
 38 joyce.j          1.84  //              Josephine Eskaline Joyce (jojustin@in.ibm.com) for PEP#101
 39 kumpf            1.85  //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
 40 mike             1.2   //
 41                        //%/////////////////////////////////////////////////////////////////////////////
 42                        
 43 mike             1.107 #include "Network.h"
 44 mike             1.2   #include <Pegasus/Common/Config.h>
 45                        #include <cstring>
 46                        #include "Monitor.h"
 47                        #include "MessageQueue.h"
 48                        #include "Socket.h"
 49 kumpf            1.4   #include <Pegasus/Common/Tracer.h>
 50 mday             1.7   #include <Pegasus/Common/HTTPConnection.h>
 51 kumpf            1.69  #include <Pegasus/Common/MessageQueueService.h>
 52 a.arora          1.73  #include <Pegasus/Common/Exception.h>
 53 mike             1.100 #include "ArrayIterator.h"
 54 mike             1.2   
 55                        PEGASUS_USING_STD;
 56                        
 57                        PEGASUS_NAMESPACE_BEGIN
 58                        
                            // File-local atomic counter, constructed with the value 0.
                            // NOTE(review): nothing in the visible part of this file reads or
                            // updates it; presumably it tracks open connections -- confirm.
  59 mike             1.96  static AtomicInt _connections(0);
 60 mday             1.25  
 61 mike             1.2   ////////////////////////////////////////////////////////////////////////////////
 62                        //
 63                        // Monitor
 64                        //
 65                        ////////////////////////////////////////////////////////////////////////////////
 66                        
 67 kumpf            1.54  #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
 68 mike             1.2   Monitor::Monitor()
 69 kumpf            1.87     : _stopConnections(0),
 70 a.arora          1.73       _stopConnectionsSem(0),
 71 a.dunfey         1.76       _solicitSocketCount(0),
 72 a.dunfey         1.77       _tickle_client_socket(-1),
 73                             _tickle_server_socket(-1),
 74                             _tickle_peer_socket(-1)
 75 mike             1.2   {
 76 kumpf            1.54      int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
 77 mike             1.2       Socket::initializeInterface();
 78 kumpf            1.54      _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
 79 a.arora          1.73  
 80                            // setup the tickler
 81                            initializeTickler();
 82                        
 83 r.kieninger      1.83      // Start the count at 1 because initilizeTickler()
 84 a.arora          1.73      // has added an entry in the first position of the
 85                            // _entries array
 86                            for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
 87 mday             1.37      {
 88                               _MonitorEntry entry(0, 0, 0);
 89                               _entries.append(entry);
 90                            }
 91 mday             1.18  }
 92 mday             1.20  
 93 mike             1.2   Monitor::~Monitor()
 94                        {
 95 kumpf            1.11      Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
 96 a.dunfey         1.76  
 97                            try{
 98 a.dunfey         1.77          if(_tickle_peer_socket >= 0)
 99 a.dunfey         1.76          {
100                                    Socket::close(_tickle_peer_socket);
101                                }
102 a.dunfey         1.77          if(_tickle_client_socket >= 0)
103 a.dunfey         1.76          {
104                                    Socket::close(_tickle_client_socket);
105                                }
106 a.dunfey         1.77          if(_tickle_server_socket >= 0)
107 a.dunfey         1.76          {
108                                    Socket::close(_tickle_server_socket);
109                                }
110                            }
111                            catch(...)
112                            {
113                                Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
114                                          "Failed to close tickle sockets");
115                            }
116                        
117 mike             1.2       Socket::uninitializeInterface();
118 kumpf            1.11      Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
119                                          "returning from monitor destructor");
120 mday             1.18  }
121                        
122 a.arora          1.73  void Monitor::initializeTickler(){
123 r.kieninger      1.83      /*
124                               NOTE: On any errors trying to
125                                     setup out tickle connection,
126 a.arora          1.73               throw an exception/end the server
127                            */
128                        
129                            /* setup the tickle server/listener */
130                        
131                            // get a socket for the server side
132 david.dillard    1.95      if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
133 a.arora          1.73  	//handle error
134                        	MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
135                        				 "Received error number $0 while creating the internal socket.",
136 kumpf            1.110 				 getSocketError());
137 a.arora          1.73  	throw Exception(parms);
138                            }
139 marek            1.104     
140                            // set TCP_NODELAY
141                            int opt = 1;
142                            setsockopt(_tickle_server_socket, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(opt));
143 a.arora          1.73  
144                            // initialize the address
145                            memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
146                        #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
147                        #pragma convert(37)
148                        #endif
149                            _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
150                        #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
151                        #pragma convert(0)
152                        #endif
153                            _tickle_server_addr.sin_family = PF_INET;
154                            _tickle_server_addr.sin_port = 0;
155                        
156 mike             1.109     SocketLength _addr_size = sizeof(_tickle_server_addr);
157 a.arora          1.73  
158                            // bind server side to socket
159                            if((::bind(_tickle_server_socket,
160 kumpf            1.88                 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
161 a.arora          1.73  	       sizeof(_tickle_server_addr))) < 0){
162                        	// handle error
163 r.kieninger      1.83  #ifdef PEGASUS_OS_ZOS
164                            MessageLoaderParms parms("Common.Monitor.TICKLE_BIND_LONG",
165                        				 "Received error:$0 while binding the internal socket.",strerror(errno));
166                        #else
167 a.arora          1.73  	MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
168                        				 "Received error number $0 while binding the internal socket.",
169 kumpf            1.110 				 getSocketError());
170 r.kieninger      1.83  #endif
171 a.arora          1.73          throw Exception(parms);
172                            }
173                        
174                            // tell the kernel we are a server
175                            if((::listen(_tickle_server_socket,3)) < 0){
176                        	// handle error
177                        	MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",
178                        			 "Received error number $0 while listening to the internal socket.",
179 kumpf            1.110 				 getSocketError());
180 a.arora          1.73  	throw Exception(parms);
181                            }
182 r.kieninger      1.83  
183 a.arora          1.73      // make sure we have the correct socket for our server
184                            int sock = ::getsockname(_tickle_server_socket,
185 kumpf            1.88                     reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
186                                           &_addr_size);
187 a.arora          1.73      if(sock < 0){
188                        	// handle error
189                        	MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",
190                        			 "Received error number $0 while getting the internal socket name.",
191 kumpf            1.110 				 getSocketError());
192 a.arora          1.73  	throw Exception(parms);
193                            }
194                        
195                            /* set up the tickle client/connector */
196 r.kieninger      1.83  
197 a.arora          1.73      // get a socket for our tickle client
198 david.dillard    1.95      if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
199 a.arora          1.73  	// handle error
200                        	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
201                        			 "Received error number $0 while creating the internal client socket.",
202 kumpf            1.110 				 getSocketError());
203 a.arora          1.73  	throw Exception(parms);
204                            }
205 marek            1.104     
206                            // set TCP_NODELAY
207                            setsockopt(_tickle_client_socket, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(opt));
208 a.arora          1.73  
209                            // setup the address of the client
210                            memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
211                        #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
212                        #pragma convert(37)
213                        #endif
214                            _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
215                        #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
216                        #pragma convert(0)
217                        #endif
218                            _tickle_client_addr.sin_family = PF_INET;
219                            _tickle_client_addr.sin_port = 0;
220                        
221                            // bind socket to client side
222                            if((::bind(_tickle_client_socket,
223 kumpf            1.88                 reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
224 a.arora          1.73  	       sizeof(_tickle_client_addr))) < 0){
225                        	// handle error
226                        	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
227                        			 "Received error number $0 while binding the internal client socket.",
228 kumpf            1.110 				 getSocketError());
229 a.arora          1.73  	throw Exception(parms);
230                            }
231                        
232                            // connect to server side
233                            if((::connect(_tickle_client_socket,
234 kumpf            1.88                    reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
235 a.arora          1.73  		  sizeof(_tickle_server_addr))) < 0){
236                        	// handle error
237                        	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
238                        			 "Received error number $0 while connecting the internal client socket.",
239 kumpf            1.110 				 getSocketError());
240 a.arora          1.73  	throw Exception(parms);
241                            }
242                        
243                            /* set up the slave connection */
244                            memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
245 mike             1.109     SocketLength peer_size = sizeof(_tickle_peer_addr);
246 r.kieninger      1.83      pegasus_sleep(1);
247 a.arora          1.73  
248                            // this call may fail, we will try a max of 20 times to establish this peer connection
249                            if((_tickle_peer_socket = ::accept(_tickle_server_socket,
250 kumpf            1.88              reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
251                                    &peer_size)) < 0){
252 a.arora          1.73  #if !defined(PEGASUS_OS_TYPE_WINDOWS)
253                                // Only retry on non-windows platforms.
254                                if(_tickle_peer_socket == -1 && errno == EAGAIN)
255                                {
256 r.kieninger      1.83            int retries = 0;
257 a.arora          1.73            do
258                                  {
259                                    pegasus_sleep(1);
260                                    _tickle_peer_socket = ::accept(_tickle_server_socket,
261 kumpf            1.88                  reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
262                                        &peer_size);
263 a.arora          1.73              retries++;
264                                  } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
265                                }
266                        #endif
267                            }
268                            if(_tickle_peer_socket == -1){
269                        	// handle error
270                        	MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",
271                        			 "Received error number $0 while accepting the internal socket connection.",
272 kumpf            1.110 				 getSocketError());
273 a.arora          1.73  	throw Exception(parms);
274                            }
275                            // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
276                            // checks entries with IDLE state for events
277                            _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
278                            entry._status = _MonitorEntry::IDLE;
279                            _entries.append(entry);
280                        }
281                        
282                        void Monitor::tickle(void)
283                        {
284 sushma.fernandes 1.78      static char _buffer[] =
285 a.arora          1.73      {
286                              '0','0'
287                            };
288 r.kieninger      1.83  
289 sushma.fernandes 1.78      AutoMutex autoMutex(_tickle_mutex);
290 r.kieninger      1.83      Socket::disableBlocking(_tickle_client_socket);
291 sushma.fernandes 1.78      Socket::write(_tickle_client_socket,&_buffer, 2);
292 r.kieninger      1.83      Socket::enableBlocking(_tickle_client_socket);
293 sushma.fernandes 1.78  }
294                        
295                        void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
296                        {
297                            // Set the state to requested state
298                            _entries[index]._status = status;
299 a.arora          1.73  }
300                        
301 mike             1.2   Boolean Monitor::run(Uint32 milliseconds)
302                        {
303 mday             1.18  
304 mday             1.25      Boolean handled_events = false;
305 a.arora          1.73      int i = 0;
306 r.kieninger      1.83  
307 kumpf            1.36      struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
308 a.arora          1.73  
309 mday             1.25      fd_set fdread;
310                            FD_ZERO(&fdread);
311 a.arora          1.73  
312 kumpf            1.94      AutoMutex autoEntryMutex(_entry_mut);
313 r.kieninger      1.83  
314 mike             1.100     ArrayIterator<_MonitorEntry> entries(_entries);
315                        
316 r.kieninger      1.83      // Check the stopConnections flag.  If set, clear the Acceptor monitor entries
317 mike             1.96      if (_stopConnections.get() == 1)
318 kumpf            1.48      {
319 mike             1.100         for ( int indx = 0; indx < (int)entries.size(); indx++)
320 kumpf            1.48          {
321 mike             1.100             if (entries[indx]._type == Monitor::ACCEPTOR)
322 kumpf            1.48              {
323 mike             1.100                 if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)
324 kumpf            1.48                  {
325 mike             1.100                    if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||
326                                                entries[indx]._status.get() == _MonitorEntry::DYING )
327 kumpf            1.48                     {
328                                               // remove the entry
329 mike             1.100 		       entries[indx]._status = _MonitorEntry::EMPTY;
330 kumpf            1.48                     }
331                                           else
332                                           {
333                                               // set status to DYING
334 mike             1.100                       entries[indx]._status = _MonitorEntry::DYING;
335 kumpf            1.48                     }
336                                       }
337                                   }
338                                }
339                                _stopConnections = 0;
340 a.arora          1.73  	_stopConnectionsSem.signal();
341 kumpf            1.48      }
342 kumpf            1.51  
343 mike             1.100     for( int indx = 0; indx < (int)entries.size(); indx++)
344 kumpf            1.68      {
345 mike             1.100 			 const _MonitorEntry &entry = entries[indx];
346 mike             1.96         if ((entry._status.get() == _MonitorEntry::DYING) &&
347 brian.campbell   1.80  					 (entry._type == Monitor::CONNECTION))
348 kumpf            1.68         {
349 brian.campbell   1.80            MessageQueue *q = MessageQueue::lookup(entry.queueId);
350 kumpf            1.68            PEGASUS_ASSERT(q != 0);
351 brian.campbell   1.80            HTTPConnection &h = *static_cast<HTTPConnection *>(q);
352 r.kieninger      1.83  
353 brian.campbell   1.80  					if (h._connectionClosePending == false)
354                        						continue;
355                        
356                        					// NOTE: do not attempt to delete while there are pending responses
357 r.kieninger      1.83  					// coming thru. The last response to come thru after a
358 brian.campbell   1.80  					// _connectionClosePending will reset _responsePending to false
359                        					// and then cause the monitor to rerun this code and clean up.
360                        					// (see HTTPConnection.cpp)
361                        
362                        					if (h._responsePending == true)
363                        					{
364                        						Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "
365                        													"Ignoring connection delete request because "
366                        													"responses are still pending. "
367 r.kieninger      1.83  													"connection=0x%p, socket=%d\n",
368 brian.campbell   1.81  													(void *)&h, h.getSocket());
369 brian.campbell   1.80  						continue;
370                        					}
371                        					h._connectionClosePending = false;
372                                  MessageQueue &o = h.get_owner();
373                                  Message* message= new CloseConnectionMessage(entry.socket);
374 kumpf            1.68            message->dest = o.getQueueId();
375                        
376 r.kieninger      1.83            // HTTPAcceptor is responsible for closing the connection.
377 kumpf            1.68            // The lock is released to allow HTTPAcceptor to call
378 r.kieninger      1.83            // unsolicitSocketMessages to free the entry.
379 kumpf            1.68            // Once HTTPAcceptor completes processing of the close
380                                  // connection, the lock is re-requested and processing of
381                                  // the for loop continues.  This is safe with the current
382 mike             1.100           // implementation of the entries object.  Note that the
383                                  // loop condition accesses the entries.size() on each
384 kumpf            1.68            // iteration, so that a change in size while the mutex is
385                                  // unlocked will not result in an ArrayIndexOutOfBounds
386                                  // exception.
387                        
388 kumpf            1.94            autoEntryMutex.unlock();
389 kumpf            1.68            o.enqueue(message);
390 kumpf            1.94            autoEntryMutex.lock();
391 r.kieninger      1.102           // After enqueue a message and the autoEntryMutex has been released and locked again,
392                                  // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries.
393                                  entries.reset(_entries);
394 kumpf            1.68         }
395                            }
396                        
397 kumpf            1.51      Uint32 _idleEntries = 0;
398 r.kieninger      1.83  
399 a.arora          1.73      /*
400 david.dillard    1.95          We will keep track of the maximum socket number and pass this value
401                                to the kernel as a parameter to SELECT.  This loop seems like a good
402                                place to calculate the max file descriptor (maximum socket number)
403                                because we have to traverse the entire array.
404 r.kieninger      1.83      */
405 mike             1.106     SocketHandle maxSocketCurrentPass = 0;
406 mike             1.100     for( int indx = 0; indx < (int)entries.size(); indx++)
407 mike             1.2       {
408 mike             1.100        if(maxSocketCurrentPass < entries[indx].socket)
409                                    maxSocketCurrentPass = entries[indx].socket;
410 a.arora          1.73  
411 mike             1.100        if(entries[indx]._status.get() == _MonitorEntry::IDLE)
412 mday             1.25         {
413 david.dillard    1.95             _idleEntries++;
414 mike             1.100            FD_SET(entries[indx].socket, &fdread);
415 mday             1.25         }
416 mday             1.13      }
417 s.hills          1.62  
418 a.arora          1.73      /*
419 david.dillard    1.95          Add 1 then assign maxSocket accordingly. We add 1 to account for
420                                descriptors starting at 0.
421 a.arora          1.73      */
422                            maxSocketCurrentPass++;
423                        
424 kumpf            1.94      autoEntryMutex.unlock();
425 david.dillard    1.95  
426                            //
427                            // The first argument to select() is ignored on Windows and it is not
428                            // a socket value.  The original code assumed that the number of sockets
429                            // and a socket value have the same type.  On Windows they do not.
430                            //
431                        #ifdef PEGASUS_OS_TYPE_WINDOWS
432                            int events = select(0, &fdread, NULL, NULL, &tv);
433                        #else
434 a.arora          1.73      int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
435 david.dillard    1.95  #endif
436 kumpf            1.94      autoEntryMutex.lock();
437 r.kieninger      1.102     // After enqueue a message and the autoEntryMutex has been released and locked again,
438                            // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries
439                            entries.reset(_entries);
440 mike             1.106 
441                            if (events == PEGASUS_SOCKET_ERROR)
442 mday             1.13      {
443 kumpf            1.50         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
444                                  "Monitor::run - errorno = %d has occurred on select.", errno);
445                               // The EBADF error indicates that one or more or the file
446                               // descriptions was not valid. This could indicate that
447 mike             1.100        // the entries structure has been corrupted or that
448 kumpf            1.50         // we have a synchronization error.
449                        
450                               PEGASUS_ASSERT(errno != EBADF);
451                            }
452                            else if (events)
453                            {
454 kumpf            1.51         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
455 r.kieninger      1.83            "Monitor::run select event received events = %d, monitoring %d idle entries",
456 david.dillard    1.95             events, _idleEntries);
457 mike             1.100        for( int indx = 0; indx < (int)entries.size(); indx++)
458 mday             1.25         {
459 kumpf            1.53            // The Monitor should only look at entries in the table that are IDLE (i.e.,
460                                  // owned by the Monitor).
461 mike             1.100           if((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
462                                     (FD_ISSET(entries[indx].socket, &fdread)))
463 david.dillard    1.95            {
464 mike             1.100              MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
465 kumpf            1.53               Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
466                                          "Monitor::run indx = %d, queueId =  %d, q = %p",
467 mike             1.100                   indx, entries[indx].queueId, q);
468 kumpf            1.53               PEGASUS_ASSERT(q !=0);
469 mday             1.37  
470 david.dillard    1.95               try
471                                     {
472 mike             1.100                 if(entries[indx]._type == Monitor::CONNECTION)
473 david.dillard    1.95                  {
474 kumpf            1.51                     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
475 mike             1.100                      "entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
476 david.dillard    1.95                     static_cast<HTTPConnection *>(q)->_entry_index = indx;
477 sushma.fernandes 1.78  
478                                           // Do not update the entry just yet. The entry gets updated once
479 r.kieninger      1.83                     // the request has been read.
480 mike             1.100                    //entries[indx]._status = _MonitorEntry::BUSY;
481 sushma.fernandes 1.78  
482 kumpf            1.66                     // If allocate_and_awaken failure, retry on next iteration
483 a.arora          1.73  /* Removed for PEP 183.
484 kumpf            1.69                     if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(
485                                                   (void *)q, _dispatch))
486 kumpf            1.67                     {
487                                              Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
488                                                  "Monitor::run: Insufficient resources to process request.");
489 mike             1.100                       entries[indx]._status = _MonitorEntry::IDLE;
490 kumpf            1.67                        return true;
491                                           }
492 a.arora          1.73  */
493                        // Added for PEP 183
494 david.dillard    1.95                     HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
495                                           Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
496 a.arora          1.73                           "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
497                                           dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
498                                           try
499                                           {
500                                               dst->run(1);
501                                           }
502 david.dillard    1.95                     catch (...)
503                                           {
504                                               Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
505                                               "Monitor::_dispatch: exception received");
506                                           }
507                                           Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
508 a.arora          1.73                     "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
509                        
510 sushma.fernandes 1.78                     // It is possible the entry status may not be set to busy.
511 r.kieninger      1.83                     // The following will fail in that case.
512 mike             1.96     		   // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
513 a.arora          1.73  		   // Once the HTTPConnection thread has set the status value to either
514                        		   // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
515                        		   // to the Monitor.  It is no longer permissible to access the connection
516                        		   // or the entry in the _entries table.
517 sushma.fernandes 1.78  
518                                           // The following is not relevant as the worker thread or the
519                                           // reader thread will update the status of the entry.
520                        		   //if (dst->_connectionClosePending)
521 r.kieninger      1.83  		   //{
522 sushma.fernandes 1.78  		   //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
523                        		   //}
524                        		   //else
525                        		   //{
526                        		   //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
527 r.kieninger      1.83  		   //}
528                        // end Added for PEP 183
529 a.arora          1.73  		}
530 mike             1.100 	        else if( entries[indx]._type == Monitor::INTERNAL){
531 r.kieninger      1.83  			// set ourself to BUSY,
532                                                // read the data
533 a.arora          1.73                          // and set ourself back to IDLE
534 r.kieninger      1.83  
535 mike             1.100 		   	entries[indx]._status = _MonitorEntry::BUSY;
536 a.arora          1.73  			static char buffer[2];
537 mike             1.100       			Socket::disableBlocking(entries[indx].socket);
538                              			Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2);
539                              			Socket::enableBlocking(entries[indx].socket);
540                        			entries[indx]._status = _MonitorEntry::IDLE;
541 mday             1.37  		}
542                        		else
543 mday             1.25  		{
544 kumpf            1.51                     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
545                                             "Non-connection entry, indx = %d, has been received.", indx);
546 mday             1.37  		   int events = 0;
547                        		   events |= SocketMessage::READ;
548 mike             1.100 		   Message *msg = new SocketMessage(entries[indx].socket, events);
549                        		   entries[indx]._status = _MonitorEntry::BUSY;
550 kumpf            1.94                     autoEntryMutex.unlock();
551 mday             1.37  		   q->enqueue(msg);
552 kumpf            1.94                     autoEntryMutex.lock();
553 r.kieninger      1.102            // After enqueue a message and the autoEntryMutex has been released and locked again,
554                                   // the array of entries can be changed. The ArrayIterator has be reset with the original _entries
555                                   entries.reset(_entries);
556 mike             1.100 		   entries[indx]._status = _MonitorEntry::IDLE;
557 kumpf            1.94  
558 mday             1.25  		   return true;
559                        		}
560                        	     }
561 mday             1.37  	     catch(...)
562 mday             1.25  	     {
563                        	     }
564                        	     handled_events = true;
565                        	  }
566                               }
567 mday             1.24      }
568 kumpf            1.94  
569 mday             1.13      return(handled_events);
570 mike             1.2   }
571                        
572 chuck            1.74  void Monitor::stopListeningForConnections(Boolean wait)
573 kumpf            1.48  {
574                            PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
575 r.kieninger      1.83      // set boolean then tickle the server to recognize _stopConnections
576 kumpf            1.48      _stopConnections = 1;
577 a.arora          1.73      tickle();
578 kumpf            1.48  
579 chuck            1.74      if (wait)
580 a.arora          1.73      {
581 chuck            1.74        // Wait for the monitor to notice _stopConnections.  Otherwise the
582                              // caller of this function may unbind the ports while the monitor
583                              // is still accepting connections on them.
584 kumpf            1.101       _stopConnectionsSem.wait();
585 a.arora          1.73      }
586 r.kieninger      1.83  
587 kumpf            1.48      PEG_METHOD_EXIT();
588                        }
589 mday             1.25  
590 mday             1.37  
591 mday             1.25  int  Monitor::solicitSocketMessages(
592 mike             1.106     SocketHandle socket,
593 mike             1.2       Uint32 events,
594 r.kieninger      1.83      Uint32 queueId,
595 mday             1.8       int type)
596 mike             1.2   {
597 r.kieninger      1.83     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
598 alagaraja        1.75     AutoMutex autoMut(_entry_mut);
599 a.arora          1.73     // Check to see if we need to dynamically grow the _entries array
600                           // We always want the _entries array to 2 bigger than the
601                           // current connections requested
602                           _solicitSocketCount++;  // bump the count
603                           int size = (int)_entries.size();
604 w.otsuka         1.89     if((int)_solicitSocketCount >= (size-1)){
605                                for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
606 a.arora          1.73                  _MonitorEntry entry(0, 0, 0);
607                                        _entries.append(entry);
608                                }
609                           }
610 kumpf            1.4   
611 a.arora          1.73     int index;
612                           for(index = 1; index < (int)_entries.size(); index++)
613 mday             1.25     {
614 a.arora          1.73        try
615 mday             1.37        {
616 mike             1.96           if(_entries[index]._status.get() == _MonitorEntry::EMPTY)
617 a.arora          1.73           {
618                                    _entries[index].socket = socket;
619                                    _entries[index].queueId  = queueId;
620                                    _entries[index]._type = type;
621                                    _entries[index]._status = _MonitorEntry::IDLE;
622 r.kieninger      1.83  
623 a.arora          1.73              return index;
624                                 }
625 mday             1.37        }
626                              catch(...)
627 mday             1.25        {
628                              }
629                           }
630 a.arora          1.73     _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful
631 mday             1.25     PEG_METHOD_EXIT();
632 kumpf            1.50     return -1;
633 a.arora          1.73  
634 mike             1.2   }
635                        
636 mike             1.106 void Monitor::unsolicitSocketMessages(SocketHandle socket)
637 mike             1.2   {
638 kumpf            1.50  
639 mday             1.25      PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
640 alagaraja        1.75      AutoMutex autoMut(_entry_mut);
641 a.arora          1.73  
642                            /*
643                                Start at index = 1 because _entries[0] is the tickle entry which never needs
644                                to be EMPTY;
645                            */
646 w.otsuka         1.89      unsigned int index;
647 a.arora          1.73      for(index = 1; index < _entries.size(); index++)
648 mike             1.2       {
649 mday             1.25         if(_entries[index].socket == socket)
650                               {
651 a.arora          1.73            _entries[index]._status = _MonitorEntry::EMPTY;
652 david.dillard    1.95            _entries[index].socket = PEGASUS_INVALID_SOCKET;
653 a.arora          1.73            _solicitSocketCount--;
654                                  break;
655 mday             1.25         }
656 mike             1.2       }
657 a.arora          1.73  
658                            /*
659                        	Dynamic Contraction:
660                        	To remove excess entries we will start from the end of the _entries array
661                        	and remove all entries with EMPTY status until we find the first NON EMPTY.
662                        	This prevents the positions, of the NON EMPTY entries, from being changed.
663 r.kieninger      1.83      */
664 a.arora          1.73      index = _entries.size() - 1;
665 mike             1.96      while(_entries[index]._status.get() == _MonitorEntry::EMPTY){
666 a.arora          1.73  	if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
667                                        _entries.remove(index);
668                        	index--;
669                            }
670 kumpf            1.4       PEG_METHOD_EXIT();
671 mike             1.2   }
672                        
673 a.arora          1.73  // Note: this is no longer called with PEP 183.
674 mday             1.7   PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
675                        {
676 mday             1.8      HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
677 kumpf            1.51     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
678 kumpf            1.53          "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
679                                dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
680 kumpf            1.51     try
681                           {
682                              dst->run(1);
683                           }
684                           catch (...)
685                           {
686                              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
687                                  "Monitor::_dispatch: exception received");
688                           }
689                           Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
690                                  "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
691 r.kieninger      1.83  
692 mike             1.96     PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
693 kumpf            1.68  
694                           // Once the HTTPConnection thread has set the status value to either
695                           // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
696                           // to the Monitor.  It is no longer permissible to access the connection
697                           // or the entry in the _entries table.
698 kumpf            1.50     if (dst->_connectionClosePending)
699                           {
700 kumpf            1.68        dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
701                           }
702                           else
703                           {
704                              dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
705 kumpf            1.50     }
706 mday             1.8      return 0;
707 mday             1.40  }
708                        
709 mike             1.2   PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2