1 mike 1.4 //%/////////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to
7 // deal in the Software without restriction, including without limitation the
8 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
9 // sell copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
13 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
14 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
15 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
16 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
17 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
18 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
19 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20 //
21 //==============================================================================
22 mike 1.4 //
23 // Author: Mike Brasher (mbrasher@bmc.com)
24 //
25 // Modified By:
26 //
27 //%/////////////////////////////////////////////////////////////////////////////
28
|
29 mike 1.5 #include <Pegasus/Common/HashTable.h>
|
30 mday 1.6.2.5 #include <Pegasus/Common/IPC.h>
|
31 mike 1.4 #include "MessageQueue.h"
32
33 PEGASUS_USING_STD;
34
35 PEGASUS_NAMESPACE_BEGIN
36
|
37 mike 1.5 typedef HashTable<Uint32, MessageQueue*, EqualFunc<Uint32>, HashFunc<Uint32> >
38 QueueTable;
39
40 static QueueTable _queueTable(128);
|
41 mday 1.6.2.7 static Mutex q_table_mut = Mutex();
|
42 mike 1.5
|
43 mday 1.6.2.6 static Uint32 _GetNextQueueId() throw(IPCException)
|
44 mike 1.5 {
|
45 mday 1.6.2.5
46 static Uint32 _queueId = 2;
47
|
48 mday 1.6.2.6 q_table_mut.lock(pegasus_thread_self());
|
49 mday 1.6.2.5
50 // Handle wrap-around!
51 if (_queueId == 0)
52 _queueId = MessageQueue::_CIMOM_Q_ID;
53 Uint32 ret = _queueId++;
|
54 mday 1.6.2.6 q_table_mut.unlock();
|
55 mday 1.6.2.5
56 return ret;
|
57 mike 1.5 }
|
58 mday 1.6.2.5
59 Uint32 MessageQueue::_CIMOM_Q_ID = 1;
|
60 mike 1.5
|
61 mday 1.6.2.6 MessageQueue::MessageQueue() : _mut( ), _count(0), _front(0), _back(0)
|
62 mike 1.5 {
63 // ATTN-A: thread safety!
64
|
65 mday 1.6.2.6 memset(_name, 0x00, 26);
66 q_table_mut.lock(pegasus_thread_self());
67
|
68 mike 1.5 while (!_queueTable.insert(_queueId = _GetNextQueueId(), this))
69 ;
|
70 mday 1.6.2.6 q_table_mut.unlock();
|
71 mike 1.5 }
72
|
73 mday 1.6.2.6 MessageQueue::MessageQueue(char *name) : _mut( ), _count(0), _front(0), _back(0)
|
74 mday 1.6.2.3 {
75 if(name != NULL)
76 {
|
77 mday 1.6.2.6 strncpy(_name, name, 25);
78 _name[25] = 0x00;
|
79 mday 1.6.2.3 }
80
81 else
|
82 mday 1.6.2.6 memset(_name, 0x00,25);
83
84 q_table_mut.lock(pegasus_thread_self());
85
|
86 mday 1.6.2.3 while (!_queueTable.insert(_queueId = _GetNextQueueId(), this))
87 ;
|
88 mday 1.6.2.6 q_table_mut.unlock();
89
|
90 mday 1.6.2.3 }
91
92
|
93 mike 1.5 MessageQueue::~MessageQueue()
94 {
95 // ATTN-A: thread safety!
|
96 mday 1.6.2.6 q_table_mut.lock(pegasus_thread_self());
97
|
98 mike 1.5 _queueTable.remove(_queueId);
|
99 mday 1.6.2.6 q_table_mut.unlock();
100
|
101 mike 1.5 }
102
|
103 mday 1.6.2.6 void MessageQueue::enqueue(Message* message) throw(IPCException)
|
104 mike 1.4 {
105 if (!message)
106 throw NullPointer();
107
|
108 mday 1.6.2.6 _mut.lock(pegasus_thread_self());
109
|
110 mike 1.6.2.1 if (getenv("PEGASUS_TRACE"))
111 {
|
112 mike 1.6.2.2 cout << "===== " << getQueueName() << ": ";
|
113 mike 1.6.2.1 message->print(cout);
114 }
115
|
116 mike 1.4 if (_back)
117 {
118 _back->_next = message;
119 message->_prev = _back;
120 message->_next = 0;
121 _back = message;
122 }
123 else
124 {
125 _front = message;
126 _back = message;
127 message->_prev = 0;
128 message->_next = 0;
129 }
130 message->_owner = this;
131 _count++;
|
132 mday 1.6.2.6 _mut.unlock();
|
133 mike 1.6
134 handleEnqueue();
|
135 mike 1.4 }
136
|
137 mday 1.6.2.6 Message* MessageQueue::dequeue() throw(IPCException)
|
138 mike 1.4 {
|
139 mday 1.6.2.6 _mut.lock(pegasus_thread_self());
|
140 mike 1.4 if (_front)
141 {
142 Message* message = _front;
143 _front = _front->_next;
144 if (_front)
145 _front->_prev = 0;
146
147 if (_back == message)
148 _back = 0;
|
149 mday 1.6.2.6 _count--;
150 _mut.unlock();
|
151 mike 1.4 message->_next = 0;
152 message->_prev = 0;
153 message->_owner = 0;
154 return message;
155 }
|
156 mday 1.6.2.6 _mut.unlock();
|
157 mike 1.4 return 0;
158 }
159
|
160 mday 1.6.2.6 void MessageQueue::remove(Message* message) throw(IPCException)
|
161 mike 1.4 {
162 if (!message)
163 throw NullPointer();
164
165 if (message->_owner != this)
166 throw NoSuchMessageOnQueue();
167
|
168 mday 1.6.2.6 _mut.lock(pegasus_thread_self());
169
|
170 mike 1.4 if (message->_next)
171 message->_next->_prev = message->_prev;
172 else
173 _back = message->_prev;
174
175 if (message->_prev)
176 message->_prev->_next = message->_next;
177 else
178 _front = message->_next;
179
|
180 mday 1.6.2.6 _count--;
181 _mut.unlock();
182
|
183 mike 1.4 message->_prev = 0;
184 message->_next = 0;
185 message->_owner = 0;
186 }
187
|
188 mday 1.6.2.6 Message* MessageQueue::findByType(Uint32 type) throw(IPCException)
|
189 mike 1.4 {
|
190 mday 1.6.2.6 _mut.lock(pegasus_thread_self());
191
|
192 mike 1.4 for (Message* m = front(); m; m = m->getNext())
193 {
|
194 mday 1.6.2.6 if (m->getType() == type)
195 {
196 _mut.unlock();
197 return m;
198 }
|
199 mike 1.4 }
|
200 mday 1.6.2.6 _mut.unlock();
|
201 mike 1.4 return 0;
202 }
203
|
204 mday 1.6.2.6 Message* MessageQueue::findByKey(Uint32 key) throw(IPCException)
|
205 mike 1.4 {
|
206 mday 1.6.2.6 _mut.lock(pegasus_thread_self());
207
|
208 mike 1.4 for (Message* m = front(); m; m = m->getNext())
209 {
|
210 mday 1.6.2.6 if (m->getKey() == key)
211 {
212 _mut.unlock();
213 return m;
214 }
215
|
216 mike 1.4 }
|
217 mday 1.6.2.6 _mut.unlock();
|
218 mike 1.4 return 0;
219 }
220
|
221 mday 1.6.2.6 void MessageQueue::print(ostream& os) const throw(IPCException)
|
222 mike 1.4 {
|
223 mday 1.6.2.6 const_cast<MessageQueue *>(this)->_mut.lock(pegasus_thread_self());
224
225 for (const Message* m = front(); m; m = m->getNext())
|
226 mike 1.4 m->print(os);
|
227 mday 1.6.2.6 const_cast<MessageQueue *>(this)->_mut.unlock();
|
228 mike 1.4 }
229
|
230 mday 1.6.2.6 Message* MessageQueue::find(Uint32 type, Uint32 key) throw(IPCException)
|
231 mike 1.4 {
|
232 mday 1.6.2.6 _mut.lock(pegasus_thread_self());
233
|
234 mike 1.4 for (Message* m = front(); m; m = m->getNext())
235 {
|
236 mday 1.6.2.6 if (m->getType() == type && m->getKey() == key)
237 {
238 _mut.unlock();
239 return m;
240 }
|
241 mike 1.4 }
|
242 mday 1.6.2.6 _mut.unlock();
|
243 mike 1.4
|
244 mike 1.5 return 0;
245 }
246
|
247 mday 1.6.2.6 void MessageQueue::lock() throw(IPCException)
|
248 mike 1.5 {
|
249 mday 1.6.2.6 _mut.lock(pegasus_thread_self());
|
250 mike 1.5 }
251
|
252 mday 1.6.2.6 void MessageQueue::unlock()
|
253 mike 1.5 {
|
254 mday 1.6.2.6 _mut.unlock();
|
255 mike 1.6.2.1 }
256
257 const char* MessageQueue::getQueueName() const
258 {
|
259 mday 1.6.2.3 if(_name[0] != 0x00)
260 return _name;
261 return "unknown";
|
262 mike 1.5 }
263
|
264 mday 1.6.2.6 MessageQueue* MessageQueue::lookup(Uint32 queueId) throw(IPCException)
|
265 mike 1.5 {
266 MessageQueue* queue = 0;
|
267 mday 1.6.2.6 q_table_mut.lock(pegasus_thread_self());
268
|
269 mike 1.5 if (_queueTable.lookup(queueId, queue))
|
270 mday 1.6.2.6 {
271 q_table_mut.unlock();
272 return queue;
273 }
274
|
275 mike 1.5 // Not found!
|
276 mday 1.6.2.6
277 q_table_mut.unlock();
278
|
279 mike 1.4 return 0;
|
280 mike 1.6 }
|
281 mday 1.6.2.3
282
|
283 mday 1.6.2.6 MessageQueue* MessageQueue::lookup(const char *name) throw(IPCException)
|
284 mday 1.6.2.3 {
285 if(name == NULL)
286 throw NullPointer();
|
287 mday 1.6.2.6 q_table_mut.lock(pegasus_thread_self());
288
|
289 mday 1.6.2.3 for(QueueTable::Iterator i = _queueTable.start(); i; i++)
290 {
|
291 kumpf 1.6.2.4 // ATTN: Need to decide how many characters to compare in queue names
|
292 mday 1.6.2.6 if(! strncmp( ((MessageQueue *)i.value())->getQueueName(), name, 25) )
293 {
294 q_table_mut.unlock();
295 return( (MessageQueue *)i.value());
296 }
297
|
298 mday 1.6.2.3 }
|
299 mday 1.6.2.6 q_table_mut.unlock();
300
|
301 mday 1.6.2.3 return 0;
302 }
303
|
304 mike 1.6
305 void MessageQueue::handleEnqueue()
306 {
307
|
308 mike 1.4 }
309
310 PEGASUS_NAMESPACE_END
|