(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mike  1.2 //%/////////////////////////////////////////////////////////////////////////////
  2           //
  3 kumpf 1.17 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
  4            // The Open Group, Tivoli Systems
  5 mike  1.2  //
  6            // Permission is hereby granted, free of charge, to any person obtaining a copy
  7 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
  8            // deal in the Software without restriction, including without limitation the
  9            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 10 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
 11            // furnished to do so, subject to the following conditions:
 12            // 
 13 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 14 mike  1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 15            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 16 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 17            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 18            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 19 mike  1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 20            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 21            //
 22            //==============================================================================
 23            //
 24            // Author: Mike Brasher (mbrasher@bmc.com)
 25            //
 26 mday  1.49 // Modified By: Mike Day (monitor_2) mdday@us.ibm.com 
 27 mike  1.2  //
 28            //%/////////////////////////////////////////////////////////////////////////////
 29            
 30            #include <Pegasus/Common/Config.h>
 31 mday  1.40 
 32 mike  1.2  #include <cstring>
 33            #include "Monitor.h"
 34            #include "MessageQueue.h"
 35            #include "Socket.h"
 36 kumpf 1.4  #include <Pegasus/Common/Tracer.h>
 37 mday  1.7  #include <Pegasus/Common/HTTPConnection.h>
 38 mike  1.2  
 39            #ifdef PEGASUS_OS_TYPE_WINDOWS
 40            # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
 41            #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
 42            of <winsock.h>. It may have been indirectly included (e.g., by including \
 43 mday  1.25 <windows.h>). Finthe inclusion of that header which is visible to this \
 44 mike  1.2  compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \
 45            otherwise, less than 64 clients (the default) will be able to connect to the \
 46            CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
 47 mday  1.5  
 48 mike  1.2  # endif
 49            # define FD_SETSIZE 1024
 50 mday  1.5  # include <windows.h>
 51 mike  1.2  #else
 52            # include <sys/types.h>
 53            # include <sys/socket.h>
 54            # include <sys/time.h>
 55            # include <netinet/in.h>
 56            # include <netdb.h>
 57            # include <arpa/inet.h>
 58            #endif
 59            
 60            PEGASUS_USING_STD;
 61            
 62            PEGASUS_NAMESPACE_BEGIN
 63            
 64 mday  1.18 
 65 mday  1.25 static AtomicInt _connections = 0;  // file-local counter; not referenced in the visible part of this file
 66            
 67            
               // The three timeval values below are passed, in this order, as the
               // last three arguments of the ThreadPool constructor in
               // Monitor::Monitor(Boolean).
               // NOTE(review): presumably thread creation / destruction /
               // deadlock-detection intervals -- confirm against ThreadPool.
 68            static struct timeval create_time = {0, 1};
 69 mday  1.38 static struct timeval destroy_time = {300, 0};
 70 mday  1.26 static struct timeval deadlock_time = {0, 0};
 71 mday  1.18 
 72 mike  1.2  ////////////////////////////////////////////////////////////////////////////////
 73            //
 74            // MonitorRep
 75            //
 76            ////////////////////////////////////////////////////////////////////////////////
 77            
 78            struct MonitorRep
 79            {
                   // Six descriptor sets: read / write / exception, each with an
                   // "active_" counterpart.
                   // NOTE(review): both Monitor constructors set _rep to 0 and
                   // nothing in the visible code allocates or reads a MonitorRep;
                   // Monitor::run() builds its own local fd_set instead.
 80                fd_set rd_fd_set;
 81                fd_set wr_fd_set;
 82                fd_set ex_fd_set;
 83                fd_set active_rd_fd_set;
 84                fd_set active_wr_fd_set;
 85                fd_set active_ex_fd_set;
 86            };
 87            
 88            ////////////////////////////////////////////////////////////////////////////////
 89            //
 90            // Monitor
 91            //
 92            ////////////////////////////////////////////////////////////////////////////////
 93 mike  1.2  
 94 kumpf 1.54 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
 95 mike  1.2  Monitor::Monitor()
 96 kumpf 1.48    : _module_handle(0), _controller(0), _async(false), _stopConnections(0)
 97 mike  1.2  {
 98 kumpf 1.54     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
 99 mike  1.2      Socket::initializeInterface();
100 mday  1.25     _rep = 0;
101 kumpf 1.54     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
102                for( int i = 0; i < numberOfMonitorEntriesToAllocate; i++ )
103 mday  1.37     {
104                   _MonitorEntry entry(0, 0, 0);
105                   _entries.append(entry);
106                }
107 mike  1.2  }
108            
109 mday  1.18 Monitor::Monitor(Boolean async)
110 kumpf 1.48    : _module_handle(0), _controller(0), _async(async), _stopConnections(0)
111 mday  1.18 {
112 kumpf 1.54     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
113 mday  1.18     Socket::initializeInterface();
114 mday  1.25     _rep = 0;
115 kumpf 1.54     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
116                for( int i = 0; i < numberOfMonitorEntriesToAllocate; i++ )
117 mday  1.37     {
118                   _MonitorEntry entry(0, 0, 0);
119                   _entries.append(entry);
120                }
121 mday  1.19     if( _async == true )
122                {
123                   _thread_pool = new ThreadPool(0, 
124            				     "Monitor", 
125 mday  1.38 				     0, 
126 mday  1.25 				     0,
127 mday  1.19 				     create_time, 
128            				     destroy_time, 
129            				     deadlock_time);
130                }
131 mday  1.20     else 
132                   _thread_pool = 0;
133 mday  1.18 }
134 mday  1.20 
135 mike  1.2  Monitor::~Monitor()
136            {
137 kumpf 1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
138                              "deregistering with module controller");
139 kumpf 1.10 
140 kumpf 1.11     if(_module_handle != NULL)
141 mday  1.8      {
142                   _controller->deregister_module(PEGASUS_MODULENAME_MONITOR);
143                   _controller = 0;
144 kumpf 1.10        delete _module_handle;
145 mday  1.8      }
146 kumpf 1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep");
147 kumpf 1.48 
148 kumpf 1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
149 mike  1.2      Socket::uninitializeInterface();
150 kumpf 1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
151                              "returning from monitor destructor");
152 mday  1.21     if(_async == true)
153 mday  1.20        delete _thread_pool;
154 mike  1.2  }
155            
156 mday  1.7  
157 mday  1.18 int Monitor::kill_idle_threads()
158            {
159               static struct timeval now, last;
160               gettimeofday(&now, NULL);
161 mday  1.20    int dead_threads = 0;
162 mday  1.18    
163 mday  1.38    if( now.tv_sec - last.tv_sec > 120 )
164 mday  1.18    {
165                  gettimeofday(&last, NULL);
166 mday  1.20       try 
167                  {
168            	 dead_threads =  _thread_pool->kill_dead_threads();
169                  }
170                  catch(IPCException& )
171                  {
172                  }
173                  
174 mday  1.18    }
175 mday  1.20    return dead_threads;
176 mday  1.18 }
177            
178 mday  1.7  
179 mike  1.2  Boolean Monitor::run(Uint32 milliseconds)
180            {
181 mday  1.18 
182 mday  1.25     Boolean handled_events = false;
183 mday  1.40      int i = 0;
184 mday  1.37     #if defined(PEGASUS_OS_OS400) || defined(PEGASUS_OS_HPUX)
185 kumpf 1.36     struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
186            #else
187 kumpf 1.35     struct timeval tv = {0, 1};
188            #endif
189 mday  1.25     fd_set fdread;
190                FD_ZERO(&fdread);
191 mday  1.37     _entry_mut.lock(pegasus_thread_self());
192 mday  1.13     
193 kumpf 1.48     // Check the stopConnections flag.  If set, clear the Acceptor monitor entries  
194                if (_stopConnections == 1) 
195                {
196                    for ( int indx = 0; indx < (int)_entries.size(); indx++)
197                    {
198                        if (_entries[indx]._type == Monitor::ACCEPTOR)
199                        {
200                            if ( _entries[indx]._status.value() != _MonitorEntry::EMPTY)
201                            {
202                               if ( _entries[indx]._status.value() == _MonitorEntry::IDLE ||
203                                    _entries[indx]._status.value() == _MonitorEntry::DYING )
204                               {
205                                   // remove the entry
206            		       _entries[indx]._status = _MonitorEntry::EMPTY;
207                               }
208                               else
209                               {
210                                   // set status to DYING
211 kumpf 1.52                       _entries[indx]._status = _MonitorEntry::DYING;
212 kumpf 1.48                    }
213                           }
214                       }
215                    }
216                    _stopConnections = 0;
217                }
218 kumpf 1.51 
219                Uint32 _idleEntries = 0;
220 kumpf 1.48     
221 mday  1.25     for( int indx = 0; indx < (int)_entries.size(); indx++)
222 mike  1.2      {
223 mday  1.37        if(_entries[indx]._status.value() == _MonitorEntry::IDLE)
224 mday  1.25        {
225 kumpf 1.51 	  _idleEntries++;
226 mday  1.25 	  FD_SET(_entries[indx].socket, &fdread);
227                   }
228 mday  1.13     }
229 kumpf 1.51    
230                _entry_mut.unlock(); 
231 mday  1.25     int events = select(FD_SETSIZE, &fdread, NULL, NULL, &tv);
232 kumpf 1.51    _entry_mut.lock(pegasus_thread_self());
233 mday  1.25 
234 mike  1.2  #ifdef PEGASUS_OS_TYPE_WINDOWS
235 kumpf 1.50     if(events == SOCKET_ERROR)
236 mike  1.2  #else
237 kumpf 1.50     if(events == -1)
238 mike  1.2  #endif
239 mday  1.13     {
240 kumpf 1.50        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
241                      "Monitor::run - errorno = %d has occurred on select.", errno);
242                   // The EBADF error indicates that one or more or the file
243                   // descriptions was not valid. This could indicate that
244                   // the _entries structure has been corrupted or that
245                   // we have a synchronization error.
246            
247                   PEGASUS_ASSERT(errno != EBADF);
248                }
249                else if (events)
250                {
251 kumpf 1.51        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
252                      "Monitor::run select event received events = %d, monitoring %d idle entries", 
253            	   events, _idleEntries);
254 mday  1.25        for( int indx = 0; indx < (int)_entries.size(); indx++)
255                   {
256 kumpf 1.53           // The Monitor should only look at entries in the table that are IDLE (i.e.,
257                      // owned by the Monitor).
258            	  if((_entries[indx]._status.value() == _MonitorEntry::IDLE) && 
259            	     (FD_ISSET(_entries[indx].socket, &fdread)))
260 mday  1.25 	  {
261            	     MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);
262 kumpf 1.53              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
263                              "Monitor::run indx = %d, queueId =  %d, q = %p",
264                              indx, _entries[indx].queueId, q);
265                         PEGASUS_ASSERT(q !=0);
266 mday  1.37 
267            	     try 
268 mday  1.25 	     {
269 mday  1.37 		if(_entries[indx]._type == Monitor::CONNECTION)
270            		{
271 kumpf 1.51                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
272                                 "_entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
273 mday  1.37 		   static_cast<HTTPConnection *>(q)->_entry_index = indx;
274            		   if(static_cast<HTTPConnection *>(q)->_dying.value() > 0 )
275            		   {
276 kumpf 1.51                       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
277                                      "Monitor::run processing dying value > 0 for indx = %d, connection being closed.",
278                                      indx);
279 mday  1.37 		      _entries[indx]._status = _MonitorEntry::DYING;
280            		      MessageQueue & o = static_cast<HTTPConnection *>(q)->get_owner();
281            		      Message* message= new CloseConnectionMessage(_entries[indx].socket);
282            		      message->dest = o.getQueueId();
283            		      _entry_mut.unlock();
284            		      o.enqueue(message);
285            		      return true;
286            		   }
287            		   _entries[indx]._status = _MonitorEntry::BUSY;
288            		   _thread_pool->allocate_and_awaken((void *)q, _dispatch);
289            		}
290            		else
291 mday  1.25 		{
292 kumpf 1.51                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
293                                 "Non-connection entry, indx = %d, has been received.", indx);
294 mday  1.37 		   int events = 0;
295            		   events |= SocketMessage::READ;
296            		   Message *msg = new SocketMessage(_entries[indx].socket, events);
297            		   _entries[indx]._status = _MonitorEntry::BUSY;
298            		   _entry_mut.unlock();
299 mday  1.27 
300 mday  1.37 		   q->enqueue(msg);
301            		   _entries[indx]._status = _MonitorEntry::IDLE;
302 mday  1.25 		   return true;
303            		}
304            	     }
305 mday  1.37 	     catch(...)
306 mday  1.25 	     {
307            	     }
308            	     handled_events = true;
309            	  }
310                   }
311 mday  1.24     }
312 mday  1.37     _entry_mut.unlock();
313 mday  1.13     return(handled_events);
314 mike  1.2  }
315            
316 kumpf 1.48 void Monitor::stopListeningForConnections()
317            {
318                PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
319            
320                _stopConnections = 1;
321            
322                PEG_METHOD_EXIT();
323            }
324 mday  1.25 
325 mday  1.37 
326 mday  1.25 int  Monitor::solicitSocketMessages(
327 mike  1.2      Sint32 socket, 
328                Uint32 events,
329 mday  1.8      Uint32 queueId, 
330                int type)
331 mike  1.2  {
332 kumpf 1.4  
333 kumpf 1.31    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
334 mike  1.2  
335 mday  1.37    _entry_mut.lock(pegasus_thread_self());
336 mday  1.25    
337 kumpf 1.50    for(int index = 0; index < (int)_entries.size(); index++)
338 mday  1.25    {
339 mday  1.37       try 
340                  {
341            	 if(_entries[index]._status.value() == _MonitorEntry::EMPTY)
342            	 {
343            	    _entries[index].socket = socket;
344            	    _entries[index].queueId  = queueId;
345            	    _entries[index]._type = type;
346            	    _entries[index]._status = _MonitorEntry::IDLE;
347            	    _entry_mut.unlock();
348            	    
349            	    return index;
350            	 }
351                  }
352                  catch(...)
353 mday  1.25       {
354                  }
355 mday  1.37 
356 mday  1.25    }
357 kumpf 1.50    _entry_mut.unlock();
358 mday  1.25    PEG_METHOD_EXIT();
359 kumpf 1.50    return -1;
360 mike  1.2  }
361            
362 mday  1.25 void Monitor::unsolicitSocketMessages(Sint32 socket)
363 mike  1.2  {
364 kumpf 1.50 
365 mday  1.25     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
366 mday  1.37     _entry_mut.lock(pegasus_thread_self());
367 mday  1.27     
368 mday  1.25     for(int index = 0; index < (int)_entries.size(); index++)
369 mike  1.2      {
370 mday  1.25        if(_entries[index].socket == socket)
371                   {
372            	  _entries[index]._status = _MonitorEntry::EMPTY;
373 kumpf 1.53 	  _entries[index].socket = -1;
374 mday  1.37 	  break;
375 mday  1.25        }
376 mike  1.2      }
377 mday  1.37     _entry_mut.unlock();
378 kumpf 1.4      PEG_METHOD_EXIT();
379 mike  1.2  }
380            
381 mday  1.7  PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
382            {
383 mday  1.8     HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
384 kumpf 1.51    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
385 kumpf 1.53         "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",
386                    dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
387 kumpf 1.51    try
388               {
389                  dst->run(1);
390               }
391               catch (...)
392               {
393                  Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
394                      "Monitor::_dispatch: exception received");
395               }
396               Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
397                      "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
398               
399 kumpf 1.50    dst->_monitor->_entry_mut.lock(pegasus_thread_self());
400               // It shouldn't be necessary to set status = _MonitorEntry::IDLE
401               // if the connection is being closed.  However, the current logic
402               // in Monitor::run requires this value to be set for the close
403               // to be processed. 
404 kumpf 1.53    
405               PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);
406 kumpf 1.50    dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
407               if (dst->_connectionClosePending)
408               {
409                  dst->_dying = 1;
410               }
411               dst->_monitor->_entry_mut.unlock();
412 mday  1.8     return 0;
413 mday  1.40 }
414            
415            
416            
417            ////************************* monitor 2 *****************************////
418 mday  1.43 ////************************* monitor 2 *****************************////
419            ////************************* monitor 2 *****************************////
420            ////************************* monitor 2 *****************************////
421            ////************************* monitor 2 *****************************////
422            ////************************* monitor 2 *****************************////
423            ////************************* monitor 2 *****************************////
424 mday  1.40 
425            
426 mday  1.42 m2e_rep::m2e_rep(void)
427 mday  1.43   :Base(), state(IDLE)
428            
429 mday  1.42 {
430            }
431            
432            m2e_rep::m2e_rep(monitor_2_entry_type _type, 
433            		 pegasus_socket _sock, 
434            		 void* _accept, 
435            		 void* _dispatch)
436 mday  1.43   : Base(), type(_type), state(IDLE), psock(_sock), 
437 mday  1.42     accept_parm(_accept), dispatch_parm(_dispatch)
438            {
439              
440            }
441            
442            m2e_rep::~m2e_rep(void)
443            {
444            }
445            
446            m2e_rep::m2e_rep(const m2e_rep& r)
447              : Base()
448            {
449              if(this != &r){
450                type = r.type;
451                psock = r.psock;
452                accept_parm = r.accept_parm;
453                dispatch_parm = r.dispatch_parm;
454 mday  1.43     state = IDLE;
455                
456 mday  1.42   }
457            }
458            
459            
460            m2e_rep& m2e_rep::operator =(const m2e_rep& r)
461            {
462              if(this != &r) {
463                type = r.type;
464                psock = r.psock;
465                accept_parm = r.accept_parm;
466                dispatch_parm = r.dispatch_parm;
467 mday  1.43     state = IDLE;
468 mday  1.42   }
469              return *this;
470            }
471            
472            Boolean m2e_rep::operator ==(const m2e_rep& r)
473            {
474              if(this == &r)
475                return true;
476              return false;
477            }
478            
479            Boolean m2e_rep::operator ==(void* r)
480            {
481              if((void*)this == r)
482                return true;
483              return false;
484            }
485            
486            m2e_rep::operator pegasus_socket() const 
487            {
488              return psock;
489 mday  1.42 }
490            
491            
492 mday  1.40 monitor_2_entry::monitor_2_entry(void)
493            {
494 mday  1.42   _rep = new m2e_rep();
495 mday  1.40 }
496            
497 mday  1.42 monitor_2_entry::monitor_2_entry(pegasus_socket& _psock, 
498            				 monitor_2_entry_type _type, 
499            				 void* _accept_parm, void* _dispatch_parm)
500 mday  1.40 {
501 mday  1.42   _rep = new m2e_rep(_type, _psock, _accept_parm, _dispatch_parm);
502 mday  1.40 }
503            
504            monitor_2_entry::monitor_2_entry(const monitor_2_entry& e)
505            {
506              if(this != &e){
507 mday  1.42     Inc(this->_rep = e._rep);
508 mday  1.40   }
509            }
510            
511            monitor_2_entry::~monitor_2_entry(void)
512            {
513 mday  1.42   Dec(_rep);
514 mday  1.40 }
515            
516            monitor_2_entry& monitor_2_entry::operator=(const monitor_2_entry& e)
517            {
518              if(this != &e){
519 mday  1.42     Dec(_rep);
520                Inc(this->_rep = e._rep);
521 mday  1.40   }
522              return *this;
523            }
524            
525 mday  1.42 Boolean monitor_2_entry::operator ==(const monitor_2_entry& me) const
526 mday  1.40 {
527              if(this == &me)
528                return true;
529              return false;
530            }
531            
532 mday  1.42 Boolean monitor_2_entry::operator ==(void* k) const
533 mday  1.40 {
534              if((void *)this == k)
535                return true;
536              return false;
537            }
538            
539            
540 mday  1.42 monitor_2_entry_type monitor_2_entry::get_type(void) const
541 mday  1.40 {
542 mday  1.42   return _rep->type;
543            }
544            
545            void monitor_2_entry::set_type(monitor_2_entry_type t)
546            {
547              _rep->type = t;
548            }
549            
550            
551 mday  1.43 monitor_2_entry_state  monitor_2_entry::get_state(void) const
552            {
553              return (monitor_2_entry_state) _rep->state.value();
554            }
555            
556            void monitor_2_entry::set_state(monitor_2_entry_state t)
557            {
558              _rep->state = t;
559            }
560            
561 mday  1.42 void* monitor_2_entry::get_accept(void) const
562            {
563              return _rep->accept_parm;
564            }
565            
566            void monitor_2_entry::set_accept(void* a)
567            {
568              _rep->accept_parm = a;
569            }
570            
571            
572            void* monitor_2_entry::get_dispatch(void) const
573            {
574              return _rep->dispatch_parm;
575            }
576            
577            void monitor_2_entry::set_dispatch(void* a)
578            {
579              _rep->dispatch_parm = a;
580            }
581            
582 mday  1.42 pegasus_socket monitor_2_entry::get_sock(void) const
583            {
584              return _rep->psock;
585            }
586            
587            
588            void monitor_2_entry::set_sock(pegasus_socket& s)
589            {
590              _rep->psock = s;
591              
592 mday  1.40 }
593            
594            
595 mday  1.49 AsyncDQueue<HTTPConnection2> monitor_2::_connections(true, 0);  // definition of the static member; monitor_2::run() removes closed connections from it via remove_connection(). NOTE(review): the (true, 0) arguments match those used for _listeners and _ready -- meaning not visible here
596            
597            
598 mday  1.40 monitor_2::monitor_2(void)
599 mday  1.42   : _session_dispatch(0), _accept_dispatch(0), _listeners(true, 0), 
600 mday  1.49     _ready(true, 0), _die(0), _requestCount(0)
601 mday  1.40 {
602              try {
603                
604                bsd_socket_factory _factory;
605            
606                // set up the listener/acceptor 
607                pegasus_socket temp = pegasus_socket(&_factory);
608                
609                temp.socket(PF_INET, SOCK_STREAM, 0);
610                // initialize the address
611                memset(&_tickle_addr, 0, sizeof(_tickle_addr));
612 marek 1.47 #ifdef PEGASUS_OS_ZOS
613                _tickle_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
614            #else
615 chuck 1.55 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
616            #pragma convert(37)
617            #endif
618 mday  1.40     _tickle_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
619 chuck 1.55 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
620            #pragma convert(0)
621            #endif
622 marek 1.47 #endif
623 mday  1.40     _tickle_addr.sin_family = PF_INET;
624                _tickle_addr.sin_port = 0;
625            
626                PEGASUS_SOCKLEN_SIZE _addr_size = sizeof(_tickle_addr);
627                
628                temp.bind((struct sockaddr *)&_tickle_addr, sizeof(_tickle_addr));
629                temp.listen(3);  
630                temp.getsockname((struct sockaddr*)&_tickle_addr, &_addr_size);
631            
632                // set up the connector
633            
634                pegasus_socket tickler = pegasus_socket(&_factory);
635                tickler.socket(PF_INET, SOCK_STREAM, 0);
636                struct sockaddr_in _addr;
637                memset(&_addr, 0, sizeof(_addr));
638 kumpf 1.48 #ifdef PEGASUS_OS_ZOS
639 marek 1.47     _addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
640            #else
641 mday  1.40     _addr.sin_addr.s_addr = inet_addr("127.0.0.1");
642 marek 1.47 #endif
643 mday  1.40     _addr.sin_family = PF_INET;
644                _addr.sin_port = 0;
645                tickler.bind((struct sockaddr*)&_addr, sizeof(_addr));
646                tickler.connect((struct sockaddr*)&_tickle_addr, sizeof(_tickle_addr));
647            
648 mday  1.42     _tickler.set_sock(tickler);
649                _tickler.set_type(INTERNAL);
650 mday  1.43     _tickler.set_state(BUSY);
651                
652 mday  1.40     struct sockaddr_in peer;
653                memset(&peer, 0, sizeof(peer));
654                PEGASUS_SOCKLEN_SIZE peer_size = sizeof(peer);
655            
656                pegasus_socket accepted = temp.accept((struct sockaddr*)&peer, &peer_size);
657 mday  1.42     monitor_2_entry* _tickle = new monitor_2_entry(accepted, INTERNAL, 0, 0);
658 mday  1.43     _tickle->set_state(BUSY);
659                
660 mday  1.40     _listeners.insert_first(_tickle);
661            
662              }
663              catch(...){  }
664            }
665            
666            monitor_2::~monitor_2(void)
667            {
668 mday  1.41   try {
669                monitor_2_entry* temp = _listeners.remove_first();
670                while(temp){
671                  delete temp;
672                  temp = _listeners.remove_first();
673                }
674              }
675              catch(...){  }
676 mday  1.40 }
677            
678            
679            void monitor_2::run(void)
680            {
681              monitor_2_entry* temp;
682              while(_die.value() == 0) {
683 mday  1.49      
684 mday  1.45      struct timeval tv = {0, 0};
685 mday  1.40 
686                // place all sockets in the select set 
687                FD_ZERO(&rd_fd_set);
688                try {
689                  _listeners.lock(pegasus_thread_self());
690                  temp = _listeners.next(0);
691                  while(temp != 0 ){
692 mday  1.43 	if(temp->get_state() == CLOSED ){
693            	  monitor_2_entry* closed = temp;
694            	  temp = _listeners.next(closed);
695            	  _listeners.remove_no_lock(closed);
696 mday  1.49 	  HTTPConnection2 *cn = monitor_2::remove_connection((Sint32)(closed->get_sock()));
697            	  delete cn;
698 mday  1.43 	  delete closed;
699            	}
700 mday  1.45 	if(temp == 0)
701            	   break;
702 mday  1.46 	Sint32 fd = (Sint32) temp->get_sock();
703            	if(fd >= 0 )
704            	   FD_SET(fd , &rd_fd_set);
705 mday  1.40 	temp = _listeners.next(temp);
706                  }
707                  _listeners.unlock();
708                } 
709                catch(...){
710                  return;
711                }
712 mday  1.42     // important -  the dispatch routine has pointers to all the 
713                // entries that are readable. These entries can be changed but 
714                // the pointer must not be tampered with. 
715 mday  1.40 
716                int events = select(FD_SETSIZE, &rd_fd_set, NULL, NULL, NULL);
717                try {
718                  _listeners.lock(pegasus_thread_self());
719                  temp = _listeners.next(0);
720                  while(temp != 0 ){
721 mday  1.42 	Sint32 fd = (Sint32) temp->get_sock();
722 mday  1.46 	if(fd >= 0 && FD_ISSET(fd, &rd_fd_set)) {
723 mday  1.43 	  temp->set_state(BUSY);
724 mday  1.42 	  FD_CLR(fd,  &rd_fd_set);
725            	  monitor_2_entry* ready = new monitor_2_entry(*temp);
726 mday  1.49 	  try 
727            	  {
728            	     _ready.insert_first(ready);
729            	  }
730            	  catch(...)
731            	  {
732            	  }
733            	  
734 mday  1.42 	  _requestCount++;
735 mday  1.40 	}
736            	temp = _listeners.next(temp);
737                  }
738                  _listeners.unlock();
739                } 
740                catch(...){
741                  return;
742                }
743                // now handle the sockets that are ready to read 
744                _dispatch();
745              } // while alive 
746            }
747            
748 mday  1.42 void* monitor_2::set_session_dispatch(void (*dp)(monitor_2_entry*))
749 mday  1.40 {
750 mday  1.42   void* old = (void *)_session_dispatch;
751 mday  1.40   _session_dispatch = dp;
752              return old;
753            }
754            
755 mday  1.42 void* monitor_2::set_accept_dispatch(void (*dp)(monitor_2_entry*))
756            {
757              void* old = (void*)_accept_dispatch;
758              _accept_dispatch = dp;
759              return old;
760              
761            }
762            
763 mday  1.40 
764 mday  1.42 // important -  the dispatch routine has pointers to all the 
765            // entries that are readable. These entries can be changed but 
766            // the pointer must not be tampered with. 
767 mday  1.40 void monitor_2::_dispatch(void)
768            {
769 mday  1.49    monitor_2_entry* entry;
770               
771               if(_ready.count() == 0 )
772                  return;
773               
774                  
775               try 
776               {
777            
778            	 entry = _ready.remove_first();
779               }
780               catch(...)
781               {
782               }
783               
784              while(entry != 0 ) {
785 mday  1.42     switch(entry->get_type()) {
786 mday  1.40     case INTERNAL:
787                  static char buffer[2];
788 mday  1.49       entry->get_sock().disableBlocking();
789 mday  1.42       entry->get_sock().read(&buffer, 2);
790 mday  1.49       entry->get_sock().enableBlocking();
791 mday  1.40       break;
792                case LISTEN:
793                  {
794            	static struct sockaddr peer;
795            	static PEGASUS_SOCKLEN_SIZE peer_size = sizeof(peer);
796 mday  1.49 	entry->get_sock().disableBlocking();
797 mday  1.42 	pegasus_socket connected = entry->get_sock().accept(&peer, &peer_size);
798 mday  1.49 	entry->get_sock().enableBlocking();
799 mday  1.42 	monitor_2_entry *temp = add_entry(connected, SESSION, entry->get_accept(), entry->get_dispatch());
800            	if(temp && _accept_dispatch != 0)
801 mday  1.49 	   _accept_dispatch(temp);
802 mday  1.40       }
803                  break;
804                case SESSION:
805                  if(_session_dispatch != 0 )
806 mday  1.42 	_session_dispatch(entry);
807 mday  1.40       else {
808            	static char buffer[4096];
809 mday  1.42 	int bytes = entry->get_sock().read(&buffer, 4096);
810 mday  1.40       }
811                
812                  break;
813                case UNTYPED:
814                default:
815                  break;
816                }
817 mday  1.42     _requestCount--;
818 mday  1.40     delete entry;
819 mday  1.49     
820                if(_ready.count() == 0 )
821                   break;
822                
823                try 
824                {
825                   entry = _ready.remove_first();
826                }
827                catch(...)
828                {
829                }
830                
831 mday  1.40   }
832            }
833            
834            void monitor_2::stop(void)
835            {
836              _die = 1;
837              tickle();
838              
839              // shut down the listener list, free the list nodes
840 mday  1.42   _tickler.get_sock().close();
841 mday  1.40   _listeners.shutdown_queue();
842            }
843            
844            void monitor_2::tickle(void)
845            {
846              static char _buffer[] = 
847                {
848                  '0','0'
849                };
850              
851 mday  1.42   _tickler.get_sock().write(&_buffer, 2);
852 mday  1.40 }
853            
854            
855 mday  1.42 monitor_2_entry*  monitor_2::add_entry(pegasus_socket& ps, 
856            				       monitor_2_entry_type type,
857            				       void* accept_parm, 
858            				       void* dispatch_parm)
859 mday  1.40 {
860 mday  1.42   monitor_2_entry* m2e = new monitor_2_entry(ps, type, accept_parm, dispatch_parm);
861 mday  1.40   
862              try{
863                _listeners.insert_first(m2e);
864              }
865              catch(...){
866                delete m2e;
867 mday  1.42     return 0;
868 mday  1.40   }
869              tickle();
870 mday  1.42   return m2e;
871 mday  1.40 }
872            
873            Boolean monitor_2::remove_entry(Sint32 s)
874            {
875              monitor_2_entry* temp;
876              try {
877                _listeners.try_lock(pegasus_thread_self());
878                temp = _listeners.next(0);
879                while(temp != 0){
880 mday  1.42       if(s == (Sint32)temp->_rep->psock ){
881 mday  1.40 	temp = _listeners.remove_no_lock(temp);
882            	delete temp;
883            	_listeners.unlock();
884            	return true;
885                  }
886                  temp = _listeners.next(temp);
887                }
888                _listeners.unlock();
889              }
890              catch(...){
891              }
892              return false;
893 mday  1.7  }
894 mday  1.37 
895 mday  1.42 Uint32 monitor_2::getOutstandingRequestCount(void)
896            {
897              return _requestCount.value();
898              
899 mday  1.49 }
900            
901            
902            HTTPConnection2* monitor_2::remove_connection(Sint32 sock)
903            {
904            
905               HTTPConnection2* temp;
906               try 
907               {
908                  monitor_2::_connections.lock(pegasus_thread_self());
909                  temp = monitor_2::_connections.next(0);
910                  while(temp != 0 )
911                  {
912            	 if(sock == temp->getSocket())
913            	 {
914            	    temp = monitor_2::_connections.remove_no_lock(temp);
915            	    monitor_2::_connections.unlock();
916            	    return temp;
917            	 }
918            	 temp = monitor_2::_connections.next(temp);
919                  }
920 mday  1.49       monitor_2::_connections.unlock();
921               }
922               catch(...)
923               {
924               }
925               return 0;
926            }
927            
928            Boolean monitor_2::insert_connection(HTTPConnection2* connection)
929            {
930               try 
931               {
932                  monitor_2::_connections.insert_first(connection);
933               }
934               catch(...)
935               {
936                  return false;
937               }
938               return true;
939 mday  1.42 }
940 mday  1.7  
941 mike  1.2  
942            PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2