(file) Return to MessageQueueService.html CVS log (file) (dir) Up to [Pegasus] / pegasus / doc

File: [Pegasus] / pegasus / doc / MessageQueueService.html (download) / (as text)
Revision: 1.3, Tue Mar 12 22:57:40 2002 UTC (22 years, 2 months ago) by mday
Branch: MAIN
Changes since 1.2: +172 -14 lines
*** empty log message ***

<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html lang="en"><head>
<META http-equiv="Content-Type" content="text/html; charset=ISO-8859-1"><meta http-equiv="Content-Type" content="text/html; charset=ISO-8859-1"><title>XML Pointer Language (XPointer) Version 1.0</title><style type="text/css">

body {
  margin: 2em 1em 2em;
  font-family: sans-serif;
  color: black;
  background: white;
  background-position: top left;
  background-attachment: fixed;
  background-repeat: no-repeat;
}
:link { color: #00C; background: transparent }
:visited { color: #609; background: transparent }
:active { color: #C00; background: transparent }

th, td { /* ns 4 */
  font-family: sans-serif;
}

h1, h2, h3, h4, h5, h6 { text-align: left }
/* background should be transparent, but WebTV has a bug */
h1, h2, h3 { color: #005A9C; background: white }
h1 { font: 170% sans-serif }
h2 { font: 140% sans-serif }
h3 { font: 120% sans-serif }
h4 { font: bold 100% sans-serif }
h5 { font: italic 100% sans-serif }
h6 { font: small-caps 100% sans-serif }

.hide { display: none }

div.head { margin-bottom: 1em }
div.head h1 { margin-top: 2em; clear: both }
div.head table { margin-left: 2em; margin-top: 2em }
div.head img { color: white; border: none } /* remove border from top image */

p.copyright { font-size: small }
p.copyright small { font-size: small }

pre { margin-left: 2em }
/*
p {
  margin-top: 0.6em;
  margin-bottom: 0.6em;
}
*/
dt, dd { margin-top: 0; margin-bottom: 0 } /* opera 3.50 */
dt { font-weight: bold }

pre, code { font-family: monospace } /* navigator 4 requires this */

ul.toc {
  list-style: disc;		/* Mac NS has problem with 'none' */
  list-style: none;
}

code           { font-family: monospace; }

div.constraint,
div.issue,
div.note,
div.notice     { margin-left: 2em; }

li p           { margin-top: 0.3em;
                 margin-bottom: 0.3em; }
      
div.exampleInner pre { margin-left: 1em;
                       margin-top: 0em; margin-bottom: 0em}
div.exampleOuter {border: 4px double gray;
                  margin: 0em; padding: 0em}
div.exampleInner { background-color: #d5dee3;
                   border-top-width: 4px;
                   border-top-style: double;
                   border-top-color: #d3d3d3;
                   border-bottom-width: 4px;
                   border-bottom-style: double;
                   border-bottom-color: #d3d3d3;
                   padding: 4px; margin: 0em }
div.exampleWrapper { margin: 4px }
div.exampleHeader { font-weight: bold;
                    margin: 4px}

table { background-color: #d5dee3;
        width: 85% ;
        border-style: double;
        border-width: 4px;
        border-color: #d3d3d3;
}
        </style>
  
<html>
  <head>
    <title>Pegasus Meta Dispatcher</title>
  </head>

  <body>

<p class="copyright">this is copyright text</p>

<div class="constraint">this is a constraint</div>

<div class="exampleInner"><code>this is an inner example</code></div>
    <h1>Pegasus Meta Dispatcher</h1>
    <p>
      The Pegasus Meta Dispatcher is a set of classes that extend the
      existing MessageQueue messaging system to be dynamic,
      asynchronous, and multithreaded. The primary classes consist of the
      folowing:
    </p>
	<table> 
	    <tr align="left"><th>Class</th><th>Derived from</th><th>Source file</th></tr>
	    <tr><td><div class="exampleInner">cimom</div></td><td>MessageQueue</td><td>Pegasus/Common/Cimom.h</td></tr>
	    <tr><td>MessageQueueService</td><td>MessageQueue</td><td>Pegasus/Common/MessageQueueServices.h</td></tr>
	    <tr><td>CimomMessage</td><td>Message</td><td>Pegasus/Common/CimomMessage.h</td></tr>
	<tr><td>AsyncOpNode</td><td>n/a</td><td>Pegasus/Common/AsyncOpNode.h</td></tr>
	    <tr><td>AsyncDQueue</td><td>unlocked_dq</td><td>Pegasus/Common/DQueue.h</td></tr>
	    <tr><td>IPC classes</td><td>n/a</td><td>Pegasus/Common/IPC.h</td></tr>
	    <tr><td>Threading classes</td><td>n/a</td><td>Pegasus/Common/Thread.h</td></tr>

	</table>
	
	<br>
	<br>

      <h2>Purposes of Meta Dispatcher</h2>
    <p>
      The Meta Dispatcher has two primary goals:
      <ol>
      <li>Provide for orderly asynchronous message-based communication
	among a dynamic set of Pegasus Services.</li>
      <li>Preserve the existing message-passing architecture of
	Pegasus.</li>
      <li>Allow Pluggable Services such as repositories, provider
	managers, and others.</li>
    </ol>
<br>
<br>
    Most of the purposes listed above revolve around maintaining the
    integrity of data and control flow in an asynchronous
    multithreaded environment. 
    </p>

    <h2>Terms</h2>
    <p>
    <dl>
      <dt><b>Meta Dispatcher</b></dt><dd>The central message broker, or
	router, that provides the asynchronous communications within
	Pegasus. Derived from the <b>MessageQueue</b> class. <br>
      </dd>
      <dt><b>Service</b></dt><dd>A Pegasus module that sends and
	receives messages to other modules through the meta
	dispatcher; A module that has enhanced privileges within
	Pegasus and which provides one or more functions necessary to
	the operation of Pegasus. Derived from the
	<b>MessageQueue</b>class.<dt>
      <dt><b>Asynchronous Message</b></dt><dd>A <b>pair</b> of
	messages, consisting of a <b>request</b> and a <b>response</b>
	that are treated as a single operation by Services. An
	asynchronous message may be fronted by a synchronous
	programming interface. Derived from the <b>Message</b> class.</dd>
      <dt><b>AsyncOpNode</b></dt><dd>A control object that manages the
	lifetime of an <b>Asynchronous Message</b>.The AsyncOpNode uses
	many of the IPC object classes. A Service manages the lifetime
	of an AsyncOpNode during the processing of the
	message. However, it necessarily cedes control of the
	AsyncOpNode to the <b>Meta Dispatcher</b> while the
	Asynchronous Message is being processed.</dd>
    </dl>
    
    </p>

    <h2>Meta Dispatcher Design</h2>
    <p>
      Three points are necessary to avoid deadlocks and to
      provide pluggable services in Pegaus. The first thing is
      <b>independent execution paths</b> of service modules. i.e.,
      each service must have its own thread(s), which must not intersect
      with the thread(s) of other services. Intersection of execution
      paths can occur indirectly through IPC objects such as mutexes,
      conditions, and semaphores.
    </p>
    <p>
      The second point that is necessary is <b>interface
	abstraction</b>, which the Meta Dispatcher provides through
      C++ polymorphism. This allows pluggable services. i.e., one
      service can replace another and the system will continue to
      function (hopefully in an improved manner). 
    </p>
    <p>
      The third point that is neccesary is a <b>central
	message broker</b> that isolates services from each other,
      thereby preventing deadlocks. The central message broker also
      provides message responses for services that are paused,
      stopped, or not present (plugged in). 
    </p>


    <h3>Central Hub</h3>
    <p>
      The Meta Dispatcher therefore acts as a central message
      hub. Services communicate with each other <i>via</i> the Meta
      Dispatcher.
	<div class="exampleOuter"><div class="exampleInner">
      <pre>
      Service A--Message----1----> (block on semaphore)
                                  |
                                  |
                                  Meta Dispatcher| 
                                                 |
                                               Message----2->Service B
                                                                 |
                                                                 |
                           (Signal Semaphore) <---Response---3-- +
                                 |
      Service A <--Response--4---+

    </pre>

	  </div></div>
    The numbered steps above are as follows:
    <ol>
      <li><b>Service A</b> creates a new <code>AsyncMessage</code> and
	<code>AsyncOpNode</code>and sends that message to <b>Service
	  B</b> by calling
	<code>MessageQueueService::SendWait</code>. The calling thread
      blocks on the <b>client emaphore</b> until the response is ready.</li><br>

      <li>The Meta Dispatcher's routing thread picks up the message
	and inserts it into <b>Service B's</b> incoming message
	queue. The routing thread returns to the Meta Dispatcher to
	route the next message in the system.</li><br>

      <li><b>Service B's</b>incoming thread picks up the message and
	calls the its message handler. Message handlers are virtual,
	so a class derived from <b>MessageQueueService</b>can define
	its own message handlers to override the default
	handlers. When the message handler has constructed an
	<b>AsyncReply</b> that reply gets linked to the
	<b>AsyncOpNode</b>. The MessageQueueService then signals the
	<b>client semaphore</b> within the op node.</li><br>

      <li><b>Service A</b> awakens when the <b>client semaphore</b> is
	signalled. It pulls the <b>AsyncResponse</b> message from the
	<b>AsyncOpNode</b> and processes the result. Service A is
	responsible for discarding the request, response, and
	AsyncOpNode objects. The existing classes have mechanisms for
	caching these objects to avoid too frequent
	construction/destruction of them. 
    </ol>
    </p>

<h2>Test Program</h2>
      <p>
	The concepts explained below are all contained in the test
	program for the Meta Dispatcher, which is located in 
	<code>$PEGASUS_HOME/src/Pegasus/Common/tests/MessageQueueService/</code>

      </p>

      <h2>Service Registration and Deregistration</h2>
      <p>
	Services (classes derived from
	<code>MessageQueueService</code>must register their presence
	with the Meta Dispatcher. This is done as follows (taken from
	the test program):

	<ol>
	<li><b>Define the Service Class</b></li>
<pre>
// Define our service class 

class MessageQueueClient : public MessageQueueService
{
      
   public:
      typedef MessageQueueService Base;
      
      MessageQueueClient(char *name)
	 : Base(name, MessageQueue::getNextQueueId(), 0,  
		message_mask::type_cimom | 
		message_mask::type_service | 
		message_mask::ha_request | 
		message_mask::ha_reply | 
		message_mask::ha_async ),
	   client_xid(1)
      {  
	 _client_capabilities = Base::_capabilities;
	 _client_mask = Base::_mask;
      }
            
      virtual ~MessageQueueClient(void) 
      {
      }
      
      // method to indicate acceptance of message to 
      // Meta Dispatcher
      virtual Boolean messageOK(const Message *msg);

      // function to send a request to another service
      void send_test_request(char *greeting, Uint32 qid);
      Uint32 get_qid(void);
      
      Uint32 _client_capabilities;
      Uint32 _client_mask;
      
      // method to receive messages from the Meta Dispatcher,
      // MUST be defined
      virtual void _handle_async_request(AsyncRequest *req);

      AtomicInt client_xid;
};

</pre>
      <li><b>Construct the Service</b></li>
<pre>
// Create our Service
   MessageQueueClient *q_client = 
          new MessageQueueClient("test client");

</pre>
      <li><b>Register the Service</b></li>
<pre>
// Register our service with the Meta Dispatcher
   q_client-&gt;register_service("test client", 
                               q_client-&gt;_client_capabilities, 
                               q_client-&gt;_client_mask);
   cout &lt;&lt; " client registered " &lt;&lt; endl;
</pre>

</ol>

      The example above hides many of the details which are handled by
      the MessageQueueService's constructor, such as creating the
      background thread, finding the Meta Dispatcher, and constructing
      the queues. But a derived class as the example shows does not
      need to worry about those details. 
      </p>

<h2>Finding Other Services</h2>
      <p>
	The MessageQueueService class has an api for finding other
	services. This api is built using messages that are defined in
	<code>CimomMessage.h</code>. Here is an example from the test
	program:

<pre>

   Array&lt;Uint32&gt;; services; 

   while( services.size() == 0 )
   {
      q_client-&gt;find_services(String("test server"), 0, 0, &services); 
      pegasus_yield();  
   }
   
   cout &lt;&lt; "found server at " &lt;&lt; services[0] &lt;&lt; endl;


</pre>

      The code sample above shows how to find services by their
      name. The api also allows finding services by their capabilities
      or the messages they support. Note that the return is an array
      of Queue IDs. It is possible, for example, to find multiple
      services. 
      </p>

<h2>Sending an Asynchronous Message to Another Service</h2>
      <p>
	The "handle" for a services is its Queue ID. Once you have the
	Queue ID you can send a message to that service. The example
	above shows one way to get a service's Queue ID. Here is an
	example that shows how to send that service a message. 

<ol>
<li><b>Define the Request and Response Message Pair by Inheriting from AsyncMessage.</b></li>
<pre>

class test_request : public AsyncRequest
{
  
   public:
      typedef AsyncRequest Base;
      
      test_request(Uint32 routing, 
		   AsyncOpNode *op, 
		   Uint32 destination, 
		   Uint32 response,
		   char *message)
	 : Base(0x04100000,
		Message::getNextKey(), 
		routing,
		0, 
		op, 
		destination, 
		response, 
		true),
	   greeting(message) 
      {   
	 
      }
      
      virtual ~test_request(void) 
      {


      }
      
      String greeting;
};


class test_response : public AsyncReply
{
   public:
      typedef AsyncReply Base;
      

      test_response(Uint32 key, 
		    Uint32 routing,
		    AsyncOpNode *op, 
		    Uint32 result,
		    Uint32 destination, 
		    char *message)
	 : Base(0x04200000,
		key, 
		routing, 
		0, 
		op, 
		result, 
		destination,
		true), 
	   greeting(message) 
      {  
	 
      }
      
      virtual ~test_response(void)
      {
	 
      }
      
      String greeting;
};

</pre>

      The function <code>send_test_request</code> shows everything
      that is necessary to send a message to another service and
      process the reply. 

<pre>

void MessageQueueClient::send_test_request(char *greeting, Uint32 qid)
{

</pre>
<li><b>Construct the Request</b></li>

<pre>
   test_request *req = 
      new test_request(Base::get_next_xid(),
		       0,
		       qid,        // destination queue ID
		       _queueId,   // my own queue ID 
		       greeting);  // message parameter

</pre>

<li><b>Send the message using <code>MessageQueueService::SendWait</code></b></li>

<pre>
   AsyncMessage *response = SendWait(req);

</pre>

<li><b>Process the Response.</b></i>
<pre>
   if( response != 0  )
   {
      msg_count++; 
      delete response; 
      cout << " test message " << msg_count.value() << endl;
      
   }
   delete req;
}

</pre>

<li><b>Delete the Request and the Response. The
	    <code>SendWait</code> interface creates and disposes of
	    everything else.</b></li> 

</ol>
      </p>


<h2>Handling an Incoming Message </h2>

	<p>
	  To handle messages the service needs to implement the
	  following methods. 

<ol>

<li><b><code>virtual Boolean MessageOK(const Message
		*)</code></b></li>

	  This method allows the Service to accept or reject the
	message. The Meta Dispatcher will always call this method
	before inserting the request on the Service's queue. 

<pre>

Boolean MessageQueueServer::messageOK(const Message *msg)
{
   if(msg->getMask() & message_mask::ha_async)
   {
      if( msg->getType() == 0x04100000 ||
	  msg->getType() == async_messages::CIMSERVICE_STOP || 
	  msg->getType() == async_messages::CIMSERVICE_PAUSE || 
	  msg->getType() == async_messages::CIMSERVICE_RESUME )
      return true;
   }
   return false;
}

</pre>

<li><b><code>virtual Boolean accept_async(AsyncOpNode
	      *operation)</code> (optional) </b></li>

	This method executes on the Meta Dispatcher's thread and links
	the incoming message to the Service's queue. <br><br>



<li><b><code>virtual void _handle_incoming_operation(AsyncOpNode
	      *)</code></b></li><br>


This method is called by the Service's background thread. Here is an
	example implementation that just does some sanity checking on
	the message.

<pre>

void MessageQueueServer::_handle_incoming_operation(AsyncOpNode *op)
{
   if ( operation != 0 )
   {
      Message *rq = operation-&gt;get_request();
      PEGASUS_ASSERT(rq != 0 );
      PEGASUS_ASSERT(rq-&gt;getMask() & message_mask::ha_async );
      PEGASUS_ASSERT(rq-&gt;getMask() & message_mask::ha_request);
      _handle_async_request(static_cast&lt;AsyncRequest *&gt;(rq));
   }
     
   return;
   
}


</pre>

<li><b><code>virtual void _handle_async_request(AsyncRequest *)</code></b></li><br>
<br>

	This method handles the request. The Service must implement
	this method. <b>If the Service does not handle the Request it
	  must pass the Request to the Base class by calling <code>Base::_handle_async_request(req)</code></b>

<pre>
void MessageQueueServer::_handle_async_request(AsyncRequest *req)
{
   if (req->getType() == 0x04100000 )
   {
      req->op->processing();
      handle_test_request(req);   // Message Handler 
   }
   else if ( req->getType() == async_messages::CIMSERVICE_STOP )
   {
      req->op->processing();
      handle_CimServiceStop(static_cast<CimServiceStop *>(req));
   }
   
   else
      Base::_handle_async_request(req);  // Give it to the Base !!
}

</pre>

<li><b>Specific Message Handlers</b>

	  Each Message handler will be defined by the format of the
	  Request/Response pair. Here is an example from the test
	  program. 

<pre>
 
   if( msg-&gt;getType() == 0x04100000 )
   {

</pre>
	  <ol>
	    <li><b>Construct the Reply</b></li>
<pre>

      test_response *resp = 
	 new test_response(msg-&gt;getKey(),
			   msg-&gt;getRouting(),
			   msg-&gt;op, 
			   async_results::OK,
			   msg-&gt;dest, 
			   "i am a test response");


</pre>
	    <li><b>Complete the Reply</b> by calling the following
	    helper routine in the Base class</li>

<pre>
      _completeAsyncResponse(msg, resp, ASYNC_OPSTATE_COMPLETE, 0);

   }
</pre>

</ol>
	</p>

<h2>Handling CIMMessage and Other Pre-existing Message Classes</h2>
	  <p>
	    Existing Messages, including all of the <code>CIMMessage</code>
	    derivitives, are not configured to be asynchronous
	    request/reply pairs. They are designed to travel through
	    Pegasus as events that trigger other processing events,
	    which is the end of their lifetime. This is not an optimal
	    use model for asynchronous operation because the
	    originator of the event does not require nor receive any
	    completion notification. Further, there is not a
	    one-to-one correspondence of "event messages" to replies. 
	  </p>
	  
	  <h3>AsyncLegacyOperationStart Message</h3>
	  <p>
	    The AsyncLegacyOperationStart message is an envelope that
	    allows a <code>MessageQueueService</code>-based service to
	    send, receive, and process pre-existing "legacy"
	    messages. 
	  </p>
	  <p>
	    The <code>AsyncLegacyOperationStart</code> Message allows
	    an asynchronous service to create, package, and send a
	    "legacy" message to another service or, indirectly,
	    enqueue it to a non-asynchronous message queue. The code
	    example below shows how this works:
	  </p>

<pre>

   cout &lt;&lt; " sending LEGACY to test server" &lt;&lt; endl;
   
   Message *legacy = new Message(0x11100011, 
				 Message::getNextKey());
   
   AsyncLegacyOperationStart *req = 
      new AsyncLegacyOperationStart(q_client-&gt;get_next_xid(), 
				    0, 
				    services[0],
				    legacy, 
				    q_client-&gt;getQueueId());
   reply = q_client-&gt;SendWait(req);
   delete req;
   delete reply;
   
</pre>
	  <p>
	   The code sample above shows a <code>Message</code> object
	   being embedded inside an
	    <code>AsyncLegacyOperationStart</code> message and sent
	    using the <code>SendWait</code>API. 
	  </p>

	  <h3>Default Handler for Legacy Messages</h3>
	  <p>
	    The <code>MessageQueueService</code> class has a default
	    handler for legacy messages that extracts the
	    <code>Message</code> out of its asynchronous "envelope"
	    and dispatches it using the pre-existing synchronous
	    interface, as shown below. 
	  </p>

<pre>

void MessageQueueService::handle_AsyncLegacyOperationStart(
                                               AsyncLegacyOperationStart *req)
{
   // remove the legacy message from the request and enqueue it to its destination
   Uint32 result = async_results::CIM_NAK;
   
   Message *legacy = req-&gt;act;
   if ( legacy != 0 )
   {
      MessageQueue* queue = MessageQueue::lookup(req-&gt;legacy_destination);
      if( queue != 0 )
      {
	// Enqueue the response:
	 queue-&gt;enqueue(legacy);
	 result = async_results::OK;
      }
   }
   _make_response(req, result);
}

</pre>

	  <p>
	    The default handler shown above extracts the legacy
	    message and attempts to <code>enqueue</code> that message
	    syncrhonously using the pre-existing interface.
	  </p>

<h3>Example of Custom Handler for Legacy Messages</h3>
	  <p>
	    By implementing the virtual
	    <code>_handle_async_request</code> method, 
	    a service can choose to implement its own handler for
	    Legacy messages, as the code below shows:
	  </p>

<ol>

<li><b>Implement the virtual <code>_handle_async_request</code> method.</b></li>
<pre>

void MessageQueueServer::_handle_async_request(AsyncRequest *req)
{
   if (req->getType() == 0x04100000 )
   {
      req->op->processing();
      handle_test_request(req);
   }
   else if ( req->getType() == async_messages::CIMSERVICE_STOP )
   {
      req->op->processing();
      handle_CimServiceStop(static_cast<CimServiceStop *>(req));
   }
</pre>
<li><b>Implement a dispatcher for <code>ASYNC_LEGACY_OP_START</code></b></li>
<pre>
   else if ( req->getType() == async_messages::ASYNC_LEGACY_OP_START )
   {
      req->op->processing();
      handle_LegacyOpStart(static_cast<AsyncLegacyOperationStart *>(req));
   }
   
   else
      Base::_handle_async_request(req);
}

</pre>
<li><b>Implement a dispatcher for <code>ASYNC_LEGACY_OP_START</code></b></li>
<pre>

void MessageQueueServer::handle_LegacyOpStart(AsyncLegacyOperationStart *req)
{

   Message *legacy = req-&gt;act;
   cout &lt;&lt; " ### handling legacy messages " &lt;&lt; endl;
   

      AsyncReply *resp =  
	 new AsyncReply(async_messages::REPLY, 
			req-&gt;getKey(), 
			req-&gt;getRouting(), 
			0, 
			req-&gt;op, 
			async_results::OK, 
			req-&gt;resp, 
			req-&gt;block);
      _completeAsyncResponse(req, resp, ASYNC_OPSTATE_COMPLETE, 0 );

      if (legacy != 0 )
	 cout &lt;&lt; " legacy msg type: " &lt;&lt; legacy-&gt;getType() &lt;&lt; endl;
      
}

</pre>


</ol>

    <hr>



<h2>Sending Messages without Blocking (Async with Callback)</h2>

	      <p>
		Whenever there is a possibility that the processing of
		one message may generate a nested message (message
		generated within the handler of a message) it is
		necessary to send messages without blocking, and to
		receive responses via callback routines. The diagram
		below shows the (more complicated) flow of
		non-blocking messages. 
	      </p>
<br>
	      <div class="exampleOuter"><div class="exampleInner">
      <pre>
      Service A--Message----1----> 
                               |
        . <-----------(return)-+----->-(loop)--->-+
        .                      |  Meta Dispatcher | 
        .                      +----<-----<---<---+
        .                                      Message---2-->Service B
        .                                                            |
        .                                        <--Response--3------+
        .                                        | 
        .                        +--<--<-----<-----+--(return)---->
        .                        | Meta Dispatcher |  
      Service A <--Callback--4---+--->-(loop)-->---+
                   |       ^
                   +-------+
    </pre>
		</div></div>

	      <h3>Test Program</h3>
	      
	      <p>
		There is a test program that sends and receives
		non-blocking messages in 
		<code>$(PEGASUS_ROOT)/src/Pegasus/Common/tests/async_callback/</code>
	      </p>

	      <h3>SendAsync method</h3>

	      <p>
		The <code>MessageQueueService</code> class sends
		non-blocking messages using the <code>SendAsync</code>
		method from <code>MessageQueueService.h</code>.
	      </p>
<pre>
      Boolean <b><font color=#000000>SendAsync</font></b><font color=#990000>(</font>AsyncOpNode <font color=#990000>*</font>op<font color=#990000>,</font> 
			Uint32 destination<font color=#990000>,</font>
			<font color=#009900>void</font> <font color=#990000>(</font><font color=#990000>*</font>callback<font color=#990000>)</font><font color=#990000>(</font>AsyncOpNode <font color=#990000>*</font><font color=#990000>,</font> MessageQueue <font color=#990000>*</font><font color=#990000>,</font> <font color=#009900>void</font> <font color=#990000>*</font><font color=#990000>)</font><font color=#990000>,</font>
			MessageQueue <font color=#990000>*</font>callback_q<font color=#990000>,</font>
			<font color=#009900>void</font> <font
							      color=#990000>*</font>callback_ptr<font color=#990000>)</font><font color=#990000>;</font>
</pre>

<h2>Class Definitions</h2>

<h3>cimom (Meta Dispatcher)</h3>
<pre>
class PEGASUS_COMMON_LINKAGE cimom : public MessageQueue
{
   public : 
      cimom(void);
      
      virtual ~cimom(void) ;
            
      Boolean moduleChange(struct timeval last);
      
      Uint32 getModuleCount(void);
      Uint32 getModuleIDs(Uint32 *ids, Uint32 count) throw(IPCException);

      AsyncOpNode *get_cached_op(void) throw(IPCException);
      void cache_op(AsyncOpNode *op) throw(IPCException);
            
      void set_default_op_timeout(const struct timeval *buffer);
      void get_default_op_timeout(struct timeval *timeout) const ;

      virtual void handleEnqueue();
      void register_module(RegisterCimService *msg);
      void deregister_module(Uint32 quid);
      void update_module(UpdateCimService *msg );
      void ioctl(AsyncIoctl *msg );

      void find_service_q(FindServiceQueue *msg );
      void enumerate_service(EnumerateService *msg );
      Boolean route_async(AsyncOpNode *operation);
      void _shutdown_routed_queue(void);
      
            
   protected:
      Uint32 get_module_q(const String & name);
      void _make_response(AsyncRequest *req, Uint32 code);
      void _completeAsyncResponse(AsyncRequest *request, 
				  AsyncReply *reply, 
				  Uint32 state, 
				  Uint32 flag);
   private:
      struct timeval _default_op_timeout;
      struct timeval _last_module_change;
      DQueue&lt;message_module&gt; _modules;

      DQueue&lt;AsyncOpNode&gt; _recycle;
      
      AsyncDQueue&lt;AsyncOpNode&gt; _routed_ops;
      DQueue&lt;AsyncOpNode&gt; _internal_ops;
      
      static PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL _routing_proc(void *);

      Thread _routing_thread;

      static Uint32 get_xid(void);
      void _handle_cimom_op(AsyncOpNode *op, Thread *thread, MessageQueue *queue);
      Uint32 _ioctl(Uint32, Uint32, void *);


      AtomicInt _die;
      AtomicInt _routed_queue_shutdown;
      
      static AtomicInt _xid;
      
//       CIMOperationRequestDispatcher *_cim_dispatcher;
//       CIMOperationResponseEncoder *_cim_encoder;
//       CIMOperationRequestDecoder *_cim_decoder;
//       CIMRepository *_repository;
      
};

</pre>
	  

<h3>MessageQueueService</h3>

<pre>

class message_module;

class PEGASUS_COMMON_LINKAGE MessageQueueService : public MessageQueue
{
   public:

      typedef MessageQueue Base;
      
      MessageQueueService(const char *name, Uint32 queueID, Uint32 capabilities, Uint32 mask) ;
      
      virtual ~MessageQueueService(void);
      
      virtual void handle_heartbeat_request(AsyncRequest *req);
      virtual void handle_heartbeat_reply(AsyncReply *rep);
      
      virtual void handle_AsyncIoctl(AsyncIoctl *req);
      virtual void handle_CimServiceStart(CimServiceStart *req);
      virtual void handle_CimServiceStop(CimServiceStop *req);
      virtual void handle_CimServicePause(CimServicePause *req);
      virtual void handle_CimServiceResume(CimServiceResume *req);
      
      virtual void handle_AsyncOperationStart(AsyncOperationStart *req);
      virtual void handle_AsyncOperationResult(AsyncOperationResult *req);
      virtual Boolean accept_async(AsyncOpNode *op);
      virtual Boolean messageOK(const Message *msg) ;

      AsyncReply *SendWait(AsyncRequest *request);
      
      void _completeAsyncResponse(AsyncRequest *request, 
				 AsyncReply *reply, 
				 Uint32 state, 
				 Uint32 flag);
      Boolean register_service(String name, Uint32 capabilities, Uint32 mask);
      Boolean update_service(Uint32 capabilities, Uint32 mask);
      Boolean deregister_service(void);
      virtual void _shutdown_incoming_queue(void);
      void find_services(String name,
			 Uint32 capabilities, 
			 Uint32 mask, 
			 Array&lt;Uint32&gt; *results);
      void enumerate_service(Uint32 queue, message_module *result);
      Uint32 get_next_xid(void);
      AsyncOpNode *get_op(void);
      void return_op(AsyncOpNode *op);
      Uint32 _capabilities;
      Uint32 _mask;
      AtomicInt _die;
   protected:

      virtual void _handle_incoming_operation(AsyncOpNode *operation, Thread *thread, MessageQueue *queue);
      virtual void _handle_async_request(AsyncRequest *req);
      virtual void _make_response(AsyncRequest *req, Uint32 code);
      cimom *_meta_dispatcher;

   private: 
      void handleEnqueue();
      DQueue&lt;AsyncOpNode&gt; _pending;
      AsyncDQueue&lt;AsyncOpNode&gt; _incoming;
      
      static PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL _req_proc(void *);
      AtomicInt _incoming_queue_shutdown;
      
      Thread _req_thread;
      
      struct timeval _default_op_timeout;

      static AtomicInt _xid;

 };


</pre>

<h3>Asynchronous Messages</h3>

<pre>

extern const Uint32 CIMOM_Q_ID;

class AsyncOpNode;

class PEGASUS_COMMON_LINKAGE async_results
{
   public:
      static const Uint32 OK;
      static const Uint32 PARAMETER_ERROR;
      static const Uint32 MODULE_ALREADY_REGISTERED;
      static const Uint32 MODULE_NOT_FOUND;
      static const Uint32 INTERNAL_ERROR;

      static const Uint32 ASYNC_STARTED;
      static const Uint32 ASYNC_PROCESSING;
      static const Uint32 ASYNC_COMPLETE;
      static const Uint32 ASYNC_CANCELLED;
      static const Uint32 ASYNC_PAUSED;
      static const Uint32 ASYNC_RESUMED;

      static const Uint32 CIM_SERVICE_STARTED;
      static const Uint32 CIM_SERVICE_STOPPED;
      static const Uint32 CIM_SERVICE_PAUSED;

      static const Uint32 CIM_SERVICE_RESUMED;
      static const Uint32 CIM_NAK;

      static const Uint32 ASYNC_PHASE_COMPLETE;
      static const Uint32 ASYNC_CHILD_COMPLETE;
      static const Uint32 ASYNC_PHASE_STARTED;
      static const Uint32 ASYNC_CHILD_STARTED;
      static const Uint32 CIM_PAUSED;
      static const Uint32 CIM_STOPPED;
      
};


class PEGASUS_COMMON_LINKAGE async_messages
{
   public:
      static const Uint32 HEARTBEAT;
      static const Uint32 REPLY;
      static const Uint32 REGISTER_CIM_SERVICE;
      static const Uint32 DEREGISTER_CIM_SERVICE;
      static const Uint32 UPDATE_CIM_SERVICE;
      static const Uint32 IOCTL;
      static const Uint32 CIMSERVICE_START;
      static const Uint32 CIMSERVICE_STOP;
      static const Uint32 CIMSERVICE_PAUSE;
      static const Uint32 CIMSERVICE_RESUME;

      static const Uint32 ASYNC_OP_START;
      static const Uint32 ASYNC_OP_RESULT;
      static const Uint32 ASYNC_LEGACY_OP_START;
      static const Uint32 ASYNC_LEGACY_OP_RESULT;

      static const Uint32 FIND_SERVICE_Q;
      static const Uint32 FIND_SERVICE_Q_RESULT;
      static const Uint32 ENUMERATE_SERVICE;
      static const Uint32 ENUMERATE_SERVICE_RESULT;
};


class PEGASUS_COMMON_LINKAGE AsyncMessage : public Message
{
   public:
      AsyncMessage(Uint32 type, 
		   Uint32 key, 
		   Uint32 routing,
		   Uint32 mask,
		   AsyncOpNode *operation);
           
      virtual ~AsyncMessage(void) 
      {
	 
      }
      
      Boolean operator ==(void *key);
      Boolean operator ==(const AsyncMessage& msg);
      
      AsyncOpNode *op;
      Thread *_myself;
      MessageQueue *_service;
};


inline Boolean AsyncMessage::operator ==(void *key)
{
   if( key == reinterpret_cast&lt;void *&gt;(this))
      return true;
   return false;
}

inline Boolean AsyncMessage::operator ==(const AsyncMessage& msg)
{
   return this-&gt;operator==(reinterpret_cast&lt;void *&gt;(const_cast&lt;AsyncMessage *&gt;(&msg)));
}


class PEGASUS_COMMON_LINKAGE AsyncRequest : public AsyncMessage
{
   public:
      AsyncRequest(Uint32 type, 
		   Uint32 key, 
		   Uint32 routing,
		   Uint32 mask,
		   AsyncOpNode *operation,
		   Uint32 destination,
		   Uint32 response,
		   Boolean blocking);
      
      
      virtual ~AsyncRequest(void) 
      {

      }
            
      Uint32 dest;
      Uint32 resp;
      Boolean block;
};

class PEGASUS_COMMON_LINKAGE AsyncReply : public AsyncMessage
{
   public:
      AsyncReply(Uint32 type, 
		 Uint32 key, 
		 Uint32 routing, 
		 Uint32 mask,
		 AsyncOpNode *operation,
		 Uint32 result_code,
		 Uint32 destination,
		 Boolean blocking);
      
      
      virtual ~AsyncReply(void)
      {
	 if(op != 0 )
	    delete op;
	 
      }
            
      Uint32 result;
      Uint32 dest;
      Boolean block;
};



class PEGASUS_COMMON_LINKAGE RegisterCimService : public AsyncRequest
{
   public: 
      RegisterCimService(Uint32 routing, 
			 AsyncOpNode *operation,
			 Boolean blocking,
			 String service_name,
			 Uint32 service_capabilities, 
			 Uint32 service_mask,
			 Uint32 service_queue);
      
      virtual ~RegisterCimService(void) 
      {

      }
      
      String name;
      Uint32 capabilities;
      Uint32 mask;
      Uint32 queue;
};

class PEGASUS_COMMON_LINKAGE DeRegisterCimService : public AsyncRequest
{
   public:
      DeRegisterCimService(Uint32 routing, 
			   AsyncOpNode *operation,
			   Boolean blocking, 
			   Uint32 service_queue);
      
      
      virtual ~DeRegisterCimService(void)
      {

      }
      
      Uint32 queue;
} ;

class PEGASUS_COMMON_LINKAGE UpdateCimService : public AsyncRequest
{
   public:
      UpdateCimService(Uint32 routing, 
		       AsyncOpNode *operation,
		       Boolean blocking, 
		       Uint32 service_queue, 
		       Uint32 service_capabilities, 
		       Uint32 service_mask);

      virtual ~UpdateCimService(void) 
      {

      }
      
      Uint32 queue;
      Uint32 capabilities;
      Uint32 mask;
};


class PEGASUS_COMMON_LINKAGE AsyncIoctl : public AsyncRequest
{
   public:
      AsyncIoctl(Uint32 routing, 
		 AsyncOpNode *operation, 
		 Uint32 destination, 
		 Uint32 response,
		 Boolean blocking,
		 Uint32 code, 
		 Uint32 int_param,
		 void *p_param);

      virtual ~AsyncIoctl(void)
      {

      }
      
      enum 
      {
	 IO_CLOSE,
	 IO_OPEN,
	 IO_SOURCE_QUENCH,
	 IO_SERVICE_DEFINED
      };
      
      

      Uint32 ctl;
      Uint32 intp;
      void *voidp;

};

class PEGASUS_COMMON_LINKAGE CimServiceStart : public AsyncRequest
{
   public:
      CimServiceStart(Uint32 routing, 
		      AsyncOpNode *operation, 
		      Uint32 destination, 
		      Uint32 response, 
		      Boolean blocking);
      
      virtual ~CimServiceStart(void) 
      {
	 
      }
};


class PEGASUS_COMMON_LINKAGE CimServiceStop : public AsyncRequest
{
   public:
      CimServiceStop(Uint32 routing, 
		     AsyncOpNode *operation, 
		     Uint32 destination, 
		     Uint32 response, 
		     Boolean blocking);
            
      virtual ~CimServiceStop(void) 
      {

      }
};

class PEGASUS_COMMON_LINKAGE CimServicePause : public AsyncRequest
{
   public:
      CimServicePause(Uint32 routing, 
		      AsyncOpNode *operation, 
		      Uint32 destination, 
		      Uint32 response, 
		      Boolean blocking);
      
      
      virtual ~CimServicePause(void)
      {

      }
};

class PEGASUS_COMMON_LINKAGE CimServiceResume : public AsyncRequest
{
   public:
      CimServiceResume(Uint32 routing, 
		       AsyncOpNode *operation, 
		       Uint32 destination, 
		       Uint32 response, 
		       Boolean blocking);
      
      
      virtual ~CimServiceResume(void)
      {

      }
};

class PEGASUS_COMMON_LINKAGE AsyncOperationStart : public AsyncRequest
{
   public:
      AsyncOperationStart(Uint32 routing, 
			  AsyncOpNode *operation, 
			  Uint32 destination, 
			  Uint32 response, 
			  Boolean blocking, 
			  Message *action);
      

      virtual ~AsyncOperationStart(void)
      {

      }
      
      Message *act;
};

class PEGASUS_COMMON_LINKAGE AsyncOperationResult : public AsyncReply
{
   public:
      AsyncOperationResult(Uint32 key, 
			   Uint32 routing, 
			   AsyncOpNode *operation,
			   Uint32 result_code, 
			   Uint32 destination,
			   Uint32 blocking);
      

      virtual ~AsyncOperationResult(void)
      {

      }
};


class PEGASUS_COMMON_LINKAGE AsyncLegacyOperationStart : public AsyncRequest
{
   public:
      AsyncLegacyOperationStart(Uint32 routing, 
				AsyncOpNode *operation, 
				Uint32 destination, 
				Message *action);
      
      
      virtual ~AsyncLegacyOperationStart(void)
      {

      }
      
      Message *act;
};

class PEGASUS_COMMON_LINKAGE AsyncLegacyOperationResult : public AsyncReply
{
   public:
      AsyncLegacyOperationResult(Uint32 key, 
				 Uint32 routing, 
				 AsyncOpNode *operation,
				 Message *result);
      
      virtual ~AsyncLegacyOperationResult(void)
      {

      }

      Message *res;
};


class PEGASUS_COMMON_LINKAGE FindServiceQueue : public AsyncRequest
{
   public:
      FindServiceQueue(Uint32 routing, 
		       AsyncOpNode *operation, 
		       Uint32 response,
		       Boolean blocking, 
		       String service_name, 
		       Uint32 service_capabilities, 
		       Uint32 service_mask);
      
      virtual ~FindServiceQueue(void)
      {

      }
      
      String name;
      Uint32 capabilities;
      Uint32 mask;
} ;

class PEGASUS_COMMON_LINKAGE FindServiceQueueResult : public AsyncReply
{
   public:
      FindServiceQueueResult(Uint32 key, 
			     Uint32 routing, 
			     AsyncOpNode *operation, 
			     Uint32 result_code, 
			     Uint32 destination, 
			     Boolean blocking, 
			     Array&lt;Uint32&gt; queue_ids);
      
      
      virtual ~FindServiceQueueResult(void)
      {

      }
      
      Array&lt;Uint32&gt; qids;
} ;

class PEGASUS_COMMON_LINKAGE EnumerateService : public AsyncRequest
{
   public:
      EnumerateService(Uint32 routing, 
		       AsyncOpNode *operation, 
		       Uint32 response, 
		       Boolean blocking, 
		       Uint32 queue_id);
      
      
      virtual ~EnumerateService(void)
      {

      }
      
      Uint32 qid;
};

class PEGASUS_COMMON_LINKAGE EnumerateServiceResponse : public AsyncReply
{
   public:
      EnumerateServiceResponse(Uint32 key, 
			       Uint32 routing, 
			       AsyncOpNode *operation, 
			       Uint32 result_code, 
			       Uint32 response, 
			       Boolean blocking,
			       String service_name, 
			       Uint32 service_capabilities, 
			       Uint32 service_mask, 
			       Uint32 service_qid);
      
      
      virtual ~EnumerateServiceResponse(void)
      {

      }
      
      String name;
      Uint32 capabilities;
      Uint32 mask;
      Uint32 qid;
};

</pre>

<h3>AsyncOPNode</h3>

<pre>
#define ASYNC_OPFLAGS_UNKNOWN           0x00000000
#define ASYNC_OPFLAGS_INTERVAL_REPEAT   0x00000010
#define ASYNC_OPFLAGS_INDICATION        0x00000020
#define ASYNC_OPFLAGS_REMOTE            0x00000040
#define ASYNC_OPFLAGS_LOCAL_OUT_OF_PROC 0x00000080
#define ASYNC_OPFLAGS_PHASED            0x00000001
#define ASYNC_OPFLAGS_PARTIAL           0x00000002
#define ASYNC_OPFLAGS_NORMAL            0x00000000
#define ASYNC_OPFLAGS_SINGLE            0x00000008
#define ASYNC_OPFLAGS_MULTIPLE          0x00000010
#define ASYNC_OPFLAGS_TOTAL             0x00000020
#define ASYNC_OPFLAGS_META_DISPATCHER   0x00000040

#define ASYNC_OPSTATE_UNKNOWN           0x00000000
#define ASYNC_OPSTATE_OFFERED           0x00000001
#define ASYNC_OPSTATE_DECLINED          0x00000002
#define ASYNC_OPSTATE_STARTED           0x00000004
#define ASYNC_OPSTATE_PROCESSING        0x00000008
#define ASYNC_OPSTATE_DELIVER           0x00000010 
#define ASYNC_OPSTATE_RESERVE           0x00000020
#define ASYNC_OPSTATE_COMPLETE          0x00000040
#define ASYNC_OPSTATE_TIMEOUT           0x00000080
#define ASYNC_OPSTATE_CANCELLED         0x00000100
#define ASYNC_OPSTATE_PAUSED            0x00000200
#define ASYNC_OPSTATE_SUSPENDED         0x00000400
#define ASYNC_OPSTATE_RESUMED           0x00000800
#define ASYNC_OPSTATE_ORPHANED          0x00001000
#define ASYNC_OPSTATE_RELEASED          0x00002000

class Cimom;

class PEGASUS_COMMON_LINKAGE AsyncOpNode
{
   public:

      AsyncOpNode(void);
      ~AsyncOpNode(void);
            
      Boolean  operator == (const void *key) const;
      Boolean operator == (const AsyncOpNode & node) const;

      void get_timeout_interval(struct timeval *buffer) ;
      void set_timeout_interval(const struct timeval *interval);
      
      Boolean timeout(void)  ;

      OperationContext & get_context(void) ;

      void put_request(const Message *request) ;
      Message *get_request(void) ;
      
      void put_response(const Message *response) ;
      Message *get_response(void) ;
      
      Uint32 read_state(void) ;
      void write_state(Uint32) ;
      
      Uint32 read_flags(void);
      void write_flags(Uint32);
      
      void lock(void)  throw(IPCException);
      void unlock(void) throw(IPCException);
      void udpate(void) throw(IPCException);
      void deliver(const Uint32 count) throw(IPCException);
      void reserve(const Uint32 size) throw(IPCException);
      void processing(void) throw(IPCException) ;
      void processing(OperationContext *context) throw(IPCException);
      void complete(void) throw(IPCException) ;
      void complete(OperationContext *context) throw(IPCException);
      void release(void);
      void wait(void);
      
      
   private:
      Semaphore _client_sem;
      Mutex _mut;
      unlocked_dq&lt;Message&gt; _request;
      unlocked_dq&lt;Message&gt; _response; 

      OperationContext _operation_list;
      Uint32 _state;
      Uint32 _flags;
      Uint32 _offered_count;
      Uint32 _total_ops;
      Uint32 _completed_ops;
      Uint32 _user_data;
      
      struct timeval _start;
      struct timeval _lifetime;
      struct timeval _updated;
      struct timeval _timeout_interval;

      AsyncOpNode *_parent;
      unlocked_dq&lt;AsyncOpNode&gt; _children;

      void _reset(unlocked_dq&lt;AsyncOpNode&gt; *dst_q);

      // the lifetime member is for cache management by the cimom
      void _set_lifetime(struct timeval *lifetime) ;
      Boolean _check_lifetime(void) ;

      Boolean _is_child(void) ;
      Uint32 _is_parent(void) ;

      Boolean _is_my_child(const AsyncOpNode & caller) const;
      void _make_orphan( AsyncOpNode & parent) ;
      void _adopt_child(AsyncOpNode *child) ;
      void _disown_child(AsyncOpNode *child) ;
      friend class cimom;
      friend class MessageQueueService;
      
};
</pre>

<hr>
    <address><a href="mailto:mdday@us.ibm.com">Michael Day</a></address>
<!-- Created: Tue Feb  5 13:21:55 EST 2002 -->
<!-- hhmts start -->
Last modified: Tue Mar 12 17:48:45 EST 2002
<!-- hhmts end -->
  </body>
</html>



No CVS admin address has been configured
Powered by
ViewCVS 0.9.2