1 mike 1.4 //%/////////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to
7 // deal in the Software without restriction, including without limitation the
8 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
9 // sell copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
13 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
14 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
15 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
16 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
17 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
18 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
19 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20 //
21 //==============================================================================
22 mike 1.4 //
23 // Author: Mike Brasher (mbrasher@bmc.com)
24 //
25 // Modified By:
26 //
27 //%/////////////////////////////////////////////////////////////////////////////
28
|
29 mike 1.5 #include <Pegasus/Common/HashTable.h>
|
30 mike 1.7 #include <Pegasus/Common/IPC.h>
|
31 mike 1.4 #include "MessageQueue.h"
32
33 PEGASUS_USING_STD;
34
35 PEGASUS_NAMESPACE_BEGIN
36
|
37 mike 1.5 typedef HashTable<Uint32, MessageQueue*, EqualFunc<Uint32>, HashFunc<Uint32> >
38 QueueTable;
39
40 static QueueTable _queueTable(128);
|
41 mike 1.7 static Mutex q_table_mut = Mutex();
|
42 mike 1.5
|
43 mike 1.7 static Uint32 _GetNextQueueId() throw(IPCException)
|
44 mike 1.5 {
|
45 mike 1.7
46 static Uint32 _queueId = 2;
47 static Mutex _id_mut = Mutex();
48
49 _id_mut.lock(pegasus_thread_self());
50
51 // Handle wrap-around!
52 if (_queueId == 0)
53 _queueId = MessageQueue::_CIMOM_Q_ID;
54 Uint32 ret = _queueId++;
55 _id_mut.unlock();
56
57 return ret;
58 }
59
60 Uint32 MessageQueue::_CIMOM_Q_ID = 1;
|
61 mike 1.5
|
62 mike 1.7 MessageQueue::MessageQueue() : _mut( ), _count(0), _front(0), _back(0)
63 {
64 // ATTN-A: thread safety!
65 q_table_mut.lock(pegasus_thread_self());
|
66 mike 1.5
|
67 mike 1.7 memset(_name, 0x00, 26);
68
69 while (!_queueTable.insert(_queueId = _GetNextQueueId(), this))
70 ;
71 q_table_mut.unlock();
|
72 mike 1.5 }
73
|
74 mike 1.7 MessageQueue::MessageQueue(char *name) : _mut( ), _count(0), _front(0), _back(0)
|
75 mike 1.5 {
|
76 mike 1.7 if(name != NULL)
77 {
78 strncpy(_name, name, 25);
79 _name[25] = 0x00;
80 }
81
82 else
83 memset(_name, 0x00,25);
|
84 mike 1.5
|
85 mike 1.7 q_table_mut.lock(pegasus_thread_self());
86
|
87 mike 1.5 while (!_queueTable.insert(_queueId = _GetNextQueueId(), this))
|
88 mike 1.7 ;
89 q_table_mut.unlock();
90
|
91 mike 1.5 }
92
|
93 mike 1.7
|
94 mike 1.5 MessageQueue::~MessageQueue()
95 {
96 // ATTN-A: thread safety!
|
97 mike 1.7 q_table_mut.lock(pegasus_thread_self());
98
|
99 mike 1.5 _queueTable.remove(_queueId);
|
100 mike 1.7 q_table_mut.unlock();
101
|
102 mike 1.5 }
103
|
104 mike 1.7 void MessageQueue::enqueue(Message* message) throw(IPCException)
|
105 mike 1.4 {
106 if (!message)
107 throw NullPointer();
108
|
109 mike 1.7 _mut.lock(pegasus_thread_self());
110
111 if (getenv("PEGASUS_TRACE"))
112 {
113 cout << "===== " << getQueueName() << ": ";
114 message->print(cout);
115 }
116
|
117 mike 1.4 if (_back)
118 {
119 _back->_next = message;
120 message->_prev = _back;
121 message->_next = 0;
122 _back = message;
123 }
124 else
125 {
126 _front = message;
127 _back = message;
128 message->_prev = 0;
129 message->_next = 0;
130 }
131 message->_owner = this;
132 _count++;
|
133 mike 1.7 _mut.unlock();
|
134 mike 1.6
135 handleEnqueue();
|
136 mike 1.4 }
137
|
138 mike 1.7 Message* MessageQueue::dequeue() throw(IPCException)
|
139 mike 1.4 {
|
140 mike 1.7 _mut.lock(pegasus_thread_self());
|
141 mike 1.4 if (_front)
142 {
143 Message* message = _front;
144 _front = _front->_next;
145 if (_front)
146 _front->_prev = 0;
147
148 if (_back == message)
149 _back = 0;
|
150 mike 1.7 _count--;
151 _mut.unlock();
|
152 mike 1.4 message->_next = 0;
153 message->_prev = 0;
154 message->_owner = 0;
155 return message;
156 }
|
157 mike 1.7 _mut.unlock();
|
158 mike 1.4 return 0;
159 }
160
|
161 mike 1.7 void MessageQueue::remove(Message* message) throw(IPCException)
|
162 mike 1.4 {
163 if (!message)
164 throw NullPointer();
165
166 if (message->_owner != this)
167 throw NoSuchMessageOnQueue();
168
|
169 mike 1.7 _mut.lock(pegasus_thread_self());
170
|
171 mike 1.4 if (message->_next)
172 message->_next->_prev = message->_prev;
173 else
174 _back = message->_prev;
175
176 if (message->_prev)
177 message->_prev->_next = message->_next;
178 else
179 _front = message->_next;
180
|
181 mike 1.7 _count--;
182 _mut.unlock();
183
|
184 mike 1.4 message->_prev = 0;
185 message->_next = 0;
186 message->_owner = 0;
187 }
188
|
189 mike 1.7 Message* MessageQueue::findByType(Uint32 type) throw(IPCException)
|
190 mike 1.4 {
|
191 mike 1.7 _mut.lock(pegasus_thread_self());
192
|
193 mike 1.4 for (Message* m = front(); m; m = m->getNext())
194 {
|
195 mike 1.7 if (m->getType() == type)
196 {
197 _mut.unlock();
198 return m;
199 }
|
200 mike 1.4 }
|
201 mike 1.7 _mut.unlock();
|
202 mike 1.4 return 0;
203 }
204
|
205 mike 1.7 Message* MessageQueue::findByKey(Uint32 key) throw(IPCException)
|
206 mike 1.4 {
|
207 mike 1.7 _mut.lock(pegasus_thread_self());
208
|
209 mike 1.4 for (Message* m = front(); m; m = m->getNext())
210 {
|
211 mike 1.7 if (m->getKey() == key)
212 {
213 _mut.unlock();
214 return m;
215 }
216
|
217 mike 1.4 }
|
218 mike 1.7 _mut.unlock();
|
219 mike 1.4 return 0;
220 }
221
|
222 mike 1.7 void MessageQueue::print(ostream& os) const throw(IPCException)
|
223 mike 1.4 {
|
224 mike 1.7 const_cast<MessageQueue *>(this)->_mut.lock(pegasus_thread_self());
225
226 for (const Message* m = front(); m; m = m->getNext())
|
227 mike 1.4 m->print(os);
|
228 mike 1.7 const_cast<MessageQueue *>(this)->_mut.unlock();
|
229 mike 1.4 }
230
|
231 mike 1.7 Message* MessageQueue::find(Uint32 type, Uint32 key) throw(IPCException)
|
232 mike 1.4 {
|
233 mike 1.7 _mut.lock(pegasus_thread_self());
234
|
235 mike 1.4 for (Message* m = front(); m; m = m->getNext())
236 {
|
237 mike 1.7 if (m->getType() == type && m->getKey() == key)
238 {
239 _mut.unlock();
240 return m;
241 }
|
242 mike 1.4 }
|
243 mike 1.7 _mut.unlock();
|
244 mike 1.4
|
245 mike 1.5 return 0;
246 }
247
|
248 mike 1.7 void MessageQueue::lock() throw(IPCException)
|
249 mike 1.5 {
|
250 mike 1.7 _mut.lock(pegasus_thread_self());
251 }
|
252 mike 1.5
|
253 mike 1.7 void MessageQueue::unlock()
254 {
255 _mut.unlock();
|
256 mike 1.5 }
257
|
258 mike 1.7 const char* MessageQueue::getQueueName() const
|
259 mike 1.5 {
|
260 mike 1.7 if(_name[0] != 0x00)
261 return _name;
262 return "unknown";
|
263 mike 1.5 }
264
|
265 mike 1.7 MessageQueue* MessageQueue::lookup(Uint32 queueId) throw(IPCException)
|
266 mike 1.5 {
267 MessageQueue* queue = 0;
|
268 mike 1.7 q_table_mut.lock(pegasus_thread_self());
269
|
270 mike 1.5 if (_queueTable.lookup(queueId, queue))
|
271 mike 1.7 {
272 q_table_mut.unlock();
273 return queue;
274 }
275
276 // Not found!
|
277 mike 1.5
|
278 mike 1.7 q_table_mut.unlock();
279
|
280 mike 1.4 return 0;
|
281 mike 1.6 }
|
282 mike 1.7
283
284 MessageQueue* MessageQueue::lookup(const char *name) throw(IPCException)
285 {
286 if(name == NULL)
287 throw NullPointer();
288 q_table_mut.lock(pegasus_thread_self());
289
290 for(QueueTable::Iterator i = _queueTable.start(); i; i++)
291 {
292 // ATTN: Need to decide how many characters to compare in queue names
293 if(! strncmp( ((MessageQueue *)i.value())->getQueueName(), name, 25) )
294 {
295 q_table_mut.unlock();
296 return( (MessageQueue *)i.value());
297 }
298
299 }
300 q_table_mut.unlock();
301
302 return 0;
303 mike 1.7 }
304
|
305 mike 1.6
306 void MessageQueue::handleEnqueue()
307 {
308
|
309 mike 1.4 }
310
311 PEGASUS_NAMESPACE_END
|