(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.103 //%2006////////////////////////////////////////////////////////////////////////
  2 mike  1.2   //
  3 karl  1.79  // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4             // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5             // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6 karl  1.64  // IBM Corp.; EMC Corporation, The Open Group.
  7 karl  1.79  // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8             // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9 karl  1.90  // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10             // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11 karl  1.103 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 12             // EMC Corporation; Symantec Corporation; The Open Group.
 13 mike  1.2   //
 14             // Permission is hereby granted, free of charge, to any person obtaining a copy
 15 kumpf 1.17  // of this software and associated documentation files (the "Software"), to
 16             // deal in the Software without restriction, including without limitation the
 17             // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 18 mike  1.2   // sell copies of the Software, and to permit persons to whom the Software is
 19             // furnished to do so, subject to the following conditions:
 20 karl  1.103 // 
 21 kumpf 1.17  // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 22 mike  1.2   // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 23             // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 24 kumpf 1.17  // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 25             // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 26             // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 27 mike  1.2   // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 28             // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 29             //
 30             //==============================================================================
 31             //
 32             //%/////////////////////////////////////////////////////////////////////////////
 33             
 34 mike  1.107 #include "Network.h"
 35 mike  1.2   #include <Pegasus/Common/Config.h>
 36             #include <cstring>
 37             #include "Monitor.h"
 38             #include "MessageQueue.h"
 39             #include "Socket.h"
 40 kumpf 1.4   #include <Pegasus/Common/Tracer.h>
 41 mday  1.7   #include <Pegasus/Common/HTTPConnection.h>
 42 kumpf 1.69  #include <Pegasus/Common/MessageQueueService.h>
 43 a.arora 1.73  #include <Pegasus/Common/Exception.h>
 44 mike    1.100 #include "ArrayIterator.h"
 45 kumpf   1.122 #include "HostAddress.h"
 46 mike    1.2   
 47               PEGASUS_USING_STD;
 48               
 49               PEGASUS_NAMESPACE_BEGIN
 50               
 51               ////////////////////////////////////////////////////////////////////////////////
 52               //
 53 kumpf   1.122 // Tickler
 54               //
 55               ////////////////////////////////////////////////////////////////////////////////
 56               
 57               Tickler::Tickler()
 58                   : _listenSocket(PEGASUS_INVALID_SOCKET),
 59                     _clientSocket(PEGASUS_INVALID_SOCKET),
 60                     _serverSocket(PEGASUS_INVALID_SOCKET)
 61               {
 62 kumpf   1.123     try
 63                   {
 64                       _initialize();
 65                   }
 66                   catch (...)
 67                   {
 68                       _uninitialize();
 69                       throw;
 70                   }
 71 kumpf   1.122 }
 72               
 73               Tickler::~Tickler()
 74               {
 75 kumpf   1.123     _uninitialize();
 76 kumpf   1.122 }
 77               
 78 kumpf   1.127 #if defined(PEGASUS_OS_TYPE_UNIX)
 79 kumpf   1.123 
 80               // Use an anonymous pipe for the tickle connection.
 81               
 82               void Tickler::_initialize()
 83               {
 84                   int fds[2];
 85               
 86                   if (pipe(fds) == -1)
 87                   {
 88                       MessageLoaderParms parms(
 89                           "Common.Monitor.TICKLE_CREATE",
 90                           "Received error number $0 while creating the internal socket.",
 91                           getSocketError());
 92                       throw Exception(parms);
 93                   }
 94               
 95                   _serverSocket = fds[0];
 96                   _clientSocket = fds[1];
 97               }
 98               
 99               #else
100 kumpf   1.123 
101               // Use an external loopback socket connection to allow the tickle socket to
102 kumpf   1.127 // be included in the select() array on non-Unix platforms.
103 kumpf   1.123 
104               void Tickler::_initialize()
105 kumpf   1.122 {
106                   //
107                   // Set up the addresses for the listen, client, and server sockets
108                   // based on whether IPv6 is enabled.
109                   //
110               
111 venkat.puvvada 1.124     Socket::initializeInterface();
112                      
113 kumpf          1.123 # ifdef PEGASUS_ENABLE_IPV6
114 kumpf          1.122     struct sockaddr_storage listenAddress;
115                          struct sockaddr_storage clientAddress;
116                          struct sockaddr_storage serverAddress;
117 kumpf          1.123 # else
118 kumpf          1.122     struct sockaddr_in listenAddress;
119                          struct sockaddr_in clientAddress;
120                          struct sockaddr_in serverAddress;
121 kumpf          1.123 # endif
122 kumpf          1.122 
123                          int addressFamily;
124                          SocketLength addressLength;
125                      
126 dave.sudlik    1.125     memset(&listenAddress, 0, sizeof (listenAddress));
127                      
128 kumpf          1.123 # ifdef PEGASUS_ENABLE_IPV6
129 kumpf          1.122     if (System::isIPv6StackActive())
130                          {
131                              // Use the IPv6 loopback address for the listen sockets
132                              HostAddress::convertTextToBinary(
133                                  HostAddress::AT_IPV6,
134                                  "::1",
135                                  &reinterpret_cast<struct sockaddr_in6*>(&listenAddress)->sin6_addr);
136                              listenAddress.ss_family = AF_INET6;
137                              reinterpret_cast<struct sockaddr_in6*>(&listenAddress)->sin6_port = 0;
138                      
139                              addressFamily = AF_INET6;
140                              addressLength = sizeof(struct sockaddr_in6);
141                          }
142                          else
143 kumpf          1.123 # endif
144 kumpf          1.122     {
145                              // Use the IPv4 loopback address for the listen sockets
146                              HostAddress::convertTextToBinary(
147                                  HostAddress::AT_IPV4,
148                                  "127.0.0.1",
149                                  &reinterpret_cast<struct sockaddr_in*>(
150                                      &listenAddress)->sin_addr.s_addr);
151                              reinterpret_cast<struct sockaddr_in*>(&listenAddress)->sin_family =
152                                  AF_INET;
153                              reinterpret_cast<struct sockaddr_in*>(&listenAddress)->sin_port = 0;
154                      
155                              addressFamily = AF_INET;
156                              addressLength = sizeof(struct sockaddr_in);
157                          }
158                      
159                          // Use the same address for the client socket as the listen socket
160                          clientAddress = listenAddress;
161                      
162                          //
163                          // Set up a listen socket to allow the tickle client and server to connect
164                          //
165 kumpf          1.122 
166                          // Create the listen socket
167                          if ((_listenSocket = Socket::createSocket(addressFamily, SOCK_STREAM, 0)) ==
168                                   PEGASUS_INVALID_SOCKET)
169                          {
170                              MessageLoaderParms parms(
171                                  "Common.Monitor.TICKLE_CREATE",
172                                  "Received error number $0 while creating the internal socket.",
173                                  getSocketError());
174                              throw Exception(parms);
175                          }
176                      
177                          // Bind the listen socket to the loopback address
178                          if (::bind(
179                                  _listenSocket,
180                                  reinterpret_cast<struct sockaddr*>(&listenAddress),
181                                  addressLength) < 0)
182                          {
183                              MessageLoaderParms parms(
184                                  "Common.Monitor.TICKLE_BIND",
185                                  "Received error number $0 while binding the internal socket.",
186 kumpf          1.122             getSocketError());
187                              throw Exception(parms);
188                          }
189                      
190                          // Listen for a connection from the tickle client
191                          if ((::listen(_listenSocket, 3)) < 0)
192                          {
193                              MessageLoaderParms parms(
194                                  "Common.Monitor.TICKLE_LISTEN",
195                                  "Received error number $0 while listening to the internal socket.",
196                                  getSocketError());
197                              throw Exception(parms);
198                          }
199                      
200                          // Verify we have the correct listen socket
201                          SocketLength tmpAddressLength = addressLength;
202                          int sock = ::getsockname(
203                              _listenSocket,
204                              reinterpret_cast<struct sockaddr*>(&listenAddress),
205                              &tmpAddressLength);
206                          if (sock < 0)
207 kumpf          1.122     {
208                              MessageLoaderParms parms(
209                                  "Common.Monitor.TICKLE_SOCKNAME",
210                                  "Received error number $0 while getting the internal socket name.",
211                                  getSocketError());
212                              throw Exception(parms);
213                          }
214                      
215                          //
216                          // Set up the client side of the tickle connection.
217                          //
218                      
219                          // Create the client socket
220                          if ((_clientSocket = Socket::createSocket(addressFamily, SOCK_STREAM, 0)) ==
221                                   PEGASUS_INVALID_SOCKET)
222                          {
223                              MessageLoaderParms parms(
224                                  "Common.Monitor.TICKLE_CLIENT_CREATE",
225                                  "Received error number $0 while creating the internal client "
226                                      "socket.",
227                                  getSocketError());
228 kumpf          1.122         throw Exception(parms);
229                          }
230                      
231                          // Bind the client socket to the loopback address
232                          if (::bind(
233                                  _clientSocket,
234                                  reinterpret_cast<struct sockaddr*>(&clientAddress),
235                                  addressLength) < 0)
236                          {
237                              MessageLoaderParms parms(
238                                  "Common.Monitor.TICKLE_CLIENT_BIND",
239                                  "Received error number $0 while binding the internal client "
240                                      "socket.",
241                                  getSocketError());
242                              throw Exception(parms);
243                          }
244                      
245                          // Connect the client socket to the listen socket address
246                          if (::connect(
247                                  _clientSocket,
248                                  reinterpret_cast<struct sockaddr*>(&listenAddress),
249 kumpf          1.122             addressLength) < 0)
250                          {
251                              MessageLoaderParms parms(
252                                  "Common.Monitor.TICKLE_CLIENT_CONNECT",
253                                  "Received error number $0 while connecting the internal client "
254                                      "socket.",
255                                  getSocketError());
256                              throw Exception(parms);
257                          }
258                      
259                          //
260                          // Set up the server side of the tickle connection.
261                          //
262                      
263                          tmpAddressLength = addressLength;
264                      
265 kumpf          1.123     // Accept the client socket connection.
266                          _serverSocket = ::accept(
267                              _listenSocket,
268                              reinterpret_cast<struct sockaddr*>(&serverAddress),
269                              &tmpAddressLength);
270 kumpf          1.122 
271                          if (_serverSocket == PEGASUS_SOCKET_ERROR)
272                          {
273                              MessageLoaderParms parms(
274                                  "Common.Monitor.TICKLE_ACCEPT",
275                                  "Received error number $0 while accepting the internal socket "
276                                      "connection.",
277                                  getSocketError());
278                              throw Exception(parms);
279                          }
280                      
281                          //
282                          // Close the listen socket and make the other sockets non-blocking
283                          //
284                      
285                          Socket::close(_listenSocket);
286                          Socket::disableBlocking(_serverSocket);
287                          Socket::disableBlocking(_clientSocket);
288                      }
289                      
290 kumpf          1.123 #endif
291                      
292                      void Tickler::_uninitialize()
293 kumpf          1.122 {
294                          PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
295                      
296                          try
297                          {
298 kumpf          1.129         Socket::close(_serverSocket);
299                              Socket::close(_clientSocket);
300                              Socket::close(_listenSocket);
301 kumpf          1.122     }
302                          catch (...)
303                          {
304                              PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
305                                  "Failed to close tickle sockets");
306                          }
307 venkat.puvvada 1.124     Socket::uninitializeInterface();
308 kumpf          1.122 }
309                      
310                      
311                      ////////////////////////////////////////////////////////////////////////////////
312                      //
313 mike           1.2   // Monitor
314                      //
315                      ////////////////////////////////////////////////////////////////////////////////
316                      
317 kumpf          1.54  #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
318 mike           1.2   Monitor::Monitor()
319 kumpf          1.87     : _stopConnections(0),
320 a.arora        1.73       _stopConnectionsSem(0),
321 kumpf          1.122      _solicitSocketCount(0)
322 mike           1.2   {
323 kumpf          1.54      int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
324                          _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
325 a.arora        1.73  
326 kumpf          1.123     // Create a MonitorEntry for the Tickler and set its state to IDLE so the
327                          // Monitor will watch for its events.
328 kumpf          1.126     _entries.append(MonitorEntry(
329                              _tickler.getServerSocket(),
330                              1,
331                              MonitorEntry::STATUS_IDLE,
332                              MonitorEntry::TYPE_INTERNAL));
333 a.arora        1.73  
334 kumpf          1.123     // Start the count at 1 because _entries[0] is the Tickler
335 kumpf          1.116     for (int i = 1; i < numberOfMonitorEntriesToAllocate; i++)
336 mday           1.37      {
337 kumpf          1.126         _entries.append(MonitorEntry());
338 mday           1.37      }
339 mday           1.18  }
340 mday           1.20  
341 mike           1.2   Monitor::~Monitor()
342                      {
343 marek          1.118     PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
344 thilo.boehm    1.115                   "returning from monitor destructor");
345                      }
346 mday           1.18  
347 kumpf          1.116 void Monitor::tickle()
348 a.arora        1.73  {
349 kumpf          1.126     Socket::write(_tickler.getClientSocket(), "\0", 1);
350 sushma.fernandes 1.78  }
351                        
352 kumpf            1.121 void Monitor::setState(
353                            Uint32 index,
354 kumpf            1.126     MonitorEntry::Status status)
355 sushma.fernandes 1.78  {
356 kumpf            1.126     AutoMutex autoEntryMutex(_entriesMutex);
357 sushma.fernandes 1.78      // Set the state to requested state
358 kumpf            1.126     _entries[index].status = status;
359 a.arora          1.73  }
360                        
361 kumpf            1.114 void Monitor::run(Uint32 milliseconds)
362 mike             1.2   {
363 kumpf            1.36      struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
364 a.arora          1.73  
365 mday             1.25      fd_set fdread;
366                            FD_ZERO(&fdread);
367 a.arora          1.73  
368 kumpf            1.126     AutoMutex autoEntryMutex(_entriesMutex);
369 r.kieninger      1.83  
370 kumpf            1.126     ArrayIterator<MonitorEntry> entries(_entries);
371 mike             1.100 
372 kumpf            1.116     // Check the stopConnections flag.  If set, clear the Acceptor monitor
373                            // entries
374 mike             1.96      if (_stopConnections.get() == 1)
375 kumpf            1.48      {
376 kumpf            1.126         for (Uint32 indx = 0; indx < entries.size(); indx++)
377 kumpf            1.48          {
378 kumpf            1.126             if (entries[indx].type == MonitorEntry::TYPE_ACCEPTOR)
379 kumpf            1.48              {
380 kumpf            1.126                 if (entries[indx].status != MonitorEntry::STATUS_EMPTY)
381 kumpf            1.48                  {
382 kumpf            1.126                     if (entries[indx].status == MonitorEntry::STATUS_IDLE ||
383                                                entries[indx].status == MonitorEntry::STATUS_DYING)
384                                            {
385                                                // remove the entry
386                                                entries[indx].status = MonitorEntry::STATUS_EMPTY;
387                                            }
388                                            else
389                                            {
390                                                // set status to DYING
391                                                entries[indx].status = MonitorEntry::STATUS_DYING;
392                                            }
393                                        }
394                                    }
395 kumpf            1.48          }
396                                _stopConnections = 0;
397 kumpf            1.116         _stopConnectionsSem.signal();
398 kumpf            1.48      }
399 kumpf            1.51  
400 kumpf            1.126     for (Uint32 indx = 0; indx < entries.size(); indx++)
401 kumpf            1.68      {
402 kumpf            1.126         const MonitorEntry& entry = entries[indx];
403                        
404                                if ((entry.status == MonitorEntry::STATUS_DYING) &&
405                                    (entry.type == MonitorEntry::TYPE_CONNECTION))
406 kumpf            1.116         {
407                                    MessageQueue *q = MessageQueue::lookup(entry.queueId);
408                                    PEGASUS_ASSERT(q != 0);
409                                    HTTPConnection &h = *static_cast<HTTPConnection *>(q);
410                        
411                                    if (h._connectionClosePending == false)
412                                        continue;
413                        
414                                    // NOTE: do not attempt to delete while there are pending responses
415                                    // coming thru. The last response to come thru after a
416                                    // _connectionClosePending will reset _responsePending to false
417                                    // and then cause the monitor to rerun this code and clean up.
418                                    // (see HTTPConnection.cpp)
419                        
420                                    if (h._responsePending == true)
421                                    {
422 marek            1.118                 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
423 kumpf            1.116                     "Monitor::run - Ignoring connection delete request "
424                                                "because responses are still pending. "
425                                                "connection=0x%p, socket=%d\n",
426 marek            1.118                     (void *)&h, h.getSocket()));
427 kumpf            1.116                 continue;
428                                    }
429                                    h._connectionClosePending = false;
430                                    MessageQueue &o = h.get_owner();
431                                    Message* message= new CloseConnectionMessage(entry.socket);
432                                    message->dest = o.getQueueId();
433                        
434                                    // HTTPAcceptor is responsible for closing the connection.
435                                    // The lock is released to allow HTTPAcceptor to call
436                                    // unsolicitSocketMessages to free the entry.
437                                    // Once HTTPAcceptor completes processing of the close
438                                    // connection, the lock is re-requested and processing of
439                                    // the for loop continues.  This is safe with the current
440                                    // implementation of the entries object.  Note that the
441                                    // loop condition accesses the entries.size() on each
442                                    // iteration, so that a change in size while the mutex is
443                                    // unlocked will not result in an ArrayIndexOutOfBounds
444                                    // exception.
445                        
446 kumpf            1.126             _entriesMutex.unlock();
447 kumpf            1.116             o.enqueue(message);
448 kumpf            1.126             _entriesMutex.lock();
449 kumpf            1.116 
450                                    // After enqueue a message and the autoEntryMutex has been
451                                    // released and locked again, the array of _entries can be
452                                    // changed. The ArrayIterator has be reset with the original
453                                    // _entries.
454                                    entries.reset(_entries);
455                                }
456 kumpf            1.68      }
457                        
458 kumpf            1.51      Uint32 _idleEntries = 0;
459 r.kieninger      1.83  
460 a.arora          1.73      /*
461 david.dillard    1.95          We will keep track of the maximum socket number and pass this value
462                                to the kernel as a parameter to SELECT.  This loop seems like a good
463                                place to calculate the max file descriptor (maximum socket number)
464                                because we have to traverse the entire array.
465 r.kieninger      1.83      */
466 mike             1.106     SocketHandle maxSocketCurrentPass = 0;
467 kumpf            1.126     for (Uint32 indx = 0; indx < entries.size(); indx++)
468 mike             1.2       {
469 kumpf            1.126         if (maxSocketCurrentPass < entries[indx].socket)
470                                    maxSocketCurrentPass = entries[indx].socket;
471 a.arora          1.73  
472 kumpf            1.126         if (entries[indx].status == MonitorEntry::STATUS_IDLE)
473                                {
474                                    _idleEntries++;
475                                    FD_SET(entries[indx].socket, &fdread);
476                                }
477 mday             1.13      }
478 s.hills          1.62  
479 a.arora          1.73      /*
480 david.dillard    1.95          Add 1 then assign maxSocket accordingly. We add 1 to account for
481                                descriptors starting at 0.
482 a.arora          1.73      */
483                            maxSocketCurrentPass++;
484                        
485 kumpf            1.126     _entriesMutex.unlock();
486 david.dillard    1.95  
487                            //
488                            // The first argument to select() is ignored on Windows and it is not
489                            // a socket value.  The original code assumed that the number of sockets
490                            // and a socket value have the same type.  On Windows they do not.
491                            //
492                        #ifdef PEGASUS_OS_TYPE_WINDOWS
493                            int events = select(0, &fdread, NULL, NULL, &tv);
494                        #else
495 a.arora          1.73      int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
496 david.dillard    1.95  #endif
497 kumpf            1.130     int selectErrno = getSocketError();
498                        
499 kumpf            1.126     _entriesMutex.lock();
500 kumpf            1.116 
501 dave.sudlik      1.128     struct timeval timeNow;
502                            Time::gettimeofday(&timeNow);
503                        
504 kumpf            1.116     // After enqueue a message and the autoEntryMutex has been released and
505                            // locked again, the array of _entries can be changed. The ArrayIterator
506                            // has be reset with the original _entries
507 r.kieninger      1.102     entries.reset(_entries);
508 mike             1.106 
509                            if (events == PEGASUS_SOCKET_ERROR)
510 mday             1.13      {
511 marek            1.118         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
512 kumpf            1.130             "Monitor::run - select() returned error %d.", selectErrno));
513 kumpf            1.116         // The EBADF error indicates that one or more or the file
514                                // descriptions was not valid. This could indicate that
515                                // the entries structure has been corrupted or that
516                                // we have a synchronization error.
517 kumpf            1.50  
518 kumpf            1.130         PEGASUS_ASSERT(selectErrno != EBADF);
519 kumpf            1.50      }
520                            else if (events)
521                            {
522 marek            1.118         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
523 kumpf            1.116             "Monitor::run select event received events = %d, monitoring %d "
524                                        "idle entries",
525 marek            1.118             events, _idleEntries));
526 kumpf            1.126         for (Uint32 indx = 0; indx < entries.size(); indx++)
527 kumpf            1.116         {
528                                    // The Monitor should only look at entries in the table that are
529                                    // IDLE (i.e., owned by the Monitor).
530 kumpf            1.126             if ((entries[indx].status == MonitorEntry::STATUS_IDLE) &&
531 kumpf            1.116                 (FD_ISSET(entries[indx].socket, &fdread)))
532                                    {
533 kumpf            1.126                 MessageQueue* q = MessageQueue::lookup(entries[indx].queueId);
534                                        PEGASUS_ASSERT(q != 0);
535 marek            1.118                 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
536 kumpf            1.126                     "Monitor::run indx = %d, queueId = %d, q = %p",
537 marek            1.118                     indx, entries[indx].queueId, q));
538 kumpf            1.116 
539                                        try
540 david.dillard    1.95                  {
541 kumpf            1.126                     if (entries[indx].type == MonitorEntry::TYPE_CONNECTION)
542 kumpf            1.116                     {
543 marek            1.118                         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
544 kumpf            1.126                             "entries[%d].type is TYPE_CONNECTION",
545 marek            1.118                             indx));
546 kumpf            1.116 
547 dave.sudlik      1.128                         HTTPConnection *dst = 
548 kumpf            1.116                             reinterpret_cast<HTTPConnection *>(q);
549 dave.sudlik      1.128                         dst->_entry_index = indx;
550 kumpf            1.116 
551 dave.sudlik      1.128                         // Update idle start time because we have received some
552                                                // data. Any data is good data at this point, and we'll
553                                                // keep the connection alive, even if we've exceeded
554                                                // the idleConnectionTimeout, which will be checked
555                                                // when we call closeConnectionOnTimeout() next.
556                                                Time::gettimeofday(&dst->_idleStartTime);
557                        
558                                                // Check for accept pending (ie. SSL handshake pending)
559                                                // or idle connection timeouts for sockets from which
560                                                // we received data (avoiding extra queue lookup below).
561                                                if (!dst->closeConnectionOnTimeout(&timeNow))
562 kumpf            1.116                         {
563 dave.sudlik      1.128                             PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
564                                                        "Entering HTTPConnection::run() for "
565                                                            "indx = %d, queueId = %d, q = %p",
566                                                        indx, entries[indx].queueId, q));
567                        
568                                                    try
569                                                    {
570                                                        dst->run(1);
571                                                    }
572                                                    catch (...)
573                                                    {
574                                                        PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL2,
575                                                            "Caught exception from "
576                                                            "HTTPConnection::run()");
577                                                    }
578                                                    PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
579                                                        "Exited HTTPConnection::run()");
580                                                    }
581                        
582                        
583 kumpf            1.116                     }
584 kumpf            1.126                     else if (entries[indx].type == MonitorEntry::TYPE_INTERNAL)
585 kumpf            1.116                     {
586 kumpf            1.126                         char buffer;
587                                                Sint32 ignored =
588                                                    Socket::read(entries[indx].socket, &buffer, 1);
589 kumpf            1.116                     }
590                                            else
591                                            {
592 marek            1.118                         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
593 kumpf            1.116                             "Non-connection entry, indx = %d, has been "
594                                                        "received.",
595 marek            1.118                             indx));
596 kumpf            1.116                         int events = 0;
597                                                events |= SocketMessage::READ;
598                                                Message* msg = new SocketMessage(
599                                                    entries[indx].socket, events);
600 kumpf            1.126                         entries[indx].status = MonitorEntry::STATUS_BUSY;
601                                                _entriesMutex.unlock();
602 kumpf            1.116                         q->enqueue(msg);
603 kumpf            1.126                         _entriesMutex.lock();
604 kumpf            1.116 
605                                                // After enqueue a message and the autoEntryMutex has
606                                                // been released and locked again, the array of
607 kumpf            1.126                         // entries can be changed. The ArrayIterator has to be
608                                                // reset with the latest _entries.
609 kumpf            1.116                         entries.reset(_entries);
610 kumpf            1.126                         entries[indx].status = MonitorEntry::STATUS_IDLE;
611 kumpf            1.116                     }
612                                        }
613                                        catch (...)
614                                        {
615                                        }
616 thilo.boehm      1.115             }
617 dave.sudlik      1.128             // else check for accept pending (ie. SSL handshake pending) or
618                                    // idle connection timeouts for sockets from which we did not
619                                    // receive data.
620                                    else if ((entries[indx].status == MonitorEntry::STATUS_IDLE) &&
621                                        entries[indx].type == MonitorEntry::TYPE_CONNECTION)
622                        
623                                    {
624                                        MessageQueue* q = MessageQueue::lookup(entries[indx].queueId);
625                                        HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
626                                        dst->_entry_index = indx;
627                                        dst->closeConnectionOnTimeout(&timeNow);
628                                    }
629                                }
630                            }
631                            // else if "events" is zero (ie. select timed out) then we still need
632                            // to check if there are any pending SSL handshakes that have timed out.
633                            else
634                            {
635                                for (Uint32 indx = 0; indx < entries.size(); indx++)
636                                {
637                                    if ((entries[indx].status == MonitorEntry::STATUS_IDLE) &&
638 dave.sudlik      1.128                 entries[indx].type == MonitorEntry::TYPE_CONNECTION)
639                                    {
640                                        MessageQueue* q = MessageQueue::lookup(entries[indx].queueId);
641                                        HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
642                                        dst->_entry_index = indx;
643                                        dst->closeConnectionOnTimeout(&timeNow);
644                                    }
645 kumpf            1.116         }
646 mday             1.24      }
647 mike             1.2   }
648                        
649 chuck            1.74  void Monitor::stopListeningForConnections(Boolean wait)
650 kumpf            1.48  {
651                            PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
652 r.kieninger      1.83      // set boolean then tickle the server to recognize _stopConnections
653 kumpf            1.48      _stopConnections = 1;
654 a.arora          1.73      tickle();
655 kumpf            1.48  
656 chuck            1.74      if (wait)
657 a.arora          1.73      {
658 chuck            1.74        // Wait for the monitor to notice _stopConnections.  Otherwise the
659                              // caller of this function may unbind the ports while the monitor
660                              // is still accepting connections on them.
661 kumpf            1.101       _stopConnectionsSem.wait();
662 a.arora          1.73      }
663 r.kieninger      1.83  
664 kumpf            1.48      PEG_METHOD_EXIT();
665                        }
666 mday             1.25  
667 mday             1.37  
668 kumpf            1.116 int Monitor::solicitSocketMessages(
669 mike             1.106     SocketHandle socket,
670 mike             1.2       Uint32 events,
671 r.kieninger      1.83      Uint32 queueId,
672 kumpf            1.126     Uint32 type)
673 mike             1.2   {
674 kumpf            1.116     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
675 kumpf            1.126     AutoMutex autoMut(_entriesMutex);
676                        
677 kumpf            1.116     // Check to see if we need to dynamically grow the _entries array
678 kumpf            1.126     // We always want the _entries array to be 2 bigger than the
679 kumpf            1.116     // current connections requested
680                            _solicitSocketCount++;  // bump the count
681 kumpf            1.126 
682                            for (Uint32 i = _entries.size(); i < _solicitSocketCount + 1; i++)
683 kumpf            1.116     {
684 kumpf            1.126         _entries.append(MonitorEntry());
685 kumpf            1.116     }
686 a.arora          1.73  
687 kumpf            1.126     for (Uint32 index = 1; index < _entries.size(); index++)
688 kumpf            1.116     {
689                                try
690                                {
691 kumpf            1.126             if (_entries[index].status == MonitorEntry::STATUS_EMPTY)
692 kumpf            1.116             {
693                                        _entries[index].socket = socket;
694                                        _entries[index].queueId  = queueId;
695 kumpf            1.126                 _entries[index].type = type;
696                                        _entries[index].status = MonitorEntry::STATUS_IDLE;
697 kumpf            1.116 
698 kumpf            1.126                 return (int)index;
699 kumpf            1.116             }
700                                }
701                                catch (...)
702                                {
703                                }
704                            }
705                            // decrease the count, if we are here we didn't do anything meaningful
706                            _solicitSocketCount--;
707                            PEG_METHOD_EXIT();
708                            return -1;
709 mike             1.2   }
710                        
711 mike             1.106 void Monitor::unsolicitSocketMessages(SocketHandle socket)
712 mike             1.2   {
713 mday             1.25      PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
714 kumpf            1.126     AutoMutex autoMut(_entriesMutex);
715 a.arora          1.73  
716                            /*
717 kumpf            1.116         Start at index = 1 because _entries[0] is the tickle entry which
718 kumpf            1.126         never needs to be reset to EMPTY;
719 a.arora          1.73      */
720 kumpf            1.126     for (Uint32 index = 1; index < _entries.size(); index++)
721 mike             1.2       {
722 kumpf            1.116         if (_entries[index].socket == socket)
723                                {
724 kumpf            1.126             _entries[index].reset();
725 kumpf            1.116             _solicitSocketCount--;
726                                    break;
727                                }
728 mike             1.2       }
729 a.arora          1.73  
730                            /*
731 kumpf            1.116         Dynamic Contraction:
732                                To remove excess entries we will start from the end of the _entries
733                                array and remove all entries with EMPTY status until we find the
734                                first NON EMPTY.  This prevents the positions, of the NON EMPTY
735                                entries, from being changed.
736 r.kieninger      1.83      */
737 kumpf            1.126     for (Uint32 index = _entries.size() - 1;
738                                 (_entries[index].status == MonitorEntry::STATUS_EMPTY) &&
739                                     (index >= MAX_NUMBER_OF_MONITOR_ENTRIES);
740                                 index--)
741 kumpf            1.116     {
742 kumpf            1.126         _entries.remove(index);
743 a.arora          1.73      }
744 kumpf            1.126 
745 kumpf            1.4       PEG_METHOD_EXIT();
746 mike             1.2   }
747                        
748                        PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2