(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.103 //%2006////////////////////////////////////////////////////////////////////////
  2 mike  1.2   //
  3 karl  1.79  // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4             // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5             // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6 karl  1.64  // IBM Corp.; EMC Corporation, The Open Group.
  7 karl  1.79  // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8             // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9 karl  1.90  // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10             // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11 karl  1.103 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 12             // EMC Corporation; Symantec Corporation; The Open Group.
 13 mike  1.2   //
 14             // Permission is hereby granted, free of charge, to any person obtaining a copy
 15 kumpf 1.17  // of this software and associated documentation files (the "Software"), to
 16             // deal in the Software without restriction, including without limitation the
 17             // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 18 mike  1.2   // sell copies of the Software, and to permit persons to whom the Software is
 19             // furnished to do so, subject to the following conditions:
 20 karl  1.103 // 
 21 kumpf 1.17  // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 22 mike  1.2   // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 23             // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 24 kumpf 1.17  // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 25             // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 26             // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 27 mike  1.2   // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 28             // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 29             //
 30             //==============================================================================
 31             //
 32             //%/////////////////////////////////////////////////////////////////////////////
 33             
 34 mike  1.107 #include "Network.h"
 35 mike  1.2   #include <Pegasus/Common/Config.h>
 36             #include <cstring>
 37             #include "Monitor.h"
 38             #include "MessageQueue.h"
 39             #include "Socket.h"
 40 kumpf 1.4   #include <Pegasus/Common/Tracer.h>
 41 mday  1.7   #include <Pegasus/Common/HTTPConnection.h>
 42 kumpf 1.69  #include <Pegasus/Common/MessageQueueService.h>
 43 a.arora 1.73  #include <Pegasus/Common/Exception.h>
 44 mike    1.100 #include "ArrayIterator.h"
 45 kumpf   1.122 #include "HostAddress.h"
 46 mike    1.113 #include <errno.h>
 47 mike    1.2   
 48               PEGASUS_USING_STD;
 49               
 50               PEGASUS_NAMESPACE_BEGIN
 51               
 52               ////////////////////////////////////////////////////////////////////////////////
 53               //
 54 kumpf   1.122 // Tickler
 55               //
 56               ////////////////////////////////////////////////////////////////////////////////
 57               
 58               Tickler::Tickler()
 59                   : _listenSocket(PEGASUS_INVALID_SOCKET),
 60                     _clientSocket(PEGASUS_INVALID_SOCKET),
 61                     _serverSocket(PEGASUS_INVALID_SOCKET)
 62               {
 63 kumpf   1.123     try
 64                   {
 65                       _initialize();
 66                   }
 67                   catch (...)
 68                   {
 69                       _uninitialize();
 70                       throw;
 71                   }
 72 kumpf   1.122 }
 73               
 74               Tickler::~Tickler()
 75               {
 76 kumpf   1.123     _uninitialize();
 77 kumpf   1.122 }
 78               
 79 kumpf   1.127 #if defined(PEGASUS_OS_TYPE_UNIX)
 80 kumpf   1.123 
 81               // Use an anonymous pipe for the tickle connection.
 82               
 83               void Tickler::_initialize()
 84               {
 85                   int fds[2];
 86               
 87                   if (pipe(fds) == -1)
 88                   {
 89                       MessageLoaderParms parms(
 90                           "Common.Monitor.TICKLE_CREATE",
 91                           "Received error number $0 while creating the internal socket.",
 92                           getSocketError());
 93                       throw Exception(parms);
 94                   }
 95               
 96                   _serverSocket = fds[0];
 97                   _clientSocket = fds[1];
 98               }
 99               
100               #else
101 kumpf   1.123 
102               // Use an external loopback socket connection to allow the tickle socket to
103 kumpf   1.127 // be included in the select() array on non-Unix platforms.
104 kumpf   1.123 
105               void Tickler::_initialize()
106 kumpf   1.122 {
107                   //
108                   // Set up the addresses for the listen, client, and server sockets
109                   // based on whether IPv6 is enabled.
110                   //
111               
112 venkat.puvvada 1.124     Socket::initializeInterface();
113                      
114 kumpf          1.123 # ifdef PEGASUS_ENABLE_IPV6
115 kumpf          1.122     struct sockaddr_storage listenAddress;
116                          struct sockaddr_storage clientAddress;
117                          struct sockaddr_storage serverAddress;
118 kumpf          1.123 # else
119 kumpf          1.122     struct sockaddr_in listenAddress;
120                          struct sockaddr_in clientAddress;
121                          struct sockaddr_in serverAddress;
122 kumpf          1.123 # endif
123 kumpf          1.122 
124                          int addressFamily;
125                          SocketLength addressLength;
126                      
127 dave.sudlik    1.125     memset(&listenAddress, 0, sizeof (listenAddress));
128                      
129 kumpf          1.123 # ifdef PEGASUS_ENABLE_IPV6
130 kumpf          1.122     if (System::isIPv6StackActive())
131                          {
132                              // Use the IPv6 loopback address for the listen sockets
133                              HostAddress::convertTextToBinary(
134                                  HostAddress::AT_IPV6,
135                                  "::1",
136                                  &reinterpret_cast<struct sockaddr_in6*>(&listenAddress)->sin6_addr);
137                              listenAddress.ss_family = AF_INET6;
138                              reinterpret_cast<struct sockaddr_in6*>(&listenAddress)->sin6_port = 0;
139                      
140                              addressFamily = AF_INET6;
141                              addressLength = sizeof(struct sockaddr_in6);
142                          }
143                          else
144 kumpf          1.123 # endif
145 kumpf          1.122     {
146                              // Use the IPv4 loopback address for the listen sockets
147                              HostAddress::convertTextToBinary(
148                                  HostAddress::AT_IPV4,
149                                  "127.0.0.1",
150                                  &reinterpret_cast<struct sockaddr_in*>(
151                                      &listenAddress)->sin_addr.s_addr);
152                              reinterpret_cast<struct sockaddr_in*>(&listenAddress)->sin_family =
153                                  AF_INET;
154                              reinterpret_cast<struct sockaddr_in*>(&listenAddress)->sin_port = 0;
155                      
156                              addressFamily = AF_INET;
157                              addressLength = sizeof(struct sockaddr_in);
158                          }
159                      
160                          // Use the same address for the client socket as the listen socket
161                          clientAddress = listenAddress;
162                      
163                          //
164                          // Set up a listen socket to allow the tickle client and server to connect
165                          //
166 kumpf          1.122 
167                          // Create the listen socket
168                          if ((_listenSocket = Socket::createSocket(addressFamily, SOCK_STREAM, 0)) ==
169                                   PEGASUS_INVALID_SOCKET)
170                          {
171                              MessageLoaderParms parms(
172                                  "Common.Monitor.TICKLE_CREATE",
173                                  "Received error number $0 while creating the internal socket.",
174                                  getSocketError());
175                              throw Exception(parms);
176                          }
177                      
178                          // Bind the listen socket to the loopback address
179                          if (::bind(
180                                  _listenSocket,
181                                  reinterpret_cast<struct sockaddr*>(&listenAddress),
182                                  addressLength) < 0)
183                          {
184                              MessageLoaderParms parms(
185                                  "Common.Monitor.TICKLE_BIND",
186                                  "Received error number $0 while binding the internal socket.",
187 kumpf          1.122             getSocketError());
188                              throw Exception(parms);
189                          }
190                      
191                          // Listen for a connection from the tickle client
192                          if ((::listen(_listenSocket, 3)) < 0)
193                          {
194                              MessageLoaderParms parms(
195                                  "Common.Monitor.TICKLE_LISTEN",
196                                  "Received error number $0 while listening to the internal socket.",
197                                  getSocketError());
198                              throw Exception(parms);
199                          }
200                      
201                          // Verify we have the correct listen socket
202                          SocketLength tmpAddressLength = addressLength;
203                          int sock = ::getsockname(
204                              _listenSocket,
205                              reinterpret_cast<struct sockaddr*>(&listenAddress),
206                              &tmpAddressLength);
207                          if (sock < 0)
208 kumpf          1.122     {
209                              MessageLoaderParms parms(
210                                  "Common.Monitor.TICKLE_SOCKNAME",
211                                  "Received error number $0 while getting the internal socket name.",
212                                  getSocketError());
213                              throw Exception(parms);
214                          }
215                      
216                          //
217                          // Set up the client side of the tickle connection.
218                          //
219                      
220                          // Create the client socket
221                          if ((_clientSocket = Socket::createSocket(addressFamily, SOCK_STREAM, 0)) ==
222                                   PEGASUS_INVALID_SOCKET)
223                          {
224                              MessageLoaderParms parms(
225                                  "Common.Monitor.TICKLE_CLIENT_CREATE",
226                                  "Received error number $0 while creating the internal client "
227                                      "socket.",
228                                  getSocketError());
229 kumpf          1.122         throw Exception(parms);
230                          }
231                      
232                          // Bind the client socket to the loopback address
233                          if (::bind(
234                                  _clientSocket,
235                                  reinterpret_cast<struct sockaddr*>(&clientAddress),
236                                  addressLength) < 0)
237                          {
238                              MessageLoaderParms parms(
239                                  "Common.Monitor.TICKLE_CLIENT_BIND",
240                                  "Received error number $0 while binding the internal client "
241                                      "socket.",
242                                  getSocketError());
243                              throw Exception(parms);
244                          }
245                      
246                          // Connect the client socket to the listen socket address
247                          if (::connect(
248                                  _clientSocket,
249                                  reinterpret_cast<struct sockaddr*>(&listenAddress),
250 kumpf          1.122             addressLength) < 0)
251                          {
252                              MessageLoaderParms parms(
253                                  "Common.Monitor.TICKLE_CLIENT_CONNECT",
254                                  "Received error number $0 while connecting the internal client "
255                                      "socket.",
256                                  getSocketError());
257                              throw Exception(parms);
258                          }
259                      
260                          //
261                          // Set up the server side of the tickle connection.
262                          //
263                      
264                          tmpAddressLength = addressLength;
265                      
266 kumpf          1.123     // Accept the client socket connection.
267                          _serverSocket = ::accept(
268                              _listenSocket,
269                              reinterpret_cast<struct sockaddr*>(&serverAddress),
270                              &tmpAddressLength);
271 kumpf          1.122 
272                          if (_serverSocket == PEGASUS_SOCKET_ERROR)
273                          {
274                              MessageLoaderParms parms(
275                                  "Common.Monitor.TICKLE_ACCEPT",
276                                  "Received error number $0 while accepting the internal socket "
277                                      "connection.",
278                                  getSocketError());
279                              throw Exception(parms);
280                          }
281                      
282                          //
283                          // Close the listen socket and make the other sockets non-blocking
284                          //
285                      
286                          Socket::close(_listenSocket);
287                          _listenSocket = PEGASUS_INVALID_SOCKET;
288                      
289                          Socket::disableBlocking(_serverSocket);
290                          Socket::disableBlocking(_clientSocket);
291                      }
292 kumpf          1.122 
293 kumpf          1.123 #endif
294                      
295                      void Tickler::_uninitialize()
296 kumpf          1.122 {
297                          PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
298                      
299                          try
300                          {
301                              if (_serverSocket != PEGASUS_INVALID_SOCKET)
302                              {
303                                  Socket::close(_serverSocket);
304                                  _serverSocket = PEGASUS_INVALID_SOCKET;
305                              }
306                              if (_clientSocket != PEGASUS_INVALID_SOCKET)
307                              {
308                                  Socket::close(_clientSocket);
309                                  _clientSocket = PEGASUS_INVALID_SOCKET;
310                              }
311                              if (_listenSocket != PEGASUS_INVALID_SOCKET)
312                              {
313                                  Socket::close(_listenSocket);
314                                  _listenSocket = PEGASUS_INVALID_SOCKET;
315                              }
316                          }
317 kumpf          1.122     catch (...)
318                          {
319                              PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
320                                  "Failed to close tickle sockets");
321                          }
322 venkat.puvvada 1.124     Socket::uninitializeInterface();
323 kumpf          1.122 }
324                      
325                      
326                      ////////////////////////////////////////////////////////////////////////////////
327                      //
328 mike           1.2   // Monitor
329                      //
330                      ////////////////////////////////////////////////////////////////////////////////
331                      
332 kumpf          1.54  #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
333 mike           1.2   Monitor::Monitor()
334 kumpf          1.87     : _stopConnections(0),
335 a.arora        1.73       _stopConnectionsSem(0),
336 kumpf          1.122      _solicitSocketCount(0)
337 mike           1.2   {
338 kumpf          1.54      int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
339                          _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
340 a.arora        1.73  
341 kumpf          1.123     // Create a MonitorEntry for the Tickler and set its state to IDLE so the
342                          // Monitor will watch for its events.
343 kumpf          1.126     _entries.append(MonitorEntry(
344                              _tickler.getServerSocket(),
345                              1,
346                              MonitorEntry::STATUS_IDLE,
347                              MonitorEntry::TYPE_INTERNAL));
348 a.arora        1.73  
349 kumpf          1.123     // Start the count at 1 because _entries[0] is the Tickler
350 kumpf          1.116     for (int i = 1; i < numberOfMonitorEntriesToAllocate; i++)
351 mday           1.37      {
352 kumpf          1.126         _entries.append(MonitorEntry());
353 mday           1.37      }
354 mday           1.18  }
355 mday           1.20  
356 mike           1.2   Monitor::~Monitor()
357                      {
358 marek          1.118     PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
359 thilo.boehm    1.115                   "returning from monitor destructor");
360                      }
361 mday           1.18  
362 kumpf          1.116 void Monitor::tickle()
363 a.arora        1.73  {
364 kumpf          1.126     Socket::write(_tickler.getClientSocket(), "\0", 1);
365 sushma.fernandes 1.78  }
366                        
367 kumpf            1.121 void Monitor::setState(
368                            Uint32 index,
369 kumpf            1.126     MonitorEntry::Status status)
370 sushma.fernandes 1.78  {
371 kumpf            1.126     AutoMutex autoEntryMutex(_entriesMutex);
372 sushma.fernandes 1.78      // Set the state to requested state
373 kumpf            1.126     _entries[index].status = status;
374 a.arora          1.73  }
375                        
376 kumpf            1.114 void Monitor::run(Uint32 milliseconds)
377 mike             1.2   {
378 kumpf            1.36      struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
379 a.arora          1.73  
380 mday             1.25      fd_set fdread;
381                            FD_ZERO(&fdread);
382 a.arora          1.73  
383 kumpf            1.126     AutoMutex autoEntryMutex(_entriesMutex);
384 r.kieninger      1.83  
385 kumpf            1.126     ArrayIterator<MonitorEntry> entries(_entries);
386 mike             1.100 
387 kumpf            1.116     // Check the stopConnections flag.  If set, clear the Acceptor monitor
388                            // entries
389 mike             1.96      if (_stopConnections.get() == 1)
390 kumpf            1.48      {
391 kumpf            1.126         for (Uint32 indx = 0; indx < entries.size(); indx++)
392 kumpf            1.48          {
393 kumpf            1.126             if (entries[indx].type == MonitorEntry::TYPE_ACCEPTOR)
394 kumpf            1.48              {
395 kumpf            1.126                 if (entries[indx].status != MonitorEntry::STATUS_EMPTY)
396 kumpf            1.48                  {
397 kumpf            1.126                     if (entries[indx].status == MonitorEntry::STATUS_IDLE ||
398                                                entries[indx].status == MonitorEntry::STATUS_DYING)
399                                            {
400                                                // remove the entry
401                                                entries[indx].status = MonitorEntry::STATUS_EMPTY;
402                                            }
403                                            else
404                                            {
405                                                // set status to DYING
406                                                entries[indx].status = MonitorEntry::STATUS_DYING;
407                                            }
408                                        }
409                                    }
410 kumpf            1.48          }
411                                _stopConnections = 0;
412 kumpf            1.116         _stopConnectionsSem.signal();
413 kumpf            1.48      }
414 kumpf            1.51  
415 kumpf            1.126     for (Uint32 indx = 0; indx < entries.size(); indx++)
416 kumpf            1.68      {
417 kumpf            1.126         const MonitorEntry& entry = entries[indx];
418                        
419                                if ((entry.status == MonitorEntry::STATUS_DYING) &&
420                                    (entry.type == MonitorEntry::TYPE_CONNECTION))
421 kumpf            1.116         {
422                                    MessageQueue *q = MessageQueue::lookup(entry.queueId);
423                                    PEGASUS_ASSERT(q != 0);
424                                    HTTPConnection &h = *static_cast<HTTPConnection *>(q);
425                        
426                                    if (h._connectionClosePending == false)
427                                        continue;
428                        
429                                    // NOTE: do not attempt to delete while there are pending responses
430                                    // coming thru. The last response to come thru after a
431                                    // _connectionClosePending will reset _responsePending to false
432                                    // and then cause the monitor to rerun this code and clean up.
433                                    // (see HTTPConnection.cpp)
434                        
435                                    if (h._responsePending == true)
436                                    {
437 marek            1.118                 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
438 kumpf            1.116                     "Monitor::run - Ignoring connection delete request "
439                                                "because responses are still pending. "
440                                                "connection=0x%p, socket=%d\n",
441 marek            1.118                     (void *)&h, h.getSocket()));
442 kumpf            1.116                 continue;
443                                    }
444                                    h._connectionClosePending = false;
445                                    MessageQueue &o = h.get_owner();
446                                    Message* message= new CloseConnectionMessage(entry.socket);
447                                    message->dest = o.getQueueId();
448                        
449                                    // HTTPAcceptor is responsible for closing the connection.
450                                    // The lock is released to allow HTTPAcceptor to call
451                                    // unsolicitSocketMessages to free the entry.
452                                    // Once HTTPAcceptor completes processing of the close
453                                    // connection, the lock is re-requested and processing of
454                                    // the for loop continues.  This is safe with the current
455                                    // implementation of the entries object.  Note that the
456                                    // loop condition accesses the entries.size() on each
457                                    // iteration, so that a change in size while the mutex is
458                                    // unlocked will not result in an ArrayIndexOutOfBounds
459                                    // exception.
460                        
461 kumpf            1.126             _entriesMutex.unlock();
462 kumpf            1.116             o.enqueue(message);
463 kumpf            1.126             _entriesMutex.lock();
464 kumpf            1.116 
465                                    // After enqueue a message and the autoEntryMutex has been
466                                    // released and locked again, the array of _entries can be
467                                    // changed. The ArrayIterator has be reset with the original
468                                    // _entries.
469                                    entries.reset(_entries);
470                                }
471 kumpf            1.68      }
472                        
473 kumpf            1.51      Uint32 _idleEntries = 0;
474 r.kieninger      1.83  
475 a.arora          1.73      /*
476 david.dillard    1.95          We will keep track of the maximum socket number and pass this value
477                                to the kernel as a parameter to SELECT.  This loop seems like a good
478                                place to calculate the max file descriptor (maximum socket number)
479                                because we have to traverse the entire array.
480 r.kieninger      1.83      */
481 mike             1.106     SocketHandle maxSocketCurrentPass = 0;
482 kumpf            1.126     for (Uint32 indx = 0; indx < entries.size(); indx++)
483 mike             1.2       {
484 kumpf            1.126         if (maxSocketCurrentPass < entries[indx].socket)
485                                    maxSocketCurrentPass = entries[indx].socket;
486 a.arora          1.73  
487 kumpf            1.126         if (entries[indx].status == MonitorEntry::STATUS_IDLE)
488                                {
489                                    _idleEntries++;
490                                    FD_SET(entries[indx].socket, &fdread);
491                                }
492 mday             1.13      }
493 s.hills          1.62  
494 a.arora          1.73      /*
495 david.dillard    1.95          Add 1 then assign maxSocket accordingly. We add 1 to account for
496                                descriptors starting at 0.
497 a.arora          1.73      */
498                            maxSocketCurrentPass++;
499                        
500 kumpf            1.126     _entriesMutex.unlock();
501 david.dillard    1.95  
502                            //
503                            // The first argument to select() is ignored on Windows and it is not
504                            // a socket value.  The original code assumed that the number of sockets
505                            // and a socket value have the same type.  On Windows they do not.
506                            //
507                        #ifdef PEGASUS_OS_TYPE_WINDOWS
508                            int events = select(0, &fdread, NULL, NULL, &tv);
509                        #else
510 a.arora          1.73      int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
511 david.dillard    1.95  #endif
512 kumpf            1.126     _entriesMutex.lock();
513 kumpf            1.116 
514 dave.sudlik      1.128     struct timeval timeNow;
515                            Time::gettimeofday(&timeNow);
516                        
517 kumpf            1.116     // After enqueue a message and the autoEntryMutex has been released and
518                            // locked again, the array of _entries can be changed. The ArrayIterator
519                            // has be reset with the original _entries
520 r.kieninger      1.102     entries.reset(_entries);
521 mike             1.106 
522                            if (events == PEGASUS_SOCKET_ERROR)
523 mday             1.13      {
524 marek            1.118         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
525                                    "Monitor::run - errorno = %d has occurred on select.", errno));
526 kumpf            1.116         // The EBADF error indicates that one or more or the file
527                                // descriptions was not valid. This could indicate that
528                                // the entries structure has been corrupted or that
529                                // we have a synchronization error.
530 kumpf            1.50  
531 kumpf            1.116         PEGASUS_ASSERT(errno != EBADF);
532 kumpf            1.50      }
533                            else if (events)
534                            {
535 marek            1.118         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
536 kumpf            1.116             "Monitor::run select event received events = %d, monitoring %d "
537                                        "idle entries",
538 marek            1.118             events, _idleEntries));
539 kumpf            1.126         for (Uint32 indx = 0; indx < entries.size(); indx++)
540 kumpf            1.116         {
541                                    // The Monitor should only look at entries in the table that are
542                                    // IDLE (i.e., owned by the Monitor).
543 kumpf            1.126             if ((entries[indx].status == MonitorEntry::STATUS_IDLE) &&
544 kumpf            1.116                 (FD_ISSET(entries[indx].socket, &fdread)))
545                                    {
546 kumpf            1.126                 MessageQueue* q = MessageQueue::lookup(entries[indx].queueId);
547                                        PEGASUS_ASSERT(q != 0);
548 marek            1.118                 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
549 kumpf            1.126                     "Monitor::run indx = %d, queueId = %d, q = %p",
550 marek            1.118                     indx, entries[indx].queueId, q));
551 kumpf            1.116 
552                                        try
553 david.dillard    1.95                  {
554 kumpf            1.126                     if (entries[indx].type == MonitorEntry::TYPE_CONNECTION)
555 kumpf            1.116                     {
556 marek            1.118                         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
557 kumpf            1.126                             "entries[%d].type is TYPE_CONNECTION",
558 marek            1.118                             indx));
559 kumpf            1.116 
560 dave.sudlik      1.128                         HTTPConnection *dst = 
561 kumpf            1.116                             reinterpret_cast<HTTPConnection *>(q);
562 dave.sudlik      1.128                         dst->_entry_index = indx;
563 kumpf            1.116 
564 dave.sudlik      1.128                         // Update idle start time because we have received some
565                                                // data. Any data is good data at this point, and we'll
566                                                // keep the connection alive, even if we've exceeded
567                                                // the idleConnectionTimeout, which will be checked
568                                                // when we call closeConnectionOnTimeout() next.
569                                                Time::gettimeofday(&dst->_idleStartTime);
570                        
571                                                // Check for accept pending (ie. SSL handshake pending)
572                                                // or idle connection timeouts for sockets from which
573                                                // we received data (avoiding extra queue lookup below).
574                                                if (!dst->closeConnectionOnTimeout(&timeNow))
575 kumpf            1.116                         {
576 dave.sudlik      1.128                             PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
577                                                        "Entering HTTPConnection::run() for "
578                                                            "indx = %d, queueId = %d, q = %p",
579                                                        indx, entries[indx].queueId, q));
580                        
581                                                    try
582                                                    {
583                                                        dst->run(1);
584                                                    }
585                                                    catch (...)
586                                                    {
587                                                        PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL2,
588                                                            "Caught exception from "
589                                                            "HTTPConnection::run()");
590                                                    }
591                                                    PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
592                                                        "Exited HTTPConnection::run()");
593                                                    }
594                        
595                        
596 kumpf            1.116                     }
597 kumpf            1.126                     else if (entries[indx].type == MonitorEntry::TYPE_INTERNAL)
598 kumpf            1.116                     {
599 kumpf            1.126                         char buffer;
600                                                Sint32 ignored =
601                                                    Socket::read(entries[indx].socket, &buffer, 1);
602 kumpf            1.116                     }
603                                            else
604                                            {
605 marek            1.118                         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
606 kumpf            1.116                             "Non-connection entry, indx = %d, has been "
607                                                        "received.",
608 marek            1.118                             indx));
609 kumpf            1.116                         int events = 0;
610                                                events |= SocketMessage::READ;
611                                                Message* msg = new SocketMessage(
612                                                    entries[indx].socket, events);
613 kumpf            1.126                         entries[indx].status = MonitorEntry::STATUS_BUSY;
614                                                _entriesMutex.unlock();
615 kumpf            1.116                         q->enqueue(msg);
616 kumpf            1.126                         _entriesMutex.lock();
617 kumpf            1.116 
618                                                // After enqueue a message and the autoEntryMutex has
619                                                // been released and locked again, the array of
620 kumpf            1.126                         // entries can be changed. The ArrayIterator has to be
621                                                // reset with the latest _entries.
622 kumpf            1.116                         entries.reset(_entries);
623 kumpf            1.126                         entries[indx].status = MonitorEntry::STATUS_IDLE;
624 kumpf            1.116                     }
625                                        }
626                                        catch (...)
627                                        {
628                                        }
629 thilo.boehm      1.115             }
630 dave.sudlik      1.128             // else check for accept pending (ie. SSL handshake pending) or
631                                    // idle connection timeouts for sockets from which we did not
632                                    // receive data.
633                                    else if ((entries[indx].status == MonitorEntry::STATUS_IDLE) &&
634                                        entries[indx].type == MonitorEntry::TYPE_CONNECTION)
635                        
636                                    {
637                                        MessageQueue* q = MessageQueue::lookup(entries[indx].queueId);
638                                        HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
639                                        dst->_entry_index = indx;
640                                        dst->closeConnectionOnTimeout(&timeNow);
641                                    }
642                                }
643                            }
644                            // else if "events" is zero (ie. select timed out) then we still need
645                            // to check if there are any pending SSL handshakes that have timed out.
646                            else
647                            {
648                                for (Uint32 indx = 0; indx < entries.size(); indx++)
649                                {
650                                    if ((entries[indx].status == MonitorEntry::STATUS_IDLE) &&
651 dave.sudlik      1.128                 entries[indx].type == MonitorEntry::TYPE_CONNECTION)
652                                    {
653                                        MessageQueue* q = MessageQueue::lookup(entries[indx].queueId);
654                                        HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
655                                        dst->_entry_index = indx;
656                                        dst->closeConnectionOnTimeout(&timeNow);
657                                    }
658 kumpf            1.116         }
659 mday             1.24      }
660 mike             1.2   }
661                        
662 chuck            1.74  void Monitor::stopListeningForConnections(Boolean wait)
663 kumpf            1.48  {
664                            PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
665 r.kieninger      1.83      // set boolean then tickle the server to recognize _stopConnections
666 kumpf            1.48      _stopConnections = 1;
667 a.arora          1.73      tickle();
668 kumpf            1.48  
669 chuck            1.74      if (wait)
670 a.arora          1.73      {
671 chuck            1.74        // Wait for the monitor to notice _stopConnections.  Otherwise the
672                              // caller of this function may unbind the ports while the monitor
673                              // is still accepting connections on them.
674 kumpf            1.101       _stopConnectionsSem.wait();
675 a.arora          1.73      }
676 r.kieninger      1.83  
677 kumpf            1.48      PEG_METHOD_EXIT();
678                        }
679 mday             1.25  
680 mday             1.37  
681 kumpf            1.116 int Monitor::solicitSocketMessages(
682 mike             1.106     SocketHandle socket,
683 mike             1.2       Uint32 events,
684 r.kieninger      1.83      Uint32 queueId,
685 kumpf            1.126     Uint32 type)
686 mike             1.2   {
687 kumpf            1.116     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
688 kumpf            1.126     AutoMutex autoMut(_entriesMutex);
689                        
690 kumpf            1.116     // Check to see if we need to dynamically grow the _entries array
691 kumpf            1.126     // We always want the _entries array to be 2 bigger than the
692 kumpf            1.116     // current connections requested
693                            _solicitSocketCount++;  // bump the count
694 kumpf            1.126 
695                            for (Uint32 i = _entries.size(); i < _solicitSocketCount + 1; i++)
696 kumpf            1.116     {
697 kumpf            1.126         _entries.append(MonitorEntry());
698 kumpf            1.116     }
699 a.arora          1.73  
700 kumpf            1.126     for (Uint32 index = 1; index < _entries.size(); index++)
701 kumpf            1.116     {
702                                try
703                                {
704 kumpf            1.126             if (_entries[index].status == MonitorEntry::STATUS_EMPTY)
705 kumpf            1.116             {
706                                        _entries[index].socket = socket;
707                                        _entries[index].queueId  = queueId;
708 kumpf            1.126                 _entries[index].type = type;
709                                        _entries[index].status = MonitorEntry::STATUS_IDLE;
710 kumpf            1.116 
711 kumpf            1.126                 return (int)index;
712 kumpf            1.116             }
713                                }
714                                catch (...)
715                                {
716                                }
717                            }
718                            // decrease the count, if we are here we didn't do anything meaningful
719                            _solicitSocketCount--;
720                            PEG_METHOD_EXIT();
721                            return -1;
722 mike             1.2   }
723                        
724 mike             1.106 void Monitor::unsolicitSocketMessages(SocketHandle socket)
725 mike             1.2   {
726 mday             1.25      PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
727 kumpf            1.126     AutoMutex autoMut(_entriesMutex);
728 a.arora          1.73  
729                            /*
730 kumpf            1.116         Start at index = 1 because _entries[0] is the tickle entry which
731 kumpf            1.126         never needs to be reset to EMPTY;
732 a.arora          1.73      */
733 kumpf            1.126     for (Uint32 index = 1; index < _entries.size(); index++)
734 mike             1.2       {
735 kumpf            1.116         if (_entries[index].socket == socket)
736                                {
737 kumpf            1.126             _entries[index].reset();
738 kumpf            1.116             _solicitSocketCount--;
739                                    break;
740                                }
741 mike             1.2       }
742 a.arora          1.73  
743                            /*
744 kumpf            1.116         Dynamic Contraction:
745                                To remove excess entries we will start from the end of the _entries
746                                array and remove all entries with EMPTY status until we find the
747                                first NON EMPTY.  This prevents the positions, of the NON EMPTY
748                                entries, from being changed.
749 r.kieninger      1.83      */
750 kumpf            1.126     for (Uint32 index = _entries.size() - 1;
751                                 (_entries[index].status == MonitorEntry::STATUS_EMPTY) &&
752                                     (index >= MAX_NUMBER_OF_MONITOR_ENTRIES);
753                                 index--)
754 kumpf            1.116     {
755 kumpf            1.126         _entries.remove(index);
756 a.arora          1.73      }
757 kumpf            1.126 
758 kumpf            1.4       PEG_METHOD_EXIT();
759 mike             1.2   }
760                        
761                        PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2