(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.103 //%2006////////////////////////////////////////////////////////////////////////
  2 mike  1.2   //
  3 karl  1.79  // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4             // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5             // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6 karl  1.64  // IBM Corp.; EMC Corporation, The Open Group.
  7 karl  1.79  // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8             // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9 karl  1.90  // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10             // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11 karl  1.103 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 12             // EMC Corporation; Symantec Corporation; The Open Group.
 13 mike  1.2   //
 14             // Permission is hereby granted, free of charge, to any person obtaining a copy
 15 kumpf 1.17  // of this software and associated documentation files (the "Software"), to
 16             // deal in the Software without restriction, including without limitation the
 17             // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 18 mike  1.2   // sell copies of the Software, and to permit persons to whom the Software is
 19             // furnished to do so, subject to the following conditions:
 20 karl  1.103 // 
 21 kumpf 1.17  // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 22 mike  1.2   // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 23             // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 24 kumpf 1.17  // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 25             // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 26             // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 27 mike  1.2   // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 28             // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 29             //
 30             //==============================================================================
 31             //
 32             // Author: Mike Brasher (mbrasher@bmc.com)
 33             //
 34 r.kieninger 1.83  // Modified By: Mike Day (monitor_2) mdday@us.ibm.com
 35 a.arora     1.71  //              Amit K Arora (Bug#1153) amita@in.ibm.com
 36 alagaraja   1.75  //              Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090
 37 sushma.fernandes 1.78  //              Sushma Fernandes (sushma@hp.com) for Bug#2057
 38 joyce.j          1.84  //              Josephine Eskaline Joyce (jojustin@in.ibm.com) for PEP#101
 39 kumpf            1.85  //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
 40 mike             1.2   //
 41                        //%/////////////////////////////////////////////////////////////////////////////
 42                        
 43 mike             1.107 #include "Network.h"
 44 mike             1.2   #include <Pegasus/Common/Config.h>
 45                        #include <cstring>
 46                        #include "Monitor.h"
 47                        #include "MessageQueue.h"
 48                        #include "Socket.h"
 49 kumpf            1.4   #include <Pegasus/Common/Tracer.h>
 50 mday             1.7   #include <Pegasus/Common/HTTPConnection.h>
 51 kumpf            1.69  #include <Pegasus/Common/MessageQueueService.h>
 52 a.arora          1.73  #include <Pegasus/Common/Exception.h>
 53 mike             1.100 #include "ArrayIterator.h"
 54 mike             1.2   
 55                        PEGASUS_USING_STD;
 56                        
 57                        PEGASUS_NAMESPACE_BEGIN
 58                        
 59 mike             1.96  static AtomicInt _connections(0);
 60 mday             1.25  
 61 mike             1.2   ////////////////////////////////////////////////////////////////////////////////
 62                        //
 63 mike             1.106 // _getError()
 64                        //
 65                        ////////////////////////////////////////////////////////////////////////////////
 66                        
 67                        static inline int _getError()
 68                        {
 69                        #ifdef PEGASUS_OS_TYPE_WINDOWS
 70 mike             1.108     return WSAGetLastError();
 71 mike             1.106 #else
 72                            return errno;
 73                        #endif
 74                        }
 75                        
 76                        ////////////////////////////////////////////////////////////////////////////////
 77                        //
 78 mike             1.2   // Monitor
 79                        //
 80                        ////////////////////////////////////////////////////////////////////////////////
 81                        
 82 kumpf            1.54  #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
 83 mike             1.2   Monitor::Monitor()
 84 kumpf            1.87     : _stopConnections(0),
 85 a.arora          1.73       _stopConnectionsSem(0),
 86 a.dunfey         1.76       _solicitSocketCount(0),
 87 a.dunfey         1.77       _tickle_client_socket(-1),
 88                             _tickle_server_socket(-1),
 89                             _tickle_peer_socket(-1)
 90 mike             1.2   {
 91 kumpf            1.54      int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
 92 mike             1.2       Socket::initializeInterface();
 93 kumpf            1.54      _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
 94 a.arora          1.73  
 95                            // setup the tickler
 96                            initializeTickler();
 97                        
 98 r.kieninger      1.83      // Start the count at 1 because initilizeTickler()
 99 a.arora          1.73      // has added an entry in the first position of the
100                            // _entries array
101                            for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
102 mday             1.37      {
103                               _MonitorEntry entry(0, 0, 0);
104                               _entries.append(entry);
105                            }
106 mday             1.18  }
107 mday             1.20  
108 mike             1.2   Monitor::~Monitor()
109                        {
110 kumpf            1.11      Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
111 a.dunfey         1.76  
112                            try{
113 a.dunfey         1.77          if(_tickle_peer_socket >= 0)
114 a.dunfey         1.76          {
115                                    Socket::close(_tickle_peer_socket);
116                                }
117 a.dunfey         1.77          if(_tickle_client_socket >= 0)
118 a.dunfey         1.76          {
119                                    Socket::close(_tickle_client_socket);
120                                }
121 a.dunfey         1.77          if(_tickle_server_socket >= 0)
122 a.dunfey         1.76          {
123                                    Socket::close(_tickle_server_socket);
124                                }
125                            }
126                            catch(...)
127                            {
128                                Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
129                                          "Failed to close tickle sockets");
130                            }
131                        
132 mike             1.2       Socket::uninitializeInterface();
133 kumpf            1.11      Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
134                                          "returning from monitor destructor");
135 mday             1.18  }
136                        
137 a.arora          1.73  void Monitor::initializeTickler(){
138 r.kieninger      1.83      /*
139                               NOTE: On any errors trying to
140                                     setup out tickle connection,
141 a.arora          1.73               throw an exception/end the server
142                            */
143                        
144                            /* setup the tickle server/listener */
145                        
146                            // get a socket for the server side
147 david.dillard    1.95      if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
148 a.arora          1.73  	//handle error
149                        	MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
150                        				 "Received error number $0 while creating the internal socket.",
151 mike             1.106 				 _getError());
152 a.arora          1.73  	throw Exception(parms);
153                            }
154 marek            1.104     
155                            // set TCP_NODELAY
156                            int opt = 1;
157                            setsockopt(_tickle_server_socket, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(opt));
158 a.arora          1.73  
159                            // initialize the address
160                            memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
161                        #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
162                        #pragma convert(37)
163                        #endif
164                            _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
165                        #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
166                        #pragma convert(0)
167                        #endif
168                            _tickle_server_addr.sin_family = PF_INET;
169                            _tickle_server_addr.sin_port = 0;
170                        
171 mike             1.106     socklen_t _addr_size = sizeof(_tickle_server_addr);
172 a.arora          1.73  
173                            // bind server side to socket
174                            if((::bind(_tickle_server_socket,
175 kumpf            1.88                 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
176 a.arora          1.73  	       sizeof(_tickle_server_addr))) < 0){
177                        	// handle error
178 r.kieninger      1.83  #ifdef PEGASUS_OS_ZOS
179                            MessageLoaderParms parms("Common.Monitor.TICKLE_BIND_LONG",
180                        				 "Received error:$0 while binding the internal socket.",strerror(errno));
181                        #else
182 a.arora          1.73  	MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
183                        				 "Received error number $0 while binding the internal socket.",
184 mike             1.106 				 _getError());
185 r.kieninger      1.83  #endif
186 a.arora          1.73          throw Exception(parms);
187                            }
188                        
189                            // tell the kernel we are a server
190                            if((::listen(_tickle_server_socket,3)) < 0){
191                        	// handle error
192                        	MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",
193                        			 "Received error number $0 while listening to the internal socket.",
194 mike             1.106 				 _getError());
195 a.arora          1.73  	throw Exception(parms);
196                            }
197 r.kieninger      1.83  
198 a.arora          1.73      // make sure we have the correct socket for our server
199                            int sock = ::getsockname(_tickle_server_socket,
200 kumpf            1.88                     reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
201                                           &_addr_size);
202 a.arora          1.73      if(sock < 0){
203                        	// handle error
204                        	MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",
205                        			 "Received error number $0 while getting the internal socket name.",
206 mike             1.106 				 _getError());
207 a.arora          1.73  	throw Exception(parms);
208                            }
209                        
210                            /* set up the tickle client/connector */
211 r.kieninger      1.83  
212 a.arora          1.73      // get a socket for our tickle client
213 david.dillard    1.95      if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
214 a.arora          1.73  	// handle error
215                        	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
216                        			 "Received error number $0 while creating the internal client socket.",
217 mike             1.106 				 _getError());
218 a.arora          1.73  	throw Exception(parms);
219                            }
220 marek            1.104     
221                            // set TCP_NODELAY
222                            setsockopt(_tickle_client_socket, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(opt));
223 a.arora          1.73  
224                            // setup the address of the client
225                            memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
226                        #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
227                        #pragma convert(37)
228                        #endif
229                            _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
230                        #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
231                        #pragma convert(0)
232                        #endif
233                            _tickle_client_addr.sin_family = PF_INET;
234                            _tickle_client_addr.sin_port = 0;
235                        
236                            // bind socket to client side
237                            if((::bind(_tickle_client_socket,
238 kumpf            1.88                 reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
239 a.arora          1.73  	       sizeof(_tickle_client_addr))) < 0){
240                        	// handle error
241                        	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
242                        			 "Received error number $0 while binding the internal client socket.",
243 mike             1.106 				 _getError());
244 a.arora          1.73  	throw Exception(parms);
245                            }
246                        
247                            // connect to server side
248                            if((::connect(_tickle_client_socket,
249 kumpf            1.88                    reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
250 a.arora          1.73  		  sizeof(_tickle_server_addr))) < 0){
251                        	// handle error
252                        	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
253                        			 "Received error number $0 while connecting the internal client socket.",
254 mike             1.106 				 _getError());
255 a.arora          1.73  	throw Exception(parms);
256                            }
257                        
258                            /* set up the slave connection */
259                            memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
260 mike             1.106     socklen_t peer_size = sizeof(_tickle_peer_addr);
261 r.kieninger      1.83      pegasus_sleep(1);
262 a.arora          1.73  
263                            // this call may fail, we will try a max of 20 times to establish this peer connection
264                            if((_tickle_peer_socket = ::accept(_tickle_server_socket,
265 kumpf            1.88              reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
266                                    &peer_size)) < 0){
267 a.arora          1.73  #if !defined(PEGASUS_OS_TYPE_WINDOWS)
268                                // Only retry on non-windows platforms.
269                                if(_tickle_peer_socket == -1 && errno == EAGAIN)
270                                {
271 r.kieninger      1.83            int retries = 0;
272 a.arora          1.73            do
273                                  {
274                                    pegasus_sleep(1);
275                                    _tickle_peer_socket = ::accept(_tickle_server_socket,
276 kumpf            1.88                  reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
277                                        &peer_size);
278 a.arora          1.73              retries++;
279                                  } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
280                                }
281                        #endif
282                            }
283                            if(_tickle_peer_socket == -1){
284                        	// handle error
285                        	MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",
286                        			 "Received error number $0 while accepting the internal socket connection.",
287 mike             1.106 				 _getError());
288 a.arora          1.73  	throw Exception(parms);
289                            }
290                            // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
291                            // checks entries with IDLE state for events
292                            _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
293                            entry._status = _MonitorEntry::IDLE;
294                            _entries.append(entry);
295                        }
296                        
297                        void Monitor::tickle(void)
298                        {
299 sushma.fernandes 1.78      static char _buffer[] =
300 a.arora          1.73      {
301                              '0','0'
302                            };
303 r.kieninger      1.83  
304 sushma.fernandes 1.78      AutoMutex autoMutex(_tickle_mutex);
305 r.kieninger      1.83      Socket::disableBlocking(_tickle_client_socket);
306 sushma.fernandes 1.78      Socket::write(_tickle_client_socket,&_buffer, 2);
307 r.kieninger      1.83      Socket::enableBlocking(_tickle_client_socket);
308 sushma.fernandes 1.78  }
309                        
310                        void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
311                        {
312                            // Set the state to requested state
313                            _entries[index]._status = status;
314 a.arora          1.73  }
315                        
316 mike             1.2   Boolean Monitor::run(Uint32 milliseconds)
317                        {
318 mday             1.18  
319 mday             1.25      Boolean handled_events = false;
320 a.arora          1.73      int i = 0;
321 r.kieninger      1.83  
322 kumpf            1.36      struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
323 a.arora          1.73  
324 mday             1.25      fd_set fdread;
325                            FD_ZERO(&fdread);
326 a.arora          1.73  
327 kumpf            1.94      AutoMutex autoEntryMutex(_entry_mut);
328 r.kieninger      1.83  
329 mike             1.100     ArrayIterator<_MonitorEntry> entries(_entries);
330                        
331 r.kieninger      1.83      // Check the stopConnections flag.  If set, clear the Acceptor monitor entries
332 mike             1.96      if (_stopConnections.get() == 1)
333 kumpf            1.48      {
334 mike             1.100         for ( int indx = 0; indx < (int)entries.size(); indx++)
335 kumpf            1.48          {
336 mike             1.100             if (entries[indx]._type == Monitor::ACCEPTOR)
337 kumpf            1.48              {
338 mike             1.100                 if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)
339 kumpf            1.48                  {
340 mike             1.100                    if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||
341                                                entries[indx]._status.get() == _MonitorEntry::DYING )
342 kumpf            1.48                     {
343                                               // remove the entry
344 mike             1.100 		       entries[indx]._status = _MonitorEntry::EMPTY;
345 kumpf            1.48                     }
346                                           else
347                                           {
348                                               // set status to DYING
349 mike             1.100                       entries[indx]._status = _MonitorEntry::DYING;
350 kumpf            1.48                     }
351                                       }
352                                   }
353                                }
354                                _stopConnections = 0;
355 a.arora          1.73  	_stopConnectionsSem.signal();
356 kumpf            1.48      }
357 kumpf            1.51  
358 mike             1.100     for( int indx = 0; indx < (int)entries.size(); indx++)
359 kumpf            1.68      {
360 mike             1.100 			 const _MonitorEntry &entry = entries[indx];
361 mike             1.96         if ((entry._status.get() == _MonitorEntry::DYING) &&
362 brian.campbell   1.80  					 (entry._type == Monitor::CONNECTION))
363 kumpf            1.68         {
364 brian.campbell   1.80            MessageQueue *q = MessageQueue::lookup(entry.queueId);
365 kumpf            1.68            PEGASUS_ASSERT(q != 0);
366 brian.campbell   1.80            HTTPConnection &h = *static_cast<HTTPConnection *>(q);
367 r.kieninger      1.83  
368 brian.campbell   1.80  					if (h._connectionClosePending == false)
369                        						continue;
370                        
371                        					// NOTE: do not attempt to delete while there are pending responses
372 r.kieninger      1.83  					// coming thru. The last response to come thru after a
373 brian.campbell   1.80  					// _connectionClosePending will reset _responsePending to false
374                        					// and then cause the monitor to rerun this code and clean up.
375                        					// (see HTTPConnection.cpp)
376                        
377                        					if (h._responsePending == true)
378                        					{
379                        						Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "
380                        													"Ignoring connection delete request because "
381                        													"responses are still pending. "
382 r.kieninger      1.83  													"connection=0x%p, socket=%d\n",
383 brian.campbell   1.81  													(void *)&h, h.getSocket());
384 brian.campbell   1.80  						continue;
385                        					}
386                        					h._connectionClosePending = false;
387                                  MessageQueue &o = h.get_owner();
388                                  Message* message= new CloseConnectionMessage(entry.socket);
389 kumpf            1.68            message->dest = o.getQueueId();
390                        
391 r.kieninger      1.83            // HTTPAcceptor is responsible for closing the connection.
392 kumpf            1.68            // The lock is released to allow HTTPAcceptor to call
393 r.kieninger      1.83            // unsolicitSocketMessages to free the entry.
394 kumpf            1.68            // Once HTTPAcceptor completes processing of the close
395                                  // connection, the lock is re-requested and processing of
396                                  // the for loop continues.  This is safe with the current
397 mike             1.100           // implementation of the entries object.  Note that the
398                                  // loop condition accesses the entries.size() on each
399 kumpf            1.68            // iteration, so that a change in size while the mutex is
400                                  // unlocked will not result in an ArrayIndexOutOfBounds
401                                  // exception.
402                        
403 kumpf            1.94            autoEntryMutex.unlock();
404 kumpf            1.68            o.enqueue(message);
405 kumpf            1.94            autoEntryMutex.lock();
406 r.kieninger      1.102           // After enqueue a message and the autoEntryMutex has been released and locked again,
407                                  // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries.
408                                  entries.reset(_entries);
409 kumpf            1.68         }
410                            }
411                        
412 kumpf            1.51      Uint32 _idleEntries = 0;
413 r.kieninger      1.83  
414 a.arora          1.73      /*
415 david.dillard    1.95          We will keep track of the maximum socket number and pass this value
416                                to the kernel as a parameter to SELECT.  This loop seems like a good
417                                place to calculate the max file descriptor (maximum socket number)
418                                because we have to traverse the entire array.
419 r.kieninger      1.83      */
420 mike             1.106     SocketHandle maxSocketCurrentPass = 0;
421 mike             1.100     for( int indx = 0; indx < (int)entries.size(); indx++)
422 mike             1.2       {
423 mike             1.100        if(maxSocketCurrentPass < entries[indx].socket)
424                                    maxSocketCurrentPass = entries[indx].socket;
425 a.arora          1.73  
426 mike             1.100        if(entries[indx]._status.get() == _MonitorEntry::IDLE)
427 mday             1.25         {
428 david.dillard    1.95             _idleEntries++;
429 mike             1.100            FD_SET(entries[indx].socket, &fdread);
430 mday             1.25         }
431 mday             1.13      }
432 s.hills          1.62  
433 a.arora          1.73      /*
434 david.dillard    1.95          Add 1 then assign maxSocket accordingly. We add 1 to account for
435                                descriptors starting at 0.
436 a.arora          1.73      */
437                            maxSocketCurrentPass++;
438                        
439 kumpf            1.94      autoEntryMutex.unlock();
440 david.dillard    1.95  
441                            //
442                            // The first argument to select() is ignored on Windows and it is not
443                            // a socket value.  The original code assumed that the number of sockets
444                            // and a socket value have the same type.  On Windows they do not.
445                            //
446                        #ifdef PEGASUS_OS_TYPE_WINDOWS
447                            int events = select(0, &fdread, NULL, NULL, &tv);
448                        #else
449 a.arora          1.73      int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
450 david.dillard    1.95  #endif
451 kumpf            1.94      autoEntryMutex.lock();
452 r.kieninger      1.102     // After enqueue a message and the autoEntryMutex has been released and locked again,
453                            // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries
454                            entries.reset(_entries);
455 mike             1.106 
456                            if (events == PEGASUS_SOCKET_ERROR)
457 mday             1.13      {
458 kumpf            1.50         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
459                                  "Monitor::run - errorno = %d has occurred on select.", errno);
460                               // The EBADF error indicates that one or more or the file
461                               // descriptions was not valid. This could indicate that
462 mike             1.100        // the entries structure has been corrupted or that
463 kumpf            1.50         // we have a synchronization error.
464                        
465                               PEGASUS_ASSERT(errno != EBADF);
466                            }
467                            else if (events)
468                            {
469 kumpf            1.51         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
470 r.kieninger      1.83            "Monitor::run select event received events = %d, monitoring %d idle entries",
471 david.dillard    1.95             events, _idleEntries);
472 mike             1.100        for( int indx = 0; indx < (int)entries.size(); indx++)
473 mday             1.25         {
474 kumpf            1.53            // The Monitor should only look at entries in the table that are IDLE (i.e.,
475                                  // owned by the Monitor).
476 mike             1.100           if((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
477                                     (FD_ISSET(entries[indx].socket, &fdread)))
478 david.dillard    1.95            {
479 mike             1.100              MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
480 kumpf            1.53               Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
481                                          "Monitor::run indx = %d, queueId =  %d, q = %p",
482 mike             1.100                   indx, entries[indx].queueId, q);
483 kumpf            1.53               PEGASUS_ASSERT(q !=0);
484 mday             1.37  
485 david.dillard    1.95               try
486                                     {
487 mike             1.100                 if(entries[indx]._type == Monitor::CONNECTION)
488 david.dillard    1.95                  {
489 kumpf            1.51                     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
490 mike             1.100                      "entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
491 david.dillard    1.95                     static_cast<HTTPConnection *>(q)->_entry_index = indx;
492 sushma.fernandes 1.78  
493                                           // Do not update the entry just yet. The entry gets updated once
494 r.kieninger      1.83                     // the request has been read.
495 mike             1.100                    //entries[indx]._status = _MonitorEntry::BUSY;
496 sushma.fernandes 1.78  
497 kumpf            1.66                     // If allocate_and_awaken failure, retry on next iteration
498 a.arora          1.73  /* Removed for PEP 183.
499 kumpf            1.69                     if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(
500                                                   (void *)q, _dispatch))
501 kumpf            1.67                     {
502                                              Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
503                                                  "Monitor::run: Insufficient resources to process request.");
504 mike             1.100                       entries[indx]._status = _MonitorEntry::IDLE;
505 kumpf            1.67                        return true;
506                                           }
507 a.arora          1.73  */
508                        // Added for PEP 183
509 david.dillard    1.95                     HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
510                                           Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
511 a.arora          1.73                           "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
512                                           dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
513                                           try
514                                           {
515                                               dst->run(1);
516                                           }
517 david.dillard    1.95                     catch (...)
518                                           {
519                                               Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
520                                               "Monitor::_dispatch: exception received");
521                                           }
522                                           Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
523 a.arora          1.73                     "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
524                        
525 sushma.fernandes 1.78                     // It is possible the entry status may not be set to busy.
526 r.kieninger      1.83                     // The following will fail in that case.
527 mike             1.96     		   // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
528 a.arora          1.73  		   // Once the HTTPConnection thread has set the status value to either
529                        		   // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
530                        		   // to the Monitor.  It is no longer permissible to access the connection
531                        		   // or the entry in the _entries table.
532 sushma.fernandes 1.78  
533                                           // The following is not relevant as the worker thread or the
534                                           // reader thread will update the status of the entry.
535                        		   //if (dst->_connectionClosePending)
536 r.kieninger      1.83  		   //{
537 sushma.fernandes 1.78  		   //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
538                        		   //}
539                        		   //else
540                        		   //{
541                        		   //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
542 r.kieninger      1.83  		   //}
543                        // end Added for PEP 183
544 a.arora          1.73  		}
545 mike             1.100 	        else if( entries[indx]._type == Monitor::INTERNAL){
546 r.kieninger      1.83  			// set ourself to BUSY,
547                                                // read the data
548 a.arora          1.73                          // and set ourself back to IDLE
549 r.kieninger      1.83  
550 mike             1.100 		   	entries[indx]._status = _MonitorEntry::BUSY;
551 a.arora          1.73  			static char buffer[2];
552 mike             1.100       			Socket::disableBlocking(entries[indx].socket);
553                              			Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2);
554                              			Socket::enableBlocking(entries[indx].socket);
555                        			entries[indx]._status = _MonitorEntry::IDLE;
556 mday             1.37  		}
557                        		else
558 mday             1.25  		{
559 kumpf            1.51                     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
560                                             "Non-connection entry, indx = %d, has been received.", indx);
561 mday             1.37  		   int events = 0;
562                        		   events |= SocketMessage::READ;
563 mike             1.100 		   Message *msg = new SocketMessage(entries[indx].socket, events);
564                        		   entries[indx]._status = _MonitorEntry::BUSY;
565 kumpf            1.94                     autoEntryMutex.unlock();
566 mday             1.37  		   q->enqueue(msg);
567 kumpf            1.94                     autoEntryMutex.lock();
568 r.kieninger      1.102            // After enqueue a message and the autoEntryMutex has been released and locked again,
569                                   // the array of entries can be changed. The ArrayIterator has be reset with the original _entries
570                                   entries.reset(_entries);
571 mike             1.100 		   entries[indx]._status = _MonitorEntry::IDLE;
572 kumpf            1.94  
573 mday             1.25  		   return true;
574                        		}
575                        	     }
576 mday             1.37  	     catch(...)
577 mday             1.25  	     {
578                        	     }
579                        	     handled_events = true;
580                        	  }
581                               }
582 mday             1.24      }
583 kumpf            1.94  
584 mday             1.13      return(handled_events);
585 mike             1.2   }
586                        
587 chuck            1.74  void Monitor::stopListeningForConnections(Boolean wait)
588 kumpf            1.48  {
589                            PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
590 r.kieninger      1.83      // set boolean then tickle the server to recognize _stopConnections
591 kumpf            1.48      _stopConnections = 1;
592 a.arora          1.73      tickle();
593 kumpf            1.48  
594 chuck            1.74      if (wait)
595 a.arora          1.73      {
596 chuck            1.74        // Wait for the monitor to notice _stopConnections.  Otherwise the
597                              // caller of this function may unbind the ports while the monitor
598                              // is still accepting connections on them.
599 kumpf            1.101       _stopConnectionsSem.wait();
600 a.arora          1.73      }
601 r.kieninger      1.83  
602 kumpf            1.48      PEG_METHOD_EXIT();
603                        }
604 mday             1.25  
605 mday             1.37  
606 mday             1.25  int  Monitor::solicitSocketMessages(
607 mike             1.106     SocketHandle socket,
608 mike             1.2       Uint32 events,
609 r.kieninger      1.83      Uint32 queueId,
610 mday             1.8       int type)
611 mike             1.2   {
612 r.kieninger      1.83     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
613 alagaraja        1.75     AutoMutex autoMut(_entry_mut);
614 a.arora          1.73     // Check to see if we need to dynamically grow the _entries array
615                           // We always want the _entries array to 2 bigger than the
616                           // current connections requested
617                           _solicitSocketCount++;  // bump the count
618                           int size = (int)_entries.size();
619 w.otsuka         1.89     if((int)_solicitSocketCount >= (size-1)){
620                                for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
621 a.arora          1.73                  _MonitorEntry entry(0, 0, 0);
622                                        _entries.append(entry);
623                                }
624                           }
625 kumpf            1.4   
626 a.arora          1.73     int index;
627                           for(index = 1; index < (int)_entries.size(); index++)
628 mday             1.25     {
629 a.arora          1.73        try
630 mday             1.37        {
631 mike             1.96           if(_entries[index]._status.get() == _MonitorEntry::EMPTY)
632 a.arora          1.73           {
633                                    _entries[index].socket = socket;
634                                    _entries[index].queueId  = queueId;
635                                    _entries[index]._type = type;
636                                    _entries[index]._status = _MonitorEntry::IDLE;
637 r.kieninger      1.83  
638 a.arora          1.73              return index;
639                                 }
640 mday             1.37        }
641                              catch(...)
642 mday             1.25        {
643                              }
644                           }
645 a.arora          1.73     _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful
646 mday             1.25     PEG_METHOD_EXIT();
647 kumpf            1.50     return -1;
648 a.arora          1.73  
649 mike             1.2   }
650                        
651 mike             1.106 void Monitor::unsolicitSocketMessages(SocketHandle socket)
652 mike             1.2   {
653 kumpf            1.50  
654 mday             1.25      PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
655 alagaraja        1.75      AutoMutex autoMut(_entry_mut);
656 a.arora          1.73  
657                            /*
658                                Start at index = 1 because _entries[0] is the tickle entry which never needs
659                                to be EMPTY;
660                            */
661 w.otsuka         1.89      unsigned int index;
662 a.arora          1.73      for(index = 1; index < _entries.size(); index++)
663 mike             1.2       {
664 mday             1.25         if(_entries[index].socket == socket)
665                               {
666 a.arora          1.73            _entries[index]._status = _MonitorEntry::EMPTY;
667 david.dillard    1.95            _entries[index].socket = PEGASUS_INVALID_SOCKET;
668 a.arora          1.73            _solicitSocketCount--;
669                                  break;
670 mday             1.25         }
671 mike             1.2       }
672 a.arora          1.73  
673                            /*
674                        	Dynamic Contraction:
675                        	To remove excess entries we will start from the end of the _entries array
676                        	and remove all entries with EMPTY status until we find the first NON EMPTY.
677                        	This prevents the positions, of the NON EMPTY entries, from being changed.
678 r.kieninger      1.83      */
679 a.arora          1.73      index = _entries.size() - 1;
680 mike             1.96      while(_entries[index]._status.get() == _MonitorEntry::EMPTY){
681 a.arora          1.73  	if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
682                                        _entries.remove(index);
683                        	index--;
684                            }
685 kumpf            1.4       PEG_METHOD_EXIT();
686 mike             1.2   }
687                        
688 a.arora          1.73  // Note: this is no longer called with PEP 183.
689 mday             1.7   PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
690                        {
691 mday             1.8      HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
692 kumpf            1.51     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
693 kumpf            1.53          "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
694                                dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
695 kumpf            1.51     try
696                           {
697                              dst->run(1);
698                           }
699                           catch (...)
700                           {
701                              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
702                                  "Monitor::_dispatch: exception received");
703                           }
704                           Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
705                                  "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
706 r.kieninger      1.83  
707 mike             1.96     PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
708 kumpf            1.68  
709                           // Once the HTTPConnection thread has set the status value to either
710                           // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
711                           // to the Monitor.  It is no longer permissible to access the connection
712                           // or the entry in the _entries table.
713 kumpf            1.50     if (dst->_connectionClosePending)
714                           {
715 kumpf            1.68        dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
716                           }
717                           else
718                           {
719                              dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
720 kumpf            1.50     }
721 mday             1.8      return 0;
722 mday             1.40  }
723                        
724 mike             1.2   PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2