1 karl 1.36 //%2003////////////////////////////////////////////////////////////////////////
|
2 mike 1.4 //
|
3 karl 1.36 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Development
4 // Company, L. P., IBM Corp., The Open Group, Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L. P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
|
7 mike 1.4 //
8 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
9 chip 1.8 // of this software and associated documentation files (the "Software"), to
10 // deal in the Software without restriction, including without limitation the
11 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
12 mike 1.4 // sell copies of the Software, and to permit persons to whom the Software is
13 // furnished to do so, subject to the following conditions:
|
14 kumpf 1.32 //
|
15 chip 1.8 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
16 mike 1.4 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
17 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
18 chip 1.8 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
19 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
20 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
21 mike 1.4 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
22 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23 //
24 //==============================================================================
25 //
26 // Author: Mike Brasher (mbrasher@bmc.com)
27 //
28 // Modified By:
29 //
30 //%/////////////////////////////////////////////////////////////////////////////
31
|
32 mike 1.5 #include <Pegasus/Common/HashTable.h>
|
33 mike 1.7 #include <Pegasus/Common/IPC.h>
|
34 kumpf 1.19 #include <Pegasus/Common/Tracer.h>
|
35 mike 1.4 #include "MessageQueue.h"
|
36 mday 1.24 #include "MessageQueueService.h"
|
37 mike 1.4 PEGASUS_USING_STD;
38
39 PEGASUS_NAMESPACE_BEGIN
40
|
41 mike 1.5 typedef HashTable<Uint32, MessageQueue*, EqualFunc<Uint32>, HashFunc<Uint32> >
42 QueueTable;
43
|
44 mday 1.33 static QueueTable _queueTable(256);
|
45 mday 1.28 static Mutex q_table_mut ;
|
46 mike 1.5
|
47 mday 1.35 void MessageQueue::remove_myself(Uint32 qid)
48 {
49 q_table_mut.lock(pegasus_thread_self());
50
51 _queueTable.remove(qid);
52 q_table_mut.unlock();
53 }
54
55
|
56 mike 1.16 Uint32 MessageQueue::getNextQueueId() throw(IPCException)
|
57 mike 1.5 {
|
58 mday 1.22 static Uint32 _nextQueueId = 2;
|
59 mike 1.16
60 //
61 // Lock mutex:
62 //
|
63 chip 1.8
|
64 mday 1.28 static Mutex _id_mut ;
|
65 mike 1.16 _id_mut.lock(pegasus_thread_self());
|
66 chip 1.8
|
67 kumpf 1.34 Uint32 queueId;
|
68 chip 1.8
|
69 kumpf 1.34 // Assign the next queue ID that is not already in use
70 do
71 {
72 // Handle wrap around and never assign zero or one as a queue id:
73 if (_nextQueueId == 0)
74 {
75 _nextQueueId = 2;
76 }
|
77 chip 1.8
|
78 kumpf 1.34 queueId = _nextQueueId++;
79 } while (lookup(queueId) != 0);
|
80 mike 1.7
|
81 mike 1.16 //
82 // Unlock mutex:
83 //
|
84 mike 1.5
|
85 mike 1.16 _id_mut.unlock();
|
86 mday 1.12
|
87 mike 1.16 return queueId;
88 }
|
89 mday 1.12
|
90 mday 1.24
91
|
92 mike 1.16 MessageQueue::MessageQueue(
93 const char* name,
94 Boolean async,
95 Uint32 queueId)
|
96 mday 1.24 : _queueId(queueId), _capabilities(0), _count(0), _front(0), _back(0), _async(async)
|
97 mike 1.5 {
|
98 mike 1.16 //
99 // Copy the name:
100 //
101
|
102 kumpf 1.25 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::MessageQueue()");
|
103 kumpf 1.19
|
104 mike 1.16 if (!name)
|
105 mday 1.24 name = "";
|
106 mike 1.16
107 _name = new char[strlen(name) + 1];
108 strcpy(_name, name);
109
|
110 kumpf 1.25 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
|
111 kumpf 1.19 "MessageQueue::MessageQueue name = %s, queueId = %i", name, queueId);
112
|
113 mike 1.16 //
114 // Insert into queue table:
115 //
|
116 mike 1.5
|
117 chip 1.8 q_table_mut.lock(pegasus_thread_self());
118
|
119 mike 1.16 while (!_queueTable.insert(_queueId, this))
|
120 mike 1.7 ;
|
121 mike 1.16
|
122 mike 1.7 q_table_mut.unlock();
|
123 chip 1.8
|
124 mday 1.12
|
125 kumpf 1.25 PEG_METHOD_EXIT();
|
126 mike 1.5 }
127
128 MessageQueue::~MessageQueue()
129 {
130 // ATTN-A: thread safety!
|
131 kumpf 1.19
|
132 kumpf 1.25 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::~MessageQueue()");
|
133 kumpf 1.19
|
134 kumpf 1.25 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
|
135 kumpf 1.19 "MessageQueue::~MessageQueue queueId = %i, name = %s", _queueId, _name);
136
|
137 chip 1.8 q_table_mut.lock(pegasus_thread_self());
138
|
139 mike 1.5 _queueTable.remove(_queueId);
|
140 mike 1.7 q_table_mut.unlock();
|
141 chip 1.8
|
142 mike 1.16 // Free the name:
|
143 mday 1.12
|
144 mike 1.16 delete [] _name;
|
145 kumpf 1.19
|
146 kumpf 1.25 PEG_METHOD_EXIT();
|
147 chip 1.8 }
148
|
149 kumpf 1.37 void MessageQueue::enqueue(Message* message)
|
150 mike 1.4 {
|
151 kumpf 1.25 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::enqueue()");
|
152 kumpf 1.19
153 if (!message)
154 {
|
155 kumpf 1.25 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
|
156 kumpf 1.19 "MessageQueue::enqueue failure");
|
157 kumpf 1.25 PEG_METHOD_EXIT();
|
158 mday 1.9 throw NullPointer();
|
159 kumpf 1.19 }
|
160 mike 1.7
|
161 kumpf 1.31 PEG_TRACE_STRING( TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
162 String("Queue name: ") + getQueueName() ) ;
163 Tracer::trace ( TRC_MESSAGEQUEUESERVICE,
164 Tracer::LEVEL3,
165 "Message: [%s, %d]",
166 MessageTypeToString(message->getType()),
167 message->getKey() );
|
168 mday 1.35
|
169 mday 1.9 _mut.lock(pegasus_thread_self());
|
170 mike 1.4 if (_back)
171 {
|
172 mday 1.9 _back->_next = message;
173 message->_prev = _back;
174 message->_next = 0;
175 _back = message;
|
176 mike 1.4 }
177 else
178 {
|
179 mday 1.9 _front = message;
180 _back = message;
181 message->_prev = 0;
182 message->_next = 0;
|
183 mike 1.4 }
184 message->_owner = this;
|
185 mday 1.30
|
186 mike 1.4 _count++;
|
187 kumpf 1.25 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
|
188 mday 1.30 "MessageQueue::enqueue _queueId = %d, _count = %d", _queueId, _count);
189
|
190 mike 1.7 _mut.unlock();
|
191 mday 1.30
|
192 mday 1.22 handleEnqueue();
|
193 kumpf 1.25 PEG_METHOD_EXIT();
|
194 mday 1.15 }
195
|
196 mike 1.7 Message* MessageQueue::dequeue() throw(IPCException)
|
197 mike 1.4 {
|
198 kumpf 1.25 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::dequeue()");
199
|
200 mike 1.7 _mut.lock(pegasus_thread_self());
|
201 mike 1.4 if (_front)
202 {
203 Message* message = _front;
204 _front = _front->_next;
205 if (_front)
206 _front->_prev = 0;
207
208 if (_back == message)
209 _back = 0;
|
210 kumpf 1.25
|
211 mike 1.7 _count--;
|
212 kumpf 1.25 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
|
213 kumpf 1.26 "MessageQueue::dequeue _queueId = %d, _count = %d",
214 _queueId, _count);
|
215 kumpf 1.25
|
216 mike 1.7 _mut.unlock();
|
217 mike 1.4 message->_next = 0;
218 message->_prev = 0;
219 message->_owner = 0;
|
220 kumpf 1.25
221 PEG_METHOD_EXIT();
|
222 mike 1.4 return message;
223 }
|
224 mike 1.7 _mut.unlock();
|
225 kumpf 1.25
226 PEG_METHOD_EXIT();
|
227 mike 1.4 return 0;
228 }
|
229 kumpf 1.25 ;
|
230 mday 1.23
|
231 mike 1.4
|
232 mike 1.7 void MessageQueue::remove(Message* message) throw(IPCException)
|
233 mike 1.4 {
|
234 kumpf 1.25 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::remove()");
235
|
236 mike 1.4 if (!message)
|
237 kumpf 1.25 {
238 PEG_METHOD_EXIT();
|
239 mike 1.4 throw NullPointer();
|
240 kumpf 1.25 }
|
241 mike 1.4
242 if (message->_owner != this)
|
243 kumpf 1.25 {
244 PEG_METHOD_EXIT();
|
245 mike 1.4 throw NoSuchMessageOnQueue();
|
246 kumpf 1.25 }
|
247 mike 1.4
|
248 mike 1.7 _mut.lock(pegasus_thread_self());
|
249 chip 1.8
|
250 mike 1.4 if (message->_next)
251 message->_next->_prev = message->_prev;
252 else
253 _back = message->_prev;
254
255 if (message->_prev)
256 message->_prev->_next = message->_next;
257 else
258 _front = message->_next;
259
|
260 mike 1.7 _count--;
|
261 kumpf 1.25 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
262 "MessageQueue::remove _count = %d", _count);
263
|
264 mike 1.7 _mut.unlock();
|
265 chip 1.8
|
266 mike 1.4 message->_prev = 0;
267 message->_next = 0;
268 message->_owner = 0;
|
269 kumpf 1.25
270 PEG_METHOD_EXIT();
|
271 mike 1.4 }
272
|
273 mike 1.7 Message* MessageQueue::findByType(Uint32 type) throw(IPCException)
|
274 mike 1.4 {
|
275 mike 1.7 _mut.lock(pegasus_thread_self());
|
276 chip 1.8
|
277 mike 1.4 for (Message* m = front(); m; m = m->getNext())
278 {
|
279 mike 1.7 if (m->getType() == type)
280 {
281 _mut.unlock();
282 return m;
283 }
|
284 mike 1.4 }
|
285 mike 1.7 _mut.unlock();
|
286 mike 1.4 return 0;
287 }
288
|
289 mike 1.7 Message* MessageQueue::findByKey(Uint32 key) throw(IPCException)
|
290 mike 1.4 {
|
291 mike 1.7 _mut.lock(pegasus_thread_self());
|
292 chip 1.8
|
293 mike 1.4 for (Message* m = front(); m; m = m->getNext())
294 {
|
295 mike 1.7 if (m->getKey() == key)
296 {
297 _mut.unlock();
298 return m;
299 }
|
300 chip 1.8
|
301 mike 1.4 }
|
302 mike 1.7 _mut.unlock();
|
303 mike 1.4 return 0;
304 }
305
|
306 mike 1.7 void MessageQueue::print(ostream& os) const throw(IPCException)
|
307 mike 1.4 {
|
308 mike 1.7 const_cast<MessageQueue *>(this)->_mut.lock(pegasus_thread_self());
|
309 chip 1.8
|
310 mike 1.7 for (const Message* m = front(); m; m = m->getNext())
|
311 mike 1.4 m->print(os);
|
312 mike 1.7 const_cast<MessageQueue *>(this)->_mut.unlock();
|
313 mike 1.4 }
314
|
315 mike 1.7 Message* MessageQueue::find(Uint32 type, Uint32 key) throw(IPCException)
|
316 mike 1.4 {
|
317 mike 1.7 _mut.lock(pegasus_thread_self());
|
318 chip 1.8
|
319 mike 1.4 for (Message* m = front(); m; m = m->getNext())
320 {
|
321 mike 1.7 if (m->getType() == type && m->getKey() == key)
322 {
323 _mut.unlock();
324 return m;
325 }
|
326 mike 1.4 }
|
327 mike 1.7 _mut.unlock();
|
328 mike 1.4
|
329 mike 1.5 return 0;
330 }
331
|
332 mike 1.7 void MessageQueue::lock() throw(IPCException)
|
333 mike 1.5 {
|
334 mike 1.7 _mut.lock(pegasus_thread_self());
335 }
|
336 mike 1.5
|
337 chip 1.8 void MessageQueue::unlock()
|
338 mike 1.7 {
339 _mut.unlock();
|
340 mike 1.5 }
341
|
342 mike 1.7 const char* MessageQueue::getQueueName() const
|
343 mike 1.5 {
|
344 kumpf 1.29 return _name;
|
345 mike 1.5 }
346
|
347 mike 1.7 MessageQueue* MessageQueue::lookup(Uint32 queueId) throw(IPCException)
|
348 mike 1.5 {
|
349 kumpf 1.19
|
350 mike 1.5 MessageQueue* queue = 0;
|
351 mike 1.7 q_table_mut.lock(pegasus_thread_self());
|
352 chip 1.8
|
353 mike 1.5 if (_queueTable.lookup(queueId, queue))
|
354 mike 1.7 {
355 q_table_mut.unlock();
356 return queue;
357 }
|
358 chip 1.8
|
359 mike 1.7 // Not found!
|
360 mike 1.5
|
361 mike 1.7 q_table_mut.unlock();
|
362 chip 1.8
|
363 kumpf 1.25 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
|
364 kumpf 1.19 "MessageQueue::lookup failure queueId = %i", queueId);
365
|
366 mike 1.4 return 0;
|
367 mike 1.6 }
|
368 mike 1.7
369
370 MessageQueue* MessageQueue::lookup(const char *name) throw(IPCException)
371 {
|
372 kumpf 1.19
|
373 mike 1.7 if(name == NULL)
374 throw NullPointer();
375 q_table_mut.lock(pegasus_thread_self());
|
376 chip 1.8
|
377 mike 1.7 for(QueueTable::Iterator i = _queueTable.start(); i; i++)
378 {
379 // ATTN: Need to decide how many characters to compare in queue names
|
380 mday 1.27 if(! strcmp( ((MessageQueue *)i.value())->getQueueName(), name) )
|
381 mike 1.7 {
382 q_table_mut.unlock();
383 return( (MessageQueue *)i.value());
384 }
|
385 chip 1.8
|
386 mike 1.7 }
387 q_table_mut.unlock();
|
388 kumpf 1.19
|
389 kumpf 1.25 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
|
390 kumpf 1.19 "MessageQueue::lookup failure - name = %s", name);
|
391 chip 1.8
|
392 mike 1.7 return 0;
393 }
394
|
395 mike 1.6
396 void MessageQueue::handleEnqueue()
397 {
398
|
399 mike 1.4 }
400
401 PEGASUS_NAMESPACE_END
|