(file) Return to MessageQueue.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.36 //%2003////////////////////////////////////////////////////////////////////////
  2 mike  1.4  //
  3 karl  1.36 // Copyright (c) 2000, 2001, 2002  BMC Software, Hewlett-Packard Development
  4            // Company, L. P., IBM Corp., The Open Group, Tivoli Systems.
  5            // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L. P.;
  6            // IBM Corp.; EMC Corporation, The Open Group.
  7 mike  1.4  //
  8            // Permission is hereby granted, free of charge, to any person obtaining a copy
  9 chip  1.8  // of this software and associated documentation files (the "Software"), to
 10            // deal in the Software without restriction, including without limitation the
 11            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 12 mike  1.4  // sell copies of the Software, and to permit persons to whom the Software is
 13            // furnished to do so, subject to the following conditions:
 14 kumpf 1.32 // 
 15 chip  1.8  // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 16 mike  1.4  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 17            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 18 chip  1.8  // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 19            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 20            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 21 mike  1.4  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 22            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 23            //
 24            //==============================================================================
 25            //
 26            // Author: Mike Brasher (mbrasher@bmc.com)
 27            //
 28 a.arora 1.38 // Modified By: Amit K Arora, IBM (amita@in.ibm.com) for Bug#1090
 29 mike    1.4  //
 30              //%/////////////////////////////////////////////////////////////////////////////
 31              
 32 mike    1.5  #include <Pegasus/Common/HashTable.h>
 33 mike    1.7  #include <Pegasus/Common/IPC.h>
 34 kumpf   1.19 #include <Pegasus/Common/Tracer.h>
 35 mike    1.4  #include "MessageQueue.h"
 36 mday    1.24 #include "MessageQueueService.h"
 37 mike    1.4  PEGASUS_USING_STD;
 38              
 39              PEGASUS_NAMESPACE_BEGIN
 40              
 41 mike    1.5  typedef HashTable<Uint32, MessageQueue*, EqualFunc<Uint32>, HashFunc<Uint32> >
 42                  QueueTable;
 43              
 44 mday    1.33 static QueueTable _queueTable(256);
 45 mday    1.28 static Mutex q_table_mut ;
 46 mike    1.5  
 47 mday    1.35 void MessageQueue::remove_myself(Uint32 qid)
 48              {
 49 a.arora 1.38    AutoMutex autoMut(q_table_mut);   
 50 mday    1.35    _queueTable.remove(qid);
 51              }
 52              
 53              
 54 mike    1.16 Uint32 MessageQueue::getNextQueueId() throw(IPCException)
 55 mike    1.5  {
 56 mday    1.22    static Uint32 _nextQueueId = 2;
 57 mike    1.16 
 58                 //
 59                 // Lock mutex:
 60                 //
 61 chip    1.8  
 62 mday    1.28    static Mutex _id_mut ;
 63 a.arora 1.38    AutoMutex autoMut(_id_mut);
 64 chip    1.8  
 65 kumpf   1.34    Uint32 queueId;
 66 chip    1.8  
 67 kumpf   1.34    // Assign the next queue ID that is not already in use
 68                 do
 69                 {
 70                    // Handle wrap around and never assign zero or one as a queue id:
 71                    if (_nextQueueId == 0)
 72                    {
 73                       _nextQueueId = 2;
 74                    }
 75 chip    1.8  
 76 kumpf   1.34       queueId = _nextQueueId++;
 77                 } while (lookup(queueId) != 0);
 78 mike    1.7  
 79 mike    1.16    return queueId;
 80              }
 81 mday    1.12 
 82 mday    1.24 
 83              
 84 mike    1.16 MessageQueue::MessageQueue(
 85                  const char* name, 
 86                  Boolean async,
 87                  Uint32 queueId)
 88 mday    1.24    : _queueId(queueId), _capabilities(0), _count(0), _front(0), _back(0), _async(async)
 89 mike    1.5  {
 90 mike    1.16     //
 91                  // Copy the name:
 92                  //
 93              
 94 kumpf   1.25    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::MessageQueue()");
 95 kumpf   1.19 
 96 mike    1.16     if (!name)
 97 mday    1.24 	name = ""; 
 98 mike    1.16 
 99                  _name = new char[strlen(name) + 1];
100                  strcpy(_name, name);
101              
102 kumpf   1.25     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
103 kumpf   1.19         "MessageQueue::MessageQueue  name = %s, queueId = %i", name, queueId);
104              
105 mike    1.16     //
106                  // Insert into queue table:
107                  //
108 mike    1.5  
109 a.arora 1.38     AutoMutex autoMut(q_table_mut);
110 chip    1.8  
111 mike    1.16     while (!_queueTable.insert(_queueId, this))
112 mike    1.7         ;
113 a.arora 1.38   
114 kumpf   1.25    PEG_METHOD_EXIT();
115 mike    1.5  }
116              
117              MessageQueue::~MessageQueue()
118              {
119                  // ATTN-A: thread safety!
120 kumpf   1.19 
121 kumpf   1.25     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::~MessageQueue()");
122 kumpf   1.19 
123 kumpf   1.25     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
124 kumpf   1.19         "MessageQueue::~MessageQueue queueId = %i, name = %s", _queueId, _name);
125              
126 a.arora 1.38     
127                  {
128                    AutoMutex autoMut(q_table_mut);
129                    _queueTable.remove(_queueId);
130                  } // mutex unlocks here
131 chip    1.8  	
132 mike    1.16     // Free the name:
133 mday    1.12     
134 mike    1.16     delete [] _name;
135 kumpf   1.19 
136 kumpf   1.25     PEG_METHOD_EXIT();
137 chip    1.8  }
138              
139 kumpf   1.37 void MessageQueue::enqueue(Message* message)
140 mike    1.4  {
141 kumpf   1.25     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::enqueue()");
142 kumpf   1.19 
143                  if (!message) 
144                  {
145 kumpf   1.25        Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
146 kumpf   1.19         "MessageQueue::enqueue failure");
147 kumpf   1.25        PEG_METHOD_EXIT();
148 mday    1.9         throw NullPointer();
149 kumpf   1.19     }
150 mike    1.7  
151 kumpf   1.31     PEG_TRACE_STRING( TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3, 
152                                    String("Queue name: ") + getQueueName() ) ;
153                  Tracer::trace   ( TRC_MESSAGEQUEUESERVICE, 
154                                    Tracer::LEVEL3,
155                                    "Message: [%s, %d]", 
156                                    MessageTypeToString(message->getType()), 
157                                    message->getKey() );
158 mday    1.35     
159 a.arora 1.38     {
160                  AutoMutex autoMut(_mut);
161 mike    1.4      if (_back)
162                  {
163 mday    1.9         _back->_next = message;
164                     message->_prev = _back;
165                     message->_next = 0;
166                     _back = message;
167 mike    1.4      }
168                  else
169                  {
170 mday    1.9         _front = message;
171                     _back = message;
172                     message->_prev = 0;
173                     message->_next = 0;
174 mike    1.4      }
175                  message->_owner = this;
176 mday    1.30        
177 mike    1.4      _count++;
178 kumpf   1.25     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
179 mday    1.30 		  "MessageQueue::enqueue _queueId = %d, _count = %d", _queueId, _count);
180                     
181 a.arora 1.38     } // mutex unlocks here
182 mday    1.30     
183 mday    1.22     handleEnqueue();
184 kumpf   1.25     PEG_METHOD_EXIT();
185 mday    1.15 }
186              
187 mike    1.7  Message* MessageQueue::dequeue() throw(IPCException)
188 mike    1.4  {
189 kumpf   1.25     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::dequeue()");
190              
191 a.arora 1.38     AutoMutex autoMut(_mut);
192 mike    1.4      if (_front)
193                  {
194              	Message* message = _front;
195              	_front = _front->_next;
196              	if (_front)
197              	    _front->_prev = 0;
198              
199              	if (_back == message)
200              	    _back = 0;
201 kumpf   1.25 
202 mike    1.7  	_count--;
203 kumpf   1.25         Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
204 kumpf   1.26             "MessageQueue::dequeue _queueId = %d, _count = %d", 
205                          _queueId, _count);
206 kumpf   1.25 
207 mike    1.4  	message->_next = 0;
208              	message->_prev = 0;
209              	message->_owner = 0;
210 kumpf   1.25 
211                      PEG_METHOD_EXIT();
212 mike    1.4  	return message;
213                  }
214 kumpf   1.25 
215                  PEG_METHOD_EXIT();
216 mike    1.4      return 0;
217              }
218 kumpf   1.25 ;
219 mday    1.23 
220 mike    1.4  
221 mike    1.7  void MessageQueue::remove(Message* message) throw(IPCException)
222 mike    1.4  {
223 kumpf   1.25     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::remove()");
224              
225 mike    1.4      if (!message)
226 kumpf   1.25     {
227                      PEG_METHOD_EXIT();
228 mike    1.4  	throw NullPointer();
229 kumpf   1.25     }
230 mike    1.4  
231                  if (message->_owner != this)
232 kumpf   1.25     {
233                      PEG_METHOD_EXIT();
234 mike    1.4  	throw NoSuchMessageOnQueue();
235 kumpf   1.25     }
236 mike    1.4  
237 a.arora 1.38     {
238                  AutoMutex autoMut(_mut);
239 chip    1.8  
240 mike    1.4      if (message->_next)
241              	message->_next->_prev = message->_prev;
242                  else
243              	_back = message->_prev;
244              
245                  if (message->_prev)
246              	message->_prev->_next = message->_next;
247                  else
248              	_front = message->_next;
249              
250 mike    1.7      _count--;
251 kumpf   1.25     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
252                     "MessageQueue::remove _count = %d", _count);
253              
254 a.arora 1.38     } // mutex unlocks here
255 chip    1.8  
256 mike    1.4      message->_prev = 0;
257                  message->_next = 0;
258                  message->_owner = 0;
259 kumpf   1.25 
260                  PEG_METHOD_EXIT();
261 mike    1.4  }
262              
263 mike    1.7  Message* MessageQueue::findByType(Uint32 type) throw(IPCException)
264 mike    1.4  {
265 a.arora 1.38    AutoMutex autoMut(_mut);
266 chip    1.8  
267 mike    1.4      for (Message* m = front(); m; m = m->getNext())
268                  {
269 mike    1.7         if (m->getType() == type)
270                     {
271 a.arora 1.38          return m;
272 mike    1.7         }
273 mike    1.4      }
274                  return 0;
275              }
276              
277 mike    1.7  Message* MessageQueue::findByKey(Uint32 key) throw(IPCException)
278 mike    1.4  {
279 a.arora 1.38    AutoMutex autoMut(_mut);
280 chip    1.8  
281 mike    1.4      for (Message* m = front(); m; m = m->getNext())
282                  {
283 mike    1.7         if (m->getKey() == key)
284                     {
285 a.arora 1.38           return m;
286 mike    1.7         }
287 chip    1.8  
288 mike    1.4      }
289                  return 0;
290              }
291              
292 mike    1.7  void MessageQueue::print(ostream& os) const throw(IPCException)
293 mike    1.4  {
294 a.arora 1.38    AutoMutex autoMut(const_cast<MessageQueue *>(this)->_mut);
295 chip    1.8  
296 mike    1.7     for (const Message* m = front(); m; m = m->getNext())
297 mike    1.4  	m->print(os);
298              }
299              
300 mike    1.7  Message* MessageQueue::find(Uint32 type, Uint32 key) throw(IPCException)
301 mike    1.4  {
302 a.arora 1.38    AutoMutex autoMut(_mut);
303 chip    1.8  
304 mike    1.4      for (Message* m = front(); m; m = m->getNext())
305                  {
306 mike    1.7         if (m->getType() == type && m->getKey() == key)
307                     {
308 a.arora 1.38          return m;
309 mike    1.7         }
310 mike    1.4      }
311              
312 mike    1.5      return 0;
313              }
314              
315 mike    1.7  void MessageQueue::lock() throw(IPCException)
316 mike    1.5  {
317 mike    1.7     _mut.lock(pegasus_thread_self());
318              }
319 mike    1.5  
320 chip    1.8  void MessageQueue::unlock()
321 mike    1.7  {
322                 _mut.unlock();
323 mike    1.5  }
324              
325 mike    1.7  const char* MessageQueue::getQueueName() const
326 mike    1.5  {
327 kumpf   1.29    return _name;
328 mike    1.5  }
329              
330 mike    1.7  MessageQueue* MessageQueue::lookup(Uint32 queueId) throw(IPCException)
331 mike    1.5  {
332 kumpf   1.19 
333 mike    1.5      MessageQueue* queue = 0;
334 a.arora 1.38     AutoMutex autoMut(q_table_mut);
335 chip    1.8  
336 mike    1.5      if (_queueTable.lookup(queueId, queue))
337 mike    1.7      {
338                     return queue;
339                  }
340 chip    1.8  
341 mike    1.7      // Not found!
342 mike    1.5  
343 kumpf   1.25     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
344 kumpf   1.19         "MessageQueue::lookup failure queueId = %i", queueId);
345              
346 mike    1.4      return 0;
347 mike    1.6  }
348 mike    1.7  
349              
350              MessageQueue* MessageQueue::lookup(const char *name) throw(IPCException)
351              {
352 kumpf   1.19 
353 mike    1.7     if(name == NULL)
354                    throw NullPointer();
355 a.arora 1.38    AutoMutex autoMut(q_table_mut);
356 chip    1.8  
357 mike    1.7     for(QueueTable::Iterator i = _queueTable.start(); i; i++)
358                 {
359                      // ATTN: Need to decide how many characters to compare in queue names
360 mday    1.27       if(! strcmp( ((MessageQueue *)i.value())->getQueueName(), name) )
361 mike    1.7        {
362 a.arora 1.38          return( (MessageQueue *)i.value());
363 mike    1.7        }
364 chip    1.8  
365 mike    1.7     }
366 kumpf   1.19 
367 kumpf   1.25    Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
368 kumpf   1.19         "MessageQueue::lookup failure - name = %s", name);
369 chip    1.8  
370 mike    1.7     return 0;
371              }
372              
373 mike    1.6  
374              void MessageQueue::handleEnqueue()
375              {
376              
377 mike    1.4  }
378              
379              PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2