1 mday 1.1 <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
2 <html>
3 <head>
4 <title>Pegasus Meta Dispatcher</title>
5 </head>
6
7 <body>
8 <h1>Pegasus Meta Dispatcher</h1>
9 <p>
10 The Pegasus Meta Dispatcher is a set of classes that extend the
11 existing MessageQueue messaging system to be dynamic,
12 asynchronous, and multithreaded. The primary classes consist of the
13 folowing:
14 </p>
15 <table border="1" bgcolor="#cdcdc1" cellpadding="3" width="80%">
16 <tr align="left"><th>Class</th><th>Derived from</th><th>Source file</th></tr>
17 <tr><td>cimom</td><td>MessageQueue</td><td>Pegasus/Common/Cimom.h</td></tr>
18 <tr><td>MessageQueueService</td><td>MessageQueue</td><td>Pegasus/Common/MessageQueueServices.h</td></tr>
19 <tr><td>CimomMessage</td><td>Message</td><td>Pegasus/Common/CimomMessage.h</td></tr>
20 <tr><td>AsyncOpNode</td><td>n/a</td><td>Pegasus/Common/AsyncOpNode.h</td></tr>
21 <tr><td>AsyncDQueue</td><td>unlocked_dq</td><td>Pegasus/Common/DQueue.h</td></tr>
22 mday 1.1 <tr><td>IPC classes</td><td>n/a</td><td>Pegasus/Common/IPC.h</td></tr>
23 <tr><td>Threading classes</td><td>n/a</td><td>Pegasus/Common/Thread.h</td></tr>
24 </table>
25 <br>
26 <br>
27
28 <h2>Purposes of Meta Dispatcher</h2>
29 <p>
30 The Meta Dispatcher has two primary goals:
31 <ol>
32 <li>Provide for orderly asynchronous message-based communication
33 among a dynamic set of Pegasus Services.</li>
34 <li>Preserve the existing message-passing architecture of
35 Pegasus.</li>
36 <li>Allow Pluggable Services such as repositories, provider
37 managers, and others.</li>
38 </ol>
39 <br>
40 <br>
41 Most of the purposes listed above revolve around maintaining the
42 integrity of data and control flow in an asynchronous
43 mday 1.1 multithreaded environment.
44 </p>
45
46 <h2>Terms</h2>
47 <p>
48 <dl>
49 <dt><b>Meta Dispatcher</b></dt><dd>The central message broker, or
50 router, that provides the asynchronous communications within
51 Pegasus. Derived from the <b>MessageQueue</b> class. <br>
52 </dd>
53 <dt><b>Service</b></dt><dd>A Pegasus module that sends and
54 receives messages to other modules through the meta
55 dispatcher; A module that has enhanced privileges within
56 Pegasus and which provides one or more functions necessary to
57 the operation of Pegasus. Derived from the
58 <b>MessageQueue</b>class.<dt>
59 <dt><b>Asynchronous Message</b></dt><dd>A <b>pair</b> of
60 messages, consisting of a <b>request</b> and a <b>response</b>
61 that are treated as a single operation by Services. An
62 asynchronous message may be fronted by a synchronous
63 programming interface. Derived from the <b>Message</b> class.</dd>
64 mday 1.1 <dt><b>AsyncOpNode</b></dt><dd>A control object that manages the
65 lifetime of an <b>Asynchronous Message</b>.The AsyncOpNode uses
66 many of the IPC object classes. A Service manages the lifetime
67 of an AsyncOpNode during the processing of the
68 message. However, it necessarily cedes control of the
69 AsyncOpNode to the <b>Meta Dispatcher</b> while the
70 Asynchronous Message is being processed.</dd>
71 </dl>
72
73 </p>
74
75 <h2>Meta Dispatcher Design</h2>
76 <p>
77 Three points are necessary to avoid deadlocks and to
78 provide pluggable services in Pegaus. The first thing is
79 <b>independent execution paths</b> of service modules. i.e.,
80 each service must have its own thread(s), which must not intersect
81 with the thread(s) of other services. Intersection of execution
82 paths can occur indirectly through IPC objects such as mutexes,
83 conditions, and semaphores.
84 </p>
85 mday 1.1 <p>
86 The second point that is necessary is <b>interface
87 abstraction</b>, which the Meta Dispatcher provides through
88 C++ polymorphism. This allows pluggable services. i.e., one
89 service can replace another and the system will continue to
90 function (hopefully in an improved manner).
91 </p>
92 <p>
93 The third point that is neccesary is a <b>central
94 message broker</b> that isolates services from each other,
95 thereby preventing deadlocks. The central message broker also
96 provides message responses for services that are paused,
97 stopped, or not present (plugged in).
98 </p>
99
100
101 <h3>Central Hub</h3>
102 <p>
103 The Meta Dispatcher therefore acts as a central message
104 hub. Services communicate with each other <i>via</i> the Meta
105 Dispatcher.
106 mday 1.1
107 <pre>
108 Service A--Message----1----> (block on semaphore)
109 |
110 |
111 Meta Dispatcher|
112 |
113 Message----2->Service B
114 |
115 |
116 (Signal Semaphore) <---Response---3-- +
117 |
118 Service A <--Response--4---+
119
120 </pre>
121
122 The numbered steps above are as follows:
123 <ol>
124 <li><b>Service A</b> creates a new <code>AsyncMessage</code> and
125 <code>AsyncOpNode</code>and sends that message to <b>Service
126 B</b> by calling
127 mday 1.1 <code>MessageQueueService::SendWait</code>. The calling thread
128 blocks on the <b>client emaphore</b> until the response is ready.</li><br>
129
130 <li>The Meta Dispatcher's routing thread picks up the message
131 and inserts it into <b>Service B's</b> incoming message
132 queue. The routing thread returns to the Meta Dispatcher to
133 route the next message in the system.</li><br>
134
135 <li><b>Service B's</b>incoming thread picks up the message and
136 calls the its message handler. Message handlers are virtual,
137 so a class derived from <b>MessageQueueService</b>can define
138 its own message handlers to override the default
139 handlers. When the message handler has constructed an
140 <b>AsyncReply</b> that reply gets linked to the
141 <b>AsyncOpNode</b>. The MessageQueueService then signals the
142 <b>client semaphore</b> within the op node.</li><br>
143
144 <li><b>Service A</b> awakens when the <b>client semaphore</b> is
145 signalled. It pulls the <b>AsyncResponse</b> message from the
146 <b>AsyncOpNode</b> and processes the result. Service A is
147 responsible for discarding the request, response, and
148 mday 1.1 AsyncOpNode objects. The existing classes have mechanisms for
149 caching these objects to avoid too frequent
150 construction/destruction of them.
151 </ol>
152 </p>
153
154 <h2>Test Program</h2>
155 <p>
156 The concepts explained below are all contained in the test
157 program for the Meta Dispatcher, which is located in
158 <code>$PEGASUS_HOME/src/Pegasus/Common/tests/MessageQueueService/</code>
159
160 </p>
161
162 <h2>Service Registration and Deregistration</h2>
163 <p>
164 Services (classes derived from
165 <code>MessageQueueService</code>must register their presence
166 with the Meta Dispatcher. This is done as follows (taken from
167 the test program):
168
169 mday 1.1 <ol>
170 <li><b>Define the Service Class</b></li>
171 <pre>
172 // Define our service class
173
174 class MessageQueueClient : public MessageQueueService
175 {
176
177 public:
178 typedef MessageQueueService Base;
179
180 MessageQueueClient(char *name)
181 : Base(name, MessageQueue::getNextQueueId(), 0,
182 message_mask::type_cimom |
183 message_mask::type_service |
184 message_mask::ha_request |
185 message_mask::ha_reply |
186 message_mask::ha_async ),
187 client_xid(1)
188 {
189 _client_capabilities = Base::_capabilities;
190 mday 1.1 _client_mask = Base::_mask;
191 }
192
193 virtual ~MessageQueueClient(void)
194 {
195 }
196
197 // method to indicate acceptance of message to
198 // Meta Dispatcher
199 virtual Boolean messageOK(const Message *msg);
200
201 // function to send a request to another service
202 void send_test_request(char *greeting, Uint32 qid);
203 Uint32 get_qid(void);
204
205 Uint32 _client_capabilities;
206 Uint32 _client_mask;
207
208 // method to receive messages from the Meta Dispatcher,
209 // MUST be defined
210 virtual void _handle_async_request(AsyncRequest *req);
211 mday 1.1
212 AtomicInt client_xid;
213 };
214
215 </pre>
216 <li><b>Construct the Service</b></li>
217 <pre>
218 // Create our Service
219 MessageQueueClient *q_client =
220 new MessageQueueClient("test client");
221
222 </pre>
223 <li><b>Register the Service</b></li>
224 <pre>
225 // Register our service with the Meta Dispatcher
226 q_client->register_service("test client",
227 q_client->_client_capabilities,
228 q_client->_client_mask);
229 cout << " client registered " << endl;
230 </pre>
231
232 mday 1.1 </ol>
233
234 The example above hides many of the details which are handled by
235 the MessageQueueService's constructor, such as creating the
236 background thread, finding the Meta Dispatcher, and constructing
237 the queues. But a derived class as the example shows does not
238 need to worry about those details.
239 </p>
240
241 <h2>Finding Other Services</h2>
242 <p>
243 The MessageQueueService class has an api for finding other
244 services. This api is built using messages that are defined in
245 <code>CimomMessage.h</code>. Here is an example from the test
246 program:
247
248 <pre>
249
250 Array<Uint32> services;
251
252 while( services.size() == 0 )
253 mday 1.1 {
254 q_client->find_services(String("test server"), 0, 0, &services);
255 pegasus_yield();
256 }
257
258 cout << "found server at " << services[0] << endl;
259
260
261 </pre>
262
263 The code sample above shows how to find services by their
264 name. The api also allows finding services by their capabilities
265 or the messages they support. Note that the return is an array
266 of Queue IDs. It is possible, for example, to find multiple
267 services.
268 </p>
269
270 <h2>Sending an Asynchronous Message to Another Service</h2>
271 <p>
272 The "handle" for a services is its Queue ID. Once you have the
273 Queue ID you can send a message to that service. The example
274 mday 1.1 above shows one way to get a service's Queue ID. Here is an
275 example that shows how to send that service a message.
276
277 <ol>
278 <li><b>Define the Request and Response Message Pair by Inheriting from AsyncMessage.</b></li>
279 <pre>
280
281 class test_request : public AsyncRequest
282 {
283
284 public:
285 typedef AsyncRequest Base;
286
287 test_request(Uint32 routing,
288 AsyncOpNode *op,
289 Uint32 destination,
290 Uint32 response,
291 char *message)
292 : Base(0x04100000,
293 Message::getNextKey(),
294 routing,
295 mday 1.1 0,
296 op,
297 destination,
298 response,
299 true),
300 greeting(message)
301 {
302
303 }
304
305 virtual ~test_request(void)
306 {
307
308
309 }
310
311 String greeting;
312 };
313
314
315 class test_response : public AsyncReply
316 mday 1.1 {
317 public:
318 typedef AsyncReply Base;
319
320
321 test_response(Uint32 key,
322 Uint32 routing,
323 AsyncOpNode *op,
324 Uint32 result,
325 Uint32 destination,
326 char *message)
327 : Base(0x04200000,
328 key,
329 routing,
330 0,
331 op,
332 result,
333 destination,
334 true),
335 greeting(message)
336 {
337 mday 1.1
338 }
339
340 virtual ~test_response(void)
341 {
342
343 }
344
345 String greeting;
346 };
347
348 </pre>
349
350 The function <code>send_test_request</code> shows everything
351 that is necessary to send a message to another service and
352 process the reply.
353
354 <pre>
355
356 void MessageQueueClient::send_test_request(char *greeting, Uint32 qid)
357 {
358 mday 1.1
359 </pre>
360 <li><b>Construct the Request</b></li>
361
362 <pre>
363 test_request *req =
364 new test_request(Base::get_next_xid(),
365 0,
366 qid, // destination queue ID
367 _queueId, // my own queue ID
368 greeting); // message parameter
369
370 </pre>
371
372 <li><b>Send the message using <code>MessageQueueService::SendWait</code></b></li>
373
374 <pre>
375 AsyncMessage *response = SendWait(req);
376
377 </pre>
378
379 mday 1.1 <li><b>Process the Response.</b></i>
380 <pre>
381 if( response != 0 )
382 {
383 msg_count++;
384 delete response;
385 cout << " test message " << msg_count.value() << endl;
386
387 }
388 delete req;
389 }
390
391 </pre>
392
393 <li><b>Delete the Request and the Response. The
394 <code>SendWait</code> interface creates and disposes of
395 everything else.</b></li>
396
397 </ol>
398 </p>
399
400 mday 1.1
401 <h2>Handling an Incoming Message </h2>
402
403 <p>
404 To handle messages the service needs to implement the
405 following methods.
406
407 <ol>
408
409 <li><b><code>virtual Boolean MessageOK(const Message
410 *)</code></b></li>
411
412 This method allows the Service to accept or reject the
413 message. The Meta Dispatcher will always call this method
414 before inserting the request on the Service's queue.
415
416 <pre>
417
418 Boolean MessageQueueServer::messageOK(const Message *msg)
419 {
420 if(msg->getMask() & message_mask::ha_async)
421 mday 1.1 {
422 if( msg->getType() == 0x04100000 ||
423 msg->getType() == async_messages::CIMSERVICE_STOP ||
424 msg->getType() == async_messages::CIMSERVICE_PAUSE ||
425 msg->getType() == async_messages::CIMSERVICE_RESUME )
426 return true;
427 }
428 return false;
429 }
430
431 </pre>
432
433 <li><b><code>virtual Boolean accept_async(AsyncOpNode
434 *operation)</code> (optional) </b></li>
435
436 This method executes on the Meta Dispatcher's thread and links
437 the incoming message to the Service's queue. <br><br>
438
439
440
441 <li><b><code>virtual void _handle_incoming_operation(AsyncOpNode
442 mday 1.1 *)</code></b></li><br>
443
444
445 This method is called by the Service's background thread. Here is an
446 example implementation that just does some sanity checking on
447 the message.
448
449 <pre>
450
451 void MessageQueueServer::_handle_incoming_operation(AsyncOpNode *op)
452 {
453 if ( operation != 0 )
454 {
455 Message *rq = operation->get_request();
456 PEGASUS_ASSERT(rq != 0 );
457 PEGASUS_ASSERT(rq->getMask() & message_mask::ha_async );
458 PEGASUS_ASSERT(rq->getMask() & message_mask::ha_request);
459 _handle_async_request(static_cast<AsyncRequest *>(rq));
460 }
461
462 return;
463 mday 1.1
464 }
465
466
467 </pre>
468
469 <li><b><code>virtual void _handle_async_request(AsyncRequest *)</code></b></li><br>
470 <br>
471
472 This method handles the request. The Service must implement
473 this method. <b>If the Service does not handle the Request it
474 must pass the Request to the Base class by calling <code>Base::_handle_async_request(req)</code></b>
475
476 <pre>
477 void MessageQueueServer::_handle_async_request(AsyncRequest *req)
478 {
479 if (req->getType() == 0x04100000 )
480 {
481 req->op->processing();
482 handle_test_request(req); // Message Handler
483 }
484 mday 1.1 else if ( req->getType() == async_messages::CIMSERVICE_STOP )
485 {
486 req->op->processing();
487 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
488 }
489
490 else
491 Base::_handle_async_request(req); // Give it to the Base !!
492 }
493
494 </pre>
495
496 <li><b>Specific Message Handlers</b>
497
498 Each Message handler will be defined by the format of the
499 Request/Response pair. Here is an example from the test
500 program.
501
502 <pre>
503
504 if( msg->getType() == 0x04100000 )
505 mday 1.1 {
506
507 </pre>
508 <ol>
509 <li><b>Construct the Reply</b></li>
510 <pre>
511
512 test_response *resp =
513 new test_response(msg->getKey(),
514 msg->getRouting(),
515 msg->op,
516 async_results::OK,
517 msg->dest,
518 "i am a test response");
519
520
521 </pre>
522 <li><b>Complete the Reply</b> by calling the following
523 helper routine in the Base class</li>
524
525 <pre>
526 mday 1.1 _completeAsyncResponse(msg, resp, ASYNC_OPSTATE_COMPLETE, 0);
527
528
529 }
530 </pre>
531
532 </ol>
533 </p>
534
535
536 <hr>
537 <h2>Class Definitions</h2>
538
539 <h3>cimom (Meta Dispatcher)</h3>
540 <pre>
541 class PEGASUS_COMMON_LINKAGE cimom : public MessageQueue
542 {
543 public :
544 cimom(void);
545
546 virtual ~cimom(void) ;
547 mday 1.1
548 Boolean moduleChange(struct timeval last);
549
550 Uint32 getModuleCount(void);
551 Uint32 getModuleIDs(Uint32 *ids, Uint32 count) throw(IPCException);
552
553 AsyncOpNode *get_cached_op(void) throw(IPCException);
554 void cache_op(AsyncOpNode *op) throw(IPCException);
555
556 void set_default_op_timeout(const struct timeval *buffer);
557 void get_default_op_timeout(struct timeval *timeout) const ;
558
559 virtual void handleEnqueue();
560 void register_module(RegisterCimService *msg);
561 void deregister_module(Uint32 quid);
562 void update_module(UpdateCimService *msg );
563 void ioctl(AsyncIoctl *msg );
564
565 void find_service_q(FindServiceQueue *msg );
566 void enumerate_service(EnumerateService *msg );
567 Boolean route_async(AsyncOpNode *operation);
568 mday 1.1 void _shutdown_routed_queue(void);
569
570
571 protected:
572 Uint32 get_module_q(const String & name);
573 void _make_response(AsyncRequest *req, Uint32 code);
574 void _completeAsyncResponse(AsyncRequest *request,
575 AsyncReply *reply,
576 Uint32 state,
577 Uint32 flag);
578 private:
579 struct timeval _default_op_timeout;
580 struct timeval _last_module_change;
581 DQueue<message_module> _modules;
582
583 DQueue<AsyncOpNode> _recycle;
584
585 AsyncDQueue<AsyncOpNode> _routed_ops;
586 DQueue<AsyncOpNode> _internal_ops;
587
588 static PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL _routing_proc(void *);
589 mday 1.1
590 Thread _routing_thread;
591
592 static Uint32 get_xid(void);
593 void _handle_cimom_op(AsyncOpNode *op, Thread *thread, MessageQueue *queue);
594 Uint32 _ioctl(Uint32, Uint32, void *);
595
596
597 AtomicInt _die;
598 AtomicInt _routed_queue_shutdown;
599
600 static AtomicInt _xid;
601
602 // CIMOperationRequestDispatcher *_cim_dispatcher;
603 // CIMOperationResponseEncoder *_cim_encoder;
604 // CIMOperationRequestDecoder *_cim_decoder;
605 // CIMRepository *_repository;
606
607 };
608
609 </pre>
610 mday 1.1
611
612 <h3>MessageQueueService</h3>
613
614 <pre>
615
616 class message_module;
617
618 class PEGASUS_COMMON_LINKAGE MessageQueueService : public MessageQueue
619 {
620 public:
621
622 typedef MessageQueue Base;
623
624 MessageQueueService(const char *name, Uint32 queueID, Uint32 capabilities, Uint32 mask) ;
625
626 virtual ~MessageQueueService(void);
627
628 virtual void handle_heartbeat_request(AsyncRequest *req);
629 virtual void handle_heartbeat_reply(AsyncReply *rep);
630
631 mday 1.1 virtual void handle_AsyncIoctl(AsyncIoctl *req);
632 virtual void handle_CimServiceStart(CimServiceStart *req);
633 virtual void handle_CimServiceStop(CimServiceStop *req);
634 virtual void handle_CimServicePause(CimServicePause *req);
635 virtual void handle_CimServiceResume(CimServiceResume *req);
636
637 virtual void handle_AsyncOperationStart(AsyncOperationStart *req);
638 virtual void handle_AsyncOperationResult(AsyncOperationResult *req);
639 virtual Boolean accept_async(AsyncOpNode *op);
640 virtual Boolean messageOK(const Message *msg) ;
641
642 AsyncReply *SendWait(AsyncRequest *request);
643
644 void _completeAsyncResponse(AsyncRequest *request,
645 AsyncReply *reply,
646 Uint32 state,
647 Uint32 flag);
648 Boolean register_service(String name, Uint32 capabilities, Uint32 mask);
649 Boolean update_service(Uint32 capabilities, Uint32 mask);
650 Boolean deregister_service(void);
651 virtual void _shutdown_incoming_queue(void);
652 mday 1.1 void find_services(String name,
653 Uint32 capabilities,
654 Uint32 mask,
655 Array<Uint32> *results);
656 void enumerate_service(Uint32 queue, message_module *result);
657 Uint32 get_next_xid(void);
658 AsyncOpNode *get_op(void);
659 void return_op(AsyncOpNode *op);
660 Uint32 _capabilities;
661 Uint32 _mask;
662 AtomicInt _die;
663 protected:
664
665 virtual void _handle_incoming_operation(AsyncOpNode *operation, Thread *thread, MessageQueue *queue);
666 virtual void _handle_async_request(AsyncRequest *req);
667 virtual void _make_response(AsyncRequest *req, Uint32 code);
668 cimom *_meta_dispatcher;
669
670 private:
671 void handleEnqueue();
672 DQueue<AsyncOpNode> _pending;
673 mday 1.1 AsyncDQueue<AsyncOpNode> _incoming;
674
675 static PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL _req_proc(void *);
676 AtomicInt _incoming_queue_shutdown;
677
678 Thread _req_thread;
679
680 struct timeval _default_op_timeout;
681
682 static AtomicInt _xid;
683
684 };
685
686
687 </pre>
688
689 <h3>Asynchronous Messages</h3>
690
691 <pre>
692
693 extern const Uint32 CIMOM_Q_ID;
694 mday 1.1
695 class AsyncOpNode;
696
697 class PEGASUS_COMMON_LINKAGE async_results
698 {
699 public:
700 static const Uint32 OK;
701 static const Uint32 PARAMETER_ERROR;
702 static const Uint32 MODULE_ALREADY_REGISTERED;
703 static const Uint32 MODULE_NOT_FOUND;
704 static const Uint32 INTERNAL_ERROR;
705
706 static const Uint32 ASYNC_STARTED;
707 static const Uint32 ASYNC_PROCESSING;
708 static const Uint32 ASYNC_COMPLETE;
709 static const Uint32 ASYNC_CANCELLED;
710 static const Uint32 ASYNC_PAUSED;
711 static const Uint32 ASYNC_RESUMED;
712
713 static const Uint32 CIM_SERVICE_STARTED;
714 static const Uint32 CIM_SERVICE_STOPPED;
715 mday 1.1 static const Uint32 CIM_SERVICE_PAUSED;
716
717 static const Uint32 CIM_SERVICE_RESUMED;
718 static const Uint32 CIM_NAK;
719
720 static const Uint32 ASYNC_PHASE_COMPLETE;
721 static const Uint32 ASYNC_CHILD_COMPLETE;
722 static const Uint32 ASYNC_PHASE_STARTED;
723 static const Uint32 ASYNC_CHILD_STARTED;
724 static const Uint32 CIM_PAUSED;
725 static const Uint32 CIM_STOPPED;
726
727 };
728
729
730 class PEGASUS_COMMON_LINKAGE async_messages
731 {
732 public:
733 static const Uint32 HEARTBEAT;
734 static const Uint32 REPLY;
735 static const Uint32 REGISTER_CIM_SERVICE;
736 mday 1.1 static const Uint32 DEREGISTER_CIM_SERVICE;
737 static const Uint32 UPDATE_CIM_SERVICE;
738 static const Uint32 IOCTL;
739 static const Uint32 CIMSERVICE_START;
740 static const Uint32 CIMSERVICE_STOP;
741 static const Uint32 CIMSERVICE_PAUSE;
742 static const Uint32 CIMSERVICE_RESUME;
743
744 static const Uint32 ASYNC_OP_START;
745 static const Uint32 ASYNC_OP_RESULT;
746 static const Uint32 ASYNC_LEGACY_OP_START;
747 static const Uint32 ASYNC_LEGACY_OP_RESULT;
748
749 static const Uint32 FIND_SERVICE_Q;
750 static const Uint32 FIND_SERVICE_Q_RESULT;
751 static const Uint32 ENUMERATE_SERVICE;
752 static const Uint32 ENUMERATE_SERVICE_RESULT;
753 };
754
755
756 class PEGASUS_COMMON_LINKAGE AsyncMessage : public Message
757 mday 1.1 {
758 public:
759 AsyncMessage(Uint32 type,
760 Uint32 key,
761 Uint32 routing,
762 Uint32 mask,
763 AsyncOpNode *operation);
764
765 virtual ~AsyncMessage(void)
766 {
767
768 }
769
770 Boolean operator ==(void *key);
771 Boolean operator ==(const AsyncMessage& msg);
772
773 AsyncOpNode *op;
774 Thread *_myself;
775 MessageQueue *_service;
776 };
777
778 mday 1.1
779 inline Boolean AsyncMessage::operator ==(void *key)
780 {
781 if( key == reinterpret_cast<void *>(this))
782 return true;
783 return false;
784 }
785
786 inline Boolean AsyncMessage::operator ==(const AsyncMessage& msg)
787 {
788 return this->operator==(reinterpret_cast<void *>(const_cast<AsyncMessage *>(&msg)));
789 }
790
791
792 class PEGASUS_COMMON_LINKAGE AsyncRequest : public AsyncMessage
793 {
794 public:
795 AsyncRequest(Uint32 type,
796 Uint32 key,
797 Uint32 routing,
798 Uint32 mask,
799 mday 1.1 AsyncOpNode *operation,
800 Uint32 destination,
801 Uint32 response,
802 Boolean blocking);
803
804
805 virtual ~AsyncRequest(void)
806 {
807
808 }
809
810 Uint32 dest;
811 Uint32 resp;
812 Boolean block;
813 };
814
815 class PEGASUS_COMMON_LINKAGE AsyncReply : public AsyncMessage
816 {
817 public:
818 AsyncReply(Uint32 type,
819 Uint32 key,
820 mday 1.1 Uint32 routing,
821 Uint32 mask,
822 AsyncOpNode *operation,
823 Uint32 result_code,
824 Uint32 destination,
825 Boolean blocking);
826
827
828 virtual ~AsyncReply(void)
829 {
830 if(op != 0 )
831 delete op;
832
833 }
834
835 Uint32 result;
836 Uint32 dest;
837 Boolean block;
838 };
839
840
841 mday 1.1
842 class PEGASUS_COMMON_LINKAGE RegisterCimService : public AsyncRequest
843 {
844 public:
845 RegisterCimService(Uint32 routing,
846 AsyncOpNode *operation,
847 Boolean blocking,
848 String service_name,
849 Uint32 service_capabilities,
850 Uint32 service_mask,
851 Uint32 service_queue);
852
853 virtual ~RegisterCimService(void)
854 {
855
856 }
857
858 String name;
859 Uint32 capabilities;
860 Uint32 mask;
861 Uint32 queue;
862 mday 1.1 };
863
864 class PEGASUS_COMMON_LINKAGE DeRegisterCimService : public AsyncRequest
865 {
866 public:
867 DeRegisterCimService(Uint32 routing,
868 AsyncOpNode *operation,
869 Boolean blocking,
870 Uint32 service_queue);
871
872
873 virtual ~DeRegisterCimService(void)
874 {
875
876 }
877
878 Uint32 queue;
879 } ;
880
881 class PEGASUS_COMMON_LINKAGE UpdateCimService : public AsyncRequest
882 {
883 mday 1.1 public:
884 UpdateCimService(Uint32 routing,
885 AsyncOpNode *operation,
886 Boolean blocking,
887 Uint32 service_queue,
888 Uint32 service_capabilities,
889 Uint32 service_mask);
890
891 virtual ~UpdateCimService(void)
892 {
893
894 }
895
896 Uint32 queue;
897 Uint32 capabilities;
898 Uint32 mask;
899 };
900
901
902 class PEGASUS_COMMON_LINKAGE AsyncIoctl : public AsyncRequest
903 {
904 mday 1.1 public:
905 AsyncIoctl(Uint32 routing,
906 AsyncOpNode *operation,
907 Uint32 destination,
908 Uint32 response,
909 Boolean blocking,
910 Uint32 code,
911 Uint32 int_param,
912 void *p_param);
913
914 virtual ~AsyncIoctl(void)
915 {
916
917 }
918
919 enum
920 {
921 IO_CLOSE,
922 IO_OPEN,
923 IO_SOURCE_QUENCH,
924 IO_SERVICE_DEFINED
925 mday 1.1 };
926
927
928
929 Uint32 ctl;
930 Uint32 intp;
931 void *voidp;
932
933 };
934
935 class PEGASUS_COMMON_LINKAGE CimServiceStart : public AsyncRequest
936 {
937 public:
938 CimServiceStart(Uint32 routing,
939 AsyncOpNode *operation,
940 Uint32 destination,
941 Uint32 response,
942 Boolean blocking);
943
944 virtual ~CimServiceStart(void)
945 {
946 mday 1.1
947 }
948 };
949
950
951 class PEGASUS_COMMON_LINKAGE CimServiceStop : public AsyncRequest
952 {
953 public:
954 CimServiceStop(Uint32 routing,
955 AsyncOpNode *operation,
956 Uint32 destination,
957 Uint32 response,
958 Boolean blocking);
959
960 virtual ~CimServiceStop(void)
961 {
962
963 }
964 };
965
966 class PEGASUS_COMMON_LINKAGE CimServicePause : public AsyncRequest
967 mday 1.1 {
968 public:
969 CimServicePause(Uint32 routing,
970 AsyncOpNode *operation,
971 Uint32 destination,
972 Uint32 response,
973 Boolean blocking);
974
975
976 virtual ~CimServicePause(void)
977 {
978
979 }
980 };
981
982 class PEGASUS_COMMON_LINKAGE CimServiceResume : public AsyncRequest
983 {
984 public:
985 CimServiceResume(Uint32 routing,
986 AsyncOpNode *operation,
987 Uint32 destination,
988 mday 1.1 Uint32 response,
989 Boolean blocking);
990
991
992 virtual ~CimServiceResume(void)
993 {
994
995 }
996 };
997
998 class PEGASUS_COMMON_LINKAGE AsyncOperationStart : public AsyncRequest
999 {
1000 public:
1001 AsyncOperationStart(Uint32 routing,
1002 AsyncOpNode *operation,
1003 Uint32 destination,
1004 Uint32 response,
1005 Boolean blocking,
1006 Message *action);
1007
1008
1009 mday 1.1 virtual ~AsyncOperationStart(void)
1010 {
1011
1012 }
1013
1014 Message *act;
1015 };
1016
1017 class PEGASUS_COMMON_LINKAGE AsyncOperationResult : public AsyncReply
1018 {
1019 public:
1020 AsyncOperationResult(Uint32 key,
1021 Uint32 routing,
1022 AsyncOpNode *operation,
1023 Uint32 result_code,
1024 Uint32 destination,
1025 Uint32 blocking);
1026
1027
1028 virtual ~AsyncOperationResult(void)
1029 {
1030 mday 1.1
1031 }
1032 };
1033
1034
1035 class PEGASUS_COMMON_LINKAGE AsyncLegacyOperationStart : public AsyncRequest
1036 {
1037 public:
1038 AsyncLegacyOperationStart(Uint32 routing,
1039 AsyncOpNode *operation,
1040 Uint32 destination,
1041 Message *action);
1042
1043
1044 virtual ~AsyncLegacyOperationStart(void)
1045 {
1046
1047 }
1048
1049 Message *act;
1050 };
1051 mday 1.1
1052 class PEGASUS_COMMON_LINKAGE AsyncLegacyOperationResult : public AsyncReply
1053 {
1054 public:
1055 AsyncLegacyOperationResult(Uint32 key,
1056 Uint32 routing,
1057 AsyncOpNode *operation,
1058 Message *result);
1059
1060 virtual ~AsyncLegacyOperationResult(void)
1061 {
1062
1063 }
1064
1065 Message *res;
1066 };
1067
1068
1069 class PEGASUS_COMMON_LINKAGE FindServiceQueue : public AsyncRequest
1070 {
1071 public:
1072 mday 1.1 FindServiceQueue(Uint32 routing,
1073 AsyncOpNode *operation,
1074 Uint32 response,
1075 Boolean blocking,
1076 String service_name,
1077 Uint32 service_capabilities,
1078 Uint32 service_mask);
1079
1080 virtual ~FindServiceQueue(void)
1081 {
1082
1083 }
1084
1085 String name;
1086 Uint32 capabilities;
1087 Uint32 mask;
1088 } ;
1089
1090 class PEGASUS_COMMON_LINKAGE FindServiceQueueResult : public AsyncReply
1091 {
1092 public:
1093 mday 1.1 FindServiceQueueResult(Uint32 key,
1094 Uint32 routing,
1095 AsyncOpNode *operation,
1096 Uint32 result_code,
1097 Uint32 destination,
1098 Boolean blocking,
1099 Array<Uint32> queue_ids);
1100
1101
1102 virtual ~FindServiceQueueResult(void)
1103 {
1104
1105 }
1106
1107 Array<Uint32> qids;
1108 } ;
1109
1110 class PEGASUS_COMMON_LINKAGE EnumerateService : public AsyncRequest
1111 {
1112 public:
1113 EnumerateService(Uint32 routing,
1114 mday 1.1 AsyncOpNode *operation,
1115 Uint32 response,
1116 Boolean blocking,
1117 Uint32 queue_id);
1118
1119
1120 virtual ~EnumerateService(void)
1121 {
1122
1123 }
1124
1125 Uint32 qid;
1126 };
1127
1128 class PEGASUS_COMMON_LINKAGE EnumerateServiceResponse : public AsyncReply
1129 {
1130 public:
1131 EnumerateServiceResponse(Uint32 key,
1132 Uint32 routing,
1133 AsyncOpNode *operation,
1134 Uint32 result_code,
1135 mday 1.1 Uint32 response,
1136 Boolean blocking,
1137 String service_name,
1138 Uint32 service_capabilities,
1139 Uint32 service_mask,
1140 Uint32 service_qid);
1141
1142
1143 virtual ~EnumerateServiceResponse(void)
1144 {
1145
1146 }
1147
1148 String name;
1149 Uint32 capabilities;
1150 Uint32 mask;
1151 Uint32 qid;
1152 };
1153
1154 </pre>
1155
1156 mday 1.1 <h3>AsyncOPNode</h3>
1157
1158 <pre>
1159 #define ASYNC_OPFLAGS_UNKNOWN 0x00000000
1160 #define ASYNC_OPFLAGS_INTERVAL_REPEAT 0x00000010
1161 #define ASYNC_OPFLAGS_INDICATION 0x00000020
1162 #define ASYNC_OPFLAGS_REMOTE 0x00000040
1163 #define ASYNC_OPFLAGS_LOCAL_OUT_OF_PROC 0x00000080
1164 #define ASYNC_OPFLAGS_PHASED 0x00000001
1165 #define ASYNC_OPFLAGS_PARTIAL 0x00000002
1166 #define ASYNC_OPFLAGS_NORMAL 0x00000000
1167 #define ASYNC_OPFLAGS_SINGLE 0x00000008
1168 #define ASYNC_OPFLAGS_MULTIPLE 0x00000010
1169 #define ASYNC_OPFLAGS_TOTAL 0x00000020
1170 #define ASYNC_OPFLAGS_META_DISPATCHER 0x00000040
1171
1172 #define ASYNC_OPSTATE_UNKNOWN 0x00000000
1173 #define ASYNC_OPSTATE_OFFERED 0x00000001
1174 #define ASYNC_OPSTATE_DECLINED 0x00000002
1175 #define ASYNC_OPSTATE_STARTED 0x00000004
1176 #define ASYNC_OPSTATE_PROCESSING 0x00000008
1177 mday 1.1 #define ASYNC_OPSTATE_DELIVER 0x00000010
1178 #define ASYNC_OPSTATE_RESERVE 0x00000020
1179 #define ASYNC_OPSTATE_COMPLETE 0x00000040
1180 #define ASYNC_OPSTATE_TIMEOUT 0x00000080
1181 #define ASYNC_OPSTATE_CANCELLED 0x00000100
1182 #define ASYNC_OPSTATE_PAUSED 0x00000200
1183 #define ASYNC_OPSTATE_SUSPENDED 0x00000400
1184 #define ASYNC_OPSTATE_RESUMED 0x00000800
1185 #define ASYNC_OPSTATE_ORPHANED 0x00001000
1186 #define ASYNC_OPSTATE_RELEASED 0x00002000
1187
1188 class Cimom;
1189
1190 class PEGASUS_COMMON_LINKAGE AsyncOpNode
1191 {
1192 public:
1193
1194 AsyncOpNode(void);
1195 ~AsyncOpNode(void);
1196
1197 Boolean operator == (const void *key) const;
1198 mday 1.1 Boolean operator == (const AsyncOpNode & node) const;
1199
1200 void get_timeout_interval(struct timeval *buffer) ;
1201 void set_timeout_interval(const struct timeval *interval);
1202
1203 Boolean timeout(void) ;
1204
1205 OperationContext & get_context(void) ;
1206
1207 void put_request(const Message *request) ;
1208 Message *get_request(void) ;
1209
1210 void put_response(const Message *response) ;
1211 Message *get_response(void) ;
1212
1213 Uint32 read_state(void) ;
1214 void write_state(Uint32) ;
1215
1216 Uint32 read_flags(void);
1217 void write_flags(Uint32);
1218
1219 mday 1.1 void lock(void) throw(IPCException);
1220 void unlock(void) throw(IPCException);
1221 void udpate(void) throw(IPCException);
1222 void deliver(const Uint32 count) throw(IPCException);
1223 void reserve(const Uint32 size) throw(IPCException);
1224 void processing(void) throw(IPCException) ;
1225 void processing(OperationContext *context) throw(IPCException);
1226 void complete(void) throw(IPCException) ;
1227 void complete(OperationContext *context) throw(IPCException);
1228 void release(void);
1229 void wait(void);
1230
1231
1232 private:
1233 Semaphore _client_sem;
1234 Mutex _mut;
1235 unlocked_dq<Message> _request;
1236 unlocked_dq<Message> _response;
1237
1238 OperationContext _operation_list;
1239 Uint32 _state;
1240 mday 1.1 Uint32 _flags;
1241 Uint32 _offered_count;
1242 Uint32 _total_ops;
1243 Uint32 _completed_ops;
1244 Uint32 _user_data;
1245
1246 struct timeval _start;
1247 struct timeval _lifetime;
1248 struct timeval _updated;
1249 struct timeval _timeout_interval;
1250
1251 AsyncOpNode *_parent;
1252 unlocked_dq<AsyncOpNode> _children;
1253
1254 void _reset(unlocked_dq<AsyncOpNode> *dst_q);
1255
1256 // the lifetime member is for cache management by the cimom
1257 void _set_lifetime(struct timeval *lifetime) ;
1258 Boolean _check_lifetime(void) ;
1259
1260 Boolean _is_child(void) ;
1261 mday 1.1 Uint32 _is_parent(void) ;
1262
1263 Boolean _is_my_child(const AsyncOpNode & caller) const;
1264 void _make_orphan( AsyncOpNode & parent) ;
1265 void _adopt_child(AsyncOpNode *child) ;
1266 void _disown_child(AsyncOpNode *child) ;
1267 friend class cimom;
1268 friend class MessageQueueService;
1269
1270 };
1271 </pre>
1272
1273 <hr>
1274 <address><a href="mailto:mdday@us.ibm.com">Michael Day</a></address>
1275 <!-- Created: Tue Feb 5 13:21:55 EST 2002 -->
1276 <!-- hhmts start -->
1277 Last modified: Tue Feb 5 18:09:50 EST 2002
1278 <!-- hhmts end -->
1279 </body>
1280 </html>
1281
1282 mday 1.1
|