(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mike  1.2 //%/////////////////////////////////////////////////////////////////////////////
  2           //
  3 kumpf 1.17 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
  4            // The Open Group, Tivoli Systems
  5 mike  1.2  //
  6            // Permission is hereby granted, free of charge, to any person obtaining a copy
  7 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
  8            // deal in the Software without restriction, including without limitation the
  9            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 10 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
 11            // furnished to do so, subject to the following conditions:
 12            // 
 13 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 14 mike  1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 15            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 16 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 17            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 18            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 19 mike  1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 20            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 21            //
 22            //==============================================================================
 23            //
 24            // Author: Mike Brasher (mbrasher@bmc.com)
 25            //
 26            // Modified By:
 27            //
 28            //%/////////////////////////////////////////////////////////////////////////////
 29            
 30            #include <Pegasus/Common/Config.h>
 31            #include <cstring>
 32            #include "Monitor.h"
 33            #include "MessageQueue.h"
 34            #include "Socket.h"
 35 kumpf 1.4  #include <Pegasus/Common/Tracer.h>
 36 mday  1.7  #include <Pegasus/Common/HTTPConnection.h>
 37 mike  1.2  
 38            #ifdef PEGASUS_OS_TYPE_WINDOWS
 39            # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
 40            #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
 41            of <winsock.h>. It may have been indirectly included (e.g., by including \
 42 mday  1.25 <windows.h>). Finthe inclusion of that header which is visible to this \
 43 mike  1.2  compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \
 44            otherwise, less than 64 clients (the default) will be able to connect to the \
 45            CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
 46 mday  1.5  
 47 mike  1.2  # endif
 48            # define FD_SETSIZE 1024
 49 mday  1.39.6.1 g# include <windows.h>
 50 mike  1.2      #else
 51                # include <sys/types.h>
 52                # include <sys/socket.h>
 53                # include <sys/time.h>
 54                # include <netinet/in.h>
 55                # include <netdb.h>
 56                # include <arpa/inet.h>
 57                #endif
 58                
 59                PEGASUS_USING_STD;
 60                
 61                PEGASUS_NAMESPACE_BEGIN
 62                
 63 mday  1.18     
 64 mday  1.25     static AtomicInt _connections = 0;
 65                
 66                
 67                static struct timeval create_time = {0, 1};
 68 mday  1.38     static struct timeval destroy_time = {300, 0};
 69 mday  1.26     static struct timeval deadlock_time = {0, 0};
 70 mday  1.18     
 71 mike  1.2      ////////////////////////////////////////////////////////////////////////////////
 72                //
 73                // MonitorRep
 74                //
 75                ////////////////////////////////////////////////////////////////////////////////
 76                
 77                struct MonitorRep
 78                {
 79                    fd_set rd_fd_set;
 80                    fd_set wr_fd_set;
 81                    fd_set ex_fd_set;
 82                    fd_set active_rd_fd_set;
 83                    fd_set active_wr_fd_set;
 84                    fd_set active_ex_fd_set;
 85                };
 86                
 87                ////////////////////////////////////////////////////////////////////////////////
 88                //
 89                // Monitor
 90                //
 91                ////////////////////////////////////////////////////////////////////////////////
 92 mike  1.2      
 93                Monitor::Monitor()
 94 mday  1.7         : _module_handle(0), _controller(0), _async(false)
 95 mike  1.2      {
 96                    Socket::initializeInterface();
 97 mday  1.25         _rep = 0;
 98 mday  1.37         _entries.reserveCapacity(32);
 99                    for( int i = 0; i < 32; i++ )
100                    {
101                       _MonitorEntry entry(0, 0, 0);
102                       _entries.append(entry);
103                    }
104 mike  1.2      }
105                
106 mday  1.18     Monitor::Monitor(Boolean async)
107                   : _module_handle(0), _controller(0), _async(async)
108                {
109                    Socket::initializeInterface();
110 mday  1.25         _rep = 0;
111 mday  1.37         _entries.reserveCapacity(32);
112                    for( int i = 0; i < 32; i++ )
113                    {
114                       _MonitorEntry entry(0, 0, 0);
115                       _entries.append(entry);
116                    }
117 mday  1.19         if( _async == true )
118                    {
119                       _thread_pool = new ThreadPool(0, 
120                				     "Monitor", 
121 mday  1.38     				     0, 
122 mday  1.25     				     0,
123 mday  1.19     				     create_time, 
124                				     destroy_time, 
125                				     deadlock_time);
126                    }
127 mday  1.20         else 
128                       _thread_pool = 0;
129 mday  1.18     }
130 mday  1.20     
131 mike  1.2      Monitor::~Monitor()
132                {
133 kumpf 1.11         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
134                                  "deregistering with module controller");
135 kumpf 1.10     
136 kumpf 1.11         if(_module_handle != NULL)
137 mday  1.8          {
138                       _controller->deregister_module(PEGASUS_MODULENAME_MONITOR);
139                       _controller = 0;
140 kumpf 1.10            delete _module_handle;
141 mday  1.8          }
142 kumpf 1.11         Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep");
143 mday  1.8         
144 kumpf 1.11         Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
145 mike  1.2          Socket::uninitializeInterface();
146 kumpf 1.11         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
147                                  "returning from monitor destructor");
148 mday  1.21         if(_async == true)
149 mday  1.20            delete _thread_pool;
150 mike  1.2      }
151                
152 mday  1.7      
153 mday  1.18     int Monitor::kill_idle_threads()
154                {
155                   static struct timeval now, last;
156                   gettimeofday(&now, NULL);
157 mday  1.20        int dead_threads = 0;
158 mday  1.18        
159 mday  1.38        if( now.tv_sec - last.tv_sec > 120 )
160 mday  1.18        {
161                      gettimeofday(&last, NULL);
162 mday  1.20           try 
163                      {
164                	 dead_threads =  _thread_pool->kill_dead_threads();
165                      }
166                      catch(IPCException& )
167                      {
168                      }
169                      
170 mday  1.18        }
171 mday  1.20        return dead_threads;
172 mday  1.18     }
173                
174 mday  1.7      
175 mike  1.2      Boolean Monitor::run(Uint32 milliseconds)
176                {
177 mday  1.18     
178 mday  1.25         Boolean handled_events = false;
179                    int i = 0;
180 mday  1.37         #if defined(PEGASUS_OS_OS400) || defined(PEGASUS_OS_HPUX)
181 kumpf 1.36         struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
182                #else
183 kumpf 1.35         struct timeval tv = {0, 1};
184                #endif
185 mday  1.25         fd_set fdread;
186                    FD_ZERO(&fdread);
187 mday  1.37         _entry_mut.lock(pegasus_thread_self());
188 mday  1.13         
189 mday  1.25         for( int indx = 0; indx < (int)_entries.size(); indx++)
190 mike  1.2          {
191 mday  1.37            if(_entries[indx]._status.value() == _MonitorEntry::IDLE)
192 mday  1.25            {
193                	  FD_SET(_entries[indx].socket, &fdread);
194                       }
195 mday  1.13         }
196 mday  1.37     
197 mday  1.25         
198                    int events = select(FD_SETSIZE, &fdread, NULL, NULL, &tv);
199                
200 mike  1.2      #ifdef PEGASUS_OS_TYPE_WINDOWS
201 mday  1.25         if(events && events != SOCKET_ERROR )
202 mike  1.2      #else
203 mday  1.25         if(events && events != -1 )
204 mike  1.2      #endif
205 mday  1.13         {
206 mday  1.25            for( int indx = 0; indx < (int)_entries.size(); indx++)
207                       {
208                	  if(FD_ISSET(_entries[indx].socket, &fdread))
209                	  {
210                	     MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);
211                	     if(q == 0)
212                	     {
213 mday  1.37     		try
214                		{
215                		   _entries[indx]._status = _MonitorEntry::EMPTY;
216                		}
217                		catch(...)
218                		{
219                
220                		}
221                		continue;
222 mday  1.25     	     }
223 mday  1.37     	     try 
224 mday  1.25     	     {
225 mday  1.37     		if(_entries[indx]._type == Monitor::CONNECTION)
226                		{
227                		   static_cast<HTTPConnection *>(q)->_entry_index = indx;
228                		   if(static_cast<HTTPConnection *>(q)->_dying.value() > 0 )
229                		   {
230                		      _entries[indx]._status = _MonitorEntry::DYING;
231                		      MessageQueue & o = static_cast<HTTPConnection *>(q)->get_owner();
232                		      Message* message= new CloseConnectionMessage(_entries[indx].socket);
233                		      message->dest = o.getQueueId();
234                		      _entry_mut.unlock();
235                		      o.enqueue(message);
236                		      return true;
237                		   }
238                		   _entries[indx]._status = _MonitorEntry::BUSY;
239                		   _thread_pool->allocate_and_awaken((void *)q, _dispatch);
240                		}
241                		else
242 mday  1.25     		{
243 mday  1.37     		   int events = 0;
244                		   events |= SocketMessage::READ;
245                		   Message *msg = new SocketMessage(_entries[indx].socket, events);
246                		   _entries[indx]._status = _MonitorEntry::BUSY;
247                		   _entry_mut.unlock();
248 mday  1.27     
249 mday  1.37     		   q->enqueue(msg);
250                		   _entries[indx]._status = _MonitorEntry::IDLE;
251 mday  1.25     		   return true;
252                		}
253                	     }
254 mday  1.37     	     catch(...)
255 mday  1.25     	     {
256                	     }
257                	     handled_events = true;
258                	  }
259                       }
260 mday  1.24         }
261 mday  1.37         _entry_mut.unlock();
262 mday  1.13         return(handled_events);
263 mike  1.2      }
264                
265 mday  1.25     
266 mday  1.37     
267 mday  1.25     int  Monitor::solicitSocketMessages(
268 mike  1.2          Sint32 socket, 
269                    Uint32 events,
270 mday  1.8          Uint32 queueId, 
271                    int type)
272 mike  1.2      {
273 kumpf 1.4      
274 kumpf 1.31        PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
275 mike  1.2      
276 mday  1.37        int index = -1;
277                   _entry_mut.lock(pegasus_thread_self());
278 mday  1.25        
279                   for(index = 0; index < (int)_entries.size(); index++)
280                   {
281 mday  1.37           try 
282                      {
283                	 if(_entries[index]._status.value() == _MonitorEntry::EMPTY)
284                	 {
285                	    _entries[index].socket = socket;
286                	    _entries[index].queueId  = queueId;
287                	    _entries[index]._type = type;
288                	    _entries[index]._status = _MonitorEntry::IDLE;
289                	    _entry_mut.unlock();
290                	    
291                	    return index;
292                	 }
293                      }
294                      catch(...)
295 mday  1.25           {
296                      }
297 mday  1.37     
298 mday  1.25        }
299 mday  1.37           _entry_mut.unlock();
300 mday  1.25        PEG_METHOD_EXIT();
301                   return index;
302 mike  1.2      }
303                
304 mday  1.25     void Monitor::unsolicitSocketMessages(Sint32 socket)
305 mike  1.2      {
306 mday  1.25         PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
307 mday  1.37         _entry_mut.lock(pegasus_thread_self());
308 mday  1.27         
309 mday  1.25         for(int index = 0; index < (int)_entries.size(); index++)
310 mike  1.2          {
311 mday  1.25            if(_entries[index].socket == socket)
312                       {
313                	  _entries[index]._status = _MonitorEntry::EMPTY;
314 mday  1.37     	  break;
315 mday  1.25            }
316 mike  1.2          }
317 mday  1.37         _entry_mut.unlock();
318 kumpf 1.4          PEG_METHOD_EXIT();
319 mike  1.2      }
320                
321 mday  1.7      PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
322                {
323 mday  1.8         HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
324 mday  1.37        
325 mday  1.25        dst->run(1);
326 mday  1.37        if(  dst->_monitor->_entries.size() > (Uint32)dst->_entry_index )
327                      dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
328                   
329 mday  1.8         return 0;
330 mday  1.7      }
331 mday  1.37     
332 mday  1.7      
333 mike  1.2      
334                PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2