(file) Return to MessageQueue.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mike  1.4 //%/////////////////////////////////////////////////////////////////////////////
  2           //
  3           // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM
  4           //
  5           // Permission is hereby granted, free of charge, to any person obtaining a copy
  6 chip  1.8 // of this software and associated documentation files (the "Software"), to
  7           // deal in the Software without restriction, including without limitation the
  8           // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  9 mike  1.4 // sell copies of the Software, and to permit persons to whom the Software is
 10           // furnished to do so, subject to the following conditions:
 11 chip  1.8 //
 12           // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 13 mike  1.4 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 14           // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 15 chip  1.8 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 16           // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 17           // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 18 mike  1.4 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 19           // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 20           //
 21           //==============================================================================
 22           //
 23           // Author: Mike Brasher (mbrasher@bmc.com)
 24           //
 25           // Modified By:
 26           //
 27           //%/////////////////////////////////////////////////////////////////////////////
 28           
 29 mike  1.5 #include <Pegasus/Common/HashTable.h>
 30 mike  1.7 #include <Pegasus/Common/IPC.h>
 31 mike  1.4 #include "MessageQueue.h"
 32           
 33           PEGASUS_USING_STD;
 34           
 35           PEGASUS_NAMESPACE_BEGIN
 36           
 37 mike  1.5 typedef HashTable<Uint32, MessageQueue*, EqualFunc<Uint32>, HashFunc<Uint32> >
 38               QueueTable;
 39           
 40           static QueueTable _queueTable(128);
 41 mike  1.7 static Mutex q_table_mut = Mutex();
 42 mike  1.5 
 43 mike  1.7 static Uint32 _GetNextQueueId() throw(IPCException)
 44 mike  1.5 {
 45 chip  1.8 
 46 mike  1.7    static Uint32 _queueId = 2;
 47              static Mutex _id_mut = Mutex();
 48 chip  1.8 
 49 mike  1.7    _id_mut.lock(pegasus_thread_self());
 50 chip  1.8 
 51 mike  1.7    // Handle wrap-around!
 52              if (_queueId == 0)
 53                 _queueId = MessageQueue::_CIMOM_Q_ID;
 54              Uint32 ret = _queueId++;
 55              _id_mut.unlock();
 56 chip  1.8 
 57 mike  1.7    return ret;
 58           }
 59           
 60           Uint32 MessageQueue::_CIMOM_Q_ID = 1;
 61 mike  1.5 
 62 kumpf 1.13 MessageQueue::MessageQueue(const char * name, Boolean async)
 63 chip  1.8  	: _mut( ), _count(0), _front(0), _back(0),
 64 mday  1.12 	  _async(async),
 65            	  _workThread(MessageQueue::workThread, this, false),
 66            	  _workSemaphore(0)
 67            
 68            
 69 mike  1.5  {
 70 mike  1.7     if(name != NULL)
 71               {
 72                  strncpy(_name, name, 25);
 73                  _name[25] = 0x00;
 74               }
 75               else
 76                  memset(_name, 0x00,25);
 77 mike  1.5  
 78 chip  1.8      q_table_mut.lock(pegasus_thread_self());
 79            
 80 mike  1.5      while (!_queueTable.insert(_queueId = _GetNextQueueId(), this))
 81 mike  1.7         ;
 82                q_table_mut.unlock();
 83 chip  1.8  
 84 mday  1.12     
 85                if(_async == true)
 86                {
 87                   _workThread.run();
 88                }
 89                
 90 mike  1.5  }
 91            
 92            MessageQueue::~MessageQueue()
 93            {
 94                // ATTN-A: thread safety!
 95 chip  1.8      q_table_mut.lock(pegasus_thread_self());
 96            
 97 mike  1.5      _queueTable.remove(_queueId);
 98 mike  1.7      q_table_mut.unlock();
 99 chip  1.8  	
100 mday  1.12     if(_async == true)
101                {
102                   _workThread.cancel();	// cancel thread
103                   _workSemaphore.signal();// wake thread
104                   _workThread.join();     // wait for thread to complete
105                }
106                
107 chip  1.8  }
108            
109 mday  1.12 
110 chip  1.8  PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueue::workThread(void * arg)
111            {
112            	// get thread from argument
113            	Thread * thread = (Thread *)arg;
114            
115            	PEGASUS_ASSERT(thread != 0);
116            
117            	// get message queue from thread
118            	MessageQueue * queue = (MessageQueue *)thread->get_parm();
119            	
120            	PEGASUS_ASSERT(queue != 0);
121            
122            	while(true)
123            	{
124 mday  1.14 	   if(thread->is_cancelled())
125            	   {
126            	      break;
127            	   }
128            	   
129            	   // wait for work
130            	   queue->_workSemaphore.wait();
131            	   
132            	   // stop the thread when the message queue has been destroyed.
133            	   // ATTN: should check the thread cancel flag that is not yet exposed!
134            	   if(MessageQueue::lookup(queue->_queueId) == 0)
135            	   {
136            	      break;
137            	   }
138            	   
139            	   // ensure the queue has a message before dispatching
140            	   if(queue->_count != 0)
141            	   {
142            	      queue->handleEnqueue();
143            	   }
144 chip  1.8  	}
145            
146            	thread->exit_self(PEGASUS_THREAD_RETURN(0));
147            	
148            	return(0);
149 mike  1.5  }
150            
151 mike  1.7  void MessageQueue::enqueue(Message* message) throw(IPCException)
152 mike  1.4  {
153                if (!message)
154 mday  1.9         throw NullPointer();
155 mike  1.7  
156                if (getenv("PEGASUS_TRACE"))
157                {
158 mday  1.9         cout << "===== " << getQueueName() << ": ";
159                   message->print(cout);
160 mike  1.7      }
161            
162 mday  1.9      _mut.lock(pegasus_thread_self());
163 mike  1.4      if (_back)
164                {
165 mday  1.9         _back->_next = message;
166                   message->_prev = _back;
167                   message->_next = 0;
168                   _back = message;
169 mike  1.4      }
170                else
171                {
172 mday  1.9         _front = message;
173                   _back = message;
174                   message->_prev = 0;
175                   message->_next = 0;
176 mike  1.4      }
177                message->_owner = this;
178                _count++;
179 mday  1.14     if( _async == true )
180                {
181                   _workSemaphore.signal();
182                }
183                
184 mike  1.7      _mut.unlock();
185 chip  1.11 
186 mday  1.14     if(_async == false )
187 mday  1.12        handleEnqueue();
188 mike  1.4  }
189            
190 mday  1.15 
191            Boolean MessageQueue::accept_async(Message *message) throw(IPCException)
192            {
193               if(! message)
194                  throw NullPointer();
195               if(_async == false)
196                  return false;
197               
198               if (getenv("PEGASUS_TRACE"))
199               {
200                  cout << "==~ accept() ~== " << getQueueName() << ": ";
201                  message->print(cout);
202               }
203            
204            
205               // in derived methods, evaluate the message here to determine
206               // whether or not you can handle it. 
207            
208                _mut.lock(pegasus_thread_self());
209                if (_back)
210                {
211 mday  1.15        _back->_next = message;
212                   message->_prev = _back;
213                   message->_next = 0;
214                   _back = message;
215                }
216                else
217                {
218                   _front = message;
219                   _back = message;
220                   message->_prev = 0;
221                   message->_next = 0;
222                }
223                message->_owner = this;
224                _count++;
225                _workSemaphore.signal();
226                _mut.unlock();
227            
228               return true;
229            }
230            
231            
232 mike  1.7  Message* MessageQueue::dequeue() throw(IPCException)
233 mike  1.4  {
234 mike  1.7     _mut.lock(pegasus_thread_self());
235 mike  1.4      if (_front)
236                {
237            	Message* message = _front;
238            	_front = _front->_next;
239            	if (_front)
240            	    _front->_prev = 0;
241            
242            	if (_back == message)
243            	    _back = 0;
244 mike  1.7  	_count--;
245            	_mut.unlock();
246 mike  1.4  	message->_next = 0;
247            	message->_prev = 0;
248            	message->_owner = 0;
249            	return message;
250                }
251 mike  1.7      _mut.unlock();
252 mike  1.4      return 0;
253            }
254            
255 mike  1.7  void MessageQueue::remove(Message* message) throw(IPCException)
256 mike  1.4  {
257                if (!message)
258            	throw NullPointer();
259            
260                if (message->_owner != this)
261            	throw NoSuchMessageOnQueue();
262            
263 mike  1.7      _mut.lock(pegasus_thread_self());
264 chip  1.8  
265 mike  1.4      if (message->_next)
266            	message->_next->_prev = message->_prev;
267                else
268            	_back = message->_prev;
269            
270                if (message->_prev)
271            	message->_prev->_next = message->_next;
272                else
273            	_front = message->_next;
274            
275 mike  1.7      _count--;
276                _mut.unlock();
277 chip  1.8  
278 mike  1.4      message->_prev = 0;
279                message->_next = 0;
280                message->_owner = 0;
281            }
282            
283 mike  1.7  Message* MessageQueue::findByType(Uint32 type) throw(IPCException)
284 mike  1.4  {
285 mike  1.7     _mut.lock(pegasus_thread_self());
286 chip  1.8  
287 mike  1.4      for (Message* m = front(); m; m = m->getNext())
288                {
289 mike  1.7         if (m->getType() == type)
290                   {
291            	  _mut.unlock();
292            	  return m;
293                   }
294 mike  1.4      }
295 mike  1.7      _mut.unlock();
296 mike  1.4      return 0;
297            }
298            
299 mike  1.7  Message* MessageQueue::findByKey(Uint32 key) throw(IPCException)
300 mike  1.4  {
301 mike  1.7     _mut.lock(pegasus_thread_self());
302 chip  1.8  
303 mike  1.4      for (Message* m = front(); m; m = m->getNext())
304                {
305 mike  1.7         if (m->getKey() == key)
306                   {
307            	  _mut.unlock();
308            	  return m;
309                   }
310 chip  1.8  
311 mike  1.4      }
312 mike  1.7      _mut.unlock();
313 mike  1.4      return 0;
314            }
315            
316 mike  1.7  void MessageQueue::print(ostream& os) const throw(IPCException)
317 mike  1.4  {
318 mike  1.7     const_cast<MessageQueue *>(this)->_mut.lock(pegasus_thread_self());
319 chip  1.8  
320 mike  1.7     for (const Message* m = front(); m; m = m->getNext())
321 mike  1.4  	m->print(os);
322 mike  1.7     const_cast<MessageQueue *>(this)->_mut.unlock();
323 mike  1.4  }
324            
325 mike  1.7  Message* MessageQueue::find(Uint32 type, Uint32 key) throw(IPCException)
326 mike  1.4  {
327 mike  1.7     _mut.lock(pegasus_thread_self());
328 chip  1.8  
329 mike  1.4      for (Message* m = front(); m; m = m->getNext())
330                {
331 mike  1.7         if (m->getType() == type && m->getKey() == key)
332                   {
333            	  _mut.unlock();
334            	  return m;
335                   }
336 mike  1.4      }
337 mike  1.7      _mut.unlock();
338 mike  1.4  
339 mike  1.5      return 0;
340            }
341            
342 mike  1.7  void MessageQueue::lock() throw(IPCException)
343 mike  1.5  {
344 mike  1.7     _mut.lock(pegasus_thread_self());
345            }
346 mike  1.5  
347 chip  1.8  void MessageQueue::unlock()
348 mike  1.7  {
349               _mut.unlock();
350 mike  1.5  }
351            
352 mike  1.7  const char* MessageQueue::getQueueName() const
353 mike  1.5  {
354 mike  1.7     if(_name[0] != 0x00)
355                  return _name;
356               return "unknown";
357 mike  1.5  }
358            
359 mike  1.7  MessageQueue* MessageQueue::lookup(Uint32 queueId) throw(IPCException)
360 mike  1.5  {
361                MessageQueue* queue = 0;
362 mike  1.7      q_table_mut.lock(pegasus_thread_self());
363 chip  1.8  
364 mike  1.5      if (_queueTable.lookup(queueId, queue))
365 mike  1.7      {
366                   q_table_mut.unlock();
367                   return queue;
368                }
369 chip  1.8  
370 mike  1.7      // Not found!
371 mike  1.5  
372 mike  1.7      q_table_mut.unlock();
373 chip  1.8  
374 mike  1.4      return 0;
375 mike  1.6  }
376 mike  1.7  
377            
378            MessageQueue* MessageQueue::lookup(const char *name) throw(IPCException)
379            {
380               if(name == NULL)
381                  throw NullPointer();
382               q_table_mut.lock(pegasus_thread_self());
383 chip  1.8  
384 mike  1.7     for(QueueTable::Iterator i = _queueTable.start(); i; i++)
385               {
386                    // ATTN: Need to decide how many characters to compare in queue names
387                  if(! strncmp( ((MessageQueue *)i.value())->getQueueName(), name, 25) )
388                  {
389            	 q_table_mut.unlock();
390            	 return( (MessageQueue *)i.value());
391                  }
392 chip  1.8  
393 mike  1.7     }
394               q_table_mut.unlock();
395 chip  1.8  
396 mike  1.7     return 0;
397            }
398            
399 mike  1.6  
400            void MessageQueue::handleEnqueue()
401            {
402            
403 mike  1.4  }
404            
405            PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2