(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.79 //%2004////////////////////////////////////////////////////////////////////////
  2 mike  1.2  //
  3 karl  1.79 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4            // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5            // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6 karl  1.64 // IBM Corp.; EMC Corporation, The Open Group.
  7 karl  1.79 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8            // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9 mike  1.2  //
 10            // Permission is hereby granted, free of charge, to any person obtaining a copy
 11 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
 12            // deal in the Software without restriction, including without limitation the
 13            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 14 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
 15            // furnished to do so, subject to the following conditions:
 16 r.kieninger 1.83 //
 17 kumpf       1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 18 mike        1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 19                  // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 20 kumpf       1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 21                  // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 22                  // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 23 mike        1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 24                  // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 25                  //
 26                  //==============================================================================
 27                  //
 28                  // Author: Mike Brasher (mbrasher@bmc.com)
 29                  //
 30 r.kieninger 1.83 // Modified By: Mike Day (monitor_2) mdday@us.ibm.com
 31 a.arora     1.71 //              Amit K Arora (Bug#1153) amita@in.ibm.com
 32 alagaraja   1.75 //              Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090
 33 sushma.fernandes 1.78 //              Sushma Fernandes (sushma@hp.com) for Bug#2057
 34 joyce.j          1.84 //              Josephine Eskaline Joyce (jojustin@in.ibm.com) for PEP#101
 35 kumpf            1.85 //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
 36 mike             1.2  //
 37                       //%/////////////////////////////////////////////////////////////////////////////
 38                       
 39                       #include <Pegasus/Common/Config.h>
 40 mday             1.40 
 41 mike             1.2  #include <cstring>
 42                       #include "Monitor.h"
 43                       #include "MessageQueue.h"
 44                       #include "Socket.h"
 45 kumpf            1.4  #include <Pegasus/Common/Tracer.h>
 46 mday             1.7  #include <Pegasus/Common/HTTPConnection.h>
 47 kumpf            1.69 #include <Pegasus/Common/MessageQueueService.h>
 48 a.arora          1.73 #include <Pegasus/Common/Exception.h>
 49 mike             1.2  
 50                       #ifdef PEGASUS_OS_TYPE_WINDOWS
 51                       # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
 52                       #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
 53                       of <winsock.h>. It may have been indirectly included (e.g., by including \
 54 mday             1.25 <windows.h>). Finthe inclusion of that header which is visible to this \
 55 mike             1.2  compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \
 56                       otherwise, less than 64 clients (the default) will be able to connect to the \
 57                       CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
 58 mday             1.5  
 59 mike             1.2  # endif
 60                       # define FD_SETSIZE 1024
 61 mday             1.5  # include <windows.h>
 62 mike             1.2  #else
 63                       # include <sys/types.h>
 64                       # include <sys/socket.h>
 65                       # include <sys/time.h>
 66                       # include <netinet/in.h>
 67                       # include <netdb.h>
 68                       # include <arpa/inet.h>
 69                       #endif
 70                       
 71                       PEGASUS_USING_STD;
 72                       
 73                       PEGASUS_NAMESPACE_BEGIN
 74                       
 75 kumpf            1.86 // Define a platform-neutral socket length type
 76                       #if defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM)
 77                       typedef size_t PEGASUS_SOCKLEN_T;
 78                       #elif defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) || defined(PEGASUS_OS_LINUX) || (defined(PEGASUS_OS_SOLARIS) && !defined(SUNOS_5_6))
 79                       typedef socklen_t PEGASUS_SOCKLEN_T;
 80                       #else
 81                       typedef int PEGASUS_SOCKLEN_T;
 82                       #endif
 83 mday             1.18 
 84 mday             1.25 static AtomicInt _connections = 0;
 85                       
 86                       static struct timeval create_time = {0, 1};
 87 mday             1.38 static struct timeval destroy_time = {300, 0};
 88 mday             1.26 static struct timeval deadlock_time = {0, 0};
 89 mday             1.18 
 90 mike             1.2  ////////////////////////////////////////////////////////////////////////////////
 91                       //
 92                       // MonitorRep
 93                       //
 94                       ////////////////////////////////////////////////////////////////////////////////
 95                       
 96                       struct MonitorRep
 97                       {
 98                           fd_set rd_fd_set;
 99                           fd_set wr_fd_set;
100                           fd_set ex_fd_set;
101                           fd_set active_rd_fd_set;
102                           fd_set active_wr_fd_set;
103                           fd_set active_ex_fd_set;
104                       };
105                       
106                       ////////////////////////////////////////////////////////////////////////////////
107                       //
108                       // Monitor
109                       //
110                       ////////////////////////////////////////////////////////////////////////////////
111 mike             1.2  
112 kumpf            1.54 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
113 mike             1.2  Monitor::Monitor()
114 r.kieninger      1.83    : _module_handle(0),
115 a.arora          1.73      _controller(0),
116                            _async(false),
117                            _stopConnections(0),
118                            _stopConnectionsSem(0),
119 a.dunfey         1.76      _solicitSocketCount(0),
120 a.dunfey         1.77      _tickle_client_socket(-1),
121                            _tickle_server_socket(-1),
122                            _tickle_peer_socket(-1)
123 mike             1.2  {
124 kumpf            1.54     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
125 mike             1.2      Socket::initializeInterface();
126 mday             1.25     _rep = 0;
127 kumpf            1.54     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
128 a.arora          1.73 
129                           // setup the tickler
130                           initializeTickler();
131 r.kieninger      1.83 
132                           // Start the count at 1 because initilizeTickler()
133                           // has added an entry in the first position of the
134 a.arora          1.73     // _entries array
135                           for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
136 mday             1.37     {
137                              _MonitorEntry entry(0, 0, 0);
138                              _entries.append(entry);
139                           }
140 mike             1.2  }
141                       
142 mday             1.18 Monitor::Monitor(Boolean async)
143 a.arora          1.73    : _module_handle(0),
144                            _controller(0),
145                            _async(async),
146                            _stopConnections(0),
147                            _stopConnectionsSem(0),
148 a.dunfey         1.76      _solicitSocketCount(0),
149 a.dunfey         1.77      _tickle_client_socket(-1),
150                            _tickle_server_socket(-1),
151                            _tickle_peer_socket(-1)
152 mday             1.18 {
153 kumpf            1.54     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
154 mday             1.18     Socket::initializeInterface();
155 mday             1.25     _rep = 0;
156 kumpf            1.54     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
157 a.arora          1.73 
158                           // setup the tickler
159                           initializeTickler();
160                       
161 r.kieninger      1.83     // Start the count at 1 because initilizeTickler()
162 a.arora          1.73     // has added an entry in the first position of the
163                           // _entries array
164                           for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
165 mday             1.37     {
166                              _MonitorEntry entry(0, 0, 0);
167                              _entries.append(entry);
168                           }
169 mday             1.18 }
170 mday             1.20 
171 mike             1.2  Monitor::~Monitor()
172                       {
173 kumpf            1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
174                                         "deregistering with module controller");
175 kumpf            1.10 
176 joyce.j          1.84     if(_module_handle.get() != NULL)
177 mday             1.8      {
178                              _controller->deregister_module(PEGASUS_MODULENAME_MONITOR);
179 joyce.j          1.84        _controller.reset();
180                              _module_handle.reset();
181 mday             1.8      }
182 kumpf            1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep");
183 kumpf            1.48 
184 kumpf            1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
185 a.dunfey         1.76 
186                           try{
187 a.dunfey         1.77         if(_tickle_peer_socket >= 0)
188 a.dunfey         1.76         {
189                                   Socket::close(_tickle_peer_socket);
190                               }
191 a.dunfey         1.77         if(_tickle_client_socket >= 0)
192 a.dunfey         1.76         {
193                                   Socket::close(_tickle_client_socket);
194                               }
195 a.dunfey         1.77         if(_tickle_server_socket >= 0)
196 a.dunfey         1.76         {
197                                   Socket::close(_tickle_server_socket);
198                               }
199                           }
200                           catch(...)
201                           {
202                               Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
203                                         "Failed to close tickle sockets");
204                           }
205                       
206 mike             1.2      Socket::uninitializeInterface();
207 kumpf            1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
208                                         "returning from monitor destructor");
209 mday             1.18 }
210                       
211 a.arora          1.73 void Monitor::initializeTickler(){
212 r.kieninger      1.83     /*
213                              NOTE: On any errors trying to
214                                    setup out tickle connection,
215 a.arora          1.73              throw an exception/end the server
216                           */
217                       
218                           /* setup the tickle server/listener */
219                       
220                           // get a socket for the server side
221                           if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){
222                       	//handle error
223                       	MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
224                       				 "Received error number $0 while creating the internal socket.",
225                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
226                       				 errno);
227                       #else
228                       				 WSAGetLastError());
229                       #endif
230                       	throw Exception(parms);
231                           }
232                       
233                           // initialize the address
234                           memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
235                       #ifdef PEGASUS_OS_ZOS
236 a.arora          1.73     _tickle_server_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
237                       #else
238                       #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
239                       #pragma convert(37)
240                       #endif
241                           _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
242                       #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
243                       #pragma convert(0)
244                       #endif
245                       #endif
246                           _tickle_server_addr.sin_family = PF_INET;
247                           _tickle_server_addr.sin_port = 0;
248                       
249 kumpf            1.86     PEGASUS_SOCKLEN_T _addr_size = sizeof(_tickle_server_addr);
250 a.arora          1.73 
251                           // bind server side to socket
252                           if((::bind(_tickle_server_socket,
253 r.kieninger      1.83 	       (struct sockaddr *)&_tickle_server_addr,
254 a.arora          1.73 	       sizeof(_tickle_server_addr))) < 0){
255                       	// handle error
256 r.kieninger      1.83 #ifdef PEGASUS_OS_ZOS
257                           MessageLoaderParms parms("Common.Monitor.TICKLE_BIND_LONG",
258                       				 "Received error:$0 while binding the internal socket.",strerror(errno));
259                       #else
260 a.arora          1.73 	MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
261                       				 "Received error number $0 while binding the internal socket.",
262                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
263                       				 errno);
264                       #else
265                       				 WSAGetLastError());
266                       #endif
267 r.kieninger      1.83 #endif
268 a.arora          1.73         throw Exception(parms);
269                           }
270                       
271                           // tell the kernel we are a server
272                           if((::listen(_tickle_server_socket,3)) < 0){
273                       	// handle error
274                       	MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",
275                       			 "Received error number $0 while listening to the internal socket.",
276                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
277                       				 errno);
278                       #else
279                       				 WSAGetLastError());
280                       #endif
281                       	throw Exception(parms);
282                           }
283 r.kieninger      1.83 
284 a.arora          1.73     // make sure we have the correct socket for our server
285                           int sock = ::getsockname(_tickle_server_socket,
286                       			     (struct sockaddr*)&_tickle_server_addr,
287 r.kieninger      1.83 			     &_addr_size);
288 a.arora          1.73     if(sock < 0){
289                       	// handle error
290                       	MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",
291                       			 "Received error number $0 while getting the internal socket name.",
292                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
293                       				 errno);
294                       #else
295                       				 WSAGetLastError());
296                       #endif
297                       	throw Exception(parms);
298                           }
299                       
300                           /* set up the tickle client/connector */
301 r.kieninger      1.83 
302 a.arora          1.73     // get a socket for our tickle client
303                           if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){
304                       	// handle error
305                       	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
306                       			 "Received error number $0 while creating the internal client socket.",
307                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
308                       				 errno);
309                       #else
310                       				 WSAGetLastError());
311                       #endif
312                       	throw Exception(parms);
313                           }
314                       
315                           // setup the address of the client
316                           memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
317                       #ifdef PEGASUS_OS_ZOS
318                           _tickle_client_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
319                       #else
320                       #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
321                       #pragma convert(37)
322                       #endif
323 a.arora          1.73     _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
324                       #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
325                       #pragma convert(0)
326                       #endif
327                       #endif
328                           _tickle_client_addr.sin_family = PF_INET;
329                           _tickle_client_addr.sin_port = 0;
330                       
331                           // bind socket to client side
332                           if((::bind(_tickle_client_socket,
333                       	       (struct sockaddr*)&_tickle_client_addr,
334                       	       sizeof(_tickle_client_addr))) < 0){
335                       	// handle error
336                       	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
337                       			 "Received error number $0 while binding the internal client socket.",
338                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
339                       				 errno);
340                       #else
341                       				 WSAGetLastError());
342                       #endif
343                       	throw Exception(parms);
344 a.arora          1.73     }
345                       
346                           // connect to server side
347                           if((::connect(_tickle_client_socket,
348                       		  (struct sockaddr*)&_tickle_server_addr,
349                       		  sizeof(_tickle_server_addr))) < 0){
350                       	// handle error
351                       	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
352                       			 "Received error number $0 while connecting the internal client socket.",
353                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
354                       				 errno);
355                       #else
356                       				 WSAGetLastError());
357                       #endif
358                       	throw Exception(parms);
359                           }
360                       
361                           /* set up the slave connection */
362                           memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
363 kumpf            1.86     PEGASUS_SOCKLEN_T peer_size = sizeof(_tickle_peer_addr);
364 r.kieninger      1.83     pegasus_sleep(1);
365 a.arora          1.73 
366                           // this call may fail, we will try a max of 20 times to establish this peer connection
367                           if((_tickle_peer_socket = ::accept(_tickle_server_socket,
368                       				       (struct sockaddr*)&_tickle_peer_addr,
369                       				       &peer_size)) < 0){
370                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
371                               // Only retry on non-windows platforms.
372                               if(_tickle_peer_socket == -1 && errno == EAGAIN)
373                               {
374 r.kieninger      1.83           int retries = 0;
375 a.arora          1.73           do
376                                 {
377                                   pegasus_sleep(1);
378                                   _tickle_peer_socket = ::accept(_tickle_server_socket,
379                       					   (struct sockaddr*)&_tickle_peer_addr,
380                       					   &peer_size);
381                                   retries++;
382                                 } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
383                               }
384                       #endif
385                           }
386                           if(_tickle_peer_socket == -1){
387                       	// handle error
388                       	MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",
389                       			 "Received error number $0 while accepting the internal socket connection.",
390                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
391                       				 errno);
392                       #else
393                       				 WSAGetLastError());
394                       #endif
395                       	throw Exception(parms);
396 a.arora          1.73     }
397                           // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
398                           // checks entries with IDLE state for events
399                           _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
400                           entry._status = _MonitorEntry::IDLE;
401                           _entries.append(entry);
402                       }
403                       
404                       void Monitor::tickle(void)
405                       {
406 sushma.fernandes 1.78     static char _buffer[] =
407 a.arora          1.73     {
408                             '0','0'
409                           };
410 r.kieninger      1.83 
411 sushma.fernandes 1.78     AutoMutex autoMutex(_tickle_mutex);
412 r.kieninger      1.83     Socket::disableBlocking(_tickle_client_socket);
413 sushma.fernandes 1.78     Socket::write(_tickle_client_socket,&_buffer, 2);
414 r.kieninger      1.83     Socket::enableBlocking(_tickle_client_socket);
415 sushma.fernandes 1.78 }
416                       
417                       void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
418                       {
419                           // Set the state to requested state
420                           _entries[index]._status = status;
421 a.arora          1.73 }
422                       
423 mike             1.2  Boolean Monitor::run(Uint32 milliseconds)
424                       {
425 mday             1.18 
426 mday             1.25     Boolean handled_events = false;
427 a.arora          1.73     int i = 0;
428 r.kieninger      1.83 
429 kumpf            1.36     struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
430 a.arora          1.73 
431 mday             1.25     fd_set fdread;
432                           FD_ZERO(&fdread);
433 a.arora          1.73 
434 mday             1.37     _entry_mut.lock(pegasus_thread_self());
435 r.kieninger      1.83 
436                           // Check the stopConnections flag.  If set, clear the Acceptor monitor entries
437                           if (_stopConnections == 1)
438 kumpf            1.48     {
439                               for ( int indx = 0; indx < (int)_entries.size(); indx++)
440                               {
441                                   if (_entries[indx]._type == Monitor::ACCEPTOR)
442                                   {
443                                       if ( _entries[indx]._status.value() != _MonitorEntry::EMPTY)
444                                       {
445                                          if ( _entries[indx]._status.value() == _MonitorEntry::IDLE ||
446                                               _entries[indx]._status.value() == _MonitorEntry::DYING )
447                                          {
448                                              // remove the entry
449                       		       _entries[indx]._status = _MonitorEntry::EMPTY;
450                                          }
451                                          else
452                                          {
453                                              // set status to DYING
454 kumpf            1.52                       _entries[indx]._status = _MonitorEntry::DYING;
455 kumpf            1.48                    }
456                                      }
457                                  }
458                               }
459                               _stopConnections = 0;
460 a.arora          1.73 	_stopConnectionsSem.signal();
461 kumpf            1.48     }
462 kumpf            1.51 
463 kumpf            1.68     for( int indx = 0; indx < (int)_entries.size(); indx++)
464                           {
465 brian.campbell   1.80 			 const _MonitorEntry &entry = _entries[indx];
466                              if ((entry._status.value() == _MonitorEntry::DYING) &&
467                       					 (entry._type == Monitor::CONNECTION))
468 kumpf            1.68        {
469 brian.campbell   1.80           MessageQueue *q = MessageQueue::lookup(entry.queueId);
470 kumpf            1.68           PEGASUS_ASSERT(q != 0);
471 brian.campbell   1.80           HTTPConnection &h = *static_cast<HTTPConnection *>(q);
472 r.kieninger      1.83 
473 brian.campbell   1.80 					if (h._connectionClosePending == false)
474                       						continue;
475                       
476                       					// NOTE: do not attempt to delete while there are pending responses
477 r.kieninger      1.83 					// coming thru. The last response to come thru after a
478 brian.campbell   1.80 					// _connectionClosePending will reset _responsePending to false
479                       					// and then cause the monitor to rerun this code and clean up.
480                       					// (see HTTPConnection.cpp)
481                       
482                       					if (h._responsePending == true)
483                       					{
484                       						Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "
485                       													"Ignoring connection delete request because "
486                       													"responses are still pending. "
487 r.kieninger      1.83 													"connection=0x%p, socket=%d\n",
488 brian.campbell   1.81 													(void *)&h, h.getSocket());
489 brian.campbell   1.80 						continue;
490                       					}
491                       					h._connectionClosePending = false;
492                                 MessageQueue &o = h.get_owner();
493                                 Message* message= new CloseConnectionMessage(entry.socket);
494 kumpf            1.68           message->dest = o.getQueueId();
495                       
496 r.kieninger      1.83           // HTTPAcceptor is responsible for closing the connection.
497 kumpf            1.68           // The lock is released to allow HTTPAcceptor to call
498 r.kieninger      1.83           // unsolicitSocketMessages to free the entry.
499 kumpf            1.68           // Once HTTPAcceptor completes processing of the close
500                                 // connection, the lock is re-requested and processing of
501                                 // the for loop continues.  This is safe with the current
502                                 // implementation of the _entries object.  Note that the
503                                 // loop condition accesses the _entries.size() on each
504                                 // iteration, so that a change in size while the mutex is
505                                 // unlocked will not result in an ArrayIndexOutOfBounds
506                                 // exception.
507                       
508                                 _entry_mut.unlock();
509                                 o.enqueue(message);
510                                 _entry_mut.lock(pegasus_thread_self());
511                              }
512                           }
513                       
514 kumpf            1.51     Uint32 _idleEntries = 0;
515 r.kieninger      1.83 
516 a.arora          1.73     /*
517                       	We will keep track of the maximum socket number and pass this value
518 r.kieninger      1.83 	to the kernel as a parameter to SELECT.  This loop seems like a good
519 a.arora          1.73 	place to calculate the max file descriptor (maximum socket number)
520                       	because we have to traverse the entire array.
521 r.kieninger      1.83     */
522 a.arora          1.73     int maxSocketCurrentPass = 0;
523 mday             1.25     for( int indx = 0; indx < (int)_entries.size(); indx++)
524 mike             1.2      {
525 a.arora          1.73        if(maxSocketCurrentPass < _entries[indx].socket)
526                       	  maxSocketCurrentPass = _entries[indx].socket;
527                       
528 mday             1.37        if(_entries[indx]._status.value() == _MonitorEntry::IDLE)
529 mday             1.25        {
530 kumpf            1.51 	  _idleEntries++;
531 mday             1.25 	  FD_SET(_entries[indx].socket, &fdread);
532                              }
533 mday             1.13     }
534 s.hills          1.62 
535 a.arora          1.73     /*
536                       	Add 1 then assign maxSocket accordingly. We add 1 to account for
537                       	descriptors starting at 0.
538                           */
539                           maxSocketCurrentPass++;
540                       
541 r.kieninger      1.83     _entry_mut.unlock();
542 a.arora          1.73     int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
543 kumpf            1.51    _entry_mut.lock(pegasus_thread_self());
544 mday             1.25 
545 mike             1.2  #ifdef PEGASUS_OS_TYPE_WINDOWS
546 kumpf            1.50     if(events == SOCKET_ERROR)
547 mike             1.2  #else
548 kumpf            1.50     if(events == -1)
549 mike             1.2  #endif
550 mday             1.13     {
551 kumpf            1.50        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
552                                 "Monitor::run - errorno = %d has occurred on select.", errno);
553                              // The EBADF error indicates that one or more or the file
554                              // descriptions was not valid. This could indicate that
555                              // the _entries structure has been corrupted or that
556                              // we have a synchronization error.
557                       
558                              PEGASUS_ASSERT(errno != EBADF);
559                           }
560                           else if (events)
561                           {
562 kumpf            1.51        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
563 r.kieninger      1.83           "Monitor::run select event received events = %d, monitoring %d idle entries",
564 kumpf            1.51 	   events, _idleEntries);
565 mday             1.25        for( int indx = 0; indx < (int)_entries.size(); indx++)
566                              {
567 kumpf            1.53           // The Monitor should only look at entries in the table that are IDLE (i.e.,
568                                 // owned by the Monitor).
569 r.kieninger      1.83 	  if((_entries[indx]._status.value() == _MonitorEntry::IDLE) &&
570 kumpf            1.53 	     (FD_ISSET(_entries[indx].socket, &fdread)))
571 mday             1.25 	  {
572                       	     MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);
573 kumpf            1.53              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
574                                         "Monitor::run indx = %d, queueId =  %d, q = %p",
575                                         indx, _entries[indx].queueId, q);
576                                    PEGASUS_ASSERT(q !=0);
577 mday             1.37 
578 r.kieninger      1.83 	     try
579 mday             1.25 	     {
580 mday             1.37 		if(_entries[indx]._type == Monitor::CONNECTION)
581                       		{
582 kumpf            1.51                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
583                                            "_entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
584 mday             1.37 		   static_cast<HTTPConnection *>(q)->_entry_index = indx;
585 sushma.fernandes 1.78 
586                                          // Do not update the entry just yet. The entry gets updated once
587 r.kieninger      1.83                    // the request has been read.
588 sushma.fernandes 1.78 		   //_entries[indx]._status = _MonitorEntry::BUSY;
589                       
590 kumpf            1.66                    // If allocate_and_awaken failure, retry on next iteration
591 a.arora          1.73 /* Removed for PEP 183.
592 kumpf            1.69                    if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(
593                                                  (void *)q, _dispatch))
594 kumpf            1.67                    {
595                                             Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
596                                                 "Monitor::run: Insufficient resources to process request.");
597                                             _entries[indx]._status = _MonitorEntry::IDLE;
598                                             _entry_mut.unlock();
599                                             return true;
600                                          }
601 a.arora          1.73 */
602                       // Added for PEP 183
603                       		   HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
604                         			 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
605                                                "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
606                                          dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
607                                          try
608                                          {
609                                              dst->run(1);
610                                          }
611                          		   catch (...)
612                          		   {
613                             			Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
614                                 		"Monitor::_dispatch: exception received");
615                          		   }
616                          		   Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
617                                          "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
618                       
619 sushma.fernandes 1.78                    // It is possible the entry status may not be set to busy.
620 r.kieninger      1.83                    // The following will fail in that case.
621                          		   // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);
622 a.arora          1.73 		   // Once the HTTPConnection thread has set the status value to either
623                       		   // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
624                       		   // to the Monitor.  It is no longer permissible to access the connection
625                       		   // or the entry in the _entries table.
626 sushma.fernandes 1.78 
627                                          // The following is not relevant as the worker thread or the
628                                          // reader thread will update the status of the entry.
629                       		   //if (dst->_connectionClosePending)
630 r.kieninger      1.83 		   //{
631 sushma.fernandes 1.78 		   //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
632                       		   //}
633                       		   //else
634                       		   //{
635                       		   //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
636 r.kieninger      1.83 		   //}
637                       // end Added for PEP 183
638 a.arora          1.73 		}
639                       	        else if( _entries[indx]._type == Monitor::INTERNAL){
640 r.kieninger      1.83 			// set ourself to BUSY,
641                                               // read the data
642 a.arora          1.73                         // and set ourself back to IDLE
643 r.kieninger      1.83 
644 a.arora          1.73 		   	_entries[indx]._status == _MonitorEntry::BUSY;
645                       			static char buffer[2];
646                             			Socket::disableBlocking(_entries[indx].socket);
647                             			Sint32 amt = Socket::read(_entries[indx].socket,&buffer, 2);
648                             			Socket::enableBlocking(_entries[indx].socket);
649                       			_entries[indx]._status == _MonitorEntry::IDLE;
650 mday             1.37 		}
651                       		else
652 mday             1.25 		{
653 kumpf            1.51                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
654                                            "Non-connection entry, indx = %d, has been received.", indx);
655 mday             1.37 		   int events = 0;
656                       		   events |= SocketMessage::READ;
657                       		   Message *msg = new SocketMessage(_entries[indx].socket, events);
658                       		   _entries[indx]._status = _MonitorEntry::BUSY;
659                       		   _entry_mut.unlock();
660 mday             1.27 
661 mday             1.37 		   q->enqueue(msg);
662                       		   _entries[indx]._status = _MonitorEntry::IDLE;
663 mday             1.25 		   return true;
664                       		}
665                       	     }
666 mday             1.37 	     catch(...)
667 mday             1.25 	     {
668                       	     }
669                       	     handled_events = true;
670                       	  }
671                              }
672 mday             1.24     }
673 mday             1.37     _entry_mut.unlock();
674 mday             1.13     return(handled_events);
675 mike             1.2  }
676                       
677 chuck            1.74 void Monitor::stopListeningForConnections(Boolean wait)
678 kumpf            1.48 {
679                           PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
680 r.kieninger      1.83     // set boolean then tickle the server to recognize _stopConnections
681 kumpf            1.48     _stopConnections = 1;
682 a.arora          1.73     tickle();
683 kumpf            1.48 
684 chuck            1.74     if (wait)
685 a.arora          1.73     {
686 chuck            1.74       // Wait for the monitor to notice _stopConnections.  Otherwise the
687                             // caller of this function may unbind the ports while the monitor
688                             // is still accepting connections on them.
689                             try
690                       	{
691                       	  _stopConnectionsSem.time_wait(10000);
692                       	}
693                             catch (TimeOut &)
694                       	{
695                       	  // The monitor is probably busy processng a very long request, and is
696                       	  // not accepting connections.  Let the caller unbind the ports.
697                       	}
698 a.arora          1.73     }
699 r.kieninger      1.83 
700 kumpf            1.48     PEG_METHOD_EXIT();
701                       }
702 mday             1.25 
703 mday             1.37 
704 mday             1.25 int  Monitor::solicitSocketMessages(
705 r.kieninger      1.83     Sint32 socket,
706 mike             1.2      Uint32 events,
707 r.kieninger      1.83     Uint32 queueId,
708 mday             1.8      int type)
709 mike             1.2  {
710 r.kieninger      1.83    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
711 alagaraja        1.75    AutoMutex autoMut(_entry_mut);
712 a.arora          1.73    // Check to see if we need to dynamically grow the _entries array
713                          // We always want the _entries array to 2 bigger than the
714                          // current connections requested
715                          _solicitSocketCount++;  // bump the count
716                          int size = (int)_entries.size();
717                          if(_solicitSocketCount >= (size-1)){
718                               for(int i = 0; i < (_solicitSocketCount - (size-1)); i++){
719                                       _MonitorEntry entry(0, 0, 0);
720                                       _entries.append(entry);
721                               }
722                          }
723 kumpf            1.4  
724 a.arora          1.73    int index;
725                          for(index = 1; index < (int)_entries.size(); index++)
726 mday             1.25    {
727 a.arora          1.73       try
728 mday             1.37       {
729 a.arora          1.73          if(_entries[index]._status.value() == _MonitorEntry::EMPTY)
730                                {
731                                   _entries[index].socket = socket;
732                                   _entries[index].queueId  = queueId;
733                                   _entries[index]._type = type;
734                                   _entries[index]._status = _MonitorEntry::IDLE;
735 r.kieninger      1.83 
736 a.arora          1.73             return index;
737                                }
738 mday             1.37       }
739                             catch(...)
740 mday             1.25       {
741                             }
742                          }
743 a.arora          1.73    _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful
744 mday             1.25    PEG_METHOD_EXIT();
745 kumpf            1.50    return -1;
746 a.arora          1.73 
747 mike             1.2  }
748                       
749 mday             1.25 void Monitor::unsolicitSocketMessages(Sint32 socket)
750 mike             1.2  {
751 kumpf            1.50 
752 mday             1.25     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
753 alagaraja        1.75     AutoMutex autoMut(_entry_mut);
754 a.arora          1.73 
755                           /*
756                               Start at index = 1 because _entries[0] is the tickle entry which never needs
757                               to be EMPTY;
758                           */
759                           int index;
760                           for(index = 1; index < _entries.size(); index++)
761 mike             1.2      {
762 mday             1.25        if(_entries[index].socket == socket)
763                              {
764 a.arora          1.73           _entries[index]._status = _MonitorEntry::EMPTY;
765                                 _entries[index].socket = -1;
766                                 _solicitSocketCount--;
767                                 break;
768 mday             1.25        }
769 mike             1.2      }
770 a.arora          1.73 
771                           /*
772                       	Dynamic Contraction:
773                       	To remove excess entries we will start from the end of the _entries array
774                       	and remove all entries with EMPTY status until we find the first NON EMPTY.
775                       	This prevents the positions, of the NON EMPTY entries, from being changed.
776 r.kieninger      1.83     */
777 a.arora          1.73     index = _entries.size() - 1;
778                           while(_entries[index]._status == _MonitorEntry::EMPTY){
779                       	if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
780                                       _entries.remove(index);
781                       	index--;
782                           }
783                       
784 kumpf            1.4      PEG_METHOD_EXIT();
785 mike             1.2  }
786                       
787 a.arora          1.73 // Note: this is no longer called with PEP 183.
788 mday             1.7  PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
789                       {
790 mday             1.8     HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
791 kumpf            1.51    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
792 kumpf            1.53         "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
793                               dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
794 kumpf            1.51    try
795                          {
796                             dst->run(1);
797                          }
798                          catch (...)
799                          {
800                             Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
801                                 "Monitor::_dispatch: exception received");
802                          }
803                          Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
804                                 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
805 r.kieninger      1.83 
806 kumpf            1.53    PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);
807 kumpf            1.68 
808                          // Once the HTTPConnection thread has set the status value to either
809                          // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
810                          // to the Monitor.  It is no longer permissible to access the connection
811                          // or the entry in the _entries table.
812 kumpf            1.50    if (dst->_connectionClosePending)
813                          {
814 kumpf            1.68       dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
815                          }
816                          else
817                          {
818                             dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
819 kumpf            1.50    }
820 mday             1.8     return 0;
821 mday             1.40 }
822                       
823 mike             1.2  PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2