<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html>
<head>
<title>Pegasus Meta Dispatcher</title>
</head>

<body>
<h1>Pegasus Meta Dispatcher</h1>
<p>
The Pegasus Meta Dispatcher is a set of classes that extend the
existing MessageQueue messaging system to be dynamic,
asynchronous, and multithreaded. The primary classes are the
following:
</p>
<table border="1" bgcolor="#cdcdc1" cellpadding="3" width="80%">
<tr align="left"><th>Class</th><th>Derived from</th><th>Source file</th></tr>
<tr><td>cimom</td><td>MessageQueue</td><td>Pegasus/Common/Cimom.h</td></tr>
<tr><td>MessageQueueService</td><td>MessageQueue</td><td>Pegasus/Common/MessageQueueService.h</td></tr>
<tr><td>CimomMessage</td><td>Message</td><td>Pegasus/Common/CimomMessage.h</td></tr>
<tr><td>AsyncOpNode</td><td>n/a</td><td>Pegasus/Common/AsyncOpNode.h</td></tr>
<tr><td>AsyncDQueue</td><td>unlocked_dq</td><td>Pegasus/Common/DQueue.h</td></tr>
<tr><td>IPC classes</td><td>n/a</td><td>Pegasus/Common/IPC.h</td></tr>
<tr><td>Threading classes</td><td>n/a</td><td>Pegasus/Common/Thread.h</td></tr>
</table>
<br>
<br>

<h2>Purposes of Meta Dispatcher</h2>
<p>
The Meta Dispatcher has three primary goals:
</p>
<ol>
<li>Provide for orderly asynchronous message-based communication
among a dynamic set of Pegasus Services.</li>
<li>Preserve the existing message-passing architecture of
Pegasus.</li>
<li>Allow pluggable Services such as repositories, provider
managers, and others.</li>
</ol>
<p>
Most of these goals revolve around maintaining the
integrity of data and control flow in an asynchronous
multithreaded environment.
</p>

<h2>Terms</h2>
<dl>
<dt><b>Meta Dispatcher</b></dt><dd>The central message broker, or
router, that provides the asynchronous communications within
Pegasus. Derived from the <b>MessageQueue</b> class.
</dd>
<dt><b>Service</b></dt><dd>A Pegasus module that sends messages to
and receives messages from other modules through the Meta
Dispatcher; a module that has enhanced privileges within
Pegasus and provides one or more functions necessary to
the operation of Pegasus. Derived from the
<b>MessageQueue</b> class.</dd>
<dt><b>Asynchronous Message</b></dt><dd>A <b>pair</b> of
messages, consisting of a <b>request</b> and a <b>response</b>,
that are treated as a single operation by Services. An
asynchronous message may be fronted by a synchronous
programming interface. Derived from the <b>Message</b> class.</dd>
<dt><b>AsyncOpNode</b></dt><dd>A control object that manages the
lifetime of an <b>Asynchronous Message</b>. The AsyncOpNode uses
many of the IPC object classes. A Service manages the lifetime
of an AsyncOpNode during the processing of the
message. However, it necessarily cedes control of the
AsyncOpNode to the <b>Meta Dispatcher</b> while the
Asynchronous Message is being processed.</dd>
</dl>

<h2>Meta Dispatcher Design</h2>
<p>
Three things are necessary to avoid deadlocks and to
provide pluggable services in Pegasus. The first is
<b>independent execution paths</b> of service modules, i.e.,
each service must have its own thread(s), which must not intersect
with the thread(s) of other services. Intersection of execution
paths can occur indirectly through IPC objects such as mutexes,
conditions, and semaphores.
</p>
<p>
The second is <b>interface
abstraction</b>, which the Meta Dispatcher provides through
C++ polymorphism. This allows pluggable services, i.e., one
service can replace another and the system will continue to
function (hopefully in an improved manner).
</p>
<p>
The third is a <b>central
message broker</b> that isolates services from each other,
thereby preventing deadlocks. The central message broker also
provides message responses for services that are paused,
stopped, or not present (not plugged in).
</p>
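<p>
The second and third points can be illustrated with a minimal sketch.
It is not Pegasus code: the names <code>Service</code>,
<code>EchoService</code>, <code>Broker</code>, and the
<code>Result</code> codes are hypothetical stand-ins chosen for this
example. It only shows the idea that callers talk to a broker, the
broker talks to an abstract interface, and the broker itself answers
when the target is paused or not plugged in.
</p>

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical result codes for this sketch (the real codes are in async_results).
enum class Result { OK, NAK, PAUSED };

// Interface abstraction: the broker only knows this base class.
class Service {
public:
    virtual ~Service() = default;
    virtual Result handle(const std::string& request) = 0;
    bool paused = false;
};

// One pluggable implementation; any other subclass could replace it.
class EchoService : public Service {
public:
    Result handle(const std::string&) override { return Result::OK; }
};

// Central broker: services never call each other directly.
class Broker {
public:
    void plug(std::uint32_t qid, Service* s) { services_[qid] = s; }
    void unplug(std::uint32_t qid) { services_.erase(qid); }

    Result route(std::uint32_t qid, const std::string& request) {
        auto it = services_.find(qid);
        if (it == services_.end()) return Result::NAK;    // not plugged in
        if (it->second->paused) return Result::PAUSED;    // broker answers
        return it->second->handle(request);               // virtual dispatch
    }

private:
    std::map<std::uint32_t, Service*> services_;
};
```

<p>
Because the caller only ever sees a <code>Result</code> from the
broker, replacing or removing a service does not require any change
on the calling side.
</p>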


<h3>Central Hub</h3>
<p>
The Meta Dispatcher therefore acts as a central message
hub. Services communicate with each other <i>via</i> the Meta
Dispatcher.
</p>

<pre>
Service A --Message----1----> (block on semaphore)
                   |
                   |
            Meta Dispatcher
                   |
                   +--Message----2----> Service B
                                            |
                                            |
(Signal Semaphore) &lt;---Response---3---------+
        |
Service A &lt;--Response--4---+

</pre>

<p>
The numbered steps above are as follows:
</p>
<ol>
<li><b>Service A</b> creates a new <code>AsyncRequest</code> and
<code>AsyncOpNode</code> and sends that message to <b>Service
B</b> by calling
<code>MessageQueueService::SendWait</code>. The calling thread
blocks on the <b>client semaphore</b> until the response is ready.</li>

<li>The Meta Dispatcher's routing thread picks up the message
and inserts it into <b>Service B's</b> incoming message
queue. The routing thread returns to the Meta Dispatcher to
route the next message in the system.</li>

<li><b>Service B's</b> incoming thread picks up the message and
calls its message handler. Message handlers are virtual,
so a class derived from <b>MessageQueueService</b> can define
its own message handlers to override the default
handlers. When the message handler has constructed an
<b>AsyncReply</b>, that reply gets linked to the
<b>AsyncOpNode</b>. The MessageQueueService then signals the
<b>client semaphore</b> within the op node.</li>

<li><b>Service A</b> awakens when the <b>client semaphore</b> is
signalled. It pulls the <b>AsyncReply</b> message from the
<b>AsyncOpNode</b> and processes the result. Service A is
responsible for discarding the request, response, and
AsyncOpNode objects. The existing classes have mechanisms for
caching these objects to avoid constructing and destroying them
too frequently.</li>
</ol>
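<p>
The ordering of the four steps can be traced with a small
single-threaded simulation. This is only a sketch under stated
assumptions: <code>OpNode</code>, <code>Hub</code>,
<code>ServiceB</code>, and <code>send_and_collect</code> are
hypothetical names, a plain <code>bool</code> stands in for the
client semaphore, and the routing and service threads are replaced
by direct calls so that the sequence is deterministic. The real
classes run steps 2 and 3 on separate threads.
</p>

```cpp
#include <cassert>
#include <deque>
#include <string>

// Hypothetical stand-in for AsyncOpNode: carries the request, the reply,
// and a flag that plays the role of the client semaphore.
struct OpNode {
    std::string request;
    std::string reply;
    bool signalled = false;
};

struct ServiceB {
    std::deque<OpNode*> incoming;

    // Step 3: handle one queued operation, link the reply, signal the client.
    void run_once() {
        if (incoming.empty()) return;
        OpNode* op = incoming.front();
        incoming.pop_front();
        op->reply = "reply to: " + op->request;
        op->signalled = true;
    }
};

struct Hub {
    std::deque<OpNode*> routed;
    ServiceB* target = nullptr;

    // Step 2: move one operation from the hub to the target's incoming queue.
    void route_once() {
        if (routed.empty() || target == nullptr) return;
        target->incoming.push_back(routed.front());
        routed.pop_front();
    }
};

// Steps 1 and 4, seen from Service A.
std::string send_and_collect(Hub& hub, ServiceB& b, const std::string& text) {
    OpNode op;
    op.request = text;
    hub.routed.push_back(&op);   // step 1: hand the operation to the hub
    hub.route_once();            // step 2: would run on the routing thread
    b.run_once();                // step 3: would run on Service B's thread
    return op.signalled ? op.reply : std::string();   // step 4: collect reply
}
```

<p>
Note that Service A never touches Service B's queue directly; the
only shared object is the operation node, which is what makes it a
natural place to attach the reply.
</p>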

<h2>Test Program</h2>
<p>
The concepts explained below are all contained in the test
program for the Meta Dispatcher, which is located in
<code>$PEGASUS_HOME/src/Pegasus/Common/tests/MessageQueueService/</code>.
</p>

<h2>Service Registration and Deregistration</h2>
<p>
Services (classes derived from
<code>MessageQueueService</code>) must register their presence
with the Meta Dispatcher. This is done as follows (taken from
the test program):
</p>

<ol>
<li><b>Define the Service Class</b>
<pre>
// Define our service class

class MessageQueueClient : public MessageQueueService
{

public:
    typedef MessageQueueService Base;

    MessageQueueClient(char *name)
        : Base(name, MessageQueue::getNextQueueId(), 0,
               message_mask::type_cimom |
               message_mask::type_service |
               message_mask::ha_request |
               message_mask::ha_reply |
               message_mask::ha_async ),
          client_xid(1)
    {
        _client_capabilities = Base::_capabilities;
        _client_mask = Base::_mask;
    }

    virtual ~MessageQueueClient(void)
    {
    }

    // method to indicate acceptance of message to
    // Meta Dispatcher
    virtual Boolean messageOK(const Message *msg);

    // function to send a request to another service
    void send_test_request(char *greeting, Uint32 qid);
    Uint32 get_qid(void);

    Uint32 _client_capabilities;
    Uint32 _client_mask;

    // method to receive messages from the Meta Dispatcher,
    // MUST be defined
    virtual void _handle_async_request(AsyncRequest *req);

    AtomicInt client_xid;
};

</pre>
</li>
<li><b>Construct the Service</b>
<pre>
// Create our Service
MessageQueueClient *q_client =
    new MessageQueueClient("test client");

</pre>
</li>
<li><b>Register the Service</b>
<pre>
// Register our service with the Meta Dispatcher
q_client->register_service("test client",
                           q_client->_client_capabilities,
                           q_client->_client_mask);
cout &lt;&lt; " client registered " &lt;&lt; endl;
</pre>
</li>
</ol>

<p>
The example above hides many details that are handled by
the MessageQueueService constructor, such as creating the
background thread, finding the Meta Dispatcher, and constructing
the queues. As the example shows, a derived class does not
need to worry about those details. The
<code>MessageQueueService</code> class also declares a
<code>deregister_service</code> method for removing a service
from the Meta Dispatcher.
</p>
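<p>
As a rough illustration of the bookkeeping that registration
implies, the following sketch keeps one entry per queue ID. It is
not the Meta Dispatcher's implementation: <code>Registry</code> and
<code>ServiceEntry</code> are hypothetical names, and rejecting a
second registration for the same queue ID is an assumption made for
this example.
</p>

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <utility>

// Hypothetical record of what a service announces when it registers.
struct ServiceEntry {
    std::string name;
    std::uint32_t capabilities;
    std::uint32_t mask;
};

class Registry {
public:
    // Returns false when the queue ID is already registered (assumed policy).
    bool register_service(std::uint32_t qid, const ServiceEntry& entry) {
        return entries_.insert(std::make_pair(qid, entry)).second;
    }

    // Returns true only if an entry was actually removed.
    bool deregister_service(std::uint32_t qid) {
        return entries_.erase(qid) == 1;
    }

    std::size_t count() const { return entries_.size(); }

private:
    std::map<std::uint32_t, ServiceEntry> entries_;
};
```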

<h2>Finding Other Services</h2>
<p>
The MessageQueueService class has an API for finding other
services. This API is built using messages that are defined in
<code>CimomMessage.h</code>. Here is an example from the test
program:
</p>

<pre>

|

</pre>

<p>
The code sample above shows how to find services by their
name. The API also allows finding services by their capabilities
or the messages they support. Note that the return value is an array
of Queue IDs, so a single query can find multiple
services.
</p>
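<p>
The idea of a lookup that returns every matching Queue ID can be
sketched as follows. This is a stand-alone illustration, not the
test program's code: <code>Registration</code> and the free function
<code>find_services</code> are hypothetical, and the matching rules
(an empty name matches any name, and a zero capabilities or mask
value means "don't care") are assumptions made for the example.
</p>

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical registry record for this sketch.
struct Registration {
    std::string name;
    std::uint32_t capabilities;
    std::uint32_t mask;
    std::uint32_t queue_id;
};

// Returns the queue ID of every registration that matches all three criteria.
// Assumed rules: an empty name matches any name; every bit requested in
// capabilities/mask must be set in the registration (zero requests nothing).
std::vector<std::uint32_t> find_services(const std::vector<Registration>& registry,
                                         const std::string& name,
                                         std::uint32_t capabilities,
                                         std::uint32_t mask)
{
    std::vector<std::uint32_t> results;
    for (const Registration& r : registry) {
        bool name_ok = name.empty() || r.name == name;
        bool caps_ok = (r.capabilities & capabilities) == capabilities;
        bool mask_ok = (r.mask & mask) == mask;
        if (name_ok && caps_ok && mask_ok)
            results.push_back(r.queue_id);
    }
    return results;
}
```

<p>
A caller would typically take the first element of the result as the
destination Queue ID, after checking that the result is not empty.
</p>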

<h2>Sending an Asynchronous Message to Another Service</h2>
<p>
The "handle" for a service is its Queue ID. Once you have the
Queue ID you can send a message to that service. The example
above shows one way to get a service's Queue ID. Here is an
example that shows how to send that service a message.
</p>

<ol>
<li><b>Define the Request and Response Message Pair by Inheriting from AsyncRequest and AsyncReply.</b>
<pre>

class test_request : public AsyncRequest
{

public:
    typedef AsyncRequest Base;

    test_request(Uint32 routing,
                 AsyncOpNode *op,
                 Uint32 destination,
                 Uint32 response,
                 char *message)
        : Base(0x04100000,
               Message::getNextKey(),
               routing,
               0,
               op,
               destination,
               response,
               true),
          greeting(message)
    {
    }

    virtual ~test_request(void)
    {
    }

    String greeting;
};


class test_response : public AsyncReply
{
public:
    typedef AsyncReply Base;

    test_response(Uint32 key,
                  Uint32 routing,
                  AsyncOpNode *op,
                  Uint32 result,
                  Uint32 destination,
                  char *message)
        : Base(0x04200000,
               key,
               routing,
               0,
               op,
               result,
               destination,
               true),
          greeting(message)
    {
    }

    virtual ~test_response(void)
    {
    }

    String greeting;
};

</pre>

<p>
The function <code>send_test_request</code> shows everything
that is necessary to send a message to another service and
process the reply.
</p>

<pre>

void MessageQueueClient::send_test_request(char *greeting, Uint32 qid)
{

</pre>
</li>
<li><b>Construct the Request</b>

<pre>
    test_request *req =
        new test_request(Base::get_next_xid(),
                         0,
                         qid,        // destination queue ID
                         _queueId,   // my own queue ID
                         greeting);  // message parameter

</pre>
</li>

<li><b>Send the message using <code>MessageQueueService::SendWait</code></b>

<pre>
    AsyncMessage *response = SendWait(req);

</pre>
</li>

<li><b>Process the Response.</b>
<pre>
    if( response != 0 )
    {
        msg_count++;
        delete response;
        cout &lt;&lt; " test message " &lt;&lt; msg_count.value() &lt;&lt; endl;

    }
    delete req;
}

</pre>
</li>

<li><b>Delete the Request and the Response. The
<code>SendWait</code> interface creates and disposes of
everything else.</b></li>

</ol>


<h2>Handling an Incoming Message</h2>

<p>
To handle messages the service needs to implement the
following methods.
</p>

<ol>

<li><b><code>virtual Boolean messageOK(const Message
*)</code></b>

<p>
This method allows the Service to accept or reject the
message. The Meta Dispatcher will always call this method
before inserting the request on the Service's queue.
</p>

<pre>

Boolean MessageQueueServer::messageOK(const Message *msg)
{
    if(msg->getMask() &amp; message_mask::ha_async)
    {
        if( msg->getType() == 0x04100000 ||
            msg->getType() == async_messages::CIMSERVICE_STOP ||
            msg->getType() == async_messages::CIMSERVICE_PAUSE ||
            msg->getType() == async_messages::CIMSERVICE_RESUME )
            return true;
    }
    return false;
}

</pre>
</li>

<li><b><code>virtual Boolean accept_async(AsyncOpNode
*operation)</code> (optional) </b>

<p>
This method executes on the Meta Dispatcher's thread and links
the incoming message to the Service's queue.
</p>
</li>

<li><b><code>virtual void _handle_incoming_operation(AsyncOpNode
*)</code></b>

<p>
This method is called by the Service's background thread. Here is an
example implementation that just does some sanity checking on
the message.
</p>

<pre>

void MessageQueueServer::_handle_incoming_operation(AsyncOpNode *operation)
{
    if ( operation != 0 )
    {
        Message *rq = operation->get_request();
        PEGASUS_ASSERT(rq != 0 );
        PEGASUS_ASSERT(rq->getMask() &amp; message_mask::ha_async );
        PEGASUS_ASSERT(rq->getMask() &amp; message_mask::ha_request);
        _handle_async_request(static_cast&lt;AsyncRequest *&gt;(rq));
    }

    return;

}

</pre>
</li>

<li><b><code>virtual void _handle_async_request(AsyncRequest *)</code></b>

<p>
This method handles the request. The Service must implement
this method. <b>If the Service does not handle the Request it
must pass the Request to the Base class by calling <code>Base::_handle_async_request(req)</code>.</b>
</p>

<pre>
void MessageQueueServer::_handle_async_request(AsyncRequest *req)
{
    if (req->getType() == 0x04100000 )
    {
        req->op->processing();
        handle_test_request(req);       // Message Handler
    }
    else if ( req->getType() == async_messages::CIMSERVICE_STOP )
    {
        req->op->processing();
        handle_CimServiceStop(static_cast&lt;CimServiceStop *&gt;(req));
    }

    else
        Base::_handle_async_request(req);   // Give it to the Base !!
}

</pre>
</li>

<li><b>Specific Message Handlers</b>

<p>
Each message handler is defined by the format of its
Request/Response pair. Here is an example from the test
program.
</p>

<pre>

if( msg->getType() == 0x04100000 )
{

</pre>
<ol>
<li><b>Construct the Reply</b>
<pre>

    test_response *resp =
        new test_response(msg->getKey(),
                          msg->getRouting(),
                          msg->op,
                          async_results::OK,
                          msg->dest,
                          "i am a test response");

</pre>
</li>
<li><b>Complete the Reply</b> by calling the following
helper routine in the Base class

<pre>
    _completeAsyncResponse(msg, resp, ASYNC_OPSTATE_COMPLETE, 0);

}
</pre>
</li>

</ol>
</li>
</ol>
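<p>
The rule that unhandled requests must be passed to the Base class
can be shown in isolation. The sketch below does not use the Pegasus
classes: <code>BaseService</code>, <code>TestServer</code>,
<code>Request</code>, and the value chosen for
<code>SERVICE_STOP</code> are hypothetical, and the handlers simply
record which code path ran instead of building a reply.
</p>

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical message type codes for this sketch only.
const std::uint32_t TEST_REQUEST = 0x04100000;
const std::uint32_t SERVICE_STOP = 0x00000002;

struct Request { std::uint32_t type; };

class BaseService {
public:
    virtual ~BaseService() = default;

    // Default handling for any request a derived class does not recognise.
    virtual void handle_async_request(const Request& req) {
        last_handler = (req.type == SERVICE_STOP) ? "base:stop" : "base:nak";
    }

    std::string last_handler;   // records which handler ran
};

class TestServer : public BaseService {
public:
    typedef BaseService Base;

    // Accept only the message types this service understands.
    bool message_ok(const Request& req) const {
        return req.type == TEST_REQUEST || req.type == SERVICE_STOP;
    }

    void handle_async_request(const Request& req) override {
        if (req.type == TEST_REQUEST)
            last_handler = "test";               // service-specific handler
        else
            Base::handle_async_request(req);     // give it to the base class
    }
};
```

<p>
The fallback call in the final <code>else</code> branch is what
keeps the default behaviour available for every message type the
derived class does not handle itself.
</p>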
<h2>Handling CIMMessage and Other Pre-existing Message Classes</h2>
<p>
Existing Messages, including all of the <code>CIMMessage</code>
derivatives, are not configured to be asynchronous
request/reply pairs. They are designed to travel through
Pegasus as events that trigger other processing events,
which is the end of their lifetime. This is not an optimal
use model for asynchronous operation because the
originator of the event neither requires nor receives any
completion notification. Further, there is not a
one-to-one correspondence of "event messages" to replies.
</p>

<h3>AsyncLegacyOperationStart Message</h3>
<p>
The <code>AsyncLegacyOperationStart</code> message is an envelope that
allows a <code>MessageQueueService</code>-based service to
send, receive, and process pre-existing "legacy"
messages.
</p>
<p>
The <code>AsyncLegacyOperationStart</code> message allows
an asynchronous service to create, package, and send a
"legacy" message to another service or, indirectly,
enqueue it to a non-asynchronous message queue. The code
example below shows how this works:
</p>

<pre>

cout &lt;&lt; " sending LEGACY to test server" &lt;&lt; endl;

Message *legacy = new Message(0x11100011,
                              Message::getNextKey());

AsyncLegacyOperationStart *req =
    new AsyncLegacyOperationStart(q_client->get_next_xid(),
                                  0,
                                  services[0],
                                  legacy,
                                  q_client->getQueueId());
reply = q_client->SendWait(req);
delete req;
delete reply;

</pre>
<p>
The code sample above shows a <code>Message</code> object
being embedded inside an
<code>AsyncLegacyOperationStart</code> message and sent
using the <code>SendWait</code> API.
</p>

<h3>Default Handler for Legacy Messages</h3>
<p>
The <code>MessageQueueService</code> class has a default
handler for legacy messages that extracts the
<code>Message</code> out of its asynchronous "envelope"
and dispatches it using the pre-existing synchronous
interface, as shown below.
</p>

<pre>

void MessageQueueService::handle_AsyncLegacyOperationStart(
    AsyncLegacyOperationStart *req)
{
    // remove the legacy message from the request and enqueue it to its destination
    Uint32 result = async_results::CIM_NAK;

    Message *legacy = req->act;
    if ( legacy != 0 )
    {
        MessageQueue* queue = MessageQueue::lookup(req->legacy_destination);
        if( queue != 0 )
        {
            // Enqueue the legacy message:
            queue->enqueue(legacy);
            result = async_results::OK;
        }
    }
    _make_response(req, result);
}

</pre>

<p>
The default handler shown above extracts the legacy
message and attempts to <code>enqueue</code> that message
synchronously using the pre-existing interface.
</p>
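<p>
The envelope pattern used by the default handler can be reduced to a
few lines. The following is a sketch only, with hypothetical types
(<code>LegacyMessage</code>, <code>LegacyQueue</code>,
<code>LegacyEnvelope</code>, <code>LegacyResult</code>) and a
<code>std::map</code> standing in for the queue lookup; it mirrors
the control flow above, where the result stays NAK unless both the
embedded message and the destination queue exist.
</p>

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <map>

// Hypothetical legacy message and its synchronous destination queue.
struct LegacyMessage { std::uint32_t type; };
struct LegacyQueue { std::deque<LegacyMessage*> messages; };

// Envelope: an asynchronous request that carries a legacy message.
struct LegacyEnvelope {
    LegacyMessage* act;                 // embedded legacy message, may be null
    std::uint32_t legacy_destination;   // queue ID the message is meant for
};

enum class LegacyResult { OK, NAK };

// Unwrap the envelope and enqueue the legacy message on its destination.
LegacyResult unwrap_and_enqueue(std::map<std::uint32_t, LegacyQueue>& queues,
                                const LegacyEnvelope& env)
{
    if (env.act == nullptr)
        return LegacyResult::NAK;            // nothing to deliver
    auto it = queues.find(env.legacy_destination);
    if (it == queues.end())
        return LegacyResult::NAK;            // destination queue not found
    it->second.messages.push_back(env.act);  // synchronous enqueue
    return LegacyResult::OK;
}
```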

<h3>Example of Custom Handler for Legacy Messages</h3>
<p>
By implementing the virtual
<code>_handle_async_request</code> method,
a service can choose to implement its own handler for
legacy messages, as the code below shows:
</p>

<ol>

<li><b>Implement the virtual <code>_handle_async_request</code> method.</b>
<pre>

void MessageQueueServer::_handle_async_request(AsyncRequest *req)
{
    if (req->getType() == 0x04100000 )
    {
        req->op->processing();
        handle_test_request(req);
    }
    else if ( req->getType() == async_messages::CIMSERVICE_STOP )
    {
        req->op->processing();
        handle_CimServiceStop(static_cast&lt;CimServiceStop *&gt;(req));
    }
</pre>
</li>
<li><b>Implement a dispatcher for <code>ASYNC_LEGACY_OP_START</code></b>
<pre>
    else if ( req->getType() == async_messages::ASYNC_LEGACY_OP_START )
    {
        req->op->processing();
        handle_LegacyOpStart(static_cast&lt;AsyncLegacyOperationStart *&gt;(req));
    }

    else
        Base::_handle_async_request(req);
}

</pre>
</li>
<li><b>Implement the handler for <code>ASYNC_LEGACY_OP_START</code></b>
<pre>

void MessageQueueServer::handle_LegacyOpStart(AsyncLegacyOperationStart *req)
{

    Message *legacy = req->act;
    cout &lt;&lt; " ### handling legacy messages " &lt;&lt; endl;


    AsyncReply *resp =
        new AsyncReply(async_messages::REPLY,
                       req->getKey(),
                       req->getRouting(),
                       0,
                       req->op,
                       async_results::OK,
                       req->resp,
                       req->block);
    _completeAsyncResponse(req, resp, ASYNC_OPSTATE_COMPLETE, 0 );

    if (legacy != 0 )
        cout &lt;&lt; " legacy msg type: " &lt;&lt; legacy->getType() &lt;&lt; endl;

}

</pre>
</li>

</ol>
<hr>
<h2>Class Definitions</h2>

<h3>cimom (Meta Dispatcher)</h3>
<pre>
class PEGASUS_COMMON_LINKAGE cimom : public MessageQueue
{
public:
    cimom(void);

    virtual ~cimom(void);

    Boolean moduleChange(struct timeval last);

    Uint32 getModuleCount(void);
    Uint32 getModuleIDs(Uint32 *ids, Uint32 count) throw(IPCException);

    AsyncOpNode *get_cached_op(void) throw(IPCException);
    void cache_op(AsyncOpNode *op) throw(IPCException);

    void set_default_op_timeout(const struct timeval *buffer);
    void get_default_op_timeout(struct timeval *timeout) const;

    virtual void handleEnqueue();
    void register_module(RegisterCimService *msg);
    void deregister_module(Uint32 quid);
    void update_module(UpdateCimService *msg);
    void ioctl(AsyncIoctl *msg);

    void find_service_q(FindServiceQueue *msg);
    void enumerate_service(EnumerateService *msg);
    Boolean route_async(AsyncOpNode *operation);
    void _shutdown_routed_queue(void);

protected:
    Uint32 get_module_q(const String &amp; name);
    void _make_response(AsyncRequest *req, Uint32 code);
    void _completeAsyncResponse(AsyncRequest *request,
                                AsyncReply *reply,
                                Uint32 state,
                                Uint32 flag);
private:
    struct timeval _default_op_timeout;
    struct timeval _last_module_change;
    DQueue&lt;message_module&gt; _modules;

    DQueue&lt;AsyncOpNode&gt; _recycle;

    AsyncDQueue&lt;AsyncOpNode&gt; _routed_ops;
    DQueue&lt;AsyncOpNode&gt; _internal_ops;

    static PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL _routing_proc(void *);

    Thread _routing_thread;

    static Uint32 get_xid(void);
    void _handle_cimom_op(AsyncOpNode *op, Thread *thread, MessageQueue *queue);
    Uint32 _ioctl(Uint32, Uint32, void *);

    AtomicInt _die;
    AtomicInt _routed_queue_shutdown;

    static AtomicInt _xid;

    // CIMOperationRequestDispatcher *_cim_dispatcher;
    // CIMOperationResponseEncoder *_cim_encoder;
    // CIMOperationRequestDecoder *_cim_decoder;
    // CIMRepository *_repository;

};

</pre>


<h3>MessageQueueService</h3>

<pre>

class message_module;

class PEGASUS_COMMON_LINKAGE MessageQueueService : public MessageQueue
{
public:

    typedef MessageQueue Base;

    MessageQueueService(const char *name, Uint32 queueID, Uint32 capabilities, Uint32 mask);

    virtual ~MessageQueueService(void);

    virtual void handle_heartbeat_request(AsyncRequest *req);
    virtual void handle_heartbeat_reply(AsyncReply *rep);

    virtual void handle_AsyncIoctl(AsyncIoctl *req);
    virtual void handle_CimServiceStart(CimServiceStart *req);
    virtual void handle_CimServiceStop(CimServiceStop *req);
    virtual void handle_CimServicePause(CimServicePause *req);
    virtual void handle_CimServiceResume(CimServiceResume *req);

    virtual void handle_AsyncOperationStart(AsyncOperationStart *req);
    virtual void handle_AsyncOperationResult(AsyncOperationResult *req);
    virtual Boolean accept_async(AsyncOpNode *op);
    virtual Boolean messageOK(const Message *msg);

    AsyncReply *SendWait(AsyncRequest *request);

    void _completeAsyncResponse(AsyncRequest *request,
                                AsyncReply *reply,
                                Uint32 state,
                                Uint32 flag);
    Boolean register_service(String name, Uint32 capabilities, Uint32 mask);
    Boolean update_service(Uint32 capabilities, Uint32 mask);
    Boolean deregister_service(void);
    virtual void _shutdown_incoming_queue(void);
    void find_services(String name,
                       Uint32 capabilities,
                       Uint32 mask,
                       Array&lt;Uint32&gt; *results);
    void enumerate_service(Uint32 queue, message_module *result);
    Uint32 get_next_xid(void);
    AsyncOpNode *get_op(void);
    void return_op(AsyncOpNode *op);
    Uint32 _capabilities;
    Uint32 _mask;
    AtomicInt _die;
protected:

    virtual void _handle_incoming_operation(AsyncOpNode *operation, Thread *thread, MessageQueue *queue);
    virtual void _handle_async_request(AsyncRequest *req);
    virtual void _make_response(AsyncRequest *req, Uint32 code);
    cimom *_meta_dispatcher;

private:
    void handleEnqueue();
    DQueue&lt;AsyncOpNode&gt; _pending;
    AsyncDQueue&lt;AsyncOpNode&gt; _incoming;

    static PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL _req_proc(void *);
    AtomicInt _incoming_queue_shutdown;

    Thread _req_thread;

    struct timeval _default_op_timeout;

    static AtomicInt _xid;

};


</pre>

<h3>Asynchronous Messages</h3>

<pre>

extern const Uint32 CIMOM_Q_ID;

class AsyncOpNode;

class PEGASUS_COMMON_LINKAGE async_results
{
public:
    static const Uint32 OK;
    static const Uint32 PARAMETER_ERROR;
    static const Uint32 MODULE_ALREADY_REGISTERED;
    static const Uint32 MODULE_NOT_FOUND;
    static const Uint32 INTERNAL_ERROR;

    static const Uint32 ASYNC_STARTED;
    static const Uint32 ASYNC_PROCESSING;
    static const Uint32 ASYNC_COMPLETE;
    static const Uint32 ASYNC_CANCELLED;
    static const Uint32 ASYNC_PAUSED;
    static const Uint32 ASYNC_RESUMED;

    static const Uint32 CIM_SERVICE_STARTED;
    static const Uint32 CIM_SERVICE_STOPPED;
    static const Uint32 CIM_SERVICE_PAUSED;

    static const Uint32 CIM_SERVICE_RESUMED;
    static const Uint32 CIM_NAK;

    static const Uint32 ASYNC_PHASE_COMPLETE;
    static const Uint32 ASYNC_CHILD_COMPLETE;
    static const Uint32 ASYNC_PHASE_STARTED;
    static const Uint32 ASYNC_CHILD_STARTED;
    static const Uint32 CIM_PAUSED;
    static const Uint32 CIM_STOPPED;

};


class PEGASUS_COMMON_LINKAGE async_messages
{
public:
    static const Uint32 HEARTBEAT;
    static const Uint32 REPLY;
    static const Uint32 REGISTER_CIM_SERVICE;
    static const Uint32 DEREGISTER_CIM_SERVICE;
    static const Uint32 UPDATE_CIM_SERVICE;
    static const Uint32 IOCTL;
    static const Uint32 CIMSERVICE_START;
    static const Uint32 CIMSERVICE_STOP;
    static const Uint32 CIMSERVICE_PAUSE;
    static const Uint32 CIMSERVICE_RESUME;

    static const Uint32 ASYNC_OP_START;
    static const Uint32 ASYNC_OP_RESULT;
    static const Uint32 ASYNC_LEGACY_OP_START;
    static const Uint32 ASYNC_LEGACY_OP_RESULT;

    static const Uint32 FIND_SERVICE_Q;
    static const Uint32 FIND_SERVICE_Q_RESULT;
    static const Uint32 ENUMERATE_SERVICE;
    static const Uint32 ENUMERATE_SERVICE_RESULT;
};


class PEGASUS_COMMON_LINKAGE AsyncMessage : public Message
{
public:
    AsyncMessage(Uint32 type,
                 Uint32 key,
                 Uint32 routing,
                 Uint32 mask,
                 AsyncOpNode *operation);

    virtual ~AsyncMessage(void)
    {
    }

    Boolean operator ==(void *key);
    Boolean operator ==(const AsyncMessage&amp; msg);

    AsyncOpNode *op;
    Thread *_myself;
    MessageQueue *_service;
};


inline Boolean AsyncMessage::operator ==(void *key)
{
    if( key == reinterpret_cast&lt;void *&gt;(this))
        return true;
    return false;
}

inline Boolean AsyncMessage::operator ==(const AsyncMessage&amp; msg)
{
    return this->operator==(reinterpret_cast&lt;void *&gt;(const_cast&lt;AsyncMessage *&gt;(&amp;msg)));
}


class PEGASUS_COMMON_LINKAGE AsyncRequest : public AsyncMessage
{
public:
    AsyncRequest(Uint32 type,
                 Uint32 key,
                 Uint32 routing,
                 Uint32 mask,
                 AsyncOpNode *operation,
                 Uint32 destination,
                 Uint32 response,
                 Boolean blocking);

    virtual ~AsyncRequest(void)
    {
    }

    Uint32 dest;
    Uint32 resp;
    Boolean block;
};

class PEGASUS_COMMON_LINKAGE AsyncReply : public AsyncMessage
{
public:
    AsyncReply(Uint32 type,
               Uint32 key,
               Uint32 routing,
               Uint32 mask,
               AsyncOpNode *operation,
               Uint32 result_code,
               Uint32 destination,
               Boolean blocking);

    virtual ~AsyncReply(void)
    {
        if(op != 0 )
            delete op;
    }

    Uint32 result;
    Uint32 dest;
    Boolean block;
};


class PEGASUS_COMMON_LINKAGE RegisterCimService : public AsyncRequest
{
public:
    RegisterCimService(Uint32 routing,
                       AsyncOpNode *operation,
                       Boolean blocking,
                       String service_name,
                       Uint32 service_capabilities,
                       Uint32 service_mask,
                       Uint32 service_queue);

    virtual ~RegisterCimService(void)
    {
    }

    String name;
    Uint32 capabilities;
    Uint32 mask;
    Uint32 queue;
};

class PEGASUS_COMMON_LINKAGE DeRegisterCimService : public AsyncRequest
{
public:
    DeRegisterCimService(Uint32 routing,
                         AsyncOpNode *operation,
                         Boolean blocking,
                         Uint32 service_queue);

    virtual ~DeRegisterCimService(void)
    {
    }

    Uint32 queue;
};

class PEGASUS_COMMON_LINKAGE UpdateCimService : public AsyncRequest
{
public:
    UpdateCimService(Uint32 routing,
                     AsyncOpNode *operation,
                     Boolean blocking,
                     Uint32 service_queue,
                     Uint32 service_capabilities,
                     Uint32 service_mask);

    virtual ~UpdateCimService(void)
    {
    }

    Uint32 queue;
    Uint32 capabilities;
    Uint32 mask;
};


class PEGASUS_COMMON_LINKAGE AsyncIoctl : public AsyncRequest
{
public:
    AsyncIoctl(Uint32 routing,
               AsyncOpNode *operation,
               Uint32 destination,
               Uint32 response,
               Boolean blocking,
               Uint32 code,
               Uint32 int_param,
               void *p_param);

    virtual ~AsyncIoctl(void)
    {
    }

    enum
    {
        IO_CLOSE,
        IO_OPEN,
        IO_SOURCE_QUENCH,
        IO_SERVICE_DEFINED
    };

    Uint32 ctl;
    Uint32 intp;
    void *voidp;

};

class PEGASUS_COMMON_LINKAGE CimServiceStart : public AsyncRequest
{
public:
    CimServiceStart(Uint32 routing,
                    AsyncOpNode *operation,
                    Uint32 destination,
                    Uint32 response,
                    Boolean blocking);

    virtual ~CimServiceStart(void)
    {
    }
};


class PEGASUS_COMMON_LINKAGE CimServiceStop : public AsyncRequest
{
public:
    CimServiceStop(Uint32 routing,
                   AsyncOpNode *operation,
                   Uint32 destination,
                   Uint32 response,
                   Boolean blocking);

    virtual ~CimServiceStop(void)
    {
    }
};

class PEGASUS_COMMON_LINKAGE CimServicePause : public AsyncRequest
{
public:
    CimServicePause(Uint32 routing,
                    AsyncOpNode *operation,
                    Uint32 destination,
                    Uint32 response,
                    Boolean blocking);

    virtual ~CimServicePause(void)
    {
    }
};

class PEGASUS_COMMON_LINKAGE CimServiceResume : public AsyncRequest
{
public:
    CimServiceResume(Uint32 routing,
                     AsyncOpNode *operation,
                     Uint32 destination,
                     Uint32 response,
                     Boolean blocking);

    virtual ~CimServiceResume(void)
    {
    }
};

class PEGASUS_COMMON_LINKAGE AsyncOperationStart : public AsyncRequest
{
public:
    AsyncOperationStart(Uint32 routing,
                        AsyncOpNode *operation,
                        Uint32 destination,
                        Uint32 response,
                        Boolean blocking,
                        Message *action);

    virtual ~AsyncOperationStart(void)
    {
    }

    Message *act;
};

class PEGASUS_COMMON_LINKAGE AsyncOperationResult : public AsyncReply
{
public:
    AsyncOperationResult(Uint32 key,
                         Uint32 routing,
                         AsyncOpNode *operation,
                         Uint32 result_code,
                         Uint32 destination,
                         Uint32 blocking);

    virtual ~AsyncOperationResult(void)
    {
    }
};


class PEGASUS_COMMON_LINKAGE AsyncLegacyOperationStart : public AsyncRequest
{
public:
    AsyncLegacyOperationStart(Uint32 routing,
                              AsyncOpNode *operation,
                              Uint32 destination,
                              Message *action);

    virtual ~AsyncLegacyOperationStart(void)
    {
    }

    Message *act;
};

class PEGASUS_COMMON_LINKAGE AsyncLegacyOperationResult : public AsyncReply
{
public:
    AsyncLegacyOperationResult(Uint32 key,
                               Uint32 routing,
                               AsyncOpNode *operation,
                               Message *result);

    virtual ~AsyncLegacyOperationResult(void)
    {
    }

    Message *res;
};


class PEGASUS_COMMON_LINKAGE FindServiceQueue : public AsyncRequest
{
public:
    FindServiceQueue(Uint32 routing,
                     AsyncOpNode *operation,
                     Uint32 response,
                     Boolean blocking,
                     String service_name,
                     Uint32 service_capabilities,
                     Uint32 service_mask);

    virtual ~FindServiceQueue(void)
    {
    }

    String name;
    Uint32 capabilities;
    Uint32 mask;
};

class PEGASUS_COMMON_LINKAGE FindServiceQueueResult : public AsyncReply
{
public:
    FindServiceQueueResult(Uint32 key,
                           Uint32 routing,
                           AsyncOpNode *operation,
                           Uint32 result_code,
                           Uint32 destination,
                           Boolean blocking,
                           Array&lt;Uint32&gt; queue_ids);

    virtual ~FindServiceQueueResult(void)
    {
    }

    Array&lt;Uint32&gt; qids;
};

class PEGASUS_COMMON_LINKAGE EnumerateService : public AsyncRequest
{
public:
    EnumerateService(Uint32 routing,
                     AsyncOpNode *operation,
                     Uint32 response,
                     Boolean blocking,
                     Uint32 queue_id);

    virtual ~EnumerateService(void)
    {
    }

    Uint32 qid;
};

class PEGASUS_COMMON_LINKAGE EnumerateServiceResponse : public AsyncReply
{
public:
    EnumerateServiceResponse(Uint32 key,
                             Uint32 routing,
                             AsyncOpNode *operation,
                             Uint32 result_code,
                             Uint32 response,
                             Boolean blocking,
                             String service_name,
                             Uint32 service_capabilities,
                             Uint32 service_mask,
                             Uint32 service_qid);

    virtual ~EnumerateServiceResponse(void)
    {
    }

    String name;
    Uint32 capabilities;
    Uint32 mask;
    Uint32 qid;
};

</pre>

<h3>AsyncOpNode</h3>

<pre>
#define ASYNC_OPFLAGS_UNKNOWN           0x00000000
#define ASYNC_OPFLAGS_INTERVAL_REPEAT   0x00000010
#define ASYNC_OPFLAGS_INDICATION        0x00000020
#define ASYNC_OPFLAGS_REMOTE            0x00000040
#define ASYNC_OPFLAGS_LOCAL_OUT_OF_PROC 0x00000080
#define ASYNC_OPFLAGS_PHASED            0x00000001
#define ASYNC_OPFLAGS_PARTIAL           0x00000002
#define ASYNC_OPFLAGS_NORMAL            0x00000000
#define ASYNC_OPFLAGS_SINGLE            0x00000008
#define ASYNC_OPFLAGS_MULTIPLE          0x00000010
#define ASYNC_OPFLAGS_TOTAL             0x00000020
#define ASYNC_OPFLAGS_META_DISPATCHER   0x00000040

#define ASYNC_OPSTATE_UNKNOWN           0x00000000
#define ASYNC_OPSTATE_OFFERED           0x00000001
#define ASYNC_OPSTATE_DECLINED          0x00000002
#define ASYNC_OPSTATE_STARTED           0x00000004
#define ASYNC_OPSTATE_PROCESSING        0x00000008
#define ASYNC_OPSTATE_DELIVER           0x00000010
#define ASYNC_OPSTATE_RESERVE           0x00000020
#define ASYNC_OPSTATE_COMPLETE          0x00000040
#define ASYNC_OPSTATE_TIMEOUT           0x00000080
#define ASYNC_OPSTATE_CANCELLED         0x00000100
#define ASYNC_OPSTATE_PAUSED            0x00000200
#define ASYNC_OPSTATE_SUSPENDED         0x00000400
#define ASYNC_OPSTATE_RESUMED           0x00000800
#define ASYNC_OPSTATE_ORPHANED          0x00001000
#define ASYNC_OPSTATE_RELEASED          0x00002000

class Cimom;

class PEGASUS_COMMON_LINKAGE AsyncOpNode
{
public:

    AsyncOpNode(void);
    ~AsyncOpNode(void);

    Boolean operator == (const void *key) const;
    Boolean operator == (const AsyncOpNode &amp; node) const;

    void get_timeout_interval(struct timeval *buffer);
    void set_timeout_interval(const struct timeval *interval);

    Boolean timeout(void);

    OperationContext &amp; get_context(void);

    void put_request(const Message *request);
    Message *get_request(void);

    void put_response(const Message *response);
    Message *get_response(void);

    Uint32 read_state(void);
    void write_state(Uint32);

    Uint32 read_flags(void);
    void write_flags(Uint32);

    void lock(void) throw(IPCException);
    void unlock(void) throw(IPCException);
    void udpate(void) throw(IPCException);
    void deliver(const Uint32 count) throw(IPCException);
    void reserve(const Uint32 size) throw(IPCException);
    void processing(void) throw(IPCException);
    void processing(OperationContext *context) throw(IPCException);
    void complete(void) throw(IPCException);
    void complete(OperationContext *context) throw(IPCException);
    void release(void);
    void wait(void);

private:
    Semaphore _client_sem;
    Mutex _mut;
    unlocked_dq&lt;Message&gt; _request;
    unlocked_dq&lt;Message&gt; _response;

    OperationContext _operation_list;
    Uint32 _state;
    Uint32 _flags;
    Uint32 _offered_count;
    Uint32 _total_ops;
    Uint32 _completed_ops;
    Uint32 _user_data;

    struct timeval _start;
    struct timeval _lifetime;
    struct timeval _updated;
    struct timeval _timeout_interval;

    AsyncOpNode *_parent;
    unlocked_dq&lt;AsyncOpNode&gt; _children;

    void _reset(unlocked_dq&lt;AsyncOpNode&gt; *dst_q);

    // the lifetime member is for cache management by the cimom
    void _set_lifetime(struct timeval *lifetime);
    Boolean _check_lifetime(void);

    Boolean _is_child(void);
    Uint32 _is_parent(void);

    Boolean _is_my_child(const AsyncOpNode &amp; caller) const;
    void _make_orphan(AsyncOpNode &amp; parent);
    void _adopt_child(AsyncOpNode *child);
    void _disown_child(AsyncOpNode *child);

    friend class cimom;
    friend class MessageQueueService;
};
</pre>
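<p>
Each ASYNC_OPSTATE value in the listing above occupies a distinct bit, which
suggests the state word can hold several states at once. The following is a
minimal sketch of that reading, not the Pegasus implementation: the helpers
<b>set_bits</b>, <b>clear_bits</b>, <b>has_bits</b>, and
<b>example_lifecycle</b> are hypothetical names, and the <b>Uint32</b> typedef
is a stand-in for the Pegasus type.
</p>

```cpp
#include <cassert>
#include <cstdint>

// Stand-in for the Pegasus Uint32 typedef (assumption: 32-bit unsigned).
typedef std::uint32_t Uint32;

// Values copied from the listing above.
#define ASYNC_OPSTATE_UNKNOWN         0x00000000
#define ASYNC_OPSTATE_STARTED         0x00000004
#define ASYNC_OPSTATE_PROCESSING      0x00000008
#define ASYNC_OPSTATE_COMPLETE        0x00000040
#define ASYNC_OPSTATE_TIMEOUT         0x00000080

#define ASYNC_OPFLAGS_INTERVAL_REPEAT 0x00000010
#define ASYNC_OPFLAGS_MULTIPLE        0x00000010

// Hypothetical helpers, not part of the Pegasus API.
inline Uint32 set_bits(Uint32 word, Uint32 bits)   { return word | bits; }
inline Uint32 clear_bits(Uint32 word, Uint32 bits) { return word & ~bits; }

// True only when every bit in 'bits' is set in 'word'.
// Note: trivially true when 'bits' is zero (e.g. ASYNC_OPSTATE_UNKNOWN).
inline bool has_bits(Uint32 word, Uint32 bits)     { return (word & bits) == bits; }

// Hypothetical walk through one operation's life cycle,
// assuming states are combined with bitwise OR.
inline Uint32 example_lifecycle()
{
    Uint32 state = ASYNC_OPSTATE_UNKNOWN;
    state = set_bits(state, ASYNC_OPSTATE_STARTED);
    state = set_bits(state, ASYNC_OPSTATE_PROCESSING);
    state = clear_bits(state, ASYNC_OPSTATE_PROCESSING);
    state = set_bits(state, ASYNC_OPSTATE_COMPLETE);
    return state;  // STARTED and COMPLETE remain set
}
```

<p>
The same test does not work for every ASYNC_OPFLAGS value, because several of
them share a bit in the listing: INTERVAL_REPEAT and MULTIPLE are both
0x00000010, INDICATION and TOTAL are both 0x00000020, and REMOTE and
META_DISPATCHER are both 0x00000040. A flag word with one member of a pair set
is indistinguishable from one with the other set.
</p>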

<hr>
<address><a href="mailto:mdday@us.ibm.com">Michael Day</a></address>
<!-- Created: Tue Feb 5 13:21:55 EST 2002 -->
<!-- hhmts start -->