(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mike  1.2 //%/////////////////////////////////////////////////////////////////////////////
  2           //
  3 kumpf 1.17 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
  4            // The Open Group, Tivoli Systems
  5 mike  1.2  //
  6            // Permission is hereby granted, free of charge, to any person obtaining a copy
  7 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
  8            // deal in the Software without restriction, including without limitation the
  9            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 10 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
 11            // furnished to do so, subject to the following conditions:
 12            // 
 13 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 14 mike  1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 15            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 16 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 17            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 18            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 19 mike  1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 20            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 21            //
 22            //==============================================================================
 23            //
 24            // Author: Mike Brasher (mbrasher@bmc.com)
 25            //
 26            // Modified By:
 27            //
 28            //%/////////////////////////////////////////////////////////////////////////////
 29            
 30            #include <Pegasus/Common/Config.h>
 31 mday  1.40 
 32 mike  1.2  #include <cstring>
 33            #include "Monitor.h"
 34            #include "MessageQueue.h"
 35            #include "Socket.h"
 36 kumpf 1.4  #include <Pegasus/Common/Tracer.h>
 37 mday  1.7  #include <Pegasus/Common/HTTPConnection.h>
 38 mike  1.2  
 39            #ifdef PEGASUS_OS_TYPE_WINDOWS
 40            # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
 41            #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
 42            of <winsock.h>. It may have been indirectly included (e.g., by including \
 43 mday  1.25 <windows.h>). Finthe inclusion of that header which is visible to this \
 44 mike  1.2  compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \
 45            otherwise, less than 64 clients (the default) will be able to connect to the \
 46            CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
 47 mday  1.5  
 48 mike  1.2  # endif
 49            # define FD_SETSIZE 1024
 50 mday  1.5  # include <windows.h>
 51 mike  1.2  #else
 52            # include <sys/types.h>
 53            # include <sys/socket.h>
 54            # include <sys/time.h>
 55            # include <netinet/in.h>
 56            # include <netdb.h>
 57            # include <arpa/inet.h>
 58            #endif
 59            
 60            PEGASUS_USING_STD;
 61            
 62            PEGASUS_NAMESPACE_BEGIN
 63            
 64 mday  1.18 
 65 mday  1.25 static AtomicInt _connections = 0;
 66            
 67            
 68            static struct timeval create_time = {0, 1};
 69 mday  1.38 static struct timeval destroy_time = {300, 0};
 70 mday  1.26 static struct timeval deadlock_time = {0, 0};
 71 mday  1.18 
 72 mike  1.2  ////////////////////////////////////////////////////////////////////////////////
 73            //
 74            // MonitorRep
 75            //
 76            ////////////////////////////////////////////////////////////////////////////////
 77            
 78            struct MonitorRep
 79            {
 80                fd_set rd_fd_set;
 81                fd_set wr_fd_set;
 82                fd_set ex_fd_set;
 83                fd_set active_rd_fd_set;
 84                fd_set active_wr_fd_set;
 85                fd_set active_ex_fd_set;
 86            };
 87            
 88            ////////////////////////////////////////////////////////////////////////////////
 89            //
 90            // Monitor
 91            //
 92            ////////////////////////////////////////////////////////////////////////////////
 93 mike  1.2  
 94            Monitor::Monitor()
 95 mday  1.7     : _module_handle(0), _controller(0), _async(false)
 96 mike  1.2  {
 97                Socket::initializeInterface();
 98 mday  1.25     _rep = 0;
 99 mday  1.37     _entries.reserveCapacity(32);
100                for( int i = 0; i < 32; i++ )
101                {
102                   _MonitorEntry entry(0, 0, 0);
103                   _entries.append(entry);
104                }
105 mike  1.2  }
106            
107 mday  1.18 Monitor::Monitor(Boolean async)
108               : _module_handle(0), _controller(0), _async(async)
109            {
110                Socket::initializeInterface();
111 mday  1.25     _rep = 0;
112 mday  1.37     _entries.reserveCapacity(32);
113                for( int i = 0; i < 32; i++ )
114                {
115                   _MonitorEntry entry(0, 0, 0);
116                   _entries.append(entry);
117                }
118 mday  1.19     if( _async == true )
119                {
120                   _thread_pool = new ThreadPool(0, 
121            				     "Monitor", 
122 mday  1.38 				     0, 
123 mday  1.25 				     0,
124 mday  1.19 				     create_time, 
125            				     destroy_time, 
126            				     deadlock_time);
127                }
128 mday  1.20     else 
129                   _thread_pool = 0;
130 mday  1.18 }
131 mday  1.20 
132 mike  1.2  Monitor::~Monitor()
133            {
134 kumpf 1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
135                              "deregistering with module controller");
136 kumpf 1.10 
137 kumpf 1.11     if(_module_handle != NULL)
138 mday  1.8      {
139                   _controller->deregister_module(PEGASUS_MODULENAME_MONITOR);
140                   _controller = 0;
141 kumpf 1.10        delete _module_handle;
142 mday  1.8      }
143 kumpf 1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep");
144 mday  1.8     
145 kumpf 1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
146 mike  1.2      Socket::uninitializeInterface();
147 kumpf 1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
148                              "returning from monitor destructor");
149 mday  1.21     if(_async == true)
150 mday  1.20        delete _thread_pool;
151 mike  1.2  }
152            
153 mday  1.7  
154 mday  1.18 int Monitor::kill_idle_threads()
155            {
156               static struct timeval now, last;
157               gettimeofday(&now, NULL);
158 mday  1.20    int dead_threads = 0;
159 mday  1.18    
160 mday  1.38    if( now.tv_sec - last.tv_sec > 120 )
161 mday  1.18    {
162                  gettimeofday(&last, NULL);
163 mday  1.20       try 
164                  {
165            	 dead_threads =  _thread_pool->kill_dead_threads();
166                  }
167                  catch(IPCException& )
168                  {
169                  }
170                  
171 mday  1.18    }
172 mday  1.20    return dead_threads;
173 mday  1.18 }
174            
175 mday  1.7  
176 mike  1.2  Boolean Monitor::run(Uint32 milliseconds)
177            {
178 mday  1.18 
179 mday  1.25     Boolean handled_events = false;
180 mday  1.40      int i = 0;
181 mday  1.37     #if defined(PEGASUS_OS_OS400) || defined(PEGASUS_OS_HPUX)
182 kumpf 1.36     struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
183            #else
184 kumpf 1.35     struct timeval tv = {0, 1};
185            #endif
186 mday  1.25     fd_set fdread;
187                FD_ZERO(&fdread);
188 mday  1.37     _entry_mut.lock(pegasus_thread_self());
189 mday  1.13     
190 mday  1.25     for( int indx = 0; indx < (int)_entries.size(); indx++)
191 mike  1.2      {
192 mday  1.37        if(_entries[indx]._status.value() == _MonitorEntry::IDLE)
193 mday  1.25        {
194            	  FD_SET(_entries[indx].socket, &fdread);
195                   }
196 mday  1.13     }
197 mday  1.37 
198 mday  1.25     
199                int events = select(FD_SETSIZE, &fdread, NULL, NULL, &tv);
200            
201 mike  1.2  #ifdef PEGASUS_OS_TYPE_WINDOWS
202 mday  1.25     if(events && events != SOCKET_ERROR )
203 mike  1.2  #else
204 mday  1.25     if(events && events != -1 )
205 mike  1.2  #endif
206 mday  1.13     {
207 mday  1.25        for( int indx = 0; indx < (int)_entries.size(); indx++)
208                   {
209            	  if(FD_ISSET(_entries[indx].socket, &fdread))
210            	  {
211            	     MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);
212            	     if(q == 0)
213            	     {
214 mday  1.37 		try
215            		{
216            		   _entries[indx]._status = _MonitorEntry::EMPTY;
217            		}
218            		catch(...)
219            		{
220            
221            		}
222            		continue;
223 mday  1.25 	     }
224 mday  1.37 	     try 
225 mday  1.25 	     {
226 mday  1.37 		if(_entries[indx]._type == Monitor::CONNECTION)
227            		{
228            		   static_cast<HTTPConnection *>(q)->_entry_index = indx;
229            		   if(static_cast<HTTPConnection *>(q)->_dying.value() > 0 )
230            		   {
231            		      _entries[indx]._status = _MonitorEntry::DYING;
232            		      MessageQueue & o = static_cast<HTTPConnection *>(q)->get_owner();
233            		      Message* message= new CloseConnectionMessage(_entries[indx].socket);
234            		      message->dest = o.getQueueId();
235            		      _entry_mut.unlock();
236            		      o.enqueue(message);
237            		      return true;
238            		   }
239            		   _entries[indx]._status = _MonitorEntry::BUSY;
240            		   _thread_pool->allocate_and_awaken((void *)q, _dispatch);
241            		}
242            		else
243 mday  1.25 		{
244 mday  1.37 		   int events = 0;
245            		   events |= SocketMessage::READ;
246            		   Message *msg = new SocketMessage(_entries[indx].socket, events);
247            		   _entries[indx]._status = _MonitorEntry::BUSY;
248            		   _entry_mut.unlock();
249 mday  1.27 
250 mday  1.37 		   q->enqueue(msg);
251            		   _entries[indx]._status = _MonitorEntry::IDLE;
252 mday  1.25 		   return true;
253            		}
254            	     }
255 mday  1.37 	     catch(...)
256 mday  1.25 	     {
257            	     }
258            	     handled_events = true;
259            	  }
260                   }
261 mday  1.24     }
262 mday  1.37     _entry_mut.unlock();
263 mday  1.13     return(handled_events);
264 mike  1.2  }
265            
266 mday  1.25 
267 mday  1.37 
268 mday  1.25 int  Monitor::solicitSocketMessages(
269 mike  1.2      Sint32 socket, 
270                Uint32 events,
271 mday  1.8      Uint32 queueId, 
272                int type)
273 mike  1.2  {
274 kumpf 1.4  
275 kumpf 1.31    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
276 mike  1.2  
277 mday  1.37    int index = -1;
278               _entry_mut.lock(pegasus_thread_self());
279 mday  1.25    
280               for(index = 0; index < (int)_entries.size(); index++)
281               {
282 mday  1.37       try 
283                  {
284            	 if(_entries[index]._status.value() == _MonitorEntry::EMPTY)
285            	 {
286            	    _entries[index].socket = socket;
287            	    _entries[index].queueId  = queueId;
288            	    _entries[index]._type = type;
289            	    _entries[index]._status = _MonitorEntry::IDLE;
290            	    _entry_mut.unlock();
291            	    
292            	    return index;
293            	 }
294                  }
295                  catch(...)
296 mday  1.25       {
297                  }
298 mday  1.37 
299 mday  1.25    }
300 mday  1.37       _entry_mut.unlock();
301 mday  1.25    PEG_METHOD_EXIT();
302               return index;
303 mike  1.2  }
304            
305 mday  1.25 void Monitor::unsolicitSocketMessages(Sint32 socket)
306 mike  1.2  {
307 mday  1.25     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
308 mday  1.37     _entry_mut.lock(pegasus_thread_self());
309 mday  1.27     
310 mday  1.25     for(int index = 0; index < (int)_entries.size(); index++)
311 mike  1.2      {
312 mday  1.25        if(_entries[index].socket == socket)
313                   {
314            	  _entries[index]._status = _MonitorEntry::EMPTY;
315 mday  1.37 	  break;
316 mday  1.25        }
317 mike  1.2      }
318 mday  1.37     _entry_mut.unlock();
319 kumpf 1.4      PEG_METHOD_EXIT();
320 mike  1.2  }
321            
322 mday  1.7  PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
323            {
324 mday  1.8     HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
325 mday  1.37    
326 mday  1.25    dst->run(1);
327 mday  1.37    if(  dst->_monitor->_entries.size() > (Uint32)dst->_entry_index )
328                  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
329               
330 mday  1.8     return 0;
331 mday  1.40 }
332            
333            
334            
335            ////************************* monitor 2 *****************************////
336            
337            
338            monitor_2_entry::monitor_2_entry(void)
339              : type(UNTYPED)
340            {
341            }
342            
343            monitor_2_entry::monitor_2_entry(pegasus_socket& _psock, monitor_2_entry_type _type)
344              : type(_type), psock(_psock)
345            {
346            
347            }
348            
349            monitor_2_entry::monitor_2_entry(const monitor_2_entry& e)
350            {
351              if(this != &e){
352 mday  1.40     psock = e.psock;
353                type = e.type;
354              }
355            }
356            
357            monitor_2_entry::~monitor_2_entry(void)
358            {
359            }
360            
361            monitor_2_entry& monitor_2_entry::operator=(const monitor_2_entry& e)
362            {
363              if(this != &e){
364                psock = e.psock;
365                type = e.type;
366              }
367              return *this;
368            }
369            
370            Boolean monitor_2_entry::operator ==(const monitor_2_entry& me)
371            {
372              if(this == &me)
373 mday  1.40     return true;
374              if((Sint32)this->psock == (Sint32)me.psock)
375                return true;
376              return false;
377            }
378            
379            Boolean monitor_2_entry::operator ==(void* k)
380            {
381              if((void *)this == k)
382                return true;
383              return false;
384            }
385            
386            
387            monitor_2_entry::operator pegasus_socket() const
388            {
389              return psock;
390            }
391            
392            
393            monitor_2::monitor_2(void)
394 mday  1.40   : _session_dispatch(0), _listeners(true, 0),_ready(true),  _die(0)
395            {
396              try {
397                
398                bsd_socket_factory _factory;
399            
400                // set up the listener/acceptor 
401                pegasus_socket temp = pegasus_socket(&_factory);
402                
403                temp.socket(PF_INET, SOCK_STREAM, 0);
404                // initialize the address
405                memset(&_tickle_addr, 0, sizeof(_tickle_addr));
406                _tickle_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
407                _tickle_addr.sin_family = PF_INET;
408                _tickle_addr.sin_port = 0;
409            
410                PEGASUS_SOCKLEN_SIZE _addr_size = sizeof(_tickle_addr);
411                
412                temp.bind((struct sockaddr *)&_tickle_addr, sizeof(_tickle_addr));
413                temp.listen(3);  
414                temp.getsockname((struct sockaddr*)&_tickle_addr, &_addr_size);
415 mday  1.40 
416                // set up the connector
417            
418                pegasus_socket tickler = pegasus_socket(&_factory);
419                tickler.socket(PF_INET, SOCK_STREAM, 0);
420                struct sockaddr_in _addr;
421                memset(&_addr, 0, sizeof(_addr));
422                _addr.sin_addr.s_addr = inet_addr("127.0.0.1");
423                _addr.sin_family = PF_INET;
424                _addr.sin_port = 0;
425                tickler.bind((struct sockaddr*)&_addr, sizeof(_addr));
426                tickler.connect((struct sockaddr*)&_tickle_addr, sizeof(_tickle_addr));
427            
428                _tickler.psock = tickler;
429                _tickler.type = INTERNAL;
430            
431                struct sockaddr_in peer;
432                memset(&peer, 0, sizeof(peer));
433                PEGASUS_SOCKLEN_SIZE peer_size = sizeof(peer);
434            
435                pegasus_socket accepted = temp.accept((struct sockaddr*)&peer, &peer_size);
436 mday  1.40     monitor_2_entry* _tickle = new monitor_2_entry(accepted, INTERNAL);
437                _listeners.insert_first(_tickle);
438            
439              }
440              catch(...){  }
441            }
442            
443            monitor_2::~monitor_2(void)
444            {
445             
446              
447            }
448            
449            
450            void monitor_2::run(void)
451            {
452              monitor_2_entry* temp;
453              while(_die.value() == 0) {
454                struct timeval tv = {0, 0};
455            
456                // place all sockets in the select set 
457 mday  1.40     FD_ZERO(&rd_fd_set);
458                try {
459                  _listeners.lock(pegasus_thread_self());
460                  temp = _listeners.next(0);
461                  while(temp != 0 ){
462            	FD_SET((Sint32)temp->psock, &rd_fd_set);
463            	temp = _listeners.next(temp);
464                  }
465                  _listeners.unlock();
466                } 
467                catch(...){
468                  return;
469                }
470            
471                int events = select(FD_SETSIZE, &rd_fd_set, NULL, NULL, NULL);
472                try {
473                  _listeners.lock(pegasus_thread_self());
474                 
475                  temp = _listeners.next(0);
476                  while(temp != 0 ){
477            	if(FD_ISSET((Sint32)temp->psock, &rd_fd_set)) {
478 mday  1.40 	  FD_CLR((Sint32)temp->psock,  &rd_fd_set);
479            	  monitor_2_entry* entry = new monitor_2_entry(*temp);
480            	  _ready.insert_first((void*)entry);
481            	}
482            	temp = _listeners.next(temp);
483                  }
484                  _listeners.unlock();
485                } 
486                catch(...){
487                  return;
488                }
489            
490                // now handle the sockets that are ready to read 
491                _dispatch();
492              } // while alive 
493            }
494            
495            void* monitor_2::set_session_dispatch(void (*dp)(pegasus_socket&))
496            {
497              void* old = (void*)_session_dispatch;
498              _session_dispatch = dp;
499 mday  1.40   return old;
500            }
501            
502            
503            void monitor_2::_dispatch(void)
504            {
505              monitor_2_entry* entry = (monitor_2_entry*) _ready.remove_first();
506              while(entry != 0 ){
507                switch(entry->type) {
508                case INTERNAL:
509                  static char buffer[2];
510                  entry->psock.read(&buffer, 2);
511                  break;
512                case LISTEN:
513                  {
514            	static struct sockaddr peer;
515            	static PEGASUS_SOCKLEN_SIZE peer_size = sizeof(peer);
516            	pegasus_socket connected = entry->psock.accept(&peer, &peer_size);
517            	add_entry(connected, SESSION);
518                  }
519                  break;
520 mday  1.40     case SESSION:
521                  if(_session_dispatch != 0 )
522            	_session_dispatch(entry->psock);
523                  else {
524            	static char buffer[4096];
525            	int bytes = entry->psock.read(&buffer, 4096);
526                  }
527                
528                  break;
529                
530                case UNTYPED:
531                default:
532                  break;
533                
534                }
535                delete entry;
536                entry = (monitor_2_entry*) _ready.remove_first();
537                
538              }
539            }
540            
541 mday  1.40 
542            void monitor_2::stop(void)
543            {
544              _die = 1;
545              tickle();
546              
547              // shut down the listener list, free the list nodes
548              _tickler.psock.close();
549              _listeners.shutdown_queue();
550            }
551            
552            void monitor_2::tickle(void)
553            {
554              static char _buffer[] = 
555                {
556                  '0','0'
557                };
558              
559              _tickler.psock.write(&_buffer, 2);
560            }
561            
562 mday  1.40 
563            Boolean monitor_2::add_entry(pegasus_socket& ps, monitor_2_entry_type type)
564            {
565              monitor_2_entry* m2e = new monitor_2_entry(ps, type);
566              
567              try{
568                _listeners.insert_first(m2e);
569              }
570              catch(...){
571                delete m2e;
572                return false;
573              }
574              tickle();
575              return true;
576            }
577            
578            Boolean monitor_2::remove_entry(Sint32 s)
579            {
580              monitor_2_entry* temp;
581              try {
582                _listeners.try_lock(pegasus_thread_self());
583 mday  1.40     temp = _listeners.next(0);
584                while(temp != 0){
585                  if(s == (Sint32)temp->psock ){
586            	temp = _listeners.remove_no_lock(temp);
587            	delete temp;
588            	_listeners.unlock();
589            	return true;
590                  }
591                  temp = _listeners.next(temp);
592                }
593                _listeners.unlock();
594              }
595              catch(...){
596              }
597              return false;
598 mday  1.7  }
599 mday  1.37 
600 mday  1.7  
601 mike  1.2  
602            PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2