1 mike 1.4 //%/////////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
6 chip 1.8 // of this software and associated documentation files (the "Software"), to
7 // deal in the Software without restriction, including without limitation the
8 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
9 mike 1.4 // sell copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
|
11 chip 1.8 //
12 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
13 mike 1.4 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
14 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
15 chip 1.8 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
16 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
17 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
18 mike 1.4 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
19 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20 //
21 //==============================================================================
22 //
23 // Author: Mike Brasher (mbrasher@bmc.com)
24 //
25 // Modified By:
26 //
27 //%/////////////////////////////////////////////////////////////////////////////
28
|
29 mike 1.5 #include <Pegasus/Common/HashTable.h>
|
30 mike 1.7 #include <Pegasus/Common/IPC.h>
|
31 mike 1.4 #include "MessageQueue.h"
32
33 PEGASUS_USING_STD;
34
35 PEGASUS_NAMESPACE_BEGIN
36
|
37 mike 1.5 typedef HashTable<Uint32, MessageQueue*, EqualFunc<Uint32>, HashFunc<Uint32> >
38 QueueTable;
39
40 static QueueTable _queueTable(128);
|
41 mike 1.7 static Mutex q_table_mut = Mutex();
|
42 mike 1.5
|
43 mike 1.7 static Uint32 _GetNextQueueId() throw(IPCException)
|
44 mike 1.5 {
|
45 chip 1.8
|
46 mike 1.7 static Uint32 _queueId = 2;
47 static Mutex _id_mut = Mutex();
|
48 chip 1.8
|
49 mike 1.7 _id_mut.lock(pegasus_thread_self());
|
50 chip 1.8
|
51 mike 1.7 // Handle wrap-around!
52 if (_queueId == 0)
53 _queueId = MessageQueue::_CIMOM_Q_ID;
54 Uint32 ret = _queueId++;
55 _id_mut.unlock();
|
56 chip 1.8
|
57 mike 1.7 return ret;
58 }
59
60 Uint32 MessageQueue::_CIMOM_Q_ID = 1;
|
61 mike 1.5
|
62 kumpf 1.13 MessageQueue::MessageQueue(const char * name, Boolean async)
|
63 chip 1.8 : _mut( ), _count(0), _front(0), _back(0),
|
64 mday 1.12 _async(async),
65 _workThread(MessageQueue::workThread, this, false),
66 _workSemaphore(0)
67
68
|
69 mike 1.5 {
|
70 mike 1.7 if(name != NULL)
71 {
72 strncpy(_name, name, 25);
73 _name[25] = 0x00;
74 }
75 else
76 memset(_name, 0x00,25);
|
77 mike 1.5
|
78 chip 1.8 q_table_mut.lock(pegasus_thread_self());
79
|
80 mike 1.5 while (!_queueTable.insert(_queueId = _GetNextQueueId(), this))
|
81 mike 1.7 ;
82 q_table_mut.unlock();
|
83 chip 1.8
|
84 mday 1.12
85 if(_async == true)
86 {
87 _workThread.run();
88 }
89
|
90 mike 1.5 }
91
92 MessageQueue::~MessageQueue()
93 {
94 // ATTN-A: thread safety!
|
95 chip 1.8 q_table_mut.lock(pegasus_thread_self());
96
|
97 mike 1.5 _queueTable.remove(_queueId);
|
98 mike 1.7 q_table_mut.unlock();
|
99 chip 1.8
|
100 mday 1.12 if(_async == true)
101 {
102 _workThread.cancel(); // cancel thread
103 _workSemaphore.signal();// wake thread
104 _workThread.join(); // wait for thread to complete
105 }
106
|
107 chip 1.8 }
108
|
109 mday 1.12
|
110 chip 1.8 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueue::workThread(void * arg)
111 {
112 // get thread from argument
113 Thread * thread = (Thread *)arg;
114
115 PEGASUS_ASSERT(thread != 0);
116
117 // get message queue from thread
118 MessageQueue * queue = (MessageQueue *)thread->get_parm();
119
120 PEGASUS_ASSERT(queue != 0);
121
122 while(true)
123 {
|
124 mday 1.14 if(thread->is_cancelled())
125 {
126 break;
127 }
128
129 // wait for work
130 queue->_workSemaphore.wait();
131
132 // stop the thread when the message queue has been destroyed.
133 // ATTN: should check the thread cancel flag that is not yet exposed!
134 if(MessageQueue::lookup(queue->_queueId) == 0)
135 {
136 break;
137 }
138
139 // ensure the queue has a message before dispatching
140 if(queue->_count != 0)
141 {
142 queue->handleEnqueue();
143 }
|
144 chip 1.8 }
145
146 thread->exit_self(PEGASUS_THREAD_RETURN(0));
147
148 return(0);
|
149 mike 1.5 }
150
|
151 mike 1.7 void MessageQueue::enqueue(Message* message) throw(IPCException)
|
152 mike 1.4 {
153 if (!message)
|
154 mday 1.9 throw NullPointer();
|
155 mike 1.7
156 if (getenv("PEGASUS_TRACE"))
157 {
|
158 mday 1.9 cout << "===== " << getQueueName() << ": ";
159 message->print(cout);
|
160 mike 1.7 }
161
|
162 mday 1.9 _mut.lock(pegasus_thread_self());
|
163 mike 1.4 if (_back)
164 {
|
165 mday 1.9 _back->_next = message;
166 message->_prev = _back;
167 message->_next = 0;
168 _back = message;
|
169 mike 1.4 }
170 else
171 {
|
172 mday 1.9 _front = message;
173 _back = message;
174 message->_prev = 0;
175 message->_next = 0;
|
176 mike 1.4 }
177 message->_owner = this;
178 _count++;
|
179 mday 1.14 if( _async == true )
180 {
181 _workSemaphore.signal();
182 }
183
|
184 mike 1.7 _mut.unlock();
|
185 chip 1.11
|
186 mday 1.14 if(_async == false )
|
187 mday 1.12 handleEnqueue();
|
188 mike 1.4 }
189
|
190 mday 1.15
191 Boolean MessageQueue::accept_async(Message *message) throw(IPCException)
192 {
193 if(! message)
194 throw NullPointer();
195 if(_async == false)
196 return false;
197
198 if (getenv("PEGASUS_TRACE"))
199 {
200 cout << "==~ accept() ~== " << getQueueName() << ": ";
201 message->print(cout);
202 }
203
204
205 // in derived methods, evaluate the message here to determine
206 // whether or not you can handle it.
207
208 _mut.lock(pegasus_thread_self());
209 if (_back)
210 {
211 mday 1.15 _back->_next = message;
212 message->_prev = _back;
213 message->_next = 0;
214 _back = message;
215 }
216 else
217 {
218 _front = message;
219 _back = message;
220 message->_prev = 0;
221 message->_next = 0;
222 }
223 message->_owner = this;
224 _count++;
225 _workSemaphore.signal();
226 _mut.unlock();
227
228 return true;
229 }
230
231
|
232 mike 1.7 Message* MessageQueue::dequeue() throw(IPCException)
|
233 mike 1.4 {
|
234 mike 1.7 _mut.lock(pegasus_thread_self());
|
235 mike 1.4 if (_front)
236 {
237 Message* message = _front;
238 _front = _front->_next;
239 if (_front)
240 _front->_prev = 0;
241
242 if (_back == message)
243 _back = 0;
|
244 mike 1.7 _count--;
245 _mut.unlock();
|
246 mike 1.4 message->_next = 0;
247 message->_prev = 0;
248 message->_owner = 0;
249 return message;
250 }
|
251 mike 1.7 _mut.unlock();
|
252 mike 1.4 return 0;
253 }
254
|
255 mike 1.7 void MessageQueue::remove(Message* message) throw(IPCException)
|
256 mike 1.4 {
257 if (!message)
258 throw NullPointer();
259
260 if (message->_owner != this)
261 throw NoSuchMessageOnQueue();
262
|
263 mike 1.7 _mut.lock(pegasus_thread_self());
|
264 chip 1.8
|
265 mike 1.4 if (message->_next)
266 message->_next->_prev = message->_prev;
267 else
268 _back = message->_prev;
269
270 if (message->_prev)
271 message->_prev->_next = message->_next;
272 else
273 _front = message->_next;
274
|
275 mike 1.7 _count--;
276 _mut.unlock();
|
277 chip 1.8
|
278 mike 1.4 message->_prev = 0;
279 message->_next = 0;
280 message->_owner = 0;
281 }
282
|
283 mike 1.7 Message* MessageQueue::findByType(Uint32 type) throw(IPCException)
|
284 mike 1.4 {
|
285 mike 1.7 _mut.lock(pegasus_thread_self());
|
286 chip 1.8
|
287 mike 1.4 for (Message* m = front(); m; m = m->getNext())
288 {
|
289 mike 1.7 if (m->getType() == type)
290 {
291 _mut.unlock();
292 return m;
293 }
|
294 mike 1.4 }
|
295 mike 1.7 _mut.unlock();
|
296 mike 1.4 return 0;
297 }
298
|
299 mike 1.7 Message* MessageQueue::findByKey(Uint32 key) throw(IPCException)
|
300 mike 1.4 {
|
301 mike 1.7 _mut.lock(pegasus_thread_self());
|
302 chip 1.8
|
303 mike 1.4 for (Message* m = front(); m; m = m->getNext())
304 {
|
305 mike 1.7 if (m->getKey() == key)
306 {
307 _mut.unlock();
308 return m;
309 }
|
310 chip 1.8
|
311 mike 1.4 }
|
312 mike 1.7 _mut.unlock();
|
313 mike 1.4 return 0;
314 }
315
|
316 mike 1.7 void MessageQueue::print(ostream& os) const throw(IPCException)
|
317 mike 1.4 {
|
318 mike 1.7 const_cast<MessageQueue *>(this)->_mut.lock(pegasus_thread_self());
|
319 chip 1.8
|
320 mike 1.7 for (const Message* m = front(); m; m = m->getNext())
|
321 mike 1.4 m->print(os);
|
322 mike 1.7 const_cast<MessageQueue *>(this)->_mut.unlock();
|
323 mike 1.4 }
324
|
325 mike 1.7 Message* MessageQueue::find(Uint32 type, Uint32 key) throw(IPCException)
|
326 mike 1.4 {
|
327 mike 1.7 _mut.lock(pegasus_thread_self());
|
328 chip 1.8
|
329 mike 1.4 for (Message* m = front(); m; m = m->getNext())
330 {
|
331 mike 1.7 if (m->getType() == type && m->getKey() == key)
332 {
333 _mut.unlock();
334 return m;
335 }
|
336 mike 1.4 }
|
337 mike 1.7 _mut.unlock();
|
338 mike 1.4
|
339 mike 1.5 return 0;
340 }
341
|
342 mike 1.7 void MessageQueue::lock() throw(IPCException)
|
343 mike 1.5 {
|
344 mike 1.7 _mut.lock(pegasus_thread_self());
345 }
|
346 mike 1.5
|
347 chip 1.8 void MessageQueue::unlock()
|
348 mike 1.7 {
349 _mut.unlock();
|
350 mike 1.5 }
351
|
352 mike 1.7 const char* MessageQueue::getQueueName() const
|
353 mike 1.5 {
|
354 mike 1.7 if(_name[0] != 0x00)
355 return _name;
356 return "unknown";
|
357 mike 1.5 }
358
|
359 mike 1.7 MessageQueue* MessageQueue::lookup(Uint32 queueId) throw(IPCException)
|
360 mike 1.5 {
361 MessageQueue* queue = 0;
|
362 mike 1.7 q_table_mut.lock(pegasus_thread_self());
|
363 chip 1.8
|
364 mike 1.5 if (_queueTable.lookup(queueId, queue))
|
365 mike 1.7 {
366 q_table_mut.unlock();
367 return queue;
368 }
|
369 chip 1.8
|
370 mike 1.7 // Not found!
|
371 mike 1.5
|
372 mike 1.7 q_table_mut.unlock();
|
373 chip 1.8
|
374 mike 1.4 return 0;
|
375 mike 1.6 }
|
376 mike 1.7
377
378 MessageQueue* MessageQueue::lookup(const char *name) throw(IPCException)
379 {
380 if(name == NULL)
381 throw NullPointer();
382 q_table_mut.lock(pegasus_thread_self());
|
383 chip 1.8
|
384 mike 1.7 for(QueueTable::Iterator i = _queueTable.start(); i; i++)
385 {
386 // ATTN: Need to decide how many characters to compare in queue names
387 if(! strncmp( ((MessageQueue *)i.value())->getQueueName(), name, 25) )
388 {
389 q_table_mut.unlock();
390 return( (MessageQueue *)i.value());
391 }
|
392 chip 1.8
|
393 mike 1.7 }
394 q_table_mut.unlock();
|
395 chip 1.8
|
396 mike 1.7 return 0;
397 }
398
|
399 mike 1.6
400 void MessageQueue::handleEnqueue()
401 {
402
|
403 mike 1.4 }
404
405 PEGASUS_NAMESPACE_END
|