1 mike 1.4 //%/////////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
6 chip 1.8 // of this software and associated documentation files (the "Software"), to
7 // deal in the Software without restriction, including without limitation the
8 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
9 mike 1.4 // sell copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
|
11 chip 1.8 //
12 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
13 mike 1.4 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
14 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
15 chip 1.8 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
16 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
17 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
18 mike 1.4 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
19 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20 //
21 //==============================================================================
22 //
23 // Author: Mike Brasher (mbrasher@bmc.com)
24 //
25 // Modified By:
26 //
27 //%/////////////////////////////////////////////////////////////////////////////
28
|
29 mike 1.5 #include <Pegasus/Common/HashTable.h>
|
30 mike 1.7 #include <Pegasus/Common/IPC.h>
|
31 mike 1.4 #include "MessageQueue.h"
32
33 PEGASUS_USING_STD;
34
35 PEGASUS_NAMESPACE_BEGIN
36
|
37 mike 1.5 typedef HashTable<Uint32, MessageQueue*, EqualFunc<Uint32>, HashFunc<Uint32> >
38 QueueTable;
39
40 static QueueTable _queueTable(128);
|
41 mike 1.7 static Mutex q_table_mut = Mutex();
|
42 mike 1.5
|
43 mike 1.7 static Uint32 _GetNextQueueId() throw(IPCException)
|
44 mike 1.5 {
|
45 chip 1.8
|
46 mike 1.7 static Uint32 _queueId = 2;
47 static Mutex _id_mut = Mutex();
|
48 chip 1.8
|
49 mike 1.7 _id_mut.lock(pegasus_thread_self());
|
50 chip 1.8
|
51 mike 1.7 // Handle wrap-around!
52 if (_queueId == 0)
53 _queueId = MessageQueue::_CIMOM_Q_ID;
54 Uint32 ret = _queueId++;
55 _id_mut.unlock();
|
56 chip 1.8
|
57 mike 1.7 return ret;
58 }
59
60 Uint32 MessageQueue::_CIMOM_Q_ID = 1;
|
61 mike 1.5
|
62 chip 1.8 MessageQueue::MessageQueue(const char * name)
63 : _mut( ), _count(0), _front(0), _back(0),
64 _workThread(MessageQueue::workThread, this, false),
65 _workSemaphore(0)
|
66 mike 1.5 {
|
67 mike 1.7 if(name != NULL)
68 {
69 strncpy(_name, name, 25);
70 _name[25] = 0x00;
71 }
72 else
73 memset(_name, 0x00,25);
|
74 mike 1.5
|
75 chip 1.8 q_table_mut.lock(pegasus_thread_self());
76
|
77 mike 1.5 while (!_queueTable.insert(_queueId = _GetNextQueueId(), this))
|
78 mike 1.7 ;
79 q_table_mut.unlock();
|
80 chip 1.8
81 _workThread.run();
|
82 mike 1.5 }
83
84 MessageQueue::~MessageQueue()
85 {
86 // ATTN-A: thread safety!
|
87 chip 1.8 q_table_mut.lock(pegasus_thread_self());
88
|
89 mike 1.5 _queueTable.remove(_queueId);
|
90 mike 1.7 q_table_mut.unlock();
|
91 chip 1.8
92 _workThread.cancel(); // cancel thread
93 _workSemaphore.signal();// wake thread
94 _workThread.join(); // wait for thread to complete
95 }
96
97 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueue::workThread(void * arg)
98 {
99 // get thread from argument
100 Thread * thread = (Thread *)arg;
101
102 PEGASUS_ASSERT(thread != 0);
103
104 // get message queue from thread
105 MessageQueue * queue = (MessageQueue *)thread->get_parm();
106
107 PEGASUS_ASSERT(queue != 0);
108
109 while(true)
110 {
|
111 mday 1.9 thread->sleep(1);
112 continue;
113
|
114 chip 1.8 // wait for work
|
115 mday 1.9 // queue->_workSemaphore.wait();
116
|
117 chip 1.8 // stop the thread when the message queue has been destroyed.
118 // ATTN: should check the thread cancel flag that is not yet exposed!
119 if(MessageQueue::lookup(queue->_queueId) == 0)
120 {
121 break;
122 }
123
124 // ensure the queue has a message before dispatching
125 if(queue->_count != 0)
126 {
127 queue->handleEnqueue();
128 }
|
129 mday 1.9 else
130 {
131 thread->sleep(0);
132 }
133
|
134 chip 1.8 }
135
136 thread->exit_self(PEGASUS_THREAD_RETURN(0));
137
138 return(0);
|
139 mike 1.5 }
140
|
141 mike 1.7 void MessageQueue::enqueue(Message* message) throw(IPCException)
|
142 mike 1.4 {
143 if (!message)
|
144 mday 1.9 throw NullPointer();
145
|
146 mike 1.4
|
147 mike 1.7
148 if (getenv("PEGASUS_TRACE"))
149 {
|
150 mday 1.9 cout << "===== " << getQueueName() << ": ";
151 message->print(cout);
|
152 mike 1.7 }
153
|
154 mday 1.9 _mut.lock(pegasus_thread_self());
|
155 mike 1.4 if (_back)
156 {
|
157 mday 1.9 _back->_next = message;
158 message->_prev = _back;
159 message->_next = 0;
160 _back = message;
|
161 mike 1.4 }
162 else
163 {
|
164 mday 1.9 _front = message;
165 _back = message;
166 message->_prev = 0;
167 message->_next = 0;
|
168 mike 1.4 }
169 message->_owner = this;
170 _count++;
|
171 mike 1.7 _mut.unlock();
|
172 mday 1.9
173 // _workSemaphore.signal();
|
174 mike 1.6
|
175 mday 1.9 handleEnqueue();
176
|
177 mike 1.4 }
178
|
179 mike 1.7 Message* MessageQueue::dequeue() throw(IPCException)
|
180 mike 1.4 {
|
181 mike 1.7 _mut.lock(pegasus_thread_self());
|
182 mike 1.4 if (_front)
183 {
184 Message* message = _front;
185 _front = _front->_next;
186 if (_front)
187 _front->_prev = 0;
188
189 if (_back == message)
190 _back = 0;
|
191 mike 1.7 _count--;
192 _mut.unlock();
|
193 mike 1.4 message->_next = 0;
194 message->_prev = 0;
195 message->_owner = 0;
196 return message;
197 }
|
198 mike 1.7 _mut.unlock();
|
199 mike 1.4 return 0;
200 }
201
|
202 mike 1.7 void MessageQueue::remove(Message* message) throw(IPCException)
|
203 mike 1.4 {
204 if (!message)
205 throw NullPointer();
206
207 if (message->_owner != this)
208 throw NoSuchMessageOnQueue();
209
|
210 mike 1.7 _mut.lock(pegasus_thread_self());
|
211 chip 1.8
|
212 mike 1.4 if (message->_next)
213 message->_next->_prev = message->_prev;
214 else
215 _back = message->_prev;
216
217 if (message->_prev)
218 message->_prev->_next = message->_next;
219 else
220 _front = message->_next;
221
|
222 mike 1.7 _count--;
223 _mut.unlock();
|
224 chip 1.8
|
225 mike 1.4 message->_prev = 0;
226 message->_next = 0;
227 message->_owner = 0;
228 }
229
|
230 mike 1.7 Message* MessageQueue::findByType(Uint32 type) throw(IPCException)
|
231 mike 1.4 {
|
232 mike 1.7 _mut.lock(pegasus_thread_self());
|
233 chip 1.8
|
234 mike 1.4 for (Message* m = front(); m; m = m->getNext())
235 {
|
236 mike 1.7 if (m->getType() == type)
237 {
238 _mut.unlock();
239 return m;
240 }
|
241 mike 1.4 }
|
242 mike 1.7 _mut.unlock();
|
243 mike 1.4 return 0;
244 }
245
|
246 mike 1.7 Message* MessageQueue::findByKey(Uint32 key) throw(IPCException)
|
247 mike 1.4 {
|
248 mike 1.7 _mut.lock(pegasus_thread_self());
|
249 chip 1.8
|
250 mike 1.4 for (Message* m = front(); m; m = m->getNext())
251 {
|
252 mike 1.7 if (m->getKey() == key)
253 {
254 _mut.unlock();
255 return m;
256 }
|
257 chip 1.8
|
258 mike 1.4 }
|
259 mike 1.7 _mut.unlock();
|
260 mike 1.4 return 0;
261 }
262
|
263 mike 1.7 void MessageQueue::print(ostream& os) const throw(IPCException)
|
264 mike 1.4 {
|
265 mike 1.7 const_cast<MessageQueue *>(this)->_mut.lock(pegasus_thread_self());
|
266 chip 1.8
|
267 mike 1.7 for (const Message* m = front(); m; m = m->getNext())
|
268 mike 1.4 m->print(os);
|
269 mike 1.7 const_cast<MessageQueue *>(this)->_mut.unlock();
|
270 mike 1.4 }
271
|
272 mike 1.7 Message* MessageQueue::find(Uint32 type, Uint32 key) throw(IPCException)
|
273 mike 1.4 {
|
274 mike 1.7 _mut.lock(pegasus_thread_self());
|
275 chip 1.8
|
276 mike 1.4 for (Message* m = front(); m; m = m->getNext())
277 {
|
278 mike 1.7 if (m->getType() == type && m->getKey() == key)
279 {
280 _mut.unlock();
281 return m;
282 }
|
283 mike 1.4 }
|
284 mike 1.7 _mut.unlock();
|
285 mike 1.4
|
286 mike 1.5 return 0;
287 }
288
|
289 mike 1.7 void MessageQueue::lock() throw(IPCException)
|
290 mike 1.5 {
|
291 mike 1.7 _mut.lock(pegasus_thread_self());
292 }
|
293 mike 1.5
|
294 chip 1.8 void MessageQueue::unlock()
|
295 mike 1.7 {
296 _mut.unlock();
|
297 mike 1.5 }
298
|
299 mike 1.7 const char* MessageQueue::getQueueName() const
|
300 mike 1.5 {
|
301 mike 1.7 if(_name[0] != 0x00)
302 return _name;
303 return "unknown";
|
304 mike 1.5 }
305
|
306 mike 1.7 MessageQueue* MessageQueue::lookup(Uint32 queueId) throw(IPCException)
|
307 mike 1.5 {
308 MessageQueue* queue = 0;
|
309 mike 1.7 q_table_mut.lock(pegasus_thread_self());
|
310 chip 1.8
|
311 mike 1.5 if (_queueTable.lookup(queueId, queue))
|
312 mike 1.7 {
313 q_table_mut.unlock();
314 return queue;
315 }
|
316 chip 1.8
|
317 mike 1.7 // Not found!
|
318 mike 1.5
|
319 mike 1.7 q_table_mut.unlock();
|
320 chip 1.8
|
321 mike 1.4 return 0;
|
322 mike 1.6 }
|
323 mike 1.7
324
325 MessageQueue* MessageQueue::lookup(const char *name) throw(IPCException)
326 {
327 if(name == NULL)
328 throw NullPointer();
329 q_table_mut.lock(pegasus_thread_self());
|
330 chip 1.8
|
331 mike 1.7 for(QueueTable::Iterator i = _queueTable.start(); i; i++)
332 {
333 // ATTN: Need to decide how many characters to compare in queue names
334 if(! strncmp( ((MessageQueue *)i.value())->getQueueName(), name, 25) )
335 {
336 q_table_mut.unlock();
337 return( (MessageQueue *)i.value());
338 }
|
339 chip 1.8
|
340 mike 1.7 }
341 q_table_mut.unlock();
|
342 chip 1.8
|
343 mike 1.7 return 0;
344 }
345
|
346 mike 1.6
347 void MessageQueue::handleEnqueue()
348 {
349
|
350 mike 1.4 }
351
352 PEGASUS_NAMESPACE_END
|