(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.103 //%2006////////////////////////////////////////////////////////////////////////
  2 mike  1.2   //
  3 karl  1.79  // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4             // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5             // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6 karl  1.64  // IBM Corp.; EMC Corporation, The Open Group.
  7 karl  1.79  // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8             // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9 karl  1.90  // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10             // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11 karl  1.103 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 12             // EMC Corporation; Symantec Corporation; The Open Group.
 13 mike  1.2   //
 14             // Permission is hereby granted, free of charge, to any person obtaining a copy
 15 kumpf 1.17  // of this software and associated documentation files (the "Software"), to
 16             // deal in the Software without restriction, including without limitation the
 17             // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 18 mike  1.2   // sell copies of the Software, and to permit persons to whom the Software is
 19             // furnished to do so, subject to the following conditions:
 20 karl  1.103 // 
 21 kumpf 1.17  // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 22 mike  1.2   // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 23             // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 24 kumpf 1.17  // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 25             // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 26             // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 27 mike  1.2   // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 28             // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 29             //
 30             //==============================================================================
 31             //
 32             // Author: Mike Brasher (mbrasher@bmc.com)
 33             //
 34 r.kieninger 1.83  // Modified By: Mike Day (monitor_2) mdday@us.ibm.com
 35 a.arora     1.71  //              Amit K Arora (Bug#1153) amita@in.ibm.com
 36 alagaraja   1.75  //              Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090
 37 sushma.fernandes 1.78  //              Sushma Fernandes (sushma@hp.com) for Bug#2057
 38 joyce.j          1.84  //              Josephine Eskaline Joyce (jojustin@in.ibm.com) for PEP#101
 39 kumpf            1.85  //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
 40 mike             1.2   //
 41                        //%/////////////////////////////////////////////////////////////////////////////
 42                        
 43 mike             1.107 #include "Network.h"
 44 mike             1.2   #include <Pegasus/Common/Config.h>
 45                        #include <cstring>
 46                        #include "Monitor.h"
 47                        #include "MessageQueue.h"
 48                        #include "Socket.h"
 49 kumpf            1.4   #include <Pegasus/Common/Tracer.h>
 50 mday             1.7   #include <Pegasus/Common/HTTPConnection.h>
 51 kumpf            1.69  #include <Pegasus/Common/MessageQueueService.h>
 52 a.arora          1.73  #include <Pegasus/Common/Exception.h>
 53 mike             1.100 #include "ArrayIterator.h"
 54 mike             1.2   
 55                        PEGASUS_USING_STD;
 56                        
 57                        PEGASUS_NAMESPACE_BEGIN
 58                        
 59 mike             1.96  static AtomicInt _connections(0); // file-scope atomic counter, starts at 0; not referenced in the part of the file visible here
 60 mday             1.25  
 61 mike             1.2   ////////////////////////////////////////////////////////////////////////////////
 62                        //
 63                        // Monitor
 64                        //
 65                        ////////////////////////////////////////////////////////////////////////////////
 66                        
 67 kumpf            1.54  #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
 68 mike             1.2   Monitor::Monitor()
 69 kumpf            1.87     : _stopConnections(0),
 70 a.arora          1.73       _stopConnectionsSem(0),
 71 a.dunfey         1.76       _solicitSocketCount(0),
 72 a.dunfey         1.77       _tickle_client_socket(-1),
 73                             _tickle_server_socket(-1),
 74                             _tickle_peer_socket(-1)
 75 mike             1.2   {
 76 kumpf            1.54      int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
 77 mike             1.2       Socket::initializeInterface();
 78 kumpf            1.54      _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
 79 a.arora          1.73  
 80                            // setup the tickler
 81                            initializeTickler();
 82                        
 83 r.kieninger      1.83      // Start the count at 1 because initilizeTickler()
 84 a.arora          1.73      // has added an entry in the first position of the
 85                            // _entries array
 86                            for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
 87 mday             1.37      {
 88                               _MonitorEntry entry(0, 0, 0);
 89                               _entries.append(entry);
 90                            }
 91 mday             1.18  }
 92 mday             1.20  
 93 mike             1.2   Monitor::~Monitor()
 94                        {
 95 kumpf            1.11      Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
 96 a.dunfey         1.76  
 97                            try{
 98 a.dunfey         1.77          if(_tickle_peer_socket >= 0)
 99 a.dunfey         1.76          {
100                                    Socket::close(_tickle_peer_socket);
101                                }
102 a.dunfey         1.77          if(_tickle_client_socket >= 0)
103 a.dunfey         1.76          {
104                                    Socket::close(_tickle_client_socket);
105                                }
106 a.dunfey         1.77          if(_tickle_server_socket >= 0)
107 a.dunfey         1.76          {
108                                    Socket::close(_tickle_server_socket);
109                                }
110                            }
111                            catch(...)
112                            {
113                                Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
114                                          "Failed to close tickle sockets");
115                            }
116                        
117 mike             1.2       Socket::uninitializeInterface();
118 kumpf            1.11      Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
119                                          "returning from monitor destructor");
120 mday             1.18  }
121                        
122 a.arora          1.73  void Monitor::initializeTickler(){
123 r.kieninger      1.83      /*
124                               NOTE: On any errors trying to
125                                     setup out tickle connection,
126 a.arora          1.73               throw an exception/end the server
127                            */
128                        
129                            /* setup the tickle server/listener */
130                        
131                            // get a socket for the server side
132 david.dillard    1.95      if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
133 a.arora          1.73  	//handle error
134                        	MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
135                        				 "Received error number $0 while creating the internal socket.",
136 kumpf            1.110 				 getSocketError());
137 a.arora          1.73  	throw Exception(parms);
138                            }
139 marek            1.104     
140                            // set TCP_NODELAY
141                            int opt = 1;
142                            setsockopt(_tickle_server_socket, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(opt));
143 a.arora          1.73  
144                            // initialize the address
145                            memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
146                        #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
147                        #pragma convert(37)
148                        #endif
149                            _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
150                        #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
151                        #pragma convert(0)
152                        #endif
153                            _tickle_server_addr.sin_family = PF_INET;
154                            _tickle_server_addr.sin_port = 0;
155                        
156 mike             1.109     SocketLength _addr_size = sizeof(_tickle_server_addr);
157 a.arora          1.73  
158                            // bind server side to socket
159                            if((::bind(_tickle_server_socket,
160 kumpf            1.88                 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
161 a.arora          1.73  	       sizeof(_tickle_server_addr))) < 0){
162                        	// handle error
163 r.kieninger      1.83  #ifdef PEGASUS_OS_ZOS
164                            MessageLoaderParms parms("Common.Monitor.TICKLE_BIND_LONG",
165                        				 "Received error:$0 while binding the internal socket.",strerror(errno));
166                        #else
167 a.arora          1.73  	MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
168                        				 "Received error number $0 while binding the internal socket.",
169 kumpf            1.110 				 getSocketError());
170 r.kieninger      1.83  #endif
171 a.arora          1.73          throw Exception(parms);
172                            }
173                        
174                            // tell the kernel we are a server
175                            if((::listen(_tickle_server_socket,3)) < 0){
176                        	// handle error
177                        	MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",
178                        			 "Received error number $0 while listening to the internal socket.",
179 kumpf            1.110 				 getSocketError());
180 a.arora          1.73  	throw Exception(parms);
181                            }
182 r.kieninger      1.83  
183 a.arora          1.73      // make sure we have the correct socket for our server
184                            int sock = ::getsockname(_tickle_server_socket,
185 kumpf            1.88                     reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
186                                           &_addr_size);
187 a.arora          1.73      if(sock < 0){
188                        	// handle error
189                        	MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",
190                        			 "Received error number $0 while getting the internal socket name.",
191 kumpf            1.110 				 getSocketError());
192 a.arora          1.73  	throw Exception(parms);
193                            }
194                        
195                            /* set up the tickle client/connector */
196 r.kieninger      1.83  
197 a.arora          1.73      // get a socket for our tickle client
198 david.dillard    1.95      if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
199 a.arora          1.73  	// handle error
200                        	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
201                        			 "Received error number $0 while creating the internal client socket.",
202 kumpf            1.110 				 getSocketError());
203 a.arora          1.73  	throw Exception(parms);
204                            }
205 marek            1.104     
206                            // set TCP_NODELAY
207                            setsockopt(_tickle_client_socket, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(opt));
208 a.arora          1.73  
209                            // setup the address of the client
210                            memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
211                        #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
212                        #pragma convert(37)
213                        #endif
214                            _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
215                        #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
216                        #pragma convert(0)
217                        #endif
218                            _tickle_client_addr.sin_family = PF_INET;
219                            _tickle_client_addr.sin_port = 0;
220                        
221                            // bind socket to client side
222                            if((::bind(_tickle_client_socket,
223 kumpf            1.88                 reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
224 a.arora          1.73  	       sizeof(_tickle_client_addr))) < 0){
225                        	// handle error
226                        	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
227                        			 "Received error number $0 while binding the internal client socket.",
228 kumpf            1.110 				 getSocketError());
229 a.arora          1.73  	throw Exception(parms);
230                            }
231                        
232                            // connect to server side
233                            if((::connect(_tickle_client_socket,
234 kumpf            1.88                    reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
235 a.arora          1.73  		  sizeof(_tickle_server_addr))) < 0){
236                        	// handle error
237                        	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
238                        			 "Received error number $0 while connecting the internal client socket.",
239 kumpf            1.110 				 getSocketError());
240 a.arora          1.73  	throw Exception(parms);
241                            }
242                        
243                            /* set up the slave connection */
244                            memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
245 mike             1.109     SocketLength peer_size = sizeof(_tickle_peer_addr);
246 mike             1.111.2.1     Threads::sleep(1);
247 a.arora          1.73      
248                                // this call may fail, we will try a max of 20 times to establish this peer connection
249                                if((_tickle_peer_socket = ::accept(_tickle_server_socket,
250 kumpf            1.88                  reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
251                                        &peer_size)) < 0){
252 a.arora          1.73      #if !defined(PEGASUS_OS_TYPE_WINDOWS)
253                                    // Only retry on non-windows platforms.
254                                    if(_tickle_peer_socket == -1 && errno == EAGAIN)
255                                    {
256 r.kieninger      1.83                int retries = 0;
257 a.arora          1.73                do
258                                      {
259 mike             1.111.2.1             Threads::sleep(1);
260 a.arora          1.73                  _tickle_peer_socket = ::accept(_tickle_server_socket,
261 kumpf            1.88                      reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
262                                            &peer_size);
263 a.arora          1.73                  retries++;
264                                      } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
265                                    }
266                            #endif
267                                }
268                                if(_tickle_peer_socket == -1){
269                            	// handle error
270                            	MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",
271                            			 "Received error number $0 while accepting the internal socket connection.",
272 kumpf            1.110     				 getSocketError());
273 a.arora          1.73      	throw Exception(parms);
274                                }
275 marek            1.111     
276                                Socket::disableBlocking(_tickle_peer_socket);
277                                Socket::disableBlocking(_tickle_client_socket);
278                            
279 a.arora          1.73          // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
280                                // checks entries with IDLE state for events
281                                _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
282                                entry._status = _MonitorEntry::IDLE;
283                                _entries.append(entry);
284                            }
285                            
286                            void Monitor::tickle(void)
287                            {
288 sushma.fernandes 1.78          static char _buffer[] =
289 a.arora          1.73          {
290                                  '0','0'
291                                };
292 r.kieninger      1.83      
293 sushma.fernandes 1.78          AutoMutex autoMutex(_tickle_mutex);
294                                Socket::write(_tickle_client_socket,&_buffer, 2);
295                            }
296                            
297                            void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
298                            {
299                                // Set the state to requested state
300                                _entries[index]._status = status;
301 a.arora          1.73      }
302                            
303 mike             1.2       Boolean Monitor::run(Uint32 milliseconds)
304                            {
305 mday             1.18      
306 mday             1.25          Boolean handled_events = false;
307 a.arora          1.73          int i = 0;
308 r.kieninger      1.83      
309 kumpf            1.36          struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
310 a.arora          1.73      
311 mday             1.25          fd_set fdread;
312                                FD_ZERO(&fdread);
313 a.arora          1.73      
314 kumpf            1.94          AutoMutex autoEntryMutex(_entry_mut);
315 r.kieninger      1.83      
316 mike             1.100         ArrayIterator<_MonitorEntry> entries(_entries);
317                            
318 r.kieninger      1.83          // Check the stopConnections flag.  If set, clear the Acceptor monitor entries
319 mike             1.96          if (_stopConnections.get() == 1)
320 kumpf            1.48          {
321 mike             1.100             for ( int indx = 0; indx < (int)entries.size(); indx++)
322 kumpf            1.48              {
323 mike             1.100                 if (entries[indx]._type == Monitor::ACCEPTOR)
324 kumpf            1.48                  {
325 mike             1.100                     if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)
326 kumpf            1.48                      {
327 mike             1.100                        if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||
328                                                    entries[indx]._status.get() == _MonitorEntry::DYING )
329 kumpf            1.48                         {
330                                                   // remove the entry
331 mike             1.100     		       entries[indx]._status = _MonitorEntry::EMPTY;
332 kumpf            1.48                         }
333                                               else
334                                               {
335                                                   // set status to DYING
336 mike             1.100                           entries[indx]._status = _MonitorEntry::DYING;
337 kumpf            1.48                         }
338                                           }
339                                       }
340                                    }
341                                    _stopConnections = 0;
342 a.arora          1.73      	_stopConnectionsSem.signal();
343 kumpf            1.48          }
344 kumpf            1.51      
345 mike             1.100         for( int indx = 0; indx < (int)entries.size(); indx++)
346 kumpf            1.68          {
347 mike             1.100     			 const _MonitorEntry &entry = entries[indx];
348 mike             1.96             if ((entry._status.get() == _MonitorEntry::DYING) &&
349 brian.campbell   1.80      					 (entry._type == Monitor::CONNECTION))
350 kumpf            1.68             {
351 brian.campbell   1.80                MessageQueue *q = MessageQueue::lookup(entry.queueId);
352 kumpf            1.68                PEGASUS_ASSERT(q != 0);
353 brian.campbell   1.80                HTTPConnection &h = *static_cast<HTTPConnection *>(q);
354 r.kieninger      1.83      
355 brian.campbell   1.80      					if (h._connectionClosePending == false)
356                            						continue;
357                            
358                            					// NOTE: do not attempt to delete while there are pending responses
359 r.kieninger      1.83      					// coming thru. The last response to come thru after a
360 brian.campbell   1.80      					// _connectionClosePending will reset _responsePending to false
361                            					// and then cause the monitor to rerun this code and clean up.
362                            					// (see HTTPConnection.cpp)
363                            
364                            					if (h._responsePending == true)
365                            					{
366                            						Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "
367                            													"Ignoring connection delete request because "
368                            													"responses are still pending. "
369 r.kieninger      1.83      													"connection=0x%p, socket=%d\n",
370 brian.campbell   1.81      													(void *)&h, h.getSocket());
371 brian.campbell   1.80      						continue;
372                            					}
373                            					h._connectionClosePending = false;
374                                      MessageQueue &o = h.get_owner();
375                                      Message* message= new CloseConnectionMessage(entry.socket);
376 kumpf            1.68                message->dest = o.getQueueId();
377                            
378 r.kieninger      1.83                // HTTPAcceptor is responsible for closing the connection.
379 kumpf            1.68                // The lock is released to allow HTTPAcceptor to call
380 r.kieninger      1.83                // unsolicitSocketMessages to free the entry.
381 kumpf            1.68                // Once HTTPAcceptor completes processing of the close
382                                      // connection, the lock is re-requested and processing of
383                                      // the for loop continues.  This is safe with the current
384 mike             1.100               // implementation of the entries object.  Note that the
385                                      // loop condition accesses the entries.size() on each
386 kumpf            1.68                // iteration, so that a change in size while the mutex is
387                                      // unlocked will not result in an ArrayIndexOutOfBounds
388                                      // exception.
389                            
390 mike             1.111.2.2           _entry_mut.unlock();
391 kumpf            1.68                o.enqueue(message);
392 mike             1.111.2.2           _entry_mut.lock();
393 r.kieninger      1.102               // After the message has been enqueued and _entry_mut has been released and locked again,
394                                      // the _entries array may have changed, so the ArrayIterator has to be reset with the current _entries.
395                                      entries.reset(_entries);
396 kumpf            1.68             }
397                                }
398                            
399 kumpf            1.51          Uint32 _idleEntries = 0;
400 r.kieninger      1.83      
401 a.arora          1.73          /*
402 david.dillard    1.95              We will keep track of the maximum socket number and pass this value
403                                    to the kernel as a parameter to SELECT.  This loop seems like a good
404                                    place to calculate the max file descriptor (maximum socket number)
405                                    because we have to traverse the entire array.
406 r.kieninger      1.83          */
407 mike             1.106         SocketHandle maxSocketCurrentPass = 0;
408 mike             1.100         for( int indx = 0; indx < (int)entries.size(); indx++)
409 mike             1.2           {
410 mike             1.100            if(maxSocketCurrentPass < entries[indx].socket)
411                                        maxSocketCurrentPass = entries[indx].socket;
412 a.arora          1.73      
413 mike             1.100            if(entries[indx]._status.get() == _MonitorEntry::IDLE)
414 mday             1.25             {
415 david.dillard    1.95                 _idleEntries++;
416 mike             1.100                FD_SET(entries[indx].socket, &fdread);
417 mday             1.25             }
418 mday             1.13          }
419 s.hills          1.62      
420 a.arora          1.73          /*
421 david.dillard    1.95              Add 1 then assign maxSocket accordingly. We add 1 to account for
422                                    descriptors starting at 0.
423 a.arora          1.73          */
424                                maxSocketCurrentPass++;
425                            
426 mike             1.111.2.2     _entry_mut.unlock();
427 david.dillard    1.95      
428                                //
429                                // The first argument to select() is ignored on Windows and it is not
430                                // a socket value.  The original code assumed that the number of sockets
431                                // and a socket value have the same type.  On Windows they do not.
432                                //
433                            #ifdef PEGASUS_OS_TYPE_WINDOWS
434                                int events = select(0, &fdread, NULL, NULL, &tv);
435                            #else
436 a.arora          1.73          int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
437 david.dillard    1.95      #endif
438 mike             1.111.2.2     _entry_mut.lock();
439 r.kieninger      1.102         // _entry_mut was released around the select() call and has now been locked again, so
440                                // the _entries array may have changed. The ArrayIterator has to be reset with the current _entries.
441                                entries.reset(_entries);
442 mike             1.106     
443                                if (events == PEGASUS_SOCKET_ERROR)
444 mday             1.13          {
445 kumpf            1.50             Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
446                                      "Monitor::run - errorno = %d has occurred on select.", errno);
447                                   // The EBADF error indicates that one or more of the file
448                                   // descriptors was not valid. This could indicate that
449 mike             1.100            // the entries structure has been corrupted or that
450 kumpf            1.50             // we have a synchronization error.
451                            
452                                   PEGASUS_ASSERT(errno != EBADF);
453                                }
454                                else if (events)
455                                {
456 kumpf            1.51             Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
457 r.kieninger      1.83                "Monitor::run select event received events = %d, monitoring %d idle entries",
458 david.dillard    1.95                 events, _idleEntries);
459 mike             1.100            for( int indx = 0; indx < (int)entries.size(); indx++)
460 mday             1.25             {
461 kumpf            1.53                // The Monitor should only look at entries in the table that are IDLE (i.e.,
462                                      // owned by the Monitor).
463 mike             1.100               if((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
464                                         (FD_ISSET(entries[indx].socket, &fdread)))
465 david.dillard    1.95                {
466 mike             1.100                  MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
467 kumpf            1.53                   Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
468                                              "Monitor::run indx = %d, queueId =  %d, q = %p",
469 mike             1.100                       indx, entries[indx].queueId, q);
470 kumpf            1.53                   PEGASUS_ASSERT(q !=0);
471 mday             1.37      
472 david.dillard    1.95                   try
473                                         {
474 mike             1.100                     if(entries[indx]._type == Monitor::CONNECTION)
475 david.dillard    1.95                      {
476 kumpf            1.51                         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
477 mike             1.100                          "entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
478 david.dillard    1.95                         static_cast<HTTPConnection *>(q)->_entry_index = indx;
479 sushma.fernandes 1.78      
480                                               // Do not update the entry just yet. The entry gets updated once
481 r.kieninger      1.83                         // the request has been read.
482 mike             1.100                        //entries[indx]._status = _MonitorEntry::BUSY;
483 sushma.fernandes 1.78      
484 kumpf            1.66                         // If allocate_and_awaken failure, retry on next iteration
485 a.arora          1.73      /* Removed for PEP 183.
486 kumpf            1.69                         if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(
487                                                       (void *)q, _dispatch))
488 kumpf            1.67                         {
489                                                  Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
490                                                      "Monitor::run: Insufficient resources to process request.");
491 mike             1.100                           entries[indx]._status = _MonitorEntry::IDLE;
492 kumpf            1.67                            return true;
493                                               }
494 a.arora          1.73      */
495                            // Added for PEP 183
496 david.dillard    1.95                         HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
497                                               Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
498 a.arora          1.73                               "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
499                                               dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
500                                               try
501                                               {
502                                                   dst->run(1);
503                                               }
504 david.dillard    1.95                         catch (...)
505                                               {
506                                                   Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
507                                                   "Monitor::_dispatch: exception received");
508                                               }
509                                               Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
510 a.arora          1.73                         "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
511                            
512 sushma.fernandes 1.78                         // It is possible the entry status may not be set to busy.
513 r.kieninger      1.83                         // The following will fail in that case.
514 mike             1.96         		   // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
515 a.arora          1.73      		   // Once the HTTPConnection thread has set the status value to either
516                            		   // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
517                            		   // to the Monitor.  It is no longer permissible to access the connection
518                            		   // or the entry in the _entries table.
519 sushma.fernandes 1.78      
520                                               // The following is not relevant as the worker thread or the
521                                               // reader thread will update the status of the entry.
522                            		   //if (dst->_connectionClosePending)
523 r.kieninger      1.83      		   //{
524 sushma.fernandes 1.78      		   //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
525                            		   //}
526                            		   //else
527                            		   //{
528                            		   //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
529 r.kieninger      1.83      		   //}
530                            // end Added for PEP 183
531 a.arora          1.73      		}
532 mike             1.100     	        else if( entries[indx]._type == Monitor::INTERNAL){
533 r.kieninger      1.83      			// set ourself to BUSY,
534                                                    // read the data
535 a.arora          1.73                              // and set ourself back to IDLE
536 r.kieninger      1.83      
537 mike             1.100     		   	entries[indx]._status = _MonitorEntry::BUSY;
538 a.arora          1.73      			static char buffer[2];
539 mike             1.100           			Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2);
540                            			entries[indx]._status = _MonitorEntry::IDLE;
541 mday             1.37      		}
542                            		else
543 mday             1.25      		{
544 kumpf            1.51                         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
545                                                 "Non-connection entry, indx = %d, has been received.", indx);
546 mday             1.37      		   int events = 0;
547                            		   events |= SocketMessage::READ;
548 mike             1.100     		   Message *msg = new SocketMessage(entries[indx].socket, events);
549                            		   entries[indx]._status = _MonitorEntry::BUSY;
550 mike             1.111.2.2                    _entry_mut.unlock();
551 mday             1.37      		   q->enqueue(msg);
552 mike             1.111.2.2                    _entry_mut.lock();
553 r.kieninger      1.102                // After enqueue a message and the autoEntryMutex has been released and locked again,
554                                       // the array of entries can be changed. The ArrayIterator has be reset with the original _entries
555                                       entries.reset(_entries);
556 mike             1.100     		   entries[indx]._status = _MonitorEntry::IDLE;
557 kumpf            1.94      
558 mday             1.25      		   return true;
559                            		}
560                            	     }
561 mday             1.37      	     catch(...)
562 mday             1.25      	     {
563                            	     }
564                            	     handled_events = true;
565                            	  }
566                                   }
567 mday             1.24          }
568 kumpf            1.94      
569 mday             1.13          return(handled_events);
570 mike             1.2       }
571                            
572 chuck            1.74      void Monitor::stopListeningForConnections(Boolean wait)
573 kumpf            1.48      {
574                                PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
575 r.kieninger      1.83          // set boolean then tickle the server to recognize _stopConnections
576 kumpf            1.48          _stopConnections = 1;
577 a.arora          1.73          tickle();
578 kumpf            1.48      
579 chuck            1.74          if (wait)
580 a.arora          1.73          {
581 chuck            1.74            // Wait for the monitor to notice _stopConnections.  Otherwise the
582                                  // caller of this function may unbind the ports while the monitor
583                                  // is still accepting connections on them.
584 kumpf            1.101           _stopConnectionsSem.wait();
585 a.arora          1.73          }
586 r.kieninger      1.83      
587 kumpf            1.48          PEG_METHOD_EXIT();
588                            }
589 mday             1.25      
590 mday             1.37      
591 mday             1.25      int  Monitor::solicitSocketMessages(
592 mike             1.106         SocketHandle socket,
593 mike             1.2           Uint32 events,
594 r.kieninger      1.83          Uint32 queueId,
595 mday             1.8           int type)
596 mike             1.2       {
597 r.kieninger      1.83         PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
598 alagaraja        1.75         AutoMutex autoMut(_entry_mut);
599 a.arora          1.73         // Check to see if we need to dynamically grow the _entries array
600                               // We always want the _entries array to 2 bigger than the
601                               // current connections requested
602                               _solicitSocketCount++;  // bump the count
603                               int size = (int)_entries.size();
604 w.otsuka         1.89         if((int)_solicitSocketCount >= (size-1)){
605                                    for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
606 a.arora          1.73                      _MonitorEntry entry(0, 0, 0);
607                                            _entries.append(entry);
608                                    }
609                               }
610 kumpf            1.4       
611 a.arora          1.73         int index;
612                               for(index = 1; index < (int)_entries.size(); index++)
613 mday             1.25         {
614 a.arora          1.73            try
615 mday             1.37            {
616 mike             1.96               if(_entries[index]._status.get() == _MonitorEntry::EMPTY)
617 a.arora          1.73               {
618                                        _entries[index].socket = socket;
619                                        _entries[index].queueId  = queueId;
620                                        _entries[index]._type = type;
621                                        _entries[index]._status = _MonitorEntry::IDLE;
622 r.kieninger      1.83      
623 a.arora          1.73                  return index;
624                                     }
625 mday             1.37            }
626                                  catch(...)
627 mday             1.25            {
628                                  }
629                               }
630 a.arora          1.73         _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful
631 mday             1.25         PEG_METHOD_EXIT();
632 kumpf            1.50         return -1;
633 a.arora          1.73      
634 mike             1.2       }
635                            
636 mike             1.106     void Monitor::unsolicitSocketMessages(SocketHandle socket)
637 mike             1.2       {
638 kumpf            1.50      
639 mday             1.25          PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
640 alagaraja        1.75          AutoMutex autoMut(_entry_mut);
641 a.arora          1.73      
642                                /*
643                                    Start at index = 1 because _entries[0] is the tickle entry which never needs
644                                    to be EMPTY;
645                                */
646 w.otsuka         1.89          unsigned int index;
647 a.arora          1.73          for(index = 1; index < _entries.size(); index++)
648 mike             1.2           {
649 mday             1.25             if(_entries[index].socket == socket)
650                                   {
651 a.arora          1.73                _entries[index]._status = _MonitorEntry::EMPTY;
652 david.dillard    1.95                _entries[index].socket = PEGASUS_INVALID_SOCKET;
653 a.arora          1.73                _solicitSocketCount--;
654                                      break;
655 mday             1.25             }
656 mike             1.2           }
657 a.arora          1.73      
658                                /*
659                            	Dynamic Contraction:
660                            	To remove excess entries we will start from the end of the _entries array
661                            	and remove all entries with EMPTY status until we find the first NON EMPTY.
662                            	This prevents the positions, of the NON EMPTY entries, from being changed.
663 r.kieninger      1.83          */
664 a.arora          1.73          index = _entries.size() - 1;
665 mike             1.96          while(_entries[index]._status.get() == _MonitorEntry::EMPTY){
666 a.arora          1.73      	if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
667                                            _entries.remove(index);
668                            	index--;
669                                }
670 kumpf            1.4           PEG_METHOD_EXIT();
671 mike             1.2       }
672                            
673 a.arora          1.73      // Note: this is no longer called with PEP 183.
674 mike             1.111.2.1 ThreadReturnType PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
675 mday             1.7       {
676 mday             1.8          HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
677 kumpf            1.51         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
678 kumpf            1.53              "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
679                                    dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
680 kumpf            1.51         try
681                               {
682                                  dst->run(1);
683                               }
684                               catch (...)
685                               {
686                                  Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
687                                      "Monitor::_dispatch: exception received");
688                               }
689                               Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
690                                      "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
691 r.kieninger      1.83      
692 mike             1.96         PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
693 kumpf            1.68      
694                               // Once the HTTPConnection thread has set the status value to either
695                               // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
696                               // to the Monitor.  It is no longer permissible to access the connection
697                               // or the entry in the _entries table.
698 kumpf            1.50         if (dst->_connectionClosePending)
699                               {
700 kumpf            1.68            dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
701                               }
702                               else
703                               {
704                                  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
705 kumpf            1.50         }
706 mday             1.8          return 0;
707 mday             1.40      }
708                            
709 mike             1.2       PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2