1 mday 1.1 //%////-*-c++-*-////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to
7 // deal in the Software without restriction, including without limitation the
8 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
9 // sell copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
13 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
14 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
15 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
16 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
17 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
18 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
19 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20 //
21 //==============================================================================
22 mday 1.1 //
23 // Author: Mike Day (mdday@us.ibm.com)
24 //
25 // Modified By:
26 //
27 //%/////////////////////////////////////////////////////////////////////////////
28
29 #include "MessageQueueService.h"
|
30 mday 1.22 #include <Pegasus/Common/Tracer.h>
|
31 mday 1.1
32 PEGASUS_NAMESPACE_BEGIN
33
|
34 mday 1.15
35 cimom *MessageQueueService::_meta_dispatcher = 0;
36 AtomicInt MessageQueueService::_service_count = 0;
37 AtomicInt MessageQueueService::_xid(1);
38 Mutex MessageQueueService::_meta_dispatcher_mutex = Mutex();
39
40
|
41 mday 1.1 MessageQueueService::MessageQueueService(const char *name,
42 Uint32 queueID,
43 Uint32 capabilities,
44 Uint32 mask)
|
45 mday 1.15 : Base(name, true, queueID),
|
46 mday 1.22
|
47 mday 1.1 _mask(mask),
|
48 mday 1.4 _die(0),
|
49 mday 1.5 _pending(true),
|
50 mday 1.6 _incoming(true, 1000),
|
51 mday 1.7 _incoming_queue_shutdown(0),
|
52 mday 1.5 _req_thread(_req_proc, this, false)
|
53 mday 1.1 {
|
54 mday 1.22 _capabilities = (capabilities | module_capabilities::async);
55
|
56 mday 1.1 _default_op_timeout.tv_sec = 30;
57 _default_op_timeout.tv_usec = 100;
|
58 mday 1.15
59 _meta_dispatcher_mutex.lock(pegasus_thread_self());
60
61 if( _meta_dispatcher == 0 )
62 {
63 PEGASUS_ASSERT( _service_count.value() == 0 );
64 _meta_dispatcher = new cimom();
65 if (_meta_dispatcher == NULL )
66 {
67 _meta_dispatcher_mutex.unlock();
68
69 throw NullPointer();
70 }
71
72 }
73 _service_count++;
74
75
76 if( false == register_service(name, _capabilities, _mask) )
77 {
78 _meta_dispatcher_mutex.unlock();
79 mday 1.15 throw BindFailed("MessageQueueService Base Unable to register with Meta Dispatcher");
80 }
81
82 _meta_dispatcher_mutex.unlock();
83
|
84 mday 1.4 _req_thread.run();
|
85 mday 1.1 }
86
|
87 mday 1.4
|
88 mday 1.1 MessageQueueService::~MessageQueueService(void)
89 {
90 _die = 1;
|
91 mday 1.7 if (_incoming_queue_shutdown.value() == 0 )
|
92 mday 1.16 {
|
93 mday 1.7 _incoming.shutdown_queue();
|
94 mday 1.16 _req_thread.join();
95 }
96
|
97 mday 1.15 _meta_dispatcher_mutex.lock(pegasus_thread_self());
98 _service_count--;
99 if (_service_count.value() == 0 )
100 {
101 _meta_dispatcher->_shutdown_routed_queue();
102 delete _meta_dispatcher;
103 }
104 _meta_dispatcher_mutex.unlock();
105
|
106 mday 1.1 }
107
|
108 mday 1.15
|
109 mday 1.1
|
110 mday 1.7 void MessageQueueService::_shutdown_incoming_queue(void)
111 {
112
|
113 mday 1.9 if (_incoming_queue_shutdown.value() > 0 )
114 return ;
115
|
116 mday 1.8 AsyncIoctl *msg = new AsyncIoctl(get_next_xid(),
117 0,
118 _queueId,
119 _queueId,
120 true,
121 AsyncIoctl::IO_CLOSE,
122 0,
123 0);
|
124 mday 1.9
|
125 mday 1.8 msg->op = get_op();
126 msg->op->_request.insert_first(msg);
|
127 mday 1.21 msg->op->_op_dest = this;
|
128 mday 1.8
129 _incoming.insert_last_wait(msg->op);
130 msg->op->_client_sem.wait();
131
132 msg->op->lock();
133 AsyncReply * reply = static_cast<AsyncReply *>(msg->op->_response.remove_first());
134 reply->op = 0;
135 msg->op->unlock();
|
136 mday 1.9 delete reply;
|
137 mday 1.8
138 msg->op->_request.remove(msg);
139 msg->op->_state |= ASYNC_OPSTATE_RELEASED;
140 return_op(msg->op);
|
141 mday 1.9
|
142 mday 1.8 msg->op = 0;
143 delete msg;
|
144 mday 1.16 _req_thread.join();
145
|
146 mday 1.7 }
147
|
148 mday 1.22
149
150 void MessageQueueService::enqueue(Message *msg) throw(IPCException)
151 {
|
152 kumpf 1.28 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
153
|
154 mday 1.22 Base::enqueue(msg);
|
155 kumpf 1.28
156 PEG_METHOD_EXIT();
|
157 mday 1.22
158 // PEGASUS_ASSERT(msg != 0 );
159
160 // cout << "inside overriden enqueue" << endl;
|
161 mday 1.26 // if (!msg)
|
162 mday 1.22 // {
163 // Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3,
164 // "MessageQueue::enqueue failure");
165 // throw NullPointer();
166 // }
167
168 // if (getenv("PEGASUS_TRACE"))
169 // {
170 // cout << "===== " << getQueueName() << ": ";
171 // msg->print(cout);
172 // }
173
174 // msg->dest = _queueId;
|
175 mday 1.26
|
176 mday 1.22 // SendForget(msg);
177
178 }
179
180
181
|
182 mday 1.5 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
|
183 mday 1.1 {
|
184 mday 1.5 Thread *myself = reinterpret_cast<Thread *>(parm);
185 MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
186
187 // pull messages off the incoming queue and dispatch them. then
188 // check pending messages that are non-blocking
|
189 mday 1.7 AsyncOpNode *operation = 0;
190
|
191 mday 1.6 while ( service->_die.value() == 0 )
|
192 mday 1.1 {
|
193 mday 1.7 try
194 {
|
195 mday 1.8 operation = service->_incoming.remove_first_wait();
|
196 mday 1.7 }
197 catch(ListClosed & )
198 {
|
199 mday 1.6 break;
|
200 mday 1.7 }
201 if( operation )
|
202 mday 1.8 {
203
204 service->_handle_incoming_operation(operation, myself, service);
205 }
|
206 mday 1.1 }
|
207 mday 1.5
208 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
209 return(0);
|
210 mday 1.1 }
211
|
212 mday 1.22 void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
213 {
214 return_op(op);
215 }
216
|
217 mday 1.1
|
218 mday 1.8 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation,
219 Thread *thread,
220 MessageQueue *queue)
|
221 mday 1.1 {
|
222 mday 1.5 if ( operation != 0 )
223 {
|
224 mday 1.22
225 // ATTN: optimization
226 // << Tue Feb 19 14:10:38 2002 mdd >>
|
227 mday 1.6 operation->lock();
|
228 mday 1.22 if ((operation->_state & ASYNC_OPFLAGS_CALLBACK) &&
229 (operation->_state & ASYNC_OPSTATE_COMPLETE))
230 {
231 operation->unlock();
232 _handle_async_callback(operation);
233 }
234
|
235 mday 1.6 Message *rq = operation->_request.next(0);
|
236 mday 1.18 PEGASUS_ASSERT(rq != 0 );
|
237 mday 1.28.2.1
238 // divert "plain" legacy messages to handleEnqueue
239 if ( ! (rq->getMask() & message_mask::ha_async) )
|
240 mday 1.18 {
241 rq = operation->_request.remove_first() ;
242 operation->unlock();
243 // delete the op node
244 delete operation;
|
245 mday 1.24
|
246 mday 1.28.2.1
|
247 mday 1.24 // Attn: change to handleEnqueue(msg) when we have that method in all messagequeueservices
248 // make handleEnqueue pure virtual !!!
249 // << Fri Feb 22 13:39:09 2002 mdd >>
250
|
251 mday 1.26 handleEnqueue(rq);
|
252 mday 1.18 return;
253 }
254
|
255 mday 1.6 operation->unlock();
|
256 mday 1.8 static_cast<AsyncMessage *>(rq)->_myself = thread;
257 static_cast<AsyncMessage *>(rq)->_service = queue;
|
258 mday 1.5 _handle_async_request(static_cast<AsyncRequest *>(rq));
259 }
|
260 mday 1.1
|
261 mday 1.5 return;
|
262 mday 1.1
263 }
264
265 void MessageQueueService::_handle_async_request(AsyncRequest *req)
266 {
|
267 mday 1.4 if ( req != 0 )
268 {
269 req->op->processing();
|
270 mday 1.1
|
271 mday 1.4 Uint32 type = req->getType();
272 if( type == async_messages::HEARTBEAT )
273 handle_heartbeat_request(req);
274 else if (type == async_messages::IOCTL)
275 handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
276 else if (type == async_messages::CIMSERVICE_START)
277 handle_CimServiceStart(static_cast<CimServiceStart *>(req));
278 else if (type == async_messages::CIMSERVICE_STOP)
279 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
280 else if (type == async_messages::CIMSERVICE_PAUSE)
281 handle_CimServicePause(static_cast<CimServicePause *>(req));
282 else if (type == async_messages::CIMSERVICE_RESUME)
283 handle_CimServiceResume(static_cast<CimServiceResume *>(req));
284 else if ( type == async_messages::ASYNC_OP_START)
285 handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
286 else
287 {
288 // we don't handle this request message
289 _make_response(req, async_results::CIM_NAK );
290 }
|
291 mday 1.1 }
292 }
293
|
294 mday 1.17
295 Boolean MessageQueueService::_enqueueResponse(
296 Message* request,
297 Message* response)
|
298 mday 1.18
|
299 mday 1.17 {
|
300 mday 1.25
301 if( request->getMask() & message_mask::ha_async)
302 {
303 if (response->getMask() & message_mask::ha_async )
304 {
305 _completeAsyncResponse(static_cast<AsyncRequest *>(request),
306 static_cast<AsyncReply *>(response),
307 ASYNC_OPSTATE_COMPLETE, 0 );
308 return true;
309 }
310 }
311
|
312 mday 1.17 if(request->_async != 0 )
313 {
314 Uint32 mask = request->_async->getMask();
|
315 mday 1.18 PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request ));
316
317 AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
318 AsyncOpNode *op = async->op;
319 request->_async = 0;
|
320 mday 1.27 // this request is probably going to be deleted !!
321 // remove it from the op node
322 op->_request.remove(request);
|
323 mday 1.18
324 AsyncLegacyOperationResult *async_result =
325 new AsyncLegacyOperationResult(
326 async->getKey(),
327 async->getRouting(),
328 op,
329 response);
330 _completeAsyncResponse(async,
331 async_result,
332 ASYNC_OPSTATE_COMPLETE,
333 0);
334 return true;
|
335 mday 1.17 }
|
336 mday 1.18
337 // ensure that the destination queue is in response->dest
|
338 mday 1.24 return SendForget(response);
|
339 mday 1.18
|
340 mday 1.17 }
341
|
342 mday 1.18 void MessageQueueService::_make_response(Message *req, Uint32 code)
|
343 mday 1.1 {
|
344 mday 1.19 cimom::_make_response(req, code);
|
345 mday 1.1 }
346
347
|
348 mday 1.5 void MessageQueueService::_completeAsyncResponse(AsyncRequest *request,
349 AsyncReply *reply,
350 Uint32 state,
351 Uint32 flag)
352 {
|
353 mday 1.19 cimom::_completeAsyncResponse(request, reply, state, flag);
|
354 mday 1.5 }
355
356
357
358 Boolean MessageQueueService::accept_async(AsyncOpNode *op)
359 {
|
360 mday 1.8 if (_incoming_queue_shutdown.value() > 0 )
361 return false;
362
|
363 mday 1.20 // ATTN optimization remove the message checking altogether in the base
364 // << Mon Feb 18 14:02:20 2002 mdd >>
|
365 mday 1.6 op->lock();
366 Message *rq = op->_request.next(0);
|
367 mday 1.20 Message *rp = op->_response.next(0);
|
368 mday 1.6 op->unlock();
369
|
370 mday 1.22 if( (rq != 0 && (true == messageOK(rq))) || (rp != 0 && ( true == messageOK(rp) )) &&
371 _die.value() == 0 )
|
372 mday 1.5 {
|
373 mday 1.6 _incoming.insert_last_wait(op);
|
374 mday 1.5 return true;
375 }
376 return false;
377 }
378
379 Boolean MessageQueueService::messageOK(const Message *msg)
380 {
|
381 mday 1.8 if (_incoming_queue_shutdown.value() > 0 )
382 return false;
|
383 mday 1.18 return true;
384 }
385
|
386 mday 1.25 // void MessageQueueService::handleEnqueue(Message *msg)
387 // {
|
388 mday 1.22
|
389 mday 1.24
|
390 mday 1.25 // if ( msg )
391 // delete msg;
392 // }
|
393 mday 1.5
394
|
395 mday 1.25 // void MessageQueueService::handleEnqueue(void)
396 // {
397 // Message *msg = dequeue();
398 // handleEnqueue(msg);
399 // }
|
400 mday 1.5
|
401 mday 1.1 void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
402 {
403 // default action is to echo a heartbeat response
404
405 AsyncReply *reply =
406 new AsyncReply(async_messages::HEARTBEAT,
407 req->getKey(),
408 req->getRouting(),
409 0,
410 req->op,
411 async_results::OK,
412 req->resp,
413 false);
|
414 mday 1.4 _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );
|
415 mday 1.1 }
416
417
418 void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
419 {
420 ;
421 }
422
423 void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
424 {
|
425 mday 1.8
426 switch( req->ctl )
427 {
428 case AsyncIoctl::IO_CLOSE:
429 {
430 // save my bearings
431 Thread *myself = req->_myself;
432 MessageQueueService *service = static_cast<MessageQueueService *>(req->_service);
433
434 // respond to this message.
435 _make_response(req, async_results::OK);
436 // ensure we do not accept any further messages
437
438 // ensure we don't recurse on IO_CLOSE
439 if( _incoming_queue_shutdown.value() > 0 )
440 break;
441
442 // set the closing flag
443 service->_incoming_queue_shutdown = 1;
444 // empty out the queue
445 while( 1 )
446 mday 1.8 {
447 AsyncOpNode *operation;
448 try
449 {
450 operation = service->_incoming.remove_first();
451 }
452 catch(IPCException & )
453 {
454 break;
455 }
456 if( operation )
457 {
458 service->_handle_incoming_operation(operation, myself, service);
459 }
460 else
461 break;
462 } // message processing loop
463
464 // shutdown the AsyncDQueue
465 service->_incoming.shutdown_queue();
466 // exit the thread !
467 mday 1.8 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
468 return;
469 }
470
471 default:
472 _make_response(req, async_results::CIM_NAK);
473 }
|
474 mday 1.1 }
|
475 mday 1.8
|
476 mday 1.1 void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
477 {
|
478 mday 1.10 // clear the stoped bit and update
|
479 mday 1.13 _capabilities &= (~(module_capabilities::stopped));
|
480 mday 1.10 _make_response(req, async_results::OK);
481 // now tell the meta dispatcher we are stopped
482 update_service(_capabilities, _mask);
483
|
484 mday 1.1 }
485 void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
486 {
|
487 mday 1.10 // set the stopeed bit and update
488 _capabilities |= module_capabilities::stopped;
489 _make_response(req, async_results::CIM_STOPPED);
490 // now tell the meta dispatcher we are stopped
491 update_service(_capabilities, _mask);
492
|
493 mday 1.1 }
494 void MessageQueueService::handle_CimServicePause(CimServicePause *req)
495 {
|
496 mday 1.10 // set the paused bit and update
|
497 mday 1.13 _capabilities |= module_capabilities::paused;
|
498 mday 1.11 update_service(_capabilities, _mask);
|
499 mday 1.10 _make_response(req, async_results::CIM_PAUSED);
500 // now tell the meta dispatcher we are stopped
|
501 mday 1.1 }
502 void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
503 {
|
504 mday 1.10 // clear the paused bit and update
|
505 mday 1.13 _capabilities &= (~(module_capabilities::paused));
|
506 mday 1.11 update_service(_capabilities, _mask);
|
507 mday 1.10 _make_response(req, async_results::OK);
508 // now tell the meta dispatcher we are stopped
|
509 mday 1.1 }
510
511 void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
512 {
513 _make_response(req, async_results::CIM_NAK);
514 }
515
516 void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
517 {
|
518 mday 1.14 ;
519 }
520
|
521 mday 1.10
|
522 mday 1.14 void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)
523 {
524 // remove the legacy message from the request and enqueue it to its destination
525 Uint32 result = async_results::CIM_NAK;
526
|
527 mday 1.25 Message *legacy = req->_act;
|
528 mday 1.14 if ( legacy != 0 )
529 {
|
530 mday 1.25 MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
|
531 mday 1.14 if( queue != 0 )
532 {
|
533 mday 1.25 if(queue->isAsync() == true )
534 {
535 (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
536 }
537 else
538 {
539 // Enqueue the response:
540 queue->enqueue(req->get_action());
541 }
542
|
543 mday 1.14 result = async_results::OK;
544 }
545 }
546 _make_response(req, result);
547 }
548
549 void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)
550 {
551 ;
|
552 mday 1.1 }
553
554 AsyncOpNode *MessageQueueService::get_op(void)
555 {
|
556 mday 1.4 AsyncOpNode *op = new AsyncOpNode();
|
557 mday 1.1
|
558 mday 1.9 op->_state = ASYNC_OPSTATE_UNKNOWN;
559 op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
|
560 mday 1.4
|
561 mday 1.1 return op;
562 }
563
564 void MessageQueueService::return_op(AsyncOpNode *op)
565 {
566 PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED );
|
567 mday 1.4 delete op;
|
568 mday 1.1 }
569
|
570 mday 1.18
|
571 mday 1.28.2.1 void MessageQueueService::ReplyAsync(AsyncOpNode *op,
572 Uint32 destination)
573 {
574 PEGASUS_ASSERT( op->_flags & ASYNC_OPFLAGS_CALLBACK );
575
576
577 // get the queue handle for the destination
578 if ( 0 == (op->_op_dest = MessageQueue::lookup(destination)))
579 {
580 delete op;
581 }
582
583 op->_response.next(0)->dest = destination;
584
585
586 op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
587 op->_state |= ASYNC_OPSTATE_COMPLETE;
588
589 if ( false == _meta_dispatcher->route_async(op) )
590 delete op;
591 return;
592 mday 1.28.2.1 }
593
594
|
595 mday 1.21 Boolean MessageQueueService::SendAsync(AsyncOpNode *op,
596 Uint32 destination,
|
597 mday 1.18 void (*callback)(AsyncOpNode *,
|
598 mday 1.21 MessageQueue *,
|
599 mday 1.28.2.1 void *),
600 void * parm)
|
601 mday 1.20 {
|
602 mday 1.28.2.1 PEGASUS_ASSERT( callback != 0 && op != 0 );
603
604 op->_callback_ptr = parm;
|
605 mday 1.18
|
606 mday 1.21 // get the queue handle for the destination
607 if ( 0 == (op->_op_dest = MessageQueue::lookup(destination)))
608 return false;
|
609 mday 1.28.2.1 op->_request.next(0)->dest = destination;
610
|
611 mday 1.22 op->_flags |= ASYNC_OPFLAGS_CALLBACK;
612 op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
613 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
614 mday 1.21
615 return _meta_dispatcher->route_async(op);
|
616 mday 1.18 }
617
|
618 mday 1.28.2.1 void MessageQueueService::ForwardRequest(AsyncOpNode *op, MessageQueue *dest)
619 {
620
621 Message *msg = op->_request.next(0);
622 if ( msg == 0 )
623 {
624 delete op;
625 return;
626 }
627 msg->_async = 0;
628 msg->dest = dest->getQueueId();
629 if(msg->getMask() & message_mask::ha_async)
630 (static_cast<AsyncMessage *>(msg))->op = 0;
631
632 op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
633 op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SIMPLE_STATUS);
634 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
635
636 op->_op_dest = dest;
637
638 // now see if the meta dispatcher will take it
639 mday 1.28.2.1 if( false == _meta_dispatcher->route_async(op))
640 delete op;
641
642 return;
643 }
644
645
646 void MessageQueueService::ForwardResponse(AsyncOpNode *op, MessageQueue *dest)
647 {
648
649 Message *msg = op->_response.next(0);
650 if ( msg == 0 || dest == 0 )
651 {
652 delete op;
653 return;
654 }
655 msg->_async = 0;
656 msg->dest = dest->getQueueId();
657 if(msg->getMask() & message_mask::ha_async)
658 (static_cast<AsyncMessage *>(msg))->op = 0;
659
660 mday 1.28.2.1 op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
661 op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SIMPLE_STATUS);
662 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
663
664 op->_op_dest = dest;
665
666 // now see if the meta dispatcher will take it
667 if( false == _meta_dispatcher->route_async(op))
668 delete op;
669
670 return;
671 }
|
672 mday 1.18
673 Boolean MessageQueueService::SendForget(Message *msg)
674 {
675
|
676 mday 1.24
|
677 mday 1.18 AsyncOpNode *op = 0;
|
678 mday 1.22 Uint32 mask = msg->getMask();
679
680 if (mask & message_mask::ha_async)
|
681 mday 1.18 {
682 op = (static_cast<AsyncMessage *>(msg))->op ;
683 }
|
684 mday 1.22
|
685 mday 1.18 if( op == 0 )
|
686 mday 1.20 {
|
687 mday 1.18 op = get_op();
|
688 mday 1.20 op->_request.insert_first(msg);
|
689 mday 1.22 if (mask & message_mask::ha_async)
690 (static_cast<AsyncMessage *>(msg))->op = op;
|
691 mday 1.28.2.1 else
692 msg->_async = 0;
|
693 mday 1.20 }
|
694 mday 1.22 op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
695 op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SIMPLE_STATUS);
696 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
697 mday 1.18
|
698 mday 1.21 // get the queue handle for the destination
699 if ( 0 == (op->_op_dest = MessageQueue::lookup(msg->dest)))
|
700 mday 1.24 {
|
701 mday 1.21 return false;
|
702 mday 1.24 }
703
|
704 mday 1.18 // now see if the meta dispatcher will take it
|
705 mday 1.24 Boolean return_code = _meta_dispatcher->route_async(op);
706 return return_code;
707
708
|
709 mday 1.18 }
|
710 mday 1.2
|
711 mday 1.1
|
712 mday 1.4 AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
|
713 mday 1.1 {
|
714 mday 1.4 if ( request == 0 )
715 return 0 ;
|
716 mday 1.5
717 Boolean destroy_op = false;
718
719 if (request->op == false)
720 {
721 request->op = get_op();
|
722 mday 1.7 request->op->_request.insert_first(request);
|
723 mday 1.5 destroy_op = true;
724 }
|
725 mday 1.4
726 request->block = true;
|
727 mday 1.22 request->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
728 request->op->_flags &= ~ASYNC_OPFLAGS_CALLBACK;
|
729 mday 1.1
|
730 mday 1.21 // get the queue handle for the destination
731 if ( 0 == (request->op->_op_dest = MessageQueue::lookup(request->dest)))
732 return 0;
733
|
734 mday 1.2
|
735 mday 1.4 // now see if the meta dispatcher will take it
|
736 mday 1.2
|
737 mday 1.4 if (true == _meta_dispatcher->route_async(request->op))
|
738 mday 1.1 {
|
739 mday 1.4 request->op->_client_sem.wait();
|
740 mday 1.6 PEGASUS_ASSERT(request->op->_state & ASYNC_OPSTATE_COMPLETE);
741
|
742 mday 1.1 }
|
743 mday 1.4
|
744 mday 1.6 request->op->lock();
745 AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
746 rpl->op = 0;
747 request->op->unlock();
748
|
749 mday 1.5 if( destroy_op == true)
750 {
|
751 mday 1.6 request->op->lock();
752 request->op->_request.remove(request);
753 request->op->_state |= ASYNC_OPSTATE_RELEASED;
754 request->op->unlock();
755
756 return_op(request->op);
|
757 mday 1.7 request->op = 0;
|
758 mday 1.5 }
759
760 return rpl;
|
761 mday 1.1 }
762
763
764 Boolean MessageQueueService::register_service(String name,
765 Uint32 capabilities,
766 Uint32 mask)
767
768 {
769 RegisterCimService *msg = new RegisterCimService(get_next_xid(),
|
770 mday 1.5 0,
|
771 mday 1.1 true,
772 name,
773 capabilities,
774 mask,
775 _queueId);
776 Boolean registered = false;
|
777 mday 1.7 AsyncReply *reply = static_cast<AsyncReply *>(SendWait( msg ));
|
778 mday 1.1
|
779 mday 1.2 if ( reply != 0 )
|
780 mday 1.1 {
781 if(reply->getMask() & message_mask:: ha_async)
782 {
783 if(reply->getMask() & message_mask::ha_reply)
784 {
|
785 mday 1.15 if(reply->result == async_results::OK ||
786 reply->result == async_results::MODULE_ALREADY_REGISTERED )
|
787 mday 1.1 registered = true;
788 }
789 }
790
|
791 mday 1.7 delete reply;
|
792 mday 1.1 }
|
793 mday 1.5 delete msg;
|
794 mday 1.1 return registered;
795 }
796
797 Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
798 {
799
800
801 UpdateCimService *msg = new UpdateCimService(get_next_xid(),
|
802 mday 1.5 0,
|
803 mday 1.1 true,
804 _queueId,
805 _capabilities,
806 _mask);
807 Boolean registered = false;
|
808 mday 1.2
809 AsyncMessage *reply = SendWait(msg);
810 if (reply)
|
811 mday 1.1 {
812 if(reply->getMask() & message_mask:: ha_async)
813 {
814 if(reply->getMask() & message_mask::ha_reply)
815 {
|
816 mday 1.2 if(static_cast<AsyncReply *>(reply)->result == async_results::OK)
|
817 mday 1.1 registered = true;
818 }
819 }
820 delete reply;
821 }
|
822 mday 1.5 delete msg;
|
823 mday 1.1 return registered;
824 }
825
826
827 Boolean MessageQueueService::deregister_service(void)
828 {
|
829 mday 1.3
|
830 mday 1.5 _meta_dispatcher->deregister_module(_queueId);
831 return true;
|
832 mday 1.1 }
833
834
835 void MessageQueueService::find_services(String name,
836 Uint32 capabilities,
837 Uint32 mask,
838 Array<Uint32> *results)
839 {
840
841 if( results == 0 )
842 throw NullPointer();
|
843 mday 1.5
|
844 mday 1.1 results->clear();
845
846 FindServiceQueue *req =
847 new FindServiceQueue(get_next_xid(),
|
848 mday 1.5 0,
|
849 mday 1.1 _queueId,
850 true,
851 name,
852 capabilities,
853 mask);
854
|
855 mday 1.2 AsyncMessage *reply = SendWait(req);
856 if(reply)
|
857 mday 1.1 {
858 if( reply->getMask() & message_mask::ha_async)
859 {
860 if(reply->getMask() & message_mask::ha_reply)
861 {
862 if(reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
863 {
864 if( (static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK )
865 *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
866 }
867 }
868 }
869 delete reply;
870 }
|
871 mday 1.5 delete req;
|
872 mday 1.1 return ;
873 }
874
875 void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
876 {
877 if(result == 0)
878 throw NullPointer();
879
880 EnumerateService *req
881 = new EnumerateService(get_next_xid(),
|
882 mday 1.5 0,
|
883 mday 1.1 _queueId,
884 true,
885 queue);
886
|
887 mday 1.2 AsyncMessage *reply = SendWait(req);
|
888 mday 1.1
|
889 mday 1.2 if (reply)
|
890 mday 1.1 {
891 Boolean found = false;
892
893 if( reply->getMask() & message_mask::ha_async)
894 {
895 if(reply->getMask() & message_mask::ha_reply)
896 {
897 if(reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
898 {
899 if( (static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK )
900 {
901 if( found == false)
902 {
903 found = true;
904
905 result->put_name( (static_cast<EnumerateServiceResponse *>(reply))->name);
906 result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
907 result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
908 result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
909 }
910 }
911 mday 1.1 }
912 }
913 }
914 delete reply;
915 }
|
916 mday 1.5 delete req;
917
|
918 mday 1.1 return;
919 }
920
921 Uint32 MessageQueueService::get_next_xid(void)
922 {
923 _xid++;
924 return _xid.value();
925 }
926
927 PEGASUS_NAMESPACE_END
|