1 mday 1.1 //%////-*-c++-*-////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to
7 // deal in the Software without restriction, including without limitation the
8 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
9 // sell copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
13 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
14 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
15 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
16 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
17 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
18 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
19 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20 //
21 //==============================================================================
22 mday 1.1 //
23 // Author: Mike Day (mdday@us.ibm.com)
24 //
25 // Modified By:
26 //
27 //%/////////////////////////////////////////////////////////////////////////////
28
29 #include "MessageQueueService.h"
|
30 mday 1.22 #include <Pegasus/Common/Tracer.h>
|
31 mday 1.1
32 PEGASUS_NAMESPACE_BEGIN
33
|
34 mday 1.15
35 cimom *MessageQueueService::_meta_dispatcher = 0;
36 AtomicInt MessageQueueService::_service_count = 0;
37 AtomicInt MessageQueueService::_xid(1);
|
38 mday 1.38 Mutex MessageQueueService::_meta_dispatcher_mutex;
|
39 mday 1.15
40
|
41 mday 1.1 MessageQueueService::MessageQueueService(const char *name,
42 Uint32 queueID,
43 Uint32 capabilities,
44 Uint32 mask)
|
45 mday 1.15 : Base(name, true, queueID),
|
46 mday 1.22
|
47 mday 1.1 _mask(mask),
|
48 mday 1.4 _die(0),
|
49 mday 1.41 _incoming(true, 0),
|
50 mday 1.39 _callback(true),
|
51 mday 1.7 _incoming_queue_shutdown(0),
|
52 mday 1.39 _callback_ready(0),
53 _req_thread(_req_proc, this, false),
54 _callback_thread(_callback_proc, this, false)
|
55 mday 1.1 {
|
56 mday 1.22 _capabilities = (capabilities | module_capabilities::async);
57
|
58 mday 1.1 _default_op_timeout.tv_sec = 30;
59 _default_op_timeout.tv_usec = 100;
|
60 mday 1.15
61 _meta_dispatcher_mutex.lock(pegasus_thread_self());
62
63 if( _meta_dispatcher == 0 )
64 {
65 PEGASUS_ASSERT( _service_count.value() == 0 );
66 _meta_dispatcher = new cimom();
67 if (_meta_dispatcher == NULL )
68 {
69 _meta_dispatcher_mutex.unlock();
70
71 throw NullPointer();
72 }
73
74 }
75 _service_count++;
76
77 if( false == register_service(name, _capabilities, _mask) )
78 {
79 _meta_dispatcher_mutex.unlock();
80 throw BindFailed("MessageQueueService Base Unable to register with Meta Dispatcher");
81 mday 1.15 }
82
83 _meta_dispatcher_mutex.unlock();
|
84 mday 1.39 // _callback_thread.run();
85
|
86 mday 1.4 _req_thread.run();
|
87 mday 1.1 }
88
|
89 mday 1.4
|
90 mday 1.1 MessageQueueService::~MessageQueueService(void)
91 {
92 _die = 1;
|
93 mday 1.7 if (_incoming_queue_shutdown.value() == 0 )
|
94 mday 1.16 {
|
95 mday 1.32 _shutdown_incoming_queue();
|
96 kumpf 1.46 _req_thread.join(); // ATTN-RK-P3-20020521: Is this redundant?
|
97 mday 1.16 }
|
98 mday 1.39 _callback_ready.signal();
99 _callback_thread.join();
100
|
101 mday 1.15 _meta_dispatcher_mutex.lock(pegasus_thread_self());
102 _service_count--;
103 if (_service_count.value() == 0 )
104 {
105 _meta_dispatcher->_shutdown_routed_queue();
106 delete _meta_dispatcher;
|
107 kumpf 1.45 _meta_dispatcher = 0;
|
108 mday 1.15 }
109 _meta_dispatcher_mutex.unlock();
110
|
111 mday 1.1 }
112
|
113 mday 1.15
|
114 mday 1.1
|
115 mday 1.7 void MessageQueueService::_shutdown_incoming_queue(void)
116 {
117
|
118 mday 1.9 if (_incoming_queue_shutdown.value() > 0 )
119 return ;
|
120 mday 1.8 AsyncIoctl *msg = new AsyncIoctl(get_next_xid(),
121 0,
122 _queueId,
123 _queueId,
124 true,
125 AsyncIoctl::IO_CLOSE,
126 0,
127 0);
|
128 mday 1.9
|
129 mday 1.8 msg->op = get_op();
|
130 mday 1.41 msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
131 msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
132 | ASYNC_OPFLAGS_SIMPLE_STATUS);
|
133 mday 1.32 msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
134 mday 1.41
|
135 mday 1.32 msg->op->_op_dest = this;
|
136 mday 1.8 msg->op->_request.insert_first(msg);
137
138 _incoming.insert_last_wait(msg->op);
|
139 mday 1.32
|
140 mday 1.16 _req_thread.join();
141
|
142 mday 1.7 }
143
|
144 mday 1.22
145
146 void MessageQueueService::enqueue(Message *msg) throw(IPCException)
147 {
|
148 kumpf 1.28 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
149
|
150 mday 1.22 Base::enqueue(msg);
|
151 kumpf 1.28
152 PEG_METHOD_EXIT();
|
153 mday 1.22 }
154
155
|
156 mday 1.39 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_callback_proc(void *parm)
157 {
158 Thread *myself = reinterpret_cast<Thread *>(parm);
159 MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
160 AsyncOpNode *operation = 0;
161
162 while ( service->_die.value() == 0 )
163 {
164 service->_callback_ready.wait();
165
166 service->_callback.lock();
167 operation = service->_callback.next(0);
168 while( operation != NULL)
169 {
170 if( ASYNC_OPSTATE_COMPLETE & operation->read_state())
171 {
172 operation = service->_callback.remove_no_lock(operation);
173 PEGASUS_ASSERT(operation != NULL);
174 operation->_thread_ptr = myself;
175 operation->_service_ptr = service;
176 service->_handle_async_callback(operation);
177 mday 1.39 break;
178 }
179 operation = service->_callback.next(operation);
180 }
181 service->_callback.unlock();
182 }
183 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
184 return(0);
185 }
186
|
187 mday 1.22
|
188 mday 1.5 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
|
189 mday 1.1 {
|
190 mday 1.5 Thread *myself = reinterpret_cast<Thread *>(parm);
191 MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
192
193 // pull messages off the incoming queue and dispatch them. then
194 // check pending messages that are non-blocking
|
195 mday 1.7 AsyncOpNode *operation = 0;
196
|
197 mday 1.6 while ( service->_die.value() == 0 )
|
198 mday 1.1 {
|
199 mday 1.41 try
200 {
|
201 mday 1.44 operation = service->_incoming.remove_first_wait();
|
202 mday 1.41 }
203 catch(ListClosed & )
204 {
205 break;
206 }
207 if( operation )
208 {
209 operation->_thread_ptr = myself;
210 operation->_service_ptr = service;
211 service->_handle_incoming_operation(operation);
212 }
|
213 mday 1.42 }
|
214 mday 1.44
|
215 mday 1.5 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
216 return(0);
|
217 mday 1.1 }
218
|
219 mday 1.43 Uint32 MessageQueueService::get_pending_callback_count(void)
220 {
221 return _callback.count();
222 }
223
224
225
|
226 mday 1.33 void MessageQueueService::_sendwait_callback(AsyncOpNode *op,
227 MessageQueue *q,
228 void *parm)
229 {
230 op->_client_sem.signal();
231 }
232
|
233 mday 1.30
234 // callback function is responsible for cleaning up all resources
235 // including op, op->_callback_node, and op->_callback_ptr
|
236 mday 1.22 void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
237 {
|
238 mday 1.39 if( op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK )
239 {
240
241 Message *msg = op->get_request();
242 if( msg && ( msg->getMask() & message_mask::ha_async))
243 {
244 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_START )
245 {
246 AsyncLegacyOperationStart *wrapper =
247 static_cast<AsyncLegacyOperationStart *>(msg);
248 msg = wrapper->get_action();
249 delete wrapper;
250 }
251 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
252 {
253 AsyncModuleOperationStart *wrapper =
254 static_cast<AsyncModuleOperationStart *>(msg);
255 msg = wrapper->get_action();
256 delete wrapper;
257 }
258 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
259 mday 1.39 {
260 AsyncModuleOperationStart *wrapper =
261 static_cast<AsyncModuleOperationStart *>(msg);
262 msg = wrapper->get_action();
263 delete wrapper;
264 }
265 else if (msg->getType() == async_messages::ASYNC_OP_START)
266 {
267 AsyncOperationStart *wrapper =
268 static_cast<AsyncOperationStart *>(msg);
269 msg = wrapper->get_action();
270 delete wrapper;
271 }
272 delete msg;
273 }
274
275 msg = op->get_response();
276 if( msg && ( msg->getMask() & message_mask::ha_async))
277 {
278 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT )
279 {
280 mday 1.39 AsyncLegacyOperationResult *wrapper =
281 static_cast<AsyncLegacyOperationResult *>(msg);
282 msg = wrapper->get_result();
283 delete wrapper;
284 }
285 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
286 {
287 AsyncModuleOperationResult *wrapper =
288 static_cast<AsyncModuleOperationResult *>(msg);
289 msg = wrapper->get_result();
290 delete wrapper;
291 }
292 }
293 void (*callback)(Message *, void *, void *) = op->__async_callback;
294 void *handle = op->_callback_handle;
295 void *parm = op->_callback_parameter;
296 op->release();
297 return_op(op);
298 callback(msg, handle, parm);
299 }
300 else if( op->_flags & ASYNC_OPFLAGS_CALLBACK )
301 mday 1.39 {
302 // note that _callback_node may be different from op
303 // op->_callback_response_q is a "this" pointer we can use for
304 // static callback methods
305 op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);
306 }
|
307 mday 1.22 }
308
|
309 mday 1.1
|
310 mday 1.34 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)
311 // Thread *thread,
312 // MessageQueue *queue)
|
313 mday 1.1 {
|
314 mday 1.5 if ( operation != 0 )
315 {
|
316 mday 1.29
|
317 mday 1.22 // ATTN: optimization
318 // << Tue Feb 19 14:10:38 2002 mdd >>
|
319 mday 1.6 operation->lock();
|
320 mday 1.29
|
321 mday 1.6 Message *rq = operation->_request.next(0);
|
322 mday 1.29
|
323 mday 1.31 // optimization <<< Thu Mar 7 21:04:05 2002 mdd >>>
324 // move this to the bottom of the loop when the majority of
325 // messages become async messages.
326
|
327 mday 1.18 // divert legacy messages to handleEnqueue
|
328 mday 1.29 if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))
|
329 mday 1.18 {
330 rq = operation->_request.remove_first() ;
331 operation->unlock();
332 // delete the op node
|
333 mday 1.39 operation->release();
334 return_op( operation);
|
335 mday 1.24
|
336 mday 1.26 handleEnqueue(rq);
|
337 mday 1.18 return;
338 }
339
|
340 mday 1.39 if ( (operation->_flags & ASYNC_OPFLAGS_CALLBACK ||
341 operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&
|
342 mday 1.29 (operation->_state & ASYNC_OPSTATE_COMPLETE))
343 {
344 operation->unlock();
345 _handle_async_callback(operation);
346 }
347 else
348 {
349 PEGASUS_ASSERT(rq != 0 );
350 // ATTN: optimization
351 // << Wed Mar 6 15:00:39 2002 mdd >>
352 // put thread and queue into the asyncopnode structure.
|
353 mday 1.34 // (static_cast<AsyncMessage *>(rq))->_myself = operation->_thread_ptr;
354 // (static_cast<AsyncMessage *>(rq))->_service = operation->_service_ptr;
355 // done << Tue Mar 12 14:49:07 2002 mdd >>
|
356 mday 1.29 operation->unlock();
357 _handle_async_request(static_cast<AsyncRequest *>(rq));
358 }
|
359 mday 1.5 }
360 return;
|
361 mday 1.1 }
362
363 void MessageQueueService::_handle_async_request(AsyncRequest *req)
364 {
|
365 mday 1.4 if ( req != 0 )
366 {
367 req->op->processing();
|
368 mday 1.1
|
369 mday 1.4 Uint32 type = req->getType();
370 if( type == async_messages::HEARTBEAT )
371 handle_heartbeat_request(req);
372 else if (type == async_messages::IOCTL)
373 handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
374 else if (type == async_messages::CIMSERVICE_START)
375 handle_CimServiceStart(static_cast<CimServiceStart *>(req));
376 else if (type == async_messages::CIMSERVICE_STOP)
377 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
378 else if (type == async_messages::CIMSERVICE_PAUSE)
379 handle_CimServicePause(static_cast<CimServicePause *>(req));
380 else if (type == async_messages::CIMSERVICE_RESUME)
381 handle_CimServiceResume(static_cast<CimServiceResume *>(req));
382 else if ( type == async_messages::ASYNC_OP_START)
383 handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
384 else
385 {
386 // we don't handle this request message
387 _make_response(req, async_results::CIM_NAK );
388 }
|
389 mday 1.1 }
390 }
391
|
392 mday 1.17
393 Boolean MessageQueueService::_enqueueResponse(
394 Message* request,
395 Message* response)
|
396 mday 1.18
|
397 mday 1.17 {
|
398 kumpf 1.37 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
399 "MessageQueueService::_enqueueResponse");
|
400 mday 1.25
401 if( request->getMask() & message_mask::ha_async)
402 {
403 if (response->getMask() & message_mask::ha_async )
404 {
405 _completeAsyncResponse(static_cast<AsyncRequest *>(request),
406 static_cast<AsyncReply *>(response),
407 ASYNC_OPSTATE_COMPLETE, 0 );
|
408 kumpf 1.37 PEG_METHOD_EXIT();
|
409 mday 1.25 return true;
410 }
411 }
412
|
413 mday 1.17 if(request->_async != 0 )
414 {
415 Uint32 mask = request->_async->getMask();
|
416 mday 1.18 PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request ));
417
418 AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
419 AsyncOpNode *op = async->op;
420 request->_async = 0;
|
421 mday 1.27 // this request is probably going to be deleted !!
422 // remove it from the op node
423 op->_request.remove(request);
|
424 mday 1.18
425 AsyncLegacyOperationResult *async_result =
426 new AsyncLegacyOperationResult(
427 async->getKey(),
428 async->getRouting(),
429 op,
430 response);
431 _completeAsyncResponse(async,
432 async_result,
433 ASYNC_OPSTATE_COMPLETE,
434 0);
|
435 kumpf 1.37 PEG_METHOD_EXIT();
|
436 mday 1.18 return true;
|
437 mday 1.17 }
|
438 mday 1.18
439 // ensure that the destination queue is in response->dest
|
440 kumpf 1.37 PEG_METHOD_EXIT();
|
441 mday 1.24 return SendForget(response);
|
442 mday 1.18
|
443 mday 1.17 }
444
|
445 mday 1.18 void MessageQueueService::_make_response(Message *req, Uint32 code)
|
446 mday 1.1 {
|
447 mday 1.19 cimom::_make_response(req, code);
|
448 mday 1.1 }
449
450
|
451 mday 1.5 void MessageQueueService::_completeAsyncResponse(AsyncRequest *request,
452 AsyncReply *reply,
453 Uint32 state,
454 Uint32 flag)
455 {
|
456 kumpf 1.37 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
457 "MessageQueueService::_completeAsyncResponse");
458
|
459 mday 1.19 cimom::_completeAsyncResponse(request, reply, state, flag);
|
460 kumpf 1.37
461 PEG_METHOD_EXIT();
|
462 mday 1.5 }
463
464
|
465 mday 1.32 void MessageQueueService::_complete_op_node(AsyncOpNode *op,
466 Uint32 state,
467 Uint32 flag,
468 Uint32 code)
469 {
470 cimom::_complete_op_node(op, state, flag, code);
471 }
472
|
473 mday 1.5
474 Boolean MessageQueueService::accept_async(AsyncOpNode *op)
475 {
|
476 mday 1.8 if (_incoming_queue_shutdown.value() > 0 )
477 return false;
478
|
479 mday 1.20 // ATTN optimization remove the message checking altogether in the base
480 // << Mon Feb 18 14:02:20 2002 mdd >>
|
481 mday 1.6 op->lock();
482 Message *rq = op->_request.next(0);
|
483 mday 1.20 Message *rp = op->_response.next(0);
|
484 mday 1.6 op->unlock();
485
|
486 mday 1.22 if( (rq != 0 && (true == messageOK(rq))) || (rp != 0 && ( true == messageOK(rp) )) &&
487 _die.value() == 0 )
|
488 mday 1.5 {
|
489 mday 1.6 _incoming.insert_last_wait(op);
|
490 mday 1.5 return true;
491 }
|
492 mday 1.32 // else
493 // {
494 // if( (rq != 0 && (true == MessageQueueService::messageOK(rq))) ||
495 // (rp != 0 && ( true == MessageQueueService::messageOK(rp) )) &&
496 // _die.value() == 0)
497 // {
498 // MessageQueueService::_incoming.insert_last_wait(op);
499 // return true;
500 // }
501 // }
502
|
503 mday 1.5 return false;
504 }
505
506 Boolean MessageQueueService::messageOK(const Message *msg)
507 {
|
508 mday 1.8 if (_incoming_queue_shutdown.value() > 0 )
509 return false;
|
510 mday 1.18 return true;
511 }
512
|
513 mday 1.29
514 // made pure virtual
515 // << Wed Mar 6 15:11:31 2002 mdd >>
|
516 mday 1.25 // void MessageQueueService::handleEnqueue(Message *msg)
517 // {
518 // if ( msg )
519 // delete msg;
520 // }
|
521 mday 1.5
|
522 mday 1.29 // made pure virtual
523 // << Wed Mar 6 15:11:56 2002 mdd >>
|
524 mday 1.25 // void MessageQueueService::handleEnqueue(void)
525 // {
526 // Message *msg = dequeue();
527 // handleEnqueue(msg);
528 // }
|
529 mday 1.5
|
530 mday 1.1 void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
531 {
532 // default action is to echo a heartbeat response
533
534 AsyncReply *reply =
535 new AsyncReply(async_messages::HEARTBEAT,
536 req->getKey(),
537 req->getRouting(),
538 0,
539 req->op,
540 async_results::OK,
541 req->resp,
542 false);
|
543 mday 1.4 _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );
|
544 mday 1.1 }
545
546
547 void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
548 {
549 ;
550 }
551
552 void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
553 {
|
554 mday 1.8
555 switch( req->ctl )
556 {
557 case AsyncIoctl::IO_CLOSE:
558 {
559 // save my bearings
|
560 mday 1.34 Thread *myself = req->op->_thread_ptr;
561 MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);
|
562 mday 1.8
563 // respond to this message.
564 _make_response(req, async_results::OK);
565 // ensure we do not accept any further messages
566
567 // ensure we don't recurse on IO_CLOSE
568 if( _incoming_queue_shutdown.value() > 0 )
569 break;
570
571 // set the closing flag
572 service->_incoming_queue_shutdown = 1;
573 // empty out the queue
574 while( 1 )
575 {
576 AsyncOpNode *operation;
577 try
578 {
579 operation = service->_incoming.remove_first();
580 }
581 catch(IPCException & )
582 {
583 mday 1.8 break;
584 }
585 if( operation )
586 {
|
587 mday 1.34 operation->_thread_ptr = myself;
588 operation->_service_ptr = service;
589 service->_handle_incoming_operation(operation);
|
590 mday 1.8 }
591 else
592 break;
593 } // message processing loop
594
595 // shutdown the AsyncDQueue
596 service->_incoming.shutdown_queue();
597 // exit the thread !
598 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
599 return;
600 }
601
602 default:
603 _make_response(req, async_results::CIM_NAK);
604 }
|
605 mday 1.1 }
|
606 mday 1.8
|
607 mday 1.1 void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
608 {
|
609 mday 1.10 // clear the stoped bit and update
|
610 mday 1.13 _capabilities &= (~(module_capabilities::stopped));
|
611 mday 1.10 _make_response(req, async_results::OK);
612 // now tell the meta dispatcher we are stopped
613 update_service(_capabilities, _mask);
614
|
615 mday 1.1 }
616 void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
617 {
|
618 mday 1.10 // set the stopeed bit and update
619 _capabilities |= module_capabilities::stopped;
620 _make_response(req, async_results::CIM_STOPPED);
621 // now tell the meta dispatcher we are stopped
622 update_service(_capabilities, _mask);
623
|
624 mday 1.1 }
625 void MessageQueueService::handle_CimServicePause(CimServicePause *req)
626 {
|
627 mday 1.10 // set the paused bit and update
|
628 mday 1.13 _capabilities |= module_capabilities::paused;
|
629 mday 1.11 update_service(_capabilities, _mask);
|
630 mday 1.10 _make_response(req, async_results::CIM_PAUSED);
631 // now tell the meta dispatcher we are stopped
|
632 mday 1.1 }
633 void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
634 {
|
635 mday 1.10 // clear the paused bit and update
|
636 mday 1.13 _capabilities &= (~(module_capabilities::paused));
|
637 mday 1.11 update_service(_capabilities, _mask);
|
638 mday 1.10 _make_response(req, async_results::OK);
639 // now tell the meta dispatcher we are stopped
|
640 mday 1.1 }
641
642 void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
643 {
644 _make_response(req, async_results::CIM_NAK);
645 }
646
647 void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
648 {
|
649 mday 1.14 ;
650 }
651
|
652 mday 1.10
|
653 mday 1.14 void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)
654 {
655 // remove the legacy message from the request and enqueue it to its destination
656 Uint32 result = async_results::CIM_NAK;
657
|
658 mday 1.25 Message *legacy = req->_act;
|
659 mday 1.14 if ( legacy != 0 )
660 {
|
661 mday 1.25 MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
|
662 mday 1.14 if( queue != 0 )
663 {
|
664 mday 1.25 if(queue->isAsync() == true )
665 {
666 (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
667 }
668 else
669 {
670 // Enqueue the response:
671 queue->enqueue(req->get_action());
672 }
673
|
674 mday 1.14 result = async_results::OK;
675 }
676 }
677 _make_response(req, result);
678 }
679
680 void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)
681 {
682 ;
|
683 mday 1.1 }
684
685 AsyncOpNode *MessageQueueService::get_op(void)
686 {
|
687 mday 1.4 AsyncOpNode *op = new AsyncOpNode();
|
688 mday 1.1
|
689 mday 1.9 op->_state = ASYNC_OPSTATE_UNKNOWN;
690 op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
|
691 mday 1.4
|
692 mday 1.1 return op;
693 }
694
695 void MessageQueueService::return_op(AsyncOpNode *op)
696 {
697 PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED );
|
698 mday 1.4 delete op;
|
699 mday 1.1 }
700
|
701 mday 1.18
|
702 mday 1.29 Boolean MessageQueueService::ForwardOp(AsyncOpNode *op,
703 Uint32 destination)
704 {
705 PEGASUS_ASSERT(op != 0 );
|
706 mday 1.30 op->lock();
707 op->_op_dest = MessageQueue::lookup(destination);
|
708 mday 1.29 op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD);
709 op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK);
|
710 mday 1.30 op->unlock();
711 if ( op->_op_dest == 0 )
712 return false;
713
|
714 mday 1.29 return _meta_dispatcher->route_async(op);
715 }
716
|
717 mday 1.39
|
718 mday 1.21 Boolean MessageQueueService::SendAsync(AsyncOpNode *op,
719 Uint32 destination,
|
720 mday 1.18 void (*callback)(AsyncOpNode *,
|
721 mday 1.32 MessageQueue *,
|
722 mday 1.30 void *),
723 MessageQueue *callback_response_q,
724 void *callback_ptr)
|
725 mday 1.20 {
|
726 mday 1.21 PEGASUS_ASSERT(op != 0 && callback != 0 );
|
727 mday 1.18
|
728 mday 1.21 // get the queue handle for the destination
729
|
730 mday 1.30 op->lock();
|
731 mday 1.32 op->_op_dest = MessageQueue::lookup(destination); // destination of this message
|
732 mday 1.22 op->_flags |= ASYNC_OPFLAGS_CALLBACK;
733 op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
734 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
735 mday 1.30 // initialize the callback data
|
736 mday 1.32 op->_async_callback = callback; // callback function to be executed by recpt. of response
737 op->_callback_node = op; // the op node
738 op->_callback_response_q = callback_response_q; // the queue that will receive the response
739 op->_callback_ptr = callback_ptr; // user data for callback
740 op->_callback_request_q = this; // I am the originator of this request
|
741 mday 1.30
742 op->unlock();
743 if(op->_op_dest == 0)
744 return false;
|
745 mday 1.21
746 return _meta_dispatcher->route_async(op);
|
747 mday 1.18 }
748
749
|
750 mday 1.39 Boolean MessageQueueService::SendAsync(Message *msg,
751 Uint32 destination,
752 void (*callback)(Message *response,
753 void *handle,
754 void *parameter),
755 void *handle,
756 void *parameter)
757 {
758 if(msg == NULL)
759 return false;
760 if(callback == NULL)
761 return SendForget(msg);
762 AsyncOpNode *op = get_op();
|
763 mday 1.43 msg->dest = destination;
|
764 mday 1.39 if( NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
765 {
766 op->release();
767 return_op(op);
768 return false;
769 }
770 op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
771 op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
772 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
773 op->__async_callback = callback;
774 op->_callback_node = op;
775 op->_callback_handle = handle;
776 op->_callback_parameter = parameter;
777 op->_callback_response_q = this;
778
779
780 if( ! (msg->getMask() & message_mask::ha_async) )
781 {
782 AsyncLegacyOperationStart *wrapper =
783 new AsyncLegacyOperationStart(get_next_xid(),
784 op,
785 mday 1.39 destination,
786 msg,
787 destination);
788 }
789 else
790 {
791 op->_request.insert_first(msg);
792 (static_cast<AsyncMessage *>(msg))->op = op;
793 }
794
795 _callback.insert_last(op);
796 return _meta_dispatcher->route_async(op);
797 }
798
799
|
800 mday 1.18 Boolean MessageQueueService::SendForget(Message *msg)
801 {
802
|
803 mday 1.24
|
804 mday 1.18 AsyncOpNode *op = 0;
|
805 mday 1.22 Uint32 mask = msg->getMask();
806
807 if (mask & message_mask::ha_async)
|
808 mday 1.18 {
809 op = (static_cast<AsyncMessage *>(msg))->op ;
810 }
|
811 mday 1.22
|
812 mday 1.18 if( op == 0 )
|
813 mday 1.20 {
|
814 mday 1.18 op = get_op();
|
815 mday 1.20 op->_request.insert_first(msg);
|
816 mday 1.22 if (mask & message_mask::ha_async)
817 (static_cast<AsyncMessage *>(msg))->op = op;
|
818 mday 1.20 }
|
819 mday 1.30 op->_op_dest = MessageQueue::lookup(msg->dest);
|
820 mday 1.22 op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
|
821 mday 1.41 op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
822 | ASYNC_OPFLAGS_SIMPLE_STATUS);
|
823 mday 1.22 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
824 mday 1.30 if ( op->_op_dest == 0 )
|
825 mday 1.39 {
826 op->release();
827 return_op(op);
|
828 mday 1.21 return false;
|
829 mday 1.39 }
|
830 mday 1.24
|
831 mday 1.18 // now see if the meta dispatcher will take it
|
832 mday 1.30 return _meta_dispatcher->route_async(op);
|
833 mday 1.18 }
|
834 mday 1.2
|
835 mday 1.1
|
836 mday 1.4 AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
|
837 mday 1.1 {
|
838 mday 1.4 if ( request == 0 )
839 return 0 ;
|
840 mday 1.5
841 Boolean destroy_op = false;
842
843 if (request->op == false)
844 {
845 request->op = get_op();
|
846 mday 1.7 request->op->_request.insert_first(request);
|
847 mday 1.5 destroy_op = true;
848 }
|
849 mday 1.4
|
850 mday 1.33 request->block = false;
|
851 mday 1.35 request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
|
852 mday 1.33 SendAsync(request->op,
853 request->dest,
|
854 mday 1.36 _sendwait_callback,
|
855 mday 1.33 this,
856 (void *)0);
|
857 mday 1.4
|
858 mday 1.33 request->op->_client_sem.wait();
|
859 mday 1.6 request->op->lock();
860 AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
861 rpl->op = 0;
862 request->op->unlock();
863
|
864 mday 1.5 if( destroy_op == true)
865 {
|
866 mday 1.6 request->op->lock();
867 request->op->_request.remove(request);
868 request->op->_state |= ASYNC_OPSTATE_RELEASED;
869 request->op->unlock();
870 return_op(request->op);
|
871 mday 1.7 request->op = 0;
|
872 mday 1.5 }
873 return rpl;
|
874 mday 1.1 }
875
876
877 Boolean MessageQueueService::register_service(String name,
878 Uint32 capabilities,
879 Uint32 mask)
880
881 {
882 RegisterCimService *msg = new RegisterCimService(get_next_xid(),
|
883 mday 1.5 0,
|
884 mday 1.1 true,
885 name,
886 capabilities,
887 mask,
888 _queueId);
|
889 mday 1.44 msg->dest = CIMOM_Q_ID;
890
|
891 mday 1.1 Boolean registered = false;
|
892 mday 1.7 AsyncReply *reply = static_cast<AsyncReply *>(SendWait( msg ));
|
893 mday 1.1
|
894 mday 1.2 if ( reply != 0 )
|
895 mday 1.1 {
896 if(reply->getMask() & message_mask:: ha_async)
897 {
898 if(reply->getMask() & message_mask::ha_reply)
899 {
|
900 mday 1.15 if(reply->result == async_results::OK ||
901 reply->result == async_results::MODULE_ALREADY_REGISTERED )
|
902 mday 1.1 registered = true;
903 }
904 }
905
|
906 mday 1.7 delete reply;
|
907 mday 1.1 }
|
908 mday 1.5 delete msg;
|
909 mday 1.1 return registered;
910 }
911
912 Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
913 {
914
915
916 UpdateCimService *msg = new UpdateCimService(get_next_xid(),
|
917 mday 1.5 0,
|
918 mday 1.1 true,
919 _queueId,
920 _capabilities,
921 _mask);
922 Boolean registered = false;
|
923 mday 1.2
924 AsyncMessage *reply = SendWait(msg);
925 if (reply)
|
926 mday 1.1 {
927 if(reply->getMask() & message_mask:: ha_async)
928 {
929 if(reply->getMask() & message_mask::ha_reply)
930 {
|
931 mday 1.2 if(static_cast<AsyncReply *>(reply)->result == async_results::OK)
|
932 mday 1.1 registered = true;
933 }
934 }
935 delete reply;
936 }
|
937 mday 1.5 delete msg;
|
938 mday 1.1 return registered;
939 }
940
941
942 Boolean MessageQueueService::deregister_service(void)
943 {
|
944 mday 1.3
|
945 mday 1.5 _meta_dispatcher->deregister_module(_queueId);
946 return true;
|
947 mday 1.1 }
948
949
950 void MessageQueueService::find_services(String name,
951 Uint32 capabilities,
952 Uint32 mask,
953 Array<Uint32> *results)
954 {
955
956 if( results == 0 )
957 throw NullPointer();
|
958 mday 1.5
|
959 mday 1.1 results->clear();
960
961 FindServiceQueue *req =
962 new FindServiceQueue(get_next_xid(),
|
963 mday 1.5 0,
|
964 mday 1.1 _queueId,
965 true,
966 name,
967 capabilities,
968 mask);
|
969 mday 1.44
970 req->dest = CIMOM_Q_ID;
|
971 mday 1.1
|
972 mday 1.2 AsyncMessage *reply = SendWait(req);
973 if(reply)
|
974 mday 1.1 {
975 if( reply->getMask() & message_mask::ha_async)
976 {
977 if(reply->getMask() & message_mask::ha_reply)
978 {
979 if(reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
980 {
981 if( (static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK )
982 *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
983 }
984 }
985 }
986 delete reply;
987 }
|
988 mday 1.5 delete req;
|
989 mday 1.1 return ;
990 }
991
992 void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
993 {
994 if(result == 0)
995 throw NullPointer();
996
997 EnumerateService *req
998 = new EnumerateService(get_next_xid(),
|
999 mday 1.5 0,
|
1000 mday 1.1 _queueId,
1001 true,
1002 queue);
1003
|
1004 mday 1.2 AsyncMessage *reply = SendWait(req);
|
1005 mday 1.1
|
1006 mday 1.2 if (reply)
|
1007 mday 1.1 {
1008 Boolean found = false;
1009
1010 if( reply->getMask() & message_mask::ha_async)
1011 {
1012 if(reply->getMask() & message_mask::ha_reply)
1013 {
1014 if(reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
1015 {
1016 if( (static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK )
1017 {
1018 if( found == false)
1019 {
1020 found = true;
1021
1022 result->put_name( (static_cast<EnumerateServiceResponse *>(reply))->name);
1023 result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
1024 result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
1025 result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
1026 }
1027 }
1028 mday 1.1 }
1029 }
1030 }
1031 delete reply;
1032 }
|
1033 mday 1.5 delete req;
1034
|
1035 mday 1.1 return;
1036 }
1037
1038 Uint32 MessageQueueService::get_next_xid(void)
1039 {
1040 _xid++;
1041 return _xid.value();
1042 }
1043
1044 PEGASUS_NAMESPACE_END
|