1 s.kodali 1.1 //%LICENSE////////////////////////////////////////////////////////////////
2 //
3 // Licensed to The Open Group (TOG) under one or more contributor license
4 // agreements. Refer to the OpenPegasusNOTICE.txt file distributed with
5 // this work for additional information regarding copyright ownership.
6 // Each contributor licenses this file to you under the OpenPegasus Open
7 // Source License; you may not use this file except in compliance with the
8 // License.
9 //
10 // Permission is hereby granted, free of charge, to any person obtaining a
11 // copy of this software and associated documentation files (the "Software"),
12 // to deal in the Software without restriction, including without limitation
13 // the rights to use, copy, modify, merge, publish, distribute, sublicense,
14 // and/or sell copies of the Software, and to permit persons to whom the
15 // Software is furnished to do so, subject to the following conditions:
16 //
17 // The above copyright notice and this permission notice shall be included
18 // in all copies or substantial portions of the Software.
19 //
20 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
21 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22 s.kodali 1.1 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
23 // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
24 // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
25 // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
26 // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 //
28 //////////////////////////////////////////////////////////////////////////
29 //
30 //%/////////////////////////////////////////////////////////////////////////////
31
32 #include <Pegasus/Common/Signal.h>
33 #include <Pegasus/Common/Config.h>
34 #include <Pegasus/Common/Constants.h>
35 #include <Pegasus/Common/AutoPtr.h>
36 #include <Pegasus/Common/ArrayInternal.h>
37 #include <Pegasus/Common/CIMMessage.h>
38 #include <Pegasus/Common/CIMMessageSerializer.h>
39 #include <Pegasus/Common/CIMMessageDeserializer.h>
40 #include <Pegasus/Common/OperationContextInternal.h>
41 #include <Pegasus/Common/System.h>
42 #include <Pegasus/Common/AnonymousPipe.h>
43 s.kodali 1.1 #include <Pegasus/Common/Tracer.h>
44 #include <Pegasus/Common/Logger.h>
45 #include <Pegasus/Common/Thread.h>
46 #include <Pegasus/Common/MessageQueueService.h>
47 #include <Pegasus/Config/ConfigManager.h>
48 #include <Pegasus/Common/Executor.h>
49 #include <Pegasus/Common/StringConversion.h>
50 #include <Pegasus/Common/SCMOClassCache.h>
51
52 #if defined (PEGASUS_OS_TYPE_WINDOWS)
53 # include <windows.h> // For CreateProcess()
54 #elif defined (PEGASUS_OS_VMS)
55 # include <perror.h>
56 # include <climsgdef.h>
57 # include <stdio.h>
58 # include <stdlib.h>
59 # include <string.h>
60 # include <processes.h>
61 # include <unixio.h>
62 #else
63 # include <unistd.h> // For fork(), exec(), and _exit()
64 s.kodali 1.1 # include <errno.h>
65 # include <sys/types.h>
66 # include <sys/resource.h>
67 # if defined(PEGASUS_HAS_SIGNALS)
68 # include <sys/wait.h>
69 # endif
70 #endif
71
72 #include "OOPProviderManagerRouter.h"
73
74 PEGASUS_USING_STD;
75
76 PEGASUS_NAMESPACE_BEGIN
77
78 static String _GROUP_PREFIX = "grp:";
79 static String _MODULE_PREFIX = "mod:";
80
81 static struct timeval deallocateWait = {300, 0};
82
83 // This calss is used to aggregate the responses sent when a single requests can
84 // result in many responses and these responses need to be aggregated before a
85 s.kodali 1.1 // response is sent back to the ProviderManageService.
86 class RespAggCounter
87 {
88 public:
89 RespAggCounter(Uint32 count):
90 _expectedResponseCount(count),
91 _receivedResponseCount(0)
92 {
93 }
94
95 Boolean isComplete(CIMException &e)
96 {
97 AutoMutex mtx(_mutex);
98 if (e.getCode() != CIM_ERR_SUCCESS)
99 {
100 _exception = e;
101 }
102 _receivedResponseCount++;
103 return _receivedResponseCount == _expectedResponseCount ;
104 }
105
106 s.kodali 1.1 CIMException getException()
107 {
108 return _exception;
109 }
110
111 private:
112 Mutex _mutex;
113 Uint32 _expectedResponseCount, _receivedResponseCount ;
114 CIMException _exception;
115 };
116
117
118 /////////////////////////////////////////////////////////////////////////////
119 // OutstandingRequestTable and OutstandingRequestEntry
120 /////////////////////////////////////////////////////////////////////////////
121
122 /**
123 An OutstandingRequestEntry represents a request message sent to a
124 Provider Agent for which no response has been received. The request
125 sender provides the message ID and a location for the response to be
126 returned. When a response matching the message ID is received, the
127 s.kodali 1.1 OutstandingRequestEntry is updated to indicate that the response
128 will arrive asynchronously. This entry will be deleted
129 when the response arrives. */
130 class OutstandingRequestEntry
131 {
132 public:
133 OutstandingRequestEntry(
134 String originalMessageId_,
135 CIMRequestMessage* requestMessage_,
136 CIMResponseMessage*& responseMessage_,
137 RespAggCounter* respAggregator_=NULL)
138 : originalMessageId(originalMessageId_),
139 requestMessage(requestMessage_),
140 responseMessage(responseMessage_),
141 respAggregator(respAggregator_)
142 {
143 }
144
145 /**
146 A unique value is substituted as the request messageId attribute to
147 allow responses to be definitively correllated with requests.
148 s.kodali 1.1 The original messageId value is stored here to avoid a race condition
149 between the processing of a response chunk and the resetting of the
150 original messageId in the request message.
151 */
152 String originalMessageId;
153 CIMRequestMessage* requestMessage;
154 CIMResponseMessage*& responseMessage;
155
156 // The aggregator object which aggregates the responses for requests
157 // like CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE etc.
158 RespAggCounter* respAggregator;
159 };
160
161 typedef HashTable<String, SharedPtr<OutstandingRequestEntry>, EqualFunc<String>,
162 HashFunc<String> > OutstandingRequestTable;
163
164 class RetryThreadParam{
165 public:
166 ProviderAgentContainer *pac;
167 Array<CIMRequestMessage *> retryRequestArray;
168 };
169 s.kodali 1.1
170
171 /////////////////////////////////////////////////////////////////////////////
172 // ProviderAgentContainer
173 /////////////////////////////////////////////////////////////////////////////
174
175 class ProviderAgentContainer
176 {
177 public:
178 ProviderAgentContainer(
|
521 s.kodali 1.1 (const char*)_moduleOrGroupName.getCString(),
522 ConfigManager::getPegasusHome(),
523 _userName,
524 pid,
525 readPipe,
526 writePipe);
527
528 if (status != 0)
529 {
530 PEG_TRACE((TRC_PROVIDERMANAGER, Tracer::LEVEL1,
531 "Executor::startProviderAgent() failed"));
532 PEG_METHOD_EXIT();
533 throw Exception(MessageLoaderParms(
534 "ProviderManager.OOPProviderManagerRouter.CIMPROVAGT_START_FAILED",
535 "Failed to start cimprovagt \"$0\".",
536 _moduleOrGroupName));
537 }
538
539 # if defined(PEGASUS_HAS_SIGNALS)
540 _pid = pid;
541 # endif
542 s.kodali 1.1
543 _pipeFromAgent.reset(readPipe);
544 _pipeToAgent.reset(writePipe);
545
546 PEG_METHOD_EXIT();
547 }
548
549 // Note: Caller must lock _agentMutex
550 void ProviderAgentContainer::_sendInitializationData()
551 {
552 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
553 "ProviderAgentContainer::_sendInitializationData");
554
555 //
556 // Gather config properties to pass to the Provider Agent
557 //
558 ConfigManager* configManager = ConfigManager::getInstance();
559 Array<Pair<String, String> > configProperties;
560
561 Array<String> configPropertyNames;
562 configManager->getAllPropertyNames(configPropertyNames, true);
563 s.kodali 1.1 for (Uint32 i = 0; i < configPropertyNames.size(); i++)
564 {
565 String configPropertyValue =
566 configManager->getCurrentValue(configPropertyNames[i]);
567 String configPropertyDefaultValue =
568 configManager->getDefaultValue(configPropertyNames[i]);
569 if (configPropertyValue != configPropertyDefaultValue)
570 {
571 configProperties.append(Pair<String, String>(
572 configPropertyNames[i], configPropertyValue));
573 }
574 }
575
576 //
577 // Create a Provider Agent initialization message
578 //
579 AutoPtr<CIMInitializeProviderAgentRequestMessage> request(
580 new CIMInitializeProviderAgentRequestMessage(
581 String("0"), // messageId
582 configManager->getPegasusHome(),
583 configProperties,
584 s.kodali 1.1 System::bindVerbose,
585 _subscriptionInitComplete,
586 QueueIdStack()));
587
588 //
589 // Write the initialization message to the pipe
590 //
591 AnonymousPipe::Status writeStatus =
592 _pipeToAgent->writeMessage(request.get());
593
594 if (writeStatus != AnonymousPipe::STATUS_SUCCESS)
595 {
596 PEG_METHOD_EXIT();
597 throw Exception(MessageLoaderParms(
598 "ProviderManager.OOPProviderManagerRouter."
599 "CIMPROVAGT_COMMUNICATION_FAILED",
600 "Failed to communicate with cimprovagt \"$0\".",
601 _moduleOrGroupName));
602 }
603
604 // Wait for a null response from the Provider Agent indicating it has
605 s.kodali 1.1 // initialized successfully.
606
607 CIMMessage* message;
608 AnonymousPipe::Status readStatus;
609 do
610 {
611 readStatus = _pipeFromAgent->readMessage(message);
612
613 } while (readStatus == AnonymousPipe::STATUS_INTERRUPT);
614
615 if (readStatus != AnonymousPipe::STATUS_SUCCESS)
616 {
617 PEG_METHOD_EXIT();
618 throw Exception(MessageLoaderParms(
619 "ProviderManager.OOPProviderManagerRouter."
620 "CIMPROVAGT_COMMUNICATION_FAILED",
621 "Failed to communicate with cimprovagt \"$0\".",
622 _moduleOrGroupName));
623 }
624
625 PEGASUS_ASSERT(message == 0);
626 s.kodali 1.1
627 PEG_METHOD_EXIT();
628 }
629
630 // Note: Caller must lock _agentMutex
631 void ProviderAgentContainer::_initialize()
632 {
633 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
634 "ProviderAgentContainer::_initialize");
635
636 if (_isInitialized)
637 {
638 PEGASUS_ASSERT(0);
639 PEG_METHOD_EXIT();
640 return;
641 }
642
643 //Get the current value of maxProviderProcesses
644 String maxProviderProcessesString = ConfigManager::getInstance()->
645 getCurrentValue("maxProviderProcesses");
646 Uint64 v;
647 s.kodali 1.1 StringConversion::decimalStringToUint64(
648 maxProviderProcessesString.getCString(),
649 v);
650 Uint32 maxProviderProcesses = (Uint32)v;
651
652 char* end = 0;
653
654 {
655 AutoMutex lock(_numProviderProcessesMutex);
656
657 if ((maxProviderProcesses != 0) &&
658 (_numProviderProcesses >= maxProviderProcesses))
659 {
660 throw PEGASUS_CIM_EXCEPTION(
661 CIM_ERR_FAILED,
662 MessageLoaderParms(
663 "ProviderManager.OOPProviderManagerRouter."
664 "MAX_PROVIDER_PROCESSES_REACHED",
665 "The maximum number of cimprovagt processes has been "
666 "reached."));
667 }
668 s.kodali 1.1 else
669 {
670 _numProviderProcesses++;
671 }
672 }
673
674 try
675 {
676 _startAgentProcess();
677 _isInitialized = true;
678 _sendInitializationData();
679
680 // Start a thread to read and process responses from the Provider Agent
681 ThreadStatus rtn = PEGASUS_THREAD_OK;
682 while ((rtn = MessageQueueService::get_thread_pool()->
683 allocate_and_awaken(this, _responseProcessor)) !=
684 PEGASUS_THREAD_OK)
685 {
686 if (rtn == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)
687 {
688 Threads::yield();
689 s.kodali 1.1 }
690 else
691 {
692 PEG_TRACE_CSTRING(TRC_PROVIDERMANAGER, Tracer::LEVEL1,
693 "Could not allocate thread to process responses from the "
694 "provider agent.");
695
696 throw Exception(MessageLoaderParms(
697 "ProviderManager.OOPProviderManagerRouter."
698 "CIMPROVAGT_THREAD_ALLOCATION_FAILED",
699 "Failed to allocate thread for cimprovagt \"$0\".",
700 _moduleOrGroupName));
701 }
702 }
703 }
704 catch (...)
705 {
706 // Closing the connection causes the agent process to exit
707 _pipeToAgent.reset();
708 _pipeFromAgent.reset();
709
710 s.kodali 1.1 #if defined(PEGASUS_HAS_SIGNALS)
711 if (_isInitialized)
712 {
713 // Harvest the status of the agent process to prevent a zombie
714 int status = Executor::reapProviderAgent(_pid);
715
716 if (status == -1)
717 {
718 PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
719 "ProviderAgentContainer::_initialize(): "
720 "Executor::reapProviderAgent() failed"));
721 }
722 }
723 #endif
724
725 _isInitialized = false;
726
727 {
728 AutoMutex lock(_numProviderProcessesMutex);
729 _numProviderProcesses--;
730 }
731 s.kodali 1.1
732 PEG_METHOD_EXIT();
733 throw;
734 }
735
736 PEG_METHOD_EXIT();
737 }
738
739 Boolean ProviderAgentContainer::isInitialized()
740 {
741 AutoMutex lock(_agentMutex);
742 return _isInitialized;
743 }
744
745 void ProviderAgentContainer::_uninitialize(Boolean cleanShutdown)
746 {
747 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
748 "ProviderAgentContainer::_uninitialize");
749
750 #if defined(PEGASUS_HAS_SIGNALS)
751 pid_t pid = 0;
752 s.kodali 1.1 #endif
753
754 try
755 {
756 AutoMutex lock(_agentMutex);
757
758 PEGASUS_ASSERT(_isInitialized);
759
760 // Close the connection with the Provider Agent
761 _pipeFromAgent.reset();
762 _pipeToAgent.reset();
763
764 _providerModuleCache = CIMInstance();
765
766 {
767 AutoMutex lock2(_numProviderProcessesMutex);
768 _numProviderProcesses--;
769 }
770
771 _isInitialized = false;
772
773 s.kodali 1.1 #if defined(PEGASUS_HAS_SIGNALS)
774 // Save the _pid so we can use it after we've released the _agentMutex
775 pid = _pid;
776 #endif
777
778 // In case of a clean shutdown requests which could not be processed are
779 // retried in a new thread.
780 Array<CIMRequestMessage *> retryReqArray;
781
782 //
783 // Complete with null responses all outstanding requests on this
784 // connection
785 //
786 {
787 AutoMutex tableLock(_outstandingRequestTableMutex);
788
789 for (OutstandingRequestTable::Iterator i =
790 _outstandingRequestTable.start();
791 i != 0; i++)
792 {
793 Boolean sendResponseNow = false;
794 s.kodali 1.1 CIMResponseMessage *response;
795
796 if(cleanShutdown)
797 {
798 MessageType msgType = i.value()->requestMessage->getType();
799
800 if(msgType == CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE ||
801 msgType == CIM_NOTIFY_CONFIG_CHANGE_REQUEST_MESSAGE ||
802 msgType ==
803 CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE ||
804 msgType ==
805 CIM_INDICATION_SERVICE_DISABLED_REQUEST_MESSAGE ||
806 msgType == CIM_DELETE_SUBSCRIPTION_REQUEST_MESSAGE)
807 {
|
818 s.kodali 1.1 }
819 else
820 {
821 // retry the request
822 retryReqArray.append(i.value()->requestMessage);
823 }
824 }
825 else
826 {
827 response = i.value()->requestMessage->buildResponse();
828 response->cimException = PEGASUS_CIM_EXCEPTION(
829 CIM_ERR_FAILED,
830 MessageLoaderParms(
831 "ProviderManager.OOPProviderManagerRouter."
832 "CIMPROVAGT_CONNECTION_LOST",
833 "Lost connection with cimprovagt \"$0\".",
834 _moduleOrGroupName));
835 sendResponseNow = true;
836 }
837
838 if(sendResponseNow)
839 s.kodali 1.1 {
840 PEG_TRACE((TRC_PROVIDERMANAGER, Tracer::LEVEL4,
841 "Completing messageId \"%s\" with a default response.",
842 (const char*)i.key().getCString()));
843
844 response->setComplete(true);
845 _asyncResponseCallback(
846 i.value()->requestMessage,
847 response);
848 }
849 // delete the response aggregator
850 delete i.value()->respAggregator;
851 }
852 _outstandingRequestTable.clear();
853 }
854
855 if(retryReqArray.size() > 0 )
856 {
857 ThreadStatus rtn = PEGASUS_THREAD_OK;
858 AutoPtr<RetryThreadParam> parms(new RetryThreadParam());
859 parms->pac = this;
860 s.kodali 1.1 parms->retryRequestArray = retryReqArray;
861
862 Boolean didRetry = true;
863
864 while((rtn = _threadPool->allocate_and_awaken(
865 (void*)parms.release(),
866 ProviderAgentContainer::_retryRequestHandler))
867 != PEGASUS_THREAD_OK)
868 {
869 if(rtn == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)
870 {
871 Threads::yield();
872 }
873 else
874 {
875 PEG_TRACE((TRC_PROVIDERMANAGER,
876 Tracer::LEVEL1,
877 "Could not allocate thread to retry "
878 "request in %s",
879 (const char *)_moduleOrGroupName. \
880 getCString()));
881 s.kodali 1.1 didRetry = false;
882 }
883 }
884
885 if(!didRetry)
886 {
887 for(Uint32 i=0; i<retryReqArray.size(); i++)
888 {
889 CIMResponseMessage *response =
890 retryReqArray[i]->buildResponse();
891 response->setComplete(true);
892 response->cimException =
893 PEGASUS_CIM_EXCEPTION(
894 CIM_ERR_FAILED,
895 MessageLoaderParms("ProviderManager."
896 "OOPProviderManagerRouter."
897 "REQUEST_RETRY_THREAD_"
898 "ALLOCATION_FAILED",
899 "Failed to allocate a thread to "
900 "retry a request in \"$0\".",
901 _moduleOrGroupName));
902 s.kodali 1.1
903 _asyncResponseCallback(
904 retryReqArray[i],
905 response);
906 }
907 }
908 }
909
910 //
911 // If not a clean shutdown, call the provider module failure callback
912 //
913 if (!cleanShutdown)
914 {
915 //
916 // Call the provider module failure callback to communicate
917 // the failure to the Provider Manager Service. The Provider
918 // Manager Service will inform the Indication Service.
919 //
920
921 // If this agent is servicing the group of modules, get all related
922 // provider module names.
923 s.kodali 1.1 Boolean isGroup = !String::compare(
924 _groupNameWithType, _GROUP_PREFIX, 4);
925
926 _providerModuleGroupFailCallback(
927 _moduleOrGroupName, _userName, _userContext, isGroup);
928 }
929 }
930 catch (...)
931 {
932 // We're uninitializing, so do not propagate the exception
933 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
934 "Ignoring _uninitialize() exception.");
935 }
936
937 #if defined(PEGASUS_HAS_SIGNALS)
938 // Harvest the status of the agent process to prevent a zombie. Do not
939 // hold the _agentMutex during this operation.
940
941 if ((pid != 0) && (Executor::reapProviderAgent(pid) == -1))
942 {
943 PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL2,
944 s.kodali 1.1 "ProviderAgentContainer::_uninitialize(): "
945 "Executor::reapProviderAgent() failed."));
946 }
947 #endif
948
949 PEG_METHOD_EXIT();
950 }
951
952 String ProviderAgentContainer::getGroupNameWithType() const
953 {
954 return _groupNameWithType;
955 }
956
957 CIMResponseMessage* ProviderAgentContainer::processMessage(
958 CIMRequestMessage* request,RespAggCounter* respAggregator)
959 {
960 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
961 "ProviderAgentContainer::processMessage");
962
963 CIMResponseMessage* response;
964 MessageType msgType = request->getType();
965 s.kodali 1.1
966 do
967 {
968 response = _processMessage(request,respAggregator);
969
970 if (response == _REQUEST_NOT_PROCESSED)
971 {
972 // Check for request message types that should not be retried.
973 if ((request->getType() ==
974 CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE) ||
975 (request->getType() ==
976 CIM_NOTIFY_CONFIG_CHANGE_REQUEST_MESSAGE) ||
977 (request->getType() ==
978 CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE) ||
979 (request->getType() ==
980 CIM_INDICATION_SERVICE_DISABLED_REQUEST_MESSAGE) ||
981 (request->getType() ==
982 CIM_DELETE_SUBSCRIPTION_REQUEST_MESSAGE))
983 {
984 response = request->buildResponse();
985 break;
986 s.kodali 1.1 }
987 else if (request->getType() == CIM_DISABLE_MODULE_REQUEST_MESSAGE)
988 {
989 response = request->buildResponse();
990 CIMDisableModuleResponseMessage* dmResponse =
991 dynamic_cast<CIMDisableModuleResponseMessage*>(response);
992 PEGASUS_ASSERT(dmResponse != 0);
993
994 Array<Uint16> operationalStatus;
995 operationalStatus.append(CIM_MSE_OPSTATUS_VALUE_STOPPED);
996 dmResponse->operationalStatus = operationalStatus;
997 break;
998 }
999 }
1000 } while (response == _REQUEST_NOT_PROCESSED);
1001
1002 if (msgType == CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE)
1003 {
1004 _subscriptionInitComplete = true;
1005 }
1006 else if (msgType ==
1007 s.kodali 1.1 CIM_INDICATION_SERVICE_DISABLED_REQUEST_MESSAGE)
1008 {
1009 _subscriptionInitComplete = false;
1010 }
1011
1012 PEG_METHOD_EXIT();
1013 return response;
1014 }
1015
1016 CIMResponseMessage* ProviderAgentContainer::_processMessage(
1017 CIMRequestMessage* request, RespAggCounter *respAggregator)
1018 {
1019 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1020 "ProviderAgentContainer::_processMessage");
1021
1022 CIMResponseMessage* response;
1023 String originalMessageId = request->messageId;
1024
1025 PEG_TRACE((TRC_PROVIDERMANAGER, Tracer::LEVEL3,
1026 "ProviderAgentContainer, process message ID %s",
1027 (const char*)request->messageId.getCString()));
1028 s.kodali 1.1
1029 // These three variables are used for the provider module optimization.
1030 // See the _providerModuleCache member description for more information.
1031 AutoPtr<ProviderIdContainer> origProviderId;
1032 Boolean doProviderModuleOptimization = false;
1033 Boolean updateProviderModuleCache = false;
1034
1035 try
1036 {
1037 // The messageId attribute is used to correlate response messages
1038 // from the Provider Agent with request messages, so it is imperative
1039 // that the ID is unique for each request. The incoming ID cannot be
1040 // trusted to be unique, so we substitute a unique one. The memory
1041 // address of the request is used as the source of a unique piece of
1042 // data. (The message ID is only required to be unique while the
1043 // request is outstanding.)
1044 char messagePtrString[20];
1045 sprintf(messagePtrString, "%p", request);
1046 String uniqueMessageId = messagePtrString;
1047
1048 //
1049 s.kodali 1.1 // Set up the OutstandingRequestEntry for this request
1050 //
1051 SharedPtr<OutstandingRequestEntry> outstandingRequestEntry(
1052 new OutstandingRequestEntry(
1053 originalMessageId,
1054 request,
1055 response,
1056 respAggregator));
1057
1058 //
1059 // Lock the Provider Agent Container while initializing the
1060 // agent and writing the request to the connection
1061 //
1062 {
1063 AutoMutex lock(_agentMutex);
1064
1065 //
1066 // Initialize the Provider Agent, if necessary
1067 //
1068 if (!_isInitialized)
1069 {
1070 s.kodali 1.1 _initialize();
1071 }
1072
1073 //
1074 // Add an entry to the OutstandingRequestTable for this request
1075 //
1076 {
1077 AutoMutex tableLock(_outstandingRequestTableMutex);
1078
1079 _outstandingRequestTable.insert(
1080 uniqueMessageId, outstandingRequestEntry);
1081 }
1082
1083 // Get the provider module from the ProviderIdContainer to see if
1084 // we can optimize out the transmission of this instance to the
1085 // Provider Agent. (See the _providerModuleCache description.)
1086 if (request->operationContext.contains(ProviderIdContainer::NAME))
1087 {
1088 ProviderIdContainer pidc = request->operationContext.get(
1089 ProviderIdContainer::NAME);
1090 origProviderId.reset(new ProviderIdContainer(
1091 s.kodali 1.1 pidc.getModule(), pidc.getProvider(),
1092 pidc.isRemoteNameSpace(), pidc.getRemoteInfo()));
1093 if (_providerModuleCache.isUninitialized() ||
1094 (!pidc.getModule().identical(_providerModuleCache)))
1095 {
1096 // We haven't sent this provider module instance to the
1097 // Provider Agent yet. Update our cache after we send it.
1098 updateProviderModuleCache = true;
1099 }
1100 else
1101 {
1102 // Replace the provider module in the ProviderIdContainer
1103 // with an uninitialized instance. We'll need to put the
1104 // original one back after the message is sent.
1105 ProviderIdContainer newpidc = ProviderIdContainer(
1106 CIMInstance(), pidc.getProvider(),
1107 pidc.isRemoteNameSpace(), pidc.getRemoteInfo());
1108 newpidc.setProvMgrPath(pidc.getProvMgrPath());
1109 request->operationContext.set(newpidc);
1110
1111 doProviderModuleOptimization = true;
1112 s.kodali 1.1 }
1113 }
1114
1115 //
1116 // Write the message to the pipe
1117 //
1118 try
1119 {
1120 PEG_TRACE((TRC_PROVIDERMANAGER, Tracer::LEVEL3,
1121 "Sending request to agent with messageId %s",
1122 (const char*)uniqueMessageId.getCString()));
1123
1124 request->messageId = uniqueMessageId;
1125 AnonymousPipe::Status writeStatus =
1126 _pipeToAgent->writeMessage(request);
1127 request->messageId = originalMessageId;
1128
1129 if (doProviderModuleOptimization)
1130 {
1131 request->operationContext.set(*origProviderId.get());
1132 }
1133 s.kodali 1.1
1134 if (writeStatus != AnonymousPipe::STATUS_SUCCESS)
1135 {
1136 PEG_TRACE((TRC_PROVIDERMANAGER, Tracer::LEVEL1,
1137 "Failed to write message to pipe. writeStatus = %d.",
1138 writeStatus));
1139
1140 request->messageId = originalMessageId;
1141
1142 if (doProviderModuleOptimization)
1143 {
1144 request->operationContext.set(*origProviderId.get());
1145 }
1146
1147 // Remove this OutstandingRequestTable entry
1148 {
1149 AutoMutex tableLock(_outstandingRequestTableMutex);
1150 Boolean removed =
1151 _outstandingRequestTable.remove(uniqueMessageId);
1152 PEGASUS_ASSERT(removed);
1153 }
1154 s.kodali 1.1
1155 // A response value of _REQUEST_NOT_PROCESSED indicates
1156 // that the request was not processed by the provider
1157 // agent, so it can be retried safely.
1158 PEG_METHOD_EXIT();
1159 return _REQUEST_NOT_PROCESSED;
1160 }
1161
1162 if (updateProviderModuleCache)
1163 {
1164 _providerModuleCache = origProviderId->getModule();
1165 }
1166
1167 response = request->buildResponse();
1168 response->isAsyncResponsePending = true;
1169 PEG_METHOD_EXIT();
1170
1171 return response;
1172 }
1173 catch (...)
1174 {
1175 s.kodali 1.1 request->messageId = originalMessageId;
1176
1177 if (doProviderModuleOptimization)
1178 {
1179 request->operationContext.set(*origProviderId.get());
1180 }
1181
1182 PEG_TRACE_CSTRING(TRC_PROVIDERMANAGER, Tracer::LEVEL1,
1183 "Failed to write message to pipe.");
1184 // Remove the OutstandingRequestTable entry for this request
1185 {
1186 AutoMutex tableLock(_outstandingRequestTableMutex);
1187 Boolean removed =
1188 _outstandingRequestTable.remove(uniqueMessageId);
1189 PEGASUS_ASSERT(removed);
1190 }
1191 PEG_METHOD_EXIT();
1192 throw;
1193 }
1194 }
1195 }
1196 s.kodali 1.1 catch (CIMException& e)
1197 {
1198 PEG_TRACE((TRC_PROVIDERMANAGER, Tracer::LEVEL1,
1199 "Caught CIMException: %s",
1200 (const char*)e.getMessage().getCString()));
1201 response = request->buildResponse();
1202 response->cimException = e;
1203 }
1204 catch (Exception& e)
1205 {
1206 PEG_TRACE((TRC_PROVIDERMANAGER, Tracer::LEVEL1,
1207 "Caught Exception: %s",
1208 (const char*)e.getMessage().getCString()));
1209 response = request->buildResponse();
1210 response->cimException = PEGASUS_CIM_EXCEPTION(
1211 CIM_ERR_FAILED, e.getMessage());
1212 }
1213 catch (...)
1214 {
1215 PEG_TRACE_CSTRING(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
1216 "Caught unknown exception");
1217 s.kodali 1.1 response = request->buildResponse();
1218 response->cimException = PEGASUS_CIM_EXCEPTION(
1219 CIM_ERR_FAILED, String());
1220 }
1221
1222 response->messageId = originalMessageId;
1223 response->syncAttributes(request);
1224
1225 PEG_METHOD_EXIT();
1226 return response;
1227 }
1228
1229 void ProviderAgentContainer::unloadIdleProviders()
1230 {
1231 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1232 "ProviderAgentContainer::unloadIdleProviders");
1233
1234 AutoMutex lock(_agentMutex);
1235 if (_isInitialized)
1236 {
1237 // Send a "wake up" message to the Provider Agent.
1238 s.kodali 1.1 // Don't bother checking whether the operation is successful.
1239 Uint32 messageLength = 0;
1240 _pipeToAgent->writeBuffer((const char*)&messageLength, sizeof(Uint32));
1241 }
1242
1243 PEG_METHOD_EXIT();
1244 }
1245
1246 void ProviderAgentContainer::cleanDisconnectedClientRequests()
1247 {
1248 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1249 "ProviderAgentContainer::cleanDisconnectedClientRequests");
1250
1251 // Array to store the keys which need to be remvoed.
1252 Array<String> keys;
1253
1254 AutoMutex tableLock(_outstandingRequestTableMutex);
1255 for (OutstandingRequestTable::Iterator i = _outstandingRequestTable.start();
1256 i != 0; i++)
1257 {
1258 if(!_isClientActive(i.value()->requestMessage))
1259 s.kodali 1.1 {
1260 // create empty response and set isComplete to true.
1261 AutoPtr<CIMResponseMessage> response;
1262 SharedPtr<OutstandingRequestEntry> entry = i.value();
1263 response.reset(i.value()->requestMessage->buildResponse());
1264 response->setComplete(true);
1265 response->messageId = i.value()->originalMessageId;
1266 _asyncResponseCallback(
1267 i.value()->requestMessage,
1268 response.release());
1269 keys.append(i.key());
1270 }
1271 }
1272
1273 for(Uint32 j=0; j<keys.size();j++)
1274 {
1275 _outstandingRequestTable.remove(keys[j]);
1276 }
1277 PEG_METHOD_EXIT();
1278 }
1279
1280 s.kodali 1.1 void ProviderAgentContainer::_processGetSCMOClassRequest(
1281 ProvAgtGetScmoClassRequestMessage* request)
1282 {
1283 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1284 "ProviderAgentContainer::_processGetSCMOClassRequest");
1285
1286 AutoPtr<ProvAgtGetScmoClassResponseMessage> response(
1287 new ProvAgtGetScmoClassResponseMessage(
1288 XmlWriter::getNextMessageId(),
1289 CIMException(),
1290 QueueIdStack(),
1291 SCMOClass("","")));
1292
1293 CString ns = request->nameSpace.getString().getCString();
1294 CString cn = request->className.getString().getCString();
1295
1296 delete request;
1297
1298 response->scmoClass = SCMOClassCache::getInstance()->getSCMOClass(
1299 ns,strlen(ns),
1300 cn,strlen(cn));
1301 s.kodali 1.1
1302 //
1303 // Lock the Provider Agent Container and
1304 // writing the response to the connection
1305 //
1306 {
1307
1308 AutoMutex lock(_agentMutex);
1309
1310 //
1311 // Write the message to the pipe
1312 //
1313 try
1314 {
1315
1316 AnonymousPipe::Status writeStatus =
1317 _pipeToAgent->writeMessage(response.get());
1318
1319 if (writeStatus != AnonymousPipe::STATUS_SUCCESS)
1320 {
1321 PEG_TRACE((TRC_PROVIDERMANAGER, Tracer::LEVEL1,
1322 s.kodali 1.1 "Failed to write message to pipe. writeStatus = %d.",
1323 writeStatus));
1324
1325 PEG_METHOD_EXIT();
1326 return;
1327 }
1328
1329 }
1330 catch (Exception & e)
1331 {
1332 PEG_TRACE((TRC_PROVIDERMANAGER, Tracer::LEVEL1,
1333 "Exception: Failed to write message to pipe. Error: %s",
1334 (const char*)e.getMessage().getCString()));
1335 PEG_METHOD_EXIT();
1336 throw;
1337 }
1338 catch (...)
1339 {
1340
1341 PEG_TRACE_CSTRING(TRC_PROVIDERMANAGER, Tracer::LEVEL1,
1342 "Unkonwn exception. Failed to write message to pipe.");
1343 s.kodali 1.1 PEG_METHOD_EXIT();
1344 throw;
1345 }
1346 }
1347
1348 PEG_METHOD_EXIT();
1349 return;
1350 }
1351
1352 Boolean ProviderAgentContainer::_isClientActive(CIMRequestMessage *request_)
1353 {
1354 MessageQueue *connectionMQ = MessageQueue::lookup(request_->queueIds[0]);
1355 return connectionMQ->isActive();
1356 }
1357
1358 // Note: This method should not throw an exception. It is used as a thread
1359 // entry point, and any exceptions thrown are ignored.
1360 ThreadReturnType PEGASUS_THREAD_CDECL
1361 ProviderAgentContainer::_retryRequestHandler(void* arg)
1362 {
1363 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1364 s.kodali 1.1 "ProviderAgentContainer::_retryRequestHandler");
1365
1366 PEGASUS_ASSERT(arg != 0);
1367 RetryThreadParam *threadParams=
1368 reinterpret_cast<RetryThreadParam *>(arg);
1369 Array<CIMRequestMessage *> retryRequests = threadParams->retryRequestArray;
1370
1371 try
1372 {
1373 for(Uint32 i=0; i<retryRequests.size(); i++)
1374 {
1375 threadParams->pac->processMessage(retryRequests[i]);
1376 }
1377 }
1378 catch(Exception &e)
1379 {
1380 PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
1381 "Unexpected exception in _retryRequestHandler: %s",
1382 (const char*)e.getMessage().getCString()));
1383 }
1384 catch (...)
1385 s.kodali 1.1 {
1386 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
1387 "Unexpected exception in _retryRequestHandler.");
1388 }
1389 PEG_METHOD_EXIT();
1390
1391 return ThreadReturnType(0);
1392 }
1393
1394
1395 void ProviderAgentContainer::_processResponses()
1396 {
1397 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1398 "ProviderAgentContainer::_processResponses");
1399
1400 //
1401 // Process responses until the pipe is closed
1402 //
1403 while (1)
1404 {
1405 try
1406 s.kodali 1.1 {
1407 CIMMessage* message;
1408
1409 //
1410 // Read a response from the Provider Agent
1411 //
1412 AnonymousPipe::Status readStatus =
1413 _pipeFromAgent->readMessage(message);
1414
1415 // Ignore interrupts
1416 if (readStatus == AnonymousPipe::STATUS_INTERRUPT)
1417 {
1418 continue;
1419 }
1420
1421 // Handle an error the same way as a closed connection
1422 if ((readStatus == AnonymousPipe::STATUS_ERROR) ||
1423 (readStatus == AnonymousPipe::STATUS_CLOSED))
1424 {
1425 _uninitialize(false);
1426 return;
1427 s.kodali 1.1 }
1428
1429 // A null message indicates that the provider agent process has
1430 // finished its processing and is ready to exit.
1431 if (message == 0)
1432 {
1433 _uninitialize(true);
1434 return;
1435 }
1436
1437 if (message->getType() == CIM_PROCESS_INDICATION_REQUEST_MESSAGE)
1438 {
1439 // Process an indication message
1440
1441 _indicationCallback(
1442 reinterpret_cast<CIMProcessIndicationRequestMessage*>(
1443 message));
1444 }
1445 else if (message->getType()==PROVAGT_GET_SCMOCLASS_REQUEST_MESSAGE)
1446 {
1447
1448 s.kodali 1.1 _processGetSCMOClassRequest(
1449 reinterpret_cast<ProvAgtGetScmoClassRequestMessage*>(
1450 message));
1451 }
1452 else if (!message->isComplete())
1453 {
1454 // Process an incomplete response chunk
1455
1456 CIMResponseMessage* response;
1457 response = dynamic_cast<CIMResponseMessage*>(message);
1458 PEGASUS_ASSERT(response != 0);
1459
1460 Boolean foundEntry = false;
1461 // Get the OutstandingRequestEntry for this response chunk
1462 SharedPtr<OutstandingRequestEntry> _outstandingRequestEntry;
1463 {
1464 AutoMutex tableLock(_outstandingRequestTableMutex);
1465 foundEntry = _outstandingRequestTable.lookup(
1466 response->messageId, _outstandingRequestEntry);
1467 }
1468
1469 s.kodali 1.1 if(foundEntry)
1470 {
1471 // Put the original message ID into the response
1472 response->messageId =
1473 _outstandingRequestEntry->originalMessageId;
1474
1475 // Call the response chunk callback to process the chunk
1476 // if the client connection is active.
1477 // No need to acquire _agentMutex since this a chunk
1478 // response callback. The request object will not be
1479 // deleted here.
1480 _responseChunkCallback(
1481 _outstandingRequestEntry->requestMessage, response);
1482 }
1483 }
1484 else
1485 {
1486 // Process a completed response
1487 CIMResponseMessage* response;
1488 response = dynamic_cast<CIMResponseMessage*>(message);
1489 PEGASUS_ASSERT(response != 0);
1490 s.kodali 1.1
1491 Boolean foundEntry = false;
1492 // Give the response to the waiting OutstandingRequestEntry
1493 SharedPtr<OutstandingRequestEntry> _outstandingRequestEntry;
1494 {
1495 AutoMutex tableLock(_outstandingRequestTableMutex);
1496 foundEntry = _outstandingRequestTable.lookup(
1497 response->messageId, _outstandingRequestEntry);
1498
1499 if(foundEntry)
1500 {
1501 // Remove the completed request from the table
1502 Boolean removed = _outstandingRequestTable.remove( \
1503 response->messageId);
1504 PEGASUS_ASSERT(removed);
1505 }
1506
1507 }
1508
1509 if(foundEntry)
1510 {
1511 s.kodali 1.1 if(_outstandingRequestEntry->respAggregator == NULL)
1512 {
1513 response->messageId =
1514 _outstandingRequestEntry->originalMessageId;
1515
1516 _sendResponse(_outstandingRequestEntry->requestMessage,
1517 response);
1518 }
1519 else
1520 {
1521 if(_outstandingRequestEntry->respAggregator-> \
1522 isComplete(response->cimException))
1523 {
1524 response->messageId =
1525 _outstandingRequestEntry->originalMessageId;
1526
1527 _sendResponse(
1528 _outstandingRequestEntry->requestMessage,
1529 response);
1530
1531 // delete respAggregator pointer now
1532 s.kodali 1.1 delete _outstandingRequestEntry->respAggregator;
1533 }
1534 else
1535 {
1536 // this is not the last response for this request.
1537 // Its job is done and it can be deleted now.
1538 delete response;
1539 }
1540 }
1541 }
1542 else
1543 {
1544 PEG_TRACE((TRC_DISCARDED_DATA,Tracer::LEVEL4,
1545 "The response for message id %s arrived after the " \
1546 "client disconnected.",
1547 (const char *)response->messageId.getCString()));
1548 }
1549 }
1550 }
1551 catch (Exception& e)
1552 {
1553 s.kodali 1.1 PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL2,
1554 "Ignoring exception: %s",
1555 (const char*)e.getMessage().getCString()));
1556 }
1557 catch (...)
1558 {
1559 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
1560 "Ignoring exception");
1561 }
1562 }
1563 }
1564
1565 void ProviderAgentContainer::_sendResponse(CIMRequestMessage *request,
1566 CIMResponseMessage *response)
1567 {
1568 response->syncAttributes(request);
1569 {
1570 // acquire the _agentMutex to make sure that
1571 // _processMessage thread has finished
1572 // processing the request.
1573 AutoMutex agentLock(_agentMutex);
1574 s.kodali 1.1 }
1575
1576 // Call the asyncResponseCallback to process
1577 // the completed response.
1578 _asyncResponseCallback(
1579 request,
1580 response);
1581 }
1582
1583 ThreadReturnType PEGASUS_THREAD_CDECL
1584 ProviderAgentContainer::_responseProcessor(void* arg)
1585 {
1586 ProviderAgentContainer* pa =
1587 reinterpret_cast<ProviderAgentContainer*>(arg);
1588
1589 pa->_processResponses();
1590
1591 return ThreadReturnType(0);
1592 }
1593
1594 /////////////////////////////////////////////////////////////////////////////
1595 s.kodali 1.1 // OOPProviderManagerRouter
1596 /////////////////////////////////////////////////////////////////////////////
1597
1598 OOPProviderManagerRouter::OOPProviderManagerRouter(
1599 PEGASUS_INDICATION_CALLBACK_T indicationCallback,
1600 PEGASUS_RESPONSE_CHUNK_CALLBACK_T responseChunkCallback,
1601 PEGASUS_PROVIDERMODULEGROUPFAIL_CALLBACK_T providerModuleGroupFailCallback,
1602 PEGASUS_ASYNC_RESPONSE_CALLBACK_T asyncResponseCallback)
1603 {
1604 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1605 "OOPProviderManagerRouter::OOPProviderManagerRouter");
1606
1607 _indicationCallback = indicationCallback;
1608 _responseChunkCallback = responseChunkCallback;
1609 _providerModuleGroupFailCallback = providerModuleGroupFailCallback;
1610 _asyncResponseCallback = asyncResponseCallback;
1611 _subscriptionInitComplete = false;
1612 _threadPool =
1613 new ThreadPool(0, "OOPProviderManagerRouter", 0, 0, deallocateWait);;
1614 PEG_METHOD_EXIT();
1615 }
1616 s.kodali 1.1
1617 OOPProviderManagerRouter::~OOPProviderManagerRouter()
1618 {
1619 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1620 "OOPProviderManagerRouter::~OOPProviderManagerRouter");
1621
1622 try
1623 {
1624 // Clean up the ProviderAgentContainers
1625 AutoMutex lock(_providerAgentTableMutex);
1626 ProviderAgentTable::Iterator i = _providerAgentTable.start();
1627 for (; i != 0; i++)
1628 {
1629 delete i.value();
1630 }
1631
1632 delete _threadPool;
1633 }
1634 catch (...) {}
1635
1636 PEG_METHOD_EXIT();
1637 s.kodali 1.1 }
1638
1639 Message* OOPProviderManagerRouter::processMessage(Message* message)
1640 {
1641 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1642 "OOPProviderManagerRouter::processMessage");
1643
1644 CIMRequestMessage* request = dynamic_cast<CIMRequestMessage *>(message);
1645 PEGASUS_ASSERT(request != 0);
1646
1647 AutoPtr<CIMResponseMessage> response;
1648
1649 //
1650 // Get the provider information from the request
1651 //
1652 CIMInstance providerModule;
1653
1654 if ((dynamic_cast<CIMOperationRequestMessage*>(request) != 0) ||
1655 (dynamic_cast<CIMIndicationRequestMessage*>(request) != 0) ||
1656 (request->getType() == CIM_EXPORT_INDICATION_REQUEST_MESSAGE))
1657 {
1658 s.kodali 1.1 // Provider information is in the OperationContext
1659 ProviderIdContainer pidc = (ProviderIdContainer)
1660 request->operationContext.get(ProviderIdContainer::NAME);
1661 providerModule = pidc.getModule();
1662 }
1663 else if (request->getType() == CIM_ENABLE_MODULE_REQUEST_MESSAGE)
1664 {
1665 CIMEnableModuleRequestMessage* emReq =
1666 dynamic_cast<CIMEnableModuleRequestMessage*>(request);
1667 providerModule = emReq->providerModule;
1668 }
1669 else if (request->getType() == CIM_DISABLE_MODULE_REQUEST_MESSAGE)
1670 {
1671 CIMDisableModuleRequestMessage* dmReq =
1672 dynamic_cast<CIMDisableModuleRequestMessage*>(request);
1673 providerModule = dmReq->providerModule;
1674 }
1675 else if ((request->getType() == CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE) ||
1676 (request->getType() ==
1677 CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE) ||
1678 (request->getType() ==
1679 s.kodali 1.1 CIM_INDICATION_SERVICE_DISABLED_REQUEST_MESSAGE) ||
1680 (request->getType() == CIM_NOTIFY_CONFIG_CHANGE_REQUEST_MESSAGE))
1681 {
1682 // This operation is not provider-specific
1683 }
1684 else
1685 {
1686 // Unrecognized message type. This should never happen.
1687 PEGASUS_ASSERT(0);
1688 response.reset(request->buildResponse());
1689 response->cimException = PEGASUS_CIM_EXCEPTION(
1690 CIM_ERR_FAILED, "Unrecognized message type.");
1691 PEG_METHOD_EXIT();
1692 return response.release();
1693 }
1694
1695 //
1696 // Process the request message
1697 //
1698 if (request->getType() == CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE)
1699 {
1700 s.kodali 1.1 // Forward the CIMStopAllProvidersRequest to all providers
1701 response.reset(_forwardRequestToAllAgents(request));
1702
1703 // Note: Do not uninitialize the ProviderAgentContainers here.
1704 // Just let the selecting thread notice when the agent connections
1705 // are closed.
1706 }
1707 else if (request->getType () ==
1708 CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE)
1709 {
1710 _subscriptionInitComplete = true;
1711
1712 //
1713 // Forward the CIMSubscriptionInitCompleteRequestMessage to
1714 // all providers
1715 //
1716 response.reset (_forwardRequestToAllAgents (request));
1717 }
1718 else if (request->getType () ==
1719 CIM_INDICATION_SERVICE_DISABLED_REQUEST_MESSAGE)
1720 {
1721 s.kodali 1.1 _subscriptionInitComplete = false;
1722
1723 //
1724 // Forward the CIMIndicationServiceDisabledRequestMessage to
1725 // all providers
1726 //
1727 response.reset (_forwardRequestToAllAgents (request));
1728 }
1729 else if (request->getType() == CIM_NOTIFY_CONFIG_CHANGE_REQUEST_MESSAGE)
1730 {
1731 CIMNotifyConfigChangeRequestMessage* notifyRequest =
1732 dynamic_cast<CIMNotifyConfigChangeRequestMessage*>(request);
1733 PEGASUS_ASSERT(notifyRequest != 0);
1734
1735 if (notifyRequest->currentValueModified)
1736 {
1737 // Forward the CIMNotifyConfigChangeRequestMessage to all providers
1738 response.reset(_forwardRequestToAllAgents(request));
1739 }
1740 else
1741 {
1742 s.kodali 1.1 // No need to notify provider agents about changes to planned value
1743 response.reset(request->buildResponse());
1744 }
1745 }
1746 else if (request->getType() == CIM_DISABLE_MODULE_REQUEST_MESSAGE)
1747 {
1748 // Fan out the request to all Provider Agent processes for this module
1749
1750 // Retrieve the provider group name.
1751 String groupNameWithType;
1752 _getGroupNameWithType(providerModule, groupNameWithType);
1753
1754
1755 // Look up the Provider Agents for this module
1756 Array<ProviderAgentContainer*> paArray =
1757 _lookupProviderAgents(groupNameWithType);
1758
1759 Array<ProviderAgentContainer*> paInit;
1760
1761 for (Uint32 i=0; i<paArray.size(); i++)
1762 {
1763 s.kodali 1.1 //
1764 // Do not start up an agent process just to disable the module
1765 //
1766 if (paArray[i]->isInitialized())
1767 {
1768 paInit.append(paArray[i]);
1769 }
1770 }
1771
1772 if(paInit.size() > 0)
1773 {
1774 RespAggCounter *respAggregator =
1775 new RespAggCounter(paInit.size());
1776
1777 for (Uint32 i=0; i<paInit.size(); i++)
1778 {
1779 // Forward the request to the provider agent
1780 //
1781 response.reset(
1782 paInit[i]->processMessage(request,respAggregator));
1783
1784 s.kodali 1.1 // Note: Do not uninitialize the ProviderAgentContainer here
1785 // when a disable module operation is successful. Just let the
1786 // selecting thread notice when the agent connection is closed.
1787 }
1788 }
1789
1790 // Use a default response if no Provider Agents were called
1791 if (!response.get())
1792 {
1793 response.reset(request->buildResponse());
1794
1795 CIMDisableModuleResponseMessage* dmResponse =
1796 dynamic_cast<CIMDisableModuleResponseMessage*>(response.get());
1797 PEGASUS_ASSERT(dmResponse != 0);
1798
1799 Array<Uint16> operationalStatus;
1800 operationalStatus.append(CIM_MSE_OPSTATUS_VALUE_STOPPED);
1801 dmResponse->operationalStatus = operationalStatus;
1802 }
1803 }
1804 else if (request->getType() == CIM_ENABLE_MODULE_REQUEST_MESSAGE)
1805 s.kodali 1.1 {
1806 // Fan out the request to all Provider Agent processes for this module
1807
1808 // Retrieve the provider module group name.
1809 String groupNameWithType;
1810 _getGroupNameWithType(providerModule, groupNameWithType);
1811
1812 // Look up the Provider Agents for this module
1813 Array<ProviderAgentContainer*> paArray =
1814 _lookupProviderAgents(groupNameWithType);
1815
1816 // Create an array of initialized provider agents.
1817 Array<ProviderAgentContainer*> paInit;
1818
1819 // create an array of initialized provider agents.
1820 for (Uint32 i=0; i<paArray.size(); i++)
1821 {
1822 if (paArray[i]->isInitialized())
1823 {
1824 paInit.append(paArray[i]);
1825 }
1826 s.kodali 1.1 }
1827
1828 if(paInit.size() > 0 )
1829 {
1830 RespAggCounter *respAggregator =
1831 new RespAggCounter(paInit.size());
1832
1833 for (Uint32 i=0; i<paInit.size(); i++)
1834 {
1835 //
1836 // Forward the request to the provider agent
1837 //
1838 response.reset(
1839 paInit[i]->processMessage(request,respAggregator));
1840 }
1841 }
1842
1843 // Use a default response if no Provider Agents were called
1844 if (!response.get())
1845 {
1846 response.reset(request->buildResponse());
1847 s.kodali 1.1
1848 CIMEnableModuleResponseMessage* emResponse =
1849 dynamic_cast<CIMEnableModuleResponseMessage*>(response.get());
1850 PEGASUS_ASSERT(emResponse != 0);
1851
1852 Array<Uint16> operationalStatus;
1853 operationalStatus.append(CIM_MSE_OPSTATUS_VALUE_OK);
1854 emResponse->operationalStatus = operationalStatus;
1855 }
1856 }
1857 else
1858 {
1859 //
1860 // Look up the Provider Agent for this module instance and requesting
1861 // user
1862 //
1863 ProviderAgentContainer* pa = _lookupProviderAgent(providerModule,
1864 request);
1865 PEGASUS_ASSERT(pa != 0);
1866
1867 //
1868 s.kodali 1.1 // Forward the request to the provider agent
1869 //
1870 response.reset(pa->processMessage(request));
1871 }
1872
1873 PEG_METHOD_EXIT();
1874 return response.release();
1875 }
1876
1877 ProviderAgentContainer* OOPProviderManagerRouter::_lookupProviderAgent(
1878 const CIMInstance& providerModule,
1879 CIMRequestMessage* request)
1880 {
1881 // Retrieve the provider module group name
1882 String groupNameWithType;
1883 _getGroupNameWithType(providerModule, groupNameWithType);
1884
|