1 karl 1.36 //%2003////////////////////////////////////////////////////////////////////////
|
2 mike 1.4 //
|
3 karl 1.36 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Development
4 // Company, L. P., IBM Corp., The Open Group, Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L. P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
|
7 mike 1.4 //
8 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
9 chip 1.8 // of this software and associated documentation files (the "Software"), to
10 // deal in the Software without restriction, including without limitation the
11 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
12 mike 1.4 // sell copies of the Software, and to permit persons to whom the Software is
13 // furnished to do so, subject to the following conditions:
|
14 kumpf 1.32 //
|
15 chip 1.8 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
16 mike 1.4 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
17 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
18 chip 1.8 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
19 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
20 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
21 mike 1.4 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
22 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23 //
24 //==============================================================================
25 //
26 // Author: Mike Brasher (mbrasher@bmc.com)
27 //
|
28 a.arora 1.38 // Modified By: Amit K Arora, IBM (amita@in.ibm.com) for Bug#1090
|
29 mike 1.4 //
30 //%/////////////////////////////////////////////////////////////////////////////
31
|
32 mike 1.5 #include <Pegasus/Common/HashTable.h>
|
33 mike 1.7 #include <Pegasus/Common/IPC.h>
|
34 kumpf 1.19 #include <Pegasus/Common/Tracer.h>
|
35 mike 1.4 #include "MessageQueue.h"
|
36 mday 1.24 #include "MessageQueueService.h"
|
37 mike 1.4 PEGASUS_USING_STD;
38
39 PEGASUS_NAMESPACE_BEGIN
40
|
41 mike 1.5 typedef HashTable<Uint32, MessageQueue*, EqualFunc<Uint32>, HashFunc<Uint32> >
42 QueueTable;
43
|
44 mday 1.33 static QueueTable _queueTable(256);
|
45 mday 1.28 static Mutex q_table_mut ;
|
46 mike 1.5
|
47 mday 1.35 void MessageQueue::remove_myself(Uint32 qid)
48 {
|
49 a.arora 1.38 AutoMutex autoMut(q_table_mut);
|
50 mday 1.35 _queueTable.remove(qid);
51 }
52
53
|
54 mike 1.16 Uint32 MessageQueue::getNextQueueId() throw(IPCException)
|
55 mike 1.5 {
|
56 mday 1.22 static Uint32 _nextQueueId = 2;
|
57 mike 1.16
58 //
59 // Lock mutex:
60 //
|
61 chip 1.8
|
62 mday 1.28 static Mutex _id_mut ;
|
63 a.arora 1.38 AutoMutex autoMut(_id_mut);
|
64 chip 1.8
|
65 kumpf 1.34 Uint32 queueId;
|
66 chip 1.8
|
67 kumpf 1.34 // Assign the next queue ID that is not already in use
68 do
69 {
70 // Handle wrap around and never assign zero or one as a queue id:
71 if (_nextQueueId == 0)
72 {
73 _nextQueueId = 2;
74 }
|
75 chip 1.8
|
76 kumpf 1.34 queueId = _nextQueueId++;
77 } while (lookup(queueId) != 0);
|
78 mike 1.7
|
79 mike 1.16 return queueId;
80 }
|
81 mday 1.12
|
82 mday 1.24
83
|
84 mike 1.16 MessageQueue::MessageQueue(
85 const char* name,
86 Boolean async,
87 Uint32 queueId)
|
88 mday 1.24 : _queueId(queueId), _capabilities(0), _count(0), _front(0), _back(0), _async(async)
|
89 mike 1.5 {
|
90 mike 1.16 //
91 // Copy the name:
92 //
93
|
94 kumpf 1.25 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::MessageQueue()");
|
95 kumpf 1.19
|
96 mike 1.16 if (!name)
|
97 mday 1.24 name = "";
|
98 mike 1.16
99 _name = new char[strlen(name) + 1];
100 strcpy(_name, name);
101
|
102 kumpf 1.25 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
|
103 kumpf 1.19 "MessageQueue::MessageQueue name = %s, queueId = %i", name, queueId);
104
|
105 mike 1.16 //
106 // Insert into queue table:
107 //
|
108 mike 1.5
|
109 a.arora 1.38 AutoMutex autoMut(q_table_mut);
|
110 chip 1.8
|
111 mike 1.16 while (!_queueTable.insert(_queueId, this))
|
112 mike 1.7 ;
|
113 a.arora 1.38
|
114 kumpf 1.25 PEG_METHOD_EXIT();
|
115 mike 1.5 }
116
117 MessageQueue::~MessageQueue()
118 {
119 // ATTN-A: thread safety!
|
120 kumpf 1.19
|
121 kumpf 1.25 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::~MessageQueue()");
|
122 kumpf 1.19
|
123 kumpf 1.25 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
|
124 kumpf 1.19 "MessageQueue::~MessageQueue queueId = %i, name = %s", _queueId, _name);
125
|
126 a.arora 1.38
127 {
128 AutoMutex autoMut(q_table_mut);
129 _queueTable.remove(_queueId);
130 } // mutex unlocks here
|
131 chip 1.8
|
132 mike 1.16 // Free the name:
|
133 mday 1.12
|
134 mike 1.16 delete [] _name;
|
135 kumpf 1.19
|
136 kumpf 1.25 PEG_METHOD_EXIT();
|
137 chip 1.8 }
138
|
139 kumpf 1.37 void MessageQueue::enqueue(Message* message)
|
140 mike 1.4 {
|
141 kumpf 1.25 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::enqueue()");
|
142 kumpf 1.19
143 if (!message)
144 {
|
145 kumpf 1.25 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
|
146 kumpf 1.19 "MessageQueue::enqueue failure");
|
147 kumpf 1.25 PEG_METHOD_EXIT();
|
148 mday 1.9 throw NullPointer();
|
149 kumpf 1.19 }
|
150 mike 1.7
|
151 kumpf 1.31 PEG_TRACE_STRING( TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
152 String("Queue name: ") + getQueueName() ) ;
153 Tracer::trace ( TRC_MESSAGEQUEUESERVICE,
154 Tracer::LEVEL3,
155 "Message: [%s, %d]",
156 MessageTypeToString(message->getType()),
157 message->getKey() );
|
158 mday 1.35
|
159 a.arora 1.38 {
160 AutoMutex autoMut(_mut);
|
161 mike 1.4 if (_back)
162 {
|
163 mday 1.9 _back->_next = message;
164 message->_prev = _back;
165 message->_next = 0;
166 _back = message;
|
167 mike 1.4 }
168 else
169 {
|
170 mday 1.9 _front = message;
171 _back = message;
172 message->_prev = 0;
173 message->_next = 0;
|
174 mike 1.4 }
175 message->_owner = this;
|
176 mday 1.30
|
177 mike 1.4 _count++;
|
178 kumpf 1.25 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
|
179 mday 1.30 "MessageQueue::enqueue _queueId = %d, _count = %d", _queueId, _count);
180
|
181 a.arora 1.38 } // mutex unlocks here
|
182 mday 1.30
|
183 mday 1.22 handleEnqueue();
|
184 kumpf 1.25 PEG_METHOD_EXIT();
|
185 mday 1.15 }
186
|
187 mike 1.7 Message* MessageQueue::dequeue() throw(IPCException)
|
188 mike 1.4 {
|
189 kumpf 1.25 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::dequeue()");
190
|
191 a.arora 1.38 AutoMutex autoMut(_mut);
|
192 mike 1.4 if (_front)
193 {
194 Message* message = _front;
195 _front = _front->_next;
196 if (_front)
197 _front->_prev = 0;
198
199 if (_back == message)
200 _back = 0;
|
201 kumpf 1.25
|
202 mike 1.7 _count--;
|
203 kumpf 1.25 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
|
204 kumpf 1.26 "MessageQueue::dequeue _queueId = %d, _count = %d",
205 _queueId, _count);
|
206 kumpf 1.25
|
207 mike 1.4 message->_next = 0;
208 message->_prev = 0;
209 message->_owner = 0;
|
210 kumpf 1.25
211 PEG_METHOD_EXIT();
|
212 mike 1.4 return message;
213 }
|
214 kumpf 1.25
215 PEG_METHOD_EXIT();
|
216 mike 1.4 return 0;
217 }
|
218 kumpf 1.25 ;
|
219 mday 1.23
|
220 mike 1.4
|
221 mike 1.7 void MessageQueue::remove(Message* message) throw(IPCException)
|
222 mike 1.4 {
|
223 kumpf 1.25 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::remove()");
224
|
225 mike 1.4 if (!message)
|
226 kumpf 1.25 {
227 PEG_METHOD_EXIT();
|
228 mike 1.4 throw NullPointer();
|
229 kumpf 1.25 }
|
230 mike 1.4
231 if (message->_owner != this)
|
232 kumpf 1.25 {
233 PEG_METHOD_EXIT();
|
234 mike 1.4 throw NoSuchMessageOnQueue();
|
235 kumpf 1.25 }
|
236 mike 1.4
|
237 a.arora 1.38 {
238 AutoMutex autoMut(_mut);
|
239 chip 1.8
|
240 mike 1.4 if (message->_next)
241 message->_next->_prev = message->_prev;
242 else
243 _back = message->_prev;
244
245 if (message->_prev)
246 message->_prev->_next = message->_next;
247 else
248 _front = message->_next;
249
|
250 mike 1.7 _count--;
|
251 kumpf 1.25 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
252 "MessageQueue::remove _count = %d", _count);
253
|
254 a.arora 1.38 } // mutex unlocks here
|
255 chip 1.8
|
256 mike 1.4 message->_prev = 0;
257 message->_next = 0;
258 message->_owner = 0;
|
259 kumpf 1.25
260 PEG_METHOD_EXIT();
|
261 mike 1.4 }
262
|
263 mike 1.7 Message* MessageQueue::findByType(Uint32 type) throw(IPCException)
|
264 mike 1.4 {
|
265 a.arora 1.38 AutoMutex autoMut(_mut);
|
266 chip 1.8
|
267 mike 1.4 for (Message* m = front(); m; m = m->getNext())
268 {
|
269 mike 1.7 if (m->getType() == type)
270 {
|
271 a.arora 1.38 return m;
|
272 mike 1.7 }
|
273 mike 1.4 }
274 return 0;
275 }
276
|
277 mike 1.7 Message* MessageQueue::findByKey(Uint32 key) throw(IPCException)
|
278 mike 1.4 {
|
279 a.arora 1.38 AutoMutex autoMut(_mut);
|
280 chip 1.8
|
281 mike 1.4 for (Message* m = front(); m; m = m->getNext())
282 {
|
283 mike 1.7 if (m->getKey() == key)
284 {
|
285 a.arora 1.38 return m;
|
286 mike 1.7 }
|
287 chip 1.8
|
288 mike 1.4 }
289 return 0;
290 }
291
|
292 mike 1.7 void MessageQueue::print(ostream& os) const throw(IPCException)
|
293 mike 1.4 {
|
294 a.arora 1.38 AutoMutex autoMut(const_cast<MessageQueue *>(this)->_mut);
|
295 chip 1.8
|
296 mike 1.7 for (const Message* m = front(); m; m = m->getNext())
|
297 mike 1.4 m->print(os);
298 }
299
|
300 mike 1.7 Message* MessageQueue::find(Uint32 type, Uint32 key) throw(IPCException)
|
301 mike 1.4 {
|
302 a.arora 1.38 AutoMutex autoMut(_mut);
|
303 chip 1.8
|
304 mike 1.4 for (Message* m = front(); m; m = m->getNext())
305 {
|
306 mike 1.7 if (m->getType() == type && m->getKey() == key)
307 {
|
308 a.arora 1.38 return m;
|
309 mike 1.7 }
|
310 mike 1.4 }
311
|
312 mike 1.5 return 0;
313 }
314
|
315 mike 1.7 void MessageQueue::lock() throw(IPCException)
|
316 mike 1.5 {
|
317 mike 1.7 _mut.lock(pegasus_thread_self());
318 }
|
319 mike 1.5
|
320 chip 1.8 void MessageQueue::unlock()
|
321 mike 1.7 {
322 _mut.unlock();
|
323 mike 1.5 }
324
|
325 mike 1.7 const char* MessageQueue::getQueueName() const
|
326 mike 1.5 {
|
327 kumpf 1.29 return _name;
|
328 mike 1.5 }
329
|
330 mike 1.7 MessageQueue* MessageQueue::lookup(Uint32 queueId) throw(IPCException)
|
331 mike 1.5 {
|
332 kumpf 1.19
|
333 mike 1.5 MessageQueue* queue = 0;
|
334 a.arora 1.38 AutoMutex autoMut(q_table_mut);
|
335 chip 1.8
|
336 mike 1.5 if (_queueTable.lookup(queueId, queue))
|
337 mike 1.7 {
338 return queue;
339 }
|
340 chip 1.8
|
341 mike 1.7 // Not found!
|
342 mike 1.5
|
343 kumpf 1.25 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
|
344 kumpf 1.19 "MessageQueue::lookup failure queueId = %i", queueId);
345
|
346 mike 1.4 return 0;
|
347 mike 1.6 }
|
348 mike 1.7
349
350 MessageQueue* MessageQueue::lookup(const char *name) throw(IPCException)
351 {
|
352 kumpf 1.19
|
353 mike 1.7 if(name == NULL)
354 throw NullPointer();
|
355 a.arora 1.38 AutoMutex autoMut(q_table_mut);
|
356 chip 1.8
|
357 mike 1.7 for(QueueTable::Iterator i = _queueTable.start(); i; i++)
358 {
359 // ATTN: Need to decide how many characters to compare in queue names
|
360 mday 1.27 if(! strcmp( ((MessageQueue *)i.value())->getQueueName(), name) )
|
361 mike 1.7 {
|
362 a.arora 1.38 return( (MessageQueue *)i.value());
|
363 mike 1.7 }
|
364 chip 1.8
|
365 mike 1.7 }
|
366 kumpf 1.19
|
367 kumpf 1.25 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
|
368 kumpf 1.19 "MessageQueue::lookup failure - name = %s", name);
|
369 chip 1.8
|
370 mike 1.7 return 0;
371 }
372
|
373 mike 1.6
374 void MessageQueue::handleEnqueue()
375 {
376
|
377 mike 1.4 }
378
379 PEGASUS_NAMESPACE_END
|