//%////-*-c++-*-////////////////////////////////////////////////////////////////
//
// Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
// ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
//
//==============================================================================
//
// Author: Mike Day (mdday@us.ibm.com)
//
// Modified By:
//
//%/////////////////////////////////////////////////////////////////////////////
28
29 #include "MessageQueueService.h"
30
31 PEGASUS_NAMESPACE_BEGIN
32
33 MessageQueueService::MessageQueueService(const char *name,
34 Uint32 queueID,
35 Uint32 capabilities,
36 Uint32 mask)
|
37 mday 1.4 : Base(name, false, queueID),
|
38 mday 1.1 _capabilities(capabilities),
39 _mask(mask),
|
40 mday 1.4 _die(0),
|
41 mday 1.5 _pending(true),
|
42 mday 1.6 _incoming(true, 1000),
|
43 mday 1.5 _req_thread(_req_proc, this, false)
|
44 mday 1.1 {
45 _default_op_timeout.tv_sec = 30;
46 _default_op_timeout.tv_usec = 100;
47 _meta_dispatcher = static_cast<cimom *>(Base::lookup(CIMOM_Q_ID));
48 if(_meta_dispatcher == 0 )
49 throw NullPointer();
|
50 mday 1.4 _req_thread.run();
51
|
52 mday 1.1 }
53
|
54 mday 1.4
|
55 mday 1.1 MessageQueueService::~MessageQueueService(void)
56 {
57 _die = 1;
|
58 mday 1.6 _incoming.shutdown_queue();
59
|
60 mday 1.4 _req_thread.join();
|
61 mday 1.5
|
62 mday 1.1 }
63
|
64 mday 1.4 AtomicInt MessageQueueService::_xid(1);
|
65 mday 1.1
66
|
67 mday 1.5 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
|
68 mday 1.1 {
|
69 mday 1.5 Thread *myself = reinterpret_cast<Thread *>(parm);
70 MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
71
72 // pull messages off the incoming queue and dispatch them. then
73 // check pending messages that are non-blocking
74
|
75 mday 1.6 while ( service->_die.value() == 0 )
|
76 mday 1.1 {
|
77 mday 1.6 AsyncOpNode *operation = service->_incoming.remove_first_wait();
78 if ( operation == 0 )
79 break;
|
80 mday 1.5
81 service->_handle_incoming_operation(operation);
|
82 mday 1.1 }
|
83 mday 1.5
84 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
85 return(0);
|
86 mday 1.1 }
87
88
|
89 mday 1.5 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)
|
90 mday 1.1 {
|
91 mday 1.5 if ( operation != 0 )
92 {
|
93 mday 1.6 operation->lock();
94 Message *rq = operation->_request.next(0);
95 operation->unlock();
96
|
97 mday 1.5 PEGASUS_ASSERT(rq != 0 );
98 PEGASUS_ASSERT(rq->getMask() & message_mask::ha_async );
99 PEGASUS_ASSERT(rq->getMask() & message_mask::ha_request);
100 _handle_async_request(static_cast<AsyncRequest *>(rq));
101 }
|
102 mday 1.1
|
103 mday 1.5 return;
|
104 mday 1.1
105 }
106
107 void MessageQueueService::_handle_async_request(AsyncRequest *req)
108 {
|
109 mday 1.4 if ( req != 0 )
110 {
111 req->op->processing();
|
112 mday 1.1
|
113 mday 1.4 Uint32 type = req->getType();
114 if( type == async_messages::HEARTBEAT )
115 handle_heartbeat_request(req);
116 else if (type == async_messages::IOCTL)
117 handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
118 else if (type == async_messages::CIMSERVICE_START)
119 handle_CimServiceStart(static_cast<CimServiceStart *>(req));
120 else if (type == async_messages::CIMSERVICE_STOP)
121 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
122 else if (type == async_messages::CIMSERVICE_PAUSE)
123 handle_CimServicePause(static_cast<CimServicePause *>(req));
124 else if (type == async_messages::CIMSERVICE_RESUME)
125 handle_CimServiceResume(static_cast<CimServiceResume *>(req));
126 else if ( type == async_messages::ASYNC_OP_START)
127 handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
128 else
129 {
130 // we don't handle this request message
131 _make_response(req, async_results::CIM_NAK );
132 }
|
133 mday 1.1 }
134 }
135
136 void MessageQueueService::_make_response(AsyncRequest *req, Uint32 code)
137 {
138 AsyncReply *reply =
139 new AsyncReply(async_messages::REPLY,
140 req->getKey(),
141 req->getRouting(),
142 0,
143 req->op,
144 code,
145 req->resp,
146 false);
|
147 mday 1.4 _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );
|
148 mday 1.1 }
149
150
|
151 mday 1.5 void MessageQueueService::_completeAsyncResponse(AsyncRequest *request,
152 AsyncReply *reply,
153 Uint32 state,
154 Uint32 flag)
155 {
156 PEGASUS_ASSERT(request != 0 && reply != 0 );
157
158 AsyncOpNode *op = request->op;
159 op->lock();
160 op->_state |= state ;
161 op->_flags |= flag;
162 gettimeofday(&(op->_updated), NULL);
|
163 mday 1.6 if ( false == op->_request.exists(reinterpret_cast<void *>(reply)) )
164 op->_request.insert_last(reply);
|
165 mday 1.5 op->unlock();
|
166 mday 1.6
|
167 mday 1.5 op->_client_sem.signal();
|
168 mday 1.6
|
169 mday 1.5
170 }
171
172
173
174 Boolean MessageQueueService::accept_async(AsyncOpNode *op)
175 {
|
176 mday 1.6 op->lock();
177 Message *rq = op->_request.next(0);
178 op->unlock();
179
|
180 mday 1.5 if( true == messageOK(rq) && _die.value() == 0 )
181 {
|
182 mday 1.6 _incoming.insert_last_wait(op);
|
183 mday 1.5 return true;
184 }
185 return false;
186 }
187
188 Boolean MessageQueueService::messageOK(const Message *msg)
189 {
190 if ( msg != 0 )
191 {
192 Uint32 mask = msg->getMask();
193 if ( mask & message_mask::ha_async)
194 if ( mask & message_mask::ha_request)
195 return true;
196 }
197 return false;
198 }
199
200
201 void MessageQueueService::handleEnqueue(void)
202 {
203 Message *msg = dequeue();
204 mday 1.5 if( msg )
205 {
206 delete msg;
207 }
208 }
209
|
210 mday 1.1 void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
211 {
212 // default action is to echo a heartbeat response
213
214 AsyncReply *reply =
215 new AsyncReply(async_messages::HEARTBEAT,
216 req->getKey(),
217 req->getRouting(),
218 0,
219 req->op,
220 async_results::OK,
221 req->resp,
222 false);
|
223 mday 1.4 _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );
|
224 mday 1.1 }
225
226
227 void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
228 {
229 ;
230 }
231
232 void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
233 {
234 _make_response(req, async_results::OK);
235 }
236 void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
237 {
238 _make_response(req, async_results::CIM_NAK);
239 }
240 void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
241 {
242 _make_response(req, async_results::CIM_NAK);
243 }
244 void MessageQueueService::handle_CimServicePause(CimServicePause *req)
245 mday 1.1 {
246 _make_response(req, async_results::CIM_NAK);
247 }
248 void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
249 {
250 _make_response(req, async_results::CIM_NAK);
251 }
252
253 void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
254 {
255 _make_response(req, async_results::CIM_NAK);
256
257 }
258
259 void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
260 {
261 ;
262 }
263
264 AsyncOpNode *MessageQueueService::get_op(void)
265 {
|
266 mday 1.4 AsyncOpNode *op = new AsyncOpNode();
|
267 mday 1.1
268 op->write_state(ASYNC_OPSTATE_UNKNOWN);
|
269 mday 1.4 op->write_flags(ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL );
270
|
271 mday 1.1 return op;
272 }
273
274 void MessageQueueService::return_op(AsyncOpNode *op)
275 {
276 PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED );
|
277 mday 1.4 delete op;
|
278 mday 1.1 }
279
|
280 mday 1.2
|
281 mday 1.1
|
282 mday 1.4 AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
|
283 mday 1.1 {
|
284 mday 1.4 if ( request == 0 )
285 return 0 ;
|
286 mday 1.5
287 Boolean destroy_op = false;
288
289 if (request->op == false)
290 {
291 request->op = get_op();
292 request->op->put_request(request);
293
294 destroy_op = true;
295 }
|
296 mday 1.4
297 request->block = true;
298 request->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
299 request->op->put_response(0);
|
300 mday 1.1
|
301 mday 1.4 // first link it on our pending list
|
302 mday 1.5 // _pending.insert_last_wait(request->op);
|
303 mday 1.2
|
304 mday 1.4 // now see if the meta dispatcher will take it
|
305 mday 1.2
|
306 mday 1.4 if (true == _meta_dispatcher->route_async(request->op))
|
307 mday 1.1 {
|
308 mday 1.4 request->op->_client_sem.wait();
|
309 mday 1.6 PEGASUS_ASSERT(request->op->_state & ASYNC_OPSTATE_COMPLETE);
310
|
311 mday 1.1 }
|
312 mday 1.4
|
313 mday 1.6 request->op->lock();
314 AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
315 rpl->op = 0;
316 request->op->unlock();
317
|
318 mday 1.5 if( destroy_op == true)
319 {
|
320 mday 1.6 request->op->lock();
321 request->op->_request.remove(request);
322 request->op->_state |= ASYNC_OPSTATE_RELEASED;
323 request->op->unlock();
324
325 return_op(request->op);
326
327 // delete request->op;
328 // request->op = 0;
|
329 mday 1.5 }
330
331 return rpl;
|
332 mday 1.1 }
333
334
335 Boolean MessageQueueService::register_service(String name,
336 Uint32 capabilities,
337 Uint32 mask)
338
339 {
340 RegisterCimService *msg = new RegisterCimService(get_next_xid(),
|
341 mday 1.5 0,
|
342 mday 1.1 true,
343 name,
344 capabilities,
345 mask,
346 _queueId);
347 Boolean registered = false;
|
348 mday 1.2 AsyncMessage *reply = SendWait( msg );
|
349 mday 1.1
|
350 mday 1.2 if ( reply != 0 )
|
351 mday 1.1 {
352 if(reply->getMask() & message_mask:: ha_async)
353 {
354 if(reply->getMask() & message_mask::ha_reply)
355 {
|
356 mday 1.2 if((static_cast<AsyncReply *>(reply))->result == async_results::OK)
|
357 mday 1.1 registered = true;
358 }
359 }
360
361 delete reply;
362 }
|
363 mday 1.5 delete msg;
|
364 mday 1.1 return registered;
365 }
366
367 Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
368 {
369
370
371 UpdateCimService *msg = new UpdateCimService(get_next_xid(),
|
372 mday 1.5 0,
|
373 mday 1.1 true,
374 _queueId,
375 _capabilities,
376 _mask);
377 Boolean registered = false;
|
378 mday 1.2
379 AsyncMessage *reply = SendWait(msg);
380 if (reply)
|
381 mday 1.1 {
382 if(reply->getMask() & message_mask:: ha_async)
383 {
384 if(reply->getMask() & message_mask::ha_reply)
385 {
|
386 mday 1.2 if(static_cast<AsyncReply *>(reply)->result == async_results::OK)
|
387 mday 1.1 registered = true;
388 }
389 }
390 delete reply;
391 }
|
392 mday 1.5 delete msg;
|
393 mday 1.1 return registered;
394 }
395
396
397 Boolean MessageQueueService::deregister_service(void)
398 {
|
399 mday 1.3
|
400 mday 1.5 _meta_dispatcher->deregister_module(_queueId);
401 return true;
|
402 mday 1.1 }
403
404
405 void MessageQueueService::find_services(String name,
406 Uint32 capabilities,
407 Uint32 mask,
408 Array<Uint32> *results)
409 {
410
411 if( results == 0 )
412 throw NullPointer();
|
413 mday 1.5
|
414 mday 1.1 results->clear();
415
416 FindServiceQueue *req =
417 new FindServiceQueue(get_next_xid(),
|
418 mday 1.5 0,
|
419 mday 1.1 _queueId,
420 true,
421 name,
422 capabilities,
423 mask);
424
|
425 mday 1.2 AsyncMessage *reply = SendWait(req);
426 if(reply)
|
427 mday 1.1 {
428 if( reply->getMask() & message_mask::ha_async)
429 {
430 if(reply->getMask() & message_mask::ha_reply)
431 {
432 if(reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
433 {
434 if( (static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK )
435 *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
436 }
437 }
438 }
439 delete reply;
440 }
|
441 mday 1.5 delete req;
|
442 mday 1.1 return ;
443 }
444
445 void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
446 {
447 if(result == 0)
448 throw NullPointer();
449
450 EnumerateService *req
451 = new EnumerateService(get_next_xid(),
|
452 mday 1.5 0,
|
453 mday 1.1 _queueId,
454 true,
455 queue);
456
|
457 mday 1.2 AsyncMessage *reply = SendWait(req);
|
458 mday 1.1
|
459 mday 1.2 if (reply)
|
460 mday 1.1 {
461 Boolean found = false;
462
463 if( reply->getMask() & message_mask::ha_async)
464 {
465 if(reply->getMask() & message_mask::ha_reply)
466 {
467 if(reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
468 {
469 if( (static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK )
470 {
471 if( found == false)
472 {
473 found = true;
474
475 result->put_name( (static_cast<EnumerateServiceResponse *>(reply))->name);
476 result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
477 result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
478 result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
479 }
480 }
481 mday 1.1 }
482 }
483 }
484 delete reply;
485 }
|
486 mday 1.5 delete req;
487
|
488 mday 1.1 return;
489 }
490
491 Uint32 MessageQueueService::get_next_xid(void)
492 {
493 _xid++;
494 return _xid.value();
495 }
496
497 PEGASUS_NAMESPACE_END
|