(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.90 //%2005////////////////////////////////////////////////////////////////////////
  2 mike  1.2  //
  3 karl  1.79 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4            // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5            // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6 karl  1.64 // IBM Corp.; EMC Corporation, The Open Group.
  7 karl  1.79 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8            // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9 karl  1.90 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10            // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11 mike  1.2  //
 12            // Permission is hereby granted, free of charge, to any person obtaining a copy
 13 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
 14            // deal in the Software without restriction, including without limitation the
 15            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 16 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
 17            // furnished to do so, subject to the following conditions:
 18 karl  1.90 // 
 19 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 20 mike  1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 21            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 22 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 23            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 24            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 25 mike  1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 26            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 27            //
 28            //==============================================================================
 29            //
 30            // Author: Mike Brasher (mbrasher@bmc.com)
 31            //
 32 r.kieninger 1.83 // Modified By: Mike Day (monitor_2) mdday@us.ibm.com
 33 a.arora     1.71 //              Amit K Arora (Bug#1153) amita@in.ibm.com
 34 alagaraja   1.75 //              Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090
 35 sushma.fernandes 1.78 //              Sushma Fernandes (sushma@hp.com) for Bug#2057
 36 joyce.j          1.84 //              Josephine Eskaline Joyce (jojustin@in.ibm.com) for PEP#101
 37 kumpf            1.85 //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
 38 mike             1.2  //
 39                       //%/////////////////////////////////////////////////////////////////////////////
 40                       
 41                       #include <Pegasus/Common/Config.h>
 42 mday             1.40 
 43 mike             1.2  #include <cstring>
 44                       #include "Monitor.h"
 45                       #include "MessageQueue.h"
 46                       #include "Socket.h"
 47 kumpf            1.4  #include <Pegasus/Common/Tracer.h>
 48 mday             1.7  #include <Pegasus/Common/HTTPConnection.h>
 49 kumpf            1.69 #include <Pegasus/Common/MessageQueueService.h>
 50 a.arora          1.73 #include <Pegasus/Common/Exception.h>
 51 mike             1.2  
 52                       #ifdef PEGASUS_OS_TYPE_WINDOWS
 53                       # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
 54                       #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
 55                       of <winsock.h>. It may have been indirectly included (e.g., by including \
 56 mday             1.25 <windows.h>). Find the inclusion of that header which is visible to this \
 57 mike             1.2  compilation unit and #define FD_SETSIZE to 1024 prior to that inclusion; \
 58                       otherwise, less than 64 clients (the default) will be able to connect to the \
 59                       CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
 60 mday             1.5  
 61 mike             1.2  # endif
 62                       # define FD_SETSIZE 1024
 63 mday             1.5  # include <windows.h>
 64 mike             1.2  #else
 65                       # include <sys/types.h>
 66                       # include <sys/socket.h>
 67                       # include <sys/time.h>
 68                       # include <netinet/in.h>
 69                       # include <netdb.h>
 70                       # include <arpa/inet.h>
 71                       #endif
 72                       
 73                       PEGASUS_USING_STD;
 74                       
 75                       PEGASUS_NAMESPACE_BEGIN
 76                       
 77 kumpf            1.86 // Define a platform-neutral socket length type
                          //
                          // PEGASUS_SOCKLEN_T is the type of the address-length variables whose
                          // addresses this file hands to ::getsockname() and ::accept():
                          //   - size_t    on z/OS (zSeries) and VMS
                          //   - socklen_t on AIX (IBM C++), Linux, and Solaris other than SunOS 5.6
                          //   - int       on every other platform
 78 gs.keenan        1.91 #if defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM) || defined(PEGASUS_OS_VMS)
 79 kumpf            1.86 typedef size_t PEGASUS_SOCKLEN_T;
 80                       #elif defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) || defined(PEGASUS_OS_LINUX) || (defined(PEGASUS_OS_SOLARIS) && !defined(SUNOS_5_6))
 81                       typedef socklen_t PEGASUS_SOCKLEN_T;
 82                       #else
 83                       typedef int PEGASUS_SOCKLEN_T;
 84                       #endif
 85 mday             1.18 
                          // File-local atomic counter, initialized to zero.
                          // NOTE(review): not referenced in the portion of the file reviewed
                          // here -- confirm it is still used further down.
 86 mday             1.25 static AtomicInt _connections = 0;
 87                       
 88 mike             1.2  ////////////////////////////////////////////////////////////////////////////////
 89                       //
 90                       // Monitor
 91                       //
 92                       ////////////////////////////////////////////////////////////////////////////////
 93                       
 94 kumpf            1.54 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
 95 mike             1.2  Monitor::Monitor()
 96 kumpf            1.87    : _stopConnections(0),
 97 a.arora          1.73      _stopConnectionsSem(0),
 98 a.dunfey         1.76      _solicitSocketCount(0),
 99 a.dunfey         1.77      _tickle_client_socket(-1),
100                            _tickle_server_socket(-1),
101                            _tickle_peer_socket(-1)
102 mike             1.2  {
103 kumpf            1.54     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
104 mike             1.2      Socket::initializeInterface();
105 kumpf            1.54     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
106 a.arora          1.73 
107                           // setup the tickler
108                           initializeTickler();
109                       
110 r.kieninger      1.83     // Start the count at 1 because initilizeTickler()
111 a.arora          1.73     // has added an entry in the first position of the
112                           // _entries array
113                           for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
114 mday             1.37     {
115                              _MonitorEntry entry(0, 0, 0);
116                              _entries.append(entry);
117                           }
118 mday             1.18 }
119 mday             1.20 
120 mike             1.2  Monitor::~Monitor()
121                       {
122 kumpf            1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
123 a.dunfey         1.76 
124                           try{
125 a.dunfey         1.77         if(_tickle_peer_socket >= 0)
126 a.dunfey         1.76         {
127                                   Socket::close(_tickle_peer_socket);
128                               }
129 a.dunfey         1.77         if(_tickle_client_socket >= 0)
130 a.dunfey         1.76         {
131                                   Socket::close(_tickle_client_socket);
132                               }
133 a.dunfey         1.77         if(_tickle_server_socket >= 0)
134 a.dunfey         1.76         {
135                                   Socket::close(_tickle_server_socket);
136                               }
137                           }
138                           catch(...)
139                           {
140                               Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
141                                         "Failed to close tickle sockets");
142                           }
143                       
144 mike             1.2      Socket::uninitializeInterface();
145 kumpf            1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
146                                         "returning from monitor destructor");
147 mday             1.18 }
148                       
149 a.arora          1.73 void Monitor::initializeTickler(){
150 r.kieninger      1.83     /*
151                              NOTE: On any errors trying to
152                                    setup out tickle connection,
153 a.arora          1.73              throw an exception/end the server
154                           */
155                       
156                           /* setup the tickle server/listener */
157                       
158                           // get a socket for the server side
159                           if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){
160                       	//handle error
161                       	MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
162                       				 "Received error number $0 while creating the internal socket.",
163                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
164                       				 errno);
165                       #else
166                       				 WSAGetLastError());
167                       #endif
168                       	throw Exception(parms);
169                           }
170                       
171                           // initialize the address
172                           memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
173                       #ifdef PEGASUS_OS_ZOS
174 a.arora          1.73     _tickle_server_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
175                       #else
176                       #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
177                       #pragma convert(37)
178                       #endif
179                           _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
180                       #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
181                       #pragma convert(0)
182                       #endif
183                       #endif
184                           _tickle_server_addr.sin_family = PF_INET;
185                           _tickle_server_addr.sin_port = 0;
186                       
187 kumpf            1.86     PEGASUS_SOCKLEN_T _addr_size = sizeof(_tickle_server_addr);
188 a.arora          1.73 
189                           // bind server side to socket
190                           if((::bind(_tickle_server_socket,
191 kumpf            1.88                reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
192 a.arora          1.73 	       sizeof(_tickle_server_addr))) < 0){
193                       	// handle error
194 r.kieninger      1.83 #ifdef PEGASUS_OS_ZOS
195                           MessageLoaderParms parms("Common.Monitor.TICKLE_BIND_LONG",
196                       				 "Received error:$0 while binding the internal socket.",strerror(errno));
197                       #else
198 a.arora          1.73 	MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
199                       				 "Received error number $0 while binding the internal socket.",
200                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
201                       				 errno);
202                       #else
203                       				 WSAGetLastError());
204                       #endif
205 r.kieninger      1.83 #endif
206 a.arora          1.73         throw Exception(parms);
207                           }
208                       
209                           // tell the kernel we are a server
210                           if((::listen(_tickle_server_socket,3)) < 0){
211                       	// handle error
212                       	MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",
213                       			 "Received error number $0 while listening to the internal socket.",
214                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
215                       				 errno);
216                       #else
217                       				 WSAGetLastError());
218                       #endif
219                       	throw Exception(parms);
220                           }
221 r.kieninger      1.83 
222 a.arora          1.73     // make sure we have the correct socket for our server
223                           int sock = ::getsockname(_tickle_server_socket,
224 kumpf            1.88                    reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
225                                          &_addr_size);
226 a.arora          1.73     if(sock < 0){
227                       	// handle error
228                       	MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",
229                       			 "Received error number $0 while getting the internal socket name.",
230                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
231                       				 errno);
232                       #else
233                       				 WSAGetLastError());
234                       #endif
235                       	throw Exception(parms);
236                           }
237                       
238                           /* set up the tickle client/connector */
239 r.kieninger      1.83 
240 a.arora          1.73     // get a socket for our tickle client
241                           if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){
242                       	// handle error
243                       	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
244                       			 "Received error number $0 while creating the internal client socket.",
245                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
246                       				 errno);
247                       #else
248                       				 WSAGetLastError());
249                       #endif
250                       	throw Exception(parms);
251                           }
252                       
253                           // setup the address of the client
254                           memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
255                       #ifdef PEGASUS_OS_ZOS
256                           _tickle_client_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
257                       #else
258                       #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
259                       #pragma convert(37)
260                       #endif
261 a.arora          1.73     _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
262                       #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
263                       #pragma convert(0)
264                       #endif
265                       #endif
266                           _tickle_client_addr.sin_family = PF_INET;
267                           _tickle_client_addr.sin_port = 0;
268                       
269                           // bind socket to client side
270                           if((::bind(_tickle_client_socket,
271 kumpf            1.88                reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
272 a.arora          1.73 	       sizeof(_tickle_client_addr))) < 0){
273                       	// handle error
274                       	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
275                       			 "Received error number $0 while binding the internal client socket.",
276                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
277                       				 errno);
278                       #else
279                       				 WSAGetLastError());
280                       #endif
281                       	throw Exception(parms);
282                           }
283                       
284                           // connect to server side
285                           if((::connect(_tickle_client_socket,
286 kumpf            1.88                   reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
287 a.arora          1.73 		  sizeof(_tickle_server_addr))) < 0){
288                       	// handle error
289                       	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
290                       			 "Received error number $0 while connecting the internal client socket.",
291                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
292                       				 errno);
293                       #else
294                       				 WSAGetLastError());
295                       #endif
296                       	throw Exception(parms);
297                           }
298                       
299                           /* set up the slave connection */
300                           memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
301 kumpf            1.86     PEGASUS_SOCKLEN_T peer_size = sizeof(_tickle_peer_addr);
302 r.kieninger      1.83     pegasus_sleep(1);
303 a.arora          1.73 
304                           // this call may fail, we will try a max of 20 times to establish this peer connection
305                           if((_tickle_peer_socket = ::accept(_tickle_server_socket,
306 kumpf            1.88             reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
307                                   &peer_size)) < 0){
308 a.arora          1.73 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
309                               // Only retry on non-windows platforms.
310                               if(_tickle_peer_socket == -1 && errno == EAGAIN)
311                               {
312 r.kieninger      1.83           int retries = 0;
313 a.arora          1.73           do
314                                 {
315                                   pegasus_sleep(1);
316                                   _tickle_peer_socket = ::accept(_tickle_server_socket,
317 kumpf            1.88                 reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
318                                       &peer_size);
319 a.arora          1.73             retries++;
320                                 } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
321                               }
322                       #endif
323                           }
324                           if(_tickle_peer_socket == -1){
325                       	// handle error
326                       	MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",
327                       			 "Received error number $0 while accepting the internal socket connection.",
328                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
329                       				 errno);
330                       #else
331                       				 WSAGetLastError());
332                       #endif
333                       	throw Exception(parms);
334                           }
335                           // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
336                           // checks entries with IDLE state for events
337                           _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
338                           entry._status = _MonitorEntry::IDLE;
339                           _entries.append(entry);
340 a.arora          1.73 }
341                       
342                       void Monitor::tickle(void)
343                       {
344 sushma.fernandes 1.78     static char _buffer[] =
345 a.arora          1.73     {
346                             '0','0'
347                           };
348 r.kieninger      1.83 
349 sushma.fernandes 1.78     AutoMutex autoMutex(_tickle_mutex);
350 r.kieninger      1.83     Socket::disableBlocking(_tickle_client_socket);
351 sushma.fernandes 1.78     Socket::write(_tickle_client_socket,&_buffer, 2);
352 r.kieninger      1.83     Socket::enableBlocking(_tickle_client_socket);
353 sushma.fernandes 1.78 }
354                       
355                       void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
356                       {
357                           // Set the state to requested state
358                           _entries[index]._status = status;
359 a.arora          1.73 }
360                       
361 mike             1.2  Boolean Monitor::run(Uint32 milliseconds)
362                       {
363 mday             1.18 
364 mday             1.25     Boolean handled_events = false;
365 a.arora          1.73     int i = 0;
366 r.kieninger      1.83 
367 kumpf            1.36     struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
368 a.arora          1.73 
369 mday             1.25     fd_set fdread;
370                           FD_ZERO(&fdread);
371 a.arora          1.73 
372 kumpf            1.94     AutoMutex autoEntryMutex(_entry_mut);
373 r.kieninger      1.83 
374                           // Check the stopConnections flag.  If set, clear the Acceptor monitor entries
375                           if (_stopConnections == 1)
376 kumpf            1.48     {
377                               for ( int indx = 0; indx < (int)_entries.size(); indx++)
378                               {
379                                   if (_entries[indx]._type == Monitor::ACCEPTOR)
380                                   {
381                                       if ( _entries[indx]._status.value() != _MonitorEntry::EMPTY)
382                                       {
383                                          if ( _entries[indx]._status.value() == _MonitorEntry::IDLE ||
384                                               _entries[indx]._status.value() == _MonitorEntry::DYING )
385                                          {
386                                              // remove the entry
387                       		       _entries[indx]._status = _MonitorEntry::EMPTY;
388                                          }
389                                          else
390                                          {
391                                              // set status to DYING
392 kumpf            1.52                       _entries[indx]._status = _MonitorEntry::DYING;
393 kumpf            1.48                    }
394                                      }
395                                  }
396                               }
397                               _stopConnections = 0;
398 a.arora          1.73 	_stopConnectionsSem.signal();
399 kumpf            1.48     }
400 kumpf            1.51 
401 kumpf            1.68     for( int indx = 0; indx < (int)_entries.size(); indx++)
402                           {
403 brian.campbell   1.80 			 const _MonitorEntry &entry = _entries[indx];
404                              if ((entry._status.value() == _MonitorEntry::DYING) &&
405                       					 (entry._type == Monitor::CONNECTION))
406 kumpf            1.68        {
407 brian.campbell   1.80           MessageQueue *q = MessageQueue::lookup(entry.queueId);
408 kumpf            1.68           PEGASUS_ASSERT(q != 0);
409 brian.campbell   1.80           HTTPConnection &h = *static_cast<HTTPConnection *>(q);
410 r.kieninger      1.83 
411 brian.campbell   1.80 					if (h._connectionClosePending == false)
412                       						continue;
413                       
414                       					// NOTE: do not attempt to delete while there are pending responses
415 r.kieninger      1.83 					// coming thru. The last response to come thru after a
416 brian.campbell   1.80 					// _connectionClosePending will reset _responsePending to false
417                       					// and then cause the monitor to rerun this code and clean up.
418                       					// (see HTTPConnection.cpp)
419                       
420                       					if (h._responsePending == true)
421                       					{
422                       						Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "
423                       													"Ignoring connection delete request because "
424                       													"responses are still pending. "
425 r.kieninger      1.83 													"connection=0x%p, socket=%d\n",
426 brian.campbell   1.81 													(void *)&h, h.getSocket());
427 brian.campbell   1.80 						continue;
428                       					}
429                       					h._connectionClosePending = false;
430                                 MessageQueue &o = h.get_owner();
431                                 Message* message= new CloseConnectionMessage(entry.socket);
432 kumpf            1.68           message->dest = o.getQueueId();
433                       
434 r.kieninger      1.83           // HTTPAcceptor is responsible for closing the connection.
435 kumpf            1.68           // The lock is released to allow HTTPAcceptor to call
436 r.kieninger      1.83           // unsolicitSocketMessages to free the entry.
437 kumpf            1.68           // Once HTTPAcceptor completes processing of the close
438                                 // connection, the lock is re-requested and processing of
439                                 // the for loop continues.  This is safe with the current
440                                 // implementation of the _entries object.  Note that the
441                                 // loop condition accesses the _entries.size() on each
442                                 // iteration, so that a change in size while the mutex is
443                                 // unlocked will not result in an ArrayIndexOutOfBounds
444                                 // exception.
445                       
446 kumpf            1.94           autoEntryMutex.unlock();
447 kumpf            1.68           o.enqueue(message);
448 kumpf            1.94           autoEntryMutex.lock();
449 kumpf            1.68        }
450                           }
451                       
452 kumpf            1.51     Uint32 _idleEntries = 0;
453 r.kieninger      1.83 
454 a.arora          1.73     /*
455                       	We will keep track of the maximum socket number and pass this value
456 r.kieninger      1.83 	to the kernel as a parameter to SELECT.  This loop seems like a good
457 a.arora          1.73 	place to calculate the max file descriptor (maximum socket number)
458                       	because we have to traverse the entire array.
459 r.kieninger      1.83     */
460 a.arora          1.73     int maxSocketCurrentPass = 0;
461 mday             1.25     for( int indx = 0; indx < (int)_entries.size(); indx++)
462 mike             1.2      {
463 a.arora          1.73        if(maxSocketCurrentPass < _entries[indx].socket)
464                       	  maxSocketCurrentPass = _entries[indx].socket;
465                       
466 mday             1.37        if(_entries[indx]._status.value() == _MonitorEntry::IDLE)
467 mday             1.25        {
468 kumpf            1.51 	  _idleEntries++;
469 mday             1.25 	  FD_SET(_entries[indx].socket, &fdread);
470                              }
471 mday             1.13     }
472 s.hills          1.62 
473 a.arora          1.73     /*
474                       	Add 1 then assign maxSocket accordingly. We add 1 to account for
475                       	descriptors starting at 0.
476                           */
477                           maxSocketCurrentPass++;
478                       
479 kumpf            1.94     autoEntryMutex.unlock();
480 a.arora          1.73     int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
481 kumpf            1.94     autoEntryMutex.lock();
482 mday             1.25 
483 mike             1.2  #ifdef PEGASUS_OS_TYPE_WINDOWS
484 kumpf            1.50     if(events == SOCKET_ERROR)
485 mike             1.2  #else
486 kumpf            1.50     if(events == -1)
487 mike             1.2  #endif
488 mday             1.13     {
489 kumpf            1.50        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
490                                 "Monitor::run - errorno = %d has occurred on select.", errno);
491                              // The EBADF error indicates that one or more or the file
492                              // descriptions was not valid. This could indicate that
493                              // the _entries structure has been corrupted or that
494                              // we have a synchronization error.
495                       
496                              PEGASUS_ASSERT(errno != EBADF);
497                           }
498                           else if (events)
499                           {
500 kumpf            1.51        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
501 r.kieninger      1.83           "Monitor::run select event received events = %d, monitoring %d idle entries",
502 kumpf            1.51 	   events, _idleEntries);
503 mday             1.25        for( int indx = 0; indx < (int)_entries.size(); indx++)
504                              {
505 kumpf            1.53           // The Monitor should only look at entries in the table that are IDLE (i.e.,
506                                 // owned by the Monitor).
507 r.kieninger      1.83 	  if((_entries[indx]._status.value() == _MonitorEntry::IDLE) &&
508 kumpf            1.53 	     (FD_ISSET(_entries[indx].socket, &fdread)))
509 mday             1.25 	  {
510                       	     MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);
511 kumpf            1.53              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
512                                         "Monitor::run indx = %d, queueId =  %d, q = %p",
513                                         indx, _entries[indx].queueId, q);
514                                    PEGASUS_ASSERT(q !=0);
515 mday             1.37 
516 r.kieninger      1.83 	     try
517 mday             1.25 	     {
518 mday             1.37 		if(_entries[indx]._type == Monitor::CONNECTION)
519                       		{
520 kumpf            1.51                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
521                                            "_entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
522 mday             1.37 		   static_cast<HTTPConnection *>(q)->_entry_index = indx;
523 sushma.fernandes 1.78 
524                                          // Do not update the entry just yet. The entry gets updated once
525 r.kieninger      1.83                    // the request has been read.
526 sushma.fernandes 1.78 		   //_entries[indx]._status = _MonitorEntry::BUSY;
527                       
528 kumpf            1.66                    // If allocate_and_awaken failure, retry on next iteration
529 a.arora          1.73 /* Removed for PEP 183.
530 kumpf            1.69                    if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(
531                                                  (void *)q, _dispatch))
532 kumpf            1.67                    {
533                                             Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
534                                                 "Monitor::run: Insufficient resources to process request.");
535                                             _entries[indx]._status = _MonitorEntry::IDLE;
536                                             return true;
537                                          }
538 a.arora          1.73 */
539                       // Added for PEP 183
540                       		   HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
541                         			 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
542                                                "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
543                                          dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
544                                          try
545                                          {
546                                              dst->run(1);
547                                          }
548                          		   catch (...)
549                          		   {
550                             			Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
551                                 		"Monitor::_dispatch: exception received");
552                          		   }
553                          		   Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
554                                          "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
555                       
556 sushma.fernandes 1.78                    // It is possible the entry status may not be set to busy.
557 r.kieninger      1.83                    // The following will fail in that case.
558                          		   // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);
559 a.arora          1.73 		   // Once the HTTPConnection thread has set the status value to either
560                       		   // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
561                       		   // to the Monitor.  It is no longer permissible to access the connection
562                       		   // or the entry in the _entries table.
563 sushma.fernandes 1.78 
564                                          // The following is not relevant as the worker thread or the
565                                          // reader thread will update the status of the entry.
566                       		   //if (dst->_connectionClosePending)
567 r.kieninger      1.83 		   //{
568 sushma.fernandes 1.78 		   //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
569                       		   //}
570                       		   //else
571                       		   //{
572                       		   //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
573 r.kieninger      1.83 		   //}
574                       // end Added for PEP 183
575 a.arora          1.73 		}
576                       	        else if( _entries[indx]._type == Monitor::INTERNAL){
577 r.kieninger      1.83 			// set ourself to BUSY,
578                                               // read the data
579 a.arora          1.73                         // and set ourself back to IDLE
580 r.kieninger      1.83 
581 a.arora          1.73 		   	_entries[indx]._status == _MonitorEntry::BUSY;
582                       			static char buffer[2];
583                             			Socket::disableBlocking(_entries[indx].socket);
584                             			Sint32 amt = Socket::read(_entries[indx].socket,&buffer, 2);
585                             			Socket::enableBlocking(_entries[indx].socket);
586                       			_entries[indx]._status == _MonitorEntry::IDLE;
587 mday             1.37 		}
588                       		else
589 mday             1.25 		{
590 kumpf            1.51                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
591                                            "Non-connection entry, indx = %d, has been received.", indx);
592 mday             1.37 		   int events = 0;
593                       		   events |= SocketMessage::READ;
594                       		   Message *msg = new SocketMessage(_entries[indx].socket, events);
595                       		   _entries[indx]._status = _MonitorEntry::BUSY;
596 kumpf            1.94                    autoEntryMutex.unlock();
597 mday             1.37 		   q->enqueue(msg);
598 kumpf            1.94                    autoEntryMutex.lock();
599 mday             1.37 		   _entries[indx]._status = _MonitorEntry::IDLE;
600 kumpf            1.94 
601 mday             1.25 		   return true;
602                       		}
603                       	     }
604 mday             1.37 	     catch(...)
605 mday             1.25 	     {
606                       	     }
607                       	     handled_events = true;
608                       	  }
609                              }
610 mday             1.24     }
611 kumpf            1.94 
612 mday             1.13     return(handled_events);
613 mike             1.2  }
614                       
615 chuck            1.74 void Monitor::stopListeningForConnections(Boolean wait)
616 kumpf            1.48 {
617                           PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
618 r.kieninger      1.83     // set boolean then tickle the server to recognize _stopConnections
619 kumpf            1.48     _stopConnections = 1;
620 a.arora          1.73     tickle();
621 kumpf            1.48 
622 chuck            1.74     if (wait)
623 a.arora          1.73     {
624 chuck            1.74       // Wait for the monitor to notice _stopConnections.  Otherwise the
625                             // caller of this function may unbind the ports while the monitor
626                             // is still accepting connections on them.
627                             try
628                       	{
629                       	  _stopConnectionsSem.time_wait(10000);
630                       	}
631                             catch (TimeOut &)
632                       	{
633                       	  // The monitor is probably busy processng a very long request, and is
634                       	  // not accepting connections.  Let the caller unbind the ports.
635                       	}
636 a.arora          1.73     }
637 r.kieninger      1.83 
638 kumpf            1.48     PEG_METHOD_EXIT();
639                       }
640 mday             1.25 
641 mday             1.37 
642 mday             1.25 int  Monitor::solicitSocketMessages(
643 r.kieninger      1.83     Sint32 socket,
644 mike             1.2      Uint32 events,
645 r.kieninger      1.83     Uint32 queueId,
646 mday             1.8      int type)
647 mike             1.2  {
648 r.kieninger      1.83    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
649 alagaraja        1.75    AutoMutex autoMut(_entry_mut);
650 a.arora          1.73    // Check to see if we need to dynamically grow the _entries array
651                          // We always want the _entries array to 2 bigger than the
652                          // current connections requested
653                          _solicitSocketCount++;  // bump the count
654                          int size = (int)_entries.size();
655 w.otsuka         1.89    if((int)_solicitSocketCount >= (size-1)){
656                               for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
657 a.arora          1.73                 _MonitorEntry entry(0, 0, 0);
658                                       _entries.append(entry);
659                               }
660                          }
661 kumpf            1.4  
662 a.arora          1.73    int index;
663                          for(index = 1; index < (int)_entries.size(); index++)
664 mday             1.25    {
665 a.arora          1.73       try
666 mday             1.37       {
667 a.arora          1.73          if(_entries[index]._status.value() == _MonitorEntry::EMPTY)
668                                {
669                                   _entries[index].socket = socket;
670                                   _entries[index].queueId  = queueId;
671                                   _entries[index]._type = type;
672                                   _entries[index]._status = _MonitorEntry::IDLE;
673 r.kieninger      1.83 
674 a.arora          1.73             return index;
675                                }
676 mday             1.37       }
677                             catch(...)
678 mday             1.25       {
679                             }
680                          }
681 a.arora          1.73    _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful
682 mday             1.25    PEG_METHOD_EXIT();
683 kumpf            1.50    return -1;
684 a.arora          1.73 
685 mike             1.2  }
686                       
687 mday             1.25 void Monitor::unsolicitSocketMessages(Sint32 socket)
688 mike             1.2  {
689 kumpf            1.50 
690 mday             1.25     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
691 alagaraja        1.75     AutoMutex autoMut(_entry_mut);
692 a.arora          1.73 
693                           /*
694                               Start at index = 1 because _entries[0] is the tickle entry which never needs
695                               to be EMPTY;
696                           */
697 w.otsuka         1.89     unsigned int index;
698 a.arora          1.73     for(index = 1; index < _entries.size(); index++)
699 mike             1.2      {
700 mday             1.25        if(_entries[index].socket == socket)
701                              {
702 a.arora          1.73           _entries[index]._status = _MonitorEntry::EMPTY;
703                                 _entries[index].socket = -1;
704                                 _solicitSocketCount--;
705                                 break;
706 mday             1.25        }
707 mike             1.2      }
708 a.arora          1.73 
709                           /*
710                       	Dynamic Contraction:
711                       	To remove excess entries we will start from the end of the _entries array
712                       	and remove all entries with EMPTY status until we find the first NON EMPTY.
713                       	This prevents the positions, of the NON EMPTY entries, from being changed.
714 r.kieninger      1.83     */
715 a.arora          1.73     index = _entries.size() - 1;
716                           while(_entries[index]._status == _MonitorEntry::EMPTY){
717                       	if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
718                                       _entries.remove(index);
719                       	index--;
720                           }
721 kumpf            1.4      PEG_METHOD_EXIT();
722 mike             1.2  }
723                       
724 a.arora          1.73 // Note: this is no longer called with PEP 183.
725 mday             1.7  PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
726                       {
727 mday             1.8     HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
728 kumpf            1.51    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
729 kumpf            1.53         "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
730                               dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
731 kumpf            1.51    try
732                          {
733                             dst->run(1);
734                          }
735                          catch (...)
736                          {
737                             Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
738                                 "Monitor::_dispatch: exception received");
739                          }
740                          Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
741                                 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
742 r.kieninger      1.83 
743 kumpf            1.53    PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);
744 kumpf            1.68 
745                          // Once the HTTPConnection thread has set the status value to either
746                          // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
747                          // to the Monitor.  It is no longer permissible to access the connection
748                          // or the entry in the _entries table.
749 kumpf            1.50    if (dst->_connectionClosePending)
750                          {
751 kumpf            1.68       dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
752                          }
753                          else
754                          {
755                             dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
756 kumpf            1.50    }
757 mday             1.8     return 0;
758 mday             1.40 }
759                       
760 mike             1.2  PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2