PEP #: 5

TITLE: Proposal to Fix Indications in Pegasus

Version: $Revision: 1.1.2.1 $
Author: Mike Day
State: Draft
Type: Architecture
Vote:
Created: 6 January 2003
Version 1.0 Mon Jan 6 17:39:09 2003

Summary

This document details my experiences getting CIM Indications to work with the Pegasus CIMOM. It explains the changes I had to make, the bugs I had to fix, and then makes some design recommendations for integrating Indication support into the Open Group CVS tree.

What Works?

I have Pegasus supporting Indications, subscriptions, and handlers. It works really well. All the CIM operations related to indications and subscriptions work. I have tests and test providers.

Proposal

Commit the changes outlined in this document to the development (main) branch of the TOG CVS for Pegasus for inclusion in Pegasus release 2.2.

Terminology

Early feedback on this document has requested clarification of some terms I use later.

Response Handler
Callback class that receives operation results from Providers. This object hierarchy was designed specifically to support asynchronous result delivery from providers, such as Indications. Examples include OperationResponseHandler and EnableIndicationsResponseHandler.
Indication Consumer Provider
The C++ object that receives the CIMIndication for handling. This proposal defines a new type of provider that supports the CIMIndicationConsumer interface as defined by Chip Vincent. The CIMIndicationConsumer replaces the IndicationHandler that is currently in the source tree. However, the IndicationHandler can follow the CIMIndicationConsumer in the indication processing path if required for compatibility.
Indication Handler
Existing service in Pegasus for receiving indications and dispatching them to applications. This service has never been supported, and I propose to replace it with the Indication Consumer Provider (above). It can be optionally layered upon the Indication Consumer Provider if required.
Export Server
Pegasus Service that exports CIMIndication instances to other Pegasus instances. I do not refer to the Export Server in this proposal. Its use is compatible with this proposal but optional.
Export Client
Pegasus Service that listens for and receives CIMIndication instances exported by other Pegasus instances. I do not refer to the Export Client in this proposal. Its use is compatible with this proposal but optional.

Table of Contents

  1. My Plan of Attack
  2. What did I have to Do?
  3. Where is the Source Code?
  4. What is the Status of Indication Support in the CVS?
  5. Indication Consumer Provider
  6. Elusive Bugs I Fixed
  7. Indication Consumer Support Code
  8. Provider Schema Modifications
  9. Provider Registration Manager Modifications
  10. Instantiating the Modified Schema and Providers
  11. Indication Provider
  12. Indication Consumer Provider
  13. Indication Processing Pathway

My Plan of Attack

There is a ton of rich functionality in the existing Pegasus code base for handling Indications and Subscriptions. The problem is, none of it has been tested due to gating bugs in the Pegasus core.

All of the subscription options, policies, and error handling are fantastic code, but they cause real problems when all you want to do is get the underlying mechanisms to work. In fact, the "commercial grade" features of the IndicationService tend to obfuscate the underlying breaks in the Pegasus core that are gating Indication support. So I embarked on a simplification strategy.

Basic Indication Handling should be easy, and robust Indication Handling should be possible. (Yes, I stole this quote from Larry Wall.) To this end I removed some of the functionality from the IndicationService, such as enforcing the schema and implementing policy.

I wrote a basic IndicationService and ConsumerProvider that stresses the core of Pegasus and assures the functionality of Indications and Subscriptions.

If I need error handling policy and schema enforcement, I should be able to create or reactivate the more robust HP Indication Service and substitute it for the simple one. Or I can graft features from the HP service to the simple one. But I shouldn't have to use the untested complex features to debug the inner workings of Pegasus.

What Did I Have to Do to get Indications Working?

A little bit of everything. This is an area of Pegasus that has never been completed nor tested. The list below shows the steps I took, in order, to complete this work.

    Tasks I Completed to Get Indications working in Pegasus

  1. Fix some elusive bugs.
  2. Successfully run the existing wbem-exec tests in the test/wetest/cimv2/subscription directory.
  3. Subclass the PG_Provider and PG_Indication Schemata to allow Providers to be Indication Handlers.
  4. Modify the ProviderRegistrationManager to allow for the new "type 6" provider (see above).
  5. Create a new CIMConsumeIndication Message (see item 3 above).
  6. Rewrite the IndicationService to redirect Indications to the ProviderManager (see items 3-5 above).
  7. Modify the ProviderManager to treat CIMConsumerProvider modules differently because they are asynchronous by definition.
  8. Instantiate the new Providers in a compiled Schema. (I piggy-backed on the existing repository build scripts.)
  9. Write a CIMIndicationProvider and a CIMIndicationConsumer and start testing.

Where is the source code?

IBM has, out of necessity, maintained a version of Pegasus that has the original C++ provider interfaces. We did this to support existing providers that we shipped prior to the interface changes this past fall. I did the work necessary to support indications on IBM's "classic" snapshot of the source repository. That's where the source code is. Most of the changes were not related to provider interfaces so merging indication support into the Open Group CVS is feasible.

Consider this document both a design proposal for indication support and a proposal for checking the source code into the Open Group CVS.

I also anticipate making this available as a snapshot on an ftp server.

What is the status of Indication Support in the Open Group CVS?

Indications in the Open Group Pegasus 2.x do not work and are not close to working. There are bugs that need to be fixed and design changes that need to be made. This document discusses most of them.

Indication Consumer Provider

Chip Vincent and I have long preferred to use Pegasus providers to handle indications. Doing so removes the need for a separate IndicationHandler service. Instead, the handler can reuse all the infrastructure that exists for Providers.

Another benefit of using Pegasus providers to handle indications is that developers do not need to learn another service interface. The provider interface is sufficient. Also, this means that we (Pegasus developers) do not need to provide separate developer support for handling indications and generating indications.

Once a CIMIndicationProvider routine generates a CIMIndication, the modified OperationResponseHandler creates a CIMConsumeIndication message and passes the indication to the ProviderManagerService; the provider manager then finds the "handler," which is really a provider, and calls provider.handleIndication().

There is no need for a separate handler service, which makes error handling and debugging simpler.
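
The pathway described above can be sketched in a few lines. This is a minimal illustration under stated assumptions, not the Pegasus code: ConsumerProvider, ConsumeIndicationMessage, and SketchProviderManager are hypothetical stand-ins for a CIMIndicationConsumer provider, the CIMConsumeIndication message, and the ProviderManagerService, and an indication is reduced to a string.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a provider that implements CIMIndicationConsumer.
struct ConsumerProvider
{
    std::vector<std::string> received;

    void handleIndication(const std::string& indication)
    {
        received.push_back(indication);
    }
};

// Hypothetical stand-in for the CIMConsumeIndication message.
struct ConsumeIndicationMessage
{
    std::string consumerName;  // which registered consumer should get it
    std::string indication;    // the indication payload
};

// Hypothetical stand-in for the ProviderManagerService: it finds the
// consumer (which is just a provider) and calls handleIndication() on it.
class SketchProviderManager
{
public:
    void registerConsumer(const std::string& name,
                          std::shared_ptr<ConsumerProvider> provider)
    {
        _consumers[name] = std::move(provider);
    }

    // Returns false when no consumer is registered under that name.
    bool dispatch(const ConsumeIndicationMessage& message)
    {
        std::map<std::string, std::shared_ptr<ConsumerProvider> >::iterator it =
            _consumers.find(message.consumerName);
        if (it == _consumers.end())
            return false;
        it->second->handleIndication(message.indication);
        return true;
    }

private:
    std::map<std::string, std::shared_ptr<ConsumerProvider> > _consumers;
};
```

The point of the sketch is that the dispatch step needs nothing beyond a provider lookup and one method call, which is why a separate handler service becomes unnecessary.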

Tasks to Support Indication Consumer Providers

This was not as simple as activating the CIMIndicationConsumer code that has been in the source tree for quite some time. I had to ensure that this new type of provider was supported throughout Pegasus.

  1. Indication Consumer Support Code
  2. Provider Schema Modifications
  3. Provider Registration Manager Modifications
  4. Instantiating the Modified Schema and Providers

Elusive Bugs I Fixed

  1. Indication Response Handler Allocated on Stack

  2. Response Handler Created with Reference to Transient Messages

  3. Converting References to CIMInstances

  4. Pegasus Array Template and Virtual Base Classes in Provider Hierarchy

  5. Double Deletes in Redirected Message Pathways

Indication Response Handler allocated on stack

The ProviderManagerService was creating an indication response handler within a local code block and passing that to the Indication Provider with an EnableIndication message. After the ProviderManagerService returned from enabling the indication, the response handler went out of scope and was destroyed. When the Indication Provider later called handler.deliver(indication), it was using a reference to an object that had already been destroyed. The result was a core dump.

The fix: allocate the response handler from the heap and store it for as long as the indication is enabled.

The important lines for enabling a new subscription are 32 and 58.

The important line for disabling an existing subscription is 136.

  1 void ProviderManagerService::handleEnableIndicationsRequest(
  2    AsyncOpNode *op, const Message * message) throw()
  3 {
  4    PEG_METHOD_ENTER(TRC_PROVIDERMANAGER, 
  5 		    "ProviderManagerService:: handleEnableIndicationsRequest");
  6    CIMEnableIndicationsRequestMessage * request =
  7       dynamic_cast<CIMEnableIndicationsRequestMessage *>(
  8 	 const_cast<Message *>(message));
  9 
 10    AsyncRequest *async = 
 11       static_cast<AsyncRequest *>(op->_request.next(0));
 12 
 13    PEGASUS_ASSERT(request != 0 && async != 0 );
 14 
 15    CIMEnableIndicationsResponseMessage * response =
 16       new CIMEnableIndicationsResponseMessage(
 17 	 request->messageId,
 18 	 CIMException(),
 19 	 request->queueIds.copyAndPop());
 20 
 21    PEGASUS_ASSERT(response != 0);
 22 
 23    // preserve message key
 24    response->setKey(request->getKey());
 25 
 26    response->dest = request->queueIds.top();
 27 
 28    // Allocate the response handler from the heap using new (). 
 29    // Otherwise it will go out of scope and the indication, which 
 30    // happens later, will go off into outer space...
 31 
 32    EnableIndicationsResponseHandler *handler = 
 33       new EnableIndicationsResponseHandler(request, response, this);
 34 
 35    try
 36    {
 37       // get the provider file name and logical name
 38       Triad<String, String, String> triad =
 39 	 _getProviderRegPair(request->provider, request->providerModule);
 40 
 41       // get cached or load new provider module
 42       Provider provider =
 43 	 providerManager.getProvider(triad.first, triad.second, triad.third);
 44 
 45       PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4, 
 46 		       "Calling provider.enableIndications: " + 
 47 		       provider.getName());
 48        
 49       provider.enableIndications(*handler);
 50 
 51       // if no exception, store the handler so it is persistent for as 
 52       // long as the provider has indications enabled. 
 53       PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4, 
 54 		       "Storing indication handler for " + provider.getName());
 55        
 56 
 57       // keep indication handlers in a hash table keyed by the provider. 
 58       _insertEntry(provider, handler);
 59    }
 60    catch(CIMException & e)
 61    {
 62       PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4, 
 63 		       "Exception: " + e.getMessage());
 64       response->cimException = CIMException(e);
 65    }
 66    catch(Exception & e)
 67    {
 68       PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4, 
 69 		       "Exception: " + e.getMessage());
 70       response->cimException = CIMException(CIM_ERR_FAILED, "Internal Error");
 71    }
 72    catch(...)
 73    {
 74       PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4, 
 75 		       "Exception: Unknown");
 76       response->cimException = CIMException(CIM_ERR_FAILED, "Unknown Error");
 77    }
 78        
 79    AsyncLegacyOperationResult *async_result =
 80       new AsyncLegacyOperationResult(
 81 	 async->getKey(),
 82 	 async->getRouting(),
 83 	 op,
 84 	 response);
 85 
 86    _complete_op_node(op, ASYNC_OPSTATE_COMPLETE, 0, 0);
 87    PEG_METHOD_EXIT();
 88 }
 89 
 90 void ProviderManagerService::handleDisableIndicationsRequest(
 91    AsyncOpNode *op, const Message * message) throw()
 92 {
 93    PEG_METHOD_ENTER(TRC_PROVIDERMANAGER, 
 94 		    "ProviderManagerService::handleDisableIndicationsRequest");
 95    CIMDisableIndicationsRequestMessage * request =
 96       dynamic_cast<CIMDisableIndicationsRequestMessage *>(
 97 	 const_cast<Message *>(message));
 98 
 99    AsyncRequest *async = static_cast<AsyncRequest *>(op->_request.next(0));
100 
101    PEGASUS_ASSERT(request != 0 && async != 0 );
102 
103    CIMDisableIndicationsResponseMessage * response =
104       new CIMDisableIndicationsResponseMessage(
105 	 request->messageId,
106 	 CIMException(),
107 	 request->queueIds.copyAndPop());
108 
109    PEGASUS_ASSERT(response != 0);
110 
111    // preserve message key
112    response->setKey(request->getKey());
113 
114    OperationResponseHandler<CIMIndication> handler(request, response);
115    try
116    {
117       // get the provider file name and logical name
118       Triad<String, String, String> triad =
119 	 _getProviderRegPair(request->provider, request->providerModule);
120 
121       // get cached or load new provider module
122       Provider provider =
123 	 providerManager.getProvider(triad.first, triad.second, triad.third);
124       PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4, 
125 		       "Calling provider.disableIndications: " + 
126 		       provider.getName());
127 	
128       provider.disableIndications();
129 
130       // Now that indications are disabled we can extract and remove the 
131       // indication response handler 
132       PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4, 
133 		       "Removing and Destroying indication handler for " + 
134 		       provider.getName());
135 	
136       delete _removeEntry(_generateKey(provider));
137    }
138 
139    catch(CIMException & e)
140    {
141       PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4, 
142 		       "Exception: " + e.getMessage());
143       response->cimException = CIMException(e);
144    }
145    catch(Exception & e)
146    {
147       PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4, 
148 		       "Exception: " + e.getMessage());
149       response->cimException = CIMException(CIM_ERR_FAILED, "Internal Error");
150    }
151    catch(...)
152    {
153       PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4, 
154 		       "Exception: Unknown");
155       response->cimException = CIMException(CIM_ERR_FAILED, "Unknown Error");
156    }
157 
158    AsyncLegacyOperationResult *async_result =
159       new AsyncLegacyOperationResult(
160 	 async->getKey(),
161 	 async->getRouting(),
162 	 op,
163 	 response);
164 
165    _complete_op_node(op, ASYNC_OPSTATE_COMPLETE, 0, 0);
166    PEG_METHOD_EXIT();
167 }
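
The lifetime rule behind this fix can be reduced to a standalone sketch. It is an illustration under assumptions, not the ProviderManagerService code: SketchHandler, SketchProvider, and HandlerTable are hypothetical stand-ins, and std::unique_ptr is swapped in for the raw new and delete used in the listing above.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for EnableIndicationsResponseHandler.
struct SketchHandler
{
    std::vector<std::string> delivered;

    void deliver(const std::string& indication)
    {
        delivered.push_back(indication);
    }
};

// Hypothetical stand-in for an indication provider. It keeps a pointer to
// the handler it was given and uses it long after enableIndications returns.
class SketchProvider
{
public:
    void enableIndications(SketchHandler& handler) { _handler = &handler; }
    void disableIndications() { _handler = nullptr; }

    // Returns false when indications are not enabled.
    bool fire(const std::string& indication)
    {
        if (_handler == nullptr)
            return false;
        _handler->deliver(indication);
        return true;
    }

private:
    SketchHandler* _handler = nullptr;
};

// Hypothetical stand-in for the table that keeps handlers alive, keyed by
// provider name, for as long as indications are enabled.
class HandlerTable
{
public:
    SketchHandler& enable(const std::string& name, SketchProvider& provider)
    {
        std::unique_ptr<SketchHandler> handler(new SketchHandler());
        SketchHandler& ref = *handler;
        provider.enableIndications(ref);
        _handlers[name] = std::move(handler);  // the table now owns the handler
        return ref;
    }

    void disable(const std::string& name, SketchProvider& provider)
    {
        provider.disableIndications();  // stop deliveries first
        _handlers.erase(name);          // then destroy the handler
    }

    std::size_t size() const { return _handlers.size(); }

private:
    std::map<std::string, std::unique_ptr<SketchHandler> > _handlers;
};
```

The order in disable() mirrors the listing: the provider is told to stop before the handler is destroyed. One design difference is worth noting: because the handler is held by a smart pointer, it is released automatically if enableIndications throws before the handler is stored.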

Response Handler Created with Reference to Transient Messages

The OperationResponseHandler and derivative classes are constructed with a reference to CIMRequest and CIMResponse messages (i.e., the request/response pair related to the operation). These references were stored and used during the processing of responses. They pointed to messages that were deleted when the thread of execution left the dispatcher. For most CIM operations this is not a problem. However, in the case of indications the responses occur asynchronously, so the message references ended up pointing to deleted memory. Even this did not manifest a problem until the response handler was destroyed, which occurred when the DisableIndications message made its way to the ProviderManagerService.

The fix - copy the messages passed to the response handler.

The lines that implement the fix are 14-15 and 102-103.

This is a fix that, in my opinion, should be generally applied to all response handlers.

  1 ////////////////////-*-c++-*-///////////////////////////////////////////
  2 class EnableIndicationsResponseHandler : 
  3    public OperationResponseHandler<CIMIndication>
  4 {
  5    public:
  6       EnableIndicationsResponseHandler(
  7 	 CIMEnableIndicationsRequestMessage * request,
  8 	 CIMEnableIndicationsResponseMessage * response,
  9 	 MessageQueueService * source,
 10 	 MessageQueueService * target = 0)
 11 	 : OperationResponseHandler<CIMIndication>(request, response),
 12 	   _source(source),
 13 	   _target(target),
 14 	   _request_copy(*request),
 15 	   _response_copy(*response)
 16       {
 17         PEGASUS_ASSERT(_source != 0);
 18 
 19         // get indication service
 20         if(_target == 0)
 21         {
 22             Array<Uint32> serviceIds;
 23 
 24             _source->find_services(
 25 	       PEGASUS_QUEUENAME_ESERVER_INDICATIONSERVICE, 
 26 	       0, 0, &serviceIds);
 27 
 28             PEGASUS_ASSERT(serviceIds.size() != 0);
 29 
 30             _target = dynamic_cast<MessageQueueService *>(
 31 	       MessageQueue::lookup(serviceIds[0]));
 32 
 33             PEGASUS_ASSERT(_target != 0);
 34         }
 35     }
 36 
 37     virtual void deliver(const CIMIndication & cimIndication)
 38     {
 39         OperationContext context;
 40 
 41         deliver(context, cimIndication);
 42     }
 43 
 44     virtual void deliver(const OperationContext & context, 
 45 			 const CIMIndication & cimIndication)
 46     {
 47         // ATTN: temporarily convert indication to instance
 48         CIMInstance cimInstance(cimIndication);
 49 
 50         // create message
 51         CIMProcessIndicationRequestMessage * request =
 52             new CIMProcessIndicationRequestMessage(
 53 	       _request_copy.messageId,
 54             cimInstance.getPath().getNameSpace(),
 55             cimInstance,
 56             QueueIdStack(_target->getQueueId(), _source->getQueueId()));
 57 
 58         // send message
 59         // <<< Wed Apr 10 21:04:00 2002 mdd >>>
 60         // AsyncOpNode * op = _source->get_op();
 61 
 62         AsyncLegacyOperationStart * asyncRequest =
 63             new AsyncLegacyOperationStart(
 64             _source->get_next_xid(),
 65             0,
 66             _target->getQueueId(),
 67             request,
 68             _target->getQueueId());
 69 
 70         PEGASUS_ASSERT(asyncRequest != 0);
 71 
 72         //AsyncReply * asyncReply = _source->SendWait(asyncRequest);
 73         // <<< Wed Apr 10 21:04:50 2002 mdd >>>
 74         _source->SendForget(asyncRequest);
 75         //PEGASUS_ASSERT(asyncReply != 0);
 76 
 77         //  Chip - receiver of the request should delete it
 78         //delete asyncRequest;
 79         // <<< Wed Apr 10 21:05:10 2002 mdd >>>
 80     }
 81 
 82     virtual void deliver(const Array<CIMIndication> & cimIndications)
 83     {
 84         OperationContext context;
 85 
 86         deliver(context, cimIndications);
 87     }
 88 
 89     virtual void deliver(const OperationContext & context, 
 90 			 const Array<CIMIndication> & cimIndications)
 91     {
 92         for(Uint32 i = 0, n = cimIndications.size(); i < n; i++)
 93         {
 94             deliver(context, cimIndications[i]);
 95         }
 96     }
 97 
 98    protected:
 99       MessageQueueService * _source;
100       MessageQueueService * _target;
101    private:
102       CIMEnableIndicationsRequestMessage _request_copy;
103       CIMEnableIndicationsResponseMessage _response_copy;
104 };
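
The effect of copying the messages can be shown in isolation. The following is a minimal sketch with hypothetical stand-in types (SketchRequest, CopyingHandler), not the Pegasus classes: the handler is built from a message that is deleted straight away, as happens when execution leaves the dispatcher, and remains usable because it holds its own copy.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for a CIM request message.
struct SketchRequest
{
    std::string messageId;
};

// Stores a copy of the request, so it stays valid after the caller's
// message has been deleted.
class CopyingHandler
{
public:
    explicit CopyingHandler(const SketchRequest* request)
        : _requestCopy(*request)
    {
    }

    const std::string& messageId() const { return _requestCopy.messageId; }

private:
    SketchRequest _requestCopy;
};

// Builds a handler from a message that is deleted before the handler is
// ever used, which is the situation asynchronous indications create.
CopyingHandler makeHandlerFromTransientRequest(const std::string& id)
{
    SketchRequest* request = new SketchRequest{id};
    CopyingHandler handler(request);
    delete request;  // the transient message goes away here
    return handler;
}
```

Had CopyingHandler stored the pointer instead of a copy, messageId() would read freed memory after the delete.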

Converting References to CIMInstances

References stored in subscriptions and providers were sometimes being handled incorrectly in the IndicationService and the ProviderRegistrationManager, at least when the relationship between class names, class properties, and reference properties was concerned. Reference handling in general could use a library of helper routines. It is too error prone right now to extract a reference and turn it into a CIMInstance.

As I was reviewing the IndicationService I noticed that a lot of the "dereferencing" of references was done in fairly long code segments that repeated themselves several times with only slight differences. I decided to distill that code into some more compact helper routines. I started on this but haven't finished yet.

Partial Fix: helper routines

I started working on some helper routines in my re-written IndicationService. Distilling a few routines into lower-level helpers allowed me to jettison a lot of code. As far as I could tell the code in the original Indication Service that I replaced should work just fine but I couldn't get it to. I felt that distilling the code would give me an edge in fixing underlying Pegasus bugs. I still think the bugs were elsewhere (not in the IndicationService) but I can't prove it.

In general I think that Pegasus could benefit greatly from a library of similar, more general, static helper routines to deal with name spaces and the conversion between references and the instances they refer to.

I also did some similar rewrites in the ProviderRegistrationManager.

  1 
  2 //%/////////////////////////////////////////////////////////////////
  3 // Take an association class and the name of one of the references. 
  4 // Return a CIMObjectPath that contains the instance of the referred 
  5 // to object. 
  6 ///////////////////////////////////////////////////////////////////
  7 Boolean eServerIndicationService::_get_object_path_from_association(
  8    const String & reference_name,
  9    const CIMInstance & association, 
 10    CIMObjectPath & path) const 
 11 {
 12    
 13    path.clear();
 14    try
 15    {
 16       CIMValue path_value = association.getProperty(
 17 	 association.findProperty(reference_name)).getValue();
 18       path_value.get(path);
 19    }
 20    catch(...)
 21    {
 22       return false;
 23    }
 24    
 25    return true;
 26 }
 27 
 28 //%/////////////////////////////////////////////////////////////////
 29 // Get all the name spaces in the repository. 
 30 // This should really be a static method that any class should be 
 31 // able to call
 32 ///////////////////////////////////////////////////////////////////
 33 void eServerIndicationService::_getNameSpaceNames (
 34    Array<String> & nameSpaceNames) const
 35 {
 36    
 37    PEG_METHOD_ENTER (TRC_INDICATION_SERVICE,
 38 		     "eServerIndicationService::_getNameSpaceNames");
 39    nameSpaceNames.clear();
 40    
 41    _repository->read_lock ();
 42    
 43    try
 44    {
 45       nameSpaceNames = _repository->enumerateNameSpaces ();
 46    }
 47    catch (Exception &)
 48    {
 49       _repository->read_unlock ();
 50       PEG_METHOD_EXIT ();
 51       return;  // do not fall through and unlock a second time
 52    }
 53    _repository->read_unlock ();
 54    
 55    PEG_METHOD_EXIT ();
 56 }
 57 
 58 //%/////////////////////////////////////////////////////////////////
 59 // Get all subscriptions that refer to a specific indication class
 60 // returns subscriptions in all name spaces. Calls lower-level
 61 // _getMatchingSubscriptions. 
 62 ///////////////////////////////////////////////////////////////////
 63 void eServerIndicationService::_getMatchingSubscriptions(
 64    const CIMInstance & indication,
 65    Array<CIMInstance> & subscriptions) const 
 66 {
 67    Array<String> nameSpaceNames;
 68    _getNameSpaceNames(nameSpaceNames);
 69    
 70    for(Uint32 i = 0; i < nameSpaceNames.size() ; ++i)
 71    {
 72       _getMatchingSubscriptions( nameSpaceNames[i],
 73 				 indication, 
 74 				 subscriptions);
 75    }
 76 }
 77 
 78 
 79 //%/////////////////////////////////////////////////////////////////
 80 // Get all subscriptions that refer to a specific indication class
 81 // for a specific name space. 
 82 ///////////////////////////////////////////////////////////////////
 83 void eServerIndicationService::_getMatchingSubscriptions(
 84    const String & nameSpaceName, 
 85    const CIMInstance & indication,
 86    Array<CIMInstance> & subscriptions) const
 87 {
 88    Array<CIMInstance> all_subscriptions;
 89    _getSubscriptions(nameSpaceName, all_subscriptions);
 90    
 91    String indication_class_name = indication.getClassName();
 92    CIMObjectPath compare_path;
 93    
 94 
 95    for(Uint32 x = 0; 
 96        x < all_subscriptions.size() ; 
 97        ++x, compare_path.clear())
 98    {
 99       _get_object_path_from_association("Indication_Class", 
100 					all_subscriptions[x],
101 					compare_path);
102       if(compare_path.getClassName() == indication_class_name)
103       {
104 	 CIMObjectPath temp = all_subscriptions[x].getPath();
105 	 temp.setNameSpace(nameSpaceName);
106 	 all_subscriptions[x].setPath(temp);
107 	 subscriptions.append(all_subscriptions[x]);
108       }
109     }
110    return;
111 }
112 
113 //%/////////////////////////////////////////////////////////////////
114 // Get all subscriptions in the repository for a specific name space
115 ///////////////////////////////////////////////////////////////////
116 void eServerIndicationService::_getSubscriptions (
117    const String & nameSpaceName, 
118    Array <CIMInstance> & subscriptions) const
119 {
120    
121 
122    PEG_METHOD_ENTER (TRC_INDICATION_SERVICE,
123 		     "eServerIndicationService::_getSubscriptions");
124    
125    //
126    //  Get existing subscriptions in current namespace
127    //
128    _repository->read_lock ();
129 
130    try
131    {
132       subscriptions  = _repository->enumerateInstances
133 	 (nameSpaceName, ESERVER_CLASSNAME_INDSUBSCRIPTION);
134    }
135    catch (CIMException & e)
136    {
137       //
138       //  Some namespaces may not include the subscription class
139       //  In that case, just return no subscriptions
140       //  Any other exception is an error
141       //
142       if (e.getCode () != CIM_ERR_INVALID_CLASS)
143       {
144 	 _repository->read_unlock ();
145 	 PEG_METHOD_EXIT ();
146 	 return;  // do not fall through and unlock a second time
147       }
148    }
149    _repository->read_unlock ();
150 
151    PEG_METHOD_EXIT ();
152 }
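
As a rough idea of what a more general static helper library might look like, here is a minimal sketch under stated assumptions. SketchPath, SketchAssociation, and ReferenceHelpers are hypothetical stand-ins: a CIMObjectPath is reduced to two strings and an association instance to a map from reference property name to path. It is not the Pegasus API.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for CIMObjectPath.
struct SketchPath
{
    std::string nameSpace;
    std::string className;
};

// Hypothetical stand-in for an association instance:
// reference property name -> referenced object path.
typedef std::map<std::string, SketchPath> SketchAssociation;

struct ReferenceHelpers
{
    // Look up a reference property by name. On failure the output path is
    // cleared and false is returned.
    static bool getReferencedPath(const SketchAssociation& association,
                                  const std::string& referenceName,
                                  SketchPath& path)
    {
        SketchAssociation::const_iterator it = association.find(referenceName);
        if (it == association.end())
        {
            path = SketchPath();
            return false;
        }
        path = it->second;
        return true;
    }

    // Collect the associations whose named reference points at className.
    static std::vector<SketchAssociation> matchByClass(
        const std::vector<SketchAssociation>& all,
        const std::string& referenceName,
        const std::string& className)
    {
        std::vector<SketchAssociation> matches;
        for (std::size_t i = 0; i < all.size(); ++i)
        {
            SketchPath path;
            if (getReferencedPath(all[i], referenceName, path) &&
                path.className == className)
            {
                matches.push_back(all[i]);
            }
        }
        return matches;
    }
};
```

Because the helpers are static and take everything they need as arguments, any service could call them without holding an IndicationService instance.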

Pegasus Array Template and Virtual Base Classes in Provider Hierarchy

I think this bug is pretty well understood among Pegasus developers. The problem is in the Array template and the way it moves elements when items are inserted into or removed from an existing array. The problem only manifests itself when arrays store objects that have a virtual base class, such as Provider.

The fix: when objects have a virtual base class, store only pointers to dynamically allocated objects in hash tables.

The source of the problem is in lines 17 and 47.

A class like Provider, which has a virtual base class, stores a pointer to that virtual base class within each object. When the Array moves the memory upon an insert or delete, it moves the class object but does not reinitialize the object's pointer to its virtual base. The object is then corrupt because its vtable has a bad pointer to the object's virtual base class.

The problem is related to indications because the IndicationService needs to load Indication providers when an indication is enabled. Typically this happens after Pegasus is up and running for a while and therefore the memmove is more likely to corrupt vtables as described above. The Array code is shown first, followed by the fix.

 1 // -*-c++-*-
 2 #ifndef PEGASUS_ARRAY_T
 3 template<class PEGASUS_ARRAY_T>
 4 #endif
 5 void Array<PEGASUS_ARRAY_T>::insert(Uint32 pos, 
 6 				    const PEGASUS_ARRAY_T* x, 
 7 				    Uint32 size)
 8 {
 9     if (pos > this->size())
10         ThrowOutOfBounds();
11 
12     reserve(this->size() + size);
13 
14     Uint32 n = this->size() - pos;
15 
16     if (n)
17         memmove(
18             _data() + pos + size, 
19 	    _data() + pos, 
20 	    sizeof(PEGASUS_ARRAY_T) * n);
21 
22     CopyToRaw(_data() + pos, x, size);
23     static_cast<ArrayRep<PEGASUS_ARRAY_T>*>(_rep)->size += size;
24 }
25 
26 #ifndef PEGASUS_ARRAY_T
27 template<class PEGASUS_ARRAY_T>
28 #endif
29 void Array<PEGASUS_ARRAY_T>::remove(Uint32 pos)
30 {
31     remove(pos, 1);
32 }
33 
34 #ifndef PEGASUS_ARRAY_T
35 template<class PEGASUS_ARRAY_T>
36 #endif
37 void Array<PEGASUS_ARRAY_T>::remove(Uint32 pos, Uint32 size)
38 {
39     if (pos + size - 1 > this->size())
40         ThrowOutOfBounds();
41 
42     Destroy(_data() + pos, size);
43 
44     Uint32 rem = this->size() - (pos + size);
45 
46     if (rem)
47         memmove(
48             _data() + pos, 
49 	    _data() + pos + size, 
50 	    sizeof(PEGASUS_ARRAY_T) * rem);
51 
52     static_cast<ArrayRep<PEGASUS_ARRAY_T>*>(_rep)->size -= size;
53 }

To fix the problem with Array and classes having a virtual base, you must store a pointer to the object instead of the object itself. An example of the fix in the ProviderManager is in lines 10-11, and lines 33 and 45.

These lines cause pointers to Provider objects to be stored in the ProviderManager's hash table, instead of the objects themselves. This avoids the problem with vtable corruption but requires management of dynamically allocated objects.

 1 
 2 ProviderModule *module;
 3 if( false  == _modules.lookup(*(parms->fileName), module) )
 4 {
 5    PEG_TRACE_STRING(TRC_PROVIDERMANAGER, 
 6 		    Tracer::LEVEL4,
 7 		    "Creating Provider Module " + 
 8 		    *(parms->fileName) );
 9 	    
10    module = new ProviderModule(*(parms->fileName));
11    _modules.insert((*parms->fileName), module);
12 }
13 else 
14 {
15    PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4,
16 		    "Using Cached  Provider Module " + 
17 		    *(parms->fileName) );
18 }
19 	 
20 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4,
21 		 "Loading/Linking Provider Module " + 
22 		 *(parms->fileName) );
23 CIMBaseProvider *base = module->load(*(parms->providerName));
24 	    
25 // create provider module
26    
27 MessageQueue * queue = MessageQueue::lookup(
28    PEGASUS_QUEUENAME_PROVIDERMANAGER_CPP);
29 PEGASUS_ASSERT(queue != 0);
30 MessageQueueService * service = 
31    dynamic_cast<MessageQueueService *>(queue);
32 PEGASUS_ASSERT(service != 0);
33 pr = new Provider(*(parms->providerName), module, base);
34 if(0 == (pr->_cimom_handle =  new CIMOMHandle(service)))
35    throw NullPointer();
36 
37 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL2, 
38 		 "Loading Provider " +  pr->_name);
39 	 
40 PEGASUS_STD(cout) << "Loading Provider " << 
41    pr->_name << PEGASUS_STD(endl);
42 	 
43 pr->initialize(*(pr->_cimom_handle));
44 gettimeofday(&(pr->_timeout), NULL);
45 _providers.insert(*(parms->providerName), pr);
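
The same rule can be stated in standard C++ terms: memmove is only well defined for trivially copyable types, and a class with a virtual base is not one, whereas a pointer to such a class is. The sketch below checks this at compile time and then shifts pointers instead of objects. SketchBase and VirtualBaseProvider are hypothetical stand-ins for CIMBaseProvider and Provider, and std::vector with std::unique_ptr is an assumption for illustration, not the Pegasus Array or hash table.

```cpp
#include <cassert>
#include <memory>
#include <type_traits>
#include <vector>

// Hypothetical stand-in for a virtual base class such as CIMBaseProvider.
struct SketchBase
{
    virtual ~SketchBase() {}
};

// Hypothetical stand-in for Provider: it has a virtual base class.
struct VirtualBaseProvider : public virtual SketchBase
{
    explicit VirtualBaseProvider(int i) : id(i) {}
    int id;
};

// Moving the object's raw bytes is not allowed, but moving a pointer is.
static_assert(!std::is_trivially_copyable<VirtualBaseProvider>::value,
              "objects with a virtual base must not be moved with memmove");
static_assert(std::is_trivially_copyable<VirtualBaseProvider*>::value,
              "raw pointers can be moved byte-wise");

// Inserts at the front of a container of pointers. The existing entries are
// shifted, but the provider objects themselves never change address.
std::vector<int> idsAfterFrontInsert()
{
    std::vector<std::unique_ptr<VirtualBaseProvider> > providers;
    providers.push_back(
        std::unique_ptr<VirtualBaseProvider>(new VirtualBaseProvider(1)));
    providers.push_back(
        std::unique_ptr<VirtualBaseProvider>(new VirtualBaseProvider(2)));
    providers.insert(
        providers.begin(),
        std::unique_ptr<VirtualBaseProvider>(new VirtualBaseProvider(0)));

    std::vector<int> ids;
    for (std::size_t i = 0; i < providers.size(); ++i)
        ids.push_back(providers[i]->id);
    return ids;
}
```

Using an owning smart pointer also addresses the cost mentioned above, since the dynamically allocated objects are released when their entries are removed.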

Double Deletes in Redirected Message Pathways

There were a couple of double deletes of asyncOpNodes in message paths that were two or three steps removed from the shortest path. These were in code that had never been exercised.

Untested code

The IndicationService, the IndicationHandlerService, and the ProviderRegistrationManager each contained code paths that were untested. Really, they were blocked from running successfully due to elusive bugs (see above).

In the case of the ProviderRegistrationManager, most of that code was tested so I only needed to step through a few cases.

Most of the IndicationService and all of the IndicationHandler were untested to my knowledge and they each comprise a lot of code.

Indication Consumer Support Code

Chip Vincent defined the CIMIndicationConsumer interface during the definition of the original C++ provider manager. It is not used in the Open Group CVS right now, but I use it to support Indication handling.

//%/////////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
// The Open Group, Tivoli Systems
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
// ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
//
//==============================================================================
//
// Author: Chip Vincent (cvincent@us.ibm.com)
//
// Modified By: Nitin Upasani, Hewlett-Packard Company (Nitin_Upasani@hp.com)
//
//%/////////////////////////////////////////////////////////////////////////////

#ifndef Pegasus_CIMIndicationConsumer_h
#define Pegasus_CIMIndicationConsumer_h

#include <Pegasus/Common/Config.h>
#include <Pegasus/Provider/CIMBaseProvider.h>

#include <Pegasus/Common/CIMInstance.h>

PEGASUS_NAMESPACE_BEGIN

/**
This class defines the set of methods implemented by an indication consumer provider.
A provider that derives from this class must implement all methods. The minimal method
implementation simply throws the NotSupported exception.
*/
class PEGASUS_PROVIDER_LINKAGE CIMIndicationConsumer : public virtual CIMBaseProvider
{
public:
    CIMIndicationConsumer(void);
    virtual ~CIMIndicationConsumer(void);

    /**
    @param context contains security and locale information relevant for the lifetime
    of this operation.

    @param indication

    @param handler asynchronously processes the results of this operation.

    @exception NotSupported
    @exception InvalidParameter
    */
    virtual void handleIndication(
        const OperationContext & context,
        const CIMInstance & indication,
        ResponseHandler<void> & handler) = 0;

    // ATTN: The following method is only for testing purposes.
    virtual void handleIndication(
        const OperationContext & context,
        const String & url,
        const CIMInstance& indicationInstance)
    {
    }
};

PEGASUS_NAMESPACE_END

#endif

Provider Schema Modifications

I derived the eServer_ProviderModule schema from the PG_ProviderModule schema that we are using. I made only three additions: I added the new provider type, I added some new provider interface types (looking ahead), and I added the OperationalStatus attributes to the provider. The last addition is important because providers now handle indications and can therefore have operational states of their own.

//=================================================================
// eServer_ProviderModule
//
// This schema is derived from PG_ProviderModule. It adds some
// qualifiers to support remote and out-of-proc providers.
// It also defines some new provider types.
//
//=================================================================

// Named (meta) elements:
// Class, Property, Method, Association, Reference, Property, Parameter,
// Indication, Schema, Trigger

#pragma locale ("en_US")

Qualifier Remote : boolean = false, Scope(class, reference, property, method);
Qualifier Out_of_Proc : boolean = false, Scope(class, reference, property, method);
Qualifier Trusted : boolean = false, Scope(class, reference, property, method);
Qualifier isURL : boolean = false, Scope(reference, property);

//=================================================================
// eServer_ProviderModule
//=================================================================

[Version("2.0.0"), Description("Derived from PG_ProviderModule "
    "modified to support additional provider types and additional "
    "provider qualifications. ")]
class eServer_ProviderModule : PG_ProviderModule {
    [Key, Override("Name"), Description("A human-readable name that uniquely "
        "identifies the Provider Module, inherited from PG_ProviderModule.")]
    string Name;

    [Override("Location"), Description("The file path to the module. "
        "If the isURL qualifier is present, indicates that the "
        "path is a URL. Otherwise the path is a string that represents "
        "the location of the module in the local host's file system.")]
    string Location;

    [Override("InterfaceType"), Description("The interface definition "
        "supported by this module. Implies both a method signature and "
        "a binary specification for linking to the module and executing its code. "
        "In Pegasus 2.01 the InterfaceType also implies a specific "
        "ProviderManagerService, although this link will be broken in the "
        "future. "),
        Values {"C++Default", "CMPI", "Java_Default", "Java_SNIA",
            "Java_Wbem_Services", "Perl", "Unix_Domain",
            "Named_Pipe", "WSDL_SOAP"}]
    string InterfaceType;

    [Description("The host of the computer upon which this module resides. "
        "For local providers this will be \"localhost\". For remote "
        "providers this property should contain the DNS name of the "
        "remote host. ")]
    string host;

    [Description("The PGP Signature of the module's file. "
        "The provider manager or any other module can use this "
        "signature to determine the authenticity of the module "
        "and the integrity of the module's image. ")]
    string pgp_Signature;

};


//=================================================================
// eServer_Provider
//
// Derived from the PG_Provider class. The operational status
// properties from PG_ProviderModule are present in the eServer_Provider
// class. This is to recognize that providers may have operational
// states that are distinct from the states of their modules.
//
//=================================================================

[Version("2.0.0"), Description("eServer_Provider is derived from "
    "PG_Provider. ")]

class eServer_Provider : PG_Provider {
    [Key, Description("The scoping eServer_ProviderModule name. "
        "Inherited as a propagated key from PG_Provider, having "
        "the syntax PG_ProviderModule.name")]
    string ProviderModuleName;
    [Key, Override("Name"), Description("A human-readable name that "
        "uniquely identifies the provider within the Provider Module. "
        "Inherited from PG_Provider.")]
    string Name;
    [Description (
        "   Indicates the current status(es) of the element. "
        "Various health and operational statuses are "
        "defined. Many of the enumeration's values are self-"
        "explanatory.  However, a few are not and are described "
        "in more detail. \"Stressed\" indicates that the element "
        "is functioning, but needs attention. Examples of "
        "\"Stressed\" states are overload, overheated, etc. "
        "\"Predictive Failure\" indicates that an element is "
        "functioning nominally but predicting a failure in the "
        "near future. \"In Service\" describes an element being "
        "configured, maintained, cleaned, or otherwise administered. "
        "\"No Contact\" indicates that the monitoring system "
        "has knowledge of this element, but has never been able to "
        "establish communications with it. \"Lost Communication\" "
        "indicates that the ManagedSystemElement is known to exist "
        "and has been contacted successfully in the past, but is "
        "currently unreachable. \"Stopped\" indicates that the "
        "element is known to exist, is not operational (e.g., it "
        "is unable to provide service to users), but it has not "
        "failed. It has purposely been made non-operational. \n"
        "  OperationalStatus replaces the Status property on "
        "ManagedSystemElement to provide a consistent approach to "
        "enumerations, to address implementation needs for an "
        "array property, and to provide a migration path from today's "
        "environment to the future. This change was not made earlier "
        "since it required the DEPRECATED qualifier. Due to the "
        "widespread use of the existing Status property in "
        "management applications, it is strongly recommended that "
        "providers/instrumentation provide BOTH the Status and "
        "OperationalStatus properties. As always, Status (since it "
        "is single-valued) provides the primary status of the "
        "element."),
        ValueMap {"0", "1", "2", "3", "4", "5", "6", "7", "8",
            "9", "10", "11", "12", "13"},
        Values {"Unknown", "Other", "OK", "Degraded", "Stressed",
            "Predictive Failure", "Error", "Non-Recoverable Error",
            "Starting", "Stopping", "Stopped", "In Service",
            "No Contact", "Lost Communication"},
        ModelCorrespondence {
            "CIM_ManagedSystemElement.OtherStatusDescription"}]
    uint16 OperationalStatus[];
    [Description (
        "A string describing the status - used when the "
        "OperationalStatus property is set to 1 (\"Other\")."),
        ModelCorrespondence {
            "CIM_ManagedSystemElement.OperationalStatus"}]
    string OtherStatusDescription;
};


//=================================================================
// eServer_ProviderCapabilities
//
// Derived from the PG_ProviderCapabilities class
//
//=================================================================

[Version("2.0.0"), Description("An instance of ProviderCapabilities "
    "describes a set of abilities for a specific provider.")]
class eServer_ProviderCapabilities : PG_ProviderCapabilities {
    [Key, Description("The scoping eServer_ProviderModuleName. "
        "Inherited as a Propagated key from PG_ProviderCapabilities, "
        "having the syntax PG_Provider.ProviderModuleName")]
    string ProviderModuleName;
    [Key, Description("The scoping eServer_Provider Name. "
        "Inherited as a Propagated key from PG_ProviderCapabilities, "
        "having the syntax PG_Provider.Name")]
    string ProviderName;

    [Key, Description("A value that uniquely identifies this "
        "set of abilities for the designated Providers.")]
    string CapabilityID;

    [Override("ProviderType"), Description("ProviderType enumerates "
        "the specific method types that the provider supports. "),
        ArrayType("Indexed"),
        ValueMap {"2", "3", "4", "5", "6"},
        Values {"Instance", "Association", "Indication", "Method",
            "Indication_Consumer"}]
    uint16 ProviderType[];
};
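
Code that reads registration instances needs to interpret the ProviderType numbers from the schema above. A minimal sketch of that mapping follows; the numeric values and strings come from the ValueMap and Values qualifiers of eServer_ProviderCapabilities, while the `ProviderType` enum and the `providerTypeName` helper are hypothetical names that do not exist in Pegasus.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Numeric values from the ProviderType ValueMap; 6 is the new
// Indication Consumer type added by this schema.
enum class ProviderType : std::uint16_t {
    Instance = 2,
    Association = 3,
    Indication = 4,
    Method = 5,
    IndicationConsumer = 6
};

// Returns the Values string that the schema pairs with a ValueMap
// entry, or an empty string for a number outside the map.
inline std::string providerTypeName(std::uint16_t value) {
    switch (value) {
        case 2: return "Instance";
        case 3: return "Association";
        case 4: return "Indication";
        case 5: return "Method";
        case 6: return "Indication_Consumer";
        default: return "";
    }
}
```

For example, `providerTypeName(6)` yields the string used for consumer providers, and any value outside 2 through 6 yields an empty string.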

Provider Registration Manager Modifications

After creating a new type of provider, I needed to modify the ProviderRegistrationManager to load, find, and dispatch the new provider type. This was pretty simple. I wrote some additional routines that are basically copies of what is already there.

Here is one of the new routines I wrote for the ProviderRegistrationManager. There are several others.

Boolean ProviderRegistrationManager::lookupConsumerProvider(
    const String & nameSpace,
    const String & className,
    CIMInstance & provider,
    CIMInstance & providerModule)
{
    String providerName;
    String providerModuleName;

    PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
        "ProviderRegistrationManager::lookupConsumerProvider");

    ProviderRegistrationTable* providerCapability = 0;
    ProviderRegistrationTable* _provider = 0;
    ProviderRegistrationTable* _providerModule = 0;

    //
    // create the key by using nameSpace, className, and providerType
    //
    String capabilityKey = _generateKey(nameSpace, className, CONSUMER_PROVIDER);
    PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4,
        "\nnameSpace = " + nameSpace + "; className = " +
        className + "; capabilityKey = " + capabilityKey);

    try
    {
        //
        // get provider capability instance from the table
        //
        if (!_registrationTable->table.lookup(
                capabilityKey, providerCapability))
        {
            PEG_METHOD_EXIT();
            throw CIMException(CIM_ERR_FAILED, CAPABILITY_NOT_REGISTERED);
        }

        Array<CIMInstance> instances = providerCapability->getInstances();

        Uint32 pos = instances[0].findProperty(_PROPERTY_PROVIDERNAME);

        if (pos == PEG_NOT_FOUND)
        {
            PEG_METHOD_EXIT();
            throw CIMException(CIM_ERR_FAILED,
                "Missing ProviderName which is key in PG_ProviderCapabilities class.");
        }

        //
        // get provider name
        //
        instances[0].getProperty(pos).getValue().get(providerName);

        //
        // get provider module name
        //
        Uint32 pos2 = instances[0].findProperty(_PROPERTY_PROVIDERMODULENAME);
        if (pos2 == PEG_NOT_FOUND)
        {
            PEG_METHOD_EXIT();
            throw CIMException(CIM_ERR_FAILED,
                "Missing ProviderModuleName which is key in PG_ProviderCapabilities class.");
        }

        instances[0].getProperty(pos2).getValue().get(providerModuleName);

        //
        // create the key by using providerModuleName and providerName
        //
        String _providerKey = _generateKey(providerModuleName, providerName);

        //
        // create the key by using providerModuleName and MODULE_KEY
        //
        String _moduleKey = _generateKey(providerModuleName, MODULE_KEY);

        //
        // get provider instance from the table
        //
        if (!_registrationTable->table.lookup(_providerKey, _provider))
        {
            PEG_METHOD_EXIT();
            throw CIMException(CIM_ERR_FAILED, PROVIDER_NOT_FOUND);
        }

        Array<CIMInstance> providerInstances = _provider->getInstances();
        provider = providerInstances[0];

        //
        // get provider module instance from the table
        //
        if (!_registrationTable->table.lookup(_moduleKey, _providerModule))
        {
            PEG_METHOD_EXIT();
            throw CIMException(CIM_ERR_FAILED, MODULE_NOT_FOUND);
        }

        Array<CIMInstance> providerModuleInstances = _providerModule->getInstances();
        providerModule = providerModuleInstances[0];
    }
    catch(CIMException & exception)
    {
        Tracer::traceCIMException(TRC_PROVIDERMANAGER, Tracer::LEVEL4, exception);
        PEG_METHOD_EXIT();
        return (false);
    }

    PEG_METHOD_EXIT();
    return (true);
}
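
Stripped of tracing and error reporting, the routine above is three keyed lookups: capability, then provider, then module. The following is a minimal standalone sketch of that shape using standard containers. `RegistrationTable`, `Registration`, and `makeKey` are hypothetical names, and the key format is an assumption; the real `_generateKey` may build its keys differently.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical, simplified registration record.
struct Registration {
    std::string providerName;
    std::string moduleName;
};

// Assumed key format, for illustration only.
inline std::string makeKey(const std::string& a, const std::string& b,
                           const std::string& c = "") {
    return a + "|" + b + "|" + c;
}

class RegistrationTable {
public:
    void registerConsumer(const std::string& ns, const std::string& cls,
                          const Registration& reg) {
        capabilities_[makeKey(ns, cls, "consumer")] = reg;
        providers_[makeKey(reg.moduleName, reg.providerName)] = reg.providerName;
        modules_[makeKey(reg.moduleName, "module")] = reg.moduleName;
    }

    // Same contract as lookupConsumerProvider: returns false, rather
    // than throwing, when any of the three entries is missing.
    bool lookupConsumer(const std::string& ns, const std::string& cls,
                        std::string& provider, std::string& module) const {
        auto cap = capabilities_.find(makeKey(ns, cls, "consumer"));
        if (cap == capabilities_.end()) return false;

        auto prov = providers_.find(
            makeKey(cap->second.moduleName, cap->second.providerName));
        if (prov == providers_.end()) return false;

        auto mod = modules_.find(makeKey(cap->second.moduleName, "module"));
        if (mod == modules_.end()) return false;

        provider = prov->second;
        module = mod->second;
        return true;
    }

private:
    std::map<std::string, Registration> capabilities_;
    std::map<std::string, std::string> providers_;
    std::map<std::string, std::string> modules_;
};
```

The output parameters are written only after all three lookups succeed, so a failed lookup leaves the caller's values untouched.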

Instantiating the Modified Schema and Providers

This was also pretty simple. I piggybacked off the compilation and installation scripts that are already part of the project.

Note a couple of items in the MOF below. Line 47 shows a Type 6 provider, which is an Indication Consumer provider. Also, the Indication provider and the Indication Consumer provider are both part of the same module.

I simplified the PG_Subscription class, as line 53 of the MOF shows. There are no policy attributes, no attribute list, and no filter.

To get things working, I assumed that every subscription delivers all attributes every time it fires, that each subscription was created with valid class references, and that providers were registered correctly.

In the schema, however, I left the filter in the Subscription definition; I just didn't make it a required attribute. It is reasonable to want to test Indication delivery without having filters defined and working.

Here is the registration MOF:

 1 instance of eServer_ProviderModule
 2 {
 3 	Name = "ProcessIndicationProvider";
 4 	Location = "ProcessIndicationProvider";
 5 	Vendor = "Pegasus Community";
 6 	Version = "2.0.0";
 7 	InterfaceType = "C++Default";
 8 	InterfaceVersion = "2.0.0";
 9 	host = "localhost";
10 	OperationalStatus = {2};
11 };
12 	
13 instance of eServer_Provider
14 {
15 	ProviderModuleName = "ProcessIndicationProvider";
16 	Name = "eServer_ProcessIndicationProvider";
17 	OperationalStatus = {2};
18 };
19 
20 instance of eServer_ProviderCapabilities
21 {
22 	ProviderModuleName = "ProcessIndicationProvider";
23 	ProviderName = "eServer_ProcessIndicationProvider";
24 	CapabilityID = "1";
25 	ClassName = "CIM_ProcessIndication";
26 	Namespaces = {"root/PG_Interop"};
27 	ProviderType = { 4 }; // indication
28 	SupportedProperties = NULL;
29 	SupportedMethods = NULL;
30 };
31 
32 
33 instance of eServer_Provider
34 {
35 	ProviderModuleName = "ProcessIndicationProvider";
36 	Name = "eServer_ProcessIndicationConsumer";
37 	OperationalStatus = {2};
38 };
39 
40 instance of eServer_ProviderCapabilities
41 {
42 	ProviderModuleName = "ProcessIndicationProvider";
43 	ProviderName = "eServer_ProcessIndicationConsumer";
44 	CapabilityID = "2";
45 	ClassName = "eServer_ProcessIndicationConsumer";
46 	Namespaces = {"root/PG_Interop"};
47 	ProviderType = { 6 }; // consumer
48 	SupportedProperties = NULL;
49 	SupportedMethods = NULL;
50 };
51 
52 
53 instance of eServer_IndicationSubscription
54 {
55 	Indication_Class  = "CIM_ProcessIndication";
56 	Handler = "eServer_ProcessIndicationConsumer";
57 };
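
The optional-filter rule described before the MOF can be sketched as follows: a subscription matches on indication class alone, and a filter is consulted only if one is present. This is a minimal illustration, not the IndicationService code. The `Subscription` and `Indication` structs, the `severity` field, and `matchingHandlers` are hypothetical names chosen for the example.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical indication; severity exists only to give a filter
// something to test.
struct Indication {
    std::string className;
    int severity;
};

struct Subscription {
    std::string indicationClass;
    std::string handler;
    // Optional filter: an empty std::function means no filter is defined.
    std::function<bool(const Indication&)> filter;
};

// Returns the handlers of all subscriptions that match the indication.
// A subscription without a filter matches on class name alone.
inline std::vector<std::string> matchingHandlers(
    const std::vector<Subscription>& subs, const Indication& ind) {
    std::vector<std::string> handlers;
    for (const Subscription& s : subs) {
        if (s.indicationClass != ind.className) continue;
        if (s.filter && !s.filter(ind)) continue;
        handlers.push_back(s.handler);
    }
    return handlers;
}
```

With this shape, a subscription such as the one on line 53 of the MOF, which names only an indication class and a handler, is deliverable without any filter support in place.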

Indication Provider

To test Indications, I modified the provider in $(PEGASUS_HOME)/src/Providers/sample/ProcessIndicationProvider. The biggest modification is the _monitor routine, which runs as an independent thread.

Independent Indication Thread

Most providers that generate indications will need to have an independent thread. I chose to start and stop the thread in the enableIndications and disableIndications methods, respectively.

void ProcessIndicationProvider::enableIndications (
    ResponseHandler <CIMIndication> & handler)
{
   cout << "eServer_ProcessIndicationProvider: enableIndications" << endl;

   _enable_disable.lock(pegasus_thread_self());
   if(_indications_enabled.value())
   {
      _enable_disable.unlock();
      return;
   }
   _response_handler = &handler;
   _response_handler->processing ();
   _indications_enabled = 1;
   _indication_thread.run();
   _enable_disable.unlock();
}

void ProcessIndicationProvider::disableIndications (void)
{
   cout << "eServer_ProcessIndicationProvider: disableIndications" << endl;
   _enable_disable.lock(pegasus_thread_self());

   if(_indications_enabled.value() == 0 )
   {
      _enable_disable.unlock();
      return;
   }

   _indications_enabled = 0;
   _response_handler = 0;
   _indication_thread.join();
   _enable_disable.unlock();
}
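
The same start-in-enable, stop-in-disable pattern can be sketched with the standard library. This is an illustration under stated assumptions, not the sample provider: `MonitorProvider` and its members are hypothetical names, it uses `std::thread` and `std::atomic` in place of the Pegasus thread and mutex classes, and the monitor loop polls a flag instead of using try_lock.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <mutex>
#include <thread>

class MonitorProvider {
public:
    ~MonitorProvider() { disableIndications(); }

    // Starts the monitor thread; a second call while enabled is a no-op.
    void enableIndications() {
        std::lock_guard<std::mutex> guard(enableDisable_);
        if (enabled_.load()) return;
        enabled_.store(true);
        monitor_ = std::thread([this] { monitorLoop(); });
    }

    // Signals the monitor thread to stop and waits for it to exit.
    void disableIndications() {
        std::lock_guard<std::mutex> guard(enableDisable_);
        if (!enabled_.load()) return;
        enabled_.store(false);
        monitor_.join();
    }

    bool enabled() const { return enabled_.load(); }
    int delivered() const { return delivered_.load(); }

private:
    // Runs until the enabled flag is cleared. It never takes the
    // enable/disable mutex, so join() under that mutex cannot deadlock.
    void monitorLoop() {
        while (enabled_.load()) {
            ++delivered_;  // stand-in for building and delivering an indication
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

    std::mutex enableDisable_;
    std::atomic<bool> enabled_{false};
    std::atomic<int> delivered_{0};
    std::thread monitor_;
};
```

Serializing enable and disable through one mutex means a provider never has two monitor threads, and the join guarantees that no delivery is in flight once disableIndications returns.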

Delivering the Indication

To deliver the indication, the _monitor thread wraps the instance in a CIMIndication and calls deliver() on its response handler.

The code below shows why the ProviderManagerService must allocate the response handler on the heap and preserve it for the life of the indication thread. The thread dereferences the handler pointer every time it delivers, so the response handler must not be destroyed while the subscription is enabled.

PEGASUS_THREAD_RETURN
PEGASUS_THREAD_CDECL
ProcessIndicationProvider::_monitor(void *parm)
{
   PEGASUS_ASSERT(parm != 0);

   Thread *th = static_cast<Thread *>(parm);
   ProcessIndicationProvider *myself =
      static_cast<ProcessIndicationProvider *>(th->get_parm());

   while(true)
   {
      pegasus_sleep(1000);
      try
      {
         myself->_enable_disable.try_lock(pegasus_thread_self());
      }
      catch(...)
      {
         if(myself->_indications_enabled.value() == 0 ||
            myself->_response_handler == 0 )
            exit_thread((PEGASUS_THREAD_RETURN)0);
         else
            continue;
      }
      try
      {
         CIMInstance indicationInstance ("root/PG_Interop:CIM_ProcessIndication");

         indicationInstance.addProperty
            (CIMProperty ("IndicationTime", CIMValue (CIMDateTime ())));

         indicationInstance.addProperty
            (CIMProperty ("IndicationIdentifier", "ProcessIndication01"));

         Array <String> correlatedIndications;
         indicationInstance.addProperty (
            CIMProperty ("CorrelatedIndications",
               CIMValue (correlatedIndications)));

         CIMIndication cimIndication (indicationInstance);

         myself->_response_handler->deliver (cimIndication);
      }
      catch(...)
      {
      }
      myself->_enable_disable.unlock();
   }
   exit_thread((PEGASUS_THREAD_RETURN)0);
   return(PEGASUS_THREAD_RETURN)0;
}
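
The lifetime requirement on the response handler can be sketched in isolation. In the example below, which is an illustration rather than the ProviderManagerService code, a manager owns each handler on the heap and keeps it in a table until indications are disabled, while the provider holds only a non-owning pointer. `ResponseHandler`, `Provider`, and `ProviderManager` here are simplified hypothetical types, and `std::unique_ptr` is an assumption of modern C++.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <vector>

// Hypothetical handler that records what it is asked to deliver.
struct ResponseHandler {
    std::vector<std::string> delivered;
    void deliver(const std::string& indication) {
        delivered.push_back(indication);
    }
};

// Hypothetical provider that keeps a non-owning pointer to the handler.
struct Provider {
    ResponseHandler* handler = nullptr;
    void enableIndications(ResponseHandler& h) { handler = &h; }
    void disableIndications() { handler = nullptr; }
    // In the real design this runs later, on the provider's own thread.
    bool fire(const std::string& indication) {
        if (handler == nullptr) return false;
        handler->deliver(indication);
        return true;
    }
};

class ProviderManager {
public:
    // Allocates the handler on the heap and keeps it in the table, so
    // the provider's pointer stays valid after this call returns.
    void enable(const std::string& name, Provider& provider) {
        std::unique_ptr<ResponseHandler>& slot = handlers_[name];
        slot = std::make_unique<ResponseHandler>();
        provider.enableIndications(*slot);
    }

    // Detaches the provider first, then releases the handler.
    void disable(const std::string& name, Provider& provider) {
        provider.disableIndications();
        handlers_.erase(name);
    }

    const ResponseHandler* find(const std::string& name) const {
        auto it = handlers_.find(name);
        return it == handlers_.end() ? nullptr : it->second.get();
    }

private:
    std::map<std::string, std::unique_ptr<ResponseHandler>> handlers_;
};
```

Had `enable` created the handler as a local variable instead, the provider's pointer would dangle as soon as the call returned, which is the failure mode the heap allocation avoids.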

Indication Consumer Provider

To receive Indications, I added a second provider to the module at $(PEGASUS_HOME)/src/Providers/sample/ProcessIndicationProvider. The following code in the PegasusCreateProvider symbol shows how the module creates the different types of providers.

extern "C" PEGASUS_EXPORT CIMBaseProvider * PegasusCreateProvider
    (const String & providerName)
{
    if ((String::equalNoCase (providerName, "ProcessIndicationProvider")) ||
        (String::equalNoCase (providerName, "ProcessIndicationProvider2")) ||
        (String::equalNoCase (providerName, "eServer_ProcessIndicationProvider")))
    {
        return (new ProcessIndicationProvider ());
    }
    if (String::equalNoCase (providerName, "eServer_ProcessIndicationConsumer"))
    {
        return (new ProcessIndicationConsumer ());
    }

    return (0);
}

Handling the Indication

The code to handle the Indication in the Consumer Provider is straightforward. Note that the consumer clones the indication immediately.

void ProcessIndicationConsumer::handleIndication(
      const OperationContext & context,
      const CIMInstance & indication,
      ResponseHandler<void> & handler)
{
   // Take a private copy before doing anything else with the indication.
   CIMInstance indication_copy = indication.clone();
   PEGASUS_STD(cout) <<
      "Indication Consumer: received indication of class: " <<
      indication_copy.getClassName() << PEGASUS_STD(endl);
}

Indication Processing Pathway

Now we can show the entire pathway of an Indication as it is generated, moved through Pegasus, and handled.

  1. When a subscription is created, the IndicationService sends a CIMEnableIndicationsRequestMessage to the ProviderManagerService.

  2. The ProviderManagerService creates a response handler from the heap and stores the handler in a hashtable. It then calls the provider's enableIndications method.

  3. The CIMIndicationProvider handles the call to enableIndications by creating an independent monitor thread and storing the response handler.

  4. The indication is created by the CIMIndicationProvider and passed to the response handler.

  5. The response handler sends a CIMProcessIndicationRequestMessage, which contains the indication, to the IndicationService.

  6. The IndicationService retrieves subscriptions that match the indication and creates a list of consumers.

  7. For each consumer in the list the IndicationService creates a CIMConsumeIndicationRequestMessage and sends that message to the consumer.

  8. When the CIMIndicationConsumer receives the CIMConsumeIndicationRequestMessage, it clones the indication and processes it.
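
The steps above can be sketched as a small single-threaded model. This is an illustration only: the class names echo the roles in the list but are simplified hypothetical types, the messages and queues between services are replaced by direct calls, and `ResponseHandler` is reduced to a `std::function`.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

struct Indication {
    std::string className;
};

// Step 8: the consumer stores its own copy of the indication.
struct Consumer {
    std::vector<Indication> received;
    void handleIndication(const Indication& ind) { received.push_back(ind); }
};

// Steps 6 and 7: find matching subscriptions and pass the indication
// to the consumer of each one.
class IndicationService {
public:
    void subscribe(const std::string& indicationClass, Consumer& consumer) {
        subscriptions_.push_back({indicationClass, &consumer});
    }
    void processIndication(const Indication& ind) {
        for (const auto& sub : subscriptions_) {
            if (sub.first == ind.className) sub.second->handleIndication(ind);
        }
    }

private:
    std::vector<std::pair<std::string, Consumer*>> subscriptions_;
};

// Step 5: the response handler forwards each delivery to the service.
using ResponseHandler = std::function<void(const Indication&)>;

// Steps 3 and 4: the provider stores the handler and delivers through it.
class IndicationProvider {
public:
    void enableIndications(ResponseHandler handler) {
        handler_ = std::move(handler);
    }
    void generate() {
        if (handler_) handler_(Indication{"CIM_ProcessIndication"});
    }

private:
    ResponseHandler handler_;
};
```

Wiring the provider's handler to `IndicationService::processIndication` and calling `generate()` once delivers one indication to every consumer subscribed to CIM_ProcessIndication and none to consumers of other classes.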

virtual void deliver(const OperationContext & context, const CIMIndication & cimIndication)
{
    // ATTN: temporarily convert indication to instance
    CIMInstance cimInstance(cimIndication);

    // create message
    CIMProcessIndicationRequestMessage * request =
        new CIMProcessIndicationRequestMessage(
            _request_copy.messageId,
            cimInstance.getPath().getNameSpace(),
            cimInstance,
            QueueIdStack(_target->getQueueId(), _source->getQueueId()));

    AsyncLegacyOperationStart * asyncRequest =
        new AsyncLegacyOperationStart(
            _source->get_next_xid(),
            0,
            _target->getQueueId(),
            request,
            _target->getQueueId());

    PEGASUS_ASSERT(asyncRequest != 0);

    _source->SendForget(asyncRequest);
}

void eServerIndicationService::_handleProcessIndicationRequest(
      const Message *message)
{
   PEG_METHOD_ENTER (TRC_INDICATION_SERVICE,
      "eServerIndicationService::_handleProcessIndicationRequest");

   CIMProcessIndicationRequestMessage* request =
      (CIMProcessIndicationRequestMessage*) message;

   CIMException cimException;

   CIMProcessIndicationResponseMessage* response =
      new CIMProcessIndicationResponseMessage(
         request->messageId,
         cimException,
         request->queueIds.copyAndPop());
   try
   {
      /* get each name space */
      Array<String> nameSpaceNames;
      _getNameSpaceNames(nameSpaceNames);

      Array<CIMInstance> subscriptions;
      CIMInstance indication = request->indicationInstance.clone();

      _getMatchingSubscriptions(indication, subscriptions);

      // keep this super simple
      // always deliver all properties of the indication class to the consumer
      // every subscription has one indication class and one consumer
      // the consumer is responsible for filtering and multiplexing

      Array<CIMObjectPath> absent_consumers;

      _deliverIndication(subscriptions,
                         indication,
                         absent_consumers);
   }
   catch(CIMException & e)
   {
      response->cimException = e;
   }
   catch(Exception & e)
   {
      response->cimException = PEGASUS_CIM_EXCEPTION(CIM_ERR_FAILED,
                                                     e.getMessage());
   }
   catch(...)
   {
      response->cimException = PEGASUS_CIM_EXCEPTION(CIM_ERR_FAILED,
                                                     "Internal Error");
   }
   _enqueueResponse(request, response);

   PEG_METHOD_EXIT();
}

// will return true if ONE subscription is successful.
// e.g., if one out of three subscriptions results in a
// successful delivery, will return true. if zero are
// successful, returns false.
// consumers that are absent; i.e., not found by the provider registration
// manager are returned in the absent_consumer
Boolean eServerIndicationService::_deliverIndication(
   const Array<CIMInstance> & subscriptions,
   const CIMInstance & indication,
   Array<CIMObjectPath> & absent_consumers)
{
   PEG_METHOD_ENTER (TRC_INDICATION_SERVICE,
      "eServerIndicationService::_deliverIndication");
   CIMObjectPath consumer;
   CIMInstance provider, providerModule;
   Boolean ccode = false;
   try
   {
      // Uint32 index so the loop also terminates with more than 255 subscriptions
      for(Uint32 i = 0 ; i < subscriptions.size(); ++i, consumer.clear())
      {
         if(true == _get_object_path_from_association("Handler",
                                                      subscriptions[i],
                                                      consumer))
         {
            // get the name space
            if(true == _providerRegManager->lookupConsumerProvider(
                  subscriptions[i].getPath().getNameSpace(),
                  consumer.getClassName(),
                  provider,
                  providerModule))
            {
               CIMConsumeIndicationRequestMessage *request =
                  new CIMConsumeIndicationRequestMessage(
                     XmlWriter::getNextMessageId (),
                     indication.getPath().getNameSpace(),
                     indication,
                     provider,
                     providerModule,
                     QueueIdStack(_providerManager, _queueId));

               AsyncOpNode *op = get_op();

               AsyncLegacyOperationStart *async_req =
                  new AsyncLegacyOperationStart(
                     get_next_xid(),
                     op,
                     _providerManager,
                     request,
                     _queueId);
               try
               {
                  if( false == SendForget(async_req) )
                  {
                     async_req = static_cast<AsyncLegacyOperationStart *>(op->get_request());
                     if(async_req)
                        delete async_req->get_action();
                     delete async_req;
                     delete op;
                     // the consumer may not truly be absent, but we couldn't get
                     // the message to him. Maybe the service can do something
                     // with this information.
                     absent_consumers.append(consumer);
                  }
                  else
                  {
                     ccode = true;
                  }
               }
               catch(...)
               {
                  async_req = static_cast<AsyncLegacyOperationStart *>(op->get_request());
                  if(async_req)
                     delete async_req->get_action();
                  delete async_req;
                  delete op;
                  // the consumer may not truly be absent, but we couldn't get
                  // the message to him. Maybe the service can do something
                  // with this information.
                  absent_consumers.append(consumer);
               }
            } // handler is registered as a consumer provider
            else
            {
               absent_consumers.append(consumer);
            }
         } // found the handler path
      } // for each subscription
   } // try
   catch(...)
   {
      throw;
   }

   // each subscription contains a reference to a provider
   // that can handle the subscription. extract the handler
   // instance from the reference
   PEG_METHOD_EXIT();
   return ccode;
}
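
The return convention of _deliverIndication, true if at least one delivery succeeds, with unreachable consumers collected in a list, can be isolated in a few lines. The sketch below is an illustration, not the service code: `deliverToAll` is a hypothetical name, and the `send` callback stands in for the lookup plus SendForget sequence.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Tries every consumer. Returns true if at least one send succeeded and
// appends each consumer that could not be reached to `absent`.
inline bool deliverToAll(const std::vector<std::string>& consumers,
                         const std::function<bool(const std::string&)>& send,
                         std::vector<std::string>& absent) {
    bool anyDelivered = false;
    for (const std::string& consumer : consumers) {
        bool ok = false;
        try {
            ok = send(consumer);
        } catch (...) {
            // An exception fails this consumer only; the loop continues.
            ok = false;
        }
        if (ok) {
            anyDelivered = true;
        } else {
            absent.push_back(consumer);
        }
    }
    return anyDelivered;
}
```

A caller that needs to know about every failure should inspect the absent list rather than the return value, because one success is enough to return true.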

Michael Day (work)
Michael Day (home)
Last modified: Thu Feb 13 11:46:43 EST 2003