(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.103 //%2006////////////////////////////////////////////////////////////////////////
  2 mike  1.2   //
  3 karl  1.79  // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4             // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5             // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6 karl  1.64  // IBM Corp.; EMC Corporation, The Open Group.
  7 karl  1.79  // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8             // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9 karl  1.90  // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10             // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11 karl  1.103 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 12             // EMC Corporation; Symantec Corporation; The Open Group.
 13 mike  1.2   //
 14             // Permission is hereby granted, free of charge, to any person obtaining a copy
 15 kumpf 1.17  // of this software and associated documentation files (the "Software"), to
 16             // deal in the Software without restriction, including without limitation the
 17             // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 18 mike  1.2   // sell copies of the Software, and to permit persons to whom the Software is
 19             // furnished to do so, subject to the following conditions:
 20 karl  1.103 // 
 21 kumpf 1.17  // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 22 mike  1.2   // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 23             // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 24 kumpf 1.17  // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 25             // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 26             // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 27 mike  1.2   // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 28             // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 29             //
 30             //==============================================================================
 31             //
 32             // Author: Mike Brasher (mbrasher@bmc.com)
 33             //
 34 r.kieninger 1.83  // Modified By: Mike Day (monitor_2) mdday@us.ibm.com
 35 a.arora     1.71  //              Amit K Arora (Bug#1153) amita@in.ibm.com
 36 alagaraja   1.75  //              Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090
 37 sushma.fernandes 1.78  //              Sushma Fernandes (sushma@hp.com) for Bug#2057
 38 joyce.j          1.84  //              Josephine Eskaline Joyce (jojustin@in.ibm.com) for PEP#101
 39 kumpf            1.85  //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
 40 mike             1.2   //
 41                        //%/////////////////////////////////////////////////////////////////////////////
 42                        
 43                        #include <Pegasus/Common/Config.h>
 44 mday             1.40  
 45 mike             1.2   #include <cstring>
 46                        #include "Monitor.h"
 47                        #include "MessageQueue.h"
 48                        #include "Socket.h"
 49 kumpf            1.4   #include <Pegasus/Common/Tracer.h>
 50 mday             1.7   #include <Pegasus/Common/HTTPConnection.h>
 51 kumpf            1.69  #include <Pegasus/Common/MessageQueueService.h>
 52 a.arora          1.73  #include <Pegasus/Common/Exception.h>
 53 mike             1.100 #include "ArrayIterator.h"
 54 mike             1.2   
 55                        #ifdef PEGASUS_OS_TYPE_WINDOWS
 56                        # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
 57                        #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
 58                        of <winsock.h>. It may have been indirectly included (e.g., by including \
 59 david.dillard    1.95  <windows.h>). Find inclusion of that header which is visible to this \
 60 mike             1.2   compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \
 61                        otherwise, less than 64 clients (the default) will be able to connect to the \
 62                        CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
 63 mday             1.5   
 64 mike             1.2   # endif
 65                        # define FD_SETSIZE 1024
 66 mday             1.5   # include <windows.h>
 67 mike             1.2   #else
 68                        # include <sys/types.h>
 69                        # include <sys/socket.h>
 70                        # include <sys/time.h>
 71                        # include <netinet/in.h>
 72                        # include <netdb.h>
 73                        # include <arpa/inet.h>
 74                        #endif
 75                        
 76                        PEGASUS_USING_STD;
 77                        
 78                        PEGASUS_NAMESPACE_BEGIN
 79                        
 80 mike             1.96  static AtomicInt _connections(0);
 81 mday             1.25  
 82 mike             1.2   ////////////////////////////////////////////////////////////////////////////////
 83                        //
 84                        // Monitor
 85                        //
 86                        ////////////////////////////////////////////////////////////////////////////////
 87                        
 88 kumpf            1.54  #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
 89 mike             1.2   Monitor::Monitor()
 90 kumpf            1.87     : _stopConnections(0),
 91 a.arora          1.73       _stopConnectionsSem(0),
 92 a.dunfey         1.76       _solicitSocketCount(0),
 93 a.dunfey         1.77       _tickle_client_socket(-1),
 94                             _tickle_server_socket(-1),
 95                             _tickle_peer_socket(-1)
 96 mike             1.2   {
 97 kumpf            1.54      int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
 98 mike             1.2       Socket::initializeInterface();
 99 kumpf            1.54      _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
100 a.arora          1.73  
101                            // setup the tickler
102                            initializeTickler();
103                        
104 r.kieninger      1.83      // Start the count at 1 because initilizeTickler()
105 a.arora          1.73      // has added an entry in the first position of the
106                            // _entries array
107                            for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
108 mday             1.37      {
109                               _MonitorEntry entry(0, 0, 0);
110                               _entries.append(entry);
111                            }
112 mday             1.18  }
113 mday             1.20  
114 mike             1.2   Monitor::~Monitor()
115                        {
116 kumpf            1.11      Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
117 a.dunfey         1.76  
118                            try{
119 a.dunfey         1.77          if(_tickle_peer_socket >= 0)
120 a.dunfey         1.76          {
121                                    Socket::close(_tickle_peer_socket);
122                                }
123 a.dunfey         1.77          if(_tickle_client_socket >= 0)
124 a.dunfey         1.76          {
125                                    Socket::close(_tickle_client_socket);
126                                }
127 a.dunfey         1.77          if(_tickle_server_socket >= 0)
128 a.dunfey         1.76          {
129                                    Socket::close(_tickle_server_socket);
130                                }
131                            }
132                            catch(...)
133                            {
134                                Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
135                                          "Failed to close tickle sockets");
136                            }
137                        
138 mike             1.2       Socket::uninitializeInterface();
139 kumpf            1.11      Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
140                                          "returning from monitor destructor");
141 mday             1.18  }
142                        
143 a.arora          1.73  void Monitor::initializeTickler(){
144 r.kieninger      1.83      /*
145                               NOTE: On any errors trying to
146                                     setup out tickle connection,
147 a.arora          1.73               throw an exception/end the server
148                            */
149                        
150                            /* setup the tickle server/listener */
151                        
152                            // get a socket for the server side
153 david.dillard    1.95      if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
154 a.arora          1.73  	//handle error
155                        	MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
156                        				 "Received error number $0 while creating the internal socket.",
157                        #if !defined(PEGASUS_OS_TYPE_WINDOWS)
158                        				 errno);
159                        #else
160                        				 WSAGetLastError());
161                        #endif
162                        	throw Exception(parms);
163                            }
164                        
165                            // initialize the address
166                            memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
167                        #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
168                        #pragma convert(37)
169                        #endif
170                            _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
171                        #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
172                        #pragma convert(0)
173                        #endif
174                            _tickle_server_addr.sin_family = PF_INET;
175 a.arora          1.73      _tickle_server_addr.sin_port = 0;
176                        
177 kumpf            1.86      PEGASUS_SOCKLEN_T _addr_size = sizeof(_tickle_server_addr);
178 a.arora          1.73  
179                            // bind server side to socket
180                            if((::bind(_tickle_server_socket,
181 kumpf            1.88                 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
182 a.arora          1.73  	       sizeof(_tickle_server_addr))) < 0){
183                        	// handle error
184 r.kieninger      1.83  #ifdef PEGASUS_OS_ZOS
185                            MessageLoaderParms parms("Common.Monitor.TICKLE_BIND_LONG",
186                        				 "Received error:$0 while binding the internal socket.",strerror(errno));
187                        #else
188 a.arora          1.73  	MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
189                        				 "Received error number $0 while binding the internal socket.",
190                        #if !defined(PEGASUS_OS_TYPE_WINDOWS)
191                        				 errno);
192                        #else
193                        				 WSAGetLastError());
194                        #endif
195 r.kieninger      1.83  #endif
196 a.arora          1.73          throw Exception(parms);
197                            }
198                        
199                            // tell the kernel we are a server
200                            if((::listen(_tickle_server_socket,3)) < 0){
201                        	// handle error
202                        	MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",
203                        			 "Received error number $0 while listening to the internal socket.",
204                        #if !defined(PEGASUS_OS_TYPE_WINDOWS)
205                        				 errno);
206                        #else
207                        				 WSAGetLastError());
208                        #endif
209                        	throw Exception(parms);
210                            }
211 r.kieninger      1.83  
212 a.arora          1.73      // make sure we have the correct socket for our server
213                            int sock = ::getsockname(_tickle_server_socket,
214 kumpf            1.88                     reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
215                                           &_addr_size);
216 a.arora          1.73      if(sock < 0){
217                        	// handle error
218                        	MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",
219                        			 "Received error number $0 while getting the internal socket name.",
220                        #if !defined(PEGASUS_OS_TYPE_WINDOWS)
221                        				 errno);
222                        #else
223                        				 WSAGetLastError());
224                        #endif
225                        	throw Exception(parms);
226                            }
227                        
228                            /* set up the tickle client/connector */
229 r.kieninger      1.83  
230 a.arora          1.73      // get a socket for our tickle client
231 david.dillard    1.95      if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
232 a.arora          1.73  	// handle error
233                        	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
234                        			 "Received error number $0 while creating the internal client socket.",
235                        #if !defined(PEGASUS_OS_TYPE_WINDOWS)
236                        				 errno);
237                        #else
238                        				 WSAGetLastError());
239                        #endif
240                        	throw Exception(parms);
241                            }
242                        
243                            // setup the address of the client
244                            memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
245                        #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
246                        #pragma convert(37)
247                        #endif
248                            _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
249                        #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
250                        #pragma convert(0)
251                        #endif
252                            _tickle_client_addr.sin_family = PF_INET;
253 a.arora          1.73      _tickle_client_addr.sin_port = 0;
254                        
255                            // bind socket to client side
256                            if((::bind(_tickle_client_socket,
257 kumpf            1.88                 reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
258 a.arora          1.73  	       sizeof(_tickle_client_addr))) < 0){
259                        	// handle error
260                        	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
261                        			 "Received error number $0 while binding the internal client socket.",
262                        #if !defined(PEGASUS_OS_TYPE_WINDOWS)
263                        				 errno);
264                        #else
265                        				 WSAGetLastError());
266                        #endif
267                        	throw Exception(parms);
268                            }
269                        
270                            // connect to server side
271                            if((::connect(_tickle_client_socket,
272 kumpf            1.88                    reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
273 a.arora          1.73  		  sizeof(_tickle_server_addr))) < 0){
274                        	// handle error
275                        	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
276                        			 "Received error number $0 while connecting the internal client socket.",
277                        #if !defined(PEGASUS_OS_TYPE_WINDOWS)
278                        				 errno);
279                        #else
280                        				 WSAGetLastError());
281                        #endif
282                        	throw Exception(parms);
283                            }
284                        
285                            /* set up the slave connection */
286                            memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
287 kumpf            1.86      PEGASUS_SOCKLEN_T peer_size = sizeof(_tickle_peer_addr);
288 r.kieninger      1.83      pegasus_sleep(1);
289 a.arora          1.73  
290                            // this call may fail, we will try a max of 20 times to establish this peer connection
291                            if((_tickle_peer_socket = ::accept(_tickle_server_socket,
292 kumpf            1.88              reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
293                                    &peer_size)) < 0){
294 a.arora          1.73  #if !defined(PEGASUS_OS_TYPE_WINDOWS)
295                                // Only retry on non-windows platforms.
296                                if(_tickle_peer_socket == -1 && errno == EAGAIN)
297                                {
298 r.kieninger      1.83            int retries = 0;
299 a.arora          1.73            do
300                                  {
301                                    pegasus_sleep(1);
302                                    _tickle_peer_socket = ::accept(_tickle_server_socket,
303 kumpf            1.88                  reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
304                                        &peer_size);
305 a.arora          1.73              retries++;
306                                  } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
307                                }
308                        #endif
309                            }
310                            if(_tickle_peer_socket == -1){
311                        	// handle error
312                        	MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",
313                        			 "Received error number $0 while accepting the internal socket connection.",
314                        #if !defined(PEGASUS_OS_TYPE_WINDOWS)
315                        				 errno);
316                        #else
317                        				 WSAGetLastError());
318                        #endif
319                        	throw Exception(parms);
320                            }
321 marek            1.103.2.1 
322                                Socket::disableBlocking(_tickle_peer_socket);
323                                Socket::disableBlocking(_tickle_client_socket);
324                            
325 a.arora          1.73          // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
326                                // checks entries with IDLE state for events
327                                _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
328                                entry._status = _MonitorEntry::IDLE;
329                                _entries.append(entry);
330                            }
331                            
332                            void Monitor::tickle(void)
333                            {
334 sushma.fernandes 1.78          static char _buffer[] =
335 a.arora          1.73          {
336                                  '0','0'
337                                };
338 r.kieninger      1.83      
339 sushma.fernandes 1.78          AutoMutex autoMutex(_tickle_mutex);
340                                Socket::write(_tickle_client_socket,&_buffer, 2);
341                            }
342                            
343                            void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
344                            {
345                                // Set the state to requested state
346                                _entries[index]._status = status;
347 a.arora          1.73      }
348                            
349 kumpf            1.103.2.2 void Monitor::run(Uint32 milliseconds)
350 mike             1.2       {
351 mday             1.18      
352 a.arora          1.73          int i = 0;
353 r.kieninger      1.83      
354 kumpf            1.36          struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
355 a.arora          1.73      
356 mday             1.25          fd_set fdread;
357                                FD_ZERO(&fdread);
358 a.arora          1.73      
359 kumpf            1.94          AutoMutex autoEntryMutex(_entry_mut);
360 r.kieninger      1.83      
361 mike             1.100         ArrayIterator<_MonitorEntry> entries(_entries);
362                            
363 r.kieninger      1.83          // Check the stopConnections flag.  If set, clear the Acceptor monitor entries
364 mike             1.96          if (_stopConnections.get() == 1)
365 kumpf            1.48          {
366 mike             1.100             for ( int indx = 0; indx < (int)entries.size(); indx++)
367 kumpf            1.48              {
368 mike             1.100                 if (entries[indx]._type == Monitor::ACCEPTOR)
369 kumpf            1.48                  {
370 mike             1.100                     if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)
371 kumpf            1.48                      {
372 mike             1.100                        if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||
373                                                    entries[indx]._status.get() == _MonitorEntry::DYING )
374 kumpf            1.48                         {
375                                                   // remove the entry
376 mike             1.100     		       entries[indx]._status = _MonitorEntry::EMPTY;
377 kumpf            1.48                         }
378                                               else
379                                               {
380                                                   // set status to DYING
381 mike             1.100                           entries[indx]._status = _MonitorEntry::DYING;
382 kumpf            1.48                         }
383                                           }
384                                       }
385                                    }
386                                    _stopConnections = 0;
387 a.arora          1.73      	_stopConnectionsSem.signal();
388 kumpf            1.48          }
389 kumpf            1.51      
390 mike             1.100         for( int indx = 0; indx < (int)entries.size(); indx++)
391 kumpf            1.68          {
392 mike             1.100     			 const _MonitorEntry &entry = entries[indx];
393 mike             1.96             if ((entry._status.get() == _MonitorEntry::DYING) &&
394 brian.campbell   1.80      					 (entry._type == Monitor::CONNECTION))
395 kumpf            1.68             {
396 brian.campbell   1.80                MessageQueue *q = MessageQueue::lookup(entry.queueId);
397 kumpf            1.68                PEGASUS_ASSERT(q != 0);
398 brian.campbell   1.80                HTTPConnection &h = *static_cast<HTTPConnection *>(q);
399 r.kieninger      1.83      
400 brian.campbell   1.80      					if (h._connectionClosePending == false)
401                            						continue;
402                            
403                            					// NOTE: do not attempt to delete while there are pending responses
404 r.kieninger      1.83      					// coming thru. The last response to come thru after a
405 brian.campbell   1.80      					// _connectionClosePending will reset _responsePending to false
406                            					// and then cause the monitor to rerun this code and clean up.
407                            					// (see HTTPConnection.cpp)
408                            
409                            					if (h._responsePending == true)
410                            					{
411                            						Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "
412                            													"Ignoring connection delete request because "
413                            													"responses are still pending. "
414 r.kieninger      1.83      													"connection=0x%p, socket=%d\n",
415 brian.campbell   1.81      													(void *)&h, h.getSocket());
416 brian.campbell   1.80      						continue;
417                            					}
418                            					h._connectionClosePending = false;
419                                      MessageQueue &o = h.get_owner();
420                                      Message* message= new CloseConnectionMessage(entry.socket);
421 kumpf            1.68                message->dest = o.getQueueId();
422                            
423 r.kieninger      1.83                // HTTPAcceptor is responsible for closing the connection.
424 kumpf            1.68                // The lock is released to allow HTTPAcceptor to call
425 r.kieninger      1.83                // unsolicitSocketMessages to free the entry.
426 kumpf            1.68                // Once HTTPAcceptor completes processing of the close
427                                      // connection, the lock is re-requested and processing of
428                                      // the for loop continues.  This is safe with the current
429 mike             1.100               // implementation of the entries object.  Note that the
430                                      // loop condition accesses the entries.size() on each
431 kumpf            1.68                // iteration, so that a change in size while the mutex is
432                                      // unlocked will not result in an ArrayIndexOutOfBounds
433                                      // exception.
434                            
435 kumpf            1.94                autoEntryMutex.unlock();
436 kumpf            1.68                o.enqueue(message);
437 kumpf            1.94                autoEntryMutex.lock();
438 r.kieninger      1.102               // After enqueue a message and the autoEntryMutex has been released and locked again,
439                                      // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries.
440                                      entries.reset(_entries);
441 kumpf            1.68             }
442                                }
443                            
444 kumpf            1.51          Uint32 _idleEntries = 0;
445 r.kieninger      1.83      
446 a.arora          1.73          /*
447 david.dillard    1.95              We will keep track of the maximum socket number and pass this value
448                                    to the kernel as a parameter to SELECT.  This loop seems like a good
449                                    place to calculate the max file descriptor (maximum socket number)
450                                    because we have to traverse the entire array.
451 r.kieninger      1.83          */
452 david.dillard    1.95          PEGASUS_SOCKET maxSocketCurrentPass = 0;
453 mike             1.100         for( int indx = 0; indx < (int)entries.size(); indx++)
454 mike             1.2           {
455 mike             1.100            if(maxSocketCurrentPass < entries[indx].socket)
456                                        maxSocketCurrentPass = entries[indx].socket;
457 a.arora          1.73      
458 mike             1.100            if(entries[indx]._status.get() == _MonitorEntry::IDLE)
459 mday             1.25             {
460 david.dillard    1.95                 _idleEntries++;
461 mike             1.100                FD_SET(entries[indx].socket, &fdread);
462 mday             1.25             }
463 mday             1.13          }
464 s.hills          1.62      
465 a.arora          1.73          /*
466 david.dillard    1.95              Add 1 then assign maxSocket accordingly. We add 1 to account for
467                                    descriptors starting at 0.
468 a.arora          1.73          */
469                                maxSocketCurrentPass++;
470                            
471 kumpf            1.94          autoEntryMutex.unlock();
472 david.dillard    1.95      
473                                //
474                                // The first argument to select() is ignored on Windows and it is not
475                                // a socket value.  The original code assumed that the number of sockets
476                                // and a socket value have the same type.  On Windows they do not.
477                                //
478                            #ifdef PEGASUS_OS_TYPE_WINDOWS
479                                int events = select(0, &fdread, NULL, NULL, &tv);
480                            #else
481 a.arora          1.73          int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
482 david.dillard    1.95      #endif
483 kumpf            1.94          autoEntryMutex.lock();
484 r.kieninger      1.102         // After enqueue a message and the autoEntryMutex has been released and locked again,
485                                // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries
486                                entries.reset(_entries);
487 mike             1.2       #ifdef PEGASUS_OS_TYPE_WINDOWS
488 kumpf            1.50          if(events == SOCKET_ERROR)
489 mike             1.2       #else
490 kumpf            1.50          if(events == -1)
491 mike             1.2       #endif
492 mday             1.13          {
493 kumpf            1.50             Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
494                                      "Monitor::run - errorno = %d has occurred on select.", errno);
495                                   // The EBADF error indicates that one or more or the file
496                                   // descriptions was not valid. This could indicate that
497 mike             1.100            // the entries structure has been corrupted or that
498 kumpf            1.50             // we have a synchronization error.
499                            
500                                   PEGASUS_ASSERT(errno != EBADF);
501                                }
502                                else if (events)
503                                {
504 kumpf            1.51             Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
505 r.kieninger      1.83                "Monitor::run select event received events = %d, monitoring %d idle entries",
506 david.dillard    1.95                 events, _idleEntries);
507 mike             1.100            for( int indx = 0; indx < (int)entries.size(); indx++)
508 mday             1.25             {
509 kumpf            1.53                // The Monitor should only look at entries in the table that are IDLE (i.e.,
510                                      // owned by the Monitor).
511 mike             1.100               if((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
512                                         (FD_ISSET(entries[indx].socket, &fdread)))
513 david.dillard    1.95                {
514 mike             1.100                  MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
515 kumpf            1.53                   Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
516                                              "Monitor::run indx = %d, queueId =  %d, q = %p",
517 mike             1.100                       indx, entries[indx].queueId, q);
518 kumpf            1.53                   PEGASUS_ASSERT(q !=0);
519 mday             1.37      
520 david.dillard    1.95                   try
521                                         {
522 mike             1.100                     if(entries[indx]._type == Monitor::CONNECTION)
523 david.dillard    1.95                      {
524 kumpf            1.51                         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
525 mike             1.100                          "entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
526 david.dillard    1.95                         static_cast<HTTPConnection *>(q)->_entry_index = indx;
527 sushma.fernandes 1.78      
528                                               // Do not update the entry just yet. The entry gets updated once
529 r.kieninger      1.83                         // the request has been read.
530 mike             1.100                        //entries[indx]._status = _MonitorEntry::BUSY;
531 sushma.fernandes 1.78      
532 kumpf            1.66                         // If allocate_and_awaken failure, retry on next iteration
533 a.arora          1.73      /* Removed for PEP 183.
534 kumpf            1.69                         if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(
535                                                       (void *)q, _dispatch))
536 kumpf            1.67                         {
537                                                  Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
538                                                      "Monitor::run: Insufficient resources to process request.");
539 mike             1.100                           entries[indx]._status = _MonitorEntry::IDLE;
540 kumpf            1.67                         }
541 a.arora          1.73      */
542                            // Added for PEP 183
543 david.dillard    1.95                         HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
544                                               Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
545 a.arora          1.73                               "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
546                                               dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
547                                               try
548                                               {
549                                                   dst->run(1);
550                                               }
551 david.dillard    1.95                         catch (...)
552                                               {
553                                                   Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
554                                                   "Monitor::_dispatch: exception received");
555                                               }
556                                               Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
557 a.arora          1.73                         "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
558                            
559 sushma.fernandes 1.78                         // It is possible the entry status may not be set to busy.
560 r.kieninger      1.83                         // The following will fail in that case.
561 mike             1.96         		   // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
562 a.arora          1.73      		   // Once the HTTPConnection thread has set the status value to either
563                            		   // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
564                            		   // to the Monitor.  It is no longer permissible to access the connection
565                            		   // or the entry in the _entries table.
566 sushma.fernandes 1.78      
567                                               // The following is not relevant as the worker thread or the
568                                               // reader thread will update the status of the entry.
569                            		   //if (dst->_connectionClosePending)
570 r.kieninger      1.83      		   //{
571 sushma.fernandes 1.78      		   //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
572                            		   //}
573                            		   //else
574                            		   //{
575                            		   //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
576 r.kieninger      1.83      		   //}
577                            // end Added for PEP 183
578 a.arora          1.73      		}
579 mike             1.100     	        else if( entries[indx]._type == Monitor::INTERNAL){
580 r.kieninger      1.83      			// set ourself to BUSY,
581                                                    // read the data
582 a.arora          1.73                              // and set ourself back to IDLE
583 r.kieninger      1.83      
584 mike             1.100     		   	entries[indx]._status = _MonitorEntry::BUSY;
585 a.arora          1.73      			static char buffer[2];
586 mike             1.100           			Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2);
587                            			entries[indx]._status = _MonitorEntry::IDLE;
588 mday             1.37      		}
589                            		else
590 mday             1.25      		{
591 kumpf            1.51                         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
592                                                 "Non-connection entry, indx = %d, has been received.", indx);
593 mday             1.37      		   int events = 0;
594                            		   events |= SocketMessage::READ;
595 mike             1.100     		   Message *msg = new SocketMessage(entries[indx].socket, events);
596                            		   entries[indx]._status = _MonitorEntry::BUSY;
597 kumpf            1.94                         autoEntryMutex.unlock();
598 mday             1.37      		   q->enqueue(msg);
599 kumpf            1.94                         autoEntryMutex.lock();
600 r.kieninger      1.102                // After enqueue a message and the autoEntryMutex has been released and locked again,
601                                       // the array of entries can be changed. The ArrayIterator has be reset with the original _entries
602                                       entries.reset(_entries);
603 mike             1.100     		   entries[indx]._status = _MonitorEntry::IDLE;
604 mday             1.25      		}
605                            	     }
606 mday             1.37      	     catch(...)
607 mday             1.25      	     {
608                            	     }
609                            	  }
610                                   }
611 mday             1.24          }
612 mike             1.2       }
613                            
614 chuck            1.74      void Monitor::stopListeningForConnections(Boolean wait)
615 kumpf            1.48      {
616                                PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
617 r.kieninger      1.83          // set boolean then tickle the server to recognize _stopConnections
618 kumpf            1.48          _stopConnections = 1;
619 a.arora          1.73          tickle();
620 kumpf            1.48      
621 chuck            1.74          if (wait)
622 a.arora          1.73          {
623 chuck            1.74            // Wait for the monitor to notice _stopConnections.  Otherwise the
624                                  // caller of this function may unbind the ports while the monitor
625                                  // is still accepting connections on them.
626 kumpf            1.101           _stopConnectionsSem.wait();
627 a.arora          1.73          }
628 r.kieninger      1.83      
629 kumpf            1.48          PEG_METHOD_EXIT();
630                            }
631 mday             1.25      
632 mday             1.37      
633 mday             1.25      int  Monitor::solicitSocketMessages(
634 david.dillard    1.95          PEGASUS_SOCKET socket,
635 mike             1.2           Uint32 events,
636 r.kieninger      1.83          Uint32 queueId,
637 mday             1.8           int type)
638 mike             1.2       {
639 r.kieninger      1.83         PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
640 alagaraja        1.75         AutoMutex autoMut(_entry_mut);
641 a.arora          1.73         // Check to see if we need to dynamically grow the _entries array
642                               // We always want the _entries array to 2 bigger than the
643                               // current connections requested
644                               _solicitSocketCount++;  // bump the count
645                               int size = (int)_entries.size();
646 w.otsuka         1.89         if((int)_solicitSocketCount >= (size-1)){
647                                    for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
648 a.arora          1.73                      _MonitorEntry entry(0, 0, 0);
649                                            _entries.append(entry);
650                                    }
651                               }
652 kumpf            1.4       
653 a.arora          1.73         int index;
654                               for(index = 1; index < (int)_entries.size(); index++)
655 mday             1.25         {
656 a.arora          1.73            try
657 mday             1.37            {
658 mike             1.96               if(_entries[index]._status.get() == _MonitorEntry::EMPTY)
659 a.arora          1.73               {
660                                        _entries[index].socket = socket;
661                                        _entries[index].queueId  = queueId;
662                                        _entries[index]._type = type;
663                                        _entries[index]._status = _MonitorEntry::IDLE;
664 r.kieninger      1.83      
665 a.arora          1.73                  return index;
666                                     }
667 mday             1.37            }
668                                  catch(...)
669 mday             1.25            {
670                                  }
671                               }
672 a.arora          1.73         _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful
673 mday             1.25         PEG_METHOD_EXIT();
674 kumpf            1.50         return -1;
675 a.arora          1.73      
676 mike             1.2       }
677                            
678 david.dillard    1.95      void Monitor::unsolicitSocketMessages(PEGASUS_SOCKET socket)
679 mike             1.2       {
680 kumpf            1.50      
681 mday             1.25          PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
682 alagaraja        1.75          AutoMutex autoMut(_entry_mut);
683 a.arora          1.73      
684                                /*
685                                    Start at index = 1 because _entries[0] is the tickle entry which never needs
686                                    to be EMPTY;
687                                */
688 w.otsuka         1.89          unsigned int index;
689 a.arora          1.73          for(index = 1; index < _entries.size(); index++)
690 mike             1.2           {
691 mday             1.25             if(_entries[index].socket == socket)
692                                   {
693 a.arora          1.73                _entries[index]._status = _MonitorEntry::EMPTY;
694 david.dillard    1.95                _entries[index].socket = PEGASUS_INVALID_SOCKET;
695 a.arora          1.73                _solicitSocketCount--;
696                                      break;
697 mday             1.25             }
698 mike             1.2           }
699 a.arora          1.73      
700                                /*
701                            	Dynamic Contraction:
702                            	To remove excess entries we will start from the end of the _entries array
703                            	and remove all entries with EMPTY status until we find the first NON EMPTY.
704                            	This prevents the positions, of the NON EMPTY entries, from being changed.
705 r.kieninger      1.83          */
706 a.arora          1.73          index = _entries.size() - 1;
707 mike             1.96          while(_entries[index]._status.get() == _MonitorEntry::EMPTY){
708 a.arora          1.73      	if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
709                                            _entries.remove(index);
710                            	index--;
711                                }
712 kumpf            1.4           PEG_METHOD_EXIT();
713 mike             1.2       }
714                            
715 a.arora          1.73      // Note: this is no longer called with PEP 183.
716 mday             1.7       PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
717                            {
718 mday             1.8          HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
719 kumpf            1.51         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
720 kumpf            1.53              "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
721                                    dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
722 kumpf            1.51         try
723                               {
724                                  dst->run(1);
725                               }
726                               catch (...)
727                               {
728                                  Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
729                                      "Monitor::_dispatch: exception received");
730                               }
731                               Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
732                                      "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
733 r.kieninger      1.83      
734 mike             1.96         PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
735 kumpf            1.68      
736                               // Once the HTTPConnection thread has set the status value to either
737                               // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
738                               // to the Monitor.  It is no longer permissible to access the connection
739                               // or the entry in the _entries table.
740 kumpf            1.50         if (dst->_connectionClosePending)
741                               {
742 kumpf            1.68            dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
743                               }
744                               else
745                               {
746                                  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
747 kumpf            1.50         }
748 mday             1.8          return 0;
749 mday             1.40      }
750                            
751 mike             1.2       PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2