(file) Return to MessageQueue.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mike  1.4 //%/////////////////////////////////////////////////////////////////////////////
  2           //
  3           // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM
  4           //
  5           // Permission is hereby granted, free of charge, to any person obtaining a copy
  6 chip  1.8 // of this software and associated documentation files (the "Software"), to
  7           // deal in the Software without restriction, including without limitation the
  8           // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  9 mike  1.4 // sell copies of the Software, and to permit persons to whom the Software is
 10           // furnished to do so, subject to the following conditions:
 11 chip  1.8 //
 12           // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 13 mike  1.4 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 14           // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 15 chip  1.8 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 16           // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 17           // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 18 mike  1.4 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 19           // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 20           //
 21           //==============================================================================
 22           //
 23           // Author: Mike Brasher (mbrasher@bmc.com)
 24           //
 25           // Modified By:
 26           //
 27           //%/////////////////////////////////////////////////////////////////////////////
 28           
 29 mike  1.5 #include <Pegasus/Common/HashTable.h>
 30 mike  1.7 #include <Pegasus/Common/IPC.h>
 31 mike  1.4 #include "MessageQueue.h"
 32           
 33           PEGASUS_USING_STD;
 34           
 35           PEGASUS_NAMESPACE_BEGIN
 36           
 37 mike  1.5 typedef HashTable<Uint32, MessageQueue*, EqualFunc<Uint32>, HashFunc<Uint32> >
 38               QueueTable;
 39           
 40           static QueueTable _queueTable(128);
 41 mike  1.7 static Mutex q_table_mut = Mutex();
 42 mike  1.5 
 43 mike  1.7 static Uint32 _GetNextQueueId() throw(IPCException)
 44 mike  1.5 {
 45 chip  1.8 
 46 mike  1.7    static Uint32 _queueId = 2;
 47              static Mutex _id_mut = Mutex();
 48 chip  1.8 
 49 mike  1.7    _id_mut.lock(pegasus_thread_self());
 50 chip  1.8 
 51 mike  1.7    // Handle wrap-around!
 52              if (_queueId == 0)
 53                 _queueId = MessageQueue::_CIMOM_Q_ID;
 54              Uint32 ret = _queueId++;
 55              _id_mut.unlock();
 56 chip  1.8 
 57 mike  1.7    return ret;
 58           }
 59           
 60           Uint32 MessageQueue::_CIMOM_Q_ID = 1;
 61 mike  1.5 
 62 chip  1.8 MessageQueue::MessageQueue(const char * name)
 63           	: _mut( ), _count(0), _front(0), _back(0),
 64           	_workThread(MessageQueue::workThread, this, false),
 65           	_workSemaphore(0)
 66 mike  1.5 {
 67 mike  1.7    if(name != NULL)
 68              {
 69                 strncpy(_name, name, 25);
 70                 _name[25] = 0x00;
 71              }
 72              else
 73                 memset(_name, 0x00,25);
 74 mike  1.5 
 75 chip  1.8     q_table_mut.lock(pegasus_thread_self());
 76           
 77 mike  1.5     while (!_queueTable.insert(_queueId = _GetNextQueueId(), this))
 78 mike  1.7        ;
 79               q_table_mut.unlock();
 80 chip  1.8 
 81           	_workThread.run();
 82 mike  1.5 }
 83           
 84           MessageQueue::~MessageQueue()
 85           {
 86               // ATTN-A: thread safety!
 87 chip  1.8     q_table_mut.lock(pegasus_thread_self());
 88           
 89 mike  1.5     _queueTable.remove(_queueId);
 90 mike  1.7     q_table_mut.unlock();
 91 chip  1.8 	
 92           	_workThread.cancel();	// cancel thread
 93           	_workSemaphore.signal();// wake thread
 94           	_workThread.join();		// wait for thread to complete
 95           }
 96           
 97           PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueue::workThread(void * arg)
 98           {
 99           	// get thread from argument
100           	Thread * thread = (Thread *)arg;
101           
102           	PEGASUS_ASSERT(thread != 0);
103           
104           	// get message queue from thread
105           	MessageQueue * queue = (MessageQueue *)thread->get_parm();
106           	
107           	PEGASUS_ASSERT(queue != 0);
108           
109           	while(true)
110           	{
111 mday  1.9 	   thread->sleep(1);
112           	   continue;
113           	   
114 chip  1.8 		// wait for work
115 mday  1.9 //		queue->_workSemaphore.wait();
116           
117 chip  1.8 		// stop the thread when the message queue has been destroyed.
118           		// ATTN: should check the thread cancel flag that is not yet exposed!
119           		if(MessageQueue::lookup(queue->_queueId) == 0)
120           		{
121           			break;
122           		}
123           		
124           		// ensure the queue has a message before dispatching
125           		if(queue->_count != 0)
126           		{
127           			queue->handleEnqueue();
128           		}
129 mday  1.9 		else 
130           		{
131           		   thread->sleep(0);
132           		}
133           		
134 chip  1.8 	}
135           
136           	thread->exit_self(PEGASUS_THREAD_RETURN(0));
137           	
138           	return(0);
139 mike  1.5 }
140           
141 mike  1.7 void MessageQueue::enqueue(Message* message) throw(IPCException)
142 mike  1.4 {
143               if (!message)
144 mday  1.9        throw NullPointer();
145               
146 mike  1.4 
147 mike  1.7 
148               if (getenv("PEGASUS_TRACE"))
149               {
150 mday  1.9        cout << "===== " << getQueueName() << ": ";
151                  message->print(cout);
152 mike  1.7     }
153           
154 mday  1.9     _mut.lock(pegasus_thread_self());
155 mike  1.4     if (_back)
156               {
157 mday  1.9        _back->_next = message;
158                  message->_prev = _back;
159                  message->_next = 0;
160                  _back = message;
161 mike  1.4     }
162               else
163               {
164 mday  1.9        _front = message;
165                  _back = message;
166                  message->_prev = 0;
167                  message->_next = 0;
168 mike  1.4     }
169               message->_owner = this;
170               _count++;
171 mike  1.7     _mut.unlock();
172 mday  1.9     
173           //    _workSemaphore.signal();
174 mike  1.6 
175 mday  1.9     handleEnqueue();
176               
177 mike  1.4 }
178           
179 mike  1.7 Message* MessageQueue::dequeue() throw(IPCException)
180 mike  1.4 {
181 mike  1.7    _mut.lock(pegasus_thread_self());
182 mike  1.4     if (_front)
183               {
184           	Message* message = _front;
185           	_front = _front->_next;
186           	if (_front)
187           	    _front->_prev = 0;
188           
189           	if (_back == message)
190           	    _back = 0;
191 mike  1.7 	_count--;
192           	_mut.unlock();
193 mike  1.4 	message->_next = 0;
194           	message->_prev = 0;
195           	message->_owner = 0;
196           	return message;
197               }
198 mike  1.7     _mut.unlock();
199 mike  1.4     return 0;
200           }
201           
202 mike  1.7 void MessageQueue::remove(Message* message) throw(IPCException)
203 mike  1.4 {
204               if (!message)
205           	throw NullPointer();
206           
207               if (message->_owner != this)
208           	throw NoSuchMessageOnQueue();
209           
210 mike  1.7     _mut.lock(pegasus_thread_self());
211 chip  1.8 
212 mike  1.4     if (message->_next)
213           	message->_next->_prev = message->_prev;
214               else
215           	_back = message->_prev;
216           
217               if (message->_prev)
218           	message->_prev->_next = message->_next;
219               else
220           	_front = message->_next;
221           
222 mike  1.7     _count--;
223               _mut.unlock();
224 chip  1.8 
225 mike  1.4     message->_prev = 0;
226               message->_next = 0;
227               message->_owner = 0;
228           }
229           
230 mike  1.7 Message* MessageQueue::findByType(Uint32 type) throw(IPCException)
231 mike  1.4 {
232 mike  1.7    _mut.lock(pegasus_thread_self());
233 chip  1.8 
234 mike  1.4     for (Message* m = front(); m; m = m->getNext())
235               {
236 mike  1.7        if (m->getType() == type)
237                  {
238           	  _mut.unlock();
239           	  return m;
240                  }
241 mike  1.4     }
242 mike  1.7     _mut.unlock();
243 mike  1.4     return 0;
244           }
245           
246 mike  1.7 Message* MessageQueue::findByKey(Uint32 key) throw(IPCException)
247 mike  1.4 {
248 mike  1.7    _mut.lock(pegasus_thread_self());
249 chip  1.8 
250 mike  1.4     for (Message* m = front(); m; m = m->getNext())
251               {
252 mike  1.7        if (m->getKey() == key)
253                  {
254           	  _mut.unlock();
255           	  return m;
256                  }
257 chip  1.8 
258 mike  1.4     }
259 mike  1.7     _mut.unlock();
260 mike  1.4     return 0;
261           }
262           
263 mike  1.7 void MessageQueue::print(ostream& os) const throw(IPCException)
264 mike  1.4 {
265 mike  1.7    const_cast<MessageQueue *>(this)->_mut.lock(pegasus_thread_self());
266 chip  1.8 
267 mike  1.7    for (const Message* m = front(); m; m = m->getNext())
268 mike  1.4 	m->print(os);
269 mike  1.7    const_cast<MessageQueue *>(this)->_mut.unlock();
270 mike  1.4 }
271           
272 mike  1.7 Message* MessageQueue::find(Uint32 type, Uint32 key) throw(IPCException)
273 mike  1.4 {
274 mike  1.7    _mut.lock(pegasus_thread_self());
275 chip  1.8 
276 mike  1.4     for (Message* m = front(); m; m = m->getNext())
277               {
278 mike  1.7        if (m->getType() == type && m->getKey() == key)
279                  {
280           	  _mut.unlock();
281           	  return m;
282                  }
283 mike  1.4     }
284 mike  1.7     _mut.unlock();
285 mike  1.4 
286 mike  1.5     return 0;
287           }
288           
289 mike  1.7 void MessageQueue::lock() throw(IPCException)
290 mike  1.5 {
291 mike  1.7    _mut.lock(pegasus_thread_self());
292           }
293 mike  1.5 
294 chip  1.8 void MessageQueue::unlock()
295 mike  1.7 {
296              _mut.unlock();
297 mike  1.5 }
298           
299 mike  1.7 const char* MessageQueue::getQueueName() const
300 mike  1.5 {
301 mike  1.7    if(_name[0] != 0x00)
302                 return _name;
303              return "unknown";
304 mike  1.5 }
305           
306 mike  1.7 MessageQueue* MessageQueue::lookup(Uint32 queueId) throw(IPCException)
307 mike  1.5 {
308               MessageQueue* queue = 0;
309 mike  1.7     q_table_mut.lock(pegasus_thread_self());
310 chip  1.8 
311 mike  1.5     if (_queueTable.lookup(queueId, queue))
312 mike  1.7     {
313                  q_table_mut.unlock();
314                  return queue;
315               }
316 chip  1.8 
317 mike  1.7     // Not found!
318 mike  1.5 
319 mike  1.7     q_table_mut.unlock();
320 chip  1.8 
321 mike  1.4     return 0;
322 mike  1.6 }
323 mike  1.7 
324           
325           MessageQueue* MessageQueue::lookup(const char *name) throw(IPCException)
326           {
327              if(name == NULL)
328                 throw NullPointer();
329              q_table_mut.lock(pegasus_thread_self());
330 chip  1.8 
331 mike  1.7    for(QueueTable::Iterator i = _queueTable.start(); i; i++)
332              {
333                   // ATTN: Need to decide how many characters to compare in queue names
334                 if(! strncmp( ((MessageQueue *)i.value())->getQueueName(), name, 25) )
335                 {
336           	 q_table_mut.unlock();
337           	 return( (MessageQueue *)i.value());
338                 }
339 chip  1.8 
340 mike  1.7    }
341              q_table_mut.unlock();
342 chip  1.8 
343 mike  1.7    return 0;
344           }
345           
346 mike  1.6 
347           void MessageQueue::handleEnqueue()
348           {
349           
350 mike  1.4 }
351           
352           PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2