(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.90 //%2005////////////////////////////////////////////////////////////////////////
  2 mike  1.2  //
  3 karl  1.79 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4            // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5            // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6 karl  1.64 // IBM Corp.; EMC Corporation, The Open Group.
  7 karl  1.79 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8            // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9 karl  1.90 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10            // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11 mike  1.2  //
 12            // Permission is hereby granted, free of charge, to any person obtaining a copy
 13 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
 14            // deal in the Software without restriction, including without limitation the
 15            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 16 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
 17            // furnished to do so, subject to the following conditions:
 18 david.dillard 1.95 //
 19 kumpf         1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 20 mike          1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 21                    // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 22 kumpf         1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 23                    // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 24                    // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 25 mike          1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 26                    // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 27                    //
 28                    //==============================================================================
 29                    //
 30                    // Author: Mike Brasher (mbrasher@bmc.com)
 31                    //
 32 r.kieninger   1.83 // Modified By: Mike Day (monitor_2) mdday@us.ibm.com
 33 a.arora       1.71 //              Amit K Arora (Bug#1153) amita@in.ibm.com
 34 alagaraja     1.75 //              Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090
 35 sushma.fernandes 1.78 //              Sushma Fernandes (sushma@hp.com) for Bug#2057
 36 joyce.j          1.84 //              Josephine Eskaline Joyce (jojustin@in.ibm.com) for PEP#101
 37 kumpf            1.85 //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
 38 mike             1.2  //
 39                       //%/////////////////////////////////////////////////////////////////////////////
 40                       
 41                       #include <Pegasus/Common/Config.h>
 42 mday             1.40 
 43 mike             1.2  #include <cstring>
 44                       #include "Monitor.h"
 45                       #include "MessageQueue.h"
 46                       #include "Socket.h"
 47 kumpf            1.4  #include <Pegasus/Common/Tracer.h>
 48 mday             1.7  #include <Pegasus/Common/HTTPConnection.h>
 49 kumpf            1.69 #include <Pegasus/Common/MessageQueueService.h>
 50 a.arora          1.73 #include <Pegasus/Common/Exception.h>
 51 mike             1.2  
 52                       #ifdef PEGASUS_OS_TYPE_WINDOWS
 53                       # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
 54                       #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
 55                       of <winsock.h>. It may have been indirectly included (e.g., by including \
 56 david.dillard    1.95 <windows.h>). Find inclusion of that header which is visible to this \
 57 mike             1.2  compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \
 58                       otherwise, less than 64 clients (the default) will be able to connect to the \
 59                       CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
 60 mday             1.5  
 61 mike             1.2  # endif
 62                       # define FD_SETSIZE 1024
 63 mday             1.5  # include <windows.h>
 64 mike             1.2  #else
 65                       # include <sys/types.h>
 66                       # include <sys/socket.h>
 67                       # include <sys/time.h>
 68                       # include <netinet/in.h>
 69                       # include <netdb.h>
 70                       # include <arpa/inet.h>
 71                       #endif
 72                       
 73                       PEGASUS_USING_STD;
 74                       
 75                       PEGASUS_NAMESPACE_BEGIN
 76                       
 77 kumpf            1.86 // Define a platform-neutral socket length type
 78 gs.keenan        1.91 #if defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM) || defined(PEGASUS_OS_VMS)
 79 kumpf            1.86 typedef size_t PEGASUS_SOCKLEN_T;
 80                       #elif defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) || defined(PEGASUS_OS_LINUX) || (defined(PEGASUS_OS_SOLARIS) && !defined(SUNOS_5_6))
 81                       typedef socklen_t PEGASUS_SOCKLEN_T;
 82                       #else
 83                       typedef int PEGASUS_SOCKLEN_T;
 84                       #endif
 85 mday             1.18 
 86 mike             1.96 static AtomicInt _connections(0);
 87 mday             1.25 
 88 mike             1.2  ////////////////////////////////////////////////////////////////////////////////
 89                       //
 90                       // Monitor
 91                       //
 92                       ////////////////////////////////////////////////////////////////////////////////
 93                       
 94 kumpf            1.54 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
 95 mike             1.2  Monitor::Monitor()
 96 kumpf            1.87    : _stopConnections(0),
 97 a.arora          1.73      _stopConnectionsSem(0),
 98 a.dunfey         1.76      _solicitSocketCount(0),
 99 a.dunfey         1.77      _tickle_client_socket(-1),
100                            _tickle_server_socket(-1),
101                            _tickle_peer_socket(-1)
102 mike             1.2  {
103 kumpf            1.54     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
104 mike             1.2      Socket::initializeInterface();
105 kumpf            1.54     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
106 a.arora          1.73 
107                           // setup the tickler
108                           initializeTickler();
109                       
110 r.kieninger      1.83     // Start the count at 1 because initilizeTickler()
111 a.arora          1.73     // has added an entry in the first position of the
112                           // _entries array
113                           for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
114 mday             1.37     {
115                              _MonitorEntry entry(0, 0, 0);
116                              _entries.append(entry);
117                           }
118 mday             1.18 }
119 mday             1.20 
120 mike             1.2  Monitor::~Monitor()
121                       {
122 kumpf            1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
123 a.dunfey         1.76 
124                           try{
125 a.dunfey         1.77         if(_tickle_peer_socket >= 0)
126 a.dunfey         1.76         {
127                                   Socket::close(_tickle_peer_socket);
128                               }
129 a.dunfey         1.77         if(_tickle_client_socket >= 0)
130 a.dunfey         1.76         {
131                                   Socket::close(_tickle_client_socket);
132                               }
133 a.dunfey         1.77         if(_tickle_server_socket >= 0)
134 a.dunfey         1.76         {
135                                   Socket::close(_tickle_server_socket);
136                               }
137                           }
138                           catch(...)
139                           {
140                               Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
141                                         "Failed to close tickle sockets");
142                           }
143                       
144 mike             1.2      Socket::uninitializeInterface();
145 kumpf            1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
146                                         "returning from monitor destructor");
147 mday             1.18 }
148                       
149 a.arora          1.73 void Monitor::initializeTickler(){
150 r.kieninger      1.83     /*
151                              NOTE: On any errors trying to
152                                    setup out tickle connection,
153 a.arora          1.73              throw an exception/end the server
154                           */
155                       
156                           /* setup the tickle server/listener */
157                       
158                           // get a socket for the server side
159 david.dillard    1.95     if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
160 a.arora          1.73 	//handle error
161                       	MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
162                       				 "Received error number $0 while creating the internal socket.",
163                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
164                       				 errno);
165                       #else
166                       				 WSAGetLastError());
167                       #endif
168                       	throw Exception(parms);
169                           }
170                       
171                           // initialize the address
172                           memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
173                       #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
174                       #pragma convert(37)
175                       #endif
176                           _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
177                       #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
178                       #pragma convert(0)
179                       #endif
180                           _tickle_server_addr.sin_family = PF_INET;
181 a.arora          1.73     _tickle_server_addr.sin_port = 0;
182                       
183 kumpf            1.86     PEGASUS_SOCKLEN_T _addr_size = sizeof(_tickle_server_addr);
184 a.arora          1.73 
185                           // bind server side to socket
186                           if((::bind(_tickle_server_socket,
187 kumpf            1.88                reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
188 a.arora          1.73 	       sizeof(_tickle_server_addr))) < 0){
189                       	// handle error
190 r.kieninger      1.83 #ifdef PEGASUS_OS_ZOS
191                           MessageLoaderParms parms("Common.Monitor.TICKLE_BIND_LONG",
192                       				 "Received error:$0 while binding the internal socket.",strerror(errno));
193                       #else
194 a.arora          1.73 	MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
195                       				 "Received error number $0 while binding the internal socket.",
196                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
197                       				 errno);
198                       #else
199                       				 WSAGetLastError());
200                       #endif
201 r.kieninger      1.83 #endif
202 a.arora          1.73         throw Exception(parms);
203                           }
204                       
205                           // tell the kernel we are a server
206                           if((::listen(_tickle_server_socket,3)) < 0){
207                       	// handle error
208                       	MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",
209                       			 "Received error number $0 while listening to the internal socket.",
210                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
211                       				 errno);
212                       #else
213                       				 WSAGetLastError());
214                       #endif
215                       	throw Exception(parms);
216                           }
217 r.kieninger      1.83 
218 a.arora          1.73     // make sure we have the correct socket for our server
219                           int sock = ::getsockname(_tickle_server_socket,
220 kumpf            1.88                    reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
221                                          &_addr_size);
222 a.arora          1.73     if(sock < 0){
223                       	// handle error
224                       	MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",
225                       			 "Received error number $0 while getting the internal socket name.",
226                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
227                       				 errno);
228                       #else
229                       				 WSAGetLastError());
230                       #endif
231                       	throw Exception(parms);
232                           }
233                       
234                           /* set up the tickle client/connector */
235 r.kieninger      1.83 
236 a.arora          1.73     // get a socket for our tickle client
237 david.dillard    1.95     if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
238 a.arora          1.73 	// handle error
239                       	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
240                       			 "Received error number $0 while creating the internal client socket.",
241                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
242                       				 errno);
243                       #else
244                       				 WSAGetLastError());
245                       #endif
246                       	throw Exception(parms);
247                           }
248                       
249                           // setup the address of the client
250                           memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
251                       #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
252                       #pragma convert(37)
253                       #endif
254                           _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
255                       #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
256                       #pragma convert(0)
257                       #endif
258                           _tickle_client_addr.sin_family = PF_INET;
259 a.arora          1.73     _tickle_client_addr.sin_port = 0;
260                       
261                           // bind socket to client side
262                           if((::bind(_tickle_client_socket,
263 kumpf            1.88                reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
264 a.arora          1.73 	       sizeof(_tickle_client_addr))) < 0){
265                       	// handle error
266                       	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
267                       			 "Received error number $0 while binding the internal client socket.",
268                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
269                       				 errno);
270                       #else
271                       				 WSAGetLastError());
272                       #endif
273                       	throw Exception(parms);
274                           }
275                       
276                           // connect to server side
277                           if((::connect(_tickle_client_socket,
278 kumpf            1.88                   reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
279 a.arora          1.73 		  sizeof(_tickle_server_addr))) < 0){
280                       	// handle error
281                       	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
282                       			 "Received error number $0 while connecting the internal client socket.",
283                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
284                       				 errno);
285                       #else
286                       				 WSAGetLastError());
287                       #endif
288                       	throw Exception(parms);
289                           }
290                       
291                           /* set up the slave connection */
292                           memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
293 kumpf            1.86     PEGASUS_SOCKLEN_T peer_size = sizeof(_tickle_peer_addr);
294 r.kieninger      1.83     pegasus_sleep(1);
295 a.arora          1.73 
296                           // this call may fail, we will try a max of 20 times to establish this peer connection
297                           if((_tickle_peer_socket = ::accept(_tickle_server_socket,
298 kumpf            1.88             reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
299                                   &peer_size)) < 0){
300 a.arora          1.73 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
301                               // Only retry on non-windows platforms.
302                               if(_tickle_peer_socket == -1 && errno == EAGAIN)
303                               {
304 r.kieninger      1.83           int retries = 0;
305 a.arora          1.73           do
306                                 {
307                                   pegasus_sleep(1);
308                                   _tickle_peer_socket = ::accept(_tickle_server_socket,
309 kumpf            1.88                 reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
310                                       &peer_size);
311 a.arora          1.73             retries++;
312                                 } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
313                               }
314                       #endif
315                           }
316                           if(_tickle_peer_socket == -1){
317                       	// handle error
318                       	MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",
319                       			 "Received error number $0 while accepting the internal socket connection.",
320                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
321                       				 errno);
322                       #else
323                       				 WSAGetLastError());
324                       #endif
325                       	throw Exception(parms);
326                           }
327                           // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
328                           // checks entries with IDLE state for events
329                           _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
330                           entry._status = _MonitorEntry::IDLE;
331                           _entries.append(entry);
332 a.arora          1.73 }
333                       
334                       void Monitor::tickle(void)
335                       {
336 sushma.fernandes 1.78     static char _buffer[] =
337 a.arora          1.73     {
338                             '0','0'
339                           };
340 r.kieninger      1.83 
341 sushma.fernandes 1.78     AutoMutex autoMutex(_tickle_mutex);
342 r.kieninger      1.83     Socket::disableBlocking(_tickle_client_socket);
343 sushma.fernandes 1.78     Socket::write(_tickle_client_socket,&_buffer, 2);
344 r.kieninger      1.83     Socket::enableBlocking(_tickle_client_socket);
345 sushma.fernandes 1.78 }
346                       
347                       void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
348                       {
349                           // Set the state to requested state
350                           _entries[index]._status = status;
351 a.arora          1.73 }
352                       
353 mike             1.2  Boolean Monitor::run(Uint32 milliseconds)
354                       {
355 mday             1.18 
356 mday             1.25     Boolean handled_events = false;
357 a.arora          1.73     int i = 0;
358 r.kieninger      1.83 
359 kumpf            1.36     struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
360 a.arora          1.73 
361 mday             1.25     fd_set fdread;
362                           FD_ZERO(&fdread);
363 a.arora          1.73 
364 kumpf            1.94     AutoMutex autoEntryMutex(_entry_mut);
365 r.kieninger      1.83 
366                           // Check the stopConnections flag.  If set, clear the Acceptor monitor entries
367 mike             1.96     if (_stopConnections.get() == 1)
368 kumpf            1.48     {
369                               for ( int indx = 0; indx < (int)_entries.size(); indx++)
370                               {
371                                   if (_entries[indx]._type == Monitor::ACCEPTOR)
372                                   {
373 mike             1.96                 if ( _entries[indx]._status.get() != _MonitorEntry::EMPTY)
374 kumpf            1.48                 {
375 mike             1.96                    if ( _entries[indx]._status.get() == _MonitorEntry::IDLE ||
376                                               _entries[indx]._status.get() == _MonitorEntry::DYING )
377 kumpf            1.48                    {
378                                              // remove the entry
379                       		       _entries[indx]._status = _MonitorEntry::EMPTY;
380                                          }
381                                          else
382                                          {
383                                              // set status to DYING
384 kumpf            1.52                       _entries[indx]._status = _MonitorEntry::DYING;
385 kumpf            1.48                    }
386                                      }
387                                  }
388                               }
389                               _stopConnections = 0;
390 a.arora          1.73 	_stopConnectionsSem.signal();
391 kumpf            1.48     }
392 kumpf            1.51 
393 kumpf            1.68     for( int indx = 0; indx < (int)_entries.size(); indx++)
394                           {
395 brian.campbell   1.80 			 const _MonitorEntry &entry = _entries[indx];
396 mike             1.96        if ((entry._status.get() == _MonitorEntry::DYING) &&
397 brian.campbell   1.80 					 (entry._type == Monitor::CONNECTION))
398 kumpf            1.68        {
399 brian.campbell   1.80           MessageQueue *q = MessageQueue::lookup(entry.queueId);
400 kumpf            1.68           PEGASUS_ASSERT(q != 0);
401 brian.campbell   1.80           HTTPConnection &h = *static_cast<HTTPConnection *>(q);
402 r.kieninger      1.83 
403 brian.campbell   1.80 					if (h._connectionClosePending == false)
404                       						continue;
405                       
406                       					// NOTE: do not attempt to delete while there are pending responses
407 r.kieninger      1.83 					// coming thru. The last response to come thru after a
408 brian.campbell   1.80 					// _connectionClosePending will reset _responsePending to false
409                       					// and then cause the monitor to rerun this code and clean up.
410                       					// (see HTTPConnection.cpp)
411                       
412                       					if (h._responsePending == true)
413                       					{
414                       						Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "
415                       													"Ignoring connection delete request because "
416                       													"responses are still pending. "
417 r.kieninger      1.83 													"connection=0x%p, socket=%d\n",
418 brian.campbell   1.81 													(void *)&h, h.getSocket());
419 brian.campbell   1.80 						continue;
420                       					}
421                       					h._connectionClosePending = false;
422                                 MessageQueue &o = h.get_owner();
423                                 Message* message= new CloseConnectionMessage(entry.socket);
424 kumpf            1.68           message->dest = o.getQueueId();
425                       
426 r.kieninger      1.83           // HTTPAcceptor is responsible for closing the connection.
427 kumpf            1.68           // The lock is released to allow HTTPAcceptor to call
428 r.kieninger      1.83           // unsolicitSocketMessages to free the entry.
429 kumpf            1.68           // Once HTTPAcceptor completes processing of the close
430                                 // connection, the lock is re-requested and processing of
431                                 // the for loop continues.  This is safe with the current
432                                 // implementation of the _entries object.  Note that the
433                                 // loop condition accesses the _entries.size() on each
434                                 // iteration, so that a change in size while the mutex is
435                                 // unlocked will not result in an ArrayIndexOutOfBounds
436                                 // exception.
437                       
438 kumpf            1.94           autoEntryMutex.unlock();
439 kumpf            1.68           o.enqueue(message);
440 kumpf            1.94           autoEntryMutex.lock();
441 kumpf            1.68        }
442                           }
443                       
444 kumpf            1.51     Uint32 _idleEntries = 0;
445 r.kieninger      1.83 
446 a.arora          1.73     /*
447 david.dillard    1.95         We will keep track of the maximum socket number and pass this value
448                               to the kernel as a parameter to SELECT.  This loop seems like a good
449                               place to calculate the max file descriptor (maximum socket number)
450                               because we have to traverse the entire array.
451 r.kieninger      1.83     */
452 david.dillard    1.95     PEGASUS_SOCKET maxSocketCurrentPass = 0;
453 mday             1.25     for( int indx = 0; indx < (int)_entries.size(); indx++)
454 mike             1.2      {
455 a.arora          1.73        if(maxSocketCurrentPass < _entries[indx].socket)
456 david.dillard    1.95             maxSocketCurrentPass = _entries[indx].socket;
457 a.arora          1.73 
458 mike             1.96        if(_entries[indx]._status.get() == _MonitorEntry::IDLE)
459 mday             1.25        {
460 david.dillard    1.95            _idleEntries++;
461                                  FD_SET(_entries[indx].socket, &fdread);
462 mday             1.25        }
463 mday             1.13     }
464 s.hills          1.62 
465 a.arora          1.73     /*
466 david.dillard    1.95         Add 1 then assign maxSocket accordingly. We add 1 to account for
467                               descriptors starting at 0.
468 a.arora          1.73     */
469                           maxSocketCurrentPass++;
470                       
471 kumpf            1.94     autoEntryMutex.unlock();
472 david.dillard    1.95 
473                           //
474                           // The first argument to select() is ignored on Windows and it is not
475                           // a socket value.  The original code assumed that the number of sockets
476                           // and a socket value have the same type.  On Windows they do not.
477                           //
478                       #ifdef PEGASUS_OS_TYPE_WINDOWS
479                           int events = select(0, &fdread, NULL, NULL, &tv);
480                       #else
481 a.arora          1.73     int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
482 david.dillard    1.95 #endif
483 kumpf            1.94     autoEntryMutex.lock();
484 mday             1.25 
485 mike             1.2  #ifdef PEGASUS_OS_TYPE_WINDOWS
486 kumpf            1.50     if(events == SOCKET_ERROR)
487 mike             1.2  #else
488 kumpf            1.50     if(events == -1)
489 mike             1.2  #endif
490 mday             1.13     {
491 kumpf            1.50        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
492                                 "Monitor::run - errorno = %d has occurred on select.", errno);
493                              // The EBADF error indicates that one or more or the file
494                              // descriptions was not valid. This could indicate that
495                              // the _entries structure has been corrupted or that
496                              // we have a synchronization error.
497                       
498                              PEGASUS_ASSERT(errno != EBADF);
499                           }
500                           else if (events)
501                           {
502 kumpf            1.51        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
503 r.kieninger      1.83           "Monitor::run select event received events = %d, monitoring %d idle entries",
504 david.dillard    1.95            events, _idleEntries);
505 mday             1.25        for( int indx = 0; indx < (int)_entries.size(); indx++)
506                              {
507 kumpf            1.53           // The Monitor should only look at entries in the table that are IDLE (i.e.,
508                                 // owned by the Monitor).
509 mike             1.96           if((_entries[indx]._status.get() == _MonitorEntry::IDLE) &&
510 david.dillard    1.95              (FD_ISSET(_entries[indx].socket, &fdread)))
511                                 {
512                                    MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);
513 kumpf            1.53              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
514                                         "Monitor::run indx = %d, queueId =  %d, q = %p",
515                                         indx, _entries[indx].queueId, q);
516                                    PEGASUS_ASSERT(q !=0);
517 mday             1.37 
518 david.dillard    1.95              try
519                                    {
520                                       if(_entries[indx]._type == Monitor::CONNECTION)
521                                       {
522 kumpf            1.51                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
523                                            "_entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
524 david.dillard    1.95                    static_cast<HTTPConnection *>(q)->_entry_index = indx;
525 sushma.fernandes 1.78 
526                                          // Do not update the entry just yet. The entry gets updated once
527 r.kieninger      1.83                    // the request has been read.
528 david.dillard    1.95                    //_entries[indx]._status = _MonitorEntry::BUSY;
529 sushma.fernandes 1.78 
530 kumpf            1.66                    // If allocate_and_awaken failure, retry on next iteration
531 a.arora          1.73 /* Removed for PEP 183.
532 kumpf            1.69                    if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(
533                                                  (void *)q, _dispatch))
534 kumpf            1.67                    {
535                                             Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
536                                                 "Monitor::run: Insufficient resources to process request.");
537                                             _entries[indx]._status = _MonitorEntry::IDLE;
538                                             return true;
539                                          }
540 a.arora          1.73 */
541                       // Added for PEP 183
542 david.dillard    1.95                    HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
543                                          Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
544 a.arora          1.73                          "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
545                                          dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
546                                          try
547                                          {
548                                              dst->run(1);
549                                          }
550 david.dillard    1.95                    catch (...)
551                                          {
552                                              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
553                                              "Monitor::_dispatch: exception received");
554                                          }
555                                          Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
556 a.arora          1.73                    "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
557                       
558 sushma.fernandes 1.78                    // It is possible the entry status may not be set to busy.
559 r.kieninger      1.83                    // The following will fail in that case.
560 mike             1.96    		   // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
561 a.arora          1.73 		   // Once the HTTPConnection thread has set the status value to either
562                       		   // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
563                       		   // to the Monitor.  It is no longer permissible to access the connection
564                       		   // or the entry in the _entries table.
565 sushma.fernandes 1.78 
566                                          // The following is not relevant as the worker thread or the
567                                          // reader thread will update the status of the entry.
568                       		   //if (dst->_connectionClosePending)
569 r.kieninger      1.83 		   //{
570 sushma.fernandes 1.78 		   //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
571                       		   //}
572                       		   //else
573                       		   //{
574                       		   //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
575 r.kieninger      1.83 		   //}
576                       // end Added for PEP 183
577 a.arora          1.73 		}
578                       	        else if( _entries[indx]._type == Monitor::INTERNAL){
579 r.kieninger      1.83 			// set ourself to BUSY,
580                                               // read the data
581 a.arora          1.73                         // and set ourself back to IDLE
582 r.kieninger      1.83 
583 mike             1.96 		   	_entries[indx]._status.get() == _MonitorEntry::BUSY;
584 a.arora          1.73 			static char buffer[2];
585                             			Socket::disableBlocking(_entries[indx].socket);
586                             			Sint32 amt = Socket::read(_entries[indx].socket,&buffer, 2);
587                             			Socket::enableBlocking(_entries[indx].socket);
588 mike             1.96 			_entries[indx]._status.get() == _MonitorEntry::IDLE;
589 mday             1.37 		}
590                       		else
591 mday             1.25 		{
592 kumpf            1.51                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
593                                            "Non-connection entry, indx = %d, has been received.", indx);
594 mday             1.37 		   int events = 0;
595                       		   events |= SocketMessage::READ;
596                       		   Message *msg = new SocketMessage(_entries[indx].socket, events);
597                       		   _entries[indx]._status = _MonitorEntry::BUSY;
598 kumpf            1.94                    autoEntryMutex.unlock();
599 mday             1.37 		   q->enqueue(msg);
600 kumpf            1.94                    autoEntryMutex.lock();
601 mday             1.37 		   _entries[indx]._status = _MonitorEntry::IDLE;
602 kumpf            1.94 
603 mday             1.25 		   return true;
604                       		}
605                       	     }
606 mday             1.37 	     catch(...)
607 mday             1.25 	     {
608                       	     }
609                       	     handled_events = true;
610                       	  }
611                              }
612 mday             1.24     }
613 kumpf            1.94 
614 mday             1.13     return(handled_events);
615 mike             1.2  }
616                       
617 chuck            1.74 void Monitor::stopListeningForConnections(Boolean wait)
618 kumpf            1.48 {
619                           PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
620 r.kieninger      1.83     // set boolean then tickle the server to recognize _stopConnections
621 kumpf            1.48     _stopConnections = 1;
622 a.arora          1.73     tickle();
623 kumpf            1.48 
624 chuck            1.74     if (wait)
625 a.arora          1.73     {
626 chuck            1.74       // Wait for the monitor to notice _stopConnections.  Otherwise the
627                             // caller of this function may unbind the ports while the monitor
628                             // is still accepting connections on them.
629                             try
630                       	{
631                       	  _stopConnectionsSem.time_wait(10000);
632                       	}
633                             catch (TimeOut &)
634                       	{
635                       	  // The monitor is probably busy processng a very long request, and is
636                       	  // not accepting connections.  Let the caller unbind the ports.
637                       	}
638 a.arora          1.73     }
639 r.kieninger      1.83 
640 kumpf            1.48     PEG_METHOD_EXIT();
641                       }
642 mday             1.25 
643 mday             1.37 
644 mday             1.25 int  Monitor::solicitSocketMessages(
645 david.dillard    1.95     PEGASUS_SOCKET socket,
646 mike             1.2      Uint32 events,
647 r.kieninger      1.83     Uint32 queueId,
648 mday             1.8      int type)
649 mike             1.2  {
650 r.kieninger      1.83    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
651 alagaraja        1.75    AutoMutex autoMut(_entry_mut);
652 a.arora          1.73    // Check to see if we need to dynamically grow the _entries array
653                          // We always want the _entries array to 2 bigger than the
654                          // current connections requested
655                          _solicitSocketCount++;  // bump the count
656                          int size = (int)_entries.size();
657 w.otsuka         1.89    if((int)_solicitSocketCount >= (size-1)){
658                               for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
659 a.arora          1.73                 _MonitorEntry entry(0, 0, 0);
660                                       _entries.append(entry);
661                               }
662                          }
663 kumpf            1.4  
664 a.arora          1.73    int index;
665                          for(index = 1; index < (int)_entries.size(); index++)
666 mday             1.25    {
667 a.arora          1.73       try
668 mday             1.37       {
669 mike             1.96          if(_entries[index]._status.get() == _MonitorEntry::EMPTY)
670 a.arora          1.73          {
671                                   _entries[index].socket = socket;
672                                   _entries[index].queueId  = queueId;
673                                   _entries[index]._type = type;
674                                   _entries[index]._status = _MonitorEntry::IDLE;
675 r.kieninger      1.83 
676 a.arora          1.73             return index;
677                                }
678 mday             1.37       }
679                             catch(...)
680 mday             1.25       {
681                             }
682                          }
683 a.arora          1.73    _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful
684 mday             1.25    PEG_METHOD_EXIT();
685 kumpf            1.50    return -1;
686 a.arora          1.73 
687 mike             1.2  }
688                       
689 david.dillard    1.95 void Monitor::unsolicitSocketMessages(PEGASUS_SOCKET socket)
690 mike             1.2  {
691 kumpf            1.50 
692 mday             1.25     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
693 alagaraja        1.75     AutoMutex autoMut(_entry_mut);
694 a.arora          1.73 
695                           /*
696                               Start at index = 1 because _entries[0] is the tickle entry which never needs
697                               to be EMPTY;
698                           */
699 w.otsuka         1.89     unsigned int index;
700 a.arora          1.73     for(index = 1; index < _entries.size(); index++)
701 mike             1.2      {
702 mday             1.25        if(_entries[index].socket == socket)
703                              {
704 a.arora          1.73           _entries[index]._status = _MonitorEntry::EMPTY;
705 david.dillard    1.95           _entries[index].socket = PEGASUS_INVALID_SOCKET;
706 a.arora          1.73           _solicitSocketCount--;
707                                 break;
708 mday             1.25        }
709 mike             1.2      }
710 a.arora          1.73 
711                           /*
712                       	Dynamic Contraction:
713                       	To remove excess entries we will start from the end of the _entries array
714                       	and remove all entries with EMPTY status until we find the first NON EMPTY.
715                       	This prevents the positions, of the NON EMPTY entries, from being changed.
716 r.kieninger      1.83     */
717 a.arora          1.73     index = _entries.size() - 1;
718 mike             1.96     while(_entries[index]._status.get() == _MonitorEntry::EMPTY){
719 a.arora          1.73 	if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
720                                       _entries.remove(index);
721                       	index--;
722                           }
723 kumpf            1.4      PEG_METHOD_EXIT();
724 mike             1.2  }
725                       
726 a.arora          1.73 // Note: this is no longer called with PEP 183.
727 mday             1.7  PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
728                       {
729 mday             1.8     HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
730 kumpf            1.51    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
731 kumpf            1.53         "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
732                               dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
733 kumpf            1.51    try
734                          {
735                             dst->run(1);
736                          }
737                          catch (...)
738                          {
739                             Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
740                                 "Monitor::_dispatch: exception received");
741                          }
742                          Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
743                                 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
744 r.kieninger      1.83 
745 mike             1.96    PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
746 kumpf            1.68 
747                          // Once the HTTPConnection thread has set the status value to either
748                          // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
749                          // to the Monitor.  It is no longer permissible to access the connection
750                          // or the entry in the _entries table.
751 kumpf            1.50    if (dst->_connectionClosePending)
752                          {
753 kumpf            1.68       dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
754                          }
755                          else
756                          {
757                             dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
758 kumpf            1.50    }
759 mday             1.8     return 0;
760 mday             1.40 }
761                       
762 mike             1.2  PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2