(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.103 //%2006////////////////////////////////////////////////////////////////////////
  2 mike  1.2   //
  3 karl  1.79  // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4             // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5             // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6 karl  1.64  // IBM Corp.; EMC Corporation, The Open Group.
  7 karl  1.79  // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8             // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9 karl  1.90  // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10             // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11 karl  1.103 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 12             // EMC Corporation; Symantec Corporation; The Open Group.
 13 mike  1.2   //
 14             // Permission is hereby granted, free of charge, to any person obtaining a copy
 15 kumpf 1.17  // of this software and associated documentation files (the "Software"), to
 16             // deal in the Software without restriction, including without limitation the
 17             // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 18 mike  1.2   // sell copies of the Software, and to permit persons to whom the Software is
 19             // furnished to do so, subject to the following conditions:
 20 karl  1.103 // 
 21 kumpf 1.17  // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 22 mike  1.2   // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 23             // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 24 kumpf 1.17  // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 25             // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 26             // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 27 mike  1.2   // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 28             // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 29             //
 30             //==============================================================================
 31             //
 32             //%/////////////////////////////////////////////////////////////////////////////
 33             
 34 mike  1.107 #include "Network.h"
 35 mike  1.2   #include <Pegasus/Common/Config.h>
 36             #include <cstring>
 37             #include "Monitor.h"
 38             #include "MessageQueue.h"
 39             #include "Socket.h"
 40 kumpf 1.4   #include <Pegasus/Common/Tracer.h>
 41 mday  1.7   #include <Pegasus/Common/HTTPConnection.h>
 42 kumpf 1.69  #include <Pegasus/Common/MessageQueueService.h>
 43 a.arora 1.73  #include <Pegasus/Common/Exception.h>
 44 mike    1.100 #include "ArrayIterator.h"
 45 kumpf   1.121.4.1 #include "HostAddress.h"
 46 mike    1.113     #include <errno.h>
 47 mike    1.2       
 48                   PEGASUS_USING_STD;
 49                   
 50                   PEGASUS_NAMESPACE_BEGIN
 51                   
 52                   ////////////////////////////////////////////////////////////////////////////////
 53                   //
 54 kumpf   1.121.4.1 // Tickler
 55 mike    1.2       //
 56                   ////////////////////////////////////////////////////////////////////////////////
 57                   
 58 kumpf   1.121.4.1 Tickler::Tickler()
 59                       : _listenSocket(PEGASUS_INVALID_SOCKET),
 60                         _clientSocket(PEGASUS_INVALID_SOCKET),
 61                         _serverSocket(PEGASUS_INVALID_SOCKET)
 62 mike    1.2       {
 63 kumpf   1.121.4.1     try
 64 mday    1.37          {
 65 kumpf   1.121.4.1         _initialize();
 66                       }
 67                       catch (...)
 68                       {
 69                           _uninitialize();
 70                           throw;
 71 mday    1.37          }
 72 mday    1.18      }
 73 mday    1.20      
 74 kumpf   1.121.4.1 Tickler::~Tickler()
 75 mike    1.2       {
 76 kumpf   1.121.4.1     _uninitialize();
 77 thilo.boehm 1.115     }
 78 kumpf       1.121.4.1 
 79 kumpf       1.121.4.2 #if defined(PEGASUS_OS_TYPE_UNIX)
 80 kumpf       1.121.4.1 
 81                       // Use an anonymous pipe for the tickle connection.
 82                       
 83                       void Tickler::_initialize()
 84 kumpf       1.116     {
 85 kumpf       1.121.4.1     int fds[2];
 86 a.dunfey    1.76      
 87 kumpf       1.121.4.1     if (pipe(fds) == -1)
 88 kumpf       1.116         {
 89 kumpf       1.121.4.1         MessageLoaderParms parms(
 90                                   "Common.Monitor.TICKLE_CREATE",
 91                                   "Received error number $0 while creating the internal socket.",
 92                                   getSocketError());
 93                               throw Exception(parms);
 94 a.dunfey    1.76          }
 95                       
 96 kumpf       1.121.4.1     _serverSocket = fds[0];
 97                           _clientSocket = fds[1];
 98 mday        1.18      }
 99                       
100 kumpf       1.121.4.1 #else
101                       
102                       // Use an external loopback socket connection to allow the tickle socket to
103 kumpf       1.121.4.2 // be included in the select() array on non-Unix platforms.
104 kumpf       1.121.4.1 
105                       void Tickler::_initialize()
106 kumpf       1.116     {
107 kumpf       1.121.4.1     //
108                           // Set up the addresses for the listen, client, and server sockets
109                           // based on whether IPv6 is enabled.
110                           //
111                       
112                           Socket::initializeInterface();
113                       
114                       # ifdef PEGASUS_ENABLE_IPV6
115                           struct sockaddr_storage listenAddress;
116                           struct sockaddr_storage clientAddress;
117                           struct sockaddr_storage serverAddress;
118                       # else
119                           struct sockaddr_in listenAddress;
120                           struct sockaddr_in clientAddress;
121                           struct sockaddr_in serverAddress;
122                       # endif
123                       
124                           int addressFamily;
125                           SocketLength addressLength;
126                       
127                           memset(&listenAddress, 0, sizeof (listenAddress));
128 kumpf       1.121.4.1 
129                       # ifdef PEGASUS_ENABLE_IPV6
130                           if (System::isIPv6StackActive())
131                           {
132                               // Use the IPv6 loopback address for the listen sockets
133                               HostAddress::convertTextToBinary(
134                                   HostAddress::AT_IPV6,
135                                   "::1",
136                                   &reinterpret_cast<struct sockaddr_in6*>(&listenAddress)->sin6_addr);
137                               listenAddress.ss_family = AF_INET6;
138                               reinterpret_cast<struct sockaddr_in6*>(&listenAddress)->sin6_port = 0;
139                       
140                               addressFamily = AF_INET6;
141                               addressLength = sizeof(struct sockaddr_in6);
142                           }
143                           else
144                       # endif
145                           {
146                               // Use the IPv4 loopback address for the listen sockets
147                               HostAddress::convertTextToBinary(
148                                   HostAddress::AT_IPV4,
149 kumpf       1.121.4.1             "127.0.0.1",
150                                   &reinterpret_cast<struct sockaddr_in*>(
151                                       &listenAddress)->sin_addr.s_addr);
152                               reinterpret_cast<struct sockaddr_in*>(&listenAddress)->sin_family =
153                                   AF_INET;
154                               reinterpret_cast<struct sockaddr_in*>(&listenAddress)->sin_port = 0;
155                       
156                               addressFamily = AF_INET;
157                               addressLength = sizeof(struct sockaddr_in);
158                           }
159                       
160                           // Use the same address for the client socket as the listen socket
161                           clientAddress = listenAddress;
162 a.arora     1.73      
163 kumpf       1.121.4.1     //
164                           // Set up a listen socket to allow the tickle client and server to connect
165                           //
166                       
167                           // Create the listen socket
168                           if ((_listenSocket = Socket::createSocket(addressFamily, SOCK_STREAM, 0)) ==
169 kumpf       1.116                  PEGASUS_INVALID_SOCKET)
170 kumpf       1.121.4.1     {
171                               MessageLoaderParms parms(
172                                   "Common.Monitor.TICKLE_CREATE",
173                                   "Received error number $0 while creating the internal socket.",
174                                   getSocketError());
175                               throw Exception(parms);
176                           }
177                       
178                           // Bind the listen socket to the loopback address
179                           if (::bind(
180                                   _listenSocket,
181                                   reinterpret_cast<struct sockaddr*>(&listenAddress),
182                                   addressLength) < 0)
183                           {
184                               MessageLoaderParms parms(
185                                   "Common.Monitor.TICKLE_BIND",
186                                   "Received error number $0 while binding the internal socket.",
187                                   getSocketError());
188                               throw Exception(parms);
189                           }
190                       
191 kumpf       1.121.4.1     // Listen for a connection from the tickle client
192                           if ((::listen(_listenSocket, 3)) < 0)
193                           {
194                               MessageLoaderParms parms(
195                                   "Common.Monitor.TICKLE_LISTEN",
196                                   "Received error number $0 while listening to the internal socket.",
197                                   getSocketError());
198                               throw Exception(parms);
199                           }
200                       
201                           // Verify we have the correct listen socket
202                           SocketLength tmpAddressLength = addressLength;
203                           int sock = ::getsockname(
204                               _listenSocket,
205                               reinterpret_cast<struct sockaddr*>(&listenAddress),
206                               &tmpAddressLength);
207                           if (sock < 0)
208                           {
209                               MessageLoaderParms parms(
210                                   "Common.Monitor.TICKLE_SOCKNAME",
211                                   "Received error number $0 while getting the internal socket name.",
212 kumpf       1.121.4.1             getSocketError());
213                               throw Exception(parms);
214                           }
215 a.arora     1.73      
216 kumpf       1.121.4.1     //
217                           // Set up the client side of the tickle connection.
218                           //
219 ouyang.jian 1.120     
220 kumpf       1.121.4.1     // Create the client socket
221                           if ((_clientSocket = Socket::createSocket(addressFamily, SOCK_STREAM, 0)) ==
222                                    PEGASUS_INVALID_SOCKET)
223                           {
224                               MessageLoaderParms parms(
225                                   "Common.Monitor.TICKLE_CLIENT_CREATE",
226                                   "Received error number $0 while creating the internal client "
227                                       "socket.",
228                                   getSocketError());
229                               throw Exception(parms);
230                           }
231                       
232                           // Bind the client socket to the loopback address
233                           if (::bind(
234                                   _clientSocket,
235                                   reinterpret_cast<struct sockaddr*>(&clientAddress),
236                                   addressLength) < 0)
237                           {
238                               MessageLoaderParms parms(
239                                   "Common.Monitor.TICKLE_CLIENT_BIND",
240                                   "Received error number $0 while binding the internal client "
241 kumpf       1.121.4.1                 "socket.",
242                                   getSocketError());
243                               throw Exception(parms);
244                           }
245                       
246                           // Connect the client socket to the listen socket address
247                           if (::connect(
248                                   _clientSocket,
249                                   reinterpret_cast<struct sockaddr*>(&listenAddress),
250                                   addressLength) < 0)
251                           {
252                               MessageLoaderParms parms(
253                                   "Common.Monitor.TICKLE_CLIENT_CONNECT",
254                                   "Received error number $0 while connecting the internal client "
255                                       "socket.",
256                                   getSocketError());
257                               throw Exception(parms);
258                           }
259 thilo.boehm 1.115     
260 kumpf       1.121.4.1     //
261                           // Set up the server side of the tickle connection.
262                           //
263 thilo.boehm 1.115     
264 kumpf       1.121.4.1     tmpAddressLength = addressLength;
265 a.arora     1.73      
266 kumpf       1.121.4.1     // Accept the client socket connection.
267                           _serverSocket = ::accept(
268                               _listenSocket,
269                               reinterpret_cast<struct sockaddr*>(&serverAddress),
270                               &tmpAddressLength);
271                       
272                           if (_serverSocket == PEGASUS_SOCKET_ERROR)
273                           {
274                               MessageLoaderParms parms(
275                                   "Common.Monitor.TICKLE_ACCEPT",
276                                   "Received error number $0 while accepting the internal socket "
277                                       "connection.",
278                                   getSocketError());
279                               throw Exception(parms);
280                           }
281 r.kieninger 1.83      
282 kumpf       1.121.4.1     //
283                           // Close the listen socket and make the other sockets non-blocking
284                           //
285 a.arora     1.73      
286 kumpf       1.121.4.1     Socket::close(_listenSocket);
287                           _listenSocket = PEGASUS_INVALID_SOCKET;
288 ouyang.jian 1.120     
289 kumpf       1.121.4.1     Socket::disableBlocking(_serverSocket);
290                           Socket::disableBlocking(_clientSocket);
291                       }
292 thilo.boehm 1.115     
293 kumpf       1.121.4.1 #endif
294                       
295                       void Tickler::_uninitialize()
296                       {
297                           PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
298 thilo.boehm 1.115     
299 kumpf       1.121.4.1     try
300                           {
301                               if (_serverSocket != PEGASUS_INVALID_SOCKET)
302 thilo.boehm 1.115             {
303 kumpf       1.121.4.1             Socket::close(_serverSocket);
304                                   _serverSocket = PEGASUS_INVALID_SOCKET;
305 thilo.boehm 1.115             }
306 kumpf       1.121.4.1         if (_clientSocket != PEGASUS_INVALID_SOCKET)
307 thilo.boehm 1.115             {
308 kumpf       1.121.4.1             Socket::close(_clientSocket);
309                                   _clientSocket = PEGASUS_INVALID_SOCKET;
310 kumpf       1.116             }
311 kumpf       1.121.4.1         if (_listenSocket != PEGASUS_INVALID_SOCKET)
312 thilo.boehm 1.115             {
313 kumpf       1.121.4.1             Socket::close(_listenSocket);
314                                   _listenSocket = PEGASUS_INVALID_SOCKET;
315 a.arora     1.73              }
316 kumpf       1.121.4.1     }
317                           catch (...)
318                           {
319                               PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
320                                   "Failed to close tickle sockets");
321                           }
322                           Socket::uninitializeInterface();
323                       }
324 marek       1.111     
325                       
326 kumpf       1.121.4.1 ////////////////////////////////////////////////////////////////////////////////
327                       //
328                       // Monitor
329                       //
330                       ////////////////////////////////////////////////////////////////////////////////
331                       
332                       #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
333                       Monitor::Monitor()
334                          : _stopConnections(0),
335                            _stopConnectionsSem(0),
336                            _solicitSocketCount(0)
337                       {
338                           int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
339                           _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
340                       
341                           // Create a MonitorEntry for the Tickler and set its state to IDLE so the
342                           // Monitor will watch for its events.
343                           _MonitorEntry entry(_tickler.getServerSocket(), 1, INTERNAL);
344 a.arora     1.73          entry._status = _MonitorEntry::IDLE;
345 kumpf       1.121.4.1     _entries.append(entry);
346 thilo.boehm 1.115     
347 kumpf       1.121.4.1     // Start the count at 1 because _entries[0] is the Tickler
348                           for (int i = 1; i < numberOfMonitorEntriesToAllocate; i++)
349 thilo.boehm 1.115         {
350 kumpf       1.121.4.1        _MonitorEntry entry(0, 0, 0);
351 thilo.boehm 1.115            _entries.append(entry);
352                           }
353 a.arora     1.73      }
354                       
355 kumpf       1.121.4.1 Monitor::~Monitor()
356 a.arora     1.73      {
357 kumpf       1.121.4.1     PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
358                                         "returning from monitor destructor");
359                       }
360 r.kieninger 1.83      
361 kumpf       1.121.4.1 void Monitor::tickle()
362                       {
363                           AutoMutex autoMutex(_tickleMutex);
364                           Socket::write(_tickler.getClientSocket(), "\0\0", 2);
365 sushma.fernandes 1.78      }
366                            
367 kumpf            1.121     void Monitor::setState(
368                                Uint32 index,
369                                _MonitorEntry::entry_status status)
370 sushma.fernandes 1.78      {
371 kumpf            1.121         AutoMutex autoEntryMutex(_entry_mut);
372 sushma.fernandes 1.78          // Set the state to requested state
373                                _entries[index]._status = status;
374 a.arora          1.73      }
375                            
376 kumpf            1.114     void Monitor::run(Uint32 milliseconds)
377 mike             1.2       {
378 kumpf            1.36          struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
379 a.arora          1.73      
380 mday             1.25          fd_set fdread;
381                                FD_ZERO(&fdread);
382 a.arora          1.73      
383 kumpf            1.94          AutoMutex autoEntryMutex(_entry_mut);
384 r.kieninger      1.83      
385 mike             1.100         ArrayIterator<_MonitorEntry> entries(_entries);
386                            
387 kumpf            1.116         // Check the stopConnections flag.  If set, clear the Acceptor monitor
388                                // entries
389 mike             1.96          if (_stopConnections.get() == 1)
390 kumpf            1.48          {
391 mike             1.100             for ( int indx = 0; indx < (int)entries.size(); indx++)
392 kumpf            1.48              {
393 mike             1.100                 if (entries[indx]._type == Monitor::ACCEPTOR)
394 kumpf            1.48                  {
395 mike             1.100                     if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)
396 kumpf            1.48                      {
397 mike             1.100                        if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||
398                                                    entries[indx]._status.get() == _MonitorEntry::DYING )
399 kumpf            1.48                         {
400                                                   // remove the entry
401 kumpf            1.116                            entries[indx]._status = _MonitorEntry::EMPTY;
402 kumpf            1.48                         }
403                                               else
404                                               {
405                                                   // set status to DYING
406 mike             1.100                           entries[indx]._status = _MonitorEntry::DYING;
407 kumpf            1.48                         }
408                                           }
409                                       }
410                                    }
411                                    _stopConnections = 0;
412 kumpf            1.116             _stopConnectionsSem.signal();
413 kumpf            1.48          }
414 kumpf            1.51      
415 kumpf            1.116         for (int indx = 0; indx < (int)entries.size(); indx++)
416 kumpf            1.68          {
417 kumpf            1.116             const _MonitorEntry &entry = entries[indx];
418                                    if ((entry._status.get() == _MonitorEntry::DYING) &&
419                                        (entry._type == Monitor::CONNECTION))
420                                    {
421                                        MessageQueue *q = MessageQueue::lookup(entry.queueId);
422                                        PEGASUS_ASSERT(q != 0);
423                                        HTTPConnection &h = *static_cast<HTTPConnection *>(q);
424                            
425                                        if (h._connectionClosePending == false)
426                                            continue;
427                            
428                                        // NOTE: do not attempt to delete while there are pending responses
429                                        // coming thru. The last response to come thru after a
430                                        // _connectionClosePending will reset _responsePending to false
431                                        // and then cause the monitor to rerun this code and clean up.
432                                        // (see HTTPConnection.cpp)
433                            
434                                        if (h._responsePending == true)
435                                        {
436 marek            1.118                     PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
437 kumpf            1.116                         "Monitor::run - Ignoring connection delete request "
438                                                    "because responses are still pending. "
439                                                    "connection=0x%p, socket=%d\n",
440 marek            1.118                         (void *)&h, h.getSocket()));
441 kumpf            1.116                     continue;
442                                        }
443                                        h._connectionClosePending = false;
444                                        MessageQueue &o = h.get_owner();
445                                        Message* message= new CloseConnectionMessage(entry.socket);
446                                        message->dest = o.getQueueId();
447                            
448                                        // HTTPAcceptor is responsible for closing the connection.
449                                        // The lock is released to allow HTTPAcceptor to call
450                                        // unsolicitSocketMessages to free the entry.
451                                        // Once HTTPAcceptor completes processing of the close
452                                        // connection, the lock is re-requested and processing of
453                                        // the for loop continues.  This is safe with the current
454                                        // implementation of the entries object.  Note that the
455                                        // loop condition accesses the entries.size() on each
456                                        // iteration, so that a change in size while the mutex is
457                                        // unlocked will not result in an ArrayIndexOutOfBounds
458                                        // exception.
459                            
460                                        _entry_mut.unlock();
461                                        o.enqueue(message);
462 kumpf            1.116                 _entry_mut.lock();
463                            
464                                        // After enqueue a message and the autoEntryMutex has been
465                                        // released and locked again, the array of _entries can be
466                                        // changed. The ArrayIterator has be reset with the original
467                                        // _entries.
468                                        entries.reset(_entries);
469                                    }
470 kumpf            1.68          }
471                            
472 kumpf            1.51          Uint32 _idleEntries = 0;
473 r.kieninger      1.83      
474 a.arora          1.73          /*
475 david.dillard    1.95              We will keep track of the maximum socket number and pass this value
476                                    to the kernel as a parameter to SELECT.  This loop seems like a good
477                                    place to calculate the max file descriptor (maximum socket number)
478                                    because we have to traverse the entire array.
479 r.kieninger      1.83          */
480 mike             1.106         SocketHandle maxSocketCurrentPass = 0;
481 kumpf            1.116         for (int indx = 0; indx < (int)entries.size(); indx++)
482 mike             1.2           {
483 kumpf            1.116            if (maxSocketCurrentPass < entries[indx].socket)
484                                       maxSocketCurrentPass = entries[indx].socket;
485 a.arora          1.73      
486 kumpf            1.116            if (entries[indx]._status.get() == _MonitorEntry::IDLE)
487 mday             1.25             {
488 david.dillard    1.95                 _idleEntries++;
489 mike             1.100                FD_SET(entries[indx].socket, &fdread);
490 mday             1.25             }
491 mday             1.13          }
492 s.hills          1.62      
493 a.arora          1.73          /*
494 david.dillard    1.95              Add 1 then assign maxSocket accordingly. We add 1 to account for
495                                    descriptors starting at 0.
496 a.arora          1.73          */
497                                maxSocketCurrentPass++;
498                            
499 mike             1.112         _entry_mut.unlock();
500 david.dillard    1.95      
501                                //
502                                // The first argument to select() is ignored on Windows and it is not
503                                // a socket value.  The original code assumed that the number of sockets
504                                // and a socket value have the same type.  On Windows they do not.
505                                //
506                            #ifdef PEGASUS_OS_TYPE_WINDOWS
507                                int events = select(0, &fdread, NULL, NULL, &tv);
508                            #else
509 a.arora          1.73          int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
510 david.dillard    1.95      #endif
511 mike             1.112         _entry_mut.lock();
512 kumpf            1.116     
513                                // After enqueue a message and the autoEntryMutex has been released and
514                                // locked again, the array of _entries can be changed. The ArrayIterator
515                                // has be reset with the original _entries
516 r.kieninger      1.102         entries.reset(_entries);
517 mike             1.106     
518                                if (events == PEGASUS_SOCKET_ERROR)
519 mday             1.13          {
520 marek            1.118             PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
521                                        "Monitor::run - errorno = %d has occurred on select.", errno));
522 kumpf            1.116             // The EBADF error indicates that one or more or the file
523                                    // descriptions was not valid. This could indicate that
524                                    // the entries structure has been corrupted or that
525                                    // we have a synchronization error.
526 kumpf            1.50      
527 kumpf            1.116             PEGASUS_ASSERT(errno != EBADF);
528 kumpf            1.50          }
529                                else if (events)
530                                {
531 marek            1.118             PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
532 kumpf            1.116                 "Monitor::run select event received events = %d, monitoring %d "
533                                            "idle entries",
534 marek            1.118                 events, _idleEntries));
535 kumpf            1.116             for (int indx = 0; indx < (int)entries.size(); indx++)
536                                    {
537                                        // The Monitor should only look at entries in the table that are
538                                        // IDLE (i.e., owned by the Monitor).
539                                        if ((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
540                                            (FD_ISSET(entries[indx].socket, &fdread)))
541                                        {
542                                            MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
543 marek            1.118                     PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
544 kumpf            1.116                         "Monitor::run indx = %d, queueId =  %d, q = %p",
545 marek            1.118                         indx, entries[indx].queueId, q));
546 kumpf            1.116                     PEGASUS_ASSERT(q !=0);
547                            
548                                            try
549 david.dillard    1.95                      {
550 kumpf            1.116                         if (entries[indx]._type == Monitor::CONNECTION)
551                                                {
552 marek            1.118                             PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
553 kumpf            1.116                                 "entries[indx].type for indx = %d is "
554                                                            "Monitor::CONNECTION",
555 marek            1.118                                 indx));
556 kumpf            1.116                             static_cast<HTTPConnection *>(q)->_entry_index = indx;
557                            
558                                                    // Do not update the entry just yet. The entry gets
559                                                    // updated once the request has been read.
560                                                    //entries[indx]._status = _MonitorEntry::BUSY;
561 sushma.fernandes 1.78      
562 kumpf            1.116                             // If allocate_and_awaken failure, retry on next
563                                                    // iteration
564 a.arora          1.73      /* Removed for PEP 183.
565 kumpf            1.116                             if (!MessageQueueService::get_thread_pool()->
566                                                            allocate_and_awaken((void *)q, _dispatch))
567                                                    {
568 kumpf            1.119                                 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA,
569                                                            Tracer::LEVEL2,
570 kumpf            1.116                                     "Monitor::run: Insufficient resources to "
571                                                                "process request.");
572                                                        entries[indx]._status = _MonitorEntry::IDLE;
573                                                        return true;
574                                                    }
575 a.arora          1.73      */
576                            // Added for PEP 183
577 kumpf            1.116                             HTTPConnection *dst =
578                                                        reinterpret_cast<HTTPConnection *>(q);
579 marek            1.118                             PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
580 kumpf            1.116                                 "Monitor::_dispatch: entering run() for "
581                                                            "indx = %d, queueId = %d, q = %p",
582                                                        dst->_entry_index,
583                                                        dst->_monitor->_entries[dst->_entry_index].queueId,
584 marek            1.118                                 dst));
585 kumpf            1.116     
586                                                    try
587                                                    {
588                                                        dst->run(1);
589                                                    }
590                                                    catch (...)
591                                                    {
592 marek            1.118                                 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
593 kumpf            1.116                                     "Monitor::_dispatch: exception received");
594                                                    }
595 marek            1.118                             PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
596 kumpf            1.116                                 "Monitor::_dispatch: exited run() for index %d",
597 marek            1.118                                 dst->_entry_index));
598 kumpf            1.116     
599                                                    // It is possible the entry status may not be set to
600                                                    // busy.  The following will fail in that case.
601                                                    // PEGASUS_ASSERT(dst->_monitor->_entries[
602                                                    //     dst->_entry_index]._status.get() ==
603                                                    //    _MonitorEntry::BUSY);
604                                                    // Once the HTTPConnection thread has set the status
605                                                    // value to either Monitor::DYING or Monitor::IDLE,
606                                                    // it has returned control of the connection to the
607                                                    // Monitor.  It is no longer permissible to access
608                                                    // the connection or the entry in the _entries table.
609                            
610                                                    // The following is not relevant as the worker thread
611                                                    // or the reader thread will update the status of the
612                                                    // entry.
613                                                    //if (dst->_connectionClosePending)
614                                                    //{
615                                                    //  dst->_monitor->_entries[dst->_entry_index]._status =
616                                                    //    _MonitorEntry::DYING;
617                                                    //}
618                                                    //else
619 kumpf            1.116                             //{
620                                                    //  dst->_monitor->_entries[dst->_entry_index]._status =
621                                                    //    _MonitorEntry::IDLE;
622                                                    //}
623 r.kieninger      1.83      // end Added for PEP 183
624 kumpf            1.116                         }
625                                                else if (entries[indx]._type == Monitor::INTERNAL)
626                                                {
627                                                    // set ourself to BUSY,
628 r.kieninger      1.83                              // read the data
629 a.arora          1.73                              // and set ourself back to IDLE
630 r.kieninger      1.83      
631 kumpf            1.116                             entries[indx]._status = _MonitorEntry::BUSY;
632                                                    static char buffer[2];
633                                                    Sint32 amt =
634                                                        Socket::read(entries[indx].socket,&buffer, 2);
635                            
636 kumpf            1.121.4.1                         entries[indx]._status = _MonitorEntry::IDLE;
637 kumpf            1.116                         }
638                                                else
639                                                {
640 marek            1.118                             PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
641 kumpf            1.116                                 "Non-connection entry, indx = %d, has been "
642                                                            "received.",
643 marek            1.118                                 indx));
644 kumpf            1.116                             int events = 0;
645                                                    events |= SocketMessage::READ;
646                                                    Message* msg = new SocketMessage(
647                                                        entries[indx].socket, events);
648                                                    entries[indx]._status = _MonitorEntry::BUSY;
649                                                    _entry_mut.unlock();
650                                                    q->enqueue(msg);
651                                                    _entry_mut.lock();
652                            
653                                                    // After enqueue a message and the autoEntryMutex has
654                                                    // been released and locked again, the array of
655                                                    // entries can be changed. The ArrayIterator has be
656                                                    // reset with the original _entries
657                                                    entries.reset(_entries);
658                                                    entries[indx]._status = _MonitorEntry::IDLE;
659                                                }
660                                            }
661                                            catch (...)
662                                            {
663                                            }
664 thilo.boehm      1.115                 }
665 kumpf            1.116             }
666 mday             1.24          }
667 mike             1.2       }
668                            
669 chuck            1.74      void Monitor::stopListeningForConnections(Boolean wait)
670 kumpf            1.48      {
671                                PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
672 r.kieninger      1.83          // set boolean then tickle the server to recognize _stopConnections
673 kumpf            1.48          _stopConnections = 1;
674 a.arora          1.73          tickle();
675 kumpf            1.48      
676 chuck            1.74          if (wait)
677 a.arora          1.73          {
678 chuck            1.74            // Wait for the monitor to notice _stopConnections.  Otherwise the
679                                  // caller of this function may unbind the ports while the monitor
680                                  // is still accepting connections on them.
681 kumpf            1.101           _stopConnectionsSem.wait();
682 a.arora          1.73          }
683 r.kieninger      1.83      
684 kumpf            1.48          PEG_METHOD_EXIT();
685                            }
686 mday             1.25      
687 mday             1.37      
688 kumpf            1.116     int Monitor::solicitSocketMessages(
689 mike             1.106         SocketHandle socket,
690 mike             1.2           Uint32 events,
691 r.kieninger      1.83          Uint32 queueId,
692 mday             1.8           int type)
693 mike             1.2       {
694 kumpf            1.116         PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
695                                AutoMutex autoMut(_entry_mut);
696                                // Check to see if we need to dynamically grow the _entries array
697                                // We always want the _entries array to 2 bigger than the
698                                // current connections requested
699                                _solicitSocketCount++;  // bump the count
700                                int size = (int)_entries.size();
701                                if ((int)_solicitSocketCount >= (size-1))
702                                {
703                                    for (int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++)
704                                    {
705                                        _MonitorEntry entry(0, 0, 0);
706                                        _entries.append(entry);
707                                    }
708                                }
709 a.arora          1.73      
710 kumpf            1.116         int index;
711                                for (index = 1; index < (int)_entries.size(); index++)
712                                {
713                                    try
714                                    {
715                                        if (_entries[index]._status.get() == _MonitorEntry::EMPTY)
716                                        {
717                                            _entries[index].socket = socket;
718                                            _entries[index].queueId  = queueId;
719                                            _entries[index]._type = type;
720                                            _entries[index]._status = _MonitorEntry::IDLE;
721                            
722                                            return index;
723                                        }
724                                    }
725                                    catch (...)
726                                    {
727                                    }
728                                }
729                                // decrease the count, if we are here we didn't do anything meaningful
730                                _solicitSocketCount--;
731 kumpf            1.116         PEG_METHOD_EXIT();
732                                return -1;
733 mike             1.2       }
734                            
735 mike             1.106     void Monitor::unsolicitSocketMessages(SocketHandle socket)
736 mike             1.2       {
737 mday             1.25          PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
738 alagaraja        1.75          AutoMutex autoMut(_entry_mut);
739 a.arora          1.73      
740                                /*
741 kumpf            1.116             Start at index = 1 because _entries[0] is the tickle entry which
742                                    never needs to be EMPTY;
743 a.arora          1.73          */
744 w.otsuka         1.89          unsigned int index;
745 kumpf            1.116         for (index = 1; index < _entries.size(); index++)
746 mike             1.2           {
747 kumpf            1.116             if (_entries[index].socket == socket)
748                                    {
749                                        _entries[index]._status = _MonitorEntry::EMPTY;
750                                        _entries[index].socket = PEGASUS_INVALID_SOCKET;
751                                        _solicitSocketCount--;
752                                        break;
753                                    }
754 mike             1.2           }
755 a.arora          1.73      
756                                /*
757 kumpf            1.116             Dynamic Contraction:
758                                    To remove excess entries we will start from the end of the _entries
759                                    array and remove all entries with EMPTY status until we find the
760                                    first NON EMPTY.  This prevents the positions, of the NON EMPTY
761                                    entries, from being changed.
762 r.kieninger      1.83          */
763 a.arora          1.73          index = _entries.size() - 1;
764 kumpf            1.116         while (_entries[index]._status.get() == _MonitorEntry::EMPTY)
765                                {
766                                    if (_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
767 a.arora          1.73                      _entries.remove(index);
768 kumpf            1.116             index--;
769 a.arora          1.73          }
770 kumpf            1.4           PEG_METHOD_EXIT();
771 mike             1.2       }
772                            
773 a.arora          1.73      // Note: this is no longer called with PEP 183.
774 kumpf            1.116     ThreadReturnType PEGASUS_THREAD_CDECL Monitor::_dispatch(void* parm)
775 mday             1.7       {
776 kumpf            1.116         HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
777 marek            1.118         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
778 kumpf            1.116             "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, "
779                                        "q = %p",
780                                    dst->_entry_index,
781                                    dst->_monitor->_entries[dst->_entry_index].queueId,
782 marek            1.118             dst));
783 kumpf            1.116     
784                                try
785                                {
786                                    dst->run(1);
787                                }
788                                catch (...)
789                                {
790 marek            1.118             PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
791 kumpf            1.116                 "Monitor::_dispatch: exception received");
792                                }
793 marek            1.118         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
794                                    "Monitor::_dispatch: exited run() for index %d", dst->_entry_index));
795 kumpf            1.116     
796                                PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() ==
797                                    _MonitorEntry::BUSY);
798                            
799                                // Once the HTTPConnection thread has set the status value to either
800                                // Monitor::DYING or Monitor::IDLE, it has returned control of the
801                                // connection to the Monitor.  It is no longer permissible to access the
802                                // connection or the entry in the _entries table.
803                                if (dst->_connectionClosePending)
804                                {
805                                    dst->_monitor->_entries[dst->_entry_index]._status =
806                                        _MonitorEntry::DYING;
807                                }
808                                else
809                                {
810                                    dst->_monitor->_entries[dst->_entry_index]._status =
811                                        _MonitorEntry::IDLE;
812                                }
813                                return 0;
814 mday             1.40      }
815                            
816 mike             1.2       PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2