
   1 mday  1.1 <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
   2           <html>
   3             <head>
   4               <title>Pegasus Meta Dispatcher</title>
   5             </head>
   6           
   7             <body>
   8               <h1>Pegasus Meta Dispatcher</h1>
   9               <p>
      The Pegasus Meta Dispatcher is a set of classes that extend the
      existing MessageQueue messaging system to be dynamic,
      asynchronous, and multithreaded. The primary classes are the
      following:
  14               </p>
  15               <table border="1" bgcolor="#cdcdc1" cellpadding="3" width="80%">
  16                 <tr align="left"><th>Class</th><th>Derived from</th><th>Source file</th></tr>
  17           	<tr><td>cimom</td><td>MessageQueue</td><td>Pegasus/Common/Cimom.h</td></tr>
	<tr><td>MessageQueueService</td><td>MessageQueue</td><td>Pegasus/Common/MessageQueueService.h</td></tr>
  19           	<tr><td>CimomMessage</td><td>Message</td><td>Pegasus/Common/CimomMessage.h</td></tr>
  20           	<tr><td>AsyncOpNode</td><td>n/a</td><td>Pegasus/Common/AsyncOpNode.h</td></tr>
  21           	<tr><td>AsyncDQueue</td><td>unlocked_dq</td><td>Pegasus/Common/DQueue.h</td></tr>
  22 mday  1.1 	<tr><td>IPC classes</td><td>n/a</td><td>Pegasus/Common/IPC.h</td></tr>
  23           	<tr><td>Threading classes</td><td>n/a</td><td>Pegasus/Common/Thread.h</td></tr>
  24               </table>
  25               <br>
  26               <br>
  27           
  28                 <h2>Purposes of Meta Dispatcher</h2>
  29               <p>
      The Meta Dispatcher has three primary goals:
  31                 <ol>
  32                 <li>Provide for orderly asynchronous message-based communication
  33           	among a dynamic set of Pegasus Services.</li>
  34                 <li>Preserve the existing message-passing architecture of
  35           	Pegasus.</li>
  36                 <li>Allow Pluggable Services such as repositories, provider
  37           	managers, and others.</li>
  38               </ol>
  39           <br>
  40           <br>
  41               Most of the purposes listed above revolve around maintaining the
  42               integrity of data and control flow in an asynchronous
  43 mday  1.1     multithreaded environment. 
  44               </p>
  45           
  46               <h2>Terms</h2>
  47               <p>
  48               <dl>
  49                 <dt><b>Meta Dispatcher</b></dt><dd>The central message broker, or
  50           	router, that provides the asynchronous communications within
  51           	Pegasus. Derived from the <b>MessageQueue</b> class. <br>
  52                 </dd>
      <dt><b>Service</b></dt><dd>A Pegasus module that exchanges
	messages with other modules through the Meta Dispatcher; a
	module that has enhanced privileges within Pegasus and that
	provides one or more functions necessary to the operation of
	Pegasus. Derived from the <b>MessageQueue</b> class.</dd>
  59                 <dt><b>Asynchronous Message</b></dt><dd>A <b>pair</b> of
  60           	messages, consisting of a <b>request</b> and a <b>response</b>
  61           	that are treated as a single operation by Services. An
  62           	asynchronous message may be fronted by a synchronous
  63           	programming interface. Derived from the <b>Message</b> class.</dd>
      <dt><b>AsyncOpNode</b></dt><dd>A control object that manages the
	lifetime of an <b>Asynchronous Message</b>. The AsyncOpNode uses
  66           	many of the IPC object classes. A Service manages the lifetime
  67           	of an AsyncOpNode during the processing of the
  68           	message. However, it necessarily cedes control of the
  69           	AsyncOpNode to the <b>Meta Dispatcher</b> while the
  70           	Asynchronous Message is being processed.</dd>
  71               </dl>
  72               
  73               </p>
  74           
  75               <h2>Meta Dispatcher Design</h2>
  76               <p>
      Three things are necessary to avoid deadlocks and to
      provide pluggable services in Pegasus. The first is
      <b>independent execution paths</b> for service modules; that is,
      each service must have its own thread(s), which must not intersect
      with the thread(s) of other services. Execution paths can
      intersect indirectly through IPC objects such as mutexes,
      conditions, and semaphores.
  84               </p>
  85 mday  1.1     <p>
      The second is <b>interface abstraction</b>, which the Meta
      Dispatcher provides through C++ polymorphism. This allows
      pluggable services; that is, one service can replace another
      and the system will continue to function (ideally in an
      improved manner).
  91               </p>
  92               <p>
      The third is a <b>central message broker</b> that isolates
      services from each other, thereby preventing deadlocks. The
      central message broker also provides message responses on behalf
      of services that are paused, stopped, or not present (not
      plugged in).
  98               </p>
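    <p>
      The second and third points can be illustrated with a small,
      self-contained sketch. It is not Pegasus code: the
      <code>Service</code>, <code>EchoService</code>, and
      <code>Broker</code> classes and the <code>Result</code> and
      <code>ServiceState</code> enumerations are hypothetical
      stand-ins, with result names loosely borrowed from
      <code>async_results</code>. The broker sees services only
      through an abstract base class, and it answers on behalf of a
      service that is paused, stopped, or not plugged in.
    </p>

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical result codes, named after some async_results members.
enum class Result { OK, MODULE_NOT_FOUND, CIM_PAUSED, CIM_STOPPED };

enum class ServiceState { Running, Paused, Stopped };

// Interface abstraction: the broker only knows this base class.
class Service {
public:
    virtual ~Service() = default;
    virtual Result handle(const std::string& request) = 0;
    ServiceState state = ServiceState::Running;
};

// One pluggable implementation; any other subclass could replace it.
class EchoService : public Service {
public:
    Result handle(const std::string&) override { return Result::OK; }
};

// Central broker: services never call each other directly.
class Broker {
public:
    void plug(std::uint32_t qid, Service* service) { services_[qid] = service; }
    void unplug(std::uint32_t qid) { services_.erase(qid); }

    // Route a request, or answer for a service that cannot answer itself.
    Result route(std::uint32_t dest, const std::string& request) {
        auto it = services_.find(dest);
        if (it == services_.end()) return Result::MODULE_NOT_FOUND;
        if (it->second->state == ServiceState::Paused) return Result::CIM_PAUSED;
        if (it->second->state == ServiceState::Stopped) return Result::CIM_STOPPED;
        return it->second->handle(request);
    }

private:
    std::map<std::uint32_t, Service*> services_;
};

// Small usage example for the sketch above.
void broker_demo() {
    Broker broker;
    EchoService echo;
    broker.plug(7, &echo);
    assert(broker.route(7, "ping") == Result::OK);
    broker.unplug(7);
    assert(broker.route(7, "ping") == Result::MODULE_NOT_FOUND);
}
```

    <p>
      The sketch is single-threaded for brevity; in the design
      described here, routing happens on the Meta Dispatcher's own
      thread.
    </p>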
  99           
 100           
 101               <h3>Central Hub</h3>
 102               <p>
 103                 The Meta Dispatcher therefore acts as a central message
 104                 hub. Services communicate with each other <i>via</i> the Meta
 105                 Dispatcher.
 106 mday  1.1 
 107                 <pre>
 108                 Service A--Message----1----> (block on semaphore)
 109                                             |
 110                                             |
 111                                             Meta Dispatcher| 
 112                                                            |
 113                                                          Message----2->Service B
 114                                                                            |
 115                                                                            |
 116                                      (Signal Semaphore) <---Response---3-- +
 117                                            |
 118                 Service A <--Response--4---+
 119           
 120               </pre>
 121           
 122               The numbered steps above are as follows:
 123               <ol>
      <li><b>Service A</b> creates a new <code>AsyncMessage</code> and
	<code>AsyncOpNode</code> and sends that message to <b>Service
	  B</b> by calling
	<code>MessageQueueService::SendWait</code>. The calling thread
      blocks on the <b>client semaphore</b> until the response is ready.</li><br>
 129           
 130                 <li>The Meta Dispatcher's routing thread picks up the message
 131           	and inserts it into <b>Service B's</b> incoming message
 132           	queue. The routing thread returns to the Meta Dispatcher to
 133           	route the next message in the system.</li><br>
 134           
      <li><b>Service B's</b> incoming thread picks up the message and
	calls its message handler. Message handlers are virtual,
	so a class derived from <b>MessageQueueService</b> can define
	its own message handlers to override the default
	handlers. When the message handler has constructed an
	<b>AsyncReply</b>, that reply is linked to the
	<b>AsyncOpNode</b>. The MessageQueueService then signals the
	<b>client semaphore</b> within the op node.</li><br>
 143           
      <li><b>Service A</b> awakens when the <b>client semaphore</b> is
	signalled. It pulls the <b>AsyncReply</b> message from the
	<b>AsyncOpNode</b> and processes the result. Service A is
	responsible for discarding the request, response, and
	AsyncOpNode objects. The existing classes have mechanisms for
	caching these objects to avoid constructing and destroying
	them too frequently.</li>
 151               </ol>
 152               </p>
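    <p>
      The four steps above can be sketched with standard C++ threads.
      This is a minimal illustration under stated assumptions, not
      the Pegasus implementation: <code>Semaphore</code>,
      <code>OpNode</code>, <code>OpQueue</code>, and
      <code>send_wait_demo</code> are hypothetical names, plain
      strings stand in for the request and reply messages, and a null
      pointer is used as a shutdown marker.
    </p>

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>

// Counting semaphore built from a mutex and a condition variable.
class Semaphore {
public:
    void signal() {
        std::lock_guard<std::mutex> lock(m_);
        ++count_;
        cv_.notify_one();
    }
    void wait() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    int count_ = 0;
};

// Stand-in for AsyncOpNode: links request, reply, and client semaphore.
struct OpNode {
    std::string request;
    std::string reply;
    Semaphore client_sem;
};

// Blocking queue of op nodes; a null pointer means "shut down".
class OpQueue {
public:
    void push(OpNode* op) {
        std::lock_guard<std::mutex> lock(m_);
        q_.push_back(op);
        cv_.notify_one();
    }
    OpNode* pop() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return !q_.empty(); });
        OpNode* op = q_.front();
        q_.pop_front();
        return op;
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<OpNode*> q_;
};

std::string send_wait_demo(const std::string& text) {
    OpQueue routed;    // queue drained by the dispatcher's routing thread
    OpQueue incoming;  // Service B's incoming queue

    // Step 2: the routing thread moves each op to Service B's queue.
    std::thread router([&] {
        while (OpNode* op = routed.pop()) incoming.push(op);
        incoming.push(nullptr);  // forward the shutdown marker
    });
    // Step 3: Service B builds the reply and signals the client semaphore.
    std::thread service_b([&] {
        while (OpNode* op = incoming.pop()) {
            op->reply = "reply to: " + op->request;
            op->client_sem.signal();
        }
    });

    // Step 1: Service A sends the request and blocks on the semaphore.
    OpNode op;
    op.request = text;
    routed.push(&op);
    op.client_sem.wait();

    // Step 4: Service A wakes up and reads the reply from the op node.
    std::string result = op.reply;

    routed.push(nullptr);  // stop both worker threads
    router.join();
    service_b.join();
    return result;
}
```

    <p>
      For example, <code>send_wait_demo("hello")</code> returns
      <code>"reply to: hello"</code>. Neither service thread touches
      the other's queue directly; only the routing thread does.
    </p>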
 153           
 154           <h2>Test Program</h2>
 155                 <p>
 156           	The concepts explained below are all contained in the test
 157           	program for the Meta Dispatcher, which is located in 
 158           	<code>$PEGASUS_HOME/src/Pegasus/Common/tests/MessageQueueService/</code>
 159           
 160                 </p>
 161           
 162                 <h2>Service Registration and Deregistration</h2>
 163                 <p>
	Services (classes derived from
	<code>MessageQueueService</code>) must register their presence
	with the Meta Dispatcher. This is done as follows (taken from
	the test program):
 168           
 169 mday  1.1 	<ol>
 170           	<li><b>Define the Service Class</b></li>
 171           <pre>
 172           // Define our service class 
 173           
 174           class MessageQueueClient : public MessageQueueService
 175           {
 176                 
 177              public:
 178                 typedef MessageQueueService Base;
 179                 
 180                 MessageQueueClient(char *name)
 181           	 : Base(name, MessageQueue::getNextQueueId(), 0,  
 182           		message_mask::type_cimom | 
 183           		message_mask::type_service | 
 184           		message_mask::ha_request | 
 185           		message_mask::ha_reply | 
 186           		message_mask::ha_async ),
 187           	   client_xid(1)
 188                 {  
 189           	 _client_capabilities = Base::_capabilities;
 190 mday  1.1 	 _client_mask = Base::_mask;
 191                 }
 192                       
 193                 virtual ~MessageQueueClient(void) 
 194                 {
 195                 }
 196                 
 197                 // method to indicate acceptance of message to 
 198                 // Meta Dispatcher
 199                 virtual Boolean messageOK(const Message *msg);
 200           
 201                 // function to send a request to another service
 202                 void send_test_request(char *greeting, Uint32 qid);
 203                 Uint32 get_qid(void);
 204                 
 205                 Uint32 _client_capabilities;
 206                 Uint32 _client_mask;
 207                 
 208                 // method to receive messages from the Meta Dispatcher,
 209                 // MUST be defined
 210                 virtual void _handle_async_request(AsyncRequest *req);
 211 mday  1.1 
 212                 AtomicInt client_xid;
 213           };
 214           
 215           </pre>
 216                 <li><b>Construct the Service</b></li>
 217           <pre>
 218           // Create our Service
 219              MessageQueueClient *q_client = 
 220                     new MessageQueueClient("test client");
 221           
 222           </pre>
 223                 <li><b>Register the Service</b></li>
 224           <pre>
 225           // Register our service with the Meta Dispatcher
 226              q_client-&gt;register_service("test client", 
 227                                          q_client-&gt;_client_capabilities, 
 228                                          q_client-&gt;_client_mask);
 229              cout &lt;&lt; " client registered " &lt;&lt; endl;
 230           </pre>
 231           
 232 mday  1.1 </ol>
 233           
      The example above hides many details that are handled by
      the MessageQueueService constructor, such as creating the
      background thread, finding the Meta Dispatcher, and constructing
      the queues. As the example shows, a derived class does not
      need to deal with those details.
 239                 </p>
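      <p>
	As a rough illustration of what registration implies on the
	dispatcher side, here is a minimal sketch of a module table
	keyed by Queue ID. It is an assumption-laden model, not the
	<code>cimom</code> implementation: <code>ModuleInfo</code> and
	<code>ModuleTable</code> are hypothetical names, and the real
	dispatcher performs registration by exchanging messages rather
	than by direct calls.
      </p>

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical record kept for each registered service.
struct ModuleInfo {
    std::string name;
    std::uint32_t capabilities;
    std::uint32_t mask;
};

// Hypothetical module table, keyed by the service's Queue ID.
class ModuleTable {
public:
    // Returns false if the Queue ID is already registered.
    bool register_module(std::uint32_t qid, const std::string& name,
                         std::uint32_t capabilities, std::uint32_t mask) {
        return modules_.emplace(qid, ModuleInfo{name, capabilities, mask}).second;
    }

    // Returns false if the Queue ID is unknown.
    bool update_module(std::uint32_t qid, std::uint32_t capabilities,
                       std::uint32_t mask) {
        auto it = modules_.find(qid);
        if (it == modules_.end()) return false;
        it->second.capabilities = capabilities;
        it->second.mask = mask;
        return true;
    }

    // Returns false if the Queue ID was not registered.
    bool deregister_module(std::uint32_t qid) { return modules_.erase(qid) == 1; }

    std::size_t module_count() const { return modules_.size(); }

private:
    std::map<std::uint32_t, ModuleInfo> modules_;
};

// Small usage example for the sketch above.
void module_table_demo() {
    ModuleTable table;
    assert(table.register_module(10, "test client", 0, 0x3));
    assert(!table.register_module(10, "test client", 0, 0x3));  // duplicate
    assert(table.deregister_module(10));
    assert(table.module_count() == 0);
}
```

      <p>
	Rejecting a duplicate Queue ID mirrors the
	<code>MODULE_ALREADY_REGISTERED</code> result code, although how
	the dispatcher actually detects duplicates is not shown in this
	document.
      </p>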
 240           
 241           <h2>Finding Other Services</h2>
 242                 <p>
	The MessageQueueService class has an API for finding other
	services. This API is built using messages that are defined in
	<code>CimomMessage.h</code>. Here is an example from the test
	program:
 247           
 248           <pre>
 249           
   Array&lt;Uint32&gt; services; 

   while( services.size() == 0 )
   {
      q_client-&gt;find_services(String("test server"), 0, 0, &amp;services); 
      pegasus_yield();  
   }
   
   cout &lt;&lt; "found server at " &lt;&lt; services[0] &lt;&lt; endl;
 259           
 260           
 261           </pre>
 262           
      The code sample above shows how to find services by
      name. The API also allows finding services by their capabilities
      or the messages they support. Note that the result is an array
      of Queue IDs, because a single query can match multiple
      services.
 268                 </p>
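      <p>
	The lookup can be pictured as a filter over the registered
	services. The sketch below is a hypothetical model, not the
	Pegasus implementation: <code>ServiceEntry</code> and this
	free-standing <code>find_services</code> are illustrative
	names, and it assumes that an empty name or a zero
	capabilities/mask argument means "do not filter on this
	field", which the test program's call with two zero arguments
	suggests but does not confirm.
      </p>

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical registry entry for one service.
struct ServiceEntry {
    std::uint32_t qid;
    std::string name;
    std::uint32_t capabilities;
    std::uint32_t mask;
};

// Return the Queue IDs of all entries that match the query.
// Assumption: an empty name or a zero bit field matches everything.
std::vector<std::uint32_t> find_services(const std::vector<ServiceEntry>& table,
                                         const std::string& name,
                                         std::uint32_t capabilities,
                                         std::uint32_t mask) {
    std::vector<std::uint32_t> result;
    for (const ServiceEntry& e : table) {
        bool name_ok = name.empty() || e.name == name;
        // Every requested bit must be set in the entry.
        bool caps_ok = (e.capabilities & capabilities) == capabilities;
        bool mask_ok = (e.mask & mask) == mask;
        if (name_ok && caps_ok && mask_ok) result.push_back(e.qid);
    }
    return result;
}

// Small usage example: two servers share a name, so two IDs come back.
void find_services_demo() {
    std::vector<ServiceEntry> table = {
        {11, "test server", 0x1, 0x4},
        {12, "test server", 0x3, 0x4},
        {13, "test client", 0x1, 0x4},
    };
    std::vector<std::uint32_t> found = find_services(table, "test server", 0, 0);
    assert(found.size() == 2);
    assert(found[0] == 11 && found[1] == 12);
}
```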
 269           
 270           <h2>Sending an Asynchronous Message to Another Service</h2>
 271                 <p>
	The "handle" for a service is its Queue ID. Once you have the
	Queue ID you can send a message to that service. The example
	above shows one way to get a service's Queue ID. Here is an
	example that shows how to send that service a message.
 276           
 277           <ol>
<li><b>Define the Request and Response Message Pair by Inheriting from AsyncRequest and AsyncReply.</b></li>
 279           <pre>
 280           
 281           class test_request : public AsyncRequest
 282           {
 283             
 284              public:
 285                 typedef AsyncRequest Base;
 286                 
 287                 test_request(Uint32 routing, 
 288           		   AsyncOpNode *op, 
 289           		   Uint32 destination, 
 290           		   Uint32 response,
 291           		   char *message)
 292           	 : Base(0x04100000,
 293           		Message::getNextKey(), 
 294           		routing,
 295 mday  1.1 		0, 
 296           		op, 
 297           		destination, 
 298           		response, 
 299           		true),
 300           	   greeting(message) 
 301                 {   
 302           	 
 303                 }
 304                 
 305                 virtual ~test_request(void) 
 306                 {
 307           
 308           
 309                 }
 310                 
 311                 String greeting;
 312           };
 313           
 314           
 315           class test_response : public AsyncReply
 316 mday  1.1 {
 317              public:
 318                 typedef AsyncReply Base;
 319                 
 320           
 321                 test_response(Uint32 key, 
 322           		    Uint32 routing,
 323           		    AsyncOpNode *op, 
 324           		    Uint32 result,
 325           		    Uint32 destination, 
 326           		    char *message)
 327           	 : Base(0x04200000,
 328           		key, 
 329           		routing, 
 330           		0, 
 331           		op, 
 332           		result, 
 333           		destination,
 334           		true), 
 335           	   greeting(message) 
 336                 {  
 337 mday  1.1 	 
 338                 }
 339                 
 340                 virtual ~test_response(void)
 341                 {
 342           	 
 343                 }
 344                 
 345                 String greeting;
 346           };
 347           
 348           </pre>
 349           
 350                 The function <code>send_test_request</code> shows everything
 351                 that is necessary to send a message to another service and
 352                 process the reply. 
 353           
 354           <pre>
 355           
 356           void MessageQueueClient::send_test_request(char *greeting, Uint32 qid)
 357           {
 358 mday  1.1 
 359           </pre>
 360           <li><b>Construct the Request</b></li>
 361           
 362           <pre>
 363              test_request *req = 
 364                 new test_request(Base::get_next_xid(),
 365           		       0,
 366           		       qid,        // destination queue ID
 367           		       _queueId,   // my own queue ID 
 368           		       greeting);  // message parameter
 369           
 370           </pre>
 371           
 372           <li><b>Send the message using <code>MessageQueueService::SendWait</code></b></li>
 373           
 374           <pre>
 375              AsyncMessage *response = SendWait(req);
 376           
 377           </pre>
 378           
<li><b>Process the Response.</b></li>
 380           <pre>
 381              if( response != 0  )
 382              {
 383                 msg_count++; 
 384                 delete response; 
      cout &lt;&lt; " test message " &lt;&lt; msg_count.value() &lt;&lt; endl;
 386                 
 387              }
 388              delete req;
 389           }
 390           
 391           </pre>
 392           
 393           <li><b>Delete the Request and the Response. The
 394           	    <code>SendWait</code> interface creates and disposes of
 395           	    everything else.</b></li> 
 396           
 397           </ol>
 398                 </p>
 399           
 400 mday  1.1 
 401           <h2>Handling an Incoming Message </h2>
 402           
 403           	<p>
 404           	  To handle messages the service needs to implement the
 405           	  following methods. 
 406           
 407           <ol>
 408           
<li><b><code>virtual Boolean messageOK(const Message
 410           		*)</code></b></li>
 411           
 412           	  This method allows the Service to accept or reject the
 413           	message. The Meta Dispatcher will always call this method
 414           	before inserting the request on the Service's queue. 
 415           
 416           <pre>
 417           
Boolean MessageQueueServer::messageOK(const Message *msg)
{
   if(msg-&gt;getMask() &amp; message_mask::ha_async)
   {
      if( msg-&gt;getType() == 0x04100000 ||
	  msg-&gt;getType() == async_messages::CIMSERVICE_STOP || 
	  msg-&gt;getType() == async_messages::CIMSERVICE_PAUSE || 
	  msg-&gt;getType() == async_messages::CIMSERVICE_RESUME )
	 return true;
   }
   return false;
}
 430           
 431           </pre>
 432           
 433           <li><b><code>virtual Boolean accept_async(AsyncOpNode
 434           	      *operation)</code> (optional) </b></li>
 435           
 436           	This method executes on the Meta Dispatcher's thread and links
 437           	the incoming message to the Service's queue. <br><br>
 438           
 439           
 440           
 441           <li><b><code>virtual void _handle_incoming_operation(AsyncOpNode
 442 mday  1.1 	      *)</code></b></li><br>
 443           
 444           
 445           This method is called by the Service's background thread. Here is an
 446           	example implementation that just does some sanity checking on
 447           	the message.
 448           
 449           <pre>
 450           
void MessageQueueServer::_handle_incoming_operation(AsyncOpNode *operation)
{
   if ( operation != 0 )
   {
      Message *rq = operation-&gt;get_request();
      PEGASUS_ASSERT(rq != 0 );
      PEGASUS_ASSERT(rq-&gt;getMask() &amp; message_mask::ha_async );
      PEGASUS_ASSERT(rq-&gt;getMask() &amp; message_mask::ha_request);
      _handle_async_request(static_cast&lt;AsyncRequest *&gt;(rq));
   }
     
   return;
   
}
 465           
 466           
 467           </pre>
 468           
 469           <li><b><code>virtual void _handle_async_request(AsyncRequest *)</code></b></li><br>
 470           <br>
 471           
	This method handles the request. The Service must implement
	this method. <b>If the Service does not handle the Request, it
	  must pass the Request to the Base class by calling <code>Base::_handle_async_request(req)</code>.</b>
 475           
 476           <pre>
void MessageQueueServer::_handle_async_request(AsyncRequest *req)
{
   if (req-&gt;getType() == 0x04100000 )
   {
      req-&gt;op-&gt;processing();
      handle_test_request(req);   // Message Handler 
   }
   else if ( req-&gt;getType() == async_messages::CIMSERVICE_STOP )
   {
      req-&gt;op-&gt;processing();
      handle_CimServiceStop(static_cast&lt;CimServiceStop *&gt;(req));
   }
   
   else
      Base::_handle_async_request(req);  // Give it to the Base !!
}
 493           
 494           </pre>
 495           
 496           <li><b>Specific Message Handlers</b>
 497           
 498           	  Each Message handler will be defined by the format of the
 499           	  Request/Response pair. Here is an example from the test
 500           	  program. 
 501           
 502           <pre>
 503            
 504              if( msg-&gt;getType() == 0x04100000 )
 505 mday  1.1    {
 506           
 507           </pre>
 508           	  <ol>
 509           	    <li><b>Construct the Reply</b></li>
 510           <pre>
 511           
 512                 test_response *resp = 
 513           	 new test_response(msg-&gt;getKey(),
 514           			   msg-&gt;getRouting(),
 515           			   msg-&gt;op, 
 516           			   async_results::OK,
 517           			   msg-&gt;dest, 
 518           			   "i am a test response");
 519           
 520           
 521           </pre>
 522           	    <li><b>Complete the Reply</b> by calling the following
 523           	    helper routine in the Base class</li>
 524           
 525           <pre>
 526 mday  1.1       _completeAsyncResponse(msg, resp, ASYNC_OPSTATE_COMPLETE, 0);
 527           
 528           
 529              }
 530           </pre>
 531           
 532           </ol>
 533           	</p>
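	<p>
	  The way these methods fit together can be shown in a
	  compact, self-contained sketch. It is a simplified model
	  under stated assumptions rather than Pegasus code:
	  <code>Request</code>, <code>BaseService</code>,
	  <code>TestServer</code>, and <code>deliver</code> are
	  hypothetical names, and the <code>SERVICE_STOP</code> and
	  <code>HEARTBEAT</code> values are invented for the example.
	  Only the <code>0x04100000</code> type code comes from the
	  test program above.
	</p>

```cpp
#include <cassert>
#include <cstdint>
#include <string>

const std::uint32_t TEST_REQUEST = 0x04100000;  // type code used in the test program
const std::uint32_t SERVICE_STOP = 1;           // hypothetical value
const std::uint32_t HEARTBEAT = 2;              // hypothetical value

// Hypothetical request; handled_by records which handler processed it.
struct Request {
    std::uint32_t type;
    std::string handled_by;
};

class BaseService {
public:
    virtual ~BaseService() = default;

    // Default filter: in this sketch the base accepts only heartbeats.
    virtual bool messageOK(const Request& req) const {
        return req.type == HEARTBEAT;
    }

    // Stand-in for the dispatcher: ask messageOK first, then hand over.
    bool deliver(Request& req) {
        if (!messageOK(req)) return false;  // rejected, never queued
        handle_async_request(req);
        return true;
    }

protected:
    // Default handler for anything a derived class does not handle.
    virtual void handle_async_request(Request& req) { req.handled_by = "base"; }
};

class TestServer : public BaseService {
public:
    bool messageOK(const Request& req) const override {
        return req.type == TEST_REQUEST || BaseService::messageOK(req);
    }

protected:
    void handle_async_request(Request& req) override {
        if (req.type == TEST_REQUEST)
            req.handled_by = "test_server";           // specific message handler
        else
            BaseService::handle_async_request(req);   // give it to the base
    }
};

// Small usage example for the sketch above.
void handler_chain_demo() {
    TestServer server;
    Request test{TEST_REQUEST, ""};
    assert(server.deliver(test) && test.handled_by == "test_server");
    Request beat{HEARTBEAT, ""};
    assert(server.deliver(beat) && beat.handled_by == "base");
}
```

	<p>
	  A request that <code>messageOK</code> rejects never reaches
	  a handler, and a request the derived class does not
	  recognize still gets processed because it falls through to
	  the base handler.
	</p>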
 534           
 535           
 536               <hr>
 537           <h2>Class Definitions</h2>
 538           
 539           <h3>cimom (Meta Dispatcher)</h3>
 540           <pre>
 541           class PEGASUS_COMMON_LINKAGE cimom : public MessageQueue
 542           {
 543              public : 
 544                 cimom(void);
 545                 
 546                 virtual ~cimom(void) ;
 547 mday  1.1             
 548                 Boolean moduleChange(struct timeval last);
 549                 
 550                 Uint32 getModuleCount(void);
 551                 Uint32 getModuleIDs(Uint32 *ids, Uint32 count) throw(IPCException);
 552           
 553                 AsyncOpNode *get_cached_op(void) throw(IPCException);
 554                 void cache_op(AsyncOpNode *op) throw(IPCException);
 555                       
 556                 void set_default_op_timeout(const struct timeval *buffer);
 557                 void get_default_op_timeout(struct timeval *timeout) const ;
 558           
 559                 virtual void handleEnqueue();
 560                 void register_module(RegisterCimService *msg);
 561                 void deregister_module(Uint32 quid);
 562                 void update_module(UpdateCimService *msg );
 563                 void ioctl(AsyncIoctl *msg );
 564           
 565                 void find_service_q(FindServiceQueue *msg );
 566                 void enumerate_service(EnumerateService *msg );
 567                 Boolean route_async(AsyncOpNode *operation);
 568 mday  1.1       void _shutdown_routed_queue(void);
 569                 
 570                       
 571              protected:
 572                 Uint32 get_module_q(const String & name);
 573                 void _make_response(AsyncRequest *req, Uint32 code);
 574                 void _completeAsyncResponse(AsyncRequest *request, 
 575           				  AsyncReply *reply, 
 576           				  Uint32 state, 
 577           				  Uint32 flag);
 578              private:
 579                 struct timeval _default_op_timeout;
 580                 struct timeval _last_module_change;
 581                 DQueue&lt;message_module&gt; _modules;
 582           
 583                 DQueue&lt;AsyncOpNode&gt; _recycle;
 584                 
 585                 AsyncDQueue&lt;AsyncOpNode&gt; _routed_ops;
 586                 DQueue&lt;AsyncOpNode&gt; _internal_ops;
 587                 
 588                 static PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL _routing_proc(void *);
 589 mday  1.1 
 590                 Thread _routing_thread;
 591           
 592                 static Uint32 get_xid(void);
 593                 void _handle_cimom_op(AsyncOpNode *op, Thread *thread, MessageQueue *queue);
 594                 Uint32 _ioctl(Uint32, Uint32, void *);
 595           
 596           
 597                 AtomicInt _die;
 598                 AtomicInt _routed_queue_shutdown;
 599                 
 600                 static AtomicInt _xid;
 601                 
 602           //       CIMOperationRequestDispatcher *_cim_dispatcher;
 603           //       CIMOperationResponseEncoder *_cim_encoder;
 604           //       CIMOperationRequestDecoder *_cim_decoder;
 605           //       CIMRepository *_repository;
 606                 
 607           };
 608           
 609           </pre>
 610 mday  1.1 	  
 611           
 612           <h3>MessageQueueService</h3>
 613           
 614           <pre>
 615           
 616           class message_module;
 617           
 618           class PEGASUS_COMMON_LINKAGE MessageQueueService : public MessageQueue
 619           {
 620              public:
 621           
 622                 typedef MessageQueue Base;
 623                 
 624                 MessageQueueService(const char *name, Uint32 queueID, Uint32 capabilities, Uint32 mask) ;
 625                 
 626                 virtual ~MessageQueueService(void);
 627                 
 628                 virtual void handle_heartbeat_request(AsyncRequest *req);
 629                 virtual void handle_heartbeat_reply(AsyncReply *rep);
 630                 
 631 mday  1.1       virtual void handle_AsyncIoctl(AsyncIoctl *req);
 632                 virtual void handle_CimServiceStart(CimServiceStart *req);
 633                 virtual void handle_CimServiceStop(CimServiceStop *req);
 634                 virtual void handle_CimServicePause(CimServicePause *req);
 635                 virtual void handle_CimServiceResume(CimServiceResume *req);
 636                 
 637                 virtual void handle_AsyncOperationStart(AsyncOperationStart *req);
 638                 virtual void handle_AsyncOperationResult(AsyncOperationResult *req);
 639                 virtual Boolean accept_async(AsyncOpNode *op);
 640                 virtual Boolean messageOK(const Message *msg) ;
 641           
 642                 AsyncReply *SendWait(AsyncRequest *request);
 643                 
 644                 void _completeAsyncResponse(AsyncRequest *request, 
 645           				 AsyncReply *reply, 
 646           				 Uint32 state, 
 647           				 Uint32 flag);
 648                 Boolean register_service(String name, Uint32 capabilities, Uint32 mask);
 649                 Boolean update_service(Uint32 capabilities, Uint32 mask);
 650                 Boolean deregister_service(void);
 651                 virtual void _shutdown_incoming_queue(void);
 652 mday  1.1       void find_services(String name,
 653           			 Uint32 capabilities, 
 654           			 Uint32 mask, 
 655           			 Array&lt;Uint32&gt; *results);
 656                 void enumerate_service(Uint32 queue, message_module *result);
 657                 Uint32 get_next_xid(void);
 658                 AsyncOpNode *get_op(void);
 659                 void return_op(AsyncOpNode *op);
 660                 Uint32 _capabilities;
 661                 Uint32 _mask;
 662                 AtomicInt _die;
 663              protected:
 664           
 665                 virtual void _handle_incoming_operation(AsyncOpNode *operation, Thread *thread, MessageQueue *queue);
 666                 virtual void _handle_async_request(AsyncRequest *req);
 667                 virtual void _make_response(AsyncRequest *req, Uint32 code);
 668                 cimom *_meta_dispatcher;
 669           
 670              private: 
 671                 void handleEnqueue();
 672                 DQueue&lt;AsyncOpNode&gt; _pending;
 673 mday  1.1       AsyncDQueue&lt;AsyncOpNode&gt; _incoming;
 674                 
 675                 static PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL _req_proc(void *);
 676                 AtomicInt _incoming_queue_shutdown;
 677                 
 678                 Thread _req_thread;
 679                 
 680                 struct timeval _default_op_timeout;
 681           
 682                 static AtomicInt _xid;
 683           
 684            };
 685           
 686           
 687           </pre>
 688           
<h3>Asynchronous Messages</h3>

<pre>

extern const Uint32 CIMOM_Q_ID;

class AsyncOpNode;

class PEGASUS_COMMON_LINKAGE async_results
{
   public:
      static const Uint32 OK;
      static const Uint32 PARAMETER_ERROR;
      static const Uint32 MODULE_ALREADY_REGISTERED;
      static const Uint32 MODULE_NOT_FOUND;
      static const Uint32 INTERNAL_ERROR;

      static const Uint32 ASYNC_STARTED;
      static const Uint32 ASYNC_PROCESSING;
      static const Uint32 ASYNC_COMPLETE;
      static const Uint32 ASYNC_CANCELLED;
      static const Uint32 ASYNC_PAUSED;
      static const Uint32 ASYNC_RESUMED;

      static const Uint32 CIM_SERVICE_STARTED;
      static const Uint32 CIM_SERVICE_STOPPED;
      static const Uint32 CIM_SERVICE_PAUSED;
      static const Uint32 CIM_SERVICE_RESUMED;
      static const Uint32 CIM_NAK;

      static const Uint32 ASYNC_PHASE_COMPLETE;
      static const Uint32 ASYNC_CHILD_COMPLETE;
      static const Uint32 ASYNC_PHASE_STARTED;
      static const Uint32 ASYNC_CHILD_STARTED;
      static const Uint32 CIM_PAUSED;
      static const Uint32 CIM_STOPPED;
};


class PEGASUS_COMMON_LINKAGE async_messages
{
   public:
      static const Uint32 HEARTBEAT;
      static const Uint32 REPLY;
      static const Uint32 REGISTER_CIM_SERVICE;
      static const Uint32 DEREGISTER_CIM_SERVICE;
      static const Uint32 UPDATE_CIM_SERVICE;
      static const Uint32 IOCTL;
      static const Uint32 CIMSERVICE_START;
      static const Uint32 CIMSERVICE_STOP;
      static const Uint32 CIMSERVICE_PAUSE;
      static const Uint32 CIMSERVICE_RESUME;

      static const Uint32 ASYNC_OP_START;
      static const Uint32 ASYNC_OP_RESULT;
      static const Uint32 ASYNC_LEGACY_OP_START;
      static const Uint32 ASYNC_LEGACY_OP_RESULT;

      static const Uint32 FIND_SERVICE_Q;
      static const Uint32 FIND_SERVICE_Q_RESULT;
      static const Uint32 ENUMERATE_SERVICE;
      static const Uint32 ENUMERATE_SERVICE_RESULT;
};


class PEGASUS_COMMON_LINKAGE AsyncMessage : public Message
{
   public:
      AsyncMessage(Uint32 type,
                   Uint32 key,
                   Uint32 routing,
                   Uint32 mask,
                   AsyncOpNode *operation);

      virtual ~AsyncMessage(void)
      {
      }

      Boolean operator ==(void *key);
      Boolean operator ==(const AsyncMessage&amp; msg);

      AsyncOpNode *op;
      Thread *_myself;
      MessageQueue *_service;
};


inline Boolean AsyncMessage::operator ==(void *key)
{
   if( key == reinterpret_cast&lt;void *&gt;(this))
      return true;
   return false;
}

inline Boolean AsyncMessage::operator ==(const AsyncMessage&amp; msg)
{
   return this-&gt;operator==(reinterpret_cast&lt;void *&gt;(const_cast&lt;AsyncMessage *&gt;(&amp;msg)));
}


class PEGASUS_COMMON_LINKAGE AsyncRequest : public AsyncMessage
{
   public:
      AsyncRequest(Uint32 type,
                   Uint32 key,
                   Uint32 routing,
                   Uint32 mask,
                   AsyncOpNode *operation,
                   Uint32 destination,
                   Uint32 response,
                   Boolean blocking);

      virtual ~AsyncRequest(void)
      {
      }

      Uint32 dest;
      Uint32 resp;
      Boolean block;
};

class PEGASUS_COMMON_LINKAGE AsyncReply : public AsyncMessage
{
   public:
      AsyncReply(Uint32 type,
                 Uint32 key,
                 Uint32 routing,
                 Uint32 mask,
                 AsyncOpNode *operation,
                 Uint32 result_code,
                 Uint32 destination,
                 Boolean blocking);

      virtual ~AsyncReply(void)
      {
         if (op != 0)
            delete op;
      }

      Uint32 result;
      Uint32 dest;
      Boolean block;
};


class PEGASUS_COMMON_LINKAGE RegisterCimService : public AsyncRequest
{
   public:
      RegisterCimService(Uint32 routing,
                         AsyncOpNode *operation,
                         Boolean blocking,
                         String service_name,
                         Uint32 service_capabilities,
                         Uint32 service_mask,
                         Uint32 service_queue);

      virtual ~RegisterCimService(void)
      {
      }

      String name;
      Uint32 capabilities;
      Uint32 mask;
      Uint32 queue;
};

class PEGASUS_COMMON_LINKAGE DeRegisterCimService : public AsyncRequest
{
   public:
      DeRegisterCimService(Uint32 routing,
                           AsyncOpNode *operation,
                           Boolean blocking,
                           Uint32 service_queue);

      virtual ~DeRegisterCimService(void)
      {
      }

      Uint32 queue;
};

class PEGASUS_COMMON_LINKAGE UpdateCimService : public AsyncRequest
{
   public:
      UpdateCimService(Uint32 routing,
                       AsyncOpNode *operation,
                       Boolean blocking,
                       Uint32 service_queue,
                       Uint32 service_capabilities,
                       Uint32 service_mask);

      virtual ~UpdateCimService(void)
      {
      }

      Uint32 queue;
      Uint32 capabilities;
      Uint32 mask;
};


class PEGASUS_COMMON_LINKAGE AsyncIoctl : public AsyncRequest
{
   public:
      AsyncIoctl(Uint32 routing,
                 AsyncOpNode *operation,
                 Uint32 destination,
                 Uint32 response,
                 Boolean blocking,
                 Uint32 code,
                 Uint32 int_param,
                 void *p_param);

      virtual ~AsyncIoctl(void)
      {
      }

      enum
      {
         IO_CLOSE,
         IO_OPEN,
         IO_SOURCE_QUENCH,
         IO_SERVICE_DEFINED
      };

      Uint32 ctl;
      Uint32 intp;
      void *voidp;
};

class PEGASUS_COMMON_LINKAGE CimServiceStart : public AsyncRequest
{
   public:
      CimServiceStart(Uint32 routing,
                      AsyncOpNode *operation,
                      Uint32 destination,
                      Uint32 response,
                      Boolean blocking);

      virtual ~CimServiceStart(void)
      {
      }
};


class PEGASUS_COMMON_LINKAGE CimServiceStop : public AsyncRequest
{
   public:
      CimServiceStop(Uint32 routing,
                     AsyncOpNode *operation,
                     Uint32 destination,
                     Uint32 response,
                     Boolean blocking);

      virtual ~CimServiceStop(void)
      {
      }
};

class PEGASUS_COMMON_LINKAGE CimServicePause : public AsyncRequest
{
   public:
      CimServicePause(Uint32 routing,
                      AsyncOpNode *operation,
                      Uint32 destination,
                      Uint32 response,
                      Boolean blocking);

      virtual ~CimServicePause(void)
      {
      }
};

class PEGASUS_COMMON_LINKAGE CimServiceResume : public AsyncRequest
{
   public:
      CimServiceResume(Uint32 routing,
                       AsyncOpNode *operation,
                       Uint32 destination,
                       Uint32 response,
                       Boolean blocking);

      virtual ~CimServiceResume(void)
      {
      }
};

class PEGASUS_COMMON_LINKAGE AsyncOperationStart : public AsyncRequest
{
   public:
      AsyncOperationStart(Uint32 routing,
                          AsyncOpNode *operation,
                          Uint32 destination,
                          Uint32 response,
                          Boolean blocking,
                          Message *action);

      virtual ~AsyncOperationStart(void)
      {
      }

      Message *act;
};

class PEGASUS_COMMON_LINKAGE AsyncOperationResult : public AsyncReply
{
   public:
      AsyncOperationResult(Uint32 key,
                           Uint32 routing,
                           AsyncOpNode *operation,
                           Uint32 result_code,
                           Uint32 destination,
                           Uint32 blocking);

      virtual ~AsyncOperationResult(void)
      {
      }
};


class PEGASUS_COMMON_LINKAGE AsyncLegacyOperationStart : public AsyncRequest
{
   public:
      AsyncLegacyOperationStart(Uint32 routing,
                                AsyncOpNode *operation,
                                Uint32 destination,
                                Message *action);

      virtual ~AsyncLegacyOperationStart(void)
      {
      }

      Message *act;
};

class PEGASUS_COMMON_LINKAGE AsyncLegacyOperationResult : public AsyncReply
{
   public:
      AsyncLegacyOperationResult(Uint32 key,
                                 Uint32 routing,
                                 AsyncOpNode *operation,
                                 Message *result);

      virtual ~AsyncLegacyOperationResult(void)
      {
      }

      Message *res;
};


class PEGASUS_COMMON_LINKAGE FindServiceQueue : public AsyncRequest
{
   public:
      FindServiceQueue(Uint32 routing,
                       AsyncOpNode *operation,
                       Uint32 response,
                       Boolean blocking,
                       String service_name,
                       Uint32 service_capabilities,
                       Uint32 service_mask);

      virtual ~FindServiceQueue(void)
      {
      }

      String name;
      Uint32 capabilities;
      Uint32 mask;
};

class PEGASUS_COMMON_LINKAGE FindServiceQueueResult : public AsyncReply
{
   public:
      FindServiceQueueResult(Uint32 key,
                             Uint32 routing,
                             AsyncOpNode *operation,
                             Uint32 result_code,
                             Uint32 destination,
                             Boolean blocking,
                             Array&lt;Uint32&gt; queue_ids);

      virtual ~FindServiceQueueResult(void)
      {
      }

      Array&lt;Uint32&gt; qids;
};

class PEGASUS_COMMON_LINKAGE EnumerateService : public AsyncRequest
{
   public:
      EnumerateService(Uint32 routing,
                       AsyncOpNode *operation,
                       Uint32 response,
                       Boolean blocking,
                       Uint32 queue_id);

      virtual ~EnumerateService(void)
      {
      }

      Uint32 qid;
};

class PEGASUS_COMMON_LINKAGE EnumerateServiceResponse : public AsyncReply
{
   public:
      EnumerateServiceResponse(Uint32 key,
                               Uint32 routing,
                               AsyncOpNode *operation,
                               Uint32 result_code,
                               Uint32 response,
                               Boolean blocking,
                               String service_name,
                               Uint32 service_capabilities,
                               Uint32 service_mask,
                               Uint32 service_qid);

      virtual ~EnumerateServiceResponse(void)
      {
      }

      String name;
      Uint32 capabilities;
      Uint32 mask;
      Uint32 qid;
};

</pre>
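
<p>
  The declarations above suggest two conventions worth illustrating:
  messages compare by identity (the address of the object) rather than
  by content, and an AsyncReply deletes its AsyncOpNode on destruction
  while an AsyncRequest does not. The following is a minimal sketch of
  those two behaviors only. The types OpNodeSketch, MessageSketch,
  RequestSketch, and ReplySketch are hypothetical stand-ins, not the
  Pegasus classes, and the instance counter exists purely for
  demonstration.
</p>

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-ins for the Pegasus typedefs.
typedef std::uint32_t Uint32;
typedef bool Boolean;

// Stand-in for AsyncOpNode; counts live instances so that ownership
// can be observed from outside.
struct OpNodeSketch
{
   static int live;
   OpNodeSketch() { ++live; }
   ~OpNodeSketch() { --live; }
};
int OpNodeSketch::live = 0;

// Stand-in for AsyncMessage: holds the operation pointer and compares
// by object address, mirroring the two operator== overloads above.
struct MessageSketch
{
   explicit MessageSketch(OpNodeSketch *operation) : op(operation) {}
   virtual ~MessageSketch() {}

   Boolean operator==(const void *key) const
   {
      return key == static_cast<const void *>(this);
   }

   Boolean operator==(const MessageSketch &msg) const
   {
      return *this == static_cast<const void *>(&msg);
   }

   OpNodeSketch *op;
};

// Stand-in for AsyncRequest: the destructor leaves 'op' alone.
struct RequestSketch : public MessageSketch
{
   explicit RequestSketch(OpNodeSketch *operation)
      : MessageSketch(operation) {}
};

// Stand-in for AsyncReply: the destructor deletes 'op' if it is set.
struct ReplySketch : public MessageSketch
{
   explicit ReplySketch(OpNodeSketch *operation)
      : MessageSketch(operation) {}

   virtual ~ReplySketch()
   {
      if (op != 0)
         delete op;
   }
};
```

<p>
  With these stand-ins, two distinct requests that share one operation
  node compare unequal, and the node stays alive until a reply wrapping
  it is destroyed. Whether the real classes are meant to be used this
  way is an assumption drawn from the declarations alone.
</p>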

<h3>AsyncOpNode</h3>

<pre>
#define ASYNC_OPFLAGS_UNKNOWN           0x00000000
#define ASYNC_OPFLAGS_INTERVAL_REPEAT   0x00000010
#define ASYNC_OPFLAGS_INDICATION        0x00000020
#define ASYNC_OPFLAGS_REMOTE            0x00000040
#define ASYNC_OPFLAGS_LOCAL_OUT_OF_PROC 0x00000080
#define ASYNC_OPFLAGS_PHASED            0x00000001
#define ASYNC_OPFLAGS_PARTIAL           0x00000002
#define ASYNC_OPFLAGS_NORMAL            0x00000000
#define ASYNC_OPFLAGS_SINGLE            0x00000008
#define ASYNC_OPFLAGS_MULTIPLE          0x00000010
#define ASYNC_OPFLAGS_TOTAL             0x00000020
#define ASYNC_OPFLAGS_META_DISPATCHER   0x00000040

#define ASYNC_OPSTATE_UNKNOWN           0x00000000
#define ASYNC_OPSTATE_OFFERED           0x00000001
#define ASYNC_OPSTATE_DECLINED          0x00000002
#define ASYNC_OPSTATE_STARTED           0x00000004
#define ASYNC_OPSTATE_PROCESSING        0x00000008
#define ASYNC_OPSTATE_DELIVER           0x00000010
#define ASYNC_OPSTATE_RESERVE           0x00000020
#define ASYNC_OPSTATE_COMPLETE          0x00000040
#define ASYNC_OPSTATE_TIMEOUT           0x00000080
#define ASYNC_OPSTATE_CANCELLED         0x00000100
#define ASYNC_OPSTATE_PAUSED            0x00000200
#define ASYNC_OPSTATE_SUSPENDED         0x00000400
#define ASYNC_OPSTATE_RESUMED           0x00000800
#define ASYNC_OPSTATE_ORPHANED          0x00001000
#define ASYNC_OPSTATE_RELEASED          0x00002000

class Cimom;

class PEGASUS_COMMON_LINKAGE AsyncOpNode
{
   public:

      AsyncOpNode(void);
      ~AsyncOpNode(void);

      Boolean operator == (const void *key) const;
      Boolean operator == (const AsyncOpNode & node) const;

      void get_timeout_interval(struct timeval *buffer);
      void set_timeout_interval(const struct timeval *interval);

      Boolean timeout(void);

      OperationContext & get_context(void);

      void put_request(const Message *request);
      Message *get_request(void);

      void put_response(const Message *response);
      Message *get_response(void);

      Uint32 read_state(void);
      void write_state(Uint32);

      Uint32 read_flags(void);
      void write_flags(Uint32);

      void lock(void) throw(IPCException);
      void unlock(void) throw(IPCException);
      void udpate(void) throw(IPCException);
      void deliver(const Uint32 count) throw(IPCException);
      void reserve(const Uint32 size) throw(IPCException);
      void processing(void) throw(IPCException);
      void processing(OperationContext *context) throw(IPCException);
      void complete(void) throw(IPCException);
      void complete(OperationContext *context) throw(IPCException);
      void release(void);
      void wait(void);

   private:
      Semaphore _client_sem;
      Mutex _mut;
      unlocked_dq&lt;Message&gt; _request;
      unlocked_dq&lt;Message&gt; _response;

      OperationContext _operation_list;
      Uint32 _state;
      Uint32 _flags;
      Uint32 _offered_count;
      Uint32 _total_ops;
      Uint32 _completed_ops;
      Uint32 _user_data;

      struct timeval _start;
      struct timeval _lifetime;
      struct timeval _updated;
      struct timeval _timeout_interval;

      AsyncOpNode *_parent;
      unlocked_dq&lt;AsyncOpNode&gt; _children;

      void _reset(unlocked_dq&lt;AsyncOpNode&gt; *dst_q);

      // the lifetime member is for cache management by the cimom
      void _set_lifetime(struct timeval *lifetime);
      Boolean _check_lifetime(void);

      Boolean _is_child(void);
      Uint32 _is_parent(void);

      Boolean _is_my_child(const AsyncOpNode & caller) const;
      void _make_orphan(AsyncOpNode & parent);
      void _adopt_child(AsyncOpNode *child);
      void _disown_child(AsyncOpNode *child);
      friend class cimom;
      friend class MessageQueueService;
};
</pre>
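
<p>
  Each ASYNC_OPSTATE_* value in the listing above is a distinct single
  bit, which suggests (this is an assumption; the listing does not say
  so) that several states may be combined in the word handled by
  read_state() and write_state(). The sketch below shows that style of
  bit testing with a hypothetical StateSketch class and hypothetical
  helpers has_state(), is_finished(), and flags_collide(). It also
  demonstrates a property that is visible in the listing itself: some
  ASYNC_OPFLAGS_* names share a value (INTERVAL_REPEAT and MULTIPLE are
  both 0x00000010), so those pairs cannot be told apart by masking.
</p>

```cpp
#include <cassert>
#include <cstdint>

typedef std::uint32_t Uint32;

// Values copied from the listing above.
#define ASYNC_OPSTATE_UNKNOWN         0x00000000
#define ASYNC_OPSTATE_STARTED         0x00000004
#define ASYNC_OPSTATE_PROCESSING      0x00000008
#define ASYNC_OPSTATE_COMPLETE        0x00000040
#define ASYNC_OPSTATE_TIMEOUT         0x00000080
#define ASYNC_OPSTATE_CANCELLED       0x00000100

#define ASYNC_OPFLAGS_INTERVAL_REPEAT 0x00000010
#define ASYNC_OPFLAGS_MULTIPLE        0x00000010

// Hypothetical holder that mimics only read_state()/write_state();
// the real AsyncOpNode also involves locking, which is omitted here.
class StateSketch
{
   public:
      StateSketch() : _state(ASYNC_OPSTATE_UNKNOWN) {}

      Uint32 read_state() const { return _state; }
      void write_state(Uint32 state) { _state = state; }

      // Hypothetical helper: OR one more state bit into the word.
      void add_state(Uint32 bit) { _state |= bit; }

   private:
      Uint32 _state;
};

// True if any bit of 'mask' is set in 'state'.
inline bool has_state(Uint32 state, Uint32 mask)
{
   return (state & mask) != 0;
}

// Assumption: an operation counts as finished once it is complete,
// timed out, or cancelled.
inline bool is_finished(Uint32 state)
{
   return has_state(state, ASYNC_OPSTATE_COMPLETE |
                           ASYNC_OPSTATE_TIMEOUT |
                           ASYNC_OPSTATE_CANCELLED);
}

// True if two flag names map to the same value and therefore cannot
// be distinguished by masking.
inline bool flags_collide(Uint32 a, Uint32 b)
{
   return a == b;
}
```

<p>
  If the real state word is instead meant to hold exactly one state at
  a time, the same has_state() test still works; only the add_state()
  helper would be replaced by a plain write_state() call.
</p>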

<hr>
    <address><a href="mailto:mdday@us.ibm.com">Michael Day</a></address>
<!-- Created: Tue Feb  5 13:21:55 EST 2002 -->
<!-- hhmts start -->
Last modified: Tue Feb  5 18:09:50 EST 2002
<!-- hhmts end -->
  </body>
</html>
