Pegasus Meta Dispatcher

The Pegasus Meta Dispatcher is a set of classes that extend the existing MessageQueue messaging system to be dynamic, asynchronous, and multithreaded. The primary classes are the following:

    Class                 Derived from   Source file
    cimom                 MessageQueue   Pegasus/Common/Cimom.h
    MessageQueueService   MessageQueue   Pegasus/Common/MessageQueueService.h
    CimomMessage          Message        Pegasus/Common/CimomMessage.h
    AsyncOpNode           n/a            Pegasus/Common/AsyncOpNode.h
    AsyncDQueue           unlocked_dq    Pegasus/Common/DQueue.h
    IPC classes           n/a            Pegasus/Common/IPC.h
    Threading classes     n/a            Pegasus/Common/Thread.h



Purposes of Meta Dispatcher

The Meta Dispatcher has three primary goals:

  1. Provide for orderly asynchronous message-based communication among a dynamic set of Pegasus Services.
  2. Preserve the existing message-passing architecture of Pegasus.
  3. Allow Pluggable Services such as repositories, provider managers, and others.


Most of the purposes listed above revolve around maintaining the integrity of data and control flow in an asynchronous multithreaded environment.

Terms

Meta Dispatcher
The central message broker, or router, that provides the asynchronous communications within Pegasus. Derived from the MessageQueue class.
Service
A Pegasus module that sends and receives messages to other modules through the Meta Dispatcher; a module that has enhanced privileges within Pegasus and that provides one or more functions necessary to the operation of Pegasus. Derived from the MessageQueue class.
Asynchronous Message
A pair of messages, consisting of a request and a response that are treated as a single operation by Services. An asynchronous message may be fronted by a synchronous programming interface. Derived from the Message class.
AsyncOpNode
A control object that manages the lifetime of an Asynchronous Message. The AsyncOpNode uses many of the IPC object classes. A Service manages the lifetime of an AsyncOpNode during the processing of the message. However, it necessarily cedes control of the AsyncOpNode to the Meta Dispatcher while the Asynchronous Message is being processed.

Meta Dispatcher Design

Three points are necessary to avoid deadlocks and to provide pluggable services in Pegasus. The first is independent execution paths for service modules: each service must have its own thread(s), which must not intersect with the thread(s) of other services. Intersection of execution paths can occur indirectly through IPC objects such as mutexes, conditions, and semaphores.

The second point is interface abstraction, which the Meta Dispatcher provides through C++ polymorphism. This allows pluggable services: one service can replace another and the system will continue to function (hopefully in an improved manner).

The third point is a central message broker that isolates services from each other, thereby preventing deadlocks. The central message broker also provides message responses for services that are paused, stopped, or not present (not plugged in).
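
The second and third points can be illustrated with a small, single-threaded sketch. It is not Pegasus code: Service, EchoService, Broker, and the Result codes are hypothetical names chosen for this illustration. The broker sees services only through a virtual interface, and it answers on behalf of a service that is paused or not plugged in.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical result codes, loosely modeled on the async_results names
// that appear later in this document.
enum class Result { OK, NAK, PAUSED };

// Interface abstraction: the broker only ever sees this base class.
class Service {
public:
    virtual ~Service() = default;
    virtual Result handle(const std::string& request) = 0;
    bool paused = false;
};

// A trivial pluggable service that accepts every request.
class EchoService : public Service {
public:
    Result handle(const std::string&) override { return Result::OK; }
};

// Toy broker: routes by queue ID and replies itself when the target
// service is paused or absent.
class Broker {
public:
    void plug(std::uint32_t qid, std::unique_ptr<Service> s) {
        services_[qid] = std::move(s);
    }
    void unplug(std::uint32_t qid) { services_.erase(qid); }
    void pause(std::uint32_t qid, bool p) {
        auto it = services_.find(qid);
        if (it != services_.end()) it->second->paused = p;
    }
    Result route(std::uint32_t qid, const std::string& request) {
        auto it = services_.find(qid);
        if (it == services_.end()) return Result::NAK;     // not plugged in
        if (it->second->paused) return Result::PAUSED;     // broker answers
        return it->second->handle(request);
    }
private:
    std::map<std::uint32_t, std::unique_ptr<Service>> services_;
};
```

Because callers depend only on the queue ID and the result code, replacing EchoService with another Service subclass requires no change on the sending side.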

Central Hub

The Meta Dispatcher therefore acts as a central message hub. Services communicate with each other via the Meta Dispatcher.

      Service A--Message----1----> (block on semaphore)
                                  |
                                  |
                                  Meta Dispatcher|
                                                 |
                                               Message----2->Service B
                                                                 |
                                                                 |
                           (Signal Semaphore) <---Response---3-- +
                                 |
      Service A <--Response--4---+

    
The numbered steps above are as follows:
  1. Service A creates a new AsyncMessage and AsyncOpNode and sends that message to Service B by calling MessageQueueService::SendWait. The calling thread blocks on the client semaphore until the response is ready.

  2. The Meta Dispatcher's routing thread picks up the message and inserts it into Service B's incoming message queue. The routing thread returns to the Meta Dispatcher to route the next message in the system.

  3. Service B's incoming thread picks up the message and calls its message handler. Message handlers are virtual, so a class derived from MessageQueueService can define its own message handlers to override the default handlers. When the message handler has constructed an AsyncReply, that reply gets linked to the AsyncOpNode. The MessageQueueService then signals the client semaphore within the op node.

  4. Service A awakens when the client semaphore is signalled. It pulls the AsyncResponse message from the AsyncOpNode and processes the result. Service A is responsible for discarding the request, response, and AsyncOpNode objects. The existing classes have mechanisms for caching these objects to avoid too frequent construction/destruction of them.
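
The four steps above can be approximated with standard C++ threading primitives. The sketch below is an illustration under stated assumptions, not the Pegasus implementation: OpNode, ToyService, and send_wait are hypothetical names, the "client semaphore" is built from a mutex and a condition variable, and the routing thread of step 2 is collapsed into a direct enqueue.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>

// Hypothetical stand-in for AsyncOpNode: carries the request, the response,
// and a "client semaphore" made of a mutex plus a condition variable.
struct OpNode {
    std::string request;
    std::string response;
    std::mutex m;
    std::condition_variable cv;
    bool done = false;

    void signal() {
        std::lock_guard<std::mutex> lock(m);
        done = true;
        cv.notify_one();
    }
    void wait() {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [this] { return done; });
    }
};

// Toy "Service B": owns an incoming queue and one worker thread.
class ToyService {
public:
    ToyService() : worker_([this] { run(); }) {}
    ~ToyService() {
        {
            std::lock_guard<std::mutex> lock(m_);
            stop_ = true;
        }
        cv_.notify_one();
        worker_.join();
    }
    // Step 2: the operation is inserted into the incoming queue.
    void enqueue(OpNode* op) {
        {
            std::lock_guard<std::mutex> lock(m_);
            incoming_.push_back(op);
        }
        cv_.notify_one();
    }
private:
    void run() {
        for (;;) {
            OpNode* op = nullptr;
            {
                std::unique_lock<std::mutex> lock(m_);
                cv_.wait(lock, [this] { return stop_ || !incoming_.empty(); });
                if (incoming_.empty()) return;   // only reached when stopping
                op = incoming_.front();
                incoming_.pop_front();
            }
            // Step 3: handle the request, link the reply, signal the client.
            op->response = "reply to: " + op->request;
            op->signal();
        }
    }
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<OpNode*> incoming_;
    bool stop_ = false;
    std::thread worker_;   // declared last so the other members exist first
};

// Steps 1 and 4: send the request, block until the reply is linked.
std::string send_wait(ToyService& dest, const std::string& request) {
    OpNode op;
    op.request = request;
    dest.enqueue(&op);
    op.wait();
    return op.response;
}
```

The sender never calls into the receiving service's handler; the only shared state is the op node and the receiver's queue, which is the isolation property the design section asks for.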

Test Program

The concepts explained below are all contained in the test program for the Meta Dispatcher, which is located in $PEGASUS_HOME/src/Pegasus/Common/tests/MessageQueueService/

Service Registration and Deregistration

Services (classes derived from MessageQueueService) must register their presence with the Meta Dispatcher. This is done as follows (taken from the test program):

  1. Define the Service Class
    // Define our service class
    
    class MessageQueueClient : public MessageQueueService
    {
    
       public:
          typedef MessageQueueService Base;
    
          MessageQueueClient(char *name)
    	 : Base(name, MessageQueue::getNextQueueId(), 0,
    		message_mask::type_cimom |
    		message_mask::type_service |
    		message_mask::ha_request |
    		message_mask::ha_reply |
    		message_mask::ha_async ),
    	   client_xid(1)
          {
    	 _client_capabilities = Base::_capabilities;
    	 _client_mask = Base::_mask;
          }
    
          virtual ~MessageQueueClient(void)
          {
          }
    
          // method to indicate acceptance of message to
          // Meta Dispatcher
          virtual Boolean messageOK(const Message *msg);
    
          // function to send a request to another service
          void send_test_request(char *greeting, Uint32 qid);
          Uint32 get_qid(void);
    
          Uint32 _client_capabilities;
          Uint32 _client_mask;
    
          // method to receive messages from the Meta Dispatcher,
          // MUST be defined
          virtual void _handle_async_request(AsyncRequest *req);
    
          AtomicInt client_xid;
    };
    
    
  2. Construct the Service
       // Create our Service
       MessageQueueClient *q_client =
              new MessageQueueClient("test client");
    
    
  3. Register the Service
       // Register our service with the Meta Dispatcher
       q_client->register_service("test client",
                                   q_client->_client_capabilities,
                                   q_client->_client_mask);
       cout << " client registered " << endl;
    
The example above hides many of the details that are handled by the MessageQueueService constructor, such as creating the background thread, finding the Meta Dispatcher, and constructing the queues. As the example shows, a derived class does not need to worry about those details.
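
To make the bookkeeping behind registration concrete, here is a minimal sketch of a registry keyed by queue ID. ModuleRecord and Registry are hypothetical names for this illustration (compare the message_module class in the class definitions at the end of this document); the real register_service call travels to the Meta Dispatcher as a message rather than as a direct function call.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical record kept for each registered service.
struct ModuleRecord {
    std::string name;
    std::uint32_t capabilities;
    std::uint32_t mask;
};

// Toy registry keyed by queue ID.
class Registry {
public:
    // Returns false if the queue ID is already registered.
    bool register_service(std::uint32_t qid, const std::string& name,
                          std::uint32_t capabilities, std::uint32_t mask) {
        return modules_.emplace(qid, ModuleRecord{name, capabilities, mask}).second;
    }
    // Returns false if the queue ID was not registered.
    bool deregister_service(std::uint32_t qid) {
        return modules_.erase(qid) == 1;
    }
    std::size_t count() const { return modules_.size(); }
private:
    std::map<std::uint32_t, ModuleRecord> modules_;
};
```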

Finding Other Services

The MessageQueueService class has an API for finding other services. This API is built using messages that are defined in CimomMessage.h. Here is an example from the test program:

   Array<Uint32> services;

   while( services.size() == 0 )
   {
      q_client->find_services(String("test server"), 0, 0, &services);
      pegasus_yield();
   }

   cout << "found server at " << services[0] << endl;


The code sample above shows how to find services by name. The API also allows finding services by their capabilities or by the messages they support. Note that the result is an array of Queue IDs, because a single query can match more than one service.
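
The matching logic behind such a lookup might look like the sketch below. It is an assumption-laden illustration, not the Meta Dispatcher's code: ServiceInfo is a hypothetical record, this find_services is a free function rather than the MessageQueueService method, and treating an empty name or a zero bit field as "do not filter" is a convention chosen for the sketch.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical description of one registered service.
struct ServiceInfo {
    std::uint32_t qid;
    std::string name;
    std::uint32_t capabilities;
    std::uint32_t mask;
};

// Returns the queue IDs of every service that satisfies all criteria.
// An empty name or a zero bit field means "do not filter on this field".
std::vector<std::uint32_t> find_services(const std::vector<ServiceInfo>& all,
                                         const std::string& name,
                                         std::uint32_t capabilities,
                                         std::uint32_t mask) {
    std::vector<std::uint32_t> out;
    for (const ServiceInfo& s : all) {
        if (!name.empty() && s.name != name) continue;
        if ((s.capabilities & capabilities) != capabilities) continue;
        if ((s.mask & mask) != mask) continue;
        out.push_back(s.qid);
    }
    return out;
}
```

Returning a vector of queue IDs mirrors the Array<Uint32> used in the test program: the caller decides what to do when zero, one, or several services match.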

Sending an Asynchronous Message to Another Service

The "handle" for a service is its Queue ID. Once you have the Queue ID you can send a message to that service. The example above shows one way to get a service's Queue ID. Here is an example that shows how to send that service a message.

  1. Define the Request and Response Message Pair by Inheriting from AsyncMessage.
    class test_request : public AsyncRequest
    {
    
       public:
          typedef AsyncRequest Base;
    
          test_request(Uint32 routing,
    		   AsyncOpNode *op,
    		   Uint32 destination,
    		   Uint32 response,
    		   char *message)
    	 : Base(0x04100000,
    		Message::getNextKey(),
    		routing,
    		0,
    		op,
    		destination,
    		response,
    		true),
    	   greeting(message)
          {
    	
          }
    
          virtual ~test_request(void)
          {
    
    
          }
    
          String greeting;
    };
    
    
    class test_response : public AsyncReply
    {
       public:
          typedef AsyncReply Base;
    
    
          test_response(Uint32 key,
    		    Uint32 routing,
    		    AsyncOpNode *op,
    		    Uint32 result,
    		    Uint32 destination,
    		    char *message)
    	 : Base(0x04200000,
    		key,
    		routing,
    		0,
    		op,
    		result,
    		destination,
    		true),
    	   greeting(message)
          {
    	
          }
    
          virtual ~test_response(void)
          {
    	
          }
    
          String greeting;
    };
    
    
    The function send_test_request shows everything that is necessary to send a message to another service and process the reply.

    void MessageQueueClient::send_test_request(char *greeting, Uint32 qid)
    {

  2. Construct the Request
       test_request *req =
          new test_request(Base::get_next_xid(),
                           0,
                           qid,        // destination queue ID
                           _queueId,   // my own queue ID
                           greeting);  // message parameter

  3. Send the message using MessageQueueService::SendWait
       AsyncMessage *response = SendWait(req);

  4. Process the Response.
       if( response != 0  )
       {
          msg_count++;
          delete response;
          cout << " test message " << msg_count.value() << endl;

       }
       delete req;
    }

  5. Delete the Request and the Response. The SendWait interface creates and disposes of everything else.

Handling an Incoming Message

To handle messages the service needs to implement the following methods.

  1. virtual Boolean messageOK(const Message *)
    This method allows the Service to accept or reject the message. The Meta Dispatcher will always call this method before inserting the request on the Service's queue.
    Boolean MessageQueueServer::messageOK(const Message *msg)
    {
       if(msg->getMask() & message_mask::ha_async)
       {
          if( msg->getType() == 0x04100000 ||
    	  msg->getType() == async_messages::CIMSERVICE_STOP ||
    	  msg->getType() == async_messages::CIMSERVICE_PAUSE ||
    	  msg->getType() == async_messages::CIMSERVICE_RESUME )
          return true;
       }
       return false;
    }
    
    
  2. virtual Boolean accept_async(AsyncOpNode *operation) (optional)
    This method executes on the Meta Dispatcher's thread and links the incoming message to the Service's queue.

  3. virtual void _handle_incoming_operation(AsyncOpNode *)

    This method is called by the Service's background thread. Here is an example implementation that just does some sanity checking on the message.
    void MessageQueueServer::_handle_incoming_operation(AsyncOpNode *operation)
    {
       if ( operation != 0 )
       {
          Message *rq = operation->get_request();
          PEGASUS_ASSERT(rq != 0 );
          PEGASUS_ASSERT(rq->getMask() & message_mask::ha_async );
          PEGASUS_ASSERT(rq->getMask() & message_mask::ha_request);
          _handle_async_request(static_cast<AsyncRequest *>(rq));
       }
    
       return;
    
    }
    
    
    
  4. virtual void _handle_async_request(AsyncRequest *)

    This method handles the request. The Service must implement this method. If the Service does not handle the Request, it must pass the Request to the Base class by calling Base::_handle_async_request(req).
    void MessageQueueServer::_handle_async_request(AsyncRequest *req)
    {
       if (req->getType() == 0x04100000 )
       {
          req->op->processing();
          handle_test_request(req);   // Message Handler
       }
       else if ( req->getType() == async_messages::CIMSERVICE_STOP )
       {
          req->op->processing();
          handle_CimServiceStop(static_cast<CimServiceStop *>(req));
       }
    
       else
          Base::_handle_async_request(req);  // Give it to the Base !!
    }
    
    
  5. Specific Message Handlers. Each message handler is defined by the format of its Request/Response pair. Here is an example from the test program.
       if( msg->getType() == 0x04100000 )
       {

    1. Construct the Reply
          test_response *resp =
             new test_response(msg->getKey(),
                               msg->getRouting(),
                               msg->op,
                               async_results::OK,
                               msg->dest,
                               "i am a test response");

    2. Complete the Reply by calling the following helper routine in the Base class
          _completeAsyncResponse(msg, resp, ASYNC_OPSTATE_COMPLETE, 0);

       }
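
The accept-then-dispatch pattern described in this section can be condensed into a small standalone sketch. The names below (Msg, BaseService, TestServer, deliver, and the numeric constants other than 0x04100000) are hypothetical and exist only for this illustration; the point is the shape of the control flow: a messageOK gate, a type switch, and a fallback to the base class for anything the derived service does not handle itself.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical mask bit and message types for this sketch only.
const std::uint32_t HA_ASYNC   = 0x1;
const std::uint32_t TYPE_TEST  = 0x04100000;
const std::uint32_t TYPE_STOP  = 0x2;
const std::uint32_t TYPE_OTHER = 0x3;

struct Msg {
    std::uint32_t type;
    std::uint32_t mask;
};

class BaseService {
public:
    virtual ~BaseService() = default;
    // Gate: consulted before a message is queued for the service.
    virtual bool messageOK(const Msg& m) const {
        return (m.mask & HA_ASYNC) != 0;
    }
    // Default handler: reports that the base class took the message.
    virtual std::string handle(const Msg&) { return "base"; }
    // What a dispatcher would do: check acceptance, then deliver.
    std::string deliver(const Msg& m) {
        return messageOK(m) ? handle(m) : "rejected";
    }
};

class TestServer : public BaseService {
public:
    bool messageOK(const Msg& m) const override {
        if ((m.mask & HA_ASYNC) == 0) return false;
        return m.type == TYPE_TEST || m.type == TYPE_STOP ||
               m.type == TYPE_OTHER;
    }
    std::string handle(const Msg& m) override {
        if (m.type == TYPE_TEST) return "test";
        if (m.type == TYPE_STOP) return "stop";
        return BaseService::handle(m);   // give it to the base class
    }
};
```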
      

    Handling CIMMessage and Other Pre-existing Message Classes

    Existing Messages, including all of the CIMMessage derivatives, are not configured to be asynchronous request/reply pairs. They are designed to travel through Pegasus as events that trigger other processing events, which is the end of their lifetime. This is not an optimal use model for asynchronous operation because the originator of the event neither requires nor receives any completion notification. Further, there is not a one-to-one correspondence of "event messages" to replies.

    AsyncLegacyOperationStart Message

    The AsyncLegacyOperationStart message is an envelope that allows a MessageQueueService-based service to send, receive, and process pre-existing "legacy" messages.

    The AsyncLegacyOperationStart Message allows an asynchronous service to create, package, and send a "legacy" message to another service or, indirectly, enqueue it to a non-asynchronous message queue. The code example below shows how this works:

       cout << " sending LEGACY to test server" << endl;
    
       Message *legacy = new Message(0x11100011,
    				 Message::getNextKey());
    
       AsyncLegacyOperationStart *req =
          new AsyncLegacyOperationStart(q_client->get_next_xid(),
    				    0,
    				    services[0],
    				    legacy,
    				    q_client->getQueueId());
       reply = q_client->SendWait(req);
       delete req;
       delete reply;
    
    

    The code sample above shows a Message object being embedded inside an AsyncLegacyOperationStart message and sent using the SendWait API.

    Default Handler for Legacy Messages

    The MessageQueueService class has a default handler for legacy messages that extracts the Message out of its asynchronous "envelope" and dispatches it using the pre-existing synchronous interface, as shown below.

    void MessageQueueService::handle_AsyncLegacyOperationStart(
                                                   AsyncLegacyOperationStart *req)
    {
       // remove the legacy message from the request and enqueue it to its destination
       Uint32 result = async_results::CIM_NAK;
    
       Message *legacy = req->act;
       if ( legacy != 0 )
       {
          MessageQueue* queue = MessageQueue::lookup(req->legacy_destination);
          if( queue != 0 )
          {
    	// Enqueue the response:
    	 queue->enqueue(legacy);
    	 result = async_results::OK;
          }
       }
       _make_response(req, result);
    }
    
    

    The default handler shown above extracts the legacy message and attempts to enqueue that message synchronously using the pre-existing interface.
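
The envelope pattern used by the default handler can be reduced to a few lines. The sketch below is illustrative only: LegacyMessage, LegacyEnvelope, LegacyQueue, and handle_legacy are hypothetical names, and a std::map stands in for the MessageQueue::lookup call. It keeps the same decision structure: report NAK unless both the embedded message and the destination queue exist.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <map>

// Hypothetical stand-ins for the async_results codes.
enum class Result { OK, NAK };

struct LegacyMessage {
    std::uint32_t type;
};

// Envelope carrying a legacy message plus the queue it should land on.
struct LegacyEnvelope {
    LegacyMessage* act;                 // embedded message, may be null
    std::uint32_t legacy_destination;   // target queue ID
};

using LegacyQueue = std::deque<LegacyMessage*>;

// Unwraps the envelope and enqueues the legacy message when both the
// message and the destination queue exist; otherwise reports NAK.
Result handle_legacy(const LegacyEnvelope& env,
                     std::map<std::uint32_t, LegacyQueue>& queues) {
    if (env.act == nullptr) return Result::NAK;
    auto it = queues.find(env.legacy_destination);
    if (it == queues.end()) return Result::NAK;
    it->second.push_back(env.act);
    return Result::OK;
}
```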

    Example of Custom Handler for Legacy Messages

    By implementing the virtual _handle_async_request method, a service can choose to implement its own handler for Legacy messages, as the code below shows:

    1. Implement the virtual _handle_async_request method.
      void MessageQueueServer::_handle_async_request(AsyncRequest *req)
      {
         if (req->getType() == 0x04100000 )
         {
            req->op->processing();
            handle_test_request(req);
         }
         else if ( req->getType() == async_messages::CIMSERVICE_STOP )
         {
            req->op->processing();
            handle_CimServiceStop(static_cast<CimServiceStop *>(req));
         }
      
    2. Implement a dispatcher for ASYNC_LEGACY_OP_START
         else if ( req->getType() == async_messages::ASYNC_LEGACY_OP_START )
         {
            req->op->processing();
            handle_LegacyOpStart(static_cast<AsyncLegacyOperationStart *>(req));
         }
      
         else
            Base::_handle_async_request(req);
      }
      
      
    3. Implement a handler for ASYNC_LEGACY_OP_START
      void MessageQueueServer::handle_LegacyOpStart(AsyncLegacyOperationStart *req)
      {
      
         Message *legacy = req->act;
         cout << " ### handling legacy messages " << endl;
      
      
            AsyncReply *resp =
      	 new AsyncReply(async_messages::REPLY,
      			req->getKey(),
      			req->getRouting(),
      			0,
      			req->op,
      			async_results::OK,
      			req->resp,
      			req->block);
            _completeAsyncResponse(req, resp, ASYNC_OPSTATE_COMPLETE, 0 );
      
            if (legacy != 0 )
      	 cout << " legacy msg type: " << legacy->getType() << endl;
      
      }
      
      

    Sending Messages without Blocking (Async with Callback)

    Whenever there is a possibility that the processing of one message may generate a nested message (message generated within the handler of a message) it is necessary to send messages without blocking, and to receive responses via callback routines. The diagram below shows the (more complicated) flow of non-blocking messages.


          Service A--Message----1---->
                                   |
            . <-----------(return)-+----->-(loop)--->-+
            .                      |  Meta Dispatcher |
            .                      +----<-----<---<---+
            .                                      Message---2-->Service B
            .                                                            |
            .                                        <--Response--3------+
            .                                        |
            .                        +--<--<-----<-----+--(return)---->
            .                        | Meta Dispatcher |
          Service A <--Callback--4---+--->-(loop)-->---+
                       |       ^
                       +-------+
        

    Test Program

    There is a test program that sends and receives non-blocking messages in $(PEGASUS_ROOT)/src/Pegasus/Common/tests/async_callback/

    SendAsync method

    The MessageQueueService class sends non-blocking messages using the SendAsync method from MessageQueueService.h.


    Boolean SendAsync(AsyncOpNode *op,
     		  Uint32 destination,
     		  void (*callback)(AsyncOpNode *, MessageQueue *, void *),
    		  MessageQueue *callback_q,
    		  void *callback_ptr);
    		

    AsyncOpNode *op (In) is the shared data structure that controls the message flow and consolidates the response data. This data structure is also passed to the callback function as the first parameter. The caller must allocate and free this data structure.

    Uint32 destination (In) is the queue ID of the service which will receive the asynchronous request message.

    void (*callback)(AsyncOpNode *, MessageQueue *, void *) (In) is the static class method that will be called when the request/response pair is complete.

    The callback is always passed op as the first parameter. The second parameter is the MessageQueue * object instance that is executing the callback function. Because callback functions must be static class methods, the callback can use the MessageQueue * parameter as a pseudo this pointer.

    MessageQueue *callback_q (In) is the queue pointer that will be passed to the callback routine as the second parameter. It also controls which queue instance receives the callback.

    void *callback_ptr (In) is a pointer that will be passed to the callback function.

    How the Meta Dispatcher Processes Non-Blocking Messages

    The focus of processing non-blocking messages is to remove the possibility of deadlock. Therefore, all message processing is performed by background threads. All access by services to shared data structures is discrete in the sense that one service never calls into another service. For example, callback routines are executed by the background thread of the receiving service. (As opposed to the sending service making a callback into the receiving service.)

    1. The requesting service creates a request and an AsyncOpNode, then calls MessageQueueService::SendAsync(...).
    2. SendAsync marks the AsyncOpNode as ASYNC_OPFLAGS_CALLBACK with the ASYNC_OPSTATE_COMPLETE bit clear.
    3. The Meta Dispatcher routes the AsyncOpNode to the responding service.
    4. The responding service's background thread pulls the message off its internal queue and processes the request. This will frequently entail creating and inserting a response message into the AsyncOpNode by calling AsyncOpNode::put_response(AsyncMessage *).
    5. The responding service completes the response by calling MessageQueueService::_complete_op_node(AsyncOpNode *, Uint32, Uint32, Uint32).
    6. _complete_op_node passes the operation back to the Meta Dispatcher, which routes it back to the requesting service.
    7. The requesting service's background thread pulls the AsyncOpNode off its internal queue and calls AsyncOpNode::_async_callback, which is a function pointer that holds the address of the requesting service's static callback method.
    8. The requesting service deallocates the op node, the request message, and the response message.
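
The routing in the steps above can be traced with a deterministic, single-threaded sketch. Everything here is hypothetical (OpNode, Queue, Dispatcher, drain, on_complete); real services run drain-like loops on their own background threads, and the flags live in the AsyncOpNode. The sketch only shows who touches the op node and when: the sender returns immediately, the responder completes the op on its own loop, and the callback runs on the requester's loop.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <map>
#include <string>

struct Queue;   // forward declaration

// Hypothetical stand-in for an AsyncOpNode used in a callback operation.
struct OpNode {
    std::string request;
    std::string response;
    bool complete = false;
    std::uint32_t dest = 0;                               // responding service
    Queue* callback_q = nullptr;                          // requesting service
    void (*callback)(OpNode*, Queue*, void*) = nullptr;
    void* callback_ptr = nullptr;
};

// Each service owns an inbox that only its own loop drains.
struct Queue {
    std::uint32_t id;
    std::deque<OpNode*> inbox;
};

// Toy dispatcher: routes an op to the responder, or back to the requester
// once the op is marked complete.
struct Dispatcher {
    std::map<std::uint32_t, Queue*> queues;
    void route(OpNode* op) {
        if (op->complete) op->callback_q->inbox.push_back(op);
        else queues.at(op->dest)->inbox.push_back(op);
    }
};

// One pass of a service's background loop.
void drain(Queue& q, Dispatcher& d) {
    while (!q.inbox.empty()) {
        OpNode* op = q.inbox.front();
        q.inbox.pop_front();
        if (op->complete) {
            op->callback(op, &q, op->callback_ptr);   // step 7
        } else {
            op->response = "echo: " + op->request;    // steps 4 and 5
            op->complete = true;
            d.route(op);                              // step 6
        }
    }
}

// Free function standing in for a static class callback; it counts
// completions through the user pointer.
void on_complete(OpNode* op, Queue*, void* parm) {
    if (!op->response.empty()) ++*static_cast<int*>(parm);
}
```

A typical trace is: route the op (the send returns at once), drain the responder's queue, then drain the requester's queue, at which point the callback fires exactly once.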

    Virtual Methods for Non-Blocking Messages

    To use non-blocking messages, a service should implement the following virtual method from MessageQueueService, plus a static class callback function.

    virtual void _handle_async_request(AsyncRequest *req);
    		  
    AsyncRequest *req (In) is the incoming request. req->op is the AsyncOpNode that is controlling this operation.

    This method is called by the service's background thread whenever an AsyncRequest message is sent to the service. This includes both blocking and non-blocking request messages.

    Within _handle_async_request you can determine whether the message is blocking or non-blocking by using the following code segment:

    
    
    if(req->op->read_flags() & ASYNC_OPFLAGS_CALLBACK )
    {
    
       // let everyone know you are working on the request
       req->op->processing();
    
       // this request is part of a callback (non-blocking) operation
       AsyncResponse *response = new AsyncResponse(...);
    
       // initialize the response message however is appropriate
    
      // put the response to the op node
      req->op->put_response(response);
    
      // complete the operation
      _complete_op_node(req->op, 0, 0, async_results::OK);
    
      return;
    }
    

    static void async_callback_function(AsyncOpNode *op, MessageQueue *q, void *parm);
    
    		  
    AsyncOpNode *op (In) is the incoming shared object that is controlling this completing operation.

    MessageQueue *q (In) is a pointer to the class that has received the complete op. This parameter is meant to be used as a this pointer because the callback is a static class method.

    void *parm (In) is a pointer that the class passed to SendAsync. It can be used as a convenience for the class.

    This method is only called when a non-blocking operation is complete, meaning that the responding service has added a response message to the op and set the ASYNC_OPSTATE_COMPLETE bit.

    void my_class::async_callback_function(AsyncOpNode *op,
                                           MessageQueue *q,
                                           void *parm)
    {
    
       my_class *myself = static_cast<my_class *>(q);
       AsyncRequest *request = op->get_request();
       AsyncResponse *response = op->get_response();
    
       // process response
    
       delete request;
       delete response;
       myself->put_op(op);
       return;
    }
    
    		  

    Strategies for Handling Non-Blocking Responses

    All of the legacy message handling code in Pegasus is synchronous, meaning that a service can send a request and receive the response in two adjacent lines of code. Non-blocking messages are different because the timing and existence of a response message is undetermined.

    There are a couple of possible strategies for handling non-blocking response messages.

    1. Separate the request creation and sending code from the response handling code into distinct methods.
    2. Handle request creation and sending and response handling in the same method using different code blocks.

    Separate Request and Response Handling Methods

    1. Write the request generation method.
       void my_class::generate_request(Uint32 destination)
      {
         AsyncOpNode *op = get_op();
         my_request *req = new my_request(...);
         op->put_request(req);
      
         SendAsync(op, destination, my_callback, this, (void *)0);
         return;
      }
      		      
    2. Write the response handling method.
      void my_class::handle_response(AsyncOpNode *op)
      {
         my_response *res = static_cast<my_response *>(op->get_response());
         if(res != 0 )
         {
            // handle response
            delete res;
         }
         put_op(op);
         return;
      }
      		      
    3. Have your callback method call your response method.
      void my_class::my_callback(AsyncOpNode *op, MessageQueue *queue, void *parm)
      {
         my_class *myself = static_cast<my_class *>(queue);
      
         myself->handle_response(op);
         return;
      }
      		      

    Single Request and Response Handling Method

    This strategy requires two separate code blocks within the request/response method, and conditional execution depending on the status of the operation.

    I think the advantage of this strategy is that it matches more closely the current
    handleEnqueue(Message *msg) code convention that is in Pegasus.

    1. Write the request generation block.
       void my_class::handle_operation(AsyncOpNode *op)
      {
         if(op == NULL)
         {
            AsyncOpNode *op = get_op();
            my_request *req = new my_request(...);
            op->put_request(req);
      
            SendAsync(op, destination, my_callback, this, (void *)0);
      
         }
         else
         {
      
         }
         return;
      }
      		      
    2. Write the response handling block.
       void my_class::handle_operation(AsyncOpNode *op)
      {
         if(op == NULL)
         {
            AsyncOpNode *op = get_op();
            my_request *req = new my_request(...);
            op->put_request(req);
      
            SendAsync(op, destination, my_callback, this, (void *)0);
      
         }
         else
         {
            my_response *res = static_cast<my_response *>(op->get_response());
            if(res != 0 )
            {
               // handle response
               delete res;
            }
            put_op(op);
         }
         return;
      }
      
      		      
    3. Have your callback method call your handler method.
      void my_class::my_callback(AsyncOpNode *op, MessageQueue *queue, void *parm)
      {
         my_class *myself = static_cast<my_class *>(queue);
      
         myself->handle_operation(op);
         return;
      }
      		      


    Class Definitions

    cimom (Meta Dispatcher)

    
    //%LICENSE////////////////////////////////////////////////////////////////
    //
    // Licensed to The Open Group (TOG) under one or more contributor license
    // agreements.  Refer to the OpenPegasusNOTICE.txt file distributed with
    // this work for additional information regarding copyright ownership.
    // Each contributor licenses this file to you under the OpenPegasus Open
    // Source License; you may not use this file except in compliance with the
    // License.
    //
    // Permission is hereby granted, free of charge, to any person obtaining a
    // copy of this software and associated documentation files (the "Software"),
    // to deal in the Software without restriction, including without limitation
    // the rights to use, copy, modify, merge, publish, distribute, sublicense,
    // and/or sell copies of the Software, and to permit persons to whom the
    // Software is furnished to do so, subject to the following conditions:
    //
    // The above copyright notice and this permission notice shall be included
    // in all copies or substantial portions of the Software.
    //
    // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
    // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
    // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
    // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
    // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
    // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
    // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
    //
    //////////////////////////////////////////////////////////////////////////
    //
    // Author: Mike Day (mdday@us.ibm.com)
    //
    // Modified By: 
    //
    //////////////////////////////////////////////////////////////////////////
    
    #ifndef CIMOM_include
    #define CIMOM_include
    
    #include <Pegasus/Common/Config.h>
    #include <Pegasus/Common/Exception.h>
    #include <Pegasus/Common/MessageQueue.h>
    #include <Pegasus/Common/DQueue.h>
    #include <Pegasus/Common/Thread.h>
    #include <Pegasus/Common/Array.h>
    #include <Pegasus/Common/AsyncOpNode.h>
    #include <Pegasus/Common/CimomMessage.h>
    #include <Pegasus/Common/MessageQueueService.h>
    
    PEGASUS_NAMESPACE_BEGIN
    
    extern const Uint32 CIMOM_Q_ID;
    
    class PEGASUS_COMMON_LINKAGE module_capabilities
    {
       public:
          static Uint32 async;
          static Uint32 remote;
          static Uint32 trusted;
          static Uint32 paused;
          static Uint32 stopped;
    } ;
    
    class PEGASUS_COMMON_LINKAGE cimom;
    
    class PEGASUS_COMMON_LINKAGE message_module
    {
       public:
          message_module(void)
    	 : _name(), _capabilities(0),
    	   _mask(0), _q_id(0) { }
    
          message_module(const String & name,
    		     Uint32 capabilities,
    		     Uint32 mask,
    		     Uint32 queue)
          	 : _name(name), _capabilities(capabilities),
    	   _mask(mask), _q_id(queue)  { }
    
          Boolean operator == (const message_module *mm) const;
          Boolean operator == (const String & name ) const ;
          Boolean operator == (const message_module & mm ) const ;
          Boolean operator == (const void *) const;
          Boolean operator == (Uint32) const;
    
          const String & get_name(void) const ;
          Uint32 get_capabilities(void) const ;
          Uint32 get_mask(void) const ;
          Uint32 get_queue(void) const ;
    
          void put_name(String & name);
          void put_capabilities(Uint32 capabilities);
          void put_mask(Uint32 mask);
          void put_queue(Uint32 queue) ;
    
       private:
          String _name;
          Uint32 _capabilities;
          Uint32 _mask;
          struct timeval _heartbeat;
    
    
          Uint32 _q_id;
          friend class cimom;
    };
    
    class MessageQueueService;
    
    
    class PEGASUS_COMMON_LINKAGE cimom : public MessageQueue
    {
       public :
          cimom(void);
    
          virtual ~cimom(void) ;
    
          Boolean moduleChange(struct timeval last);
    
          Uint32 getModuleCount(void);
          Uint32 getModuleIDs(Uint32 *ids, Uint32 count) throw(IPCException);
    
          AsyncOpNode *get_cached_op(void) throw(IPCException);
          void cache_op(AsyncOpNode *op) throw(IPCException);
    
          void set_default_op_timeout(const struct timeval *buffer);
          void get_default_op_timeout(struct timeval *timeout) const ;
    
          virtual void handleEnqueue();
          void register_module(RegisterCimService *msg);
          void deregister_module(Uint32 quid);
          void update_module(UpdateCimService *msg );
          void ioctl(AsyncIoctl *msg );
    
          void find_service_q(FindServiceQueue *msg );
          void enumerate_service(EnumerateService *msg );
          Boolean route_async(AsyncOpNode *operation);
          void _shutdown_routed_queue(void);
    
    
       protected:
          Uint32 get_module_q(const String & name);
          static void _make_response(Message *req, Uint32 code);
          static void _completeAsyncResponse(AsyncRequest *request,
    				  AsyncReply *reply,
    				  Uint32 state,
    				  Uint32 flag);
          static void _complete_op_node(AsyncOpNode *op, Uint32 state, Uint32 flag, Uint32 code);
          static void _default_callback(AsyncOpNode *, MessageQueue *, void *);
    
       private:
          struct timeval _default_op_timeout;
          struct timeval _last_module_change;
          DQueue<message_module> _modules;
    
          DQueue<AsyncOpNode> _recycle;
    
          AsyncDQueue<AsyncOpNode> _routed_ops;
          DQueue<AsyncOpNode> _internal_ops;
    
          static PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL _routing_proc(void *);
    
          Thread _routing_thread;
    
          static Uint32 get_xid(void);
          void _handle_cimom_op(AsyncOpNode *op, Thread *thread, MessageQueue *queue);
          Uint32 _ioctl(Uint32, Uint32, void *);
    
    
          AtomicInt _die;
          AtomicInt _routed_queue_shutdown;
    
          static AtomicInt _xid;
          static cimom *_global_this;
    
          friend class MessageQueueService;
    
    };
    
    PEGASUS_NAMESPACE_END
    
    #endif // CIMOM_include
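
The header above declares the bookkeeping that cimom keeps for registered modules (register_module, deregister_module, find_service_q) but not how the lookup behaves. The following is a minimal sketch of such a registry, not the Pegasus implementation. ModuleRecord, ModuleRegistry, the caps bit values, and the matching rule (an empty name matches any module, and every requested capability bit must be set) are all assumptions made for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical capability bits; the real values of module_capabilities::*
// are defined outside the header shown above.
namespace caps
{
    const std::uint32_t async_cap = 0x01;
    const std::uint32_t remote    = 0x02;
    const std::uint32_t trusted   = 0x04;
}

// Simplified stand-in for message_module: a name, two bit masks, a queue ID.
struct ModuleRecord
{
    std::string name;
    std::uint32_t capabilities;
    std::uint32_t mask;
    std::uint32_t queue_id;
};

// Simplified stand-in for the module list that cimom keeps in _modules.
class ModuleRegistry
{
public:
    // Returns false if a module with the same queue ID is already registered.
    bool register_module(const ModuleRecord& rec)
    {
        // Hypothetical precondition for this sketch: modules are named.
        assert(!rec.name.empty());
        for (const ModuleRecord& m : _modules)
            if (m.queue_id == rec.queue_id)
                return false;
        _modules.push_back(rec);
        return true;
    }

    // Returns true if a module with this queue ID was found and removed.
    bool deregister_module(std::uint32_t queue_id)
    {
        for (std::size_t i = 0; i < _modules.size(); ++i)
        {
            if (_modules[i].queue_id == queue_id)
            {
                _modules.erase(_modules.begin() + i);
                return true;
            }
        }
        return false;
    }

    // Collects the queue IDs of modules that match the name (an empty name
    // matches any module) and have every requested capability bit set.
    std::vector<std::uint32_t> find_services(const std::string& name,
                                             std::uint32_t capabilities) const
    {
        std::vector<std::uint32_t> ids;
        for (const ModuleRecord& m : _modules)
        {
            bool name_ok = name.empty() || m.name == name;
            bool caps_ok = (m.capabilities & capabilities) == capabilities;
            if (name_ok && caps_ok)
                ids.push_back(m.queue_id);
        }
        return ids;
    }

    std::size_t count() const { return _modules.size(); }

private:
    std::vector<ModuleRecord> _modules;
};
```

A caller would register one ModuleRecord per service and later call find_services with a capability mask to obtain candidate queue IDs. The sketch is single-threaded, whereas the real class guards its lists for concurrent use.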
    
    

MessageQueueService

    
    //%LICENSE////////////////////////////////////////////////////////////////
    //
    // Licensed to The Open Group (TOG) under one or more contributor license
    // agreements.  Refer to the OpenPegasusNOTICE.txt file distributed with
    // this work for additional information regarding copyright ownership.
    // Each contributor licenses this file to you under the OpenPegasus Open
    // Source License; you may not use this file except in compliance with the
    // License.
    //
    // Permission is hereby granted, free of charge, to any person obtaining a
    // copy of this software and associated documentation files (the "Software"),
    // to deal in the Software without restriction, including without limitation
    // the rights to use, copy, modify, merge, publish, distribute, sublicense,
    // and/or sell copies of the Software, and to permit persons to whom the
    // Software is furnished to do so, subject to the following conditions:
    //
    // The above copyright notice and this permission notice shall be included
    // in all copies or substantial portions of the Software.
    //
    // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
    // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
    // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
    // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
    // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
    // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
    // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
    //
    //////////////////////////////////////////////////////////////////////////
    //
    // Author: Mike Day (mdday@us.ibm.com)
    //
    // Modified By: 
    //
    //////////////////////////////////////////////////////////////////////////
    
    #ifndef Pegasus_MessageQueue_Service_h
    #define Pegasus_MessageQueue_Service_h
    
    #include <Pegasus/Common/Config.h>
    #include <Pegasus/Common/Message.h>
    #include <Pegasus/Common/Exception.h>
    #include <Pegasus/Common/IPC.h>
    #include <Pegasus/Common/Thread.h>
    #include <Pegasus/Common/AsyncOpNode.h>
    #include <Pegasus/Common/Cimom.h>
    #include <Pegasus/Common/CimomMessage.h>
    
    PEGASUS_NAMESPACE_BEGIN
    
    extern const Uint32 CIMOM_Q_ID;
    
    class message_module;
    
    class PEGASUS_COMMON_LINKAGE MessageQueueService : public MessageQueue
    {
       public:
    
          typedef MessageQueue Base;
    
          MessageQueueService(const char *name, Uint32 queueID,
    			  Uint32 capabilities = 0,
    			  Uint32 mask = message_mask::type_cimom |
    			  message_mask::type_service |
    			  message_mask::ha_request |
    			  message_mask::ha_reply |
    			  message_mask::ha_async ) ;
    
          virtual ~MessageQueueService(void);
    
          virtual Boolean isAsync(void) {  return true;  }
    
          virtual void enqueue(Message *) throw(IPCException);
    
          AsyncReply *SendWait(AsyncRequest *request);
          Boolean SendAsync(AsyncOpNode *op,
    			Uint32 destination,
    			void (*callback)(AsyncOpNode *, MessageQueue *, void *),
    			MessageQueue *callback_q,
    			void *callback_ptr);
          Boolean  SendForget(Message *msg);
          Boolean ForwardOp(AsyncOpNode *, Uint32 destination);
    
    
          Boolean register_service(String name, Uint32 capabilities, Uint32 mask);
          Boolean update_service(Uint32 capabilities, Uint32 mask);
          Boolean deregister_service(void);
          virtual void _shutdown_incoming_queue(void);
    
          void find_services(String name,
    			 Uint32 capabilities,
    			 Uint32 mask,
    			 Array<Uint32> *results);
          void enumerate_service(Uint32 queue, message_module *result);
          Uint32 get_next_xid(void);
          AsyncOpNode *get_op(void);
          void return_op(AsyncOpNode *op);
    
          Uint32 _mask;
          AtomicInt _die;
       protected:
          virtual Boolean accept_async(AsyncOpNode *op);
          virtual Boolean messageOK(const Message *msg) ;
          virtual void handleEnqueue(void) = 0;
          virtual void handleEnqueue(Message *) = 0;
          Boolean _enqueueResponse(Message *, Message *);
          virtual void _handle_incoming_operation(AsyncOpNode *);
    
          virtual void _handle_async_request(AsyncRequest *req);
          virtual void _handle_async_callback(AsyncOpNode *operation);
          virtual void _make_response(Message *req, Uint32 code);
    
    
          virtual void handle_heartbeat_request(AsyncRequest *req);
          virtual void handle_heartbeat_reply(AsyncReply *rep);
    
          virtual void handle_AsyncIoctl(AsyncIoctl *req);
          virtual void handle_CimServiceStart(CimServiceStart *req);
          virtual void handle_CimServiceStop(CimServiceStop *req);
          virtual void handle_CimServicePause(CimServicePause *req);
          virtual void handle_CimServiceResume(CimServiceResume *req);
    
          virtual void handle_AsyncOperationStart(AsyncOperationStart *req);
          virtual void handle_AsyncOperationResult(AsyncOperationResult *rep);
          virtual void handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req);
          virtual void handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep);
    
          void _completeAsyncResponse(AsyncRequest *request,
    				 AsyncReply *reply,
    				 Uint32 state,
    				 Uint32 flag);
          void _complete_op_node(AsyncOpNode *, Uint32, Uint32, Uint32);
    
    
          static cimom *_meta_dispatcher;
          static AtomicInt _service_count;
          static Mutex _meta_dispatcher_mutex;
    
       private:
    
          DQueue<AsyncOpNode> _pending;
          AsyncDQueue<AsyncOpNode> _incoming;
          AsyncDQueue<AsyncOpNode> _callback;
    
          static PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL _req_proc(void *);
          static void _sendwait_callback(AsyncOpNode *, MessageQueue *, void *);
          AtomicInt _incoming_queue_shutdown;
          Thread _req_thread;
          struct timeval _default_op_timeout;
          static AtomicInt _xid;
          friend class cimom;
    };
    
    PEGASUS_NAMESPACE_END
    
    #endif /* Pegasus_MessageQueue_Service_h */
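
SendAsync in the header above takes an operation node, a destination, and a callback, and returns before the operation has been processed. The sketch below illustrates that non-blocking pattern under simplifying assumptions; it is not the Pegasus implementation. OpNode, EchoService, process_one, and count_completion are hypothetical names, and a single-threaded process_one call stands in for the request thread that the real class runs in _req_proc. The state values are copied from ASYNC_OPSTATE_STARTED and ASYNC_OPSTATE_COMPLETE in AsyncOpNode.h.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <string>

// State values copied from the ASYNC_OPSTATE_* definitions in AsyncOpNode.h.
const std::uint32_t OPSTATE_UNKNOWN  = 0x00000000;
const std::uint32_t OPSTATE_STARTED  = 0x00000004;
const std::uint32_t OPSTATE_COMPLETE = 0x00000040;

// Hypothetical, simplified stand-in for AsyncOpNode.
struct OpNode
{
    std::string request;
    std::string response;
    std::uint32_t state = OPSTATE_UNKNOWN;
    void (*callback)(OpNode*, void*) = nullptr;
    void* callback_ptr = nullptr;
};

// Hypothetical service that answers every request with an echo.
class EchoService
{
public:
    // Non-blocking send: queue the operation and return immediately.
    bool send_async(OpNode* op,
                    void (*callback)(OpNode*, void*),
                    void* callback_ptr)
    {
        if (op == nullptr || callback == nullptr)
            return false;
        op->callback = callback;
        op->callback_ptr = callback_ptr;
        op->state = OPSTATE_STARTED;
        _incoming.push_back(op);
        return true;
    }

    // Processes one queued operation and runs its callback.
    // Returns false when the queue is empty.
    bool process_one()
    {
        if (_incoming.empty())
            return false;
        OpNode* op = _incoming.front();
        _incoming.pop_front();
        assert(op->callback != nullptr);  // guaranteed by send_async
        op->response = "echo: " + op->request;
        op->state = OPSTATE_COMPLETE;
        op->callback(op, op->callback_ptr);
        return true;
    }

private:
    std::deque<OpNode*> _incoming;
};

// Example callback: counts completed operations through callback_ptr.
void count_completion(OpNode*, void* callback_ptr)
{
    ++*static_cast<int*>(callback_ptr);
}
```

After send_async returns, the caller still owns the node but should not read its response until the callback has run. A blocking call in the style of SendWait can be layered on top by passing a callback that signals the waiting caller.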
    
    

Asynchronous Messages

    
    //%LICENSE////////////////////////////////////////////////////////////////
    //
    // Licensed to The Open Group (TOG) under one or more contributor license
    // agreements.  Refer to the OpenPegasusNOTICE.txt file distributed with
    // this work for additional information regarding copyright ownership.
    // Each contributor licenses this file to you under the OpenPegasus Open
    // Source License; you may not use this file except in compliance with the
    // License.
    //
    // Permission is hereby granted, free of charge, to any person obtaining a
    // copy of this software and associated documentation files (the "Software"),
    // to deal in the Software without restriction, including without limitation
    // the rights to use, copy, modify, merge, publish, distribute, sublicense,
    // and/or sell copies of the Software, and to permit persons to whom the
    // Software is furnished to do so, subject to the following conditions:
    //
    // The above copyright notice and this permission notice shall be included
    // in all copies or substantial portions of the Software.
    //
    // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
    // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
    // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
    // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
    // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
    // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
    // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
    //
    //////////////////////////////////////////////////////////////////////////
    //
    // Author: Mike Day (mdday@us.ibm.com)
    //
    // Modified By: 
    //
    //////////////////////////////////////////////////////////////////////////
    
    #ifndef Pegasus_CimomMessage_h
    #define Pegasus_CimomMessage_h
    
    #include <Pegasus/Common/Config.h>
    #include <Pegasus/Common/Exception.h>
    #include <Pegasus/Common/MessageQueue.h>
    #include <Pegasus/Common/AsyncOpNode.h>
    
    PEGASUS_NAMESPACE_BEGIN
    //
    // This identifier is the queue id for CIMOM queue. It is initialized in
    // CimomMessage.cpp by calling MessageQueue::getNextQueueId(). Note that
    // this value is passed in the constructor for the CIMOM queue.
    //
    extern const Uint32 CIMOM_Q_ID;
    
    class AsyncOpNode;
    
    class PEGASUS_COMMON_LINKAGE async_results
    {
       public:
          static const Uint32 OK;
          static const Uint32 PARAMETER_ERROR;
          static const Uint32 MODULE_ALREADY_REGISTERED;
          static const Uint32 MODULE_NOT_FOUND;
          static const Uint32 INTERNAL_ERROR;
    
          static const Uint32 ASYNC_STARTED;
          static const Uint32 ASYNC_PROCESSING;
          static const Uint32 ASYNC_COMPLETE;
          static const Uint32 ASYNC_CANCELLED;
          static const Uint32 ASYNC_PAUSED;
          static const Uint32 ASYNC_RESUMED;
    
          static const Uint32 CIM_SERVICE_STARTED;
          static const Uint32 CIM_SERVICE_STOPPED;
          static const Uint32 CIM_SERVICE_PAUSED;
    
          static const Uint32 CIM_SERVICE_RESUMED;
          static const Uint32 CIM_NAK;
    
          static const Uint32 ASYNC_PHASE_COMPLETE;
          static const Uint32 ASYNC_CHILD_COMPLETE;
          static const Uint32 ASYNC_PHASE_STARTED;
          static const Uint32 ASYNC_CHILD_STARTED;
          static const Uint32 CIM_PAUSED;
          static const Uint32 CIM_STOPPED;
    
    };
    
    
    class PEGASUS_COMMON_LINKAGE async_messages
    {
       public:
          static const Uint32 HEARTBEAT;
          static const Uint32 REPLY;
          static const Uint32 REGISTER_CIM_SERVICE;
          static const Uint32 DEREGISTER_CIM_SERVICE;
          static const Uint32 UPDATE_CIM_SERVICE;
          static const Uint32 IOCTL;
          static const Uint32 CIMSERVICE_START;
          static const Uint32 CIMSERVICE_STOP;
          static const Uint32 CIMSERVICE_PAUSE;
          static const Uint32 CIMSERVICE_RESUME;
    
          static const Uint32 ASYNC_OP_START;
          static const Uint32 ASYNC_OP_RESULT;
          static const Uint32 ASYNC_LEGACY_OP_START;
          static const Uint32 ASYNC_LEGACY_OP_RESULT;
    
          static const Uint32 FIND_SERVICE_Q;
          static const Uint32 FIND_SERVICE_Q_RESULT;
          static const Uint32 ENUMERATE_SERVICE;
          static const Uint32 ENUMERATE_SERVICE_RESULT;
    };
    
    
    class PEGASUS_COMMON_LINKAGE AsyncMessage : public Message
    {
       public:
          AsyncMessage(Uint32 type,
    		   Uint32 destination,
    		   Uint32 key,
    		   Uint32 routing,
    		   Uint32 mask,
    		   AsyncOpNode *operation);
    
          virtual ~AsyncMessage(void)
          {
    	
          }
    
          Boolean operator ==(const void *key);
          Boolean operator ==(const AsyncMessage& msg);
    
          AsyncOpNode *op;
    };
    
    
    inline Boolean AsyncMessage::operator ==(const void *key)
    {
       if( key == reinterpret_cast<void *>(this))
          return true;
       return false;
    }
    
    inline Boolean AsyncMessage::operator ==(const AsyncMessage& msg)
    {
       return this->operator==(reinterpret_cast<void *>(const_cast<AsyncMessage *>(&msg)));
    }
    
    
    class PEGASUS_COMMON_LINKAGE AsyncRequest : public AsyncMessage
    {
       public:
          AsyncRequest(Uint32 type,
    		   Uint32 key,
    		   Uint32 routing,
    		   Uint32 mask,
    		   AsyncOpNode *operation,
    		   Uint32 destination,
    		   Uint32 response,
    		   Boolean blocking);
    
    
          virtual ~AsyncRequest(void)
          {
    
          }
    
          Uint32 resp;
          Boolean block;
    };
    
    class PEGASUS_COMMON_LINKAGE AsyncReply : public AsyncMessage
    {
       public:
          AsyncReply(Uint32 type,
    		 Uint32 key,
    		 Uint32 routing,
    		 Uint32 mask,
    		 AsyncOpNode *operation,
    		 Uint32 result_code,
    		 Uint32 destination,
    		 Boolean blocking);
    
    
          virtual ~AsyncReply(void)
          {
    	
          }
    
          Uint32 result;
          Boolean block;
    };
    
    
    
    class PEGASUS_COMMON_LINKAGE RegisterCimService : public AsyncRequest
    {
       public:
          RegisterCimService(Uint32 routing,
    			 AsyncOpNode *operation,
    			 Boolean blocking,
    			 String service_name,
    			 Uint32 service_capabilities,
    			 Uint32 service_mask,
    			 Uint32 service_queue);
    
          virtual ~RegisterCimService(void)
          {
    
          }
    
          String name;
          Uint32 capabilities;
          Uint32 mask;
          Uint32 queue;
    };
    
    class PEGASUS_COMMON_LINKAGE DeRegisterCimService : public AsyncRequest
    {
       public:
          DeRegisterCimService(Uint32 routing,
    			   AsyncOpNode *operation,
    			   Boolean blocking,
    			   Uint32 service_queue);
    
    
          virtual ~DeRegisterCimService(void)
          {
    
          }
    
          Uint32 queue;
    } ;
    
    class PEGASUS_COMMON_LINKAGE UpdateCimService : public AsyncRequest
    {
       public:
          UpdateCimService(Uint32 routing,
    		       AsyncOpNode *operation,
    		       Boolean blocking,
    		       Uint32 service_queue,
    		       Uint32 service_capabilities,
    		       Uint32 service_mask);
    
          virtual ~UpdateCimService(void)
          {
    
          }
    
          Uint32 queue;
          Uint32 capabilities;
          Uint32 mask;
    };
    
    
    class PEGASUS_COMMON_LINKAGE AsyncIoctl : public AsyncRequest
    {
       public:
          AsyncIoctl(Uint32 routing,
    		 AsyncOpNode *operation,
    		 Uint32 destination,
    		 Uint32 response,
    		 Boolean blocking,
    		 Uint32 code,
    		 Uint32 int_param,
    		 void *p_param);
    
          virtual ~AsyncIoctl(void)
          {
    
          }
    
          enum
          {
    	 IO_CLOSE,
    	 IO_OPEN,
    	 IO_SOURCE_QUENCH,
    	 IO_SERVICE_DEFINED
          };
    
    
    
          Uint32 ctl;
          Uint32 intp;
          void *voidp;
    
    };
    
    class PEGASUS_COMMON_LINKAGE CimServiceStart : public AsyncRequest
    {
       public:
          CimServiceStart(Uint32 routing,
    		      AsyncOpNode *operation,
    		      Uint32 destination,
    		      Uint32 response,
    		      Boolean blocking);
    
          virtual ~CimServiceStart(void)
          {
    	
          }
    };
    
    
    class PEGASUS_COMMON_LINKAGE CimServiceStop : public AsyncRequest
    {
       public:
          CimServiceStop(Uint32 routing,
    		     AsyncOpNode *operation,
    		     Uint32 destination,
    		     Uint32 response,
    		     Boolean blocking);
    
          virtual ~CimServiceStop(void)
          {
    
          }
    };
    
    class PEGASUS_COMMON_LINKAGE CimServicePause : public AsyncRequest
    {
       public:
          CimServicePause(Uint32 routing,
    		      AsyncOpNode *operation,
    		      Uint32 destination,
    		      Uint32 response,
    		      Boolean blocking);
    
    
          virtual ~CimServicePause(void)
          {
    
          }
    };
    
    class PEGASUS_COMMON_LINKAGE CimServiceResume : public AsyncRequest
    {
       public:
          CimServiceResume(Uint32 routing,
    		       AsyncOpNode *operation,
    		       Uint32 destination,
    		       Uint32 response,
    		       Boolean blocking);
    
    
          virtual ~CimServiceResume(void)
          {
    
          }
    };
    
    class PEGASUS_COMMON_LINKAGE AsyncOperationStart : public AsyncRequest
    {
       public:
          AsyncOperationStart(Uint32 routing,
    			  AsyncOpNode *operation,
    			  Uint32 destination,
    			  Uint32 response,
    			  Boolean blocking,
    			  Message *action);
    
    
          virtual ~AsyncOperationStart(void)
          {
    	 delete _act;
          }
    
    
          Message *get_action(void ) ;
    
    
       private:
          friend class MessageQueueService;
          friend class cimom;
          Message *_act;
    };
    
    class PEGASUS_COMMON_LINKAGE AsyncOperationResult : public AsyncReply
    {
       public:
          AsyncOperationResult(Uint32 key,
    			   Uint32 routing,
    			   AsyncOpNode *operation,
    			   Uint32 result_code,
    			   Uint32 destination,
    			   Uint32 blocking);
    
    
          virtual ~AsyncOperationResult(void)
          {
    
          }
    };
    
    
    class PEGASUS_COMMON_LINKAGE AsyncLegacyOperationStart : public AsyncRequest
    {
       public:
          AsyncLegacyOperationStart(Uint32 routing,
    				AsyncOpNode *operation,
    				Uint32 destination,
    				Message *action,
    				Uint32 action_destination);
    
    
          virtual ~AsyncLegacyOperationStart(void)
          {
    	 delete _act;
          }
    
          Message *get_action(void);
    
       private:
          friend class MessageQueueService;
          friend class cimom;
          Message *_act;
          Uint32 _legacy_destination;
    
    };
    
    class PEGASUS_COMMON_LINKAGE AsyncLegacyOperationResult : public AsyncReply
    {
       public:
          AsyncLegacyOperationResult(Uint32 key,
    				 Uint32 routing,
    				 AsyncOpNode *operation,
    				 Message *result);
    
          virtual ~AsyncLegacyOperationResult(void)
          {
    	 delete _res;
          }
    
          Message *get_result(void);
    
    
       private:
          friend class MessageQueueService;
          friend class cimom;
          Message *_res;
    };
    
    
    class PEGASUS_COMMON_LINKAGE FindServiceQueue : public AsyncRequest
    {
       public:
          FindServiceQueue(Uint32 routing,
    		       AsyncOpNode *operation,
    		       Uint32 response,
    		       Boolean blocking,
    		       String service_name,
    		       Uint32 service_capabilities,
    		       Uint32 service_mask);
    
          virtual ~FindServiceQueue(void)
          {
    
          }
    
          String name;
          Uint32 capabilities;
          Uint32 mask;
    } ;
    
    class PEGASUS_COMMON_LINKAGE FindServiceQueueResult : public AsyncReply
    {
       public:
          FindServiceQueueResult(Uint32 key,
    			     Uint32 routing,
    			     AsyncOpNode *operation,
    			     Uint32 result_code,
    			     Uint32 destination,
    			     Boolean blocking,
    			     Array<Uint32> queue_ids);
    
    
          virtual ~FindServiceQueueResult(void)
          {
    
          }
    
          Array<Uint32> qids;
    } ;
    
    class PEGASUS_COMMON_LINKAGE EnumerateService : public AsyncRequest
    {
       public:
          EnumerateService(Uint32 routing,
    		       AsyncOpNode *operation,
    		       Uint32 response,
    		       Boolean blocking,
    		       Uint32 queue_id);
    
    
          virtual ~EnumerateService(void)
          {
    
          }
    
          Uint32 qid;
    };
    
    class PEGASUS_COMMON_LINKAGE EnumerateServiceResponse : public AsyncReply
    {
       public:
          EnumerateServiceResponse(Uint32 key,
    			       Uint32 routing,
    			       AsyncOpNode *operation,
    			       Uint32 result_code,
    			       Uint32 response,
    			       Boolean blocking,
    			       String service_name,
    			       Uint32 service_capabilities,
    			       Uint32 service_mask,
    			       Uint32 service_qid);
    
    
          virtual ~EnumerateServiceResponse(void)
          {
    
          }
    
          String name;
          Uint32 capabilities;
          Uint32 mask;
          Uint32 qid;
    };
    
    
    PEGASUS_NAMESPACE_END
    
    #endif // Pegasus_CimomMessage_h
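
Each AsyncRequest in the header above carries a routing value and a response queue ID (resp), and each AsyncReply carries a result code. The sketch below shows one way a service might pair a reply with its request and answer unsupported message types with a negative acknowledgement. It is an illustration only: Request, Reply, make_reply, handle_request, and all numeric codes are hypothetical, since the real async_messages and async_results values are defined in CimomMessage.cpp.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical numeric codes; the real async_messages::* and
// async_results::* values are not visible in the header.
namespace msg_type
{
    const std::uint32_t HEARTBEAT       = 1;
    const std::uint32_t CIMSERVICE_STOP = 2;
    const std::uint32_t REPLY           = 3;
}

namespace result_code
{
    const std::uint32_t OK      = 0;
    const std::uint32_t CIM_NAK = 1;
}

// Simplified stand-in for AsyncRequest.
struct Request
{
    std::uint32_t type;
    std::uint32_t routing;  // correlates the reply with this request
    std::uint32_t resp;     // queue ID that should receive the reply
};

// Simplified stand-in for AsyncReply.
struct Reply
{
    std::uint32_t type;
    std::uint32_t routing;
    std::uint32_t dest;
    std::uint32_t result;
};

// Builds the reply half of a request/reply pair: same routing value,
// addressed to the queue named in the request.
Reply make_reply(const Request& req, std::uint32_t code)
{
    assert(req.type != msg_type::REPLY);  // a reply is never answered
    Reply rep;
    rep.type = msg_type::REPLY;
    rep.routing = req.routing;
    rep.dest = req.resp;
    rep.result = code;
    return rep;
}

// Dispatches on the message type; unsupported types get a NAK.
Reply handle_request(const Request& req)
{
    switch (req.type)
    {
        case msg_type::HEARTBEAT:
        case msg_type::CIMSERVICE_STOP:
            return make_reply(req, result_code::OK);
        default:
            return make_reply(req, result_code::CIM_NAK);
    }
}
```

Copying the routing value into the reply is what lets the sender treat the request and the reply as a single operation.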
    
    

AsyncOpNode

    
    //%LICENSE////////////////////////////////////////////////////////////////
    //
    // Licensed to The Open Group (TOG) under one or more contributor license
    // agreements.  Refer to the OpenPegasusNOTICE.txt file distributed with
    // this work for additional information regarding copyright ownership.
    // Each contributor licenses this file to you under the OpenPegasus Open
    // Source License; you may not use this file except in compliance with the
    // License.
    //
    // Permission is hereby granted, free of charge, to any person obtaining a
    // copy of this software and associated documentation files (the "Software"),
    // to deal in the Software without restriction, including without limitation
    // the rights to use, copy, modify, merge, publish, distribute, sublicense,
    // and/or sell copies of the Software, and to permit persons to whom the
    // Software is furnished to do so, subject to the following conditions:
    //
    // The above copyright notice and this permission notice shall be included
    // in all copies or substantial portions of the Software.
    //
    // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
    // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
    // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
    // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
    // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
    // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
    // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
    //
    //////////////////////////////////////////////////////////////////////////
    //
    // Author: Mike Day (mdday@us.ibm.com)
    //
    // Modified By: 
    //
    //////////////////////////////////////////////////////////////////////////
    
    
    #ifndef Pegasus_AsyncOpNode_h
    #define Pegasus_AsyncOpNode_h
    
    #include <Pegasus/Common/Config.h>
    #include <Pegasus/Common/Message.h>
    #include <Pegasus/Common/OperationContext.h>
    #include <Pegasus/Common/internal_dq.h>
    #include <Pegasus/Common/IPC.h>
    
    PEGASUS_NAMESPACE_BEGIN
    
    // ATTN usage of flags and state is inconsistent
    // << Wed Jan 16 17:41:57 2002 mdd >>
    // resolved mdd 
    
    
    #define ASYNC_OPFLAGS_UNKNOWN           0x00000000
    #define ASYNC_OPFLAGS_INTERVAL_REPEAT   0x00000010
    #define ASYNC_OPFLAGS_INDICATION        0x00000020
    #define ASYNC_OPFLAGS_REMOTE            0x00000040
    #define ASYNC_OPFLAGS_LOCAL_OUT_OF_PROC 0x00000080
    #define ASYNC_OPFLAGS_PHASED            0x00000001
    #define ASYNC_OPFLAGS_PARTIAL           0x00000002
    #define ASYNC_OPFLAGS_NORMAL            0x00000000
    #define ASYNC_OPFLAGS_SINGLE            0x00000008
    #define ASYNC_OPFLAGS_MULTIPLE          0x00000010
    #define ASYNC_OPFLAGS_TOTAL             0x00000020
    #define ASYNC_OPFLAGS_META_DISPATCHER   0x00000040
    #define ASYNC_OPFLAGS_FIRE_AND_FORGET   0x00000080
    #define ASYNC_OPFLAGS_SIMPLE_STATUS     0x00000100
    #define ASYNC_OPFLAGS_CALLBACK          0x00000200
    #define ASYNC_OPFLAGS_FORWARD           0x00000400
    
    #define ASYNC_OPSTATE_UNKNOWN           0x00000000
    #define ASYNC_OPSTATE_OFFERED           0x00000001
    #define ASYNC_OPSTATE_DECLINED          0x00000002
    #define ASYNC_OPSTATE_STARTED           0x00000004
    #define ASYNC_OPSTATE_PROCESSING        0x00000008
    #define ASYNC_OPSTATE_DELIVER           0x00000010
    #define ASYNC_OPSTATE_RESERVE           0x00000020
    #define ASYNC_OPSTATE_COMPLETE          0x00000040
    #define ASYNC_OPSTATE_TIMEOUT           0x00000080
    #define ASYNC_OPSTATE_CANCELLED         0x00000100
    #define ASYNC_OPSTATE_PAUSED            0x00000200
    #define ASYNC_OPSTATE_SUSPENDED         0x00000400
    #define ASYNC_OPSTATE_RESUMED           0x00000800
    #define ASYNC_OPSTATE_ORPHANED          0x00001000
    #define ASYNC_OPSTATE_RELEASED          0x00002000
    
    class cimom;
    class Thread;
    
    class PEGASUS_COMMON_LINKAGE AsyncOpNode
    {
       public:
    
          AsyncOpNode(void);
          ~AsyncOpNode(void);
    
          Boolean  operator == (const void *key) const;
          Boolean operator == (const AsyncOpNode & node) const;
    
          void get_timeout_interval(struct timeval *buffer) ;
          void set_timeout_interval(const struct timeval *interval);
    
          Boolean timeout(void)  ;
    
          OperationContext & get_context(void) ;
    
          void put_request(const Message *request) ;
          Message *get_request(void) ;
    
          void put_response(const Message *response) ;
          Message *get_response(void) ;
    
          Uint32 read_state(void) ;
          void write_state(Uint32) ;
    
          Uint32 read_flags(void);
          void write_flags(Uint32);
    
          void lock(void)  throw(IPCException);
          void unlock(void) throw(IPCException);
          void update(void) throw(IPCException);
          void deliver(const Uint32 count) throw(IPCException);
          void reserve(const Uint32 size) throw(IPCException);
          void processing(void) throw(IPCException) ;
          void processing(OperationContext *context) throw(IPCException);
          void complete(void) throw(IPCException) ;
          void complete(OperationContext *context) throw(IPCException);
          void release(void);
          void wait(void);
    
    
       private:
          Semaphore _client_sem;
          Mutex _mut;
          unlocked_dq<Message> _request;
          unlocked_dq<Message> _response;
    
          OperationContext _operation_list;
          Uint32 _state;
          Uint32 _flags;
          Uint32 _offered_count;
          Uint32 _total_ops;
          Uint32 _completed_ops;
          Uint32 _user_data;
          Uint32 _completion_code;
          MessageQueue *_op_dest;
    
          struct timeval _start;
          struct timeval _lifetime;
          struct timeval _updated;
          struct timeval _timeout_interval;
    
          AsyncOpNode *_parent;
          unlocked_dq<AsyncOpNode> _children;
    
          void _reset(unlocked_dq<AsyncOpNode> *dst_q);
    
          // the lifetime member is for cache management by the cimom
          void _set_lifetime(struct timeval *lifetime) ;
          Boolean _check_lifetime(void) ;
    
          Boolean _is_child(void) ;
          Uint32 _is_parent(void) ;
          Boolean _is_my_child(const AsyncOpNode & caller) const;
          void _make_orphan( AsyncOpNode & parent) ;
          void _adopt_child(AsyncOpNode *child) ;
          void _disown_child(AsyncOpNode *child) ;
          void (*_async_callback)(AsyncOpNode *,
    			      MessageQueue *,
    			      void *);
          // << Tue Mar 12 14:44:51 2002 mdd >>
          // pointers for async callbacks  - don't use 
          AsyncOpNode *_callback_node;
          MessageQueue *_callback_response_q;
          void *_callback_ptr;
          MessageQueue *_callback_request_q;
          //      << Tue Mar 12 14:44:53 2002 mdd >>
          // pointers to help static class message handlers - don't use 
          MessageQueue *_service_ptr;
          Thread *_thread_ptr;
    
          friend class cimom;
          friend class MessageQueueService;
    
    };
    
    
    inline Boolean AsyncOpNode::operator == (const void *key) const
    {
       if (key == (void *)this)
          return true;
       return false;
    }
    
    inline Boolean AsyncOpNode::operator == (const AsyncOpNode & node) const
    {
       return AsyncOpNode::operator==((const void *)&node);
    }
    
    
    inline void AsyncOpNode::get_timeout_interval(struct timeval *buffer)
    {
       if(buffer != 0)
       {
          _mut.lock( pegasus_thread_self() );
          buffer->tv_sec = _timeout_interval.tv_sec;
          buffer->tv_usec = _timeout_interval.tv_usec;
          _mut.unlock();
       }
       return;
    }
    
    inline void AsyncOpNode::set_timeout_interval(const struct timeval *interval)
    {
       if(interval != 0)
       {
          _mut.lock(pegasus_thread_self());
          _timeout_interval.tv_sec = interval->tv_sec;
          _timeout_interval.tv_usec = interval->tv_usec;
          gettimeofday(&_updated, NULL);
          _mut.unlock();
       }
    }
    
    
    inline Boolean AsyncOpNode::timeout(void)
    {
       struct timeval now;
       gettimeofday(&now, NULL);
       Boolean ret = false;
       _mut.lock(pegasus_thread_self());
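       // The operation has timed out once now is past the deadline
       // (_updated + _timeout_interval). Carry microsecond overflow into
       // seconds first so both fields can be compared as one value.
       struct timeval deadline;
       deadline.tv_sec = _updated.tv_sec + _timeout_interval.tv_sec;
       deadline.tv_usec = _updated.tv_usec + _timeout_interval.tv_usec;
       if(deadline.tv_usec >= 1000000)
       {
          deadline.tv_sec += 1;
          deadline.tv_usec -= 1000000;
       }
       if((deadline.tv_sec < now.tv_sec) ||
          (deadline.tv_sec == now.tv_sec && deadline.tv_usec < now.tv_usec))
          ret = true;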
       _mut.unlock();
       return ret;
    }
    
    // context is now a locked list
    inline OperationContext & AsyncOpNode::get_context(void)
    {
       gettimeofday(&_updated, NULL);
       return _operation_list;
    }
    
    
    inline  void AsyncOpNode::put_request(const Message *request)
    {
       _mut.lock(pegasus_thread_self());
       gettimeofday(&_updated, NULL);
       if( false == _request.exists(reinterpret_cast<void *>(const_cast<Message *>(request))) )
          _request.insert_last( const_cast<Message *>(request) ) ;
    
    //   _request = const_cast<Message *>(request);
    
       _mut.unlock();
    }
    
    inline Message * AsyncOpNode::get_request(void)
    {
       Message *ret;
       _mut.lock(pegasus_thread_self());
       gettimeofday(&_updated, NULL);
       ret = _request.remove_first() ;
    //   ret = _request;
    
       _mut.unlock();
       return ret;
    }
    
    inline void AsyncOpNode::put_response(const Message *response)
    {
       _mut.lock(pegasus_thread_self());
       gettimeofday(&_updated, NULL);
       if (false == _response.exists(reinterpret_cast<void *>(const_cast<Message *>(response))))
          _response.insert_last( const_cast<Message *>(response) );
    
    //   _response = const_cast<Message *>(response);
    
       _mut.unlock();
    }
    
    inline Message * AsyncOpNode::get_response(void)
    {
       Message *ret;
    
       _mut.lock(pegasus_thread_self());
    //   gettimeofday(&_updated, NULL);
       ret = _response.remove_first();
    //   ret = _response;
    
       _mut.unlock();
       return ret;
    }
    
    inline Uint32 AsyncOpNode::read_state(void)
    {
       _mut.lock(pegasus_thread_self());
       gettimeofday(&_updated, NULL);
       Uint32 ret = _state;
       _mut.unlock();
       return ret;
    
    }
    
    inline void AsyncOpNode::write_state(Uint32 state)
    {
       _mut.lock(pegasus_thread_self());
       gettimeofday(&_updated, NULL);
       _state = state;
       _mut.unlock();
    }
    
    inline Uint32 AsyncOpNode::read_flags(void)
    {
       _mut.lock(pegasus_thread_self());
       gettimeofday(&_updated, NULL);
       Uint32 ret = _flags;
       _mut.unlock();
       return ret;
    }
    
    inline void AsyncOpNode::write_flags(Uint32 flags)
    {
       _mut.lock(pegasus_thread_self());
       gettimeofday(&_updated, NULL);
       _flags = flags;
       _mut.unlock();
    }
    
    
    inline  void AsyncOpNode::lock(void)
       throw(IPCException)
    {
       _mut.lock(pegasus_thread_self());
    }
    
    inline void AsyncOpNode::unlock(void)
       throw(IPCException)
    {
       _mut.unlock();
    }
    
    inline void AsyncOpNode::udpate(void)
       throw(IPCException)
    {
       _mut.lock(pegasus_thread_self());
       gettimeofday(&_updated, NULL);
       _mut.unlock();
       return;
    }
    
    inline void AsyncOpNode::deliver(const Uint32 count)
       throw(IPCException)
    {
       _mut.lock(pegasus_thread_self());
       _completed_ops = count;
       _state |= ASYNC_OPSTATE_DELIVER;
       gettimeofday(&_updated, NULL);
       _mut.unlock();
       return;
    }
    
    inline void AsyncOpNode::reserve(const Uint32 size)
       throw(IPCException)
    {
       _mut.lock(pegasus_thread_self());
       _total_ops = size;
       _state |= ASYNC_OPSTATE_RESERVE;
       gettimeofday(&_updated, NULL);
       _mut.unlock();
       return;
    }
    
    inline void AsyncOpNode::processing(void)
       throw(IPCException)
    {
       _mut.lock(pegasus_thread_self());
       _state |= ASYNC_OPSTATE_PROCESSING;
       gettimeofday(&_updated, NULL);
       _mut.unlock();
       return;
    }
    
    // con will be empty upon return of this member function
    inline void AsyncOpNode::processing(OperationContext *con)
       throw(IPCException)
    {
       _mut.lock(pegasus_thread_self());
       _state |= ASYNC_OPSTATE_PROCESSING;
       gettimeofday(&_updated, NULL);
    
       context *c = con->remove_context();
       while(c != 0)
       {
          _operation_list.add_context(c);
          c = con->remove_context();
       }
       _mut.unlock();
       return;
    }
    
    inline void AsyncOpNode::complete(void)
       throw(IPCException)
    {
       _mut.lock(pegasus_thread_self());
       _state |= ASYNC_OPSTATE_COMPLETE;
       gettimeofday(&_updated, NULL);
       _mut.unlock();
    
       return;
    }
    
    // con will be empty upon return of this member function
    inline void AsyncOpNode::complete(OperationContext *con)
       throw(IPCException)
    {
       _mut.lock(pegasus_thread_self());
       _state |= ASYNC_OPSTATE_COMPLETE;
       gettimeofday(&_updated, NULL);
       context *c = con->remove_context();
       while(c != 0)
       {
          _operation_list.add_context(c);
          c = con->remove_context();
       }
       _mut.unlock();
    }
    
    inline void AsyncOpNode::wait(void)
    {
       _client_sem.wait();
    }
    
    inline void AsyncOpNode::release(void)
    {
       _mut.lock(pegasus_thread_self());
       _state |= ASYNC_OPSTATE_RELEASED;
       _mut.unlock();
    }
    
    inline  void AsyncOpNode::_set_lifetime(struct timeval *lifetime)
    {
       _mut.lock(pegasus_thread_self());
       _lifetime.tv_sec = lifetime->tv_sec;
       _lifetime.tv_usec = lifetime->tv_usec;
       _mut.unlock();
    }
    
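    inline Boolean AsyncOpNode::_check_lifetime(void)
    {
       struct timeval now;

       gettimeofday(&now, NULL);
       // expiry = _start + _lifetime, with any microsecond overflow
       // carried into the seconds field
       struct timeval expires;
       expires.tv_sec = _start.tv_sec + _lifetime.tv_sec;
       expires.tv_usec = _start.tv_usec + _lifetime.tv_usec;
       expires.tv_sec += expires.tv_usec / 1000000;
       expires.tv_usec %= 1000000;
       // still alive while the expiry time has not yet passed
       if(expires.tv_sec > now.tv_sec ||
          (expires.tv_sec == now.tv_sec && expires.tv_usec >= now.tv_usec))
          return true;
       return false;
    }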
    
    inline Boolean AsyncOpNode::_is_child(void)
    {
       if (_parent != 0)
          return true;
       return false;
    }
    
    inline Uint32 AsyncOpNode::_is_parent(void)
    {
       return _children.count();
    }
    
    // returns true if caller is this node's parent
    inline Boolean AsyncOpNode::_is_my_child(const AsyncOpNode & caller) const
    {
       if ( _parent == &caller )
          return true;
       return false;
    }
    
    inline void AsyncOpNode::_make_orphan( AsyncOpNode & parent)
    {
       if( _parent == &parent )
       {
          _parent = NULL;
          parent._children.remove(this);
       }
       else
          throw Permission(pegasus_thread_self());
    }
    
    inline void AsyncOpNode::_adopt_child(AsyncOpNode *child)
    {
       if(child == NULL)
          throw NullPointer();
       if(true == child->_is_child())
          throw Permission(pegasus_thread_self());
       child->_parent = this;
       _children.insert_last(child);
    }
    
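    inline void AsyncOpNode::_disown_child(AsyncOpNode *child)
    {
       if(child == NULL)
          throw NullPointer();
       // only the current parent of child may disown it
       if( false == child->_is_child() || false == child->_is_my_child( *this ))
          throw Permission(pegasus_thread_self());
       // clears child->_parent and removes child from _children
       child->_make_orphan( *this );
       // redundant: _make_orphan() has already removed child from _children
       _children.remove(child);
    }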
    
    PEGASUS_NAMESPACE_END
    
    #endif //Pegasus_AsyncOpNode_h
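
The timeout and lifetime checks in the listing above both compare "a start time plus an interval" against the current time. A minimal standalone sketch of that comparison follows. The helper names timeval_add, timeval_before, and has_timed_out are hypothetical and are not part of Pegasus; the sketch assumes both inputs already have tv_usec in the range [0, 1000000).

```cpp
#include <cassert>
#include <sys/time.h>

// Hypothetical helper (not a Pegasus API): returns a + b with tv_usec
// normalized back into the range [0, 1000000).
inline timeval timeval_add(const timeval &a, const timeval &b)
{
    // Assumption: both inputs are already normalized.
    assert(a.tv_usec >= 0 && a.tv_usec < 1000000);
    assert(b.tv_usec >= 0 && b.tv_usec < 1000000);

    timeval sum;
    sum.tv_sec = a.tv_sec + b.tv_sec;
    sum.tv_usec = a.tv_usec + b.tv_usec;
    if (sum.tv_usec >= 1000000)
    {
        // Carry the microsecond overflow into the seconds field.
        sum.tv_sec += 1;
        sum.tv_usec -= 1000000;
    }
    return sum;
}

// Hypothetical helper: true when a is strictly earlier than b.
// Microseconds are only consulted when the seconds are equal.
inline bool timeval_before(const timeval &a, const timeval &b)
{
    if (a.tv_sec != b.tv_sec)
        return a.tv_sec < b.tv_sec;
    return a.tv_usec < b.tv_usec;
}

// Hypothetical helper: true once now is past updated + interval.
inline bool has_timed_out(const timeval &updated,
                          const timeval &interval,
                          const timeval &now)
{
    return timeval_before(timeval_add(updated, interval), now);
}
```

Comparing the seconds and microseconds fields independently would miss cases where the deadline passed many seconds ago but the microsecond part of the current time happens to be small, which is why the sketch compares seconds first and falls back to microseconds only on a tie.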
    
    
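
The parent/child bookkeeping done by _adopt_child, _disown_child, and _make_orphan can be illustrated in isolation. The sketch below is a simplified model, not the Pegasus implementation: Node is a hypothetical stand-in for AsyncOpNode, std::vector replaces the internal child list, and standard exceptions stand in for NullPointer and Permission.

```cpp
#include <algorithm>
#include <cassert>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for AsyncOpNode; models only the parent/child links.
struct Node
{
    Node *parent = nullptr;
    std::vector<Node *> children;

    // Adopt an orphan. A node that already has a parent is refused.
    void adopt(Node *child)
    {
        if (child == nullptr)
            throw std::invalid_argument("null child");
        if (child->parent != nullptr)
            throw std::logic_error("child already has a parent");
        // Invariant: a parentless node is not in this node's child list.
        assert(std::find(children.begin(), children.end(), child) ==
               children.end());
        child->parent = this;
        children.push_back(child);
    }

    // Disown a child. Only the child's current parent may do this.
    void disown(Node *child)
    {
        if (child == nullptr)
            throw std::invalid_argument("null child");
        if (child->parent != this)
            throw std::logic_error("not my child");
        child->parent = nullptr;
        // Erase-remove idiom: drop the child from the list exactly once.
        children.erase(std::remove(children.begin(), children.end(), child),
                       children.end());
    }
};
```

In this model a node has at most one parent at a time, so a second adopter must wait until the first parent has disowned the child.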

    Michael Day
    Last modified: Wed Mar 13 12:26:56 EST 2002