(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.79 //%2004////////////////////////////////////////////////////////////////////////
  2 mike  1.2  //
  3 karl  1.79 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4            // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5            // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6 karl  1.64 // IBM Corp.; EMC Corporation, The Open Group.
  7 karl  1.79 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8            // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9 mike  1.2  //
 10            // Permission is hereby granted, free of charge, to any person obtaining a copy
 11 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
 12            // deal in the Software without restriction, including without limitation the
 13            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 14 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
 15            // furnished to do so, subject to the following conditions:
 16 r.kieninger 1.83 //
 17 kumpf       1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 18 mike        1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 19                  // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 20 kumpf       1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 21                  // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 22                  // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 23 mike        1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 24                  // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 25                  //
 26                  //==============================================================================
 27                  //
 28                  // Author: Mike Brasher (mbrasher@bmc.com)
 29                  //
 30 r.kieninger 1.83 // Modified By: Mike Day (monitor_2) mdday@us.ibm.com
 31 a.arora     1.71 //              Amit K Arora (Bug#1153) amita@in.ibm.com
 32 alagaraja   1.75 //              Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090
 33 sushma.fernandes 1.78 //              Sushma Fernandes (sushma@hp.com) for Bug#2057
 34 joyce.j          1.84 //              Josephine Eskaline Joyce (jojustin@in.ibm.com) for PEP#101
 35 kumpf            1.85 //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
 36 mike             1.2  //
 37                       //%/////////////////////////////////////////////////////////////////////////////
 38                       
 39                       #include <Pegasus/Common/Config.h>
 40 mday             1.40 
 41 mike             1.2  #include <cstring>
 42                       #include "Monitor.h"
 43                       #include "MessageQueue.h"
 44                       #include "Socket.h"
 45 kumpf            1.4  #include <Pegasus/Common/Tracer.h>
 46 mday             1.7  #include <Pegasus/Common/HTTPConnection.h>
 47 kumpf            1.69 #include <Pegasus/Common/MessageQueueService.h>
 48 a.arora          1.73 #include <Pegasus/Common/Exception.h>
 49 mike             1.2  
 50                       #ifdef PEGASUS_OS_TYPE_WINDOWS
 51                       # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
 52                       #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
 53                       of <winsock.h>. It may have been indirectly included (e.g., by including \
 54 mday             1.25 <windows.h>). Finthe inclusion of that header which is visible to this \
 55 mike             1.2  compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \
 56                       otherwise, less than 64 clients (the default) will be able to connect to the \
 57                       CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
 58 mday             1.5  
 59 mike             1.2  # endif
 60                       # define FD_SETSIZE 1024
 61 mday             1.5  # include <windows.h>
 62 mike             1.2  #else
 63                       # include <sys/types.h>
 64                       # include <sys/socket.h>
 65                       # include <sys/time.h>
 66                       # include <netinet/in.h>
 67                       # include <netdb.h>
 68                       # include <arpa/inet.h>
 69                       #endif
 70                       
 71                       PEGASUS_USING_STD;
 72                       
 73                       PEGASUS_NAMESPACE_BEGIN
 74                       
 75 kumpf            1.86 // Define a platform-neutral socket length type
 76                       #if defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM)
 77                       typedef size_t PEGASUS_SOCKLEN_T;
 78                       #elif defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) || defined(PEGASUS_OS_LINUX) || (defined(PEGASUS_OS_SOLARIS) && !defined(SUNOS_5_6))
 79                       typedef socklen_t PEGASUS_SOCKLEN_T;
 80                       #else
 81                       typedef int PEGASUS_SOCKLEN_T;
 82                       #endif
 83 mday             1.18 
 84 mday             1.25 static AtomicInt _connections = 0;
 85                       
 86                       static struct timeval create_time = {0, 1};
 87 mday             1.38 static struct timeval destroy_time = {300, 0};
 88 mday             1.26 static struct timeval deadlock_time = {0, 0};
 89 mday             1.18 
 90 mike             1.2  ////////////////////////////////////////////////////////////////////////////////
 91                       //
 92                       // Monitor
 93                       //
 94                       ////////////////////////////////////////////////////////////////////////////////
 95                       
 96 kumpf            1.54 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
 97 mike             1.2  Monitor::Monitor()
 98 kumpf            1.87    : _stopConnections(0),
 99 a.arora          1.73      _stopConnectionsSem(0),
100 a.dunfey         1.76      _solicitSocketCount(0),
101 a.dunfey         1.77      _tickle_client_socket(-1),
102                            _tickle_server_socket(-1),
103                            _tickle_peer_socket(-1)
104 mike             1.2  {
105 kumpf            1.54     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
106 mike             1.2      Socket::initializeInterface();
107 kumpf            1.54     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
108 a.arora          1.73 
109                           // setup the tickler
110                           initializeTickler();
111                       
112 r.kieninger      1.83     // Start the count at 1 because initilizeTickler()
113 a.arora          1.73     // has added an entry in the first position of the
114                           // _entries array
115                           for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
116 mday             1.37     {
117                              _MonitorEntry entry(0, 0, 0);
118                              _entries.append(entry);
119                           }
120 mday             1.18 }
121 mday             1.20 
122 mike             1.2  Monitor::~Monitor()
123                       {
124 kumpf            1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
125 a.dunfey         1.76 
126                           try{
127 a.dunfey         1.77         if(_tickle_peer_socket >= 0)
128 a.dunfey         1.76         {
129                                   Socket::close(_tickle_peer_socket);
130                               }
131 a.dunfey         1.77         if(_tickle_client_socket >= 0)
132 a.dunfey         1.76         {
133                                   Socket::close(_tickle_client_socket);
134                               }
135 a.dunfey         1.77         if(_tickle_server_socket >= 0)
136 a.dunfey         1.76         {
137                                   Socket::close(_tickle_server_socket);
138                               }
139                           }
140                           catch(...)
141                           {
142                               Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
143                                         "Failed to close tickle sockets");
144                           }
145                       
146 mike             1.2      Socket::uninitializeInterface();
147 kumpf            1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
148                                         "returning from monitor destructor");
149 mday             1.18 }
150                       
151 a.arora          1.73 void Monitor::initializeTickler(){
152 r.kieninger      1.83     /*
153                              NOTE: On any errors trying to
154                                    setup out tickle connection,
155 a.arora          1.73              throw an exception/end the server
156                           */
157                       
158                           /* setup the tickle server/listener */
159                       
160                           // get a socket for the server side
161                           if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){
162                       	//handle error
163                       	MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
164                       				 "Received error number $0 while creating the internal socket.",
165                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
166                       				 errno);
167                       #else
168                       				 WSAGetLastError());
169                       #endif
170                       	throw Exception(parms);
171                           }
172                       
173                           // initialize the address
174                           memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
175                       #ifdef PEGASUS_OS_ZOS
176 a.arora          1.73     _tickle_server_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
177                       #else
178                       #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
179                       #pragma convert(37)
180                       #endif
181                           _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
182                       #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
183                       #pragma convert(0)
184                       #endif
185                       #endif
186                           _tickle_server_addr.sin_family = PF_INET;
187                           _tickle_server_addr.sin_port = 0;
188                       
189 kumpf            1.86     PEGASUS_SOCKLEN_T _addr_size = sizeof(_tickle_server_addr);
190 a.arora          1.73 
191                           // bind server side to socket
192                           if((::bind(_tickle_server_socket,
193 kumpf            1.88                reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
194 a.arora          1.73 	       sizeof(_tickle_server_addr))) < 0){
195                       	// handle error
196 r.kieninger      1.83 #ifdef PEGASUS_OS_ZOS
197                           MessageLoaderParms parms("Common.Monitor.TICKLE_BIND_LONG",
198                       				 "Received error:$0 while binding the internal socket.",strerror(errno));
199                       #else
200 a.arora          1.73 	MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
201                       				 "Received error number $0 while binding the internal socket.",
202                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
203                       				 errno);
204                       #else
205                       				 WSAGetLastError());
206                       #endif
207 r.kieninger      1.83 #endif
208 a.arora          1.73         throw Exception(parms);
209                           }
210                       
211                           // tell the kernel we are a server
212                           if((::listen(_tickle_server_socket,3)) < 0){
213                       	// handle error
214                       	MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",
215                       			 "Received error number $0 while listening to the internal socket.",
216                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
217                       				 errno);
218                       #else
219                       				 WSAGetLastError());
220                       #endif
221                       	throw Exception(parms);
222                           }
223 r.kieninger      1.83 
224 a.arora          1.73     // make sure we have the correct socket for our server
225                           int sock = ::getsockname(_tickle_server_socket,
226 kumpf            1.88                    reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
227                                          &_addr_size);
228 a.arora          1.73     if(sock < 0){
229                       	// handle error
230                       	MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",
231                       			 "Received error number $0 while getting the internal socket name.",
232                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
233                       				 errno);
234                       #else
235                       				 WSAGetLastError());
236                       #endif
237                       	throw Exception(parms);
238                           }
239                       
240                           /* set up the tickle client/connector */
241 r.kieninger      1.83 
242 a.arora          1.73     // get a socket for our tickle client
243                           if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){
244                       	// handle error
245                       	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
246                       			 "Received error number $0 while creating the internal client socket.",
247                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
248                       				 errno);
249                       #else
250                       				 WSAGetLastError());
251                       #endif
252                       	throw Exception(parms);
253                           }
254                       
255                           // setup the address of the client
256                           memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
257                       #ifdef PEGASUS_OS_ZOS
258                           _tickle_client_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
259                       #else
260                       #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
261                       #pragma convert(37)
262                       #endif
263 a.arora          1.73     _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
264                       #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
265                       #pragma convert(0)
266                       #endif
267                       #endif
268                           _tickle_client_addr.sin_family = PF_INET;
269                           _tickle_client_addr.sin_port = 0;
270                       
271                           // bind socket to client side
272                           if((::bind(_tickle_client_socket,
273 kumpf            1.88                reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
274 a.arora          1.73 	       sizeof(_tickle_client_addr))) < 0){
275                       	// handle error
276                       	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
277                       			 "Received error number $0 while binding the internal client socket.",
278                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
279                       				 errno);
280                       #else
281                       				 WSAGetLastError());
282                       #endif
283                       	throw Exception(parms);
284                           }
285                       
286                           // connect to server side
287                           if((::connect(_tickle_client_socket,
288 kumpf            1.88                   reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
289 a.arora          1.73 		  sizeof(_tickle_server_addr))) < 0){
290                       	// handle error
291                       	MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
292                       			 "Received error number $0 while connecting the internal client socket.",
293                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
294                       				 errno);
295                       #else
296                       				 WSAGetLastError());
297                       #endif
298                       	throw Exception(parms);
299                           }
300                       
301                           /* set up the slave connection */
302                           memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
303 kumpf            1.86     PEGASUS_SOCKLEN_T peer_size = sizeof(_tickle_peer_addr);
304 r.kieninger      1.83     pegasus_sleep(1);
305 a.arora          1.73 
306                           // this call may fail, we will try a max of 20 times to establish this peer connection
307                           if((_tickle_peer_socket = ::accept(_tickle_server_socket,
308 kumpf            1.88             reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
309                                   &peer_size)) < 0){
310 a.arora          1.73 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
311                               // Only retry on non-windows platforms.
312                               if(_tickle_peer_socket == -1 && errno == EAGAIN)
313                               {
314 r.kieninger      1.83           int retries = 0;
315 a.arora          1.73           do
316                                 {
317                                   pegasus_sleep(1);
318                                   _tickle_peer_socket = ::accept(_tickle_server_socket,
319 kumpf            1.88                 reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
320                                       &peer_size);
321 a.arora          1.73             retries++;
322                                 } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
323                               }
324                       #endif
325                           }
326                           if(_tickle_peer_socket == -1){
327                       	// handle error
328                       	MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",
329                       			 "Received error number $0 while accepting the internal socket connection.",
330                       #if !defined(PEGASUS_OS_TYPE_WINDOWS)
331                       				 errno);
332                       #else
333                       				 WSAGetLastError());
334                       #endif
335                       	throw Exception(parms);
336                           }
337                           // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
338                           // checks entries with IDLE state for events
339                           _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
340                           entry._status = _MonitorEntry::IDLE;
341                           _entries.append(entry);
342 a.arora          1.73 }
343                       
344                       void Monitor::tickle(void)
345                       {
346 sushma.fernandes 1.78     static char _buffer[] =
347 a.arora          1.73     {
348                             '0','0'
349                           };
350 r.kieninger      1.83 
351 sushma.fernandes 1.78     AutoMutex autoMutex(_tickle_mutex);
352 r.kieninger      1.83     Socket::disableBlocking(_tickle_client_socket);
353 sushma.fernandes 1.78     Socket::write(_tickle_client_socket,&_buffer, 2);
354 r.kieninger      1.83     Socket::enableBlocking(_tickle_client_socket);
355 sushma.fernandes 1.78 }
356                       
357                       void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
358                       {
359                           // Set the state to requested state
360                           _entries[index]._status = status;
361 a.arora          1.73 }
362                       
363 mike             1.2  Boolean Monitor::run(Uint32 milliseconds)
364                       {
365 mday             1.18 
366 mday             1.25     Boolean handled_events = false;
367 a.arora          1.73     int i = 0;
368 r.kieninger      1.83 
369 kumpf            1.36     struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
370 a.arora          1.73 
371 mday             1.25     fd_set fdread;
372                           FD_ZERO(&fdread);
373 a.arora          1.73 
374 mday             1.37     _entry_mut.lock(pegasus_thread_self());
375 r.kieninger      1.83 
376                           // Check the stopConnections flag.  If set, clear the Acceptor monitor entries
377                           if (_stopConnections == 1)
378 kumpf            1.48     {
379                               for ( int indx = 0; indx < (int)_entries.size(); indx++)
380                               {
381                                   if (_entries[indx]._type == Monitor::ACCEPTOR)
382                                   {
383                                       if ( _entries[indx]._status.value() != _MonitorEntry::EMPTY)
384                                       {
385                                          if ( _entries[indx]._status.value() == _MonitorEntry::IDLE ||
386                                               _entries[indx]._status.value() == _MonitorEntry::DYING )
387                                          {
388                                              // remove the entry
389                       		       _entries[indx]._status = _MonitorEntry::EMPTY;
390                                          }
391                                          else
392                                          {
393                                              // set status to DYING
394 kumpf            1.52                       _entries[indx]._status = _MonitorEntry::DYING;
395 kumpf            1.48                    }
396                                      }
397                                  }
398                               }
399                               _stopConnections = 0;
400 a.arora          1.73 	_stopConnectionsSem.signal();
401 kumpf            1.48     }
402 kumpf            1.51 
403 kumpf            1.68     for( int indx = 0; indx < (int)_entries.size(); indx++)
404                           {
405 brian.campbell   1.80 			 const _MonitorEntry &entry = _entries[indx];
406                              if ((entry._status.value() == _MonitorEntry::DYING) &&
407                       					 (entry._type == Monitor::CONNECTION))
408 kumpf            1.68        {
409 brian.campbell   1.80           MessageQueue *q = MessageQueue::lookup(entry.queueId);
410 kumpf            1.68           PEGASUS_ASSERT(q != 0);
411 brian.campbell   1.80           HTTPConnection &h = *static_cast<HTTPConnection *>(q);
412 r.kieninger      1.83 
413 brian.campbell   1.80 					if (h._connectionClosePending == false)
414                       						continue;
415                       
416                       					// NOTE: do not attempt to delete while there are pending responses
417 r.kieninger      1.83 					// coming thru. The last response to come thru after a
418 brian.campbell   1.80 					// _connectionClosePending will reset _responsePending to false
419                       					// and then cause the monitor to rerun this code and clean up.
420                       					// (see HTTPConnection.cpp)
421                       
422                       					if (h._responsePending == true)
423                       					{
424                       						Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "
425                       													"Ignoring connection delete request because "
426                       													"responses are still pending. "
427 r.kieninger      1.83 													"connection=0x%p, socket=%d\n",
428 brian.campbell   1.81 													(void *)&h, h.getSocket());
429 brian.campbell   1.80 						continue;
430                       					}
431                       					h._connectionClosePending = false;
432                                 MessageQueue &o = h.get_owner();
433                                 Message* message= new CloseConnectionMessage(entry.socket);
434 kumpf            1.68           message->dest = o.getQueueId();
435                       
436 r.kieninger      1.83           // HTTPAcceptor is responsible for closing the connection.
437 kumpf            1.68           // The lock is released to allow HTTPAcceptor to call
438 r.kieninger      1.83           // unsolicitSocketMessages to free the entry.
439 kumpf            1.68           // Once HTTPAcceptor completes processing of the close
440                                 // connection, the lock is re-requested and processing of
441                                 // the for loop continues.  This is safe with the current
442                                 // implementation of the _entries object.  Note that the
443                                 // loop condition accesses the _entries.size() on each
444                                 // iteration, so that a change in size while the mutex is
445                                 // unlocked will not result in an ArrayIndexOutOfBounds
446                                 // exception.
447                       
448                                 _entry_mut.unlock();
449                                 o.enqueue(message);
450                                 _entry_mut.lock(pegasus_thread_self());
451                              }
452                           }
453                       
454 kumpf            1.51     Uint32 _idleEntries = 0;
455 r.kieninger      1.83 
456 a.arora          1.73     /*
457                       	We will keep track of the maximum socket number and pass this value
458 r.kieninger      1.83 	to the kernel as a parameter to SELECT.  This loop seems like a good
459 a.arora          1.73 	place to calculate the max file descriptor (maximum socket number)
460                       	because we have to traverse the entire array.
461 r.kieninger      1.83     */
462 a.arora          1.73     int maxSocketCurrentPass = 0;
463 mday             1.25     for( int indx = 0; indx < (int)_entries.size(); indx++)
464 mike             1.2      {
465 a.arora          1.73        if(maxSocketCurrentPass < _entries[indx].socket)
466                       	  maxSocketCurrentPass = _entries[indx].socket;
467                       
468 mday             1.37        if(_entries[indx]._status.value() == _MonitorEntry::IDLE)
469 mday             1.25        {
470 kumpf            1.51 	  _idleEntries++;
471 mday             1.25 	  FD_SET(_entries[indx].socket, &fdread);
472                              }
473 mday             1.13     }
474 s.hills          1.62 
475 a.arora          1.73     /*
476                       	Add 1 then assign maxSocket accordingly. We add 1 to account for
477                       	descriptors starting at 0.
478                           */
479                           maxSocketCurrentPass++;
480                       
481 r.kieninger      1.83     _entry_mut.unlock();
482 a.arora          1.73     int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
483 kumpf            1.51    _entry_mut.lock(pegasus_thread_self());
484 mday             1.25 
485 mike             1.2  #ifdef PEGASUS_OS_TYPE_WINDOWS
486 kumpf            1.50     if(events == SOCKET_ERROR)
487 mike             1.2  #else
488 kumpf            1.50     if(events == -1)
489 mike             1.2  #endif
490 mday             1.13     {
491 kumpf            1.50        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
492                                 "Monitor::run - errorno = %d has occurred on select.", errno);
493                              // The EBADF error indicates that one or more or the file
494                              // descriptions was not valid. This could indicate that
495                              // the _entries structure has been corrupted or that
496                              // we have a synchronization error.
497                       
498                              PEGASUS_ASSERT(errno != EBADF);
499                           }
500                           else if (events)
501                           {
502 kumpf            1.51        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
503 r.kieninger      1.83           "Monitor::run select event received events = %d, monitoring %d idle entries",
504 kumpf            1.51 	   events, _idleEntries);
505 mday             1.25        for( int indx = 0; indx < (int)_entries.size(); indx++)
506                              {
507 kumpf            1.53           // The Monitor should only look at entries in the table that are IDLE (i.e.,
508                                 // owned by the Monitor).
509 r.kieninger      1.83 	  if((_entries[indx]._status.value() == _MonitorEntry::IDLE) &&
510 kumpf            1.53 	     (FD_ISSET(_entries[indx].socket, &fdread)))
511 mday             1.25 	  {
512                       	     MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);
513 kumpf            1.53              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
514                                         "Monitor::run indx = %d, queueId =  %d, q = %p",
515                                         indx, _entries[indx].queueId, q);
516                                    PEGASUS_ASSERT(q !=0);
517 mday             1.37 
518 r.kieninger      1.83 	     try
519 mday             1.25 	     {
520 mday             1.37 		if(_entries[indx]._type == Monitor::CONNECTION)
521                       		{
522 kumpf            1.51                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
523                                            "_entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
524 mday             1.37 		   static_cast<HTTPConnection *>(q)->_entry_index = indx;
525 sushma.fernandes 1.78 
526                                          // Do not update the entry just yet. The entry gets updated once
527 r.kieninger      1.83                    // the request has been read.
528 sushma.fernandes 1.78 		   //_entries[indx]._status = _MonitorEntry::BUSY;
529                       
530 kumpf            1.66                    // If allocate_and_awaken failure, retry on next iteration
531 a.arora          1.73 /* Removed for PEP 183.
532 kumpf            1.69                    if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(
533                                                  (void *)q, _dispatch))
534 kumpf            1.67                    {
535                                             Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
536                                                 "Monitor::run: Insufficient resources to process request.");
537                                             _entries[indx]._status = _MonitorEntry::IDLE;
538                                             _entry_mut.unlock();
539                                             return true;
540                                          }
541 a.arora          1.73 */
542                       // Added for PEP 183
543                       		   HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
544                         			 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
545                                                "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
546                                          dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
547                                          try
548                                          {
549                                              dst->run(1);
550                                          }
551                          		   catch (...)
552                          		   {
553                             			Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
554                                 		"Monitor::_dispatch: exception received");
555                          		   }
556                          		   Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
557                                          "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
558                       
559 sushma.fernandes 1.78                    // It is possible the entry status may not be set to busy.
560 r.kieninger      1.83                    // The following will fail in that case.
561                          		   // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);
562 a.arora          1.73 		   // Once the HTTPConnection thread has set the status value to either
563                       		   // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
564                       		   // to the Monitor.  It is no longer permissible to access the connection
565                       		   // or the entry in the _entries table.
566 sushma.fernandes 1.78 
567                                          // The following is not relevant as the worker thread or the
568                                          // reader thread will update the status of the entry.
569                       		   //if (dst->_connectionClosePending)
570 r.kieninger      1.83 		   //{
571 sushma.fernandes 1.78 		   //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
572                       		   //}
573                       		   //else
574                       		   //{
575                       		   //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
576 r.kieninger      1.83 		   //}
577                       // end Added for PEP 183
578 a.arora          1.73 		}
579                       	        else if( _entries[indx]._type == Monitor::INTERNAL){
580 r.kieninger      1.83 			// set ourself to BUSY,
581                                               // read the data
582 a.arora          1.73                         // and set ourself back to IDLE
583 r.kieninger      1.83 
584 a.arora          1.73 		   	_entries[indx]._status == _MonitorEntry::BUSY;
585                       			static char buffer[2];
586                             			Socket::disableBlocking(_entries[indx].socket);
587                             			Sint32 amt = Socket::read(_entries[indx].socket,&buffer, 2);
588                             			Socket::enableBlocking(_entries[indx].socket);
589                       			_entries[indx]._status == _MonitorEntry::IDLE;
590 mday             1.37 		}
591                       		else
592 mday             1.25 		{
593 kumpf            1.51                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
594                                            "Non-connection entry, indx = %d, has been received.", indx);
595 mday             1.37 		   int events = 0;
596                       		   events |= SocketMessage::READ;
597                       		   Message *msg = new SocketMessage(_entries[indx].socket, events);
598                       		   _entries[indx]._status = _MonitorEntry::BUSY;
599                       		   _entry_mut.unlock();
600 mday             1.27 
601 mday             1.37 		   q->enqueue(msg);
602                       		   _entries[indx]._status = _MonitorEntry::IDLE;
603 mday             1.25 		   return true;
604                       		}
605                       	     }
606 mday             1.37 	     catch(...)
607 mday             1.25 	     {
608                       	     }
609                       	     handled_events = true;
610                       	  }
611                              }
612 mday             1.24     }
613 mday             1.37     _entry_mut.unlock();
614 mday             1.13     return(handled_events);
615 mike             1.2  }
616                       
617 chuck            1.74 void Monitor::stopListeningForConnections(Boolean wait)
618 kumpf            1.48 {
619                           PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
620 r.kieninger      1.83     // set boolean then tickle the server to recognize _stopConnections
621 kumpf            1.48     _stopConnections = 1;
622 a.arora          1.73     tickle();
623 kumpf            1.48 
624 chuck            1.74     if (wait)
625 a.arora          1.73     {
626 chuck            1.74       // Wait for the monitor to notice _stopConnections.  Otherwise the
627                             // caller of this function may unbind the ports while the monitor
628                             // is still accepting connections on them.
629                             try
630                       	{
631                       	  _stopConnectionsSem.time_wait(10000);
632                       	}
633                             catch (TimeOut &)
634                       	{
635                       	  // The monitor is probably busy processng a very long request, and is
636                       	  // not accepting connections.  Let the caller unbind the ports.
637                       	}
638 a.arora          1.73     }
639 r.kieninger      1.83 
640 kumpf            1.48     PEG_METHOD_EXIT();
641                       }
642 mday             1.25 
643 mday             1.37 
644 mday             1.25 int  Monitor::solicitSocketMessages(
645 r.kieninger      1.83     Sint32 socket,
646 mike             1.2      Uint32 events,
647 r.kieninger      1.83     Uint32 queueId,
648 mday             1.8      int type)
649 mike             1.2  {
650 r.kieninger      1.83    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
651 alagaraja        1.75    AutoMutex autoMut(_entry_mut);
652 a.arora          1.73    // Check to see if we need to dynamically grow the _entries array
653                          // We always want the _entries array to 2 bigger than the
654                          // current connections requested
655                          _solicitSocketCount++;  // bump the count
656                          int size = (int)_entries.size();
657                          if(_solicitSocketCount >= (size-1)){
658                               for(int i = 0; i < (_solicitSocketCount - (size-1)); i++){
659                                       _MonitorEntry entry(0, 0, 0);
660                                       _entries.append(entry);
661                               }
662                          }
663 kumpf            1.4  
664 a.arora          1.73    int index;
665                          for(index = 1; index < (int)_entries.size(); index++)
666 mday             1.25    {
667 a.arora          1.73       try
668 mday             1.37       {
669 a.arora          1.73          if(_entries[index]._status.value() == _MonitorEntry::EMPTY)
670                                {
671                                   _entries[index].socket = socket;
672                                   _entries[index].queueId  = queueId;
673                                   _entries[index]._type = type;
674                                   _entries[index]._status = _MonitorEntry::IDLE;
675 r.kieninger      1.83 
676 a.arora          1.73             return index;
677                                }
678 mday             1.37       }
679                             catch(...)
680 mday             1.25       {
681                             }
682                          }
683 a.arora          1.73    _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful
684 mday             1.25    PEG_METHOD_EXIT();
685 kumpf            1.50    return -1;
686 a.arora          1.73 
687 mike             1.2  }
688                       
689 mday             1.25 void Monitor::unsolicitSocketMessages(Sint32 socket)
690 mike             1.2  {
691 kumpf            1.50 
692 mday             1.25     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
693 alagaraja        1.75     AutoMutex autoMut(_entry_mut);
694 a.arora          1.73 
695                           /*
696                               Start at index = 1 because _entries[0] is the tickle entry which never needs
697                               to be EMPTY;
698                           */
699                           int index;
700                           for(index = 1; index < _entries.size(); index++)
701 mike             1.2      {
702 mday             1.25        if(_entries[index].socket == socket)
703                              {
704 a.arora          1.73           _entries[index]._status = _MonitorEntry::EMPTY;
705                                 _entries[index].socket = -1;
706                                 _solicitSocketCount--;
707                                 break;
708 mday             1.25        }
709 mike             1.2      }
710 a.arora          1.73 
711                           /*
712                       	Dynamic Contraction:
713                       	To remove excess entries we will start from the end of the _entries array
714                       	and remove all entries with EMPTY status until we find the first NON EMPTY.
715                       	This prevents the positions, of the NON EMPTY entries, from being changed.
716 r.kieninger      1.83     */
717 a.arora          1.73     index = _entries.size() - 1;
718                           while(_entries[index]._status == _MonitorEntry::EMPTY){
719                       	if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
720                                       _entries.remove(index);
721                       	index--;
722                           }
723                       
724 kumpf            1.4      PEG_METHOD_EXIT();
725 mike             1.2  }
726                       
727 a.arora          1.73 // Note: this is no longer called with PEP 183.
728 mday             1.7  PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
729                       {
730 mday             1.8     HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
731 kumpf            1.51    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
732 kumpf            1.53         "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
733                               dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
734 kumpf            1.51    try
735                          {
736                             dst->run(1);
737                          }
738                          catch (...)
739                          {
740                             Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
741                                 "Monitor::_dispatch: exception received");
742                          }
743                          Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
744                                 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
745 r.kieninger      1.83 
746 kumpf            1.53    PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);
747 kumpf            1.68 
748                          // Once the HTTPConnection thread has set the status value to either
749                          // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
750                          // to the Monitor.  It is no longer permissible to access the connection
751                          // or the entry in the _entries table.
752 kumpf            1.50    if (dst->_connectionClosePending)
753                          {
754 kumpf            1.68       dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
755                          }
756                          else
757                          {
758                             dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
759 kumpf            1.50    }
760 mday             1.8     return 0;
761 mday             1.40 }
762                       
763 mike             1.2  PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2