1 mike 1.4 //%/////////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
6 chip 1.8 // of this software and associated documentation files (the "Software"), to
7 // deal in the Software without restriction, including without limitation the
8 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
9 mike 1.4 // sell copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
|
11 chip 1.8 //
12 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
13 mike 1.4 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
14 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
15 chip 1.8 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
16 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
17 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
18 mike 1.4 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
19 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20 //
21 //==============================================================================
22 //
23 // Author: Mike Brasher (mbrasher@bmc.com)
24 //
25 // Modified By:
26 //
27 //%/////////////////////////////////////////////////////////////////////////////
28
|
29 mike 1.5 #include <Pegasus/Common/HashTable.h>
|
30 mike 1.7 #include <Pegasus/Common/IPC.h>
|
31 kumpf 1.19 #include <Pegasus/Common/Tracer.h>
|
32 mike 1.4 #include "MessageQueue.h"
33
34 PEGASUS_USING_STD;
35
36 PEGASUS_NAMESPACE_BEGIN
37
|
38 mike 1.5 typedef HashTable<Uint32, MessageQueue*, EqualFunc<Uint32>, HashFunc<Uint32> >
39 QueueTable;
40
41 static QueueTable _queueTable(128);
|
42 mike 1.7 static Mutex q_table_mut = Mutex();
|
43 mike 1.5
|
44 mike 1.16 Uint32 MessageQueue::getNextQueueId() throw(IPCException)
|
45 mike 1.5 {
|
46 mday 1.22 static Uint32 _nextQueueId = 2;
|
47 mike 1.16
48 //
49 // Lock mutex:
50 //
|
51 chip 1.8
|
52 mike 1.7 static Mutex _id_mut = Mutex();
|
53 mike 1.16 _id_mut.lock(pegasus_thread_self());
|
54 chip 1.8
|
55 mike 1.16 // Assign next queue id. Handle wrap around and never assign zero as
56 // a queue id:
|
57 chip 1.8
|
58 mike 1.16 if (_nextQueueId == 0)
|
59 mday 1.22 _nextQueueId = 2;
|
60 chip 1.8
|
61 mike 1.16 Uint32 queueId = _nextQueueId++;
|
62 mike 1.7
|
63 mike 1.16 //
64 // Unlock mutex:
65 //
|
66 mike 1.5
|
67 mike 1.16 _id_mut.unlock();
|
68 mday 1.12
|
69 mike 1.16 return queueId;
70 }
|
71 mday 1.12
|
72 mike 1.16 MessageQueue::MessageQueue(
73 const char* name,
74 Boolean async,
75 Uint32 queueId)
|
76 mday 1.22 : _queueId(queueId), _count(0), _front(0), _back(0), _async(async)
|
77 mike 1.5 {
|
78 mike 1.16 //
79 // Copy the name:
80 //
81
|
82 kumpf 1.19 PEG_FUNC_ENTER(TRC_DISPATCHER,"MessageQueue::MessageQueue()");
83
|
84 mike 1.16 if (!name)
85 name = "";
86
87 _name = new char[strlen(name) + 1];
88 strcpy(_name, name);
89
|
90 kumpf 1.19 Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3,
91 "MessageQueue::MessageQueue name = %s, queueId = %i", name, queueId);
92
|
93 mike 1.16 //
94 // Insert into queue table:
95 //
|
96 mike 1.5
|
97 chip 1.8 q_table_mut.lock(pegasus_thread_self());
98
|
99 mike 1.16 while (!_queueTable.insert(_queueId, this))
|
100 mike 1.7 ;
|
101 mike 1.16
|
102 mike 1.7 q_table_mut.unlock();
|
103 chip 1.8
|
104 mday 1.12
|
105 kumpf 1.19 PEG_FUNC_EXIT(TRC_DISPATCHER,"MessageQueue::MessageQueue()");
|
106 mike 1.5 }
107
108 MessageQueue::~MessageQueue()
109 {
110 // ATTN-A: thread safety!
|
111 kumpf 1.19
112 PEG_FUNC_ENTER(TRC_DISPATCHER,"MessageQueue::~MessageQueue()");
113
114 Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3,
115 "MessageQueue::~MessageQueue queueId = %i, name = %s", _queueId, _name);
116
|
117 chip 1.8 q_table_mut.lock(pegasus_thread_self());
118
|
119 mike 1.5 _queueTable.remove(_queueId);
|
120 mike 1.7 q_table_mut.unlock();
|
121 chip 1.8
|
122 mike 1.16 // Free the name:
|
123 mday 1.12
|
124 mike 1.16 delete [] _name;
|
125 kumpf 1.19
126 PEG_FUNC_EXIT(TRC_DISPATCHER,"MessageQueue::~MessageQueue()");
|
127 chip 1.8 }
128
|
129 mike 1.7 void MessageQueue::enqueue(Message* message) throw(IPCException)
|
130 mike 1.4 {
|
131 kumpf 1.19
132 if (!message)
133 {
134 Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3,
135 "MessageQueue::enqueue failure");
|
136 mday 1.9 throw NullPointer();
|
137 kumpf 1.19 }
|
138 mike 1.7
139 if (getenv("PEGASUS_TRACE"))
140 {
|
141 mday 1.9 cout << "===== " << getQueueName() << ": ";
142 message->print(cout);
|
143 mike 1.7 }
144
|
145 mday 1.9 _mut.lock(pegasus_thread_self());
|
146 mike 1.4 if (_back)
147 {
|
148 mday 1.9 _back->_next = message;
149 message->_prev = _back;
150 message->_next = 0;
151 _back = message;
|
152 mike 1.4 }
153 else
154 {
|
155 mday 1.9 _front = message;
156 _back = message;
157 message->_prev = 0;
158 message->_next = 0;
|
159 mike 1.4 }
160 message->_owner = this;
161 _count++;
|
162 mday 1.20
|
163 mike 1.7 _mut.unlock();
|
164 chip 1.11
|
165 mday 1.22 handleEnqueue();
|
166 kumpf 1.19
|
167 mday 1.15 }
168
169
|
170 mike 1.7 Message* MessageQueue::dequeue() throw(IPCException)
|
171 mike 1.4 {
|
172 mike 1.7 _mut.lock(pegasus_thread_self());
|
173 mike 1.4 if (_front)
174 {
175 Message* message = _front;
176 _front = _front->_next;
177 if (_front)
178 _front->_prev = 0;
179
180 if (_back == message)
181 _back = 0;
|
182 mike 1.7 _count--;
183 _mut.unlock();
|
184 mike 1.4 message->_next = 0;
185 message->_prev = 0;
186 message->_owner = 0;
187 return message;
188 }
|
189 mike 1.7 _mut.unlock();
|
190 mike 1.4 return 0;
191 }
192
|
193 mike 1.7 void MessageQueue::remove(Message* message) throw(IPCException)
|
194 mike 1.4 {
195 if (!message)
196 throw NullPointer();
197
198 if (message->_owner != this)
199 throw NoSuchMessageOnQueue();
200
|
201 mike 1.7 _mut.lock(pegasus_thread_self());
|
202 chip 1.8
|
203 mike 1.4 if (message->_next)
204 message->_next->_prev = message->_prev;
205 else
206 _back = message->_prev;
207
208 if (message->_prev)
209 message->_prev->_next = message->_next;
210 else
211 _front = message->_next;
212
|
213 mike 1.7 _count--;
214 _mut.unlock();
|
215 chip 1.8
|
216 mike 1.4 message->_prev = 0;
217 message->_next = 0;
218 message->_owner = 0;
219 }
220
|
221 mike 1.7 Message* MessageQueue::findByType(Uint32 type) throw(IPCException)
|
222 mike 1.4 {
|
223 mike 1.7 _mut.lock(pegasus_thread_self());
|
224 chip 1.8
|
225 mike 1.4 for (Message* m = front(); m; m = m->getNext())
226 {
|
227 mike 1.7 if (m->getType() == type)
228 {
229 _mut.unlock();
230 return m;
231 }
|
232 mike 1.4 }
|
233 mike 1.7 _mut.unlock();
|
234 mike 1.4 return 0;
235 }
236
|
237 mike 1.7 Message* MessageQueue::findByKey(Uint32 key) throw(IPCException)
|
238 mike 1.4 {
|
239 mike 1.7 _mut.lock(pegasus_thread_self());
|
240 chip 1.8
|
241 mike 1.4 for (Message* m = front(); m; m = m->getNext())
242 {
|
243 mike 1.7 if (m->getKey() == key)
244 {
245 _mut.unlock();
246 return m;
247 }
|
248 chip 1.8
|
249 mike 1.4 }
|
250 mike 1.7 _mut.unlock();
|
251 mike 1.4 return 0;
252 }
253
|
254 mike 1.7 void MessageQueue::print(ostream& os) const throw(IPCException)
|
255 mike 1.4 {
|
256 mike 1.7 const_cast<MessageQueue *>(this)->_mut.lock(pegasus_thread_self());
|
257 chip 1.8
|
258 mike 1.7 for (const Message* m = front(); m; m = m->getNext())
|
259 mike 1.4 m->print(os);
|
260 mike 1.7 const_cast<MessageQueue *>(this)->_mut.unlock();
|
261 mike 1.4 }
262
|
263 mike 1.7 Message* MessageQueue::find(Uint32 type, Uint32 key) throw(IPCException)
|
264 mike 1.4 {
|
265 mike 1.7 _mut.lock(pegasus_thread_self());
|
266 chip 1.8
|
267 mike 1.4 for (Message* m = front(); m; m = m->getNext())
268 {
|
269 mike 1.7 if (m->getType() == type && m->getKey() == key)
270 {
271 _mut.unlock();
272 return m;
273 }
|
274 mike 1.4 }
|
275 mike 1.7 _mut.unlock();
|
276 mike 1.4
|
277 mike 1.5 return 0;
278 }
279
|
280 mike 1.7 void MessageQueue::lock() throw(IPCException)
|
281 mike 1.5 {
|
282 mike 1.7 _mut.lock(pegasus_thread_self());
283 }
|
284 mike 1.5
|
285 chip 1.8 void MessageQueue::unlock()
|
286 mike 1.7 {
287 _mut.unlock();
|
288 mike 1.5 }
289
|
290 mike 1.7 const char* MessageQueue::getQueueName() const
|
291 mike 1.5 {
|
292 mike 1.7 if(_name[0] != 0x00)
293 return _name;
294 return "unknown";
|
295 mike 1.5 }
296
|
297 mike 1.7 MessageQueue* MessageQueue::lookup(Uint32 queueId) throw(IPCException)
|
298 mike 1.5 {
|
299 kumpf 1.19
|
300 mike 1.5 MessageQueue* queue = 0;
|
301 mike 1.7 q_table_mut.lock(pegasus_thread_self());
|
302 chip 1.8
|
303 mike 1.5 if (_queueTable.lookup(queueId, queue))
|
304 mike 1.7 {
305 q_table_mut.unlock();
306 return queue;
307 }
|
308 chip 1.8
|
309 mike 1.7 // Not found!
|
310 mike 1.5
|
311 mike 1.7 q_table_mut.unlock();
|
312 chip 1.8
|
313 kumpf 1.19 Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3,
314 "MessageQueue::lookup failure queueId = %i", queueId);
315
|
316 mike 1.4 return 0;
|
317 mike 1.6 }
|
318 mike 1.7
319
320 MessageQueue* MessageQueue::lookup(const char *name) throw(IPCException)
321 {
|
322 kumpf 1.19
|
323 mike 1.7 if(name == NULL)
324 throw NullPointer();
325 q_table_mut.lock(pegasus_thread_self());
|
326 chip 1.8
|
327 mike 1.7 for(QueueTable::Iterator i = _queueTable.start(); i; i++)
328 {
329 // ATTN: Need to decide how many characters to compare in queue names
330 if(! strncmp( ((MessageQueue *)i.value())->getQueueName(), name, 25) )
331 {
332 q_table_mut.unlock();
333 return( (MessageQueue *)i.value());
334 }
|
335 chip 1.8
|
336 mike 1.7 }
337 q_table_mut.unlock();
|
338 kumpf 1.19
339 Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3,
340 "MessageQueue::lookup failure - name = %s", name);
|
341 chip 1.8
|
342 mike 1.7 return 0;
343 }
344
|
345 mike 1.6
346 void MessageQueue::handleEnqueue()
347 {
348
|
349 mike 1.4 }
350
351 PEGASUS_NAMESPACE_END
|