(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.103 //%2006////////////////////////////////////////////////////////////////////////
  2 mike  1.2   //
  3 karl  1.79  // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4             // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5             // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6 karl  1.64  // IBM Corp.; EMC Corporation, The Open Group.
  7 karl  1.79  // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8             // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9 karl  1.90  // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10             // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11 karl  1.103 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 12             // EMC Corporation; Symantec Corporation; The Open Group.
 13 mike  1.2   //
 14             // Permission is hereby granted, free of charge, to any person obtaining a copy
 15 kumpf 1.17  // of this software and associated documentation files (the "Software"), to
 16             // deal in the Software without restriction, including without limitation the
 17             // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 18 mike  1.2   // sell copies of the Software, and to permit persons to whom the Software is
 19             // furnished to do so, subject to the following conditions:
 20 karl  1.103 // 
 21 kumpf 1.17  // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 22 mike  1.2   // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 23             // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 24 kumpf 1.17  // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 25             // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 26             // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 27 mike  1.2   // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 28             // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 29             //
 30             //==============================================================================
 31             //
 32             //%/////////////////////////////////////////////////////////////////////////////
 33             
 34 mike  1.107 #include "Network.h"
 35 mike  1.2   #include <Pegasus/Common/Config.h>
 36             #include <cstring>
 37             #include "Monitor.h"
 38             #include "MessageQueue.h"
 39             #include "Socket.h"
 40 kumpf 1.4   #include <Pegasus/Common/Tracer.h>
 41 mday  1.7   #include <Pegasus/Common/HTTPConnection.h>
 42 kumpf 1.69  #include <Pegasus/Common/MessageQueueService.h>
 43 a.arora 1.73  #include <Pegasus/Common/Exception.h>
 44 mike    1.100 #include "ArrayIterator.h"
 45 kumpf   1.122 #include "HostAddress.h"
 46 mike    1.113 #include <errno.h>
 47 mike    1.2   
 48               PEGASUS_USING_STD;
 49               
 50               PEGASUS_NAMESPACE_BEGIN
 51               
 52 mike    1.96  static AtomicInt _connections(0);  // File-scope atomic counter, constructed with 0.  NOTE(review): not referenced anywhere in the visible part of this file -- confirm it is still used.
 53 mday    1.25  
 54 mike    1.2   ////////////////////////////////////////////////////////////////////////////////
 55               //
 56 kumpf   1.122 // Tickler
 57               //
 58               ////////////////////////////////////////////////////////////////////////////////
 59               
 60               Tickler::Tickler()
 61                   : _listenSocket(PEGASUS_INVALID_SOCKET),
 62                     _clientSocket(PEGASUS_INVALID_SOCKET),
 63                     _serverSocket(PEGASUS_INVALID_SOCKET)
 64               {
 65               }
 66               
 67               Tickler::~Tickler()
 68               {
 69                   uninitialize();
 70               }
 71               
 72               Boolean Tickler::initialize()
 73               {
 74                   //
 75                   // Set up the addresses for the listen, client, and server sockets
 76                   // based on whether IPv6 is enabled.
 77 kumpf   1.122     //
 78               
 79               #ifdef PEGASUS_ENABLE_IPV6
 80                   struct sockaddr_storage listenAddress;
 81                   struct sockaddr_storage clientAddress;
 82                   struct sockaddr_storage serverAddress;
 83               #else
 84                   struct sockaddr_in listenAddress;
 85                   struct sockaddr_in clientAddress;
 86                   struct sockaddr_in serverAddress;
 87               #endif
 88               
 89                   int addressFamily;
 90                   SocketLength addressLength;
 91               
 92               #ifdef PEGASUS_ENABLE_IPV6
 93                   if (System::isIPv6StackActive())
 94                   {
 95                       // Use the IPv6 loopback address for the listen sockets
 96                       HostAddress::convertTextToBinary(
 97                           HostAddress::AT_IPV6,
 98 kumpf   1.122             "::1",
 99                           &reinterpret_cast<struct sockaddr_in6*>(&listenAddress)->sin6_addr);
100                       listenAddress.ss_family = AF_INET6;
101                       reinterpret_cast<struct sockaddr_in6*>(&listenAddress)->sin6_port = 0;
102               
103                       addressFamily = AF_INET6;
104                       addressLength = sizeof(struct sockaddr_in6);
105                   }
106                   else
107               #endif
108                   {
109                       // Use the IPv4 loopback address for the listen sockets
110                       HostAddress::convertTextToBinary(
111                           HostAddress::AT_IPV4,
112                           "127.0.0.1",
113                           &reinterpret_cast<struct sockaddr_in*>(
114                               &listenAddress)->sin_addr.s_addr);
115                       reinterpret_cast<struct sockaddr_in*>(&listenAddress)->sin_family =
116                           AF_INET;
117                       reinterpret_cast<struct sockaddr_in*>(&listenAddress)->sin_port = 0;
118               
119 kumpf   1.122         addressFamily = AF_INET;
120                       addressLength = sizeof(struct sockaddr_in);
121                   }
122               
123                   // Use the same address for the client socket as the listen socket
124                   clientAddress = listenAddress;
125               
126                   //
127                   // Set up a listen socket to allow the tickle client and server to connect
128                   //
129               
130                   // Create the listen socket
131                   if ((_listenSocket = Socket::createSocket(addressFamily, SOCK_STREAM, 0)) ==
132                            PEGASUS_INVALID_SOCKET)
133                   {
134                       MessageLoaderParms parms(
135                           "Common.Monitor.TICKLE_CREATE",
136                           "Received error number $0 while creating the internal socket.",
137                           getSocketError());
138                       throw Exception(parms);
139                   }
140 kumpf   1.122 
141                   // Bind the listen socket to the loopback address
142                   if (::bind(
143                           _listenSocket,
144                           reinterpret_cast<struct sockaddr*>(&listenAddress),
145                           addressLength) < 0)
146                   {
147               #ifdef PEGASUS_OS_ZOS
148                       MessageLoaderParms parms(
149                           "Common.Monitor.TICKLE_BIND_LONG",
150                           "Received error:$0 while binding the internal socket.",
151                           strerror(errno));
152               #else
153                       MessageLoaderParms parms(
154                           "Common.Monitor.TICKLE_BIND",
155                           "Received error number $0 while binding the internal socket.",
156                           getSocketError());
157               #endif
158                       throw Exception(parms);
159                   }
160               
161 kumpf   1.122     // Listen for a connection from the tickle client
162                   if ((::listen(_listenSocket, 3)) < 0)
163                   {
164                       MessageLoaderParms parms(
165                           "Common.Monitor.TICKLE_LISTEN",
166                           "Received error number $0 while listening to the internal socket.",
167                           getSocketError());
168                       throw Exception(parms);
169                   }
170               
171                   // Verify we have the correct listen socket
172                   SocketLength tmpAddressLength = addressLength;
173                   int sock = ::getsockname(
174                       _listenSocket,
175                       reinterpret_cast<struct sockaddr*>(&listenAddress),
176                       &tmpAddressLength);
177                   if (sock < 0)
178                   {
179                       MessageLoaderParms parms(
180                           "Common.Monitor.TICKLE_SOCKNAME",
181                           "Received error number $0 while getting the internal socket name.",
182 kumpf   1.122             getSocketError());
183                       throw Exception(parms);
184                   }
185               
186                   //
187                   // Set up the client side of the tickle connection.
188                   //
189               
190                   // Create the client socket
191                   if ((_clientSocket = Socket::createSocket(addressFamily, SOCK_STREAM, 0)) ==
192                            PEGASUS_INVALID_SOCKET)
193                   {
194                       MessageLoaderParms parms(
195                           "Common.Monitor.TICKLE_CLIENT_CREATE",
196                           "Received error number $0 while creating the internal client "
197                               "socket.",
198                           getSocketError());
199                       throw Exception(parms);
200                   }
201               
202                   // Bind the client socket to the loopback address
203 kumpf   1.122     if (::bind(
204                           _clientSocket,
205                           reinterpret_cast<struct sockaddr*>(&clientAddress),
206                           addressLength) < 0)
207                   {
208                       MessageLoaderParms parms(
209                           "Common.Monitor.TICKLE_CLIENT_BIND",
210                           "Received error number $0 while binding the internal client "
211                               "socket.",
212                           getSocketError());
213                       throw Exception(parms);
214                   }
215               
216                   // Connect the client socket to the listen socket address
217                   if (::connect(
218                           _clientSocket,
219                           reinterpret_cast<struct sockaddr*>(&listenAddress),
220                           addressLength) < 0)
221                   {
222                       MessageLoaderParms parms(
223                           "Common.Monitor.TICKLE_CLIENT_CONNECT",
224 kumpf   1.122             "Received error number $0 while connecting the internal client "
225                               "socket.",
226                           getSocketError());
227                       throw Exception(parms);
228                   }
229               
230                   //
231                   // Set up the server side of the tickle connection.
232                   //
233               
234                   tmpAddressLength = addressLength;
235               
236                   // Accept the client socket connection.  The accept call may fail with
237                   // EAGAIN.  Try a max of 20 times to establish this connection.
238                   unsigned int retries = 0;
239                   do
240                   {
241                       _serverSocket = ::accept(
242                           _listenSocket,
243                           reinterpret_cast<struct sockaddr*>(&serverAddress),
244                           &tmpAddressLength);
245 kumpf   1.122 
246                       if ((_serverSocket != PEGASUS_SOCKET_ERROR) ||
247                           (getSocketError() != PEGASUS_NETWORK_TRYAGAIN))
248                       {
249                           break;
250                       }
251               
252                       Threads::sleep(1);
253                       retries++;
254                   } while (retries <= 20);
255               
256                   if (_serverSocket == PEGASUS_SOCKET_ERROR)
257                   {
258                       if (getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED)
259                       {
260                           // TCP/IP is down
261                           uninitialize();
262                           return false;
263                       }
264               
265                       MessageLoaderParms parms(
266 kumpf   1.122             "Common.Monitor.TICKLE_ACCEPT",
267                           "Received error number $0 while accepting the internal socket "
268                               "connection.",
269                           getSocketError());
270                       throw Exception(parms);
271                   }
272               
273                   //
274                   // Close the listen socket and make the other sockets non-blocking
275                   //
276               
277                   Socket::close(_listenSocket);
278                   _listenSocket = PEGASUS_INVALID_SOCKET;
279               
280                   Socket::disableBlocking(_serverSocket);
281                   Socket::disableBlocking(_clientSocket);
282                   return true;
283               }
284               
285               void Tickler::uninitialize()
286               {
287 kumpf   1.122     PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
288               
289                   try
290                   {
291                       if (_serverSocket != PEGASUS_INVALID_SOCKET)
292                       {
293                           Socket::close(_serverSocket);
294                           _serverSocket = PEGASUS_INVALID_SOCKET;
295                       }
296                       if (_clientSocket != PEGASUS_INVALID_SOCKET)
297                       {
298                           Socket::close(_clientSocket);
299                           _clientSocket = PEGASUS_INVALID_SOCKET;
300                       }
301                       if (_listenSocket != PEGASUS_INVALID_SOCKET)
302                       {
303                           Socket::close(_listenSocket);
304                           _listenSocket = PEGASUS_INVALID_SOCKET;
305                       }
306                   }
307                   catch (...)
308 kumpf   1.122     {
309                       PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
310                           "Failed to close tickle sockets");
311                   }
312               }
313               
314               
315               ////////////////////////////////////////////////////////////////////////////////
316               //
317 mike    1.2   // Monitor
318               //
319               ////////////////////////////////////////////////////////////////////////////////
320               
321 kumpf   1.54  #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
322 mike    1.2   Monitor::Monitor()
323 kumpf   1.87     : _stopConnections(0),
324 a.arora 1.73       _stopConnectionsSem(0),
325 kumpf   1.122      _solicitSocketCount(0)
326 mike    1.2   {
327 kumpf   1.54      int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
328 mike    1.2       Socket::initializeInterface();
329 kumpf   1.54      _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
330 a.arora 1.73  
331                   // setup the tickler
332                   initializeTickler();
333               
334 r.kieninger 1.83      // Start the count at 1 because initilizeTickler()
335 a.arora     1.73      // has added an entry in the first position of the
336                       // _entries array
337 kumpf       1.116     for (int i = 1; i < numberOfMonitorEntriesToAllocate; i++)
338 mday        1.37      {
339                          _MonitorEntry entry(0, 0, 0);
340                          _entries.append(entry);
341                       }
342 mday        1.18  }
343 mday        1.20  
344 mike        1.2   Monitor::~Monitor()
345                   {
346 kumpf       1.122     _tickler.uninitialize();
347 thilo.boehm 1.115     Socket::uninitializeInterface();
348 marek       1.118     PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
349 thilo.boehm 1.115                   "returning from monitor destructor");
350                   }
351 mday        1.18  
352 kumpf       1.116 void Monitor::initializeTickler()
353                   {
354 kumpf       1.122     while (!_tickler.initialize())
355 thilo.boehm 1.115     {
356 kumpf       1.122         // Retry until TCP/IP is started
357                       }
358 thilo.boehm 1.115 
359 kumpf       1.122     // Create a MonitorEntry for the tickler and set its state to IDLE so the
360                       // Monitor will watch for its events.
361                       _MonitorEntry entry(_tickler.getServerSocket(), 1, INTERNAL);
362 a.arora     1.73      entry._status = _MonitorEntry::IDLE;
363 thilo.boehm 1.115 
364 kumpf       1.122     if (_entries.size() == 0)
365 thilo.boehm 1.115     {
366 kumpf       1.122         // The tickler has not been initialized before; add its entry at the
367                           // beginning of the list.
368                           _entries.append(entry);
369 thilo.boehm 1.115     }
370                       else
371                       {
372 kumpf       1.122         // Overwrite the existing tickler entry.
373                           _entries[0] = entry;
374 thilo.boehm 1.115     }
375 a.arora     1.73  }
376                   
377 kumpf       1.116 void Monitor::tickle()
378 a.arora     1.73  {
379 kumpf       1.122     AutoMutex autoMutex(_tickleMutex);
380                       Socket::write(_tickler.getClientSocket(), "\0\0", 2);
381 sushma.fernandes 1.78  }
382                        
383 kumpf            1.121 void Monitor::setState(
384                            Uint32 index,
385                            _MonitorEntry::entry_status status)
386 sushma.fernandes 1.78  {
387 kumpf            1.121     AutoMutex autoEntryMutex(_entry_mut);
388 sushma.fernandes 1.78      // Set the state to requested state
389                            _entries[index]._status = status;
390 a.arora          1.73  }
391                        
392 kumpf            1.114 void Monitor::run(Uint32 milliseconds)
393 mike             1.2   {
394 kumpf            1.36      struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
395 a.arora          1.73  
396 mday             1.25      fd_set fdread;
397                            FD_ZERO(&fdread);
398 a.arora          1.73  
399 kumpf            1.94      AutoMutex autoEntryMutex(_entry_mut);
400 r.kieninger      1.83  
401 mike             1.100     ArrayIterator<_MonitorEntry> entries(_entries);
402                        
403 kumpf            1.116     // Check the stopConnections flag.  If set, clear the Acceptor monitor
404                            // entries
405 mike             1.96      if (_stopConnections.get() == 1)
406 kumpf            1.48      {
407 mike             1.100         for ( int indx = 0; indx < (int)entries.size(); indx++)
408 kumpf            1.48          {
409 mike             1.100             if (entries[indx]._type == Monitor::ACCEPTOR)
410 kumpf            1.48              {
411 mike             1.100                 if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)
412 kumpf            1.48                  {
413 mike             1.100                    if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||
414                                                entries[indx]._status.get() == _MonitorEntry::DYING )
415 kumpf            1.48                     {
416                                               // remove the entry
417 kumpf            1.116                        entries[indx]._status = _MonitorEntry::EMPTY;
418 kumpf            1.48                     }
419                                           else
420                                           {
421                                               // set status to DYING
422 mike             1.100                       entries[indx]._status = _MonitorEntry::DYING;
423 kumpf            1.48                     }
424                                       }
425                                   }
426                                }
427                                _stopConnections = 0;
428 kumpf            1.116         _stopConnectionsSem.signal();
429 kumpf            1.48      }
430 kumpf            1.51  
431 kumpf            1.116     for (int indx = 0; indx < (int)entries.size(); indx++)
432 kumpf            1.68      {
433 kumpf            1.116         const _MonitorEntry &entry = entries[indx];
434                                if ((entry._status.get() == _MonitorEntry::DYING) &&
435                                    (entry._type == Monitor::CONNECTION))
436                                {
437                                    MessageQueue *q = MessageQueue::lookup(entry.queueId);
438                                    PEGASUS_ASSERT(q != 0);
439                                    HTTPConnection &h = *static_cast<HTTPConnection *>(q);
440                        
441                                    if (h._connectionClosePending == false)
442                                        continue;
443                        
444                                    // NOTE: do not attempt to delete while there are pending responses
445                                    // coming thru. The last response to come thru after a
446                                    // _connectionClosePending will reset _responsePending to false
447                                    // and then cause the monitor to rerun this code and clean up.
448                                    // (see HTTPConnection.cpp)
449                        
450                                    if (h._responsePending == true)
451                                    {
452 marek            1.118                 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
453 kumpf            1.116                     "Monitor::run - Ignoring connection delete request "
454                                                "because responses are still pending. "
455                                                "connection=0x%p, socket=%d\n",
456 marek            1.118                     (void *)&h, h.getSocket()));
457 kumpf            1.116                 continue;
458                                    }
459                                    h._connectionClosePending = false;
460                                    MessageQueue &o = h.get_owner();
461                                    Message* message= new CloseConnectionMessage(entry.socket);
462                                    message->dest = o.getQueueId();
463                        
464                                    // HTTPAcceptor is responsible for closing the connection.
465                                    // The lock is released to allow HTTPAcceptor to call
466                                    // unsolicitSocketMessages to free the entry.
467                                    // Once HTTPAcceptor completes processing of the close
468                                    // connection, the lock is re-requested and processing of
469                                    // the for loop continues.  This is safe with the current
470                                    // implementation of the entries object.  Note that the
471                                    // loop condition accesses the entries.size() on each
472                                    // iteration, so that a change in size while the mutex is
473                                    // unlocked will not result in an ArrayIndexOutOfBounds
474                                    // exception.
475                        
476                                    _entry_mut.unlock();
477                                    o.enqueue(message);
478 kumpf            1.116             _entry_mut.lock();
479                        
480                                    // After enqueue a message and the autoEntryMutex has been
481                                    // released and locked again, the array of _entries can be
482                                    // changed. The ArrayIterator has be reset with the original
483                                    // _entries.
484                                    entries.reset(_entries);
485                                }
486 kumpf            1.68      }
487                        
488 kumpf            1.51      Uint32 _idleEntries = 0;
489 r.kieninger      1.83  
490 a.arora          1.73      /*
491 david.dillard    1.95          We will keep track of the maximum socket number and pass this value
492                                to the kernel as a parameter to SELECT.  This loop seems like a good
493                                place to calculate the max file descriptor (maximum socket number)
494                                because we have to traverse the entire array.
495 r.kieninger      1.83      */
496 mike             1.106     SocketHandle maxSocketCurrentPass = 0;
497 kumpf            1.116     for (int indx = 0; indx < (int)entries.size(); indx++)
498 mike             1.2       {
499 kumpf            1.116        if (maxSocketCurrentPass < entries[indx].socket)
500                                   maxSocketCurrentPass = entries[indx].socket;
501 a.arora          1.73  
502 kumpf            1.116        if (entries[indx]._status.get() == _MonitorEntry::IDLE)
503 mday             1.25         {
504 david.dillard    1.95             _idleEntries++;
505 mike             1.100            FD_SET(entries[indx].socket, &fdread);
506 mday             1.25         }
507 mday             1.13      }
508 s.hills          1.62  
509 a.arora          1.73      /*
510 david.dillard    1.95          Add 1 then assign maxSocket accordingly. We add 1 to account for
511                                descriptors starting at 0.
512 a.arora          1.73      */
513                            maxSocketCurrentPass++;
514                        
515 mike             1.112     _entry_mut.unlock();
516 david.dillard    1.95  
517                            //
518                            // The first argument to select() is ignored on Windows and it is not
519                            // a socket value.  The original code assumed that the number of sockets
520                            // and a socket value have the same type.  On Windows they do not.
521                            //
522                        #ifdef PEGASUS_OS_TYPE_WINDOWS
523                            int events = select(0, &fdread, NULL, NULL, &tv);
524                        #else
525 a.arora          1.73      int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
526 david.dillard    1.95  #endif
527 mike             1.112     _entry_mut.lock();
528 kumpf            1.116 
529                            // After enqueue a message and the autoEntryMutex has been released and
530                            // locked again, the array of _entries can be changed. The ArrayIterator
531                            // has be reset with the original _entries
532 r.kieninger      1.102     entries.reset(_entries);
533 mike             1.106 
534                            if (events == PEGASUS_SOCKET_ERROR)
535 mday             1.13      {
536 marek            1.118         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
537                                    "Monitor::run - errorno = %d has occurred on select.", errno));
538 kumpf            1.116         // The EBADF error indicates that one or more or the file
539                                // descriptions was not valid. This could indicate that
540                                // the entries structure has been corrupted or that
541                                // we have a synchronization error.
542 kumpf            1.50  
543 kumpf            1.116         PEGASUS_ASSERT(errno != EBADF);
544 kumpf            1.50      }
545                            else if (events)
546                            {
547 marek            1.118         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
548 kumpf            1.116             "Monitor::run select event received events = %d, monitoring %d "
549                                        "idle entries",
550 marek            1.118             events, _idleEntries));
551 kumpf            1.116         for (int indx = 0; indx < (int)entries.size(); indx++)
552                                {
553                                    // The Monitor should only look at entries in the table that are
554                                    // IDLE (i.e., owned by the Monitor).
555                                    if ((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
556                                        (FD_ISSET(entries[indx].socket, &fdread)))
557                                    {
558                                        MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
559 marek            1.118                 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
560 kumpf            1.116                     "Monitor::run indx = %d, queueId =  %d, q = %p",
561 marek            1.118                     indx, entries[indx].queueId, q));
562 kumpf            1.116                 PEGASUS_ASSERT(q !=0);
563                        
564                                        try
565 david.dillard    1.95                  {
566 kumpf            1.116                     if (entries[indx]._type == Monitor::CONNECTION)
567                                            {
568 marek            1.118                         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
569 kumpf            1.116                             "entries[indx].type for indx = %d is "
570                                                        "Monitor::CONNECTION",
571 marek            1.118                             indx));
572 kumpf            1.116                         static_cast<HTTPConnection *>(q)->_entry_index = indx;
573                        
574                                                // Do not update the entry just yet. The entry gets
575                                                // updated once the request has been read.
576                                                //entries[indx]._status = _MonitorEntry::BUSY;
577 sushma.fernandes 1.78  
578 kumpf            1.116                         // If allocate_and_awaken failure, retry on next
579                                                // iteration
580 a.arora          1.73  /* Removed for PEP 183.
581 kumpf            1.116                         if (!MessageQueueService::get_thread_pool()->
582                                                        allocate_and_awaken((void *)q, _dispatch))
583                                                {
584 kumpf            1.119                             PEG_TRACE_CSTRING(TRC_DISCARDED_DATA,
585                                                        Tracer::LEVEL2,
586 kumpf            1.116                                 "Monitor::run: Insufficient resources to "
587                                                            "process request.");
588                                                    entries[indx]._status = _MonitorEntry::IDLE;
589                                                    return true;
590                                                }
591 a.arora          1.73  */
592                        // Added for PEP 183
593 kumpf            1.116                         HTTPConnection *dst =
594                                                    reinterpret_cast<HTTPConnection *>(q);
595 marek            1.118                         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
596 kumpf            1.116                             "Monitor::_dispatch: entering run() for "
597                                                        "indx = %d, queueId = %d, q = %p",
598                                                    dst->_entry_index,
599                                                    dst->_monitor->_entries[dst->_entry_index].queueId,
600 marek            1.118                             dst));
601 kumpf            1.116 
602                                                try
603                                                {
604                                                    dst->run(1);
605                                                }
606                                                catch (...)
607                                                {
608 marek            1.118                             PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
609 kumpf            1.116                                 "Monitor::_dispatch: exception received");
610                                                }
611 marek            1.118                         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
612 kumpf            1.116                             "Monitor::_dispatch: exited run() for index %d",
613 marek            1.118                             dst->_entry_index));
614 kumpf            1.116 
615                                                // It is possible the entry status may not be set to
616                                                // busy.  The following will fail in that case.
617                                                // PEGASUS_ASSERT(dst->_monitor->_entries[
618                                                //     dst->_entry_index]._status.get() ==
619                                                //    _MonitorEntry::BUSY);
620                                                // Once the HTTPConnection thread has set the status
621                                                // value to either Monitor::DYING or Monitor::IDLE,
622                                                // it has returned control of the connection to the
623                                                // Monitor.  It is no longer permissible to access
624                                                // the connection or the entry in the _entries table.
625                        
626                                                // The following is not relevant as the worker thread
627                                                // or the reader thread will update the status of the
628                                                // entry.
629                                                //if (dst->_connectionClosePending)
630                                                //{
631                                                //  dst->_monitor->_entries[dst->_entry_index]._status =
632                                                //    _MonitorEntry::DYING;
633                                                //}
634                                                //else
635 kumpf            1.116                         //{
636                                                //  dst->_monitor->_entries[dst->_entry_index]._status =
637                                                //    _MonitorEntry::IDLE;
638                                                //}
639 r.kieninger      1.83  // end Added for PEP 183
640 kumpf            1.116                     }
641                                            else if (entries[indx]._type == Monitor::INTERNAL)
642                                            {
643                                                // set ourself to BUSY,
644 r.kieninger      1.83                          // read the data
645 a.arora          1.73                          // and set ourself back to IDLE
646 r.kieninger      1.83  
647 kumpf            1.116                         entries[indx]._status = _MonitorEntry::BUSY;
648                                                static char buffer[2];
649                                                Sint32 amt =
650                                                    Socket::read(entries[indx].socket,&buffer, 2);
651                        
652                                                if (amt == PEGASUS_SOCKET_ERROR &&
653                                                    getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED)
654                                                {
655 marek            1.118                             PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
656 kumpf            1.116                                 "Monitor::run: Tickler socket got an IO error. "
657                                                            "Going to re-create Socket and wait for "
658                                                            "TCP/IP restart.");
659 kumpf            1.122                             _tickler.uninitialize();
660 kumpf            1.116                             initializeTickler();
661                                                }
662                                                else
663                                                {
664                                                    entries[indx]._status = _MonitorEntry::IDLE;
665                                                }
666                                            }
667                                            else
668                                            {
669 marek            1.118                         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
670 kumpf            1.116                             "Non-connection entry, indx = %d, has been "
671                                                        "received.",
672 marek            1.118                             indx));
673 kumpf            1.116                         int events = 0;
674                                                events |= SocketMessage::READ;
675                                                Message* msg = new SocketMessage(
676                                                    entries[indx].socket, events);
677                                                entries[indx]._status = _MonitorEntry::BUSY;
678                                                _entry_mut.unlock();
679                                                q->enqueue(msg);
680                                                _entry_mut.lock();
681                        
682                                                // After enqueue a message and the autoEntryMutex has
683                                                // been released and locked again, the array of
684                                                // entries can be changed. The ArrayIterator has be
685                                                // reset with the original _entries
686                                                entries.reset(_entries);
687                                                entries[indx]._status = _MonitorEntry::IDLE;
688                                            }
689                                        }
690                                        catch (...)
691                                        {
692                                        }
693 thilo.boehm      1.115             }
694 kumpf            1.116         }
695 mday             1.24      }
696 mike             1.2   }
697                        
698 chuck            1.74  void Monitor::stopListeningForConnections(Boolean wait)
699 kumpf            1.48  {
700                            PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
701 r.kieninger      1.83      // set boolean then tickle the server to recognize _stopConnections
702 kumpf            1.48      _stopConnections = 1;
703 a.arora          1.73      tickle();
704 kumpf            1.48  
705 chuck            1.74      if (wait)
706 a.arora          1.73      {
707 chuck            1.74        // Wait for the monitor to notice _stopConnections.  Otherwise the
708                              // caller of this function may unbind the ports while the monitor
709                              // is still accepting connections on them.
710 kumpf            1.101       _stopConnectionsSem.wait();
711 a.arora          1.73      }
712 r.kieninger      1.83  
713 kumpf            1.48      PEG_METHOD_EXIT();
714                        }
715 mday             1.25  
716 mday             1.37  
717 kumpf            1.116 int Monitor::solicitSocketMessages(
718 mike             1.106     SocketHandle socket,
719 mike             1.2       Uint32 events,
720 r.kieninger      1.83      Uint32 queueId,
721 mday             1.8       int type)
722 mike             1.2   {
723 kumpf            1.116     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
724                            AutoMutex autoMut(_entry_mut);
725                            // Check to see if we need to dynamically grow the _entries array
726                            // We always want the _entries array to 2 bigger than the
727                            // current connections requested
728                            _solicitSocketCount++;  // bump the count
729                            int size = (int)_entries.size();
730                            if ((int)_solicitSocketCount >= (size-1))
731                            {
732                                for (int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++)
733                                {
734                                    _MonitorEntry entry(0, 0, 0);
735                                    _entries.append(entry);
736                                }
737                            }
738 a.arora          1.73  
739 kumpf            1.116     int index;
740                            for (index = 1; index < (int)_entries.size(); index++)
741                            {
742                                try
743                                {
744                                    if (_entries[index]._status.get() == _MonitorEntry::EMPTY)
745                                    {
746                                        _entries[index].socket = socket;
747                                        _entries[index].queueId  = queueId;
748                                        _entries[index]._type = type;
749                                        _entries[index]._status = _MonitorEntry::IDLE;
750                        
751                                        return index;
752                                    }
753                                }
754                                catch (...)
755                                {
756                                }
757                            }
758                            // decrease the count, if we are here we didn't do anything meaningful
759                            _solicitSocketCount--;
760 kumpf            1.116     PEG_METHOD_EXIT();
761                            return -1;
762 mike             1.2   }
763                        
764 mike             1.106 void Monitor::unsolicitSocketMessages(SocketHandle socket)
765 mike             1.2   {
766 mday             1.25      PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
767 alagaraja        1.75      AutoMutex autoMut(_entry_mut);
768 a.arora          1.73  
769                            /*
770 kumpf            1.116         Start at index = 1 because _entries[0] is the tickle entry which
771                                never needs to be EMPTY;
772 a.arora          1.73      */
773 w.otsuka         1.89      unsigned int index;
774 kumpf            1.116     for (index = 1; index < _entries.size(); index++)
775 mike             1.2       {
776 kumpf            1.116         if (_entries[index].socket == socket)
777                                {
778                                    _entries[index]._status = _MonitorEntry::EMPTY;
779                                    _entries[index].socket = PEGASUS_INVALID_SOCKET;
780                                    _solicitSocketCount--;
781                                    break;
782                                }
783 mike             1.2       }
784 a.arora          1.73  
785                            /*
786 kumpf            1.116         Dynamic Contraction:
787                                To remove excess entries we will start from the end of the _entries
788                                array and remove all entries with EMPTY status until we find the
789                                first NON EMPTY.  This prevents the positions, of the NON EMPTY
790                                entries, from being changed.
791 r.kieninger      1.83      */
792 a.arora          1.73      index = _entries.size() - 1;
793 kumpf            1.116     while (_entries[index]._status.get() == _MonitorEntry::EMPTY)
794                            {
795                                if (_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
796 a.arora          1.73                  _entries.remove(index);
797 kumpf            1.116         index--;
798 a.arora          1.73      }
799 kumpf            1.4       PEG_METHOD_EXIT();
800 mike             1.2   }
801                        
802 a.arora          1.73  // Note: this is no longer called with PEP 183.
803 kumpf            1.116 ThreadReturnType PEGASUS_THREAD_CDECL Monitor::_dispatch(void* parm)
804 mday             1.7   {
805 kumpf            1.116     HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
806 marek            1.118     PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
807 kumpf            1.116         "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, "
808                                    "q = %p",
809                                dst->_entry_index,
810                                dst->_monitor->_entries[dst->_entry_index].queueId,
811 marek            1.118         dst));
812 kumpf            1.116 
813                            try
814                            {
815                                dst->run(1);
816                            }
817                            catch (...)
818                            {
819 marek            1.118         PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
820 kumpf            1.116             "Monitor::_dispatch: exception received");
821                            }
822 marek            1.118     PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
823                                "Monitor::_dispatch: exited run() for index %d", dst->_entry_index));
824 kumpf            1.116 
825                            PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() ==
826                                _MonitorEntry::BUSY);
827                        
828                            // Once the HTTPConnection thread has set the status value to either
829                            // Monitor::DYING or Monitor::IDLE, it has returned control of the
830                            // connection to the Monitor.  It is no longer permissible to access the
831                            // connection or the entry in the _entries table.
832                            if (dst->_connectionClosePending)
833                            {
834                                dst->_monitor->_entries[dst->_entry_index]._status =
835                                    _MonitorEntry::DYING;
836                            }
837                            else
838                            {
839                                dst->_monitor->_entries[dst->_entry_index]._status =
840                                    _MonitorEntry::IDLE;
841                            }
842                            return 0;
843 mday             1.40  }
844                        
845 mike             1.2   PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2