PEP #: 5 |
|
TITLE: Proposal to Fix Indications in Pegasus |
This document details my experiences getting CIM Indications to work with the Pegasus CIMOM. It explains the changes I had to make, the bugs I had to fix, and then makes some design recommendations for integrating Indication support into the Open Group CVS tree.
I have Pegasus supporting Indications, subscriptions, and handlers. It works really well. All the CIM operations related to indications and subscriptions work. I have tests and test providers.
Commit the changes outlined in this document to the development (main) branch of the TOG CVS for Pegasus for inclusion in Pegasus release 2.2
Early feedback on this document has requested clarification of some terms I use later.
OperationResponseHandler
and
EnableIndicationsResponseHandler
CIMIndication
for
handling. This proposal defines a new type of provider that
supports the CIMIndicationConsumer
interface
as defined by Chip Vincent. The CIMIndicationConsumer
replaces the IndicationHandler
that is currently
in the source tree. However, the IndicationHandler
can be
adjacent (post) to the CIMIndicationConsumer
in the
indication processing path if required for compatibilty.CIMIndication
instances
to other Pegasus instances. I do not refer to the Export Server in this
proposal. Its use is compatible with this proposal but optional.CIMIndication
instances exported by other
Pegasus instances. I do not refer to the Export Client in this
proposal. Its use is compatible with this proposal but
optional.There is a ton of rich functionality in the existing Pegasus code base for handling Indications and Subscriptions. The problem is, none of it has been tested due to gating bugs in the Pegasus core.
All of the subscription options, policies, and error handling is
fantastic code but causes real problems when all you want to do
is get the underlying mechanisms to work. In fact, the
"commercial grade" features of the
IndicationService
tend to obfuscate the underlying
breaks in the Pegasus core that are gating Indication
support. So I embarked on a simplification strategy.
Basic Indication Handling should be easy, and robust
Indication Handling should be possible. (Yes, I stole this
quote from Larry Wall.) To this end I removed some of the
functionality from the IndicationService
, such as
enforcing the schema and implementing policy.
I wrote a basic IndicationService
and
ConsumerProvider
that stresses the core of
Pegasus and assures the functionality of Indications and
Subscriptions.
If I need error handling policy and schema enforcement, I should be able to create or reactivate the more robust HP Indication Service and substitute it for the simple one. Or I can graft features from the HP service to the simple one. But I shouldn't have to use the untested complex features to debug the inner workings of Pegasus.
A little bit of everything. This is an area of Pegasus that has never been completed nore tested. The table below shows the steps I had to take, in order, to get this work completed.
wbem-exec
tests
in the test/wetest/cimv2/subscription
directory.ProviderRegistrationManager
to allow
for the new "type 6" provider (see above).CIMConsumeIndication
Message (see
item 3 above).
IndicationService
to redirect
Indications to the ProviderManager
(see items 3-5
and above).
ProviderManager
to treat
CIMConsumerProvider
modules differently because
they are asynchronous by definition.CIMIndicationProvider
and a
CIMIndicationConsumer
and start testing.IBM has, out of necessity, maintained a version of Pegasus that has the original C++ provider interfaces. We did this to support existing providers that we shipped prior to the interface changes this past fall. I did the work necessary to support indications on IBM's "classic" snapshot of the source repository. That's where the source code is. Most of the changes were not related to provider interfaces so merging indication support into the Open Group CVS is feasible.
Consider this document as a design proposal for indication support; and also as a proposal for checking the source code into the Open Group CVS.
I also anticipate making this available as a snapshot on an ftp server.
Indications in the Open Group Pegasus 2.x do not work and are not close to working. There are bugs that need to be fixed and design changes that need to be made. They are fairly completely discussed in this document.
Chip Vincent and I have long preferred to use Pegasus providers
to handle indications. Doing so removes the need for a separate
IndicationHandler
service. Instead, the handler can
reuse all the infrastructure that exists for Providers.
Another benefit of using Pegasus providers to handle indications is that developers do not need to learn another service interface. The provider interface is sufficient. Also, this means that we (Pegasus developers) do not need to provide separate developer support for handling indications and generating indications.
Once a CIMIndicationProvider
routine generates a
CIMIndication
, the modified
OperationResponseHandler
creates a
CIMConsumeIndication
message and passes the
indication to the ProviderManagerService
; the
provider manager then finds the "handler," which is really a
provider, and calls provider.handleIndication()
.
There is no need for a separate handler service, which makes error handling and debugging simpler.
This was not as simple as activiating the
CIMIndicationConsumer
code that has been in the
source tree for quite some time. I had to ensure that this new
type of provider was supported throughout Pegasus.
The ProviderManagerService
was creating an indication
response handler within a local code block and passing that to
the Indication Provider with an EnableIndication
message. After the ProviderManagerService returned from enabling
the indication, response handler went out of scope and was
destroyed. When the Indication Provider called
handler.deliver(indication)
, it was using a
reference to an object that was destroyed. The result was a
core dump.
The important lines for enabling a new subscription are
32 and
58.
The important line for disabling an existing subscription is
136.
1 void ProviderManagerService::handleEnableIndicationsRequest( 2 AsyncOpNode *op, const Message * message) throw() 3 { 4 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER, 5 "ProviderManagerService:: handleEnableIndicationsRequest"); 6 CIMEnableIndicationsRequestMessage * request = 7 dynamic_cast<CIMEnableIndicationsRequestMessage *>( 8 const_cast<Message *>(message)); 9 10 AsyncRequest *async = 11 static_cast<AsyncRequest *>(op->_request.next(0)); 12 13 PEGASUS_ASSERT(request != 0 && async != 0 ); 14 15 CIMEnableIndicationsResponseMessage * response = 16 new CIMEnableIndicationsResponseMessage( 17 request->messageId, 18 CIMException(), 19 request->queueIds.copyAndPop()); 20 21 PEGASUS_ASSERT(response != 0); 22 23 // preserve message key 24 response->setKey(request->getKey()); 25 26 response->dest = request->queueIds.top(); 27 28 // Allocate the response handler from the heap using new (). 29 // Otherwise it will go out of scope and the indication, which 30 // happens later, will go off into outer space... 31 32 EnableIndicationsResponseHandler *handler = 33 new EnableIndicationsResponseHandler(request, response, this); 34 35 try 36 { 37 // get the provider file name and logical name 38 Triad<String, String, String> triad = 39 _getProviderRegPair(request->provider, request->providerModule); 40 41 // get cached or load new provider module 42 Provider provider = 43 providerManager.getProvider(triad.first, triad.second, triad.third); 44 45 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4, 46 "Calling provider.enableIndications: " + 47 provider.getName()); 48 49 provider.enableIndications(*handler); 50 51 // if no exception, store the handler so it is persistent for as 52 // long as the provider has indications enabled. 53 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4, 54 "Storing indication handler for " + provider.getName()); 55 56 57 // keep indication handlers in a hash table keyed by the provider. 58 _insertEntry(provider, handler); 59 } 60 catch(CIMException & e) 61 { 62 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4, 63 "Exception: " + e.getMessage()); 64 response->cimException = CIMException(e); 65 } 66 catch(Exception & e) 67 { 68 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4, 69 "Exception: " + e.getMessage()); 70 response->cimException = CIMException(CIM_ERR_FAILED, "Internal Error"); 71 } 72 catch(...) 73 { 74 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4, 75 "Exception: Unknown"); 76 response->cimException = CIMException(CIM_ERR_FAILED, "Unknown Error"); 77 } 78 79 AsyncLegacyOperationResult *async_result = 80 new AsyncLegacyOperationResult( 81 async->getKey(), 82 async->getRouting(), 83 op, 84 response); 85 86 _complete_op_node(op, ASYNC_OPSTATE_COMPLETE, 0, 0); 87 PEG_METHOD_EXIT(); 88 } 89 90 void ProviderManagerService::handleDisableIndicationsRequest( 91 AsyncOpNode *op, const Message * message) throw() 92 { 93 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER, 94 "ProviderManagerService::handleDisableIndicationsRequest"); 95 CIMDisableIndicationsRequestMessage * request = 96 dynamic_cast<CIMDisableIndicationsRequestMessage *>( 97 const_cast<Message *>(message)); 98 99 AsyncRequest *async = static_cast<AsyncRequest *>(op->_request.next(0)); 100 101 PEGASUS_ASSERT(request != 0 && async != 0 ); 102 103 CIMDisableIndicationsResponseMessage * response = 104 new CIMDisableIndicationsResponseMessage( 105 request->messageId, 106 CIMException(), 107 request->queueIds.copyAndPop()); 108 109 PEGASUS_ASSERT(response != 0); 110 111 // preserve message key 112 response->setKey(request->getKey()); 113 114 OperationResponseHandler<CIMIndication> handler(request, response); 115 try 116 { 117 // get the provider file name and logical name 118 Triad<String, String, String> triad = 119 _getProviderRegPair(request->provider, request->providerModule); 120 121 // get cached or load new provider module 122 Provider provider = 123 providerManager.getProvider(triad.first, triad.second, triad.third); 124 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4, 125 "Calling provider.disableIndications: " + 126 provider.getName()); 127 128 provider.disableIndications(); 129 130 // Now that indications are disabled we can extract and remove the 131 // indication response handler 132 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4, 133 "Removing and Destroying indication handler for " + 134 provider.getName()); 135 136 delete _removeEntry(_generateKey(provider)); 137 } 138 139 catch(CIMException & e) 140 { 141 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4, 142 "Exception: " + e.getMessage()); 143 response->cimException = CIMException(e); 144 } 145 catch(Exception & e) 146 { 147 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4, 148 "Exception: " + e.getMessage()); 149 response->cimException = CIMException(CIM_ERR_FAILED, "Internal Error"); 150 } 151 catch(...) 152 { 153 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4, 154 "Exception: Unknown"); 155 response->cimException = CIMException(CIM_ERR_FAILED, "Unknown Error"); 156 } 157 158 AsyncLegacyOperationResult *async_result = 159 new AsyncLegacyOperationResult( 160 async->getKey(), 161 async->getRouting(), 162 op, 163 response); 164 165 _complete_op_node(op, ASYNC_OPSTATE_COMPLETE, 0, 0); 166 PEG_METHOD_EXIT(); 167 }
The OperationResponseHandler
and derivative classes
are constructed with a reference to CIMRequest
and
CIMResponse
messages (i.e., the request/response
pair related to the operation). These references were stored and
used during the processing of responses. These references
pointed to message that were deleted when the thread of
execution left the dispatcher. For most CIM operations this is
not a problem. However, in the case of indications the responses
occur asynchronously. This caused the message references to
point to deleted memory. Even this did not manifest a problem until the
response handler was destroyed, which occured when the
DisableIndications
message made its way to the
ProviderManagerService
.
The lines that implement the fix are 14 and 15 and 102 and 103.
This is a fix that, in my opinion, should be generally applied to all response handlers.
1 ////////////////////-*-c++-*-/////////////////////////////////////////// 2 class EnableIndicationsResponseHandler : 3 public OperationResponseHandler<CIMIndication> 4 { 5 public: 6 EnableIndicationsResponseHandler( 7 CIMEnableIndicationsRequestMessage * request, 8 CIMEnableIndicationsResponseMessage * response, 9 MessageQueueService * source, 10 MessageQueueService * target = 0) 11 : OperationResponseHandler<CIMIndication>(request, response), 12 _source(source), 13 _target(target), 14 _request_copy(*request), 15 _response_copy(*response) 16 { 17 PEGASUS_ASSERT(_source != 0); 18 19 // get indication service 20 if(_target == 0) 21 { 22 Array<Uint32> serviceIds; 23 24 _source->find_services( 25 PEGASUS_QUEUENAME_ESERVER_INDICATIONSERVICE, 26 0, 0, &serviceIds); 27 28 PEGASUS_ASSERT(serviceIds.size() != 0); 29 30 _target = dynamic_cast<MessageQueueService *>( 31 MessageQueue::lookup(serviceIds[0])); 32 33 PEGASUS_ASSERT(_target != 0); 34 } 35 } 36 37 virtual void deliver(const CIMIndication & cimIndication) 38 { 39 OperationContext context; 40 41 deliver(context, cimIndication); 42 } 43 44 virtual void deliver(const OperationContext & context, 45 const CIMIndication & cimIndication) 46 { 47 // ATTN: temporarily convert indication to instance 48 CIMInstance cimInstance(cimIndication); 49 50 // create message 51 CIMProcessIndicationRequestMessage * request = 52 new CIMProcessIndicationRequestMessage( 53 _request_copy.messageId, 54 cimInstance.getPath().getNameSpace(), 55 cimInstance, 56 QueueIdStack(_target->getQueueId(), _source->getQueueId())); 57 58 // send message 59 // <<< Wed Apr 10 21:04:00 2002 mdd >>> 60 // AsyncOpNode * op = _source->get_op(); 61 62 AsyncLegacyOperationStart * asyncRequest = 63 new AsyncLegacyOperationStart( 64 _source->get_next_xid(), 65 0, 66 _target->getQueueId(), 67 request, 68 _target->getQueueId()); 69 70 PEGASUS_ASSERT(asyncRequest != 0); 71 72 //AsyncReply * asyncReply = _source->SendWait(asyncRequest); 73 // <<< Wed Apr 10 21:04:50 2002 mdd >>> 74 _source->SendForget(asyncRequest); 75 //PEGASUS_ASSERT(asyncReply != 0); 76 77 // Chip - receiver of the request should delete it 78 //delete asyncRequest; 79 // <<< Wed Apr 10 21:05:10 2002 mdd >>> 80 } 81 82 virtual void deliver(const Array<CIMIndication> & cimIndications) 83 { 84 OperationContext context; 85 86 deliver(context, cimIndications); 87 } 88 89 virtual void deliver(const OperationContext & context, 90 const Array<CIMIndication> & cimIndications) 91 { 92 for(Uint32 i = 0, n = cimIndications.size(); i < n; i++) 93 { 94 deliver(context, cimIndications[i]); 95 } 96 } 97 98 protected: 99 MessageQueueService * _source; 100 MessageQueueService * _target; 101 private: 102 CIMEnableIndicationsRequestMessage _request_copy; 103 CIMEnableIndicationsResponseMessage _response_copy; 104 };
References stored in subscriptions and providers were sometimes
being handled incorrectly in the IndicationService
and the ProviderRegistrationManager
, at least when
the relationship between class names, class properties, and
reference properties was concerned. Reference handling in
general could use a library of helper routines. It is too error
prone right now to extract a reference and turn it into a
CIMInstance
.
As I was reviewing the IndicationService
I noticed
that a lot of the "dereferencing" of references was done in
fairly long code segments that repeated themselves several times
with only slight differences. I decided to distill that code
into some more compact helper routines. I started on this but
haven't finished yet.
I started working on some helper routines in my re-written
IndicationService
. Distilling a few routines into
lower-level helpers allowed my to jettison a lot of code. As
far as I could tell the code in the original Indication
Service that I replaced should work just fine but I couldn't
get it to. I felt that distilling the code would give me an
edge in fixing underlying Pegasus bugs. I still think the bugs
were elsewhere (not in the IndicationService
)
but I can't prove it.
In general I think that Pegasus could benefit greatly from a library of similar, more general, static helper routines to deal with name spaces and the conversion between references and the instances they refer to.
I also did some similar rewrites in the
ProviderRegistrationManager
.
1 2 //%///////////////////////////////////////////////////////////////// 3 // Take an association class and a name of one or the references. 4 // Return a CIMObjectPath that contains the instance of the referred 5 // to object. 6 /////////////////////////////////////////////////////////////////// 7 Boolean eServerIndicationService::_get_object_path_from_association( 8 const String & reference_name, 9 const CIMInstance & association, 10 CIMObjectPath & path) const 11 { 12 13 path.clear(); 14 try 15 { 16 CIMValue path_value = association.getProperty( 17 association.findProperty(reference_name)).getValue(); 18 path_value.get(path); 19 } 20 catch(...) 21 { 22 return false; 23 } 24 25 return true; 26 } 27 28 //%///////////////////////////////////////////////////////////////// 29 // Get all the name spaces in the repository. 30 // This should really be a static method that any class should be 31 // able to call 32 /////////////////////////////////////////////////////////////////// 33 void eServerIndicationService::_getNameSpaceNames ( 34 Array<String> & nameSpaceNames) const 35 { 36 37 PEG_METHOD_ENTER (TRC_INDICATION_SERVICE, 38 "eServerIndicationService::__getNameSpaceNames"); 39 nameSpaceNames.clear(); 40 41 _repository->read_lock (); 42 43 try 44 { 45 nameSpaceNames = _repository->enumerateNameSpaces (); 46 } 47 catch (Exception &) 48 { 49 _repository->read_unlock (); 50 PEG_METHOD_EXIT (); 51 } 52 53 _repository->read_unlock (); 54 55 PEG_METHOD_EXIT (); 56 } 57 58 //%///////////////////////////////////////////////////////////////// 59 // Get all subscriptions that refer to a specific indication class 60 // returns subscriptions in all name spaces. Calls lower-level 61 // _getMatchingSubscriptions. 62 /////////////////////////////////////////////////////////////////// 63 void eServerIndicationService::_getMatchingSubscriptions( 64 const CIMInstance & indication, 65 Array<CIMInstance> & subscriptions) const 66 { 67 Array<String> nameSpaceNames; 68 _getNameSpaceNames(nameSpaceNames); 69 70 for(Uint8 i = 0; i < nameSpaceNames.size() ; ++i) 71 { 72 _getMatchingSubscriptions( nameSpaceNames[i], 73 indication, 74 subscriptions); 75 } 76 } 77 78 79 //%///////////////////////////////////////////////////////////////// 80 // Get all subscriptions that refer to a specific indication class 81 // for a specific name space. 82 /////////////////////////////////////////////////////////////////// 83 void eServerIndicationService::_getMatchingSubscriptions( 84 const String & nameSpaceName, 85 const CIMInstance & indication, 86 Array<CIMInstance> & subscriptions) const 87 { 88 Array<CIMInstance> all_subscriptions; 89 _getSubscriptions(nameSpaceName, all_subscriptions); 90 91 String indication_class_name = indication.getClassName(); 92 CIMObjectPath compare_path; 93 94 95 for(Uint8 x = 0; 96 x < all_subscriptions.size() ; 97 ++x, compare_path.clear()) 98 { 99 _get_object_path_from_association("Indication_Class", 100 all_subscriptions[x], 101 compare_path); 102 if(compare_path.getClassName() == indication_class_name) 103 { 104 CIMObjectPath temp = all_subscriptions[x].getPath(); 105 temp.setNameSpace(nameSpaceName); 106 all_subscriptions[x].setPath(temp); 107 subscriptions.append(all_subscriptions[x]); 108 } 109 } 110 return; 111 } 112 113 //%///////////////////////////////////////////////////////////////// 114 // Get all subscriptions in the repository for a specific name space 115 /////////////////////////////////////////////////////////////////// 116 void eServerIndicationService::_getSubscriptions ( 117 const String & nameSpaceName, 118 Array <CIMInstance> & subscriptions) const 119 { 120 121 122 PEG_METHOD_ENTER (TRC_INDICATION_SERVICE, 123 "eServerIndicationService::_getSubscriptions"); 124 125 // 126 // Get existing subscriptions in current namespace 127 // 128 _repository->read_lock (); 129 130 try 131 { 132 subscriptions = _repository->enumerateInstances 133 (nameSpaceName, ESERVER_CLASSNAME_INDSUBSCRIPTION); 134 } 135 catch (CIMException e) 136 { 137 // 138 // Some namespaces may not include the subscription class 139 // In that case, just return no subscriptions 140 // Any other exception is an error 141 // 142 if (e.getCode () != CIM_ERR_INVALID_CLASS) 143 { 144 _repository->read_unlock (); 145 PEG_METHOD_EXIT (); 146 } 147 } 148 149 _repository->read_unlock (); 150 151 PEG_METHOD_EXIT (); 152 }
I think this bug is pretty well understood among Pegasus
developers. The problem is in the Array
and the way
it moves elements that are inserted or deleted from to or from
an existing array. The problem only manifests itself when arrays
store objects that have a virtual base class, such as
Provider
.
The source of the problem is in lines 17 and 47.
Classes like Provider
, which has a virtual base
class, stores a pointer to that virtual base class within
its object. When the Array
moves the memory
upon an insert or delete, it moves the class object but does
not reinitialize the object's pointer to its virtual
base. The object is then corrupt because its vtable has a
bad pointer to the object's virtual base class.
The problem is related to indications because the
IndicationService
needs to load Indication
providers when an indication is enabled. Typically this
happens after Pegasus is up and running for a while and
therefore the memmove
is more likely to corrupt
vtables as described above. The fix
is below.
1 // -*-c++-*- 2 #ifndef PEGASUS_ARRAY_T 3 template<class PEGASUS_ARRAY_T> 4 #endif 5 void Array<PEGASUS_ARRAY_T>::insert(Uint32 pos, 6 const PEGASUS_ARRAY_T* x, 7 Uint32 size) 8 { 9 if (pos > this->size()) 10 ThrowOutOfBounds(); 11 12 reserve(this->size() + size); 13 14 Uint32 n = this->size() - pos; 15 16 if (n) 17 memmove( 18 _data() + pos + size, 19 _data() + pos, 20 sizeof(PEGASUS_ARRAY_T) * n); 21 22 CopyToRaw(_data() + pos, x, size); 23 static_cast<ArrayRep<PEGASUS_ARRAY_T>*>(_rep)->size += size; 24 } 25 26 #ifndef PEGASUS_ARRAY_T 27 template<class PEGASUS_ARRAY_T> 28 #endif 29 void Array<PEGASUS_ARRAY_T>::remove(Uint32 pos) 30 { 31 remove(pos, 1); 32 } 33 34 #ifndef PEGASUS_ARRAY_T 35 template<class PEGASUS_ARRAY_T> 36 #endif 37 void Array<PEGASUS_ARRAY_T>::remove(Uint32 pos, Uint32 size) 38 { 39 if (pos + size - 1 > this->size()) 40 ThrowOutOfBounds(); 41 42 Destroy(_data() + pos, size); 43 44 Uint32 rem = this->size() - (pos + size); 45 46 if (rem) 47 memmove( 48 _data() + pos, 49 _data() + pos + size, 50 sizeof(PEGASUS_ARRAY_T) * rem); 51 52 static_cast<ArrayRep<PEGASUS_ARRAY_T>*>(_rep)->size -= size; 53 }
To fix the problem with
Array
and classes having a virtual base, you must
store a pointer to the object instead of a reference. An example
of the fix in the ProviderManager
is in lines 10-11, and lines 33 and 45.
These lines cause pointer to Provider
objects
to be stored in the ProviderManager
's hash
table, instead of the objects themselves. This avoids the
problem with vtable corruption but requires management of
dynamically allocated objects.
1 2 ProviderModule *module; 3 if( false == _modules.lookup(*(parms->fileName), module) ) 4 { 5 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, 6 Tracer::LEVEL4, 7 "Creating Provider Module " + 8 *(parms->fileName) ); 9 10 module = new ProviderModule(*(parms->fileName)); 11 _modules.insert((*parms->fileName), module); 12 } 13 else 14 { 15 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4, 16 "Using Cached Provider Module " + 17 *(parms->fileName) ); 18 } 19 20 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4, 21 "Loading/Linking Provider Module " + 22 *(parms->fileName) ); 23 CIMBaseProvider *base = module->load(*(parms->providerName)); 24 25 // create provider module 26 27 MessageQueue * queue = MessageQueue::lookup( 28 PEGASUS_QUEUENAME_PROVIDERMANAGER_CPP); 29 PEGASUS_ASSERT(queue != 0); 30 MessageQueueService * service = 31 dynamic_cast<MessageQueueService *>(queue); 32 PEGASUS_ASSERT(service != 0); 33 pr = new Provider(*(parms->providerName), module, base); 34 if(0 == (pr->_cimom_handle = new CIMOMHandle(service))) 35 throw NullPointer(); 36 37 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL2, 38 "Loading Provider " + pr->_name); 39 40 PEGASUS_STD(cout) << "Loading Provider " << 41 pr->_name << PEGASUS_STD(endl); 42 43 pr->initialize(*(pr->_cimom_handle)); 44 gettimeofday(&(pr->_timeout), NULL); 45 _providers.insert(*(parms->providerName), pr);
There were a couple of double deletes of
asyncOpNodes
in message paths that were two or
three steps removed from the shortest path. These were in
code that had never been exersized.
The IndicationService
, the
IndicationHandlerService
,
and the ProviderRegistrationManager
each contained
code paths that were untested. Really, they were blocked from
running successfully due to elusive bugs (see above).
In the case of the ProviderRegistrationManager
, most of
that code was tested so I only needed to step through a few
cases.
Most of the IndicationService
and all of
the IndicationHandler
were untested to my knowledge
and they each comprise a lot of code.
Chip Vincent defined the CIMIndicationConsumer
interface during the definition of the original C++ provider
manager. It is not used in the Open Group CVS right now, but I
use it to support Indication handling.
1 //%///////////////////////////////////////////////////////////////////////////// 2 // 3 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM, 4 // The Open Group, Tivoli Systems 5 // 6 // Permission is hereby granted, free of charge, to any person obtaining a copy 7 // of this software and associated documentation files (the "Software"), to 8 // deal in the Software without restriction, including without limitation the 9 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 10 // sell copies of the Software, and to permit persons to whom the Software is 11 // furnished to do so, subject to the following conditions: 12 // 13 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN 14 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED 15 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT 16 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR 17 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT 18 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 19 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 20 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 21 // 22 //============================================================================== 23 // 24 // Author: Chip Vincent (cvincent@us.ibm.com) 25 // 26 // Modified By: Nitin Upasani, Hewlett-Packard Company (Nitin_Upasani@hp.com) 27 // 28 //%///////////////////////////////////////////////////////////////////////////// 29 30 #ifndef Pegasus_CIMIndicationConsumer_h 31 #define Pegasus_CIMIndicationConsumer_h 32 33 #include <Pegasus/Common/Config.h> 34 #include <Pegasus/Provider/CIMBaseProvider.h> 35 36 #include <Pegasus/Common/CIMInstance.h> 37 38 PEGASUS_NAMESPACE_BEGIN 39 40 /** 41 This class defines the set of methods implemented by an indication consumer provider. 42 A providers that derives from this class must implement all methods. The minimal method 43 implementation simply throw the NotSupported exception. 44 */ 45 class PEGASUS_PROVIDER_LINKAGE CIMIndicationConsumer : public virtual CIMBaseProvider 46 { 47 public: 48 CIMIndicationConsumer(void); 49 virtual ~CIMIndicationConsumer(void); 50 51 /** 52 @param contex contains security and locale information relevant for the lifetime 53 of this operation. 54 55 @param indication 56 57 @param handler asynchronusly processes the results of this operation. 58 59 @exception NotSupported 60 @exception InvalidParameter 61 */ 62 virtual void handleIndication( 63 const OperationContext & context, 64 const CIMInstance & indication, 65 ResponseHandler<void> & handler) = 0; 66 67 // ATTN: The following method is only for testing purposes. 68 virtual void handleIndication( 69 const OperationContext & context, 70 const String & url, 71 const CIMInstance& indicationInstance) 72 { 73 } 74 }; 75 76 PEGASUS_NAMESPACE_END 77 78 #endif
I derived the eServer_ProviderModule
schema from
the PG_ProviderModule
schema that we are using. I
only made a three additions. I
added the new provider type; I added
some new provider interface types (looking ahead);
and I added the OperationalStatus
attributes to the provider. This last is important because providers
are now handling indications.
1 //================================================================= 2 // eServer_ProviderModule 3 // 4 // This schema is derived from PG_ProviderModule. It adds some 5 // qualifiers to support remote and out-of-proc providers. 6 // It also defines some new provider types. 7 // 8 //================================================================= 9 10 // Named (meta) elements: 11 // Class, Property, Method, Association, Reference, Property, Parameter, 12 // Indication, Schema, Trigger 13 14 #pragma local ("en_US") 15 16 Qualifier Remote : boolean = false , Scope(class, reference, property, method) ; 17 Qualifier Out_of_Proc : boolean = false, Scope(class, reference, property, method); 18 Qualifier Trusted : boolean = false, Scope(class, reference, property, method); 19 Qualifier isURL : boolean = false, Scope(reference, property); 20 21 //================================================================= 22 // eServer_ProviderModule 23 //================================================================= 24 25 [Version("2.0.0."), Description("Derived from PG_ProviderModule " 26 "modified to support additional provider types and additional " 27 "provider qualifications. ")] 28 class eServer_ProviderModule : PG_ProviderModule { 29 [Key, Override("Name"), Description("A human-readable name that uniquely " 30 "identifies the Provider Module, inherited from PG_ProviderModule.")] 31 string Name; 32 33 [ Override("Location"), Description("The file path to the module. " 34 " If the usURL qualifier is present, indicates that the " 35 "path is a URL. Otherwise the path is a string that represents " 36 " the location of the module in the local host's file system.") ] 37 string Location; 38 39 [ Override("InterfaceType"), Description ("The interface definition " 40 "supported by this module. Implies both a method signature and " 41 " a binary specification for linking to the module and executing its code. " 42 "In Pegasus 2.01 the InterfaceType also implies a specific " 43 "ProviderManagerService, although this link will be broken in the " 44 "future. "), 45 Values {"C++Default", "CMPI", "Java_Default", "Java_SNIA", 46 "Java_Wbem_Services", "Perl", "Unix_Domain", 47 "Named_Pipe", "WSDL_SOAP"}] 48 string InterfaceType; 49 50 [ Description ("The host of the computer upon which this module resides. " 51 "For local providers this will be \"localhost\". For remote " 52 "providers this property should contain the DNS name of the " 53 "remote host. ") ] 54 string host; 55 56 [ Description ("The PGP Signature of the module's file. " 57 "The provider manager or any other module can use this " 58 " signature to determine the authenticity if the module " 59 " and the integrity of the module's image. ")] 60 string pgp_Signature; 61 62 }; 63 64 65 //================================================================= 66 // eServer_Provider 67 // 68 // Derived from the PG_Provider class. The operational status 69 // properties from PG_ProviderModule are present in the eServer_Provider 70 // class. This is to recognize that providers may have operational 71 // states that are distinct from the states of their modules. 72 // 73 //================================================================= 74 75 [ Version("2.0.0"), Description("eServer_Provider is derived from " 76 "PG_Provider. ") ] 77 78 class eServer_Provider : PG_Provider { 79 [Key, Description("The scoping eServer_ProviderModule name. " 80 "Inherited as a propagated key from PG_Provider, having " 81 "the syntax PG_ProviderModule.name")] 82 string ProviderModuleName; 83 [Key, Override("Name"), Description("A human-readable name that: 84 "uniquely identifies the provider within the Provider Module." 85 "Inherited from PG_Provider.")] 86 string Name; 87 [Description ( 88 " Indicates the current status(es) of the element. " 89 "Various health and operational statuses are " 90 "defined. Many of the enumeration's values are self-" 91 "explanatory. However, a few are not and are described " 92 "in more detail. \"Stressed\" indicates that the element " 93 "is functioning, but needs attention. Examples of " 94 "\"Stressed\" states are overload, overheated, etc. " 95 "\"Predictive Failure\" indicates that an element is " 96 "functioning nominally but predicting a failure in the " 97 "near future. \"In Service\" describes an element being " 98 "configured, maintained, cleaned, or otherwise administered. " 99 "\"No Contact\" indicates that the monitoring system " 100 "has knowledge of this element, but has never been able to " 101 "establish communications with it. \"Lost Communication\" " 102 "indicates that the ManagedSystemElement is known to exist " 103 "and has been contacted successfully in the past, but is " 104 "currently unreachable. \"Stopped\" indicates that the " 105 "element is known to exist, is not operational (e.g., it " 106 "is unable to provide service to users), but it has not " 107 "failed. It has purposely been made non-operational. \n" 108 " OperationalStatus replaces the Status property on " 109 "ManagedSystemElement to provide a consistent approach to " 110 "enumerations, to address implementation needs for an " 111 "array property, and to provide a migration path from today's " 112 "environment to the future. This change was not made earlier " 113 "since it required the DEPRECATED qualifier. Due to the " 114 "widespread use of the existing Status property in " 115 "management applications, it is strongly recommended that " 116 "providers/instrumentation provide BOTH the Status and " 117 "OperationalStatus properties. As always, Status (since it " 118 "is single-valued) provides the primary status of the " 119 "element."), 120 ValueMap {"0", "1", "2", "3", "4", "5", "6", "7", "8", 121 "9", "10", "11", "12", "13"}, 122 Values {"Unknown", "Other", "OK", "Degraded", "Stressed", 123 "Predictive Failure", "Error", "Non-Recoverable Error", 124 "Starting", "Stopping", "Stopped", "In Service", 125 "No Contact", "Lost Communication"}, 126 ModelCorrespondence { 127 "CIM_ManagedSystemElement.OtherStatusDescription"} ] 128 uint16 OperationalStatus[]; 129 [Description ( 130 "A string describing the status - used when the " 131 "OperationalStatus property is set to 1 (\"Other\")."), 132 ModelCorrespondence { 133 "CIM_ManagedSystemElement.OperationalStatus"} ] 134 string OtherStatusDescription; 135 }; 136 137 138 //================================================================= 139 // eServer_ProviderCapabilities 140 // 141 // Derived from the PG_ProviderCapabilities class 142 // 143 //================================================================= 144 145 [Version ("2.0.0"), Description("An instance of ProviderCapabilities " 146 "describes a set of abilities for a specific provider.") ] 147 class eServer_ProviderCapabilities : PG_ProviderCapabilities { 148 [Key, Description("The scoping eServer_ProviderModuleName. " 149 "Inherited as a Propagated key from PG_ProviderCapabilities, " 150 "having the syntax PG_Provider.ProviderModuleName") ] 151 string ProviderModuleName; 152 [Key, Description("The scoping eServer_Provider Name." 153 "Inherited as a Propagated key from PG_ProviderCapabilities, " 154 "having the syntax PG_Provider.Name") ] 155 string ProviderName; 156 157 [Key, Description("A value that uniquely identifies this " 158 " set of abilities for the designated Providers.") ] 159 string CapabilityID; 160 161 [Override("ProviderType"), Description("ProviderType enumerates " 162 " the specific method types that the provider supports. "), 163 ArrayType("Indexed"), 164 ValueMap { "2", "3", "4", "5", "6" }, 165 Values{ "Instance", "Association", "Indication", "Method", 166 "Indication_Consumer" } ] 167 uint16 ProviderType[]; 168 }; 169
After creating a new type of provider, I needed to modify the
ProviderRegistrationManager
to load, find, and
dispatch the new provider type. This was pretty simple. I wrote
some additional routines that are basically copies of what is
already there.
Here is one of the new routines I wrote for the
ProviderRegistrationManager
. There are several others.
1 Boolean ProviderRegistrationManager::lookupConsumerProvider( 2 const String & nameSpace, 3 const String & className, 4 CIMInstance & provider, 5 CIMInstance & providerModule) 6 { 7 String providerName; 8 String providerModuleName; 9 10 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER, 11 "ProviderRegistrationManager::lookupConsumerProvider"); 12 13 ProviderRegistrationTable* providerCapability = 0; 14 ProviderRegistrationTable* _provider= 0; 15 ProviderRegistrationTable* _providerModule = 0; 16 17 // 18 // create the key by using nameSpace, className, and providerType 19 // 20 String capabilityKey = _generateKey(nameSpace, className, CONSUMER_PROVIDER); 21 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4, 22 "\nnameSpace = " + nameSpace + "; className = " + 23 className + "; capabilityKey = " + capabilityKey); 24 25 try 26 { 27 // 28 // get provider capability instance from the table 29 // 30 if (!_registrationTable->table.lookup( 31 capabilityKey, providerCapability)) 32 { 33 PEG_METHOD_EXIT(); 34 throw CIMException(CIM_ERR_FAILED, CAPABILITY_NOT_REGISTERED); 35 } 36 37 Array<CIMInstance> instances = providerCapability->getInstances(); 38 39 Uint32 pos = instances[0].findProperty(_PROPERTY_PROVIDERNAME); 40 41 if (pos == PEG_NOT_FOUND) 42 { 43 PEG_METHOD_EXIT(); 44 throw CIMException(CIM_ERR_FAILED, 45 "Missing ProviderName which is key in PG_ProviderCapabilities class."); 46 } 47 48 // 49 // get provider name 50 // 51 instances[0].getProperty(pos).getValue().get(providerName); 52 53 // 54 // get provider module name 55 // 56 Uint32 pos2 = instances[0].findProperty(_PROPERTY_PROVIDERMODULENAME); 57 if (pos2 == PEG_NOT_FOUND) 58 { 59 60 PEG_METHOD_EXIT(); 61 throw CIMException(CIM_ERR_FAILED, 62 "Missing ProviderModuleName which is key in PG_ProviderCapabilities class."); 63 } 64 65 instances[0].getProperty(pos2).getValue().get(providerModuleName); 66 67 // 68 // create the key by using providerModuleName and providerName 69 // 70 String _providerKey = _generateKey(providerModuleName, providerName); 71 72 // 73 // create the key by using providerModuleName and MODULE_KEY 74 // 75 String _moduleKey = _generateKey(providerModuleName, MODULE_KEY); 76 77 // 78 // get provider instance from the table 79 // 80 if (!_registrationTable->table.lookup(_providerKey, _provider)) 81 { 82 PEG_METHOD_EXIT(); 83 throw CIMException(CIM_ERR_FAILED, PROVIDER_NOT_FOUND); 84 } 85 86 Array<CIMInstance> providerInstances = _provider->getInstances(); 87 provider = providerInstances[0]; 88 89 // 90 // get provider module instance from the table 91 // 92 if (!_registrationTable->table.lookup(_moduleKey, _providerModule)) 93 { 94 PEG_METHOD_EXIT(); 95 throw CIMException(CIM_ERR_FAILED, MODULE_NOT_FOUND); 96 } 97 98 Array<CIMInstance> providerModuleInstances = _providerModule->getInstances(); 99 providerModule = providerModuleInstances[0]; 100 101 } 102 catch(CIMException & exception) 103 { 104 Tracer::traceCIMException(TRC_PROVIDERMANAGER, Tracer::LEVEL4, exception); 105 PEG_METHOD_EXIT(); 106 return (false); 107 } 108 109 PEG_METHOD_EXIT(); 110 return (true); 111 }
This was also pretty simple. I piggybacked off the compilation and installation scripts that are already part of the project.
A couple of items to note in the MOF below. Line 47 shows a Type 6
provider, which is an Indication Consumer provider. Also, the
Indication provider and the Indication Consumer provider are both
part of the same module.
Here is the registration mof:
You can see that I simplified the PG_Subscription
class
in line 53 below. There are no policy attributes,
no attribute list, and no filter.
To get things working, I assumed that every subscription includes all attributes every time it is fired. I assumed that the subscription was created with valid class references and that providers were registered correctly.
In the schema, however, I left the filter in the Subscription definition. I just didn't make it a required attribute. I think it is viable to want to test Indication delivery without having to have filters defined and working.
1 instance of eServer_ProviderModule 2 { 3 Name = "ProcessIndicationProvider"; 4 Location = "ProcessIndicationProvider"; 5 Vendor = "Pegasus Community"; 6 Version = "2.0.0"; 7 InterfaceType = "C++Default"; 8 InterfaceVersion = "2.0.0"; 9 host = "localhost"; 10 OperationalStatus = {2}; 11 }; 12 13 instance of eServer_Provider 14 { 15 ProviderModuleName = "ProcessIndicationProvider"; 16 Name = "eServer_ProcessIndicationProvider"; 17 OperationalStatus = {2}; 18 }; 19 20 instance of eServer_ProviderCapabilities 21 { 22 ProviderModuleName = "ProcessIndicationProvider"; 23 ProviderName = "eServer_ProcessIndicationProvider"; 24 CapabilityID = "1"; 25 ClassName = "CIM_ProcessIndication"; 26 Namespaces = {"root/PG_Interop"}; 27 ProviderType = { 4 }; // indication 28 SupportedProperties = NULL; 29 SupportedMethods = NULL; 30 }; 31 32 33 instance of eServer_Provider 34 { 35 ProviderModuleName = "ProcessIndicationProvider"; 36 Name = "eServer_ProcessIndicationConsumer"; 37 OperationalStatus = {2}; 38 }; 39 40 instance of eServer_ProviderCapabilities 41 { 42 ProviderModuleName = "ProcessIndicationProvider"; 43 ProviderName = "eServer_ProcessIndicationConsumer"; 44 CapabilityID = "2"; 45 ClassName = "eServer_ProcessIndicationConsumer"; 46 Namespaces = {"root/PG_Interop"}; 47 ProviderType = { 6 }; // consumer 48 SupportedProperties = NULL; 49 SupportedMethods = NULL; 50 }; 51 52 53 instance of eServer_IndicationSubscription 54 { 55 Indication_Class = "CIM_ProcessIndication"; 56 Handler = "eServer_ProcessIndicationConsumer"; 57 };
To test Indications, I modified the provider in
$(PEGASUS_HOME)/src/Providers/sample/ProcessIndicationProvider
.
The biggest mod is the _monitor
routine, which runs
as an independent thread.
Most providers that generate indications will need to have an
independent thread. I chose to start and stop the thread in the
enableIndications
and
disableIndications
methods, respectively.
1 2 3 void ProcessIndicationProvider::enableIndications ( 4 ResponseHandler <CIMIndication> & handler) 5 { 6 cout << "eServer_ProcessIndicationProvider: enableIndications" << endl; 7 8 _enable_disable.lock(pegasus_thread_self()); 9 if(_indications_enabled.value()) 10 { 11 _enable_disable.unlock(); 12 return; 13 } 14 _response_handler = &handler; 15 _response_handler->processing (); 16 _indications_enabled = 1; 17 _indication_thread.run(); 18 _enable_disable.unlock(); 19 20 } 21 22 void ProcessIndicationProvider::disableIndications (void) 23 { 24 cout << "eServer_ProcessIndicationProvider: disableIndications" << endl; 25 _enable_disable.lock(pegasus_thread_self()); 26 27 if(_indications_enabled.value() == 0 ) 28 { 29 _enable_disable.unlock(); 30 return; 31 } 32 33 _indications_enabled = 0; 34 _response_handler = 0; 35 _indication_thread.join(); 36 _enable_disable.unlock(); 37 38 } 39
To deliver the indication, the _monitor
thread
calls deliver(CIMInstance)
using its response handler.
The importance of allocating the response handler on the heap in
the ProviderManagerService
and preserving it for the
life of the indication thread is demonstrated pretty clearly in the
code below. It should be obvious that the response handler must
not be destroyed after the subscription is enabled.
1 2 PEGASUS_THREAD_RETURN 3 PEGASUS_THREAD_CDECL 4 ProcessIndicationProvider::_monitor(void *parm) 5 { 6 PEGASUS_ASSERT(parm != 0); 7 8 Thread *th = static_cast<Thread *>(parm); 9 ProcessIndicationProvider *myself = 10 static_cast<ProcessIndicationProvider *>(th->get_parm()); 11 12 while(true) 13 { 14 pegasus_sleep(1000); 15 try 16 { 17 myself->_enable_disable.try_lock(pegasus_thread_self()); 18 } 19 catch(...) 20 { 21 if(myself->_indications_enabled.value() == 0 || 22 myself->_response_handler == 0 ) 23 exit_thread((PEGASUS_THREAD_RETURN)0); 24 else 25 continue; 26 } 27 try 28 { 29 30 CIMInstance indicationInstance ("root/PG_Interop:CIM_ProcessIndication"); 31 32 indicationInstance.addProperty 33 (CIMProperty ("IndicationTime", CIMValue (CIMDateTime ()))); 34 35 indicationInstance.addProperty 36 (CIMProperty ("IndicationIdentifier", "ProcessIndication01")); 37 38 Array <String> correlatedIndications; 39 indicationInstance.addProperty ( 40 CIMProperty ("CorrelatedIndications", 41 CIMValue (correlatedIndications))); 42 43 CIMIndication cimIndication (indicationInstance); 44 45 myself->_response_handler->deliver (cimIndication); 46 47 } 48 catch(...) 49 { 50 51 } 52 myself->_enable_disable.unlock(); 53 } 54 exit_thread((PEGASUS_THREAD_RETURN)0); 55 return(PEGASUS_THREAD_RETURN)0; 56 }
Toe receive Indications, I added an additional provider to the
module at
$(PEGASUS_HOME)/src/Providers/sample/ProcessIndicationProvider
.
The following code in the PegasusCreateProvider
symbol
shows how the module creates the different types of providers.
1 2 3 extern "C" PEGASUS_EXPORT CIMBaseProvider * PegasusCreateProvider 4 (const String & providerName) 5 { 6 7 if ((String::equalNoCase (providerName, "ProcessIndicationProvider")) || 8 (String::equalNoCase (providerName, "ProcessIndicationProvider2")) || 9 (String::equalNoCase (providerName, "eServer_ProcessIndicationProvider"))) 10 { 11 return (new ProcessIndicationProvider ()); 12 } 13 if(String::equalNoCase(providerName, "eServer_ProcessIndicationConsumer")) 14 { 15 return new ProcessIndicationConsumer(); 16 } 17 18 return (0); 19 }
The code to handle the Indication in the Consumer Provider is straightforward. Note that the consumer clones the indication immediately.
1 2 3 void ProcessIndicationConsumer::handleIndication( 4 const OperationContext & context, 5 const CIMInstance & indication, 6 ResponseHandler<void> & handler) 7 { 8 9 CIMInstance indication_copy = indication.clone(); 10 PEGASUS_STD(cout) << 11 "Indication Consumer: recieved indication of class: " << 12 indication_copy.getClassName() << PEGASUS_STD(endl); 13 14 }
Now we can show the entire pathway of an Indication as it is generated, moved through Pegasus, and handled.
IndicationService
sends a
CIMEnableIndicationsRequestMessage
to the
ProviderManagerService
.ProviderManagerService
creates a
response handler from the heap
and stores the handler in a hashtable.
It then calls the provider's enableIndications
method.
CIMIndicationProvider
handles the
call to enableIndications
by
creating an independent monitor
thread and storing the response handler.
CIMProcessIndicationRequestMessage
, which
contains the indication, to the
IndicationService
.IndicationService
retrieves subscriptions that match the
indication and creates a list of consumers. IndicationService
creates a
CIMConsumeIndicationRequestMessage
and sends
that message to the consumer.
CIMIndicationConsumer
recieves the
CIMConsumeIndicationRequestMessage
message it
clones the indication and processes it.
1 2 virtual void deliver(const OperationContext & context, const CIMIndication & cimIndication) 3 { 4 // ATTN: temporarily convert indication to instance 5 CIMInstance cimInstance(cimIndication); 6 7 // create message 8 CIMProcessIndicationRequestMessage * request = 9 new CIMProcessIndicationRequestMessage( 10 _request_copy.messageId, 11 cimInstance.getPath().getNameSpace(), 12 cimInstance, 13 QueueIdStack(_target->getQueueId(), _source->getQueueId())); 14 15 AsyncLegacyOperationStart * asyncRequest = 16 new AsyncLegacyOperationStart( 17 _source->get_next_xid(), 18 0, 19 _target->getQueueId(), 20 request, 21 _target->getQueueId()); 22 23 PEGASUS_ASSERT(asyncRequest != 0); 24 25 _source->SendForget(asyncRequest); 26 }
1 2 void eServerIndicationService::_handleProcessIndicationRequest( 3 const Message *message) 4 { 5 PEG_METHOD_ENTER (TRC_INDICATION_SERVICE, 6 "eServerIndicationService::_handleProcessIndicationRequest"); 7 8 CIMProcessIndicationRequestMessage* request = 9 (CIMProcessIndicationRequestMessage*) message; 10 11 CIMException cimException; 12 13 CIMProcessIndicationResponseMessage* response = 14 new CIMProcessIndicationResponseMessage( 15 request->messageId, 16 cimException, 17 request->queueIds.copyAndPop()); 18 try 19 { 20 /* get each name space */ 21 Array<String> nameSpaceNames; 22 _getNameSpaceNames(nameSpaceNames); 23 24 Array<CIMInstance> subscriptions; 25 CIMInstance indication = request->indicationInstance.clone(); 26 27 _getMatchingSubscriptions(indication, subscriptions); 28 29 // keep this super simple 30 // always deliver all properties of the indication class to the consumer 31 // every subscription has one indication class and one consumer 32 // the consumer is responsible for filtering and multiplexing 33 34 Array<CIMObjectPath> absent_consumers; 35 36 _deliverIndication(subscriptions, 37 indication, 38 absent_consumers); 39 40 } 41 catch(CIMException & e) 42 { 43 response->cimException = e; 44 } 45 catch(Exception & e) 46 { 47 response->cimException = PEGASUS_CIM_EXCEPTION(CIM_ERR_FAILED, 48 e.getMessage()); 49 } 50 catch(...) 51 { 52 response->cimException = PEGASUS_CIM_EXCEPTION(CIM_ERR_FAILED, 53 "Internal Error"); 54 } 55 _enqueueResponse(request, response); 56 57 PEG_METHOD_EXIT(); 58 }
1 2 // will return true if ONE subscription is successful. 3 // e.g., if one out of three subscriptions results in a 4 // successful delivery, will return true. if zero are 5 // successful, returns false. 6 // consumers that are absent; i.e., not found by the provider registration 7 // manager are returned in the absent_consumer 8 Boolean eServerIndicationService::_deliverIndication( 9 const Array<CIMInstance> & subscriptions, 10 const CIMInstance & indication, 11 Array<CIMObjectPath> & absent_consumers) 12 { 13 14 PEG_METHOD_ENTER (TRC_INDICATION_SERVICE, 15 "eServerIndicationService::_deliverIndication"); 16 CIMObjectPath consumer; 17 CIMInstance provider, providerModule; 18 Boolean ccode = false; 19 try 20 { 21 for(Uint8 i = 0 ; i < subscriptions.size(); ++i, consumer.clear()) 22 { 23 if(true == _get_object_path_from_association("Handler", 24 subscriptions[i], 25 consumer)) 26 { 27 28 // get the name space 29 if(true == _providerRegManager->lookupConsumerProvider( 30 subscriptions[i].getPath().getNameSpace(), 31 consumer.getClassName(), 32 provider, 33 providerModule)) 34 { 35 36 CIMConsumeIndicationRequestMessage *request = 37 new CIMConsumeIndicationRequestMessage( 38 XmlWriter::getNextMessageId (), 39 indication.getPath().getNameSpace(), 40 indication, 41 provider, 42 providerModule, 43 QueueIdStack(_providerManager, _queueId)); 44 45 AsyncOpNode *op = get_op(); 46 47 AsyncLegacyOperationStart *async_req = 48 new AsyncLegacyOperationStart( 49 get_next_xid(), 50 op, 51 _providerManager, 52 request, 53 _queueId); 54 try 55 { 56 if( false == SendForget(async_req) ) 57 { 58 async_req = static_cast<AsyncLegacyOperationStart *>(op->get_request()); 59 if(async_req) 60 delete async_req->get_action(); 61 delete async_req; 62 delete op; 63 // the consumer may not truly be absent, but we couldn't get 64 // the message to him. Maybe the service can do something 65 // with this information. 66 absent_consumers.append(consumer); 67 } 68 else 69 { 70 ccode = true; 71 } 72 } 73 catch(...) 74 { 75 76 async_req = static_cast<AsyncLegacyOperationStart *>(op->get_request()); 77 if(async_req) 78 delete async_req->get_action(); 79 delete async_req; 80 delete op; 81 // the consumer may not truly be absent, but we couldn't get 82 // the message to him. Maybe the service can do something 83 // with this information. 84 absent_consumers.append(consumer); 85 } 86 } // handler is registered as a consumer provider 87 else 88 { 89 absent_consumers.append(consumer); 90 } 91 92 } // found the handler path 93 } // for each subscription 94 } // try 95 catch(...) 96 { 97 throw; 98 } 99 100 // each subscription contains a reference to a provider 101 // that can handle the subscription. extract the handler 102 // instance from the reference 103 PEG_METHOD_EXIT(); 104 return ccode; 105 }
Author: Mike Day
State: Draft
Type: Architecture
Vote:
Created: 6 January 2003
Version 1.0 Mon Jan 6 17:39:09 2003