(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mike  1.2 //%/////////////////////////////////////////////////////////////////////////////
  2           //
  3 kumpf 1.17 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
  4            // The Open Group, Tivoli Systems
  5 mike  1.2  //
  6            // Permission is hereby granted, free of charge, to any person obtaining a copy
  7 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
  8            // deal in the Software without restriction, including without limitation the
  9            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 10 mike  1.2  // sell copies of the Software, and to permit persons to whom the Software is
 11            // furnished to do so, subject to the following conditions:
 12            // 
 13 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 14 mike  1.2  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 15            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 16 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 17            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 18            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 19 mike  1.2  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 20            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 21            //
 22            //==============================================================================
 23            //
 24            // Author: Mike Brasher (mbrasher@bmc.com)
 25            //
 26            // Modified By:
 27            //
 28            //%/////////////////////////////////////////////////////////////////////////////
 29            
 30 mday  1.32.2.2 #include "Monitor.h"
 31 mike  1.2      
 32                #ifdef PEGASUS_OS_TYPE_WINDOWS
 33                # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
 34                #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
 35                of <winsock.h>. It may have been indirectly included (e.g., by including \
 36 mday  1.25     <windows.h>). Finthe inclusion of that header which is visible to this \
 37 mike  1.2      compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \
 38                otherwise, less than 64 clients (the default) will be able to connect to the \
 39                CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
 40 mday  1.5      
 41 mike  1.2      # endif
 42                # define FD_SETSIZE 1024
 43 mday  1.5      # include <windows.h>
 44 mike  1.2      #else
 45                # include <sys/types.h>
 46                # include <sys/socket.h>
 47                # include <sys/time.h>
 48                # include <netinet/in.h>
 49                # include <netdb.h>
 50                # include <arpa/inet.h>
 51 mday  1.32.2.1 # include <unistd.h>
 52 mike  1.2      #endif
 53                
 54                PEGASUS_USING_STD;
 55                
 56                PEGASUS_NAMESPACE_BEGIN
 57                
 58 mday  1.18     
 59 mday  1.25     static AtomicInt _connections = 0;
 60                
 61                
 62                static struct timeval create_time = {0, 1};
 63 mday  1.26     static struct timeval destroy_time = {15, 0};
 64 mday  1.32.2.5 static struct timeval deadlock_time = {300, 0};
 65 mday  1.18     
 66 mike  1.2      ////////////////////////////////////////////////////////////////////////////////
 67                //
 68                // MonitorRep
 69                //
 70                ////////////////////////////////////////////////////////////////////////////////
 71                
 72                struct MonitorRep
 73                {
 74                    fd_set rd_fd_set;
 75                    fd_set wr_fd_set;
 76                    fd_set ex_fd_set;
 77                    fd_set active_rd_fd_set;
 78                    fd_set active_wr_fd_set;
 79                    fd_set active_ex_fd_set;
 80                };
 81                
 82                ////////////////////////////////////////////////////////////////////////////////
 83                //
 84                // Monitor
 85                //
 86                ////////////////////////////////////////////////////////////////////////////////
 87 mike  1.2      
 88                Monitor::Monitor()
 89 mday  1.32.2.2    : _async(false)
 90 mike  1.2      {
 91                    Socket::initializeInterface();
 92 mday  1.25         _rep = 0;
 93 mday  1.32.2.1     _entries.reserveCapacity(32);
 94                    int i = 0;
 95                    for( ; i < 32; i++ )
 96                    {
 97                       _MonitorEntry entry(0, 0, 0);
 98                       _entries.append(entry);
 99                    }
100 mike  1.2      }
101                
102 mday  1.18     Monitor::Monitor(Boolean async)
103 mday  1.32.2.2    : _async(async)
104 mday  1.18     {
105                    Socket::initializeInterface();
106 mday  1.25         _rep = 0;
107 mday  1.32.2.3     _entries.reserveCapacity(32);
108                    int i = 0;
109                    for( ; i < 32; i++ )
110                    {
111                       _MonitorEntry entry(0, 0, 0);
112                       _entries.append(entry);
113                    }
114 mday  1.32.2.1 
115 mday  1.19         if( _async == true )
116                    {
117                       _thread_pool = new ThreadPool(0, 
118                				     "Monitor", 
119 mday  1.32.2.1 				     1, 
120 mday  1.25     				     0,
121 mday  1.19     				     create_time, 
122                				     destroy_time, 
123                				     deadlock_time);
124                    }
125 mday  1.20         else 
126                       _thread_pool = 0;
127 mday  1.18     }
128 mday  1.20     
129 mike  1.2      Monitor::~Monitor()
130                {
131 kumpf 1.11         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
132                                  "deregistering with module controller");
133 kumpf 1.10     
134 mday  1.32.2.2 
135 kumpf 1.11         Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep");
136 mday  1.8         
137 kumpf 1.11         Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
138 mike  1.2          Socket::uninitializeInterface();
139 kumpf 1.11         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
140                                  "returning from monitor destructor");
141 mday  1.21         if(_async == true)
142 mday  1.20            delete _thread_pool;
143 mike  1.2      }
144                
145 mday  1.7      
146 mday  1.18     int Monitor::kill_idle_threads()
147                {
148                   static struct timeval now, last;
149                   gettimeofday(&now, NULL);
150 mday  1.20        int dead_threads = 0;
151 mday  1.18        
152 mday  1.32.2.1    if( now.tv_sec - last.tv_sec > 300 )
153 mday  1.18        {
154 mday  1.32.2.1       PEGASUS_STD(cout) << "Monitor Thread Pool currently has " << 
155                	 _thread_pool->running_count() + 
156                	 _thread_pool->pool_count() << " Threads." << PEGASUS_STD(endl);
157 mday  1.18           gettimeofday(&last, NULL);
158 mday  1.20           try 
159                      {
160                	 dead_threads =  _thread_pool->kill_dead_threads();
161                      }
162                      catch(IPCException& )
163                      {
164                      }
165                      
166 mday  1.18        }
167 mday  1.20        return dead_threads;
168 mday  1.18     }
169                
170 mday  1.7      
171 mike  1.2      Boolean Monitor::run(Uint32 milliseconds)
172                {
173 mday  1.18     
174 mday  1.25         Boolean handled_events = false;
175                    int i = 0;
176 mday  1.18         
177 mday  1.25         struct timeval tv = {0,1};
178                    fd_set fdread;
179                    FD_ZERO(&fdread);
180 mday  1.32.2.4     _entry_mut.lock(pegasus_thread_self());
181                    
182 mday  1.25         for( int indx = 0; indx < (int)_entries.size(); indx++)
183 mike  1.2          {
184 mday  1.32.2.1        if(_entries[indx]._status.value() == _MonitorEntry::IDLE)
185 mday  1.25            {
186                	  FD_SET(_entries[indx].socket, &fdread);
187                       }
188 mday  1.13         }
189 mday  1.32.2.1 
190 mday  1.32.2.4     
191 mday  1.25         int events = select(FD_SETSIZE, &fdread, NULL, NULL, &tv);
192                
193 mike  1.2      #ifdef PEGASUS_OS_TYPE_WINDOWS
194 mday  1.25         if(events && events != SOCKET_ERROR )
195 mike  1.2      #else
196 mday  1.25         if(events && events != -1 )
197 mike  1.2      #endif
198 mday  1.13         {
199 mday  1.25            for( int indx = 0; indx < (int)_entries.size(); indx++)
200                       {
201                	  if(FD_ISSET(_entries[indx].socket, &fdread))
202                	  {
203                	     MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);
204                	     if(q == 0)
205                	     {
206 mday  1.32.2.1 		PEGASUS_STD(cout) << "Monitor:: found an empty connection slot" << PEGASUS_STD(endl);
207 mday  1.32.2.5 		try
208 mday  1.32.2.1 		{
209 mday  1.32.2.5 		   _entries[indx]._status = _MonitorEntry::EMPTY;
210 mday  1.32.2.1 		}
211                		catch(...)
212                		{
213 mday  1.32.2.5 
214 mday  1.32.2.1 		}
215 mday  1.32.2.5 		continue;
216 mday  1.25     	     }
217 mday  1.32.2.1 	     try 
218 mday  1.25     	     {
219 mday  1.32.2.1 		if(_entries[indx]._type == Monitor::CONNECTION)
220 mday  1.25     		{
221 mday  1.32.2.1 		   static_cast<HTTPConnection *>(q)->_entry_index = indx;
222                		   if(static_cast<HTTPConnection *>(q)->_dying.value() > 0 )
223                		   {
224                		      _entries[indx]._status = _MonitorEntry::DYING;
225                		      MessageQueue & o = static_cast<HTTPConnection *>(q)->get_owner();
226                		      Message* message= new CloseConnectionMessage(_entries[indx].socket);
227                		      message->dest = o.getQueueId();
228 mday  1.32.2.4 		      _entry_mut.unlock();
229 mday  1.32.2.1 		      o.enqueue(message);
230                		      return true;
231                		   }
232                		   _entries[indx]._status = _MonitorEntry::BUSY;
233                		   _thread_pool->allocate_and_awaken((void *)q, _dispatch);
234                		}
235                		else
236                		{
237                		   int events = 0;
238                		   events |= SocketMessage::READ;
239                		   Message *msg = new SocketMessage(_entries[indx].socket, events);
240                		   _entries[indx]._status = _MonitorEntry::BUSY;
241 mday  1.32.2.4 		   _entry_mut.unlock();
242 mday  1.32.2.1 		   q->enqueue(msg);
243                		   _entries[indx]._status = _MonitorEntry::IDLE;
244 mday  1.25     		   return true;
245                		}
246                	     }
247 mday  1.32.2.1 	     catch(...)
248 mday  1.25     	     {
249                	     }
250                	     handled_events = true;
251                	  }
252                       }
253 mday  1.24         }
254 mday  1.32.2.4     _entry_mut.unlock();
255 mday  1.13         return(handled_events);
256 mike  1.2      }
257                
258 mday  1.25     
259                int  Monitor::solicitSocketMessages(
260 mike  1.2          Sint32 socket, 
261                    Uint32 events,
262 mday  1.8          Uint32 queueId, 
263                    int type)
264 mike  1.2      {
265 kumpf 1.4      
266 mday  1.32.2.1    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solictSocketMessage");
267 mike  1.2      
268 mday  1.32.2.1    int index = -1;
269 mday  1.32.2.4    _entry_mut.lock(pegasus_thread_self());
270                   
271 mday  1.25        for(index = 0; index < (int)_entries.size(); index++)
272                   {
273 mday  1.32.2.1       try 
274                      {
275                	 if(_entries[index]._status.value() == _MonitorEntry::EMPTY)
276                	 {
277                	    _entries[index].socket = socket;
278                	    _entries[index].queueId  = queueId;
279                	    _entries[index]._type = type;
280                	    _entries[index]._status = _MonitorEntry::IDLE;
281 mday  1.32.2.4 	    _entry_mut.unlock();
282                	    
283 mday  1.32.2.1 	    return index;
284                	 }
285                      }
286                      catch(...)
287 mday  1.25           {
288                      }
289 mday  1.32.2.4 
290 mday  1.25        }
291 mday  1.32.2.4       _entry_mut.unlock();
292 mday  1.25        PEG_METHOD_EXIT();
293                   return index;
294 mike  1.2      }
295                
296 mday  1.25     void Monitor::unsolicitSocketMessages(Sint32 socket)
297 mike  1.2      {
298 mday  1.25         PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
299 mday  1.32.2.4     _entry_mut.lock(pegasus_thread_self());
300 mday  1.27         
301 mday  1.25         for(int index = 0; index < (int)_entries.size(); index++)
302 mike  1.2          {
303 mday  1.25            if(_entries[index].socket == socket)
304                       {
305                	  _entries[index]._status = _MonitorEntry::EMPTY;
306 mday  1.32.2.4 	  break;
307 mday  1.25            }
308 mike  1.2          }
309 mday  1.32.2.4     	  
310                    _entry_mut.unlock();
311 mday  1.25         
312 mday  1.32.2.1 PEG_METHOD_EXIT();
313                if( _async  == true )
314                   PEGASUS_STD(cout) << "Monitor:: running " << _thread_pool->running_count() << 
315                   " idle " << _thread_pool->pool_count() << PEGASUS_STD(endl);
316 mike  1.2      
317 mday  1.32.2.1 }
318 mday  1.7      
319                
320                PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
321                {
322 mday  1.8         HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
323 mday  1.32.2.1    
324 mday  1.25        dst->run(1);
325 mday  1.32.2.1    if(  dst->_monitor->_entries.size() > (Uint32)dst->_entry_index )
326                      dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
327                   
328 mday  1.8         return 0;
329 mday  1.7      }
330                
331 mike  1.2      
332                PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2