(file) Return to MessageQueue.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mike  1.4 //%/////////////////////////////////////////////////////////////////////////////
  2           //
  3           // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM
  4           //
  5           // Permission is hereby granted, free of charge, to any person obtaining a copy
  6 chip  1.8 // of this software and associated documentation files (the "Software"), to
  7           // deal in the Software without restriction, including without limitation the
  8           // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  9 mike  1.4 // sell copies of the Software, and to permit persons to whom the Software is
 10           // furnished to do so, subject to the following conditions:
 11 chip  1.8 //
 12           // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 13 mike  1.4 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 14           // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 15 chip  1.8 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 16           // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 17           // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 18 mike  1.4 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 19           // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 20           //
 21           //==============================================================================
 22           //
 23           // Author: Mike Brasher (mbrasher@bmc.com)
 24           //
 25           // Modified By:
 26           //
 27           //%/////////////////////////////////////////////////////////////////////////////
 28           
 29 mike  1.5 #include <Pegasus/Common/HashTable.h>
 30 mike  1.7 #include <Pegasus/Common/IPC.h>
 31 kumpf 1.19 #include <Pegasus/Common/Tracer.h>
 32 mike  1.4  #include "MessageQueue.h"
 33            
 34            PEGASUS_USING_STD;
 35            
 36            PEGASUS_NAMESPACE_BEGIN
 37            
 38 mike  1.5  typedef HashTable<Uint32, MessageQueue*, EqualFunc<Uint32>, HashFunc<Uint32> >
 39                QueueTable;
 40            
 41            static QueueTable _queueTable(128);
 42 mike  1.7  static Mutex q_table_mut = Mutex();
 43 mike  1.5  
 44 mike  1.16 Uint32 MessageQueue::getNextQueueId() throw(IPCException)
 45 mike  1.5  {
 46 mday  1.22    static Uint32 _nextQueueId = 2;
 47 mike  1.16 
 48               //
 49               // Lock mutex:
 50               //
 51 chip  1.8  
 52 mike  1.7     static Mutex _id_mut = Mutex();
 53 mike  1.16    _id_mut.lock(pegasus_thread_self());
 54 chip  1.8  
 55 mike  1.16    // Assign next queue id. Handle wrap around and never assign zero as
 56               // a queue id:
 57 chip  1.8  
 58 mike  1.16    if (_nextQueueId == 0)
 59 mday  1.22       _nextQueueId = 2;
 60 chip  1.8  
 61 mike  1.16    Uint32 queueId = _nextQueueId++;
 62 mike  1.7  
 63 mike  1.16    //
 64               // Unlock mutex:
 65               //
 66 mike  1.5  
 67 mike  1.16    _id_mut.unlock();
 68 mday  1.12 
 69 mike  1.16    return queueId;
 70            }
 71 mday  1.12 
 72 mike  1.16 MessageQueue::MessageQueue(
 73                const char* name, 
 74                Boolean async,
 75                Uint32 queueId)
 76 mday  1.22     : _queueId(queueId), _count(0), _front(0), _back(0), _async(async)
 77 mike  1.5  {
 78 mike  1.16     //
 79                // Copy the name:
 80                //
 81            
 82 kumpf 1.19    PEG_FUNC_ENTER(TRC_DISPATCHER,"MessageQueue::MessageQueue()");
 83            
 84 mike  1.16     if (!name)
 85            	name = "";
 86            
 87                _name = new char[strlen(name) + 1];
 88                strcpy(_name, name);
 89            
 90 kumpf 1.19     Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3,
 91                    "MessageQueue::MessageQueue  name = %s, queueId = %i", name, queueId);
 92            
 93 mike  1.16     //
 94                // Insert into queue table:
 95                //
 96 mike  1.5  
 97 chip  1.8      q_table_mut.lock(pegasus_thread_self());
 98            
 99 mike  1.16     while (!_queueTable.insert(_queueId, this))
100 mike  1.7         ;
101 mike  1.16 
102 mike  1.7      q_table_mut.unlock();
103 chip  1.8  
104 mday  1.12     
105 kumpf 1.19    PEG_FUNC_EXIT(TRC_DISPATCHER,"MessageQueue::MessageQueue()");
106 mike  1.5  }
107            
108            MessageQueue::~MessageQueue()
109            {
110                // ATTN-A: thread safety!
111 kumpf 1.19 
112                PEG_FUNC_ENTER(TRC_DISPATCHER,"MessageQueue::~MessageQueue()");
113            
114                Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3,
115                    "MessageQueue::~MessageQueue queueId = %i, name = %s", _queueId, _name);
116            
117 chip  1.8      q_table_mut.lock(pegasus_thread_self());
118            
119 mike  1.5      _queueTable.remove(_queueId);
120 mike  1.7      q_table_mut.unlock();
121 chip  1.8  	
122 mike  1.16     // Free the name:
123 mday  1.12     
124 mike  1.16     delete [] _name;
125 kumpf 1.19 
126                PEG_FUNC_EXIT(TRC_DISPATCHER,"MessageQueue::~MessageQueue()");
127 chip  1.8  }
128            
129 mike  1.7  void MessageQueue::enqueue(Message* message) throw(IPCException)
130 mike  1.4  {
131 kumpf 1.19 
132                if (!message) 
133                {
134                   Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3,
135                    "MessageQueue::enqueue failure");
136 mday  1.9         throw NullPointer();
137 kumpf 1.19     }
138 mike  1.7  
139                if (getenv("PEGASUS_TRACE"))
140                {
141 mday  1.9         cout << "===== " << getQueueName() << ": ";
142                   message->print(cout);
143 mike  1.7      }
144            
145 mday  1.9      _mut.lock(pegasus_thread_self());
146 mike  1.4      if (_back)
147                {
148 mday  1.9         _back->_next = message;
149                   message->_prev = _back;
150                   message->_next = 0;
151                   _back = message;
152 mike  1.4      }
153                else
154                {
155 mday  1.9         _front = message;
156                   _back = message;
157                   message->_prev = 0;
158                   message->_next = 0;
159 mike  1.4      }
160                message->_owner = this;
161                _count++;
162 mday  1.20        
163 mike  1.7      _mut.unlock();
164 chip  1.11 
165 mday  1.22     handleEnqueue();
166 kumpf 1.19 
167 mday  1.15 }
168            
169            
170 mike  1.7  Message* MessageQueue::dequeue() throw(IPCException)
171 mike  1.4  {
172 mike  1.7     _mut.lock(pegasus_thread_self());
173 mike  1.4      if (_front)
174                {
175            	Message* message = _front;
176            	_front = _front->_next;
177            	if (_front)
178            	    _front->_prev = 0;
179            
180            	if (_back == message)
181            	    _back = 0;
182 mike  1.7  	_count--;
183            	_mut.unlock();
184 mike  1.4  	message->_next = 0;
185            	message->_prev = 0;
186            	message->_owner = 0;
187            	return message;
188                }
189 mike  1.7      _mut.unlock();
190 mike  1.4      return 0;
191            }
192            
193 mike  1.7  void MessageQueue::remove(Message* message) throw(IPCException)
194 mike  1.4  {
195                if (!message)
196            	throw NullPointer();
197            
198                if (message->_owner != this)
199            	throw NoSuchMessageOnQueue();
200            
201 mike  1.7      _mut.lock(pegasus_thread_self());
202 chip  1.8  
203 mike  1.4      if (message->_next)
204            	message->_next->_prev = message->_prev;
205                else
206            	_back = message->_prev;
207            
208                if (message->_prev)
209            	message->_prev->_next = message->_next;
210                else
211            	_front = message->_next;
212            
213 mike  1.7      _count--;
214                _mut.unlock();
215 chip  1.8  
216 mike  1.4      message->_prev = 0;
217                message->_next = 0;
218                message->_owner = 0;
219            }
220            
221 mike  1.7  Message* MessageQueue::findByType(Uint32 type) throw(IPCException)
222 mike  1.4  {
223 mike  1.7     _mut.lock(pegasus_thread_self());
224 chip  1.8  
225 mike  1.4      for (Message* m = front(); m; m = m->getNext())
226                {
227 mike  1.7         if (m->getType() == type)
228                   {
229            	  _mut.unlock();
230            	  return m;
231                   }
232 mike  1.4      }
233 mike  1.7      _mut.unlock();
234 mike  1.4      return 0;
235            }
236            
237 mike  1.7  Message* MessageQueue::findByKey(Uint32 key) throw(IPCException)
238 mike  1.4  {
239 mike  1.7     _mut.lock(pegasus_thread_self());
240 chip  1.8  
241 mike  1.4      for (Message* m = front(); m; m = m->getNext())
242                {
243 mike  1.7         if (m->getKey() == key)
244                   {
245            	  _mut.unlock();
246            	  return m;
247                   }
248 chip  1.8  
249 mike  1.4      }
250 mike  1.7      _mut.unlock();
251 mike  1.4      return 0;
252            }
253            
254 mike  1.7  void MessageQueue::print(ostream& os) const throw(IPCException)
255 mike  1.4  {
256 mike  1.7     const_cast<MessageQueue *>(this)->_mut.lock(pegasus_thread_self());
257 chip  1.8  
258 mike  1.7     for (const Message* m = front(); m; m = m->getNext())
259 mike  1.4  	m->print(os);
260 mike  1.7     const_cast<MessageQueue *>(this)->_mut.unlock();
261 mike  1.4  }
262            
263 mike  1.7  Message* MessageQueue::find(Uint32 type, Uint32 key) throw(IPCException)
264 mike  1.4  {
265 mike  1.7     _mut.lock(pegasus_thread_self());
266 chip  1.8  
267 mike  1.4      for (Message* m = front(); m; m = m->getNext())
268                {
269 mike  1.7         if (m->getType() == type && m->getKey() == key)
270                   {
271            	  _mut.unlock();
272            	  return m;
273                   }
274 mike  1.4      }
275 mike  1.7      _mut.unlock();
276 mike  1.4  
277 mike  1.5      return 0;
278            }
279            
280 mike  1.7  void MessageQueue::lock() throw(IPCException)
281 mike  1.5  {
282 mike  1.7     _mut.lock(pegasus_thread_self());
283            }
284 mike  1.5  
285 chip  1.8  void MessageQueue::unlock()
286 mike  1.7  {
287               _mut.unlock();
288 mike  1.5  }
289            
290 mike  1.7  const char* MessageQueue::getQueueName() const
291 mike  1.5  {
292 mike  1.7     if(_name[0] != 0x00)
293                  return _name;
294               return "unknown";
295 mike  1.5  }
296            
297 mike  1.7  MessageQueue* MessageQueue::lookup(Uint32 queueId) throw(IPCException)
298 mike  1.5  {
299 kumpf 1.19 
300 mike  1.5      MessageQueue* queue = 0;
301 mike  1.7      q_table_mut.lock(pegasus_thread_self());
302 chip  1.8  
303 mike  1.5      if (_queueTable.lookup(queueId, queue))
304 mike  1.7      {
305                   q_table_mut.unlock();
306                   return queue;
307                }
308 chip  1.8  
309 mike  1.7      // Not found!
310 mike  1.5  
311 mike  1.7      q_table_mut.unlock();
312 chip  1.8  
313 kumpf 1.19     Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3,
314                    "MessageQueue::lookup failure queueId = %i", queueId);
315            
316 mike  1.4      return 0;
317 mike  1.6  }
318 mike  1.7  
319            
320            MessageQueue* MessageQueue::lookup(const char *name) throw(IPCException)
321            {
322 kumpf 1.19 
323 mike  1.7     if(name == NULL)
324                  throw NullPointer();
325               q_table_mut.lock(pegasus_thread_self());
326 chip  1.8  
327 mike  1.7     for(QueueTable::Iterator i = _queueTable.start(); i; i++)
328               {
329                    // ATTN: Need to decide how many characters to compare in queue names
330                  if(! strncmp( ((MessageQueue *)i.value())->getQueueName(), name, 25) )
331                  {
332            	 q_table_mut.unlock();
333            	 return( (MessageQueue *)i.value());
334                  }
335 chip  1.8  
336 mike  1.7     }
337               q_table_mut.unlock();
338 kumpf 1.19 
339               Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3,
340                    "MessageQueue::lookup failure - name = %s", name);
341 chip  1.8  
342 mike  1.7     return 0;
343            }
344            
345 mike  1.6  
346            void MessageQueue::handleEnqueue()
347            {
348            
349 mike  1.4  }
350            
351            PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2