(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.103 //%2006////////////////////////////////////////////////////////////////////////
  2 mike  1.2   //
  3 karl  1.79  // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4             // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5             // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6 karl  1.64  // IBM Corp.; EMC Corporation, The Open Group.
  7 karl  1.79  // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8             // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9 karl  1.90  // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10             // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11 karl  1.103 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 12             // EMC Corporation; Symantec Corporation; The Open Group.
 13 mike  1.2   //
 14             // Permission is hereby granted, free of charge, to any person obtaining a copy
 15 kumpf 1.17  // of this software and associated documentation files (the "Software"), to
 16             // deal in the Software without restriction, including without limitation the
 17             // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 18 mike  1.2   // sell copies of the Software, and to permit persons to whom the Software is
 19             // furnished to do so, subject to the following conditions:
 20 karl  1.103 // 
 21 kumpf 1.17  // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 22 mike  1.2   // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 23             // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 24 kumpf 1.17  // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 25             // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 26             // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 27 mike  1.2   // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 28             // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 29             //
 30             //==============================================================================
 31             //
 32             //%/////////////////////////////////////////////////////////////////////////////
 33             
 34 mike  1.107 #include "Network.h"
 35 mike  1.2   #include <Pegasus/Common/Config.h>
 36             #include <cstring>
 37             #include "Monitor.h"
 38             #include "MessageQueue.h"
 39             #include "Socket.h"
 40 kumpf 1.4   #include <Pegasus/Common/Tracer.h>
 41 mday  1.7   #include <Pegasus/Common/HTTPConnection.h>
 42 kumpf 1.69  #include <Pegasus/Common/MessageQueueService.h>
 43 a.arora 1.73  #include <Pegasus/Common/Exception.h>
 44 mike    1.100 #include "ArrayIterator.h"
 45 mike    1.113 #include <errno.h>
 46 mike    1.2   
 47               PEGASUS_USING_STD;
 48               
 49               PEGASUS_NAMESPACE_BEGIN
 50               
 51 mike    1.96  static AtomicInt _connections(0);
 52 mday    1.25  
 53 mike    1.2   ////////////////////////////////////////////////////////////////////////////////
 54               //
 55               // Monitor
 56               //
 57               ////////////////////////////////////////////////////////////////////////////////
 58               
 59 kumpf   1.54  #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
 60 mike    1.2   Monitor::Monitor()
 61 kumpf   1.87     : _stopConnections(0),
 62 a.arora 1.73       _stopConnectionsSem(0),
 63 a.dunfey 1.76       _solicitSocketCount(0),
 64 a.dunfey 1.77       _tickle_client_socket(-1),
 65                     _tickle_server_socket(-1),
 66                     _tickle_peer_socket(-1)
 67 mike     1.2   {
 68 kumpf    1.54      int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
 69 mike     1.2       Socket::initializeInterface();
 70 kumpf    1.54      _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
 71 a.arora  1.73  
 72                    // setup the tickler
 73                    initializeTickler();
 74                
 75 r.kieninger 1.83      // Start the count at 1 because initilizeTickler()
 76 a.arora     1.73      // has added an entry in the first position of the
 77                       // _entries array
 78 kumpf       1.116     for (int i = 1; i < numberOfMonitorEntriesToAllocate; i++)
 79 mday        1.37      {
 80                          _MonitorEntry entry(0, 0, 0);
 81                          _entries.append(entry);
 82                       }
 83 mday        1.18  }
 84 mday        1.20  
 85 mike        1.2   Monitor::~Monitor()
 86                   {
 87 thilo.boehm 1.115     uninitializeTickler();
 88                       Socket::uninitializeInterface();
 89 marek       1.118     PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
 90 thilo.boehm 1.115                   "returning from monitor destructor");
 91                   }
 92 kumpf       1.116 void Monitor::uninitializeTickler()
 93                   {
 94 marek       1.118     PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
 95 a.dunfey    1.76  
 96 kumpf       1.116     try
 97                       {
 98                           if (_tickle_peer_socket >= 0)
 99 a.dunfey    1.76          {
100                               Socket::close(_tickle_peer_socket);
101                           }
102 kumpf       1.116         if (_tickle_client_socket >= 0)
103 a.dunfey    1.76          {
104                               Socket::close(_tickle_client_socket);
105                           }
106 kumpf       1.116         if (_tickle_server_socket >= 0)
107 a.dunfey    1.76          {
108                               Socket::close(_tickle_server_socket);
109                           }
110                       }
111 kumpf       1.116     catch (...)
112 a.dunfey    1.76      {
113 marek       1.118         PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
114 a.dunfey    1.76                    "Failed to close tickle sockets");
115                       }
116                   
117 mday        1.18  }
118                   
119 kumpf       1.116 void Monitor::initializeTickler()
120                   {
121 r.kieninger 1.83      /*
122                          NOTE: On any errors trying to
123                                setup out tickle connection,
124 a.arora     1.73               throw an exception/end the server
125                       */
126                   
127                       /* setup the tickle server/listener */
128 kumpf       1.116     // try until the tcpip is restarted
129 thilo.boehm 1.115     do
130                       {
131                           // get a socket for the server side
132 kumpf       1.116         if ((_tickle_server_socket =
133                                    Socket::createSocket(PF_INET, SOCK_STREAM, 0)) ==
134                                PEGASUS_INVALID_SOCKET)
135                           {
136                               MessageLoaderParms parms(
137                                   "Common.Monitor.TICKLE_CREATE",
138                                   "Received error number $0 while creating the internal socket.",
139                                   getSocketError());
140 thilo.boehm 1.115             throw Exception(parms);
141                           }
142 a.arora     1.73  
143 thilo.boehm 1.115         // initialize the address
144                           memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
145 a.arora     1.73  #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
146                   #pragma convert(37)
147                   #endif
148 thilo.boehm 1.115         _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
149 a.arora     1.73  #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
150                   #pragma convert(0)
151                   #endif
152 thilo.boehm 1.115         _tickle_server_addr.sin_family = PF_INET;
153                           _tickle_server_addr.sin_port = 0;
154 a.arora     1.73  
155 thilo.boehm 1.115         SocketLength _addr_size = sizeof(_tickle_server_addr);
156 a.arora     1.73  
157 thilo.boehm 1.115         // bind server side to socket
158 kumpf       1.116         if ((::bind(_tickle_server_socket,
159                                    reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
160                                    sizeof(_tickle_server_addr))) < 0)
161 thilo.boehm 1.115         {
162 r.kieninger 1.83  #ifdef PEGASUS_OS_ZOS
163 kumpf       1.116             MessageLoaderParms parms(
164                                   "Common.Monitor.TICKLE_BIND_LONG",
165                                   "Received error:$0 while binding the internal socket.",
166                                   strerror(errno));
167 r.kieninger 1.83  #else
168 kumpf       1.116             MessageLoaderParms parms(
169                                   "Common.Monitor.TICKLE_BIND",
170                                   "Received error number $0 while binding the internal socket.",
171                                   getSocketError());
172 r.kieninger 1.83  #endif
173 thilo.boehm 1.115             throw Exception(parms);
174                           }
175                   
176                           // tell the kernel we are a server
177 kumpf       1.116         if ((::listen(_tickle_server_socket, 3)) < 0)
178 thilo.boehm 1.115         {
179 kumpf       1.116             MessageLoaderParms parms(
180                                   "Common.Monitor.TICKLE_LISTEN",
181                                   "Received error number $0 while listening to the internal "
182                                       "socket.",
183                                   getSocketError());
184 thilo.boehm 1.115             throw Exception(parms);
185                           }
186                   
187                           // make sure we have the correct socket for our server
188 kumpf       1.116         int sock = ::getsockname(
189                               _tickle_server_socket,
190                               reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
191                               &_addr_size);
192                           if (sock < 0)
193                           {
194                               MessageLoaderParms parms(
195                                   "Common.Monitor.TICKLE_SOCKNAME",
196                                   "Received error number $0 while getting the internal socket "
197                                       "name.",
198                                   getSocketError());
199 thilo.boehm 1.115             throw Exception(parms);
200                           }
201 a.arora     1.73  
202 thilo.boehm 1.115         /* set up the tickle client/connector */
203 r.kieninger 1.83  
204 thilo.boehm 1.115         // get a socket for our tickle client
205 kumpf       1.116         if ((_tickle_client_socket =
206                                    Socket::createSocket(PF_INET, SOCK_STREAM, 0)) ==
207                               PEGASUS_INVALID_SOCKET)
208                           {
209                               MessageLoaderParms parms(
210                                   "Common.Monitor.TICKLE_CLIENT_CREATE",
211                                   "Received error number $0 while creating the internal client "
212                                       "socket.",
213                                   getSocketError());
214 thilo.boehm 1.115             throw Exception(parms);
215                           }
216 a.arora     1.73  
217 thilo.boehm 1.115         // setup the address of the client
218                           memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
219 a.arora     1.73  #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
220 kumpf       1.116 # pragma convert(37)
221 a.arora     1.73  #endif
222 thilo.boehm 1.115         _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
223 a.arora     1.73  #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
224 kumpf       1.116 # pragma convert(0)
225 a.arora     1.73  #endif
226 thilo.boehm 1.115         _tickle_client_addr.sin_family = PF_INET;
227                           _tickle_client_addr.sin_port = 0;
228 a.arora     1.73  
229 thilo.boehm 1.115         // bind socket to client side
230 kumpf       1.116         if ((::bind(_tickle_client_socket,
231                                    reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
232                                    sizeof(_tickle_client_addr))) < 0)
233                           {
234                               MessageLoaderParms parms(
235                                   "Common.Monitor.TICKLE_CLIENT_BIND",
236                                   "Received error number $0 while binding the internal client "
237                                       "socket.",
238                                   getSocketError());
239 thilo.boehm 1.115             throw Exception(parms);
240                           }
241                   
242                           // connect to server side
243 kumpf       1.116         if ((::connect(_tickle_client_socket,
244                                    reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
245                                    sizeof(_tickle_server_addr))) < 0)
246                           {
247                               MessageLoaderParms parms(
248                                   "Common.Monitor.TICKLE_CLIENT_CONNECT",
249                                   "Received error number $0 while connecting the internal "
250                                       "client socket.",
251                                   getSocketError());
252 thilo.boehm 1.115             throw Exception(parms);
253                           }
254                   
255                           /* set up the slave connection */
256                           memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
257                           SocketLength peer_size = sizeof(_tickle_peer_addr);
258                           Threads::sleep(1);
259                   
260 kumpf       1.116         // this call may fail, we will try a max of 20 times to establish
261                           // this peer connection
262                           if ((_tickle_peer_socket = ::accept(_tickle_server_socket,
263                                    reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
264                                    &peer_size)) < 0)
265 thilo.boehm 1.115         {
266 kumpf       1.116             if (_tickle_peer_socket == PEGASUS_SOCKET_ERROR &&
267                                   getSocketError() == PEGASUS_NETWORK_TRYAGAIN)
268 thilo.boehm 1.115             {
269                                   int retries = 0;
270                                   do
271                                   {
272                                       Threads::sleep(1);
273 kumpf       1.116                     _tickle_peer_socket = ::accept(
274                                           _tickle_server_socket,
275                                           reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
276                                           &peer_size);
277 thilo.boehm 1.115                     retries++;
278 kumpf       1.116                 } while (_tickle_peer_socket == PEGASUS_SOCKET_ERROR &&
279                                            getSocketError() == PEGASUS_NETWORK_TRYAGAIN &&
280                                            retries < 20);
281 thilo.boehm 1.115             }
282                               // TCP/IP is down, destroy sockets and retry again.
283 kumpf       1.116             if (_tickle_peer_socket == PEGASUS_SOCKET_ERROR &&
284                                   getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED)
285 thilo.boehm 1.115             {
286                                   // destroy everything
287                                   uninitializeTickler();
288                                   // retry again.
289                                   continue;
290                               }
291                           }
292 kumpf       1.116         if (_tickle_peer_socket == PEGASUS_SOCKET_ERROR)
293 thilo.boehm 1.115         {
294 kumpf       1.116             MessageLoaderParms parms(
295                                   "Common.Monitor.TICKLE_ACCEPT",
296                                   "Received error number $0 while accepting the internal "
297                                       "socket connection.",
298                                   getSocketError());
299 thilo.boehm 1.115             throw Exception(parms);
300 kumpf       1.116         }
301                           else
302 thilo.boehm 1.115         {
303                               // socket is ok
304                               break;
305 a.arora     1.73          }
306 kumpf       1.116     } while (1); // try until TCP/IP is restarted
307 marek       1.111 
308                       Socket::disableBlocking(_tickle_peer_socket);
309                       Socket::disableBlocking(_tickle_client_socket);
310                   
311 kumpf       1.116     // add the tickler to the list of entries to be monitored and set to
312                       // IDLE because Monitor only
313 a.arora     1.73      // checks entries with IDLE state for events
314                       _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
315                       entry._status = _MonitorEntry::IDLE;
316 thilo.boehm 1.115 
317                       // is the tickler initalized as first socket on startup ?
318                       if (_entries.size()==0)
319                       {
320                          // if yes, append a new entry
321                          _entries.append(entry);
322                       }
323                       else
324                       {
325                          // if not, overwrite the tickler entry with new socket
326                          _entries[0]=entry;
327                       }
328 a.arora     1.73  }
329                   
330 kumpf       1.116 void Monitor::tickle()
331 a.arora     1.73  {
332 sushma.fernandes 1.78      static char _buffer[] =
333 a.arora          1.73      {
334                              '0','0'
335                            };
336 r.kieninger      1.83  
337 sushma.fernandes 1.78      AutoMutex autoMutex(_tickle_mutex);
338                            Socket::write(_tickle_client_socket,&_buffer, 2);
339                        }
340                        
341                        void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
342                        {
343                            // Set the state to requested state
344                            _entries[index]._status = status;
345 a.arora          1.73  }
346                        
347 kumpf            1.114 void Monitor::run(Uint32 milliseconds)
348 mike             1.2   {
349 mday             1.18  
350 r.kieninger      1.83  
351 kumpf            1.36      struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
352 a.arora          1.73  
353 mday             1.25      fd_set fdread;
354                            FD_ZERO(&fdread);
355 a.arora          1.73  
356 kumpf            1.94      AutoMutex autoEntryMutex(_entry_mut);
357 r.kieninger      1.83  
358 mike             1.100     ArrayIterator<_MonitorEntry> entries(_entries);
359                        
360 kumpf            1.116     // Check the stopConnections flag.  If set, clear the Acceptor monitor
361                            // entries
362 mike             1.96      if (_stopConnections.get() == 1)
363 kumpf            1.48      {
364 mike             1.100         for ( int indx = 0; indx < (int)entries.size(); indx++)
365 kumpf            1.48          {
366 mike             1.100             if (entries[indx]._type == Monitor::ACCEPTOR)
367 kumpf            1.48              {
368 mike             1.100                 if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)
369 kumpf            1.48                  {
370 mike             1.100                    if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||
371                                                entries[indx]._status.get() == _MonitorEntry::DYING )
372 kumpf            1.48                     {
373                                               // remove the entry
374 kumpf            1.116                        entries[indx]._status = _MonitorEntry::EMPTY;
375 kumpf            1.48                     }
376                                           else
377                                           {
378                                               // set status to DYING
379 mike             1.100                       entries[indx]._status = _MonitorEntry::DYING;
380 kumpf            1.48                     }
381                                       }
382                                   }
383                                }
384                                _stopConnections = 0;
385 kumpf            1.116         _stopConnectionsSem.signal();
386 kumpf            1.48      }
387 kumpf            1.51  
388 kumpf            1.116     for (int indx = 0; indx < (int)entries.size(); indx++)
389 kumpf            1.68      {
390 kumpf            1.116         const _MonitorEntry &entry = entries[indx];
391                                if ((entry._status.get() == _MonitorEntry::DYING) &&
392                                    (entry._type == Monitor::CONNECTION))
393                                {
394                                    MessageQueue *q = MessageQueue::lookup(entry.queueId);
395                                    PEGASUS_ASSERT(q != 0);
396                                    HTTPConnection &h = *static_cast<HTTPConnection *>(q);
397                        
398                                    if (h._connectionClosePending == false)
399                                        continue;
400                        
401                                    // NOTE: do not attempt to delete while there are pending responses
402                                    // coming thru. The last response to come thru after a
403                                    // _connectionClosePending will reset _responsePending to false
404                                    // and then cause the monitor to rerun this code and clean up.
405                                    // (see HTTPConnection.cpp)
406                        
407                                    if (h._responsePending == true)
408                                    {
409 marek            1.118                 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
410 kumpf            1.116                     "Monitor::run - Ignoring connection delete request "
411                                                "because responses are still pending. "
412                                                "connection=0x%p, socket=%d\n",
413 marek            1.118                     (void *)&h, h.getSocket()));
414 kumpf            1.116                 continue;
415                                    }
416                                    h._connectionClosePending = false;
417                                    MessageQueue &o = h.get_owner();
418                                    Message* message= new CloseConnectionMessage(entry.socket);
419                                    message->dest = o.getQueueId();
420                        
421                                    // HTTPAcceptor is responsible for closing the connection.
422                                    // The lock is released to allow HTTPAcceptor to call
423                                    // unsolicitSocketMessages to free the entry.
424                                    // Once HTTPAcceptor completes processing of the close
425                                    // connection, the lock is re-requested and processing of
426                                    // the for loop continues.  This is safe with the current
427                                    // implementation of the entries object.  Note that the
428                                    // loop condition accesses the entries.size() on each
429                                    // iteration, so that a change in size while the mutex is
430                                    // unlocked will not result in an ArrayIndexOutOfBounds
431                                    // exception.
432                        
433                                    _entry_mut.unlock();
434                                    o.enqueue(message);
435 kumpf            1.116             _entry_mut.lock();
436                        
437                                    // After enqueue a message and the autoEntryMutex has been
438                                    // released and locked again, the array of _entries can be
439                                    // changed. The ArrayIterator has be reset with the original
440                                    // _entries.
441                                    entries.reset(_entries);
442                                }
443 kumpf            1.68      }
444                        
445 kumpf            1.51      Uint32 _idleEntries = 0;
446 r.kieninger      1.83  
447 a.arora          1.73      /*
448 david.dillard    1.95          We will keep track of the maximum socket number and pass this value
449                                to the kernel as a parameter to SELECT.  This loop seems like a good
450                                place to calculate the max file descriptor (maximum socket number)
451                                because we have to traverse the entire array.
452 r.kieninger      1.83      */
453 mike             1.106     SocketHandle maxSocketCurrentPass = 0;
454 kumpf            1.116     for (int indx = 0; indx < (int)entries.size(); indx++)
455 mike             1.2       {
456 kumpf            1.116        if (maxSocketCurrentPass < entries[indx].socket)
457                                   maxSocketCurrentPass = entries[indx].socket;
458 a.arora          1.73  
459 kumpf            1.116        if (entries[indx]._status.get() == _MonitorEntry::IDLE)
460 mday             1.25         {
461 david.dillard    1.95             _idleEntries++;
462 mike             1.100            FD_SET(entries[indx].socket, &fdread);
463 mday             1.25         }
464 mday             1.13      }
465 s.hills          1.62  
466 a.arora          1.73      /*
467 david.dillard    1.95          Add 1 then assign maxSocket accordingly. We add 1 to account for
468                                descriptors starting at 0.
469 a.arora          1.73      */
470                            maxSocketCurrentPass++;
471                        
472 mike             1.112     _entry_mut.unlock();
473 david.dillard    1.95  
474                            //
475                            // The first argument to select() is ignored on Windows and it is not
476                            // a socket value.  The original code assumed that the number of sockets
477                            // and a socket value have the same type.  On Windows they do not.
478                            //
479                        #ifdef PEGASUS_OS_TYPE_WINDOWS
480                            int events = select(0, &fdread, NULL, NULL, &tv);
481                        #else
482 a.arora          1.73      int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
483 david.dillard    1.95  #endif
484 mike             1.112     _entry_mut.lock();
485 kumpf            1.116 
486                            // After enqueue a message and the autoEntryMutex has been released and
487                            // locked again, the array of _entries can be changed. The ArrayIterator
488                            // has be reset with the original _entries
489 r.kieninger      1.102     entries.reset(_entries);
490 mike             1.106 
491                            if (events == PEGASUS_SOCKET_ERROR)
492 mday             1.13      {
493 marek            1.118         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
494                                    "Monitor::run - errorno = %d has occurred on select.", errno));
495 kumpf            1.116         // The EBADF error indicates that one or more or the file
496                                // descriptions was not valid. This could indicate that
497                                // the entries structure has been corrupted or that
498                                // we have a synchronization error.
499 kumpf            1.50  
500 kumpf            1.116         PEGASUS_ASSERT(errno != EBADF);
501 kumpf            1.50      }
502                            else if (events)
503                            {
504 marek            1.118         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
505 kumpf            1.116             "Monitor::run select event received events = %d, monitoring %d "
506                                        "idle entries",
507 marek            1.118             events, _idleEntries));
508 kumpf            1.116         for (int indx = 0; indx < (int)entries.size(); indx++)
509                                {
510                                    // The Monitor should only look at entries in the table that are
511                                    // IDLE (i.e., owned by the Monitor).
512                                    if ((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
513                                        (FD_ISSET(entries[indx].socket, &fdread)))
514                                    {
515                                        MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
516 marek            1.118                 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
517 kumpf            1.116                     "Monitor::run indx = %d, queueId =  %d, q = %p",
518 marek            1.118                     indx, entries[indx].queueId, q));
519 kumpf            1.116                 PEGASUS_ASSERT(q !=0);
520                        
521                                        try
522 david.dillard    1.95                  {
523 kumpf            1.116                     if (entries[indx]._type == Monitor::CONNECTION)
524                                            {
525 marek            1.118                         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
526 kumpf            1.116                             "entries[indx].type for indx = %d is "
527                                                        "Monitor::CONNECTION",
528 marek            1.118                             indx));
529 kumpf            1.116                         static_cast<HTTPConnection *>(q)->_entry_index = indx;
530                        
531                                                // Do not update the entry just yet. The entry gets
532                                                // updated once the request has been read.
533                                                //entries[indx]._status = _MonitorEntry::BUSY;
534 sushma.fernandes 1.78  
535 kumpf            1.116                         // If allocate_and_awaken failure, retry on next
536                                                // iteration
537 a.arora          1.73  /* Removed for PEP 183.
538 kumpf            1.116                         if (!MessageQueueService::get_thread_pool()->
539                                                        allocate_and_awaken((void *)q, _dispatch))
540                                                {
541 kumpf            1.119                             PEG_TRACE_CSTRING(TRC_DISCARDED_DATA,
542                                                        Tracer::LEVEL2,
543 kumpf            1.116                                 "Monitor::run: Insufficient resources to "
544                                                            "process request.");
545                                                    entries[indx]._status = _MonitorEntry::IDLE;
546                                                    return true;
547                                                }
548 a.arora          1.73  */
549                        // Added for PEP 183
550 kumpf            1.116                         HTTPConnection *dst =
551                                                    reinterpret_cast<HTTPConnection *>(q);
552 marek            1.118                         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
553 kumpf            1.116                             "Monitor::_dispatch: entering run() for "
554                                                        "indx = %d, queueId = %d, q = %p",
555                                                    dst->_entry_index,
556                                                    dst->_monitor->_entries[dst->_entry_index].queueId,
557 marek            1.118                             dst));
558 kumpf            1.116 
559                                                try
560                                                {
561                                                    dst->run(1);
562                                                }
563                                                catch (...)
564                                                {
565 marek            1.118                             PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
566 kumpf            1.116                                 "Monitor::_dispatch: exception received");
567                                                }
568 marek            1.118                         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
569 kumpf            1.116                             "Monitor::_dispatch: exited run() for index %d",
570 marek            1.118                             dst->_entry_index));
571 kumpf            1.116 
572                                                // It is possible the entry status may not be set to
573                                                // busy.  The following will fail in that case.
574                                                // PEGASUS_ASSERT(dst->_monitor->_entries[
575                                                //     dst->_entry_index]._status.get() ==
576                                                //    _MonitorEntry::BUSY);
577                                                // Once the HTTPConnection thread has set the status
578                                                // value to either Monitor::DYING or Monitor::IDLE,
579                                                // it has returned control of the connection to the
580                                                // Monitor.  It is no longer permissible to access
581                                                // the connection or the entry in the _entries table.
582                        
583                                                // The following is not relevant as the worker thread
584                                                // or the reader thread will update the status of the
585                                                // entry.
586                                                //if (dst->_connectionClosePending)
587                                                //{
588                                                //  dst->_monitor->_entries[dst->_entry_index]._status =
589                                                //    _MonitorEntry::DYING;
590                                                //}
591                                                //else
592 kumpf            1.116                         //{
593                                                //  dst->_monitor->_entries[dst->_entry_index]._status =
594                                                //    _MonitorEntry::IDLE;
595                                                //}
596 r.kieninger      1.83  // end Added for PEP 183
597 kumpf            1.116                     }
598                                            else if (entries[indx]._type == Monitor::INTERNAL)
599                                            {
600                                                // set ourself to BUSY,
601 r.kieninger      1.83                          // read the data
602 a.arora          1.73                          // and set ourself back to IDLE
603 r.kieninger      1.83  
604 kumpf            1.116                         entries[indx]._status = _MonitorEntry::BUSY;
605                                                static char buffer[2];
606                                                Sint32 amt =
607                                                    Socket::read(entries[indx].socket,&buffer, 2);
608                        
609                                                if (amt == PEGASUS_SOCKET_ERROR &&
610                                                    getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED)
611                                                {
612 marek            1.118                             PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
613 kumpf            1.116                                 "Monitor::run: Tickler socket got an IO error. "
614                                                            "Going to re-create Socket and wait for "
615                                                            "TCP/IP restart.");
616                                                    uninitializeTickler();
617                                                    initializeTickler();
618                                                }
619                                                else
620                                                {
621                                                    entries[indx]._status = _MonitorEntry::IDLE;
622                                                }
623                                            }
624                                            else
625                                            {
626 marek            1.118                         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
627 kumpf            1.116                             "Non-connection entry, indx = %d, has been "
628                                                        "received.",
629 marek            1.118                             indx));
630 kumpf            1.116                         int events = 0;
631                                                events |= SocketMessage::READ;
632                                                Message* msg = new SocketMessage(
633                                                    entries[indx].socket, events);
634                                                entries[indx]._status = _MonitorEntry::BUSY;
635                                                _entry_mut.unlock();
636                                                q->enqueue(msg);
637                                                _entry_mut.lock();
638                        
639                                                // After enqueue a message and the autoEntryMutex has
640                                                // been released and locked again, the array of
641                                                // entries can be changed. The ArrayIterator has be
642                                                // reset with the original _entries
643                                                entries.reset(_entries);
644                                                entries[indx]._status = _MonitorEntry::IDLE;
645                                            }
646                                        }
647                                        catch (...)
648                                        {
649                                        }
650 thilo.boehm      1.115             }
651 kumpf            1.116         }
652 mday             1.24      }
653 mike             1.2   }
654                        
655 chuck            1.74  void Monitor::stopListeningForConnections(Boolean wait)
656 kumpf            1.48  {
657                            PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
658 r.kieninger      1.83      // set boolean then tickle the server to recognize _stopConnections
659 kumpf            1.48      _stopConnections = 1;
660 a.arora          1.73      tickle();
661 kumpf            1.48  
662 chuck            1.74      if (wait)
663 a.arora          1.73      {
664 chuck            1.74        // Wait for the monitor to notice _stopConnections.  Otherwise the
665                              // caller of this function may unbind the ports while the monitor
666                              // is still accepting connections on them.
667 kumpf            1.101       _stopConnectionsSem.wait();
668 a.arora          1.73      }
669 r.kieninger      1.83  
670 kumpf            1.48      PEG_METHOD_EXIT();
671                        }
672 mday             1.25  
673 mday             1.37  
674 kumpf            1.116 int Monitor::solicitSocketMessages(
675 mike             1.106     SocketHandle socket,
676 mike             1.2       Uint32 events,
677 r.kieninger      1.83      Uint32 queueId,
678 mday             1.8       int type)
679 mike             1.2   {
680 kumpf            1.116     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
681                            AutoMutex autoMut(_entry_mut);
682                            // Check to see if we need to dynamically grow the _entries array
683                            // We always want the _entries array to 2 bigger than the
684                            // current connections requested
685                            _solicitSocketCount++;  // bump the count
686                            int size = (int)_entries.size();
687                            if ((int)_solicitSocketCount >= (size-1))
688                            {
689                                for (int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++)
690                                {
691                                    _MonitorEntry entry(0, 0, 0);
692                                    _entries.append(entry);
693                                }
694                            }
695 a.arora          1.73  
696 kumpf            1.116     int index;
697                            for (index = 1; index < (int)_entries.size(); index++)
698                            {
699                                try
700                                {
701                                    if (_entries[index]._status.get() == _MonitorEntry::EMPTY)
702                                    {
703                                        _entries[index].socket = socket;
704                                        _entries[index].queueId  = queueId;
705                                        _entries[index]._type = type;
706                                        _entries[index]._status = _MonitorEntry::IDLE;
707                        
708                                        return index;
709                                    }
710                                }
711                                catch (...)
712                                {
713                                }
714                            }
715                            // decrease the count, if we are here we didn't do anything meaningful
716                            _solicitSocketCount--;
717 kumpf            1.116     PEG_METHOD_EXIT();
718                            return -1;
719 mike             1.2   }
720                        
721 mike             1.106 void Monitor::unsolicitSocketMessages(SocketHandle socket)
722 mike             1.2   {
723 mday             1.25      PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
724 alagaraja        1.75      AutoMutex autoMut(_entry_mut);
725 a.arora          1.73  
726                            /*
727 kumpf            1.116         Start at index = 1 because _entries[0] is the tickle entry which
728                                never needs to be EMPTY;
729 a.arora          1.73      */
730 w.otsuka         1.89      unsigned int index;
731 kumpf            1.116     for (index = 1; index < _entries.size(); index++)
732 mike             1.2       {
733 kumpf            1.116         if (_entries[index].socket == socket)
734                                {
735                                    _entries[index]._status = _MonitorEntry::EMPTY;
736                                    _entries[index].socket = PEGASUS_INVALID_SOCKET;
737                                    _solicitSocketCount--;
738                                    break;
739                                }
740 mike             1.2       }
741 a.arora          1.73  
742                            /*
743 kumpf            1.116         Dynamic Contraction:
744                                To remove excess entries we will start from the end of the _entries
745                                array and remove all entries with EMPTY status until we find the
746                                first NON EMPTY.  This prevents the positions, of the NON EMPTY
747                                entries, from being changed.
748 r.kieninger      1.83      */
749 a.arora          1.73      index = _entries.size() - 1;
750 kumpf            1.116     while (_entries[index]._status.get() == _MonitorEntry::EMPTY)
751                            {
752                                if (_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
753 a.arora          1.73                  _entries.remove(index);
754 kumpf            1.116         index--;
755 a.arora          1.73      }
756 kumpf            1.4       PEG_METHOD_EXIT();
757 mike             1.2   }
758                        
759 a.arora          1.73  // Note: this is no longer called with PEP 183.
760 kumpf            1.116 ThreadReturnType PEGASUS_THREAD_CDECL Monitor::_dispatch(void* parm)
761 mday             1.7   {
762 kumpf            1.116     HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
763 marek            1.118     PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
764 kumpf            1.116         "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, "
765                                    "q = %p",
766                                dst->_entry_index,
767                                dst->_monitor->_entries[dst->_entry_index].queueId,
768 marek            1.118         dst));
769 kumpf            1.116 
770                            try
771                            {
772                                dst->run(1);
773                            }
774                            catch (...)
775                            {
776 marek            1.118         PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
777 kumpf            1.116             "Monitor::_dispatch: exception received");
778                            }
779 marek            1.118     PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
780                                "Monitor::_dispatch: exited run() for index %d", dst->_entry_index));
781 kumpf            1.116 
782                            PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() ==
783                                _MonitorEntry::BUSY);
784                        
785                            // Once the HTTPConnection thread has set the status value to either
786                            // Monitor::DYING or Monitor::IDLE, it has returned control of the
787                            // connection to the Monitor.  It is no longer permissible to access the
788                            // connection or the entry in the _entries table.
789                            if (dst->_connectionClosePending)
790                            {
791                                dst->_monitor->_entries[dst->_entry_index]._status =
792                                    _MonitorEntry::DYING;
793                            }
794                            else
795                            {
796                                dst->_monitor->_entries[dst->_entry_index]._status =
797                                    _MonitorEntry::IDLE;
798                            }
799                            return 0;
800 mday             1.40  }
801                        
802 mike             1.2   PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2