(file) Return to MessageQueue.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mike  1.4 //%/////////////////////////////////////////////////////////////////////////////
  2           //
  3           // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM
  4           //
  5           // Permission is hereby granted, free of charge, to any person obtaining a copy
  6           // of this software and associated documentation files (the "Software"), to 
  7           // deal in the Software without restriction, including without limitation the 
  8           // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 
  9           // sell copies of the Software, and to permit persons to whom the Software is
 10           // furnished to do so, subject to the following conditions:
 11           // 
 12           // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN 
 13           // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 14           // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 15           // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR 
 16           // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT 
 17           // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 
 18           // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 19           // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 20           //
 21           //==============================================================================
 22 mike  1.4 //
 23           // Author: Mike Brasher (mbrasher@bmc.com)
 24           //
 25           // Modified By:
 26           //
 27           //%/////////////////////////////////////////////////////////////////////////////
 28           
 29 mike  1.5 #include <Pegasus/Common/HashTable.h>
 30 mday  1.6.2.5 #include <Pegasus/Common/IPC.h>
 31 mike  1.4     #include "MessageQueue.h"
 32               
 33               PEGASUS_USING_STD;
 34               
 35               PEGASUS_NAMESPACE_BEGIN
 36               
 37 mike  1.5     typedef HashTable<Uint32, MessageQueue*, EqualFunc<Uint32>, HashFunc<Uint32> >
 38                   QueueTable;
 39               
 40               static QueueTable _queueTable(128);
 41 mday  1.6.2.7 static Mutex q_table_mut = Mutex();
 42 mike  1.5     
 43 mday  1.6.2.6 static Uint32 _GetNextQueueId() throw(IPCException)
 44 mike  1.5     {
 45 mday  1.6.2.5    
 46                  static Uint32 _queueId = 2;
 47               
 48 mday  1.6.2.6    q_table_mut.lock(pegasus_thread_self());
 49 mday  1.6.2.5    
 50                  // Handle wrap-around!
 51                  if (_queueId == 0)
 52                     _queueId = MessageQueue::_CIMOM_Q_ID;
 53                  Uint32 ret = _queueId++;
 54 mday  1.6.2.6    q_table_mut.unlock();
 55 mday  1.6.2.5    
 56                  return ret;
 57 mike  1.5     }
 58 mday  1.6.2.5 
 59               Uint32 MessageQueue::_CIMOM_Q_ID = 1;
 60 mike  1.5     
 61 mday  1.6.2.6 MessageQueue::MessageQueue() : _mut( ), _count(0), _front(0), _back(0)
 62 mike  1.5     {
 63                   // ATTN-A: thread safety!
 64               
 65 mday  1.6.2.6    memset(_name, 0x00, 26);
 66                  q_table_mut.lock(pegasus_thread_self());
 67                  
 68 mike  1.5         while (!_queueTable.insert(_queueId = _GetNextQueueId(), this))
 69               	;
 70 mday  1.6.2.6     q_table_mut.unlock();
 71 mike  1.5     }
 72               
 73 mday  1.6.2.6 MessageQueue::MessageQueue(char *name) : _mut( ), _count(0), _front(0), _back(0)
 74 mday  1.6.2.3 {
 75                  if(name != NULL)
 76                  {
 77 mday  1.6.2.6       strncpy(_name, name, 25);
 78                     _name[25] = 0x00;
 79 mday  1.6.2.3    }
 80                  
 81                  else
 82 mday  1.6.2.6       memset(_name, 0x00,25);
 83               
 84                  q_table_mut.lock(pegasus_thread_self());
 85                  
 86 mday  1.6.2.3     while (!_queueTable.insert(_queueId = _GetNextQueueId(), this))
 87                      ;
 88 mday  1.6.2.6     q_table_mut.unlock();
 89                   
 90 mday  1.6.2.3 }
 91               
 92               
 93 mike  1.5     MessageQueue::~MessageQueue()
 94               {
 95                   // ATTN-A: thread safety!
 96 mday  1.6.2.6    q_table_mut.lock(pegasus_thread_self());
 97                  
 98 mike  1.5         _queueTable.remove(_queueId);
 99 mday  1.6.2.6     q_table_mut.unlock();
100                   
101 mike  1.5     }
102               
103 mday  1.6.2.6 void MessageQueue::enqueue(Message* message) throw(IPCException)
104 mike  1.4     {
105                   if (!message)
106               	throw NullPointer();
107               
108 mday  1.6.2.6     _mut.lock(pegasus_thread_self());
109               
110 mike  1.6.2.1     if (getenv("PEGASUS_TRACE"))
111                   {
112 mike  1.6.2.2 	cout << "===== " << getQueueName() << ": ";
113 mike  1.6.2.1 	message->print(cout);
114                   }
115               
116 mike  1.4         if (_back)
117                   {
118               	_back->_next = message;
119               	message->_prev = _back;
120               	message->_next = 0;
121               	_back = message;
122                   }
123                   else
124                   {
125               	_front = message;
126               	_back = message;
127               	message->_prev = 0;
128               	message->_next = 0;
129                   }
130                   message->_owner = this;
131                   _count++;
132 mday  1.6.2.6     _mut.unlock();
133 mike  1.6     
134                   handleEnqueue();
135 mike  1.4     }
136               
137 mday  1.6.2.6 Message* MessageQueue::dequeue() throw(IPCException)
138 mike  1.4     {
139 mday  1.6.2.6    _mut.lock(pegasus_thread_self());
140 mike  1.4         if (_front)
141                   {
142               	Message* message = _front;
143               	_front = _front->_next;
144               	if (_front)
145               	    _front->_prev = 0;
146               
147               	if (_back == message)
148               	    _back = 0;
149 mday  1.6.2.6 	_count--;
150               	_mut.unlock();
151 mike  1.4     	message->_next = 0;
152               	message->_prev = 0;
153               	message->_owner = 0;
154               	return message;
155                   }
156 mday  1.6.2.6     _mut.unlock();
157 mike  1.4         return 0;
158               }
159               
160 mday  1.6.2.6 void MessageQueue::remove(Message* message) throw(IPCException)
161 mike  1.4     {
162                   if (!message)
163               	throw NullPointer();
164               
165                   if (message->_owner != this)
166               	throw NoSuchMessageOnQueue();
167               
168 mday  1.6.2.6     _mut.lock(pegasus_thread_self());
169                   
170 mike  1.4         if (message->_next)
171               	message->_next->_prev = message->_prev;
172                   else
173               	_back = message->_prev;
174               
175                   if (message->_prev)
176               	message->_prev->_next = message->_next;
177                   else
178               	_front = message->_next;
179               
180 mday  1.6.2.6     _count--;
181                   _mut.unlock();
182                   
183 mike  1.4         message->_prev = 0;
184                   message->_next = 0;
185                   message->_owner = 0;
186               }
187               
188 mday  1.6.2.6 Message* MessageQueue::findByType(Uint32 type) throw(IPCException)
189 mike  1.4     {
190 mday  1.6.2.6    _mut.lock(pegasus_thread_self());
191                  
192 mike  1.4         for (Message* m = front(); m; m = m->getNext())
193                   {
194 mday  1.6.2.6        if (m->getType() == type)
195                      {
196               	  _mut.unlock();
197               	  return m;
198                      }
199 mike  1.4         }
200 mday  1.6.2.6     _mut.unlock();
201 mike  1.4         return 0;
202               }
203               
204 mday  1.6.2.6 Message* MessageQueue::findByKey(Uint32 key) throw(IPCException)
205 mike  1.4     {
206 mday  1.6.2.6    _mut.lock(pegasus_thread_self());
207                  
208 mike  1.4         for (Message* m = front(); m; m = m->getNext())
209                   {
210 mday  1.6.2.6        if (m->getKey() == key)
211                      {
212               	  _mut.unlock();
213               	  return m;
214                      }
215                      
216 mike  1.4         }
217 mday  1.6.2.6     _mut.unlock();
218 mike  1.4         return 0;
219               }
220               
221 mday  1.6.2.6 void MessageQueue::print(ostream& os) const throw(IPCException)
222 mike  1.4     {
223 mday  1.6.2.6    const_cast<MessageQueue *>(this)->_mut.lock(pegasus_thread_self());
224                  
225                  for (const Message* m = front(); m; m = m->getNext())
226 mike  1.4     	m->print(os);
227 mday  1.6.2.6    const_cast<MessageQueue *>(this)->_mut.unlock();
228 mike  1.4     }
229               
230 mday  1.6.2.6 Message* MessageQueue::find(Uint32 type, Uint32 key) throw(IPCException)
231 mike  1.4     {
232 mday  1.6.2.6    _mut.lock(pegasus_thread_self());
233                  
234 mike  1.4         for (Message* m = front(); m; m = m->getNext())
235                   {
236 mday  1.6.2.6        if (m->getType() == type && m->getKey() == key)
237                      {
238               	  _mut.unlock();
239               	  return m;
240                      }
241 mike  1.4         }
242 mday  1.6.2.6     _mut.unlock();
243 mike  1.4     
244 mike  1.5         return 0;
245               }
246               
247 mday  1.6.2.6 void MessageQueue::lock() throw(IPCException)
248 mike  1.5     {
249 mday  1.6.2.6    _mut.lock(pegasus_thread_self());
250 mike  1.5     }
251               
252 mday  1.6.2.6 void MessageQueue::unlock() 
253 mike  1.5     {
254 mday  1.6.2.6    _mut.unlock();
255 mike  1.6.2.1 }
256               
257               const char* MessageQueue::getQueueName() const
258               {
259 mday  1.6.2.3    if(_name[0] != 0x00)
260                     return _name;
261                  return "unknown";
262 mike  1.5     }
263               
264 mday  1.6.2.6 MessageQueue* MessageQueue::lookup(Uint32 queueId) throw(IPCException)
265 mike  1.5     {
266                   MessageQueue* queue = 0;
267 mday  1.6.2.6     q_table_mut.lock(pegasus_thread_self());
268                   
269 mike  1.5         if (_queueTable.lookup(queueId, queue))
270 mday  1.6.2.6     {
271                      q_table_mut.unlock();
272                      return queue;
273                   }
274                   
275 mike  1.5         // Not found!
276 mday  1.6.2.6 
277                   q_table_mut.unlock();
278                   
279 mike  1.4         return 0;
280 mike  1.6     }
281 mday  1.6.2.3 
282               
283 mday  1.6.2.6 MessageQueue* MessageQueue::lookup(const char *name) throw(IPCException)
284 mday  1.6.2.3 {
285                  if(name == NULL)
286                     throw NullPointer();
287 mday  1.6.2.6    q_table_mut.lock(pegasus_thread_self());
288                  
289 mday  1.6.2.3    for(QueueTable::Iterator i = _queueTable.start(); i; i++)
290                  {
291 kumpf 1.6.2.4         // ATTN: Need to decide how many characters to compare in queue names
292 mday  1.6.2.6       if(! strncmp( ((MessageQueue *)i.value())->getQueueName(), name, 25) )
293                     {
294               	 q_table_mut.unlock();
295               	 return( (MessageQueue *)i.value());
296                     }
297                     
298 mday  1.6.2.3    }
299 mday  1.6.2.6    q_table_mut.unlock();
300                     
301 mday  1.6.2.3    return 0;
302               }
303               
304 mike  1.6     
305               void MessageQueue::handleEnqueue()
306               {
307               
308 mike  1.4     }
309               
310               PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2