(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mike  1.2 //%/////////////////////////////////////////////////////////////////////////////
  2           //
  3 kumpf 1.17 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
  4            // The Open Group, Tivoli Systems
  5 mike  1.2  //
  6            // Permission is hereby granted, free of charge, to any person obtaining a copy
  7 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
  8            // deal in the Software without restriction, including without limitation the
  9            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 10 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
 11            // furnished to do so, subject to the following conditions:
 12            // 
 13 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 14 mike  1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 15            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 16 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 17            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 18            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 19 mike  1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 20            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 21            //
 22            //==============================================================================
 23            //
 24            // Author: Mike Brasher (mbrasher@bmc.com)
 25            //
 26            // Modified By:
 27            //
 28            //%/////////////////////////////////////////////////////////////////////////////
 29            
 30            #include <Pegasus/Common/Config.h>
 31            #include <cstring>
 32            #include "Monitor.h"
 33            #include "MessageQueue.h"
 34            #include "Socket.h"
 35 kumpf 1.4  #include <Pegasus/Common/Tracer.h>
 36 mday  1.7  #include <Pegasus/Common/HTTPConnection.h>
 37 mike  1.2  
 38            #ifdef PEGASUS_OS_TYPE_WINDOWS
 39            # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
 40            #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
 41            of <winsock.h>. It may have been indirectly included (e.g., by including \
 42            <windows.h>). Find the inclusion of that header which is visible to this \
 43            compilation unit and #define FD_SETSIZE to 1024 prior to that inclusion; \
 44            otherwise, less than 64 clients (the default) will be able to connect to the \
 45            CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
 46 mday  1.5  
 47 mike  1.2  # endif
 48            # define FD_SETSIZE 1024
 49 mday  1.5  # include <windows.h>
 50 mike  1.2  #else
 51            # include <sys/types.h>
 52            # include <sys/socket.h>
 53            # include <sys/time.h>
 54            # include <netinet/in.h>
 55            # include <netdb.h>
 56            # include <arpa/inet.h>
 57            # include <unistd.h>
 58            #endif
 59            
 60            PEGASUS_USING_STD;
 61            
 62            PEGASUS_NAMESPACE_BEGIN
 63            
 64 mday  1.18 
 65 mday  1.19 static struct timeval create_time = {0, 10};
 66            static struct timeval destroy_time = {5, 0};
 67 mday  1.20 static struct timeval deadlock_time = {1000, 0};
 68 mday  1.18 
 69 mike  1.2  ////////////////////////////////////////////////////////////////////////////////
 70            //
 71            // MonitorRep
 72            //
 73            ////////////////////////////////////////////////////////////////////////////////
 74            
 75            struct MonitorRep
 76            {
 77                fd_set rd_fd_set;
 78                fd_set wr_fd_set;
 79                fd_set ex_fd_set;
 80                fd_set active_rd_fd_set;
 81                fd_set active_wr_fd_set;
 82                fd_set active_ex_fd_set;
 83            };
 84            
 85            ////////////////////////////////////////////////////////////////////////////////
 86            //
 87            // Monitor
 88            //
 89            ////////////////////////////////////////////////////////////////////////////////
 90 mike  1.2  
 91            Monitor::Monitor()
 92 mday  1.7     : _module_handle(0), _controller(0), _async(false)
 93 mike  1.2  {
 94                Socket::initializeInterface();
 95                _rep = new MonitorRep;
 96                FD_ZERO(&_rep->rd_fd_set);
 97                FD_ZERO(&_rep->wr_fd_set);
 98                FD_ZERO(&_rep->ex_fd_set);
 99                FD_ZERO(&_rep->active_rd_fd_set);
100                FD_ZERO(&_rep->active_wr_fd_set);
101                FD_ZERO(&_rep->active_ex_fd_set);
102            }
103            
104 mday  1.18 Monitor::Monitor(Boolean async)
105               : _module_handle(0), _controller(0), _async(async)
106            {
107                Socket::initializeInterface();
108                _rep = new MonitorRep;
109                FD_ZERO(&_rep->rd_fd_set);
110                FD_ZERO(&_rep->wr_fd_set);
111                FD_ZERO(&_rep->ex_fd_set);
112                FD_ZERO(&_rep->active_rd_fd_set);
113                FD_ZERO(&_rep->active_wr_fd_set);
114                FD_ZERO(&_rep->active_ex_fd_set);
115 mday  1.19     if( _async == true )
116                {
117                   _thread_pool = new ThreadPool(0, 
118            				     "Monitor", 
119            				     0, 
120            				     20,
121            				     create_time, 
122            				     destroy_time, 
123            				     deadlock_time);
124                }
125 mday  1.20     else 
126                   _thread_pool = 0;
127 mday  1.18 }
128 mday  1.20 
129 mike  1.2  Monitor::~Monitor()
130            {
131 kumpf 1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
132                              "deregistering with module controller");
133 kumpf 1.10 
134 kumpf 1.11     if(_module_handle != NULL)
135 mday  1.8      {
136                   _controller->deregister_module(PEGASUS_MODULENAME_MONITOR);
137                   _controller = 0;
138 kumpf 1.10        delete _module_handle;
139 mday  1.8      }
140 kumpf 1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep");
141 mday  1.8     
142 kumpf 1.6      delete _rep;
143 kumpf 1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
144 mike  1.2      Socket::uninitializeInterface();
145 kumpf 1.11     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
146                              "returning from monitor destructor");
147 mday  1.21     if(_async == true)
148 mday  1.20        delete _thread_pool;
149 mike  1.2  }
150            
151 mday  1.7  
152 mday  1.18 int Monitor::kill_idle_threads()
153            {
154               static struct timeval now, last;
155               gettimeofday(&now, NULL);
156 mday  1.20    int dead_threads = 0;
157 mday  1.18    
158               if( now.tv_sec - last.tv_sec > 0 )
159               {
160                  gettimeofday(&last, NULL);
161 mday  1.20       try 
162                  {
163            	 
164            	 dead_threads =  _thread_pool->kill_dead_threads();
165                  }
166                  catch(IPCException& )
167                  {
168                  }
169                  
170 mday  1.18    }
171 mday  1.20    return dead_threads;
172 mday  1.18 }
173            
174 mday  1.7  
175            //<<< Tue May 14 20:38:26 2002 mdd >>>
176            //  register with module controller
177            //  when it is time to enqueue the message, 
178            // use an async_thread_exec call to 
179            // isolate the entire if(events) { enqueue -> fd_clear } block
180            //  let the thread pool grow and shrink according to load. 
181            
182 mike  1.2  Boolean Monitor::run(Uint32 milliseconds)
183            {
184 mday  1.18 
185 mike  1.2  #ifdef PEGASUS_OS_TYPE_WINDOWS
186            
187                // Windows select() has a strange little bug. It returns immediately if
188                // there are no descriptors in the set even if the timeout is non-zero.
189                // To work around this, we call Sleep() for now:
190            
191                if (_entries.size() == 0)
192            	Sleep(milliseconds);
193 mday  1.18     
194 mike  1.2  #endif
195 mday  1.18     
196 mike  1.2      // Check for events on the selected file descriptors. Only do this if
197                // there were no undispatched events from last time.
198            
199 mday  1.8      int count = 0;
200 mday  1.13 
201                memcpy(&_rep->active_rd_fd_set, &_rep->rd_fd_set, sizeof(fd_set));
202 mday  1.16 //    memcpy(&_rep->active_wr_fd_set, &_rep->wr_fd_set, sizeof(fd_set));
203 mday  1.13     memcpy(&_rep->active_ex_fd_set, &_rep->ex_fd_set, sizeof(fd_set));
204                
205                const Uint32 SECONDS = milliseconds / 1000;
206                const Uint32 MICROSECONDS = (milliseconds % 1000) * 1000;
207                struct timeval tv = { SECONDS, MICROSECONDS };
208                
209                count = select(
210                   FD_SETSIZE,
211                   &_rep->active_rd_fd_set,
212 mday  1.16 //       &_rep->active_wr_fd_set,
213                   NULL,
214 mday  1.13        &_rep->active_ex_fd_set,
215                   &tv);
216 mday  1.18     if(count == 0)
217 mike  1.2      {
218 mday  1.13        return false;
219                }
220 mike  1.2  #ifdef PEGASUS_OS_TYPE_WINDOWS
221 mday  1.13     else if (count == SOCKET_ERROR)
222 mike  1.2  #else
223 mday  1.13     else if (count == -1)
224 mike  1.2  #endif
225 mday  1.13     {
226                   return false;
227 mike  1.2      }
228 mday  1.13     
229 mday  1.24     
230 mday  1.13     Boolean handled_events = false;
231 mday  1.24     try { _connection_mutex.try_lock(pegasus_thread_self()); }
232                catch(AlreadyLocked){
233                  pegasus_sleep(1);
234                  return false;
235                }
236                
237 mday  1.13     for (Uint32 i = 0, n = _entries.size(); i < _entries.size(); i++)
238 mike  1.2      {
239            	Sint32 socket = _entries[i].socket;
240            	Uint32 events = 0;
241            
242 mday  1.8  	if(_entries[i].dying.value() > 0 )
243            	{
244            	   if(_entries[i]._type == Monitor::CONNECTION)
245            	   {
246            	      
247            	      MessageQueue *q = MessageQueue::lookup(_entries[i].queueId);
248 mday  1.12 	      if(q && static_cast<HTTPConnection *>(q)->is_dying() && 
249            		 (0 == static_cast<HTTPConnection *>(q)->refcount.value()))
250 mday  1.8  	      {
251            		 static_cast<HTTPConnection *>(q)->lock_connection();
252            		 static_cast<HTTPConnection *>(q)->unlock_connection();
253            		 
254            		 MessageQueue & o = static_cast<HTTPConnection *>(q)->get_owner();
255            		 Message* message= new CloseConnectionMessage(static_cast<HTTPConnection *>(q)->getSocket());
256            		 message->dest = o.getQueueId();
257 mday  1.24  		 _connection_mutex.unlock();
258            		 
259 mday  1.8  		 o.enqueue(message);
260 mday  1.24 		 return true;
261 kumpf 1.11 		 i--;
262 mday  1.8  		 n = _entries.size();
263            	      }
264            	   }
265            	}
266            
267 mike  1.2  	if (FD_ISSET(socket, &_rep->active_rd_fd_set))
268            	    events |= SocketMessage::READ;
269            
270            	if (FD_ISSET(socket, &_rep->active_ex_fd_set))
271            	    events |= SocketMessage::EXCEPTION;
272 mday  1.7   
273 mike  1.2  	if (events)
274            	{
275 kumpf 1.4              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
276 mday  1.16 			  "Monitor::run - Socket Event Detected events = %d", events);
277            	    if (events & SocketMessage::READ)
278 mday  1.8  	    {
279 mday  1.16 	       FD_CLR(socket, &_rep->active_rd_fd_set);
280 mday  1.8  	       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
281 mday  1.16 			     "Monitor::run FD_CLR READ");
282 mday  1.8  	    }
283 mday  1.16 	    else if (events & SocketMessage::EXCEPTION)
284 mday  1.8  	    {
285              	       FD_CLR(socket, &_rep->active_ex_fd_set);
286            	       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
287            			     "Monitor::run FD_CLR EXECEPTION");
288            	    }
289 mike  1.2  	    MessageQueue* queue = MessageQueue::lookup(_entries[i].queueId);
290 mday  1.8  	    if( ! queue )
291            	    {
292 mday  1.22 	       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
293            			     "Monitor::run lookup for connection entry failed, unsoliciting");
294 mday  1.24  	       _connection_mutex.unlock();
295 mday  1.8  	       unsolicitSocketMessages(socket);
296 mday  1.24 	       return true;
297 mday  1.8  	    }
298 mday  1.7  	    
299 mday  1.18 	    if(_async == true && _entries[i]._type == Monitor::CONNECTION)
300 mday  1.7  	    {
301 mday  1.23 
302 mday  1.13 	       if( static_cast<HTTPConnection *>(queue)->refcount.value() == 0 )
303 mday  1.12 	       {
304 mday  1.22 		  Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
305            				"Monitor::run dispatching thread to idle connection");
306 mday  1.12 		  static_cast<HTTPConnection *>(queue)->refcount++;
307            		  if( false == static_cast<HTTPConnection *>(queue)->is_dying())
308 mday  1.19 		     _thread_pool->allocate_and_awaken((void *)queue, _dispatch);
309 mday  1.12 		  else
310            		     static_cast<HTTPConnection *>(queue)->refcount--;
311            	       }
312 mday  1.22 	       else
313 mday  1.23 		  pegasus_sleep(1);
314 mike  1.2  	    }
315 mday  1.7  	    else 
316 mike  1.2  	    {
317 mday  1.24  	      _connection_mutex.unlock();
318            	      
319 mday  1.22 	       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
320            			     "Monitor::run enqueueing to non-connection HTTP class");
321 mday  1.8  	       Message* message = new SocketMessage(socket, events);
322            	       queue->enqueue(message);
323 mday  1.24 	       return true;
324            	       
325 mike  1.2  	    }
326 mday  1.12 	    count--;
327 mday  1.14 	    pegasus_yield();
328 mike  1.2  	}
329 mday  1.13 	handled_events = true;
330 mike  1.2      }
331 mday  1.24     _connection_mutex.unlock();
332 mday  1.13     return(handled_events);
333 mike  1.2  }
334            
335            Boolean Monitor::solicitSocketMessages(
336                Sint32 socket, 
337                Uint32 events,
338 mday  1.8      Uint32 queueId, 
339                int type)
340 mike  1.2  {
341 kumpf 1.4      PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solictSocketMessage");
342            
343 mike  1.2      // See whether a handler is already registered for this one:
344                Uint32 pos = _findEntry(socket);
345            
346                if (pos != PEGASUS_NOT_FOUND)
347 kumpf 1.4      {
348                    PEG_METHOD_EXIT();
349 mike  1.2  	return false;
350 kumpf 1.4      }
351 mike  1.2  
352                // Set the events:
353            
354                if (events & SocketMessage::READ)
355            	FD_SET(socket, &_rep->rd_fd_set);
356            
357                if (events & SocketMessage::WRITE)
358            	FD_SET(socket, &_rep->wr_fd_set);
359            
360                if (events & SocketMessage::EXCEPTION)
361            	FD_SET(socket, &_rep->ex_fd_set);
362            
363                // Add the entry to the list:
364 mday  1.8      _MonitorEntry entry(socket, queueId, type);
365 mike  1.2      _entries.append(entry);
366 mday  1.8      
367 mike  1.2      // Success!
368 mday  1.12 
369 kumpf 1.4      PEG_METHOD_EXIT();
370 mike  1.2      return true;
371            }
372            
373            Boolean Monitor::unsolicitSocketMessages(Sint32 socket)
374            {
375 kumpf 1.4      PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessage");
376            
377 mike  1.2      // Look for the given entry and remove it:
378            
379 mday  1.24     _connection_mutex.lock(pegasus_thread_self());
380                
381 mike  1.2      for (Uint32 i = 0, n = _entries.size(); i < n; i++)
382                {
383            	if (_entries[i].socket == socket)
384            	{
385            	    Sint32 socket = _entries[i].socket;
386            	    FD_CLR(socket, &_rep->rd_fd_set);
387            	    FD_CLR(socket, &_rep->wr_fd_set);
388            	    FD_CLR(socket, &_rep->ex_fd_set);
389            	    _entries.remove(i);
390 kumpf 1.11             // ATTN-RK-P3-20020521: Need "Socket::close(socket);" here?
391 mday  1.18 	    Socket::close(socket);
392 kumpf 1.4              PEG_METHOD_EXIT();
393 mday  1.24 	    _connection_mutex.unlock();
394 mike  1.2  	    return true;
395            	}
396                }
397 kumpf 1.4      PEG_METHOD_EXIT();
398 mday  1.24     _connection_mutex.unlock();
399                
400 mike  1.2      return false;
401            }
402            
403 mday  1.8  Uint32 Monitor::_findEntry(Sint32 socket) 
404 mike  1.2  {
405 mday  1.24   _connection_mutex.lock(pegasus_thread_self());
406              
407 mday  1.7     for (Uint32 i = 0, n = _entries.size(); i < n; i++)
408 mike  1.2      {
409            	if (_entries[i].socket == socket)
410 mday  1.24 	  {
411            	    _connection_mutex.unlock();
412 mike  1.2  	    return i;
413 mday  1.24 	  }
414 mike  1.2      }
415 mday  1.24    _connection_mutex.unlock();
416 mike  1.2      return PEG_NOT_FOUND;
417 mday  1.7  }
418            
419            
420            PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
421            {
422 mday  1.8     HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
423               if( true == dst->is_dying())
424 kumpf 1.11    {
425                  dst->refcount--;
426 mday  1.8        return 0;
427 kumpf 1.11    }
428 mday  1.8     if( false == dst->is_dying())
429 mday  1.12    {
430 mday  1.14       if(false == dst->run(1))
431            	 pegasus_sleep(1);
432                  
433 mday  1.12    }
434 kumpf 1.11    dst->refcount--;
435 mday  1.8     return 0;
436 mday  1.7  }
437            
438 mike  1.2  
439            PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2