(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.103 //%2006////////////////////////////////////////////////////////////////////////
  2 mike  1.2   //
  3 karl  1.79  // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4             // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5             // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6 karl  1.64  // IBM Corp.; EMC Corporation, The Open Group.
  7 karl  1.79  // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8             // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9 karl  1.90  // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10             // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11 karl  1.103 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 12             // EMC Corporation; Symantec Corporation; The Open Group.
 13 mike  1.2   //
 14             // Permission is hereby granted, free of charge, to any person obtaining a copy
 15 kumpf 1.17  // of this software and associated documentation files (the "Software"), to
 16             // deal in the Software without restriction, including without limitation the
 17             // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 18 mike  1.2   // sell copies of the Software, and to permit persons to whom the Software is
 19             // furnished to do so, subject to the following conditions:
 20 karl  1.103 // 
 21 kumpf 1.17  // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 22 mike  1.2   // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 23             // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 24 kumpf 1.17  // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 25             // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 26             // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 27 mike  1.2   // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 28             // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 29             //
 30             //==============================================================================
 31             //
 32             //%/////////////////////////////////////////////////////////////////////////////
 33             
 34 mike  1.107 #include "Network.h"
 35 mike  1.2   #include <Pegasus/Common/Config.h>
 36             #include <cstring>
 37             #include "Monitor.h"
 38             #include "MessageQueue.h"
 39             #include "Socket.h"
 40 kumpf 1.4   #include <Pegasus/Common/Tracer.h>
 41 mday  1.7   #include <Pegasus/Common/HTTPConnection.h>
 42 kumpf 1.69  #include <Pegasus/Common/MessageQueueService.h>
 43 a.arora 1.73  #include <Pegasus/Common/Exception.h>
 44 mike    1.100 #include "ArrayIterator.h"
 45 mike    1.113 #include <errno.h>
 46 mike    1.2   
 47               PEGASUS_USING_STD;
 48               
 49               PEGASUS_NAMESPACE_BEGIN
 50               
 51 mike    1.96  static AtomicInt _connections(0);
 52 mday    1.25  
 53 mike    1.2   ////////////////////////////////////////////////////////////////////////////////
 54               //
 55               // Monitor
 56               //
 57               ////////////////////////////////////////////////////////////////////////////////
 58               
 59 kumpf   1.54  #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
 60 mike    1.2   Monitor::Monitor()
 61 kumpf   1.87     : _stopConnections(0),
 62 a.arora 1.73       _stopConnectionsSem(0),
 63 a.dunfey 1.76       _solicitSocketCount(0),
 64 a.dunfey 1.77       _tickle_client_socket(-1),
 65                     _tickle_server_socket(-1),
 66                     _tickle_peer_socket(-1)
 67 mike     1.2   {
 68 kumpf    1.54      int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
 69 mike     1.2       Socket::initializeInterface();
 70 kumpf    1.54      _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
 71 a.arora  1.73  
 72                    // setup the tickler
 73                    initializeTickler();
 74                
 75 r.kieninger 1.83      // Start the count at 1 because initilizeTickler()
 76 a.arora     1.73      // has added an entry in the first position of the
 77                       // _entries array
 78 kumpf       1.116     for (int i = 1; i < numberOfMonitorEntriesToAllocate; i++)
 79 mday        1.37      {
 80                          _MonitorEntry entry(0, 0, 0);
 81                          _entries.append(entry);
 82                       }
 83 mday        1.18  }
 84 mday        1.20  
 85 mike        1.2   Monitor::~Monitor()
 86                   {
 87 thilo.boehm 1.115     uninitializeTickler();
 88                       Socket::uninitializeInterface();
 89 marek       1.118     PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
 90 thilo.boehm 1.115                   "returning from monitor destructor");
 91                   }
 92 kumpf       1.116 void Monitor::uninitializeTickler()
 93                   {
 94 marek       1.118     PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
 95 a.dunfey    1.76  
 96 kumpf       1.116     try
 97                       {
 98                           if (_tickle_peer_socket >= 0)
 99 a.dunfey    1.76          {
100                               Socket::close(_tickle_peer_socket);
101                           }
102 kumpf       1.116         if (_tickle_client_socket >= 0)
103 a.dunfey    1.76          {
104                               Socket::close(_tickle_client_socket);
105                           }
106 kumpf       1.116         if (_tickle_server_socket >= 0)
107 a.dunfey    1.76          {
108                               Socket::close(_tickle_server_socket);
109                           }
110                       }
111 kumpf       1.116     catch (...)
112 a.dunfey    1.76      {
113 marek       1.118         PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
114 a.dunfey    1.76                    "Failed to close tickle sockets");
115                       }
116                   
117 mday        1.18  }
118                   
119 kumpf       1.116 void Monitor::initializeTickler()
120                   {
121 r.kieninger 1.83      /*
122                          NOTE: On any errors trying to
123                                setup out tickle connection,
124 a.arora     1.73               throw an exception/end the server
125                       */
126                   
127                       /* setup the tickle server/listener */
128 kumpf       1.116     // try until the tcpip is restarted
129 thilo.boehm 1.115     do
130                       {
131                           // get a socket for the server side
132 kumpf       1.116         if ((_tickle_server_socket =
133                                    Socket::createSocket(PF_INET, SOCK_STREAM, 0)) ==
134                                PEGASUS_INVALID_SOCKET)
135                           {
136                               MessageLoaderParms parms(
137                                   "Common.Monitor.TICKLE_CREATE",
138                                   "Received error number $0 while creating the internal socket.",
139                                   getSocketError());
140 thilo.boehm 1.115             throw Exception(parms);
141                           }
142 a.arora     1.73  
143 thilo.boehm 1.115         // initialize the address
144                           memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
145 ouyang.jian 1.120 
146 thilo.boehm 1.115         _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
147                           _tickle_server_addr.sin_family = PF_INET;
148                           _tickle_server_addr.sin_port = 0;
149 a.arora     1.73  
150 thilo.boehm 1.115         SocketLength _addr_size = sizeof(_tickle_server_addr);
151 a.arora     1.73  
152 thilo.boehm 1.115         // bind server side to socket
153 kumpf       1.116         if ((::bind(_tickle_server_socket,
154                                    reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
155                                    sizeof(_tickle_server_addr))) < 0)
156 thilo.boehm 1.115         {
157 r.kieninger 1.83  #ifdef PEGASUS_OS_ZOS
158 kumpf       1.116             MessageLoaderParms parms(
159                                   "Common.Monitor.TICKLE_BIND_LONG",
160                                   "Received error:$0 while binding the internal socket.",
161                                   strerror(errno));
162 r.kieninger 1.83  #else
163 kumpf       1.116             MessageLoaderParms parms(
164                                   "Common.Monitor.TICKLE_BIND",
165                                   "Received error number $0 while binding the internal socket.",
166                                   getSocketError());
167 r.kieninger 1.83  #endif
168 thilo.boehm 1.115             throw Exception(parms);
169                           }
170                   
171                           // tell the kernel we are a server
172 kumpf       1.116         if ((::listen(_tickle_server_socket, 3)) < 0)
173 thilo.boehm 1.115         {
174 kumpf       1.116             MessageLoaderParms parms(
175                                   "Common.Monitor.TICKLE_LISTEN",
176                                   "Received error number $0 while listening to the internal "
177                                       "socket.",
178                                   getSocketError());
179 thilo.boehm 1.115             throw Exception(parms);
180                           }
181                   
182                           // make sure we have the correct socket for our server
183 kumpf       1.116         int sock = ::getsockname(
184                               _tickle_server_socket,
185                               reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
186                               &_addr_size);
187                           if (sock < 0)
188                           {
189                               MessageLoaderParms parms(
190                                   "Common.Monitor.TICKLE_SOCKNAME",
191                                   "Received error number $0 while getting the internal socket "
192                                       "name.",
193                                   getSocketError());
194 thilo.boehm 1.115             throw Exception(parms);
195                           }
196 a.arora     1.73  
197 thilo.boehm 1.115         /* set up the tickle client/connector */
198 r.kieninger 1.83  
199 thilo.boehm 1.115         // get a socket for our tickle client
200 kumpf       1.116         if ((_tickle_client_socket =
201                                    Socket::createSocket(PF_INET, SOCK_STREAM, 0)) ==
202                               PEGASUS_INVALID_SOCKET)
203                           {
204                               MessageLoaderParms parms(
205                                   "Common.Monitor.TICKLE_CLIENT_CREATE",
206                                   "Received error number $0 while creating the internal client "
207                                       "socket.",
208                                   getSocketError());
209 thilo.boehm 1.115             throw Exception(parms);
210                           }
211 a.arora     1.73  
212 thilo.boehm 1.115         // setup the address of the client
213                           memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
214 ouyang.jian 1.120 
215 thilo.boehm 1.115         _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
216                           _tickle_client_addr.sin_family = PF_INET;
217                           _tickle_client_addr.sin_port = 0;
218 a.arora     1.73  
219 thilo.boehm 1.115         // bind socket to client side
220 kumpf       1.116         if ((::bind(_tickle_client_socket,
221                                    reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
222                                    sizeof(_tickle_client_addr))) < 0)
223                           {
224                               MessageLoaderParms parms(
225                                   "Common.Monitor.TICKLE_CLIENT_BIND",
226                                   "Received error number $0 while binding the internal client "
227                                       "socket.",
228                                   getSocketError());
229 thilo.boehm 1.115             throw Exception(parms);
230                           }
231                   
232                           // connect to server side
233 kumpf       1.116         if ((::connect(_tickle_client_socket,
234                                    reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
235                                    sizeof(_tickle_server_addr))) < 0)
236                           {
237                               MessageLoaderParms parms(
238                                   "Common.Monitor.TICKLE_CLIENT_CONNECT",
239                                   "Received error number $0 while connecting the internal "
240                                       "client socket.",
241                                   getSocketError());
242 thilo.boehm 1.115             throw Exception(parms);
243                           }
244                   
245                           /* set up the slave connection */
246                           memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
247                           SocketLength peer_size = sizeof(_tickle_peer_addr);
248                           Threads::sleep(1);
249                   
250 kumpf       1.116         // this call may fail, we will try a max of 20 times to establish
251                           // this peer connection
252                           if ((_tickle_peer_socket = ::accept(_tickle_server_socket,
253                                    reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
254                                    &peer_size)) < 0)
255 thilo.boehm 1.115         {
256 kumpf       1.116             if (_tickle_peer_socket == PEGASUS_SOCKET_ERROR &&
257                                   getSocketError() == PEGASUS_NETWORK_TRYAGAIN)
258 thilo.boehm 1.115             {
259                                   int retries = 0;
260                                   do
261                                   {
262                                       Threads::sleep(1);
263 kumpf       1.116                     _tickle_peer_socket = ::accept(
264                                           _tickle_server_socket,
265                                           reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
266                                           &peer_size);
267 thilo.boehm 1.115                     retries++;
268 kumpf       1.116                 } while (_tickle_peer_socket == PEGASUS_SOCKET_ERROR &&
269                                            getSocketError() == PEGASUS_NETWORK_TRYAGAIN &&
270                                            retries < 20);
271 thilo.boehm 1.115             }
272                               // TCP/IP is down, destroy sockets and retry again.
273 kumpf       1.116             if (_tickle_peer_socket == PEGASUS_SOCKET_ERROR &&
274                                   getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED)
275 thilo.boehm 1.115             {
276                                   // destroy everything
277                                   uninitializeTickler();
278                                   // retry again.
279                                   continue;
280                               }
281                           }
282 kumpf       1.116         if (_tickle_peer_socket == PEGASUS_SOCKET_ERROR)
283 thilo.boehm 1.115         {
284 kumpf       1.116             MessageLoaderParms parms(
285                                   "Common.Monitor.TICKLE_ACCEPT",
286                                   "Received error number $0 while accepting the internal "
287                                       "socket connection.",
288                                   getSocketError());
289 thilo.boehm 1.115             throw Exception(parms);
290 kumpf       1.116         }
291                           else
292 thilo.boehm 1.115         {
293                               // socket is ok
294                               break;
295 a.arora     1.73          }
296 kumpf       1.116     } while (1); // try until TCP/IP is restarted
297 marek       1.111 
298                       Socket::disableBlocking(_tickle_peer_socket);
299                       Socket::disableBlocking(_tickle_client_socket);
300                   
301 kumpf       1.116     // add the tickler to the list of entries to be monitored and set to
302                       // IDLE because Monitor only
303 a.arora     1.73      // checks entries with IDLE state for events
304                       _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
305                       entry._status = _MonitorEntry::IDLE;
306 thilo.boehm 1.115 
307                       // is the tickler initalized as first socket on startup ?
308                       if (_entries.size()==0)
309                       {
310                          // if yes, append a new entry
311                          _entries.append(entry);
312                       }
313                       else
314                       {
315                          // if not, overwrite the tickler entry with new socket
316                          _entries[0]=entry;
317                       }
318 a.arora     1.73  }
319                   
320 kumpf       1.116 void Monitor::tickle()
321 a.arora     1.73  {
322 sushma.fernandes 1.78      static char _buffer[] =
323 a.arora          1.73      {
324                              '0','0'
325                            };
326 r.kieninger      1.83  
327 sushma.fernandes 1.78      AutoMutex autoMutex(_tickle_mutex);
328                            Socket::write(_tickle_client_socket,&_buffer, 2);
329                        }
330                        
331 kumpf            1.121 void Monitor::setState(
332                            Uint32 index,
333                            _MonitorEntry::entry_status status)
334 sushma.fernandes 1.78  {
335 kumpf            1.121     AutoMutex autoEntryMutex(_entry_mut);
336 sushma.fernandes 1.78      // Set the state to requested state
337                            _entries[index]._status = status;
338 a.arora          1.73  }
339                        
340 kumpf            1.114 void Monitor::run(Uint32 milliseconds)
341 mike             1.2   {
342 mday             1.18  
343 r.kieninger      1.83  
344 kumpf            1.36      struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
345 a.arora          1.73  
346 mday             1.25      fd_set fdread;
347                            FD_ZERO(&fdread);
348 a.arora          1.73  
349 kumpf            1.94      AutoMutex autoEntryMutex(_entry_mut);
350 r.kieninger      1.83  
351 mike             1.100     ArrayIterator<_MonitorEntry> entries(_entries);
352                        
353 kumpf            1.116     // Check the stopConnections flag.  If set, clear the Acceptor monitor
354                            // entries
355 mike             1.96      if (_stopConnections.get() == 1)
356 kumpf            1.48      {
357 mike             1.100         for ( int indx = 0; indx < (int)entries.size(); indx++)
358 kumpf            1.48          {
359 mike             1.100             if (entries[indx]._type == Monitor::ACCEPTOR)
360 kumpf            1.48              {
361 mike             1.100                 if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)
362 kumpf            1.48                  {
363 mike             1.100                    if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||
364                                                entries[indx]._status.get() == _MonitorEntry::DYING )
365 kumpf            1.48                     {
366                                               // remove the entry
367 kumpf            1.116                        entries[indx]._status = _MonitorEntry::EMPTY;
368 kumpf            1.48                     }
369                                           else
370                                           {
371                                               // set status to DYING
372 mike             1.100                       entries[indx]._status = _MonitorEntry::DYING;
373 kumpf            1.48                     }
374                                       }
375                                   }
376                                }
377                                _stopConnections = 0;
378 kumpf            1.116         _stopConnectionsSem.signal();
379 kumpf            1.48      }
380 kumpf            1.51  
381 kumpf            1.116     for (int indx = 0; indx < (int)entries.size(); indx++)
382 kumpf            1.68      {
383 kumpf            1.116         const _MonitorEntry &entry = entries[indx];
384                                if ((entry._status.get() == _MonitorEntry::DYING) &&
385                                    (entry._type == Monitor::CONNECTION))
386                                {
387                                    MessageQueue *q = MessageQueue::lookup(entry.queueId);
388                                    PEGASUS_ASSERT(q != 0);
389                                    HTTPConnection &h = *static_cast<HTTPConnection *>(q);
390                        
391                                    if (h._connectionClosePending == false)
392                                        continue;
393                        
394                                    // NOTE: do not attempt to delete while there are pending responses
395                                    // coming thru. The last response to come thru after a
396                                    // _connectionClosePending will reset _responsePending to false
397                                    // and then cause the monitor to rerun this code and clean up.
398                                    // (see HTTPConnection.cpp)
399                        
400                                    if (h._responsePending == true)
401                                    {
402 marek            1.118                 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
403 kumpf            1.116                     "Monitor::run - Ignoring connection delete request "
404                                                "because responses are still pending. "
405                                                "connection=0x%p, socket=%d\n",
406 marek            1.118                     (void *)&h, h.getSocket()));
407 kumpf            1.116                 continue;
408                                    }
409                                    h._connectionClosePending = false;
410                                    MessageQueue &o = h.get_owner();
411                                    Message* message= new CloseConnectionMessage(entry.socket);
412                                    message->dest = o.getQueueId();
413                        
414                                    // HTTPAcceptor is responsible for closing the connection.
415                                    // The lock is released to allow HTTPAcceptor to call
416                                    // unsolicitSocketMessages to free the entry.
417                                    // Once HTTPAcceptor completes processing of the close
418                                    // connection, the lock is re-requested and processing of
419                                    // the for loop continues.  This is safe with the current
420                                    // implementation of the entries object.  Note that the
421                                    // loop condition accesses the entries.size() on each
422                                    // iteration, so that a change in size while the mutex is
423                                    // unlocked will not result in an ArrayIndexOutOfBounds
424                                    // exception.
425                        
426                                    _entry_mut.unlock();
427                                    o.enqueue(message);
428 kumpf            1.116             _entry_mut.lock();
429                        
430                                    // After enqueue a message and the autoEntryMutex has been
431                                    // released and locked again, the array of _entries can be
432                                    // changed. The ArrayIterator has be reset with the original
433                                    // _entries.
434                                    entries.reset(_entries);
435                                }
436 kumpf            1.68      }
437                        
438 kumpf            1.51      Uint32 _idleEntries = 0;
439 r.kieninger      1.83  
440 a.arora          1.73      /*
441 david.dillard    1.95          We will keep track of the maximum socket number and pass this value
442                                to the kernel as a parameter to SELECT.  This loop seems like a good
443                                place to calculate the max file descriptor (maximum socket number)
444                                because we have to traverse the entire array.
445 r.kieninger      1.83      */
446 mike             1.106     SocketHandle maxSocketCurrentPass = 0;
447 kumpf            1.116     for (int indx = 0; indx < (int)entries.size(); indx++)
448 mike             1.2       {
449 kumpf            1.116        if (maxSocketCurrentPass < entries[indx].socket)
450                                   maxSocketCurrentPass = entries[indx].socket;
451 a.arora          1.73  
452 kumpf            1.116        if (entries[indx]._status.get() == _MonitorEntry::IDLE)
453 mday             1.25         {
454 david.dillard    1.95             _idleEntries++;
455 mike             1.100            FD_SET(entries[indx].socket, &fdread);
456 mday             1.25         }
457 mday             1.13      }
458 s.hills          1.62  
459 a.arora          1.73      /*
460 david.dillard    1.95          Add 1 then assign maxSocket accordingly. We add 1 to account for
461                                descriptors starting at 0.
462 a.arora          1.73      */
463                            maxSocketCurrentPass++;
464                        
465 mike             1.112     _entry_mut.unlock();
466 david.dillard    1.95  
467                            //
468                            // The first argument to select() is ignored on Windows and it is not
469                            // a socket value.  The original code assumed that the number of sockets
470                            // and a socket value have the same type.  On Windows they do not.
471                            //
472                        #ifdef PEGASUS_OS_TYPE_WINDOWS
473                            int events = select(0, &fdread, NULL, NULL, &tv);
474                        #else
475 a.arora          1.73      int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
476 david.dillard    1.95  #endif
477 mike             1.112     _entry_mut.lock();
478 kumpf            1.116 
479                            // After enqueue a message and the autoEntryMutex has been released and
480                            // locked again, the array of _entries can be changed. The ArrayIterator
481                            // has be reset with the original _entries
482 r.kieninger      1.102     entries.reset(_entries);
483 mike             1.106 
484                            if (events == PEGASUS_SOCKET_ERROR)
485 mday             1.13      {
486 marek            1.118         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
487                                    "Monitor::run - errorno = %d has occurred on select.", errno));
488 kumpf            1.116         // The EBADF error indicates that one or more or the file
489                                // descriptions was not valid. This could indicate that
490                                // the entries structure has been corrupted or that
491                                // we have a synchronization error.
492 kumpf            1.50  
493 kumpf            1.116         PEGASUS_ASSERT(errno != EBADF);
494 kumpf            1.50      }
495                            else if (events)
496                            {
497 marek            1.118         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
498 kumpf            1.116             "Monitor::run select event received events = %d, monitoring %d "
499                                        "idle entries",
500 marek            1.118             events, _idleEntries));
501 kumpf            1.116         for (int indx = 0; indx < (int)entries.size(); indx++)
502                                {
503                                    // The Monitor should only look at entries in the table that are
504                                    // IDLE (i.e., owned by the Monitor).
505                                    if ((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
506                                        (FD_ISSET(entries[indx].socket, &fdread)))
507                                    {
508                                        MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
509 marek            1.118                 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
510 kumpf            1.116                     "Monitor::run indx = %d, queueId =  %d, q = %p",
511 marek            1.118                     indx, entries[indx].queueId, q));
512 kumpf            1.116                 PEGASUS_ASSERT(q !=0);
513                        
514                                        try
515 david.dillard    1.95                  {
516 kumpf            1.116                     if (entries[indx]._type == Monitor::CONNECTION)
517                                            {
518 marek            1.118                         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
519 kumpf            1.116                             "entries[indx].type for indx = %d is "
520                                                        "Monitor::CONNECTION",
521 marek            1.118                             indx));
522 kumpf            1.116                         static_cast<HTTPConnection *>(q)->_entry_index = indx;
523                        
524                                                // Do not update the entry just yet. The entry gets
525                                                // updated once the request has been read.
526                                                //entries[indx]._status = _MonitorEntry::BUSY;
527 sushma.fernandes 1.78  
528 kumpf            1.116                         // If allocate_and_awaken failure, retry on next
529                                                // iteration
530 a.arora          1.73  /* Removed for PEP 183.
531 kumpf            1.116                         if (!MessageQueueService::get_thread_pool()->
532                                                        allocate_and_awaken((void *)q, _dispatch))
533                                                {
534 kumpf            1.119                             PEG_TRACE_CSTRING(TRC_DISCARDED_DATA,
535                                                        Tracer::LEVEL2,
536 kumpf            1.116                                 "Monitor::run: Insufficient resources to "
537                                                            "process request.");
538                                                    entries[indx]._status = _MonitorEntry::IDLE;
539                                                    return true;
540                                                }
541 a.arora          1.73  */
542                        // Added for PEP 183
543 kumpf            1.116                         HTTPConnection *dst =
544                                                    reinterpret_cast<HTTPConnection *>(q);
545 marek            1.118                         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
546 kumpf            1.116                             "Monitor::_dispatch: entering run() for "
547                                                        "indx = %d, queueId = %d, q = %p",
548                                                    dst->_entry_index,
549                                                    dst->_monitor->_entries[dst->_entry_index].queueId,
550 marek            1.118                             dst));
551 kumpf            1.116 
552                                                try
553                                                {
554                                                    dst->run(1);
555                                                }
556                                                catch (...)
557                                                {
558 marek            1.118                             PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
559 kumpf            1.116                                 "Monitor::_dispatch: exception received");
560                                                }
561 marek            1.118                         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
562 kumpf            1.116                             "Monitor::_dispatch: exited run() for index %d",
563 marek            1.118                             dst->_entry_index));
564 kumpf            1.116 
565                                                // It is possible the entry status may not be set to
566                                                // busy.  The following will fail in that case.
567                                                // PEGASUS_ASSERT(dst->_monitor->_entries[
568                                                //     dst->_entry_index]._status.get() ==
569                                                //    _MonitorEntry::BUSY);
570                                                // Once the HTTPConnection thread has set the status
571                                                // value to either Monitor::DYING or Monitor::IDLE,
572                                                // it has returned control of the connection to the
573                                                // Monitor.  It is no longer permissible to access
574                                                // the connection or the entry in the _entries table.
575                        
576                                                // The following is not relevant as the worker thread
577                                                // or the reader thread will update the status of the
578                                                // entry.
579                                                //if (dst->_connectionClosePending)
580                                                //{
581                                                //  dst->_monitor->_entries[dst->_entry_index]._status =
582                                                //    _MonitorEntry::DYING;
583                                                //}
584                                                //else
585 kumpf            1.116                         //{
586                                                //  dst->_monitor->_entries[dst->_entry_index]._status =
587                                                //    _MonitorEntry::IDLE;
588                                                //}
589 r.kieninger      1.83  // end Added for PEP 183
590 kumpf            1.116                     }
591                                            else if (entries[indx]._type == Monitor::INTERNAL)
592                                            {
593                                                // set ourself to BUSY,
594 r.kieninger      1.83                          // read the data
595 a.arora          1.73                          // and set ourself back to IDLE
596 r.kieninger      1.83  
597 kumpf            1.116                         entries[indx]._status = _MonitorEntry::BUSY;
598                                                static char buffer[2];
599                                                Sint32 amt =
600                                                    Socket::read(entries[indx].socket,&buffer, 2);
601                        
602                                                if (amt == PEGASUS_SOCKET_ERROR &&
603                                                    getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED)
604                                                {
605 marek            1.118                             PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
606 kumpf            1.116                                 "Monitor::run: Tickler socket got an IO error. "
607                                                            "Going to re-create Socket and wait for "
608                                                            "TCP/IP restart.");
609                                                    uninitializeTickler();
610                                                    initializeTickler();
611                                                }
612                                                else
613                                                {
614                                                    entries[indx]._status = _MonitorEntry::IDLE;
615                                                }
616                                            }
617                                            else
618                                            {
619 marek            1.118                         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
620 kumpf            1.116                             "Non-connection entry, indx = %d, has been "
621                                                        "received.",
622 marek            1.118                             indx));
623 kumpf            1.116                         int events = 0;
624                                                events |= SocketMessage::READ;
625                                                Message* msg = new SocketMessage(
626                                                    entries[indx].socket, events);
627                                                entries[indx]._status = _MonitorEntry::BUSY;
628                                                _entry_mut.unlock();
629                                                q->enqueue(msg);
630                                                _entry_mut.lock();
631                        
632                                                // After enqueue a message and the autoEntryMutex has
633                                                // been released and locked again, the array of
634                                                // entries can be changed. The ArrayIterator has be
635                                                // reset with the original _entries
636                                                entries.reset(_entries);
637                                                entries[indx]._status = _MonitorEntry::IDLE;
638                                            }
639                                        }
640                                        catch (...)
641                                        {
642                                        }
643 thilo.boehm      1.115             }
644 kumpf            1.116         }
645 mday             1.24      }
646 mike             1.2   }
647                        
648 chuck            1.74  void Monitor::stopListeningForConnections(Boolean wait)
649 kumpf            1.48  {
650                            PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
651 r.kieninger      1.83      // set boolean then tickle the server to recognize _stopConnections
652 kumpf            1.48      _stopConnections = 1;
653 a.arora          1.73      tickle();
654 kumpf            1.48  
655 chuck            1.74      if (wait)
656 a.arora          1.73      {
657 chuck            1.74        // Wait for the monitor to notice _stopConnections.  Otherwise the
658                              // caller of this function may unbind the ports while the monitor
659                              // is still accepting connections on them.
660 kumpf            1.101       _stopConnectionsSem.wait();
661 a.arora          1.73      }
662 r.kieninger      1.83  
663 kumpf            1.48      PEG_METHOD_EXIT();
664                        }
665 mday             1.25  
666 mday             1.37  
667 kumpf            1.116 int Monitor::solicitSocketMessages(
668 mike             1.106     SocketHandle socket,
669 mike             1.2       Uint32 events,
670 r.kieninger      1.83      Uint32 queueId,
671 mday             1.8       int type)
672 mike             1.2   {
673 kumpf            1.116     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
674                            AutoMutex autoMut(_entry_mut);
675                            // Check to see if we need to dynamically grow the _entries array
676                            // We always want the _entries array to 2 bigger than the
677                            // current connections requested
678                            _solicitSocketCount++;  // bump the count
679                            int size = (int)_entries.size();
680                            if ((int)_solicitSocketCount >= (size-1))
681                            {
682                                for (int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++)
683                                {
684                                    _MonitorEntry entry(0, 0, 0);
685                                    _entries.append(entry);
686                                }
687                            }
688 a.arora          1.73  
689 kumpf            1.116     int index;
690                            for (index = 1; index < (int)_entries.size(); index++)
691                            {
692                                try
693                                {
694                                    if (_entries[index]._status.get() == _MonitorEntry::EMPTY)
695                                    {
696                                        _entries[index].socket = socket;
697                                        _entries[index].queueId  = queueId;
698                                        _entries[index]._type = type;
699                                        _entries[index]._status = _MonitorEntry::IDLE;
700                        
701                                        return index;
702                                    }
703                                }
704                                catch (...)
705                                {
706                                }
707                            }
708                            // decrease the count, if we are here we didn't do anything meaningful
709                            _solicitSocketCount--;
710 kumpf            1.116     PEG_METHOD_EXIT();
711                            return -1;
712 mike             1.2   }
713                        
714 mike             1.106 void Monitor::unsolicitSocketMessages(SocketHandle socket)
715 mike             1.2   {
716 mday             1.25      PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
717 alagaraja        1.75      AutoMutex autoMut(_entry_mut);
718 a.arora          1.73  
719                            /*
720 kumpf            1.116         Start at index = 1 because _entries[0] is the tickle entry which
721                                never needs to be EMPTY;
722 a.arora          1.73      */
723 w.otsuka         1.89      unsigned int index;
724 kumpf            1.116     for (index = 1; index < _entries.size(); index++)
725 mike             1.2       {
726 kumpf            1.116         if (_entries[index].socket == socket)
727                                {
728                                    _entries[index]._status = _MonitorEntry::EMPTY;
729                                    _entries[index].socket = PEGASUS_INVALID_SOCKET;
730                                    _solicitSocketCount--;
731                                    break;
732                                }
733 mike             1.2       }
734 a.arora          1.73  
735                            /*
736 kumpf            1.116         Dynamic Contraction:
737                                To remove excess entries we will start from the end of the _entries
738                                array and remove all entries with EMPTY status until we find the
739                                first NON EMPTY.  This prevents the positions, of the NON EMPTY
740                                entries, from being changed.
741 r.kieninger      1.83      */
742 a.arora          1.73      index = _entries.size() - 1;
743 kumpf            1.116     while (_entries[index]._status.get() == _MonitorEntry::EMPTY)
744                            {
745                                if (_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
746 a.arora          1.73                  _entries.remove(index);
747 kumpf            1.116         index--;
748 a.arora          1.73      }
749 kumpf            1.4       PEG_METHOD_EXIT();
750 mike             1.2   }
751                        
752 a.arora          1.73  // Note: this is no longer called with PEP 183.
753 kumpf            1.116 ThreadReturnType PEGASUS_THREAD_CDECL Monitor::_dispatch(void* parm)
754 mday             1.7   {
755 kumpf            1.116     HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
756 marek            1.118     PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
757 kumpf            1.116         "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, "
758                                    "q = %p",
759                                dst->_entry_index,
760                                dst->_monitor->_entries[dst->_entry_index].queueId,
761 marek            1.118         dst));
762 kumpf            1.116 
763                            try
764                            {
765                                dst->run(1);
766                            }
767                            catch (...)
768                            {
769 marek            1.118         PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
770 kumpf            1.116             "Monitor::_dispatch: exception received");
771                            }
772 marek            1.118     PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
773                                "Monitor::_dispatch: exited run() for index %d", dst->_entry_index));
774 kumpf            1.116 
775                            PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() ==
776                                _MonitorEntry::BUSY);
777                        
778                            // Once the HTTPConnection thread has set the status value to either
779                            // Monitor::DYING or Monitor::IDLE, it has returned control of the
780                            // connection to the Monitor.  It is no longer permissible to access the
781                            // connection or the entry in the _entries table.
782                            if (dst->_connectionClosePending)
783                            {
784                                dst->_monitor->_entries[dst->_entry_index]._status =
785                                    _MonitorEntry::DYING;
786                            }
787                            else
788                            {
789                                dst->_monitor->_entries[dst->_entry_index]._status =
790                                    _MonitorEntry::IDLE;
791                            }
792                            return 0;
793 mday             1.40  }
794                        
795 mike             1.2   PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2