(file) Return to MessageQueue.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mike  1.4 //%/////////////////////////////////////////////////////////////////////////////
  2           //
  3           // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM
  4           //
  5           // Permission is hereby granted, free of charge, to any person obtaining a copy
  6           // of this software and associated documentation files (the "Software"), to 
  7           // deal in the Software without restriction, including without limitation the 
  8           // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 
  9           // sell copies of the Software, and to permit persons to whom the Software is
 10           // furnished to do so, subject to the following conditions:
 11           // 
 12           // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN 
 13           // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 14           // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 15           // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR 
 16           // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT 
 17           // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 
 18           // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 19           // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 20           //
 21           //==============================================================================
 22 mike  1.4 //
 23           // Author: Mike Brasher (mbrasher@bmc.com)
 24           //
 25           // Modified By:
 26           //
 27           //%/////////////////////////////////////////////////////////////////////////////
 28           
 29 mike  1.5 #include <Pegasus/Common/HashTable.h>
 30 mike  1.7 #include <Pegasus/Common/IPC.h>
 31 mike  1.4 #include "MessageQueue.h"
 32           
 33           PEGASUS_USING_STD;
 34           
 35           PEGASUS_NAMESPACE_BEGIN
 36           
 37 mike  1.5 typedef HashTable<Uint32, MessageQueue*, EqualFunc<Uint32>, HashFunc<Uint32> >
 38               QueueTable;
 39           
 40           static QueueTable _queueTable(128);
 41 mike  1.7 static Mutex q_table_mut = Mutex();
 42 mike  1.5 
 43 mike  1.7 static Uint32 _GetNextQueueId() throw(IPCException)
 44 mike  1.5 {
 45 mike  1.7    
 46              static Uint32 _queueId = 2;
 47              static Mutex _id_mut = Mutex();
 48              
 49              _id_mut.lock(pegasus_thread_self());
 50              
 51              // Handle wrap-around!
 52              if (_queueId == 0)
 53                 _queueId = MessageQueue::_CIMOM_Q_ID;
 54              Uint32 ret = _queueId++;
 55              _id_mut.unlock();
 56              
 57              return ret;
 58           }
 59           
 60           Uint32 MessageQueue::_CIMOM_Q_ID = 1;
 61 mike  1.5 
 62 mike  1.7 MessageQueue::MessageQueue() : _mut( ), _count(0), _front(0), _back(0)
 63           {
 64               // ATTN-A: thread safety!
 65              q_table_mut.lock(pegasus_thread_self());
 66 mike  1.5 
 67 mike  1.7    memset(_name, 0x00, 26);
 68              
 69               while (!_queueTable.insert(_queueId = _GetNextQueueId(), this))
 70           	;
 71               q_table_mut.unlock();
 72 mike  1.5 }
 73           
 74 mike  1.7 MessageQueue::MessageQueue(char *name) : _mut( ), _count(0), _front(0), _back(0)
 75 mike  1.5 {
 76 mike  1.7    if(name != NULL)
 77              {
 78                 strncpy(_name, name, 25);
 79                 _name[25] = 0x00;
 80              }
 81              
 82              else
 83                 memset(_name, 0x00,25);
 84 mike  1.5 
 85 mike  1.7    q_table_mut.lock(pegasus_thread_self());
 86              
 87 mike  1.5     while (!_queueTable.insert(_queueId = _GetNextQueueId(), this))
 88 mike  1.7        ;
 89               q_table_mut.unlock();
 90               
 91 mike  1.5 }
 92           
 93 mike  1.7 
 94 mike  1.5 MessageQueue::~MessageQueue()
 95           {
 96               // ATTN-A: thread safety!
 97 mike  1.7    q_table_mut.lock(pegasus_thread_self());
 98              
 99 mike  1.5     _queueTable.remove(_queueId);
100 mike  1.7     q_table_mut.unlock();
101               
102 mike  1.5 }
103           
104 mike  1.7 void MessageQueue::enqueue(Message* message) throw(IPCException)
105 mike  1.4 {
106               if (!message)
107           	throw NullPointer();
108           
109 mike  1.7     _mut.lock(pegasus_thread_self());
110           
111               if (getenv("PEGASUS_TRACE"))
112               {
113           	cout << "===== " << getQueueName() << ": ";
114           	message->print(cout);
115               }
116           
117 mike  1.4     if (_back)
118               {
119           	_back->_next = message;
120           	message->_prev = _back;
121           	message->_next = 0;
122           	_back = message;
123               }
124               else
125               {
126           	_front = message;
127           	_back = message;
128           	message->_prev = 0;
129           	message->_next = 0;
130               }
131               message->_owner = this;
132               _count++;
133 mike  1.7     _mut.unlock();
134 mike  1.6 
135               handleEnqueue();
136 mike  1.4 }
137           
138 mike  1.7 Message* MessageQueue::dequeue() throw(IPCException)
139 mike  1.4 {
140 mike  1.7    _mut.lock(pegasus_thread_self());
141 mike  1.4     if (_front)
142               {
143           	Message* message = _front;
144           	_front = _front->_next;
145           	if (_front)
146           	    _front->_prev = 0;
147           
148           	if (_back == message)
149           	    _back = 0;
150 mike  1.7 	_count--;
151           	_mut.unlock();
152 mike  1.4 	message->_next = 0;
153           	message->_prev = 0;
154           	message->_owner = 0;
155           	return message;
156               }
157 mike  1.7     _mut.unlock();
158 mike  1.4     return 0;
159           }
160           
161 mike  1.7 void MessageQueue::remove(Message* message) throw(IPCException)
162 mike  1.4 {
163               if (!message)
164           	throw NullPointer();
165           
166               if (message->_owner != this)
167           	throw NoSuchMessageOnQueue();
168           
169 mike  1.7     _mut.lock(pegasus_thread_self());
170               
171 mike  1.4     if (message->_next)
172           	message->_next->_prev = message->_prev;
173               else
174           	_back = message->_prev;
175           
176               if (message->_prev)
177           	message->_prev->_next = message->_next;
178               else
179           	_front = message->_next;
180           
181 mike  1.7     _count--;
182               _mut.unlock();
183               
184 mike  1.4     message->_prev = 0;
185               message->_next = 0;
186               message->_owner = 0;
187           }
188           
189 mike  1.7 Message* MessageQueue::findByType(Uint32 type) throw(IPCException)
190 mike  1.4 {
191 mike  1.7    _mut.lock(pegasus_thread_self());
192              
193 mike  1.4     for (Message* m = front(); m; m = m->getNext())
194               {
195 mike  1.7        if (m->getType() == type)
196                  {
197           	  _mut.unlock();
198           	  return m;
199                  }
200 mike  1.4     }
201 mike  1.7     _mut.unlock();
202 mike  1.4     return 0;
203           }
204           
205 mike  1.7 Message* MessageQueue::findByKey(Uint32 key) throw(IPCException)
206 mike  1.4 {
207 mike  1.7    _mut.lock(pegasus_thread_self());
208              
209 mike  1.4     for (Message* m = front(); m; m = m->getNext())
210               {
211 mike  1.7        if (m->getKey() == key)
212                  {
213           	  _mut.unlock();
214           	  return m;
215                  }
216                  
217 mike  1.4     }
218 mike  1.7     _mut.unlock();
219 mike  1.4     return 0;
220           }
221           
222 mike  1.7 void MessageQueue::print(ostream& os) const throw(IPCException)
223 mike  1.4 {
224 mike  1.7    const_cast<MessageQueue *>(this)->_mut.lock(pegasus_thread_self());
225              
226              for (const Message* m = front(); m; m = m->getNext())
227 mike  1.4 	m->print(os);
228 mike  1.7    const_cast<MessageQueue *>(this)->_mut.unlock();
229 mike  1.4 }
230           
231 mike  1.7 Message* MessageQueue::find(Uint32 type, Uint32 key) throw(IPCException)
232 mike  1.4 {
233 mike  1.7    _mut.lock(pegasus_thread_self());
234              
235 mike  1.4     for (Message* m = front(); m; m = m->getNext())
236               {
237 mike  1.7        if (m->getType() == type && m->getKey() == key)
238                  {
239           	  _mut.unlock();
240           	  return m;
241                  }
242 mike  1.4     }
243 mike  1.7     _mut.unlock();
244 mike  1.4 
245 mike  1.5     return 0;
246           }
247           
248 mike  1.7 void MessageQueue::lock() throw(IPCException)
249 mike  1.5 {
250 mike  1.7    _mut.lock(pegasus_thread_self());
251           }
252 mike  1.5 
253 mike  1.7 void MessageQueue::unlock() 
254           {
255              _mut.unlock();
256 mike  1.5 }
257           
258 mike  1.7 const char* MessageQueue::getQueueName() const
259 mike  1.5 {
260 mike  1.7    if(_name[0] != 0x00)
261                 return _name;
262              return "unknown";
263 mike  1.5 }
264           
265 mike  1.7 MessageQueue* MessageQueue::lookup(Uint32 queueId) throw(IPCException)
266 mike  1.5 {
267               MessageQueue* queue = 0;
268 mike  1.7     q_table_mut.lock(pegasus_thread_self());
269               
270 mike  1.5     if (_queueTable.lookup(queueId, queue))
271 mike  1.7     {
272                  q_table_mut.unlock();
273                  return queue;
274               }
275               
276               // Not found!
277 mike  1.5 
278 mike  1.7     q_table_mut.unlock();
279               
280 mike  1.4     return 0;
281 mike  1.6 }
282 mike  1.7 
283           
284           MessageQueue* MessageQueue::lookup(const char *name) throw(IPCException)
285           {
286              if(name == NULL)
287                 throw NullPointer();
288              q_table_mut.lock(pegasus_thread_self());
289              
290              for(QueueTable::Iterator i = _queueTable.start(); i; i++)
291              {
292                   // ATTN: Need to decide how many characters to compare in queue names
293                 if(! strncmp( ((MessageQueue *)i.value())->getQueueName(), name, 25) )
294                 {
295           	 q_table_mut.unlock();
296           	 return( (MessageQueue *)i.value());
297                 }
298                 
299              }
300              q_table_mut.unlock();
301                 
302              return 0;
303 mike  1.7 }
304           
305 mike  1.6 
306           void MessageQueue::handleEnqueue()
307           {
308           
309 mike  1.4 }
310           
311           PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2