(file) Return to MessageQueue.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.36 //%2003////////////////////////////////////////////////////////////////////////
  2 mike  1.4  //
  3 karl  1.36 // Copyright (c) 2000, 2001, 2002  BMC Software, Hewlett-Packard Development
  4            // Company, L. P., IBM Corp., The Open Group, Tivoli Systems.
  5            // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L. P.;
  6            // IBM Corp.; EMC Corporation, The Open Group.
  7 mike  1.4  //
  8            // Permission is hereby granted, free of charge, to any person obtaining a copy
  9 chip  1.8  // of this software and associated documentation files (the "Software"), to
 10            // deal in the Software without restriction, including without limitation the
 11            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 12 mike  1.4  // sell copies of the Software, and to permit persons to whom the Software is
 13            // furnished to do so, subject to the following conditions:
 14 kumpf 1.32 // 
 15 chip  1.8  // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 16 mike  1.4  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 17            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 18 chip  1.8  // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 19            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 20            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 21 mike  1.4  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 22            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 23            //
 24            //==============================================================================
 25            //
 26            // Author: Mike Brasher (mbrasher@bmc.com)
 27            //
 28            // Modified By:
 29            //
 30            //%/////////////////////////////////////////////////////////////////////////////
 31            
 32 mike  1.5  #include <Pegasus/Common/HashTable.h>
 33 mike  1.7  #include <Pegasus/Common/IPC.h>
 34 kumpf 1.19 #include <Pegasus/Common/Tracer.h>
 35 mike  1.4  #include "MessageQueue.h"
 36 mday  1.24 #include "MessageQueueService.h"
 37 mike  1.4  PEGASUS_USING_STD;
 38            
 39            PEGASUS_NAMESPACE_BEGIN
 40            
 41 mike  1.5  typedef HashTable<Uint32, MessageQueue*, EqualFunc<Uint32>, HashFunc<Uint32> >
 42                QueueTable;
 43            
 44 mday  1.33 static QueueTable _queueTable(256);
 45 mday  1.28 static Mutex q_table_mut ;
 46 mike  1.5  
 47 mday  1.35 void MessageQueue::remove_myself(Uint32 qid)
 48            {
 49               q_table_mut.lock(pegasus_thread_self());
 50               
 51               _queueTable.remove(qid);
 52               q_table_mut.unlock();
 53            }
 54            
 55            
 56 mike  1.16 Uint32 MessageQueue::getNextQueueId() throw(IPCException)
 57 mike  1.5  {
 58 mday  1.22    static Uint32 _nextQueueId = 2;
 59 mike  1.16 
 60               //
 61               // Lock mutex:
 62               //
 63 chip  1.8  
 64 mday  1.28    static Mutex _id_mut ;
 65 mike  1.16    _id_mut.lock(pegasus_thread_self());
 66 chip  1.8  
 67 kumpf 1.34    Uint32 queueId;
 68 chip  1.8  
 69 kumpf 1.34    // Assign the next queue ID that is not already in use
 70               do
 71               {
 72                  // Handle wrap around and never assign zero or one as a queue id:
 73                  if (_nextQueueId == 0)
 74                  {
 75                     _nextQueueId = 2;
 76                  }
 77 chip  1.8  
 78 kumpf 1.34       queueId = _nextQueueId++;
 79               } while (lookup(queueId) != 0);
 80 mike  1.7  
 81 mike  1.16    //
 82               // Unlock mutex:
 83               //
 84 mike  1.5  
 85 mike  1.16    _id_mut.unlock();
 86 mday  1.12 
 87 mike  1.16    return queueId;
 88            }
 89 mday  1.12 
 90 mday  1.24 
 91            
 92 mike  1.16 MessageQueue::MessageQueue(
 93                const char* name, 
 94                Boolean async,
 95                Uint32 queueId)
 96 mday  1.24    : _queueId(queueId), _capabilities(0), _count(0), _front(0), _back(0), _async(async)
 97 mike  1.5  {
 98 mike  1.16     //
 99                // Copy the name:
100                //
101            
102 kumpf 1.25    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::MessageQueue()");
103 kumpf 1.19 
104 mike  1.16     if (!name)
105 mday  1.24 	name = ""; 
106 mike  1.16 
107                _name = new char[strlen(name) + 1];
108                strcpy(_name, name);
109            
110 kumpf 1.25     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
111 kumpf 1.19         "MessageQueue::MessageQueue  name = %s, queueId = %i", name, queueId);
112            
113 mike  1.16     //
114                // Insert into queue table:
115                //
116 mike  1.5  
117 chip  1.8      q_table_mut.lock(pegasus_thread_self());
118            
119 mike  1.16     while (!_queueTable.insert(_queueId, this))
120 mike  1.7         ;
121 mike  1.16 
122 mike  1.7      q_table_mut.unlock();
123 chip  1.8  
124 mday  1.12     
125 kumpf 1.25    PEG_METHOD_EXIT();
126 mike  1.5  }
127            
128            MessageQueue::~MessageQueue()
129            {
130                // ATTN-A: thread safety!
131 kumpf 1.19 
132 kumpf 1.25     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::~MessageQueue()");
133 kumpf 1.19 
134 kumpf 1.25     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
135 kumpf 1.19         "MessageQueue::~MessageQueue queueId = %i, name = %s", _queueId, _name);
136            
137 chip  1.8      q_table_mut.lock(pegasus_thread_self());
138            
139 mike  1.5      _queueTable.remove(_queueId);
140 mike  1.7      q_table_mut.unlock();
141 chip  1.8  	
142 mike  1.16     // Free the name:
143 mday  1.12     
144 mike  1.16     delete [] _name;
145 kumpf 1.19 
146 kumpf 1.25     PEG_METHOD_EXIT();
147 chip  1.8  }
148            
149 kumpf 1.37 void MessageQueue::enqueue(Message* message)
150 mike  1.4  {
151 kumpf 1.25     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::enqueue()");
152 kumpf 1.19 
153                if (!message) 
154                {
155 kumpf 1.25        Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
156 kumpf 1.19         "MessageQueue::enqueue failure");
157 kumpf 1.25        PEG_METHOD_EXIT();
158 mday  1.9         throw NullPointer();
159 kumpf 1.19     }
160 mike  1.7  
161 kumpf 1.31     PEG_TRACE_STRING( TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3, 
162                                  String("Queue name: ") + getQueueName() ) ;
163                Tracer::trace   ( TRC_MESSAGEQUEUESERVICE, 
164                                  Tracer::LEVEL3,
165                                  "Message: [%s, %d]", 
166                                  MessageTypeToString(message->getType()), 
167                                  message->getKey() );
168 mday  1.35     
169 mday  1.9      _mut.lock(pegasus_thread_self());
170 mike  1.4      if (_back)
171                {
172 mday  1.9         _back->_next = message;
173                   message->_prev = _back;
174                   message->_next = 0;
175                   _back = message;
176 mike  1.4      }
177                else
178                {
179 mday  1.9         _front = message;
180                   _back = message;
181                   message->_prev = 0;
182                   message->_next = 0;
183 mike  1.4      }
184                message->_owner = this;
185 mday  1.30        
186 mike  1.4      _count++;
187 kumpf 1.25     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
188 mday  1.30 		  "MessageQueue::enqueue _queueId = %d, _count = %d", _queueId, _count);
189                   
190 mike  1.7      _mut.unlock();
191 mday  1.30     
192 mday  1.22     handleEnqueue();
193 kumpf 1.25     PEG_METHOD_EXIT();
194 mday  1.15 }
195            
196 mike  1.7  Message* MessageQueue::dequeue() throw(IPCException)
197 mike  1.4  {
198 kumpf 1.25     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::dequeue()");
199            
200 mike  1.7     _mut.lock(pegasus_thread_self());
201 mike  1.4      if (_front)
202                {
203            	Message* message = _front;
204            	_front = _front->_next;
205            	if (_front)
206            	    _front->_prev = 0;
207            
208            	if (_back == message)
209            	    _back = 0;
210 kumpf 1.25 
211 mike  1.7  	_count--;
212 kumpf 1.25         Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
213 kumpf 1.26             "MessageQueue::dequeue _queueId = %d, _count = %d", 
214                        _queueId, _count);
215 kumpf 1.25 
216 mike  1.7  	_mut.unlock();
217 mike  1.4  	message->_next = 0;
218            	message->_prev = 0;
219            	message->_owner = 0;
220 kumpf 1.25 
221                    PEG_METHOD_EXIT();
222 mike  1.4  	return message;
223                }
224 mike  1.7      _mut.unlock();
225 kumpf 1.25 
226                PEG_METHOD_EXIT();
227 mike  1.4      return 0;
228            }
229 kumpf 1.25 ;
230 mday  1.23 
231 mike  1.4  
232 mike  1.7  void MessageQueue::remove(Message* message) throw(IPCException)
233 mike  1.4  {
234 kumpf 1.25     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::remove()");
235            
236 mike  1.4      if (!message)
237 kumpf 1.25     {
238                    PEG_METHOD_EXIT();
239 mike  1.4  	throw NullPointer();
240 kumpf 1.25     }
241 mike  1.4  
242                if (message->_owner != this)
243 kumpf 1.25     {
244                    PEG_METHOD_EXIT();
245 mike  1.4  	throw NoSuchMessageOnQueue();
246 kumpf 1.25     }
247 mike  1.4  
248 mike  1.7      _mut.lock(pegasus_thread_self());
249 chip  1.8  
250 mike  1.4      if (message->_next)
251            	message->_next->_prev = message->_prev;
252                else
253            	_back = message->_prev;
254            
255                if (message->_prev)
256            	message->_prev->_next = message->_next;
257                else
258            	_front = message->_next;
259            
260 mike  1.7      _count--;
261 kumpf 1.25     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
262                   "MessageQueue::remove _count = %d", _count);
263            
264 mike  1.7      _mut.unlock();
265 chip  1.8  
266 mike  1.4      message->_prev = 0;
267                message->_next = 0;
268                message->_owner = 0;
269 kumpf 1.25 
270                PEG_METHOD_EXIT();
271 mike  1.4  }
272            
273 mike  1.7  Message* MessageQueue::findByType(Uint32 type) throw(IPCException)
274 mike  1.4  {
275 mike  1.7     _mut.lock(pegasus_thread_self());
276 chip  1.8  
277 mike  1.4      for (Message* m = front(); m; m = m->getNext())
278                {
279 mike  1.7         if (m->getType() == type)
280                   {
281            	  _mut.unlock();
282            	  return m;
283                   }
284 mike  1.4      }
285 mike  1.7      _mut.unlock();
286 mike  1.4      return 0;
287            }
288            
289 mike  1.7  Message* MessageQueue::findByKey(Uint32 key) throw(IPCException)
290 mike  1.4  {
291 mike  1.7     _mut.lock(pegasus_thread_self());
292 chip  1.8  
293 mike  1.4      for (Message* m = front(); m; m = m->getNext())
294                {
295 mike  1.7         if (m->getKey() == key)
296                   {
297            	  _mut.unlock();
298            	  return m;
299                   }
300 chip  1.8  
301 mike  1.4      }
302 mike  1.7      _mut.unlock();
303 mike  1.4      return 0;
304            }
305            
306 mike  1.7  void MessageQueue::print(ostream& os) const throw(IPCException)
307 mike  1.4  {
308 mike  1.7     const_cast<MessageQueue *>(this)->_mut.lock(pegasus_thread_self());
309 chip  1.8  
310 mike  1.7     for (const Message* m = front(); m; m = m->getNext())
311 mike  1.4  	m->print(os);
312 mike  1.7     const_cast<MessageQueue *>(this)->_mut.unlock();
313 mike  1.4  }
314            
315 mike  1.7  Message* MessageQueue::find(Uint32 type, Uint32 key) throw(IPCException)
316 mike  1.4  {
317 mike  1.7     _mut.lock(pegasus_thread_self());
318 chip  1.8  
319 mike  1.4      for (Message* m = front(); m; m = m->getNext())
320                {
321 mike  1.7         if (m->getType() == type && m->getKey() == key)
322                   {
323            	  _mut.unlock();
324            	  return m;
325                   }
326 mike  1.4      }
327 mike  1.7      _mut.unlock();
328 mike  1.4  
329 mike  1.5      return 0;
330            }
331            
332 mike  1.7  void MessageQueue::lock() throw(IPCException)
333 mike  1.5  {
334 mike  1.7     _mut.lock(pegasus_thread_self());
335            }
336 mike  1.5  
337 chip  1.8  void MessageQueue::unlock()
338 mike  1.7  {
339               _mut.unlock();
340 mike  1.5  }
341            
342 mike  1.7  const char* MessageQueue::getQueueName() const
343 mike  1.5  {
344 kumpf 1.29    return _name;
345 mike  1.5  }
346            
347 mike  1.7  MessageQueue* MessageQueue::lookup(Uint32 queueId) throw(IPCException)
348 mike  1.5  {
349 kumpf 1.19 
350 mike  1.5      MessageQueue* queue = 0;
351 mike  1.7      q_table_mut.lock(pegasus_thread_self());
352 chip  1.8  
353 mike  1.5      if (_queueTable.lookup(queueId, queue))
354 mike  1.7      {
355                   q_table_mut.unlock();
356                   return queue;
357                }
358 chip  1.8  
359 mike  1.7      // Not found!
360 mike  1.5  
361 mike  1.7      q_table_mut.unlock();
362 chip  1.8  
363 kumpf 1.25     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
364 kumpf 1.19         "MessageQueue::lookup failure queueId = %i", queueId);
365            
366 mike  1.4      return 0;
367 mike  1.6  }
368 mike  1.7  
369            
370            MessageQueue* MessageQueue::lookup(const char *name) throw(IPCException)
371            {
372 kumpf 1.19 
373 mike  1.7     if(name == NULL)
374                  throw NullPointer();
375               q_table_mut.lock(pegasus_thread_self());
376 chip  1.8  
377 mike  1.7     for(QueueTable::Iterator i = _queueTable.start(); i; i++)
378               {
379                    // ATTN: Need to decide how many characters to compare in queue names
380 mday  1.27       if(! strcmp( ((MessageQueue *)i.value())->getQueueName(), name) )
381 mike  1.7        {
382            	 q_table_mut.unlock();
383            	 return( (MessageQueue *)i.value());
384                  }
385 chip  1.8  
386 mike  1.7     }
387               q_table_mut.unlock();
388 kumpf 1.19 
389 kumpf 1.25    Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
390 kumpf 1.19         "MessageQueue::lookup failure - name = %s", name);
391 chip  1.8  
392 mike  1.7     return 0;
393            }
394            
395 mike  1.6  
396            void MessageQueue::handleEnqueue()
397            {
398            
399 mike  1.4  }
400            
401            PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2