1 kumpf 1.1 //%2006////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
7 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
9 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
11 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
13 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
15 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
18 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
20 //
21 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
22 kumpf 1.1 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
24 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
27 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 //%/////////////////////////////////////////////////////////////////////////////
33
34 #include <Pegasus/Common/Signal.h>
35 #include <Pegasus/Common/Config.h>
36 #include <Pegasus/Common/Constants.h>
37 #include <Pegasus/Common/AutoPtr.h>
38 #include <Pegasus/Common/ArrayInternal.h>
39 #include <Pegasus/Common/CIMMessage.h>
40 #include <Pegasus/Common/CIMMessageSerializer.h>
41 #include <Pegasus/Common/CIMMessageDeserializer.h>
42 #include <Pegasus/Common/OperationContextInternal.h>
43 kumpf 1.1 #include <Pegasus/Common/System.h>
44 #include <Pegasus/Common/AnonymousPipe.h>
45 #include <Pegasus/Common/Tracer.h>
46 #include <Pegasus/Common/Logger.h>
47 #include <Pegasus/Common/Thread.h>
48 #include <Pegasus/Common/MessageQueueService.h>
49 #include <Pegasus/Config/ConfigManager.h>
|
50 mike 1.6.2.6 #include <Pegasus/Common/Executor.h>
|
51 mike 1.6.2.1
|
52 kumpf 1.1 #if defined (PEGASUS_OS_TYPE_WINDOWS)
53 # include <windows.h> // For CreateProcess()
54 #elif defined (PEGASUS_OS_OS400)
55 # include <unistd.cleinc>
56 #elif defined (PEGASUS_OS_VMS)
57 # include <perror.h>
58 # include <climsgdef.h>
59 # include <stdio.h>
60 # include <stdlib.h>
61 # include <string.h>
62 # include <processes.h>
63 # include <unixio.h>
64 #else
65 # include <unistd.h> // For fork(), exec(), and _exit()
66 # include <errno.h>
67 # include <sys/types.h>
|
68 kumpf 1.5 # include <sys/resource.h>
|
69 kumpf 1.1 # if defined(PEGASUS_HAS_SIGNALS)
70 # include <sys/wait.h>
71 # endif
72 #endif
73
74 #include "OOPProviderManagerRouter.h"
75
76 PEGASUS_USING_STD;
77
78 PEGASUS_NAMESPACE_BEGIN
79
80 /////////////////////////////////////////////////////////////////////////////
81 // OutstandingRequestTable and OutstandingRequestEntry
82 /////////////////////////////////////////////////////////////////////////////
83
84 /**
85 An OutstandingRequestEntry represents a request message sent to a
86 Provider Agent for which no response has been received. The request
87 sender provides the message ID and a location for the response to be
88 returned, and then waits on the semaphore. When a response matching
89 the message ID is received, it is placed into the specified location
90 kumpf 1.1 and the semaphore is signaled.
91 */
92 class OutstandingRequestEntry
93 {
94 public:
95 OutstandingRequestEntry(
96 String originalMessageId_,
97 CIMRequestMessage* requestMessage_,
98 CIMResponseMessage*& responseMessage_,
99 Semaphore* responseReady_)
100 : originalMessageId(originalMessageId_),
101 requestMessage(requestMessage_),
102 responseMessage(responseMessage_),
103 responseReady(responseReady_)
104 {
105 }
106
107 /**
108 A unique value is substituted as the request messageId attribute to
109 allow responses to be definitively correllated with requests.
110 The original messageId value is stored here to avoid a race condition
111 kumpf 1.1 between the processing of a response chunk and the resetting of the
112 original messageId in the request message.
113 */
114 String originalMessageId;
115 CIMRequestMessage* requestMessage;
116 CIMResponseMessage*& responseMessage;
117 Semaphore* responseReady;
118 };
119
120 typedef HashTable<String, OutstandingRequestEntry*, EqualFunc<String>,
121 HashFunc<String> > OutstandingRequestTable;
122
123
124 /////////////////////////////////////////////////////////////////////////////
125 // ProviderAgentContainer
126 /////////////////////////////////////////////////////////////////////////////
127
128 class ProviderAgentContainer
129 {
130 public:
131 ProviderAgentContainer(
|
132 mike 1.6.2.8 const SessionKey& sessionKey,
|
133 kumpf 1.1 const String & moduleName,
134 const String & userName,
135 Uint16 userContext,
136 PEGASUS_INDICATION_CALLBACK_T indicationCallback,
137 PEGASUS_RESPONSE_CHUNK_CALLBACK_T responseChunkCallback,
138 PEGASUS_PROVIDERMODULEFAIL_CALLBACK_T providerModuleFailCallback,
139 Boolean subscriptionInitComplete);
140
141 ~ProviderAgentContainer();
142
143 Boolean isInitialized();
144
145 String getModuleName() const;
146
147 CIMResponseMessage* processMessage(CIMRequestMessage* request);
148 void unloadIdleProviders();
149
150 private:
151 //
152 // Private methods
153 //
154 kumpf 1.1
155 /** Unimplemented */
156 ProviderAgentContainer();
157 /** Unimplemented */
158 ProviderAgentContainer(const ProviderAgentContainer& pa);
159 /** Unimplemented */
160 ProviderAgentContainer& operator=(const ProviderAgentContainer& pa);
161
162 /**
163 Start a Provider Agent process and establish a pipe connection with it.
164 Note: The caller must lock the _agentMutex.
165 */
166 void _startAgentProcess();
167
168 /**
169 Send initialization data to the Provider Agent.
170 Note: The caller must lock the _agentMutex.
171 */
172 void _sendInitializationData();
173
174 /**
175 kumpf 1.1 Initialize the ProviderAgentContainer if it is not already
176 initialized. Initialization includes starting the Provider Agent
177 process, establishing a pipe connection with it, and starting a
178 thread to read response messages from the Provider Agent.
179
180 Note: The caller must lock the _agentMutex.
181 */
182 void _initialize();
183
184 /**
185 Uninitialize the ProviderAgentContainer if it is initialized.
186 The connection is closed and outstanding requests are completed
187 with an error result.
188
189 Note: The caller must lock the _agentMutex.
190
191 @param cleanShutdown Indicates whether the provider agent process
192 exited cleanly. A value of true indicates that responses have been
193 sent for all requests that have been processed. A value of false
194 indicates that one or more requests may have been partially processed.
195 */
196 kumpf 1.1 void _uninitialize(Boolean cleanShutdown);
197
198 /**
199 Performs the processMessage work, but does not retry on a transient
200 error.
201 */
202 CIMResponseMessage* _processMessage(CIMRequestMessage* request);
203
204 /**
205 Read and process response messages from the Provider Agent until
206 the connection is closed.
207 */
208 void _processResponses();
209 static ThreadReturnType PEGASUS_THREAD_CDECL
210 _responseProcessor(void* arg);
211
212 //
213 // Private data
214 //
215
216 /**
217 kumpf 1.1 The _agentMutex must be locked whenever writing to the Provider
218 Agent connection, accessing the _isInitialized flag, or changing
219 the Provider Agent state.
220 */
221 Mutex _agentMutex;
222
223 /**
|
224 mike 1.6.2.8 Session key of the user on whose behalf this provider agent was loaded.
225 */
226 SessionKey _sessionKey;
227
228 /**
|
229 kumpf 1.1 Name of the provider module served by this Provider Agent.
230 */
231 String _moduleName;
232
233 /**
234 The user context in which this Provider Agent operates.
235 */
236 String _userName;
237
238 /**
239 User Context setting of the provider module served by this Provider
240 Agent.
241 */
242 Uint16 _userContext;
243
244 /**
245 Callback function to which all generated indications are sent for
246 processing.
247 */
248 PEGASUS_INDICATION_CALLBACK_T _indicationCallback;
249
250 kumpf 1.1 /**
251 Callback function to which response chunks are sent for processing.
252 */
253 PEGASUS_RESPONSE_CHUNK_CALLBACK_T _responseChunkCallback;
254
255 /**
256 Callback function to be called upon detection of failure of a
257 provider module.
258 */
259 PEGASUS_PROVIDERMODULEFAIL_CALLBACK_T _providerModuleFailCallback;
260
261 /**
262 Indicates whether the Provider Agent is active.
263 */
264 Boolean _isInitialized;
265
266 /**
267 Pipe connection used to read responses from the Provider Agent.
268 */
269 AutoPtr<AnonymousPipe> _pipeFromAgent;
270 /**
271 kumpf 1.1 Pipe connection used to write requests to the Provider Agent.
272 */
273 AutoPtr<AnonymousPipe> _pipeToAgent;
274
275 #if defined(PEGASUS_HAS_SIGNALS)
276 /**
277 Process ID of the active Provider Agent.
278 */
279 pid_t _pid;
280 #endif
281
282 /**
283 The _outstandingRequestTable holds an entry for each request that has
284 been sent to this Provider Agent for which no response has been
285 received. Entries are added (by the writing thread) when a request
286 is sent, and are removed (by the reading thread) when the response is
287 received (or when it is determined that no response is forthcoming).
288 */
289 OutstandingRequestTable _outstandingRequestTable;
290 /**
291 The _outstandingRequestTableMutex must be locked whenever reading or
292 kumpf 1.1 updating the _outstandingRequestTable.
293 */
294 Mutex _outstandingRequestTableMutex;
295
296 /**
297 Holds the last provider module instance sent to the Provider Agent in
298 a ProviderIdContainer. Since the provider module instance rarely
299 changes, an optimization is used to send it only when it differs from
300 the last provider module instance sent.
301 */
302 CIMInstance _providerModuleCache;
303
304 /**
305 The number of Provider Agent processes that are currently initialized
306 (active).
307 */
308 static Uint32 _numProviderProcesses;
309
310 /**
311 The _numProviderProcessesMutex must be locked whenever reading or
312 updating the _numProviderProcesses count.
313 kumpf 1.1 */
314 static Mutex _numProviderProcessesMutex;
315
316 /**
317 The maximum number of Provider Agent processes that may be initialized
318 (active) at one time.
319 */
320 static Uint32 _maxProviderProcesses;
321
322 /**
323 A value indicating that a request message has not been processed.
324 A CIMResponseMessage pointer with this value indicates that the
325 corresponding CIMRequestMessage has not been processed. This is
326 used to indicate that a provider agent exited without starting to
327 process the request, and that the request should be retried.
328 */
329 static CIMResponseMessage* _REQUEST_NOT_PROCESSED;
330
331 /**
332 Indicates whether the Indication Service has completed initialization.
333
334 kumpf 1.1 For more information, please see the description of the
335 ProviderManagerRouter::_subscriptionInitComplete member variable.
336 */
337 Boolean _subscriptionInitComplete;
338 };
339
340 Uint32 ProviderAgentContainer::_numProviderProcesses = 0;
341 Mutex ProviderAgentContainer::_numProviderProcessesMutex;
342 Uint32 ProviderAgentContainer::_maxProviderProcesses = PEG_NOT_FOUND;
343
344 // Set this to a value that no valid CIMResponseMessage* will have.
345 CIMResponseMessage* ProviderAgentContainer::_REQUEST_NOT_PROCESSED =
346 reinterpret_cast<CIMResponseMessage*>(&_REQUEST_NOT_PROCESSED);
347
348 ProviderAgentContainer::ProviderAgentContainer(
|
349 mike 1.6.2.8 const SessionKey& sessionKey,
|
350 kumpf 1.1 const String & moduleName,
351 const String & userName,
352 Uint16 userContext,
353 PEGASUS_INDICATION_CALLBACK_T indicationCallback,
354 PEGASUS_RESPONSE_CHUNK_CALLBACK_T responseChunkCallback,
355 PEGASUS_PROVIDERMODULEFAIL_CALLBACK_T providerModuleFailCallback,
356 Boolean subscriptionInitComplete)
|
357 mike 1.6.2.8 :
358 _sessionKey(sessionKey),
359 _moduleName(moduleName),
|
360 kumpf 1.1 _userName(userName),
361 _userContext(userContext),
362 _indicationCallback(indicationCallback),
363 _responseChunkCallback(responseChunkCallback),
364 _providerModuleFailCallback(providerModuleFailCallback),
365 _isInitialized(false),
366 _subscriptionInitComplete(subscriptionInitComplete)
367 {
|
368 mike 1.6.2.9
|
369 kumpf 1.1 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
370 "ProviderAgentContainer::ProviderAgentContainer");
371 PEG_METHOD_EXIT();
372 }
373
374 ProviderAgentContainer::~ProviderAgentContainer()
375 {
376 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
377 "ProviderAgentContainer::~ProviderAgentContainer");
378
379 // Ensure the destructor does not throw an exception
380 try
381 {
382 if (isInitialized())
383 {
384 // Stop the responseProcessor thread by closing its connection
385 _pipeFromAgent->closeReadHandle();
386
387 // Wait for the responseProcessor thread to exit
388 while (isInitialized())
389 {
390 kumpf 1.1 Threads::yield();
391 }
392 }
393 }
394 catch (...)
395 {
396 }
397
398 PEG_METHOD_EXIT();
399 }
400
|
401 mike 1.6.2.4 void ProviderAgentContainer::_startAgentProcess()
|
402 mike 1.6.2.1 {
|
403 mike 1.6.2.4 PEG_METHOD_ENTER(
404 TRC_PROVIDERMANAGER, "ProviderAgentContainer::_startAgentProcess");
|
405 mike 1.6.2.1
|
406 mike 1.6.2.4 PEGASUS_UID_T newUid = (PEGASUS_UID_T)-1;
407 PEGASUS_GID_T newGid = (PEGASUS_UID_T)-1;
|
408 mike 1.6.2.1
409 # ifndef PEGASUS_DISABLE_PROV_USERCTXT
410
|
411 mike 1.6.2.4 newUid = getuid();
412 newGid = getgid();
413
|
414 mike 1.6.2.1 // Get and save the effective user name and the uid/gid for the user
415 // context of the agent process
416
417 String effectiveUserName = System::getEffectiveUserName();
418
419 if (_userName != effectiveUserName)
420 {
421 if (!System::lookupUserId(_userName.getCString(), newUid, newGid))
422 {
423 throw PEGASUS_CIM_EXCEPTION_L(
424 CIM_ERR_FAILED,
425 MessageLoaderParms(
426 "ProviderManager.OOPProviderManagerRouter."
427 "USER_CONTEXT_CHANGE_FAILED",
428 "Unable to change user context to \"$0\".", _userName));
429 }
430 }
431
432 # endif /* PEGASUS_DISABLE_PROV_USERCTXT */
433
|
434 mike 1.6.2.10 // Start the provider agent.
435
|
436 mike 1.6.2.1 int pid;
|
437 mike 1.6.2.10 SessionKey providerAgentSessionKey;
|
438 mike 1.6.2.4 AnonymousPipe* readPipe;
439 AnonymousPipe* writePipe;
|
440 mike 1.6.2.1
|
441 mike 1.6.2.6 int status = Executor::startProviderAgent(
|
442 mike 1.6.2.8 _sessionKey,
|
443 mike 1.6.2.1 (const char*)_moduleName.getCString(),
444 newUid,
445 newGid,
446 pid,
|
447 mike 1.6.2.10 providerAgentSessionKey,
|
448 mike 1.6.2.4 readPipe,
449 writePipe);
|
450 mike 1.6.2.1
451 if (status != 0)
452 {
453 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
|
454 mike 1.6.2.6 "Executor::createProviderAgent() failed");
|
455 mike 1.6.2.1 PEG_METHOD_EXIT();
456 throw Exception(MessageLoaderParms(
457 "ProviderManager.OOPProviderManagerRouter.CIMPROVAGT_START_FAILED",
458 "Failed to start cimprovagt \"$0\".",
459 _moduleName));
460 }
461
|
462 mike 1.6.2.10 // Set the session key to be used for requests emanating from this read
463 // pipe (i.e., the provider agent). Examples include requests made by the
464 // provider with the CIMOMHandle or indications delivered by the provider.
|
465 mike 1.6.2.9
466 readPipe->setSessionKey(providerAgentSessionKey);
467
|
468 mike 1.6.2.1 # if defined(PEGASUS_HAS_SIGNALS)
469 _pid = pid;
470 # endif
471
|
472 mike 1.6.2.4 _pipeFromAgent.reset(readPipe);
473 _pipeToAgent.reset(writePipe);
|
474 kumpf 1.1
|
475 mike 1.6.2.9
|
476 kumpf 1.1 PEG_METHOD_EXIT();
477 }
478
479 // Note: Caller must lock _agentMutex
480 void ProviderAgentContainer::_sendInitializationData()
481 {
482 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
483 "ProviderAgentContainer::_sendInitializationData");
484
485 //
486 // Gather config properties to pass to the Provider Agent
487 //
488 ConfigManager* configManager = ConfigManager::getInstance();
489 Array<Pair<String, String> > configProperties;
490
491 Array<String> configPropertyNames;
492 configManager->getAllPropertyNames(configPropertyNames, true);
493 for (Uint32 i = 0; i < configPropertyNames.size(); i++)
494 {
495 String configPropertyValue =
496 configManager->getCurrentValue(configPropertyNames[i]);
497 kumpf 1.1 String configPropertyDefaultValue =
498 configManager->getDefaultValue(configPropertyNames[i]);
499 if (configPropertyValue != configPropertyDefaultValue)
500 {
501 configProperties.append(Pair<String, String>(
502 configPropertyNames[i], configPropertyValue));
503 }
504 }
505
506 //
507 // Create a Provider Agent initialization message
508 //
509 AutoPtr<CIMInitializeProviderAgentRequestMessage> request(
510 new CIMInitializeProviderAgentRequestMessage(
511 String("0"), // messageId
512 configManager->getPegasusHome(),
513 configProperties,
514 System::bindVerbose,
515 _subscriptionInitComplete,
516 QueueIdStack()));
517
518 kumpf 1.1 //
519 // Write the initialization message to the pipe
520 //
521 AnonymousPipe::Status writeStatus =
522 _pipeToAgent->writeMessage(request.get());
523
524 if (writeStatus != AnonymousPipe::STATUS_SUCCESS)
525 {
526 PEG_METHOD_EXIT();
527 throw Exception(MessageLoaderParms(
528 "ProviderManager.OOPProviderManagerRouter."
529 "CIMPROVAGT_COMMUNICATION_FAILED",
530 "Failed to communicate with cimprovagt \"$0\".",
531 _moduleName));
532 }
533
534 // Wait for a null response from the Provider Agent indicating it has
535 // initialized successfully.
536
537 CIMMessage* message;
538 AnonymousPipe::Status readStatus;
539 kumpf 1.1 do
540 {
541 readStatus = _pipeFromAgent->readMessage(message);
|
542 mike 1.6.2.9
|
543 kumpf 1.1 } while (readStatus == AnonymousPipe::STATUS_INTERRUPT);
544
545 if (readStatus != AnonymousPipe::STATUS_SUCCESS)
546 {
547 PEG_METHOD_EXIT();
548 throw Exception(MessageLoaderParms(
549 "ProviderManager.OOPProviderManagerRouter."
550 "CIMPROVAGT_COMMUNICATION_FAILED",
551 "Failed to communicate with cimprovagt \"$0\".",
552 _moduleName));
553 }
554
555 PEGASUS_ASSERT(message == 0);
556
|
557 mike 1.6.2.9 // Request messages must bear the session key of the originating pipe.
558 {
559 CIMRequestMessage* m = dynamic_cast<CIMRequestMessage*>(message);
560
561 if (m)
562 m->sessionKey = _pipeFromAgent->getSessionKey();
563 }
564
|
565 kumpf 1.1 PEG_METHOD_EXIT();
566 }
567
568 // Note: Caller must lock _agentMutex
569 void ProviderAgentContainer::_initialize()
570 {
571 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
572 "ProviderAgentContainer::_initialize");
573
574 if (_isInitialized)
575 {
576 PEGASUS_ASSERT(0);
577 PEG_METHOD_EXIT();
578 return;
579 }
580
581 if (_maxProviderProcesses == PEG_NOT_FOUND)
582 {
583 String maxProviderProcesses = ConfigManager::getInstance()->
584 getCurrentValue("maxProviderProcesses");
585 CString maxProviderProcessesString = maxProviderProcesses.getCString();
586 kumpf 1.1 char* end = 0;
587 _maxProviderProcesses = strtol(maxProviderProcessesString, &end, 10);
588 }
589
590 {
591 AutoMutex lock(_numProviderProcessesMutex);
592 if ((_maxProviderProcesses != 0) &&
593 (_numProviderProcesses >= _maxProviderProcesses))
594 {
595 throw PEGASUS_CIM_EXCEPTION(
596 CIM_ERR_FAILED,
597 MessageLoaderParms(
598 "ProviderManager.OOPProviderManagerRouter."
599 "MAX_PROVIDER_PROCESSES_REACHED",
600 "The maximum number of cimprovagt processes has been "
601 "reached."));
602 }
603 else
604 {
605 _numProviderProcesses++;
606 }
607 kumpf 1.1 }
608
609 try
610 {
611 _startAgentProcess();
612 _isInitialized = true;
613 _sendInitializationData();
614
615 // Start a thread to read and process responses from the Provider Agent
616 ThreadStatus rtn = PEGASUS_THREAD_OK;
617 while ((rtn = MessageQueueService::get_thread_pool()->
618 allocate_and_awaken(this, _responseProcessor)) !=
619 PEGASUS_THREAD_OK)
620 {
621 if (rtn == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)
622 {
623 Threads::yield();
624 }
625 else
626 {
627 Logger::put(
628 kumpf 1.1 Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
629 "Not enough threads to process responses from the "
630 "provider agent.");
|
631 kumpf 1.6
|
632 kumpf 1.1 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
633 "Could not allocate thread to process responses from the "
634 "provider agent.");
635
636 throw Exception(MessageLoaderParms(
637 "ProviderManager.OOPProviderManagerRouter."
638 "CIMPROVAGT_THREAD_ALLOCATION_FAILED",
639 "Failed to allocate thread for cimprovagt \"$0\".",
640 _moduleName));
641 }
642 }
643 }
644 catch (...)
645 {
|
646 mike 1.6.2.11 SessionKey sessionKey = _pipeFromAgent->getSessionKey();
|
647 mike 1.6.2.10
|
648 kumpf 1.1 // Closing the connection causes the agent process to exit
649 _pipeToAgent.reset();
650 _pipeFromAgent.reset();
651
652 #if defined(PEGASUS_HAS_SIGNALS)
653 if (_isInitialized)
654 {
655 // Harvest the status of the agent process to prevent a zombie
|
656 mike 1.6.2.11 pid_t status = Executor::reapProviderAgent(sessionKey, _pid);
|
657 kumpf 1.4
658 if (status == -1)
659 {
660 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
661 "ProviderAgentContainer::_initialize(): "
|
662 mike 1.6.2.6 "Executor::waitPid() failed");
|
663 kumpf 1.4 }
|
664 kumpf 1.1 }
665 #endif
666
667 _isInitialized = false;
668
669 {
670 AutoMutex lock(_numProviderProcessesMutex);
671 _numProviderProcesses--;
672 }
673
674 PEG_METHOD_EXIT();
675 throw;
676 }
677
678 PEG_METHOD_EXIT();
679 }
680
681 Boolean ProviderAgentContainer::isInitialized()
682 {
683 AutoMutex lock(_agentMutex);
684 return _isInitialized;
685 kumpf 1.1 }
686
687 // Note: Caller must lock _agentMutex
688 void ProviderAgentContainer::_uninitialize(Boolean cleanShutdown)
689 {
690 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
691 "ProviderAgentContainer::_uninitialize");
692
693 if (!_isInitialized)
694 {
695 PEGASUS_ASSERT(0);
696 PEG_METHOD_EXIT();
697 return;
698 }
699
700 try
701 {
|
702 mike 1.6.2.11 SessionKey sessionKey = _pipeFromAgent->getSessionKey();
703
|
704 kumpf 1.1 // Close the connection with the Provider Agent
705 _pipeFromAgent.reset();
706 _pipeToAgent.reset();
707
708 _providerModuleCache = CIMInstance();
709
710 {
711 AutoMutex lock(_numProviderProcessesMutex);
712 _numProviderProcesses--;
713 }
714
715 #if defined(PEGASUS_HAS_SIGNALS)
716 // Harvest the status of the agent process to prevent a zombie
|
717 mike 1.6.2.11 pid_t status = Executor::reapProviderAgent(sessionKey, _pid);
|
718 kumpf 1.4
719 if (status == -1)
720 {
721 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
722 "ProviderAgentContainer::_uninitialize(): "
|
723 mike 1.6.2.6 "Executor::waitPid() failed.");
|
724 kumpf 1.4 }
|
725 kumpf 1.1 #endif
726
727 _isInitialized = false;
728
729 //
730 // Complete with null responses all outstanding requests on this
731 // connection
732 //
733 {
734 AutoMutex tableLock(_outstandingRequestTableMutex);
735
736 CIMResponseMessage* response =
737 cleanShutdown ? _REQUEST_NOT_PROCESSED : 0;
738
739 for (OutstandingRequestTable::Iterator i =
740 _outstandingRequestTable.start();
741 i != 0; i++)
742 {
743 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
744 String("Completing messageId \"") + i.key() +
745 "\" with a null response.");
746 kumpf 1.1 i.value()->responseMessage = response;
747 i.value()->responseReady->signal();
748 }
749
750 _outstandingRequestTable.clear();
|
751 kumpf 1.3 }
|
752 kumpf 1.1
|
753 kumpf 1.3 //
754 // If not a clean shutdown, call the provider module failure callback
755 //
756 if (!cleanShutdown)
757 {
|
758 kumpf 1.1 //
|
759 kumpf 1.3 // Call the provider module failure callback to communicate
760 // the failure to the Provider Manager Service. The Provider
761 // Manager Service will inform the Indication Service.
|
762 kumpf 1.1 //
|
763 kumpf 1.3 _providerModuleFailCallback(_moduleName, _userName, _userContext);
|
764 kumpf 1.1 }
765 }
766 catch (...)
767 {
768 // We're uninitializing, so do not propagate the exception
769 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
770 "Ignoring _uninitialize() exception.");
771 }
772
773 PEG_METHOD_EXIT();
774 }
775
776 String ProviderAgentContainer::getModuleName() const
777 {
778 return _moduleName;
779 }
780
781 CIMResponseMessage* ProviderAgentContainer::processMessage(
782 CIMRequestMessage* request)
783 {
784 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
785 kumpf 1.1 "ProviderAgentContainer::processMessage");
786
787 CIMResponseMessage* response;
788
789 do
790 {
791 response = _processMessage(request);
792
793 if (response == _REQUEST_NOT_PROCESSED)
794 {
795 // Check for request message types that should not be retried.
796 if ((request->getType() ==
797 CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE) ||
798 (request->getType() ==
799 CIM_NOTIFY_CONFIG_CHANGE_REQUEST_MESSAGE) ||
800 (request->getType() ==
801 CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE) ||
802 (request->getType() ==
803 CIM_DELETE_SUBSCRIPTION_REQUEST_MESSAGE))
804 {
805 response = request->buildResponse();
806 kumpf 1.1 break;
807 }
808 else if (request->getType() == CIM_DISABLE_MODULE_REQUEST_MESSAGE)
809 {
810 CIMDisableModuleResponseMessage* dmResponse =
811 dynamic_cast<CIMDisableModuleResponseMessage*>(response);
812 PEGASUS_ASSERT(dmResponse != 0);
813
814 Array<Uint16> operationalStatus;
815 operationalStatus.append(CIM_MSE_OPSTATUS_VALUE_STOPPED);
816 dmResponse->operationalStatus = operationalStatus;
817 break;
818 }
819 }
820 } while (response == _REQUEST_NOT_PROCESSED);
821
|
822 kumpf 1.2 if (request->getType() == CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE)
823 {
824 _subscriptionInitComplete = true;
825 }
826
|
827 kumpf 1.1 PEG_METHOD_EXIT();
828 return response;
829 }
830
831 CIMResponseMessage* ProviderAgentContainer::_processMessage(
832 CIMRequestMessage* request)
833 {
834 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
835 "ProviderAgentContainer::_processMessage");
836
837 CIMResponseMessage* response;
838 String originalMessageId = request->messageId;
839
840 // These three variables are used for the provider module optimization.
841 // See the _providerModuleCache member description for more information.
842 AutoPtr<ProviderIdContainer> origProviderId;
843 Boolean doProviderModuleOptimization = false;
844 Boolean updateProviderModuleCache = false;
845
846 try
847 {
848 kumpf 1.1 // The messageId attribute is used to correlate response messages
849 // from the Provider Agent with request messages, so it is imperative
850 // that the ID is unique for each request. The incoming ID cannot be
851 // trusted to be unique, so we substitute a unique one. The memory
852 // address of the request is used as the source of a unique piece of
853 // data. (The message ID is only required to be unique while the
854 // request is outstanding.)
855 char messagePtrString[20];
856 sprintf(messagePtrString, "%p", request);
857 String uniqueMessageId = messagePtrString;
858
859 //
860 // Set up the OutstandingRequestEntry for this request
861 //
862 Semaphore waitSemaphore(0);
863 OutstandingRequestEntry outstandingRequestEntry(
864 originalMessageId, request, response, &waitSemaphore);
865
866 //
867 // Lock the Provider Agent Container while initializing the
868 // agent and writing the request to the connection
869 kumpf 1.1 //
870 {
871 AutoMutex lock(_agentMutex);
872
873 //
874 // Initialize the Provider Agent, if necessary
875 //
876 if (!_isInitialized)
877 {
878 _initialize();
879 }
880
881 //
882 // Add an entry to the OutstandingRequestTable for this request
883 //
884 {
885 AutoMutex tableLock(_outstandingRequestTableMutex);
886
887 _outstandingRequestTable.insert(
888 uniqueMessageId, &outstandingRequestEntry);
889 }
890 kumpf 1.1
891 // Get the provider module from the ProviderIdContainer to see if
892 // we can optimize out the transmission of this instance to the
893 // Provider Agent. (See the _providerModuleCache description.)
|
894 kumpf 1.6 if (request->operationContext.contains(ProviderIdContainer::NAME))
|
895 kumpf 1.1 {
896 ProviderIdContainer pidc = request->operationContext.get(
897 ProviderIdContainer::NAME);
898 origProviderId.reset(new ProviderIdContainer(
899 pidc.getModule(), pidc.getProvider(),
900 pidc.isRemoteNameSpace(), pidc.getRemoteInfo()));
901 if (_providerModuleCache.isUninitialized() ||
902 (!pidc.getModule().identical(_providerModuleCache)))
903 {
904 // We haven't sent this provider module instance to the
905 // Provider Agent yet. Update our cache after we send it.
906 updateProviderModuleCache = true;
907 }
908 else
909 {
910 // Replace the provider module in the ProviderIdContainer
911 // with an uninitialized instance. We'll need to put the
912 // original one back after the message is sent.
913 request->operationContext.set(ProviderIdContainer(
914 CIMInstance(), pidc.getProvider(),
915 pidc.isRemoteNameSpace(), pidc.getRemoteInfo()));
916 kumpf 1.1 doProviderModuleOptimization = true;
917 }
918 }
919
920 //
921 // Write the message to the pipe
922 //
923 try
924 {
925 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL3,
926 String("Sending request to agent with messageId ") +
927 uniqueMessageId);
928
929 request->messageId = uniqueMessageId;
930 AnonymousPipe::Status writeStatus =
931 _pipeToAgent->writeMessage(request);
932 request->messageId = originalMessageId;
933
934 if (doProviderModuleOptimization)
935 {
936 request->operationContext.set(*origProviderId.get());
937 kumpf 1.1 }
938
939 if (writeStatus != AnonymousPipe::STATUS_SUCCESS)
940 {
941 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
942 "Failed to write message to pipe. writeStatus = %d.",
943 writeStatus);
944
945 request->messageId = originalMessageId;
946
947 if (doProviderModuleOptimization)
948 {
949 request->operationContext.set(*origProviderId.get());
950 }
951
952 // Remove this OutstandingRequestTable entry
953 {
954 AutoMutex tableLock(_outstandingRequestTableMutex);
955 Boolean removed =
956 _outstandingRequestTable.remove(uniqueMessageId);
957 PEGASUS_ASSERT(removed);
958 kumpf 1.1 }
959
960 // A response value of _REQUEST_NOT_PROCESSED indicates
961 // that the request was not processed by the provider
962 // agent, so it can be retried safely.
963 PEG_METHOD_EXIT();
964 return _REQUEST_NOT_PROCESSED;
965 }
966
967 if (updateProviderModuleCache)
968 {
969 _providerModuleCache = origProviderId->getModule();
970 }
971 }
972 catch (...)
973 {
974 request->messageId = originalMessageId;
975
976 if (doProviderModuleOptimization)
977 {
978 request->operationContext.set(*origProviderId.get());
979 kumpf 1.1 }
980
981 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
982 "Failed to write message to pipe.");
983 // Remove the OutstandingRequestTable entry for this request
984 {
985 AutoMutex tableLock(_outstandingRequestTableMutex);
986 Boolean removed =
987 _outstandingRequestTable.remove(uniqueMessageId);
988 PEGASUS_ASSERT(removed);
989 }
990 PEG_METHOD_EXIT();
991 throw;
992 }
993 }
994
995 //
996 // Wait for the response
997 //
998 try
999 {
1000 kumpf 1.1 // Must not hold _agentMutex while waiting for the response
1001 waitSemaphore.wait();
1002 }
1003 catch (...)
1004 {
1005 // Remove the OutstandingRequestTable entry for this request
1006 {
1007 AutoMutex tableLock(_outstandingRequestTableMutex);
1008 Boolean removed =
1009 _outstandingRequestTable.remove(uniqueMessageId);
1010 PEGASUS_ASSERT(removed);
1011 }
1012 PEG_METHOD_EXIT();
1013 throw;
1014 }
1015
1016 // A response value of _REQUEST_NOT_PROCESSED indicates that the
1017 // provider agent process was terminating when the request was sent.
|
1018 kumpf 1.6 // The request was not processed by the provider agent, so it can be
|
1019 kumpf 1.1 // retried safely.
1020 if (response == _REQUEST_NOT_PROCESSED)
1021 {
1022 PEG_METHOD_EXIT();
1023 return response;
1024 }
1025
1026 // A null response is returned when an agent connection is closed
1027 // while requests remain outstanding.
1028 if (response == 0)
1029 {
1030 response = request->buildResponse();
1031 response->cimException = PEGASUS_CIM_EXCEPTION(
1032 CIM_ERR_FAILED,
1033 MessageLoaderParms(
1034 "ProviderManager.OOPProviderManagerRouter."
1035 "CIMPROVAGT_CONNECTION_LOST",
1036 "Lost connection with cimprovagt \"$0\".",
1037 _moduleName));
1038 }
1039 }
1040 kumpf 1.1 catch (CIMException& e)
1041 {
1042 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
1043 String("Caught exception: ") + e.getMessage());
1044 response = request->buildResponse();
1045 response->cimException = e;
1046 }
1047 catch (Exception& e)
1048 {
1049 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
1050 String("Caught exception: ") + e.getMessage());
1051 response = request->buildResponse();
1052 response->cimException = PEGASUS_CIM_EXCEPTION(
1053 CIM_ERR_FAILED, e.getMessage());
1054 }
1055 catch (...)
1056 {
1057 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
1058 "Caught unknown exception");
1059 response = request->buildResponse();
1060 response->cimException = PEGASUS_CIM_EXCEPTION(
1061 kumpf 1.1 CIM_ERR_FAILED, String::EMPTY);
1062 }
1063
1064 response->messageId = originalMessageId;
1065
1066 PEG_METHOD_EXIT();
1067 return response;
1068 }
1069
1070 void ProviderAgentContainer::unloadIdleProviders()
1071 {
1072 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1073 "ProviderAgentContainer::unloadIdleProviders");
1074
1075 AutoMutex lock(_agentMutex);
1076 if (_isInitialized)
1077 {
1078 // Send a "wake up" message to the Provider Agent.
1079 // Don't bother checking whether the operation is successful.
1080 Uint32 messageLength = 0;
1081 _pipeToAgent->writeBuffer((const char*)&messageLength, sizeof(Uint32));
1082 kumpf 1.1 }
1083
1084 PEG_METHOD_EXIT();
1085 }
1086
1087 void ProviderAgentContainer::_processResponses()
1088 {
1089 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1090 "ProviderAgentContainer::_processResponses");
1091
1092 //
1093 // Process responses until the pipe is closed
1094 //
1095 while (1)
1096 {
1097 try
1098 {
1099 CIMMessage* message;
1100
1101 //
1102 // Read a response from the Provider Agent
1103 kumpf 1.1 //
1104 AnonymousPipe::Status readStatus =
1105 _pipeFromAgent->readMessage(message);
1106
1107 // Ignore interrupts
1108 if (readStatus == AnonymousPipe::STATUS_INTERRUPT)
1109 {
1110 continue;
1111 }
1112
1113 // Handle an error the same way as a closed connection
1114 if ((readStatus == AnonymousPipe::STATUS_ERROR) ||
1115 (readStatus == AnonymousPipe::STATUS_CLOSED))
1116 {
1117 AutoMutex lock(_agentMutex);
1118 _uninitialize(false);
1119 return;
1120 }
1121
1122 // A null message indicates that the provider agent process has
1123 // finished its processing and is ready to exit.
1124 kumpf 1.1 if (message == 0)
1125 {
1126 AutoMutex lock(_agentMutex);
1127 _uninitialize(true);
1128 return;
1129 }
1130
|
1131 mike 1.6.2.9 // Request messages must bear the session key of the
1132 // originating pipe.
1133 {
1134 CIMRequestMessage* m =
1135 dynamic_cast<CIMRequestMessage*>(message);
1136
1137 if (m)
1138 m->sessionKey = _pipeFromAgent->getSessionKey();
1139 }
1140
1141 // It is a CIM_PROCESS_INDICATION_REQUEST_MESSAGE?
1142
|
1143 kumpf 1.1 if (message->getType() == CIM_PROCESS_INDICATION_REQUEST_MESSAGE)
1144 {
1145 // Forward indications to the indication callback
1146 _indicationCallback(
1147 reinterpret_cast<CIMProcessIndicationRequestMessage*>(
1148 message));
1149 }
1150 else if (!message->isComplete())
1151 {
1152 CIMResponseMessage* response;
1153 response = dynamic_cast<CIMResponseMessage*>(message);
1154 PEGASUS_ASSERT(response != 0);
1155
1156 // Get the OutstandingRequestEntry for this response chunk
1157 OutstandingRequestEntry* _outstandingRequestEntry = 0;
1158 {
1159 AutoMutex tableLock(_outstandingRequestTableMutex);
1160 Boolean foundEntry = _outstandingRequestTable.lookup(
1161 response->messageId, _outstandingRequestEntry);
1162 PEGASUS_ASSERT(foundEntry);
1163 }
1164 kumpf 1.1
1165 // Put the original message ID into the response
1166 response->messageId =
1167 _outstandingRequestEntry->originalMessageId;
1168
1169 // Call the response chunk callback to process the chunk
1170 _responseChunkCallback(
1171 _outstandingRequestEntry->requestMessage, response);
1172 }
1173 else
1174 {
1175 CIMResponseMessage* response;
1176 response = dynamic_cast<CIMResponseMessage*>(message);
1177 PEGASUS_ASSERT(response != 0);
1178
1179 // Give the response to the waiting OutstandingRequestEntry
1180 OutstandingRequestEntry* _outstandingRequestEntry = 0;
1181 {
1182 AutoMutex tableLock(_outstandingRequestTableMutex);
1183 Boolean foundEntry = _outstandingRequestTable.lookup(
1184 response->messageId, _outstandingRequestEntry);
1185 kumpf 1.1 PEGASUS_ASSERT(foundEntry);
1186
1187 // Remove the completed request from the table
1188 Boolean removed =
1189 _outstandingRequestTable.remove(response->messageId);
1190 PEGASUS_ASSERT(removed);
1191 }
1192
1193 _outstandingRequestEntry->responseMessage = response;
1194 _outstandingRequestEntry->responseReady->signal();
1195 }
1196 }
1197 catch (Exception& e)
1198 {
1199 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
1200 String("Ignoring exception: ") + e.getMessage());
1201 }
1202 catch (...)
1203 {
1204 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
1205 "Ignoring exception");
1206 kumpf 1.1 }
1207 }
1208
1209 }
1210
1211 ThreadReturnType PEGASUS_THREAD_CDECL
1212 ProviderAgentContainer::_responseProcessor(void* arg)
1213 {
1214 ProviderAgentContainer* pa =
1215 reinterpret_cast<ProviderAgentContainer*>(arg);
1216
1217 pa->_processResponses();
1218
|
1219 kumpf 1.6 return ThreadReturnType(0);
|
1220 kumpf 1.1 }
1221
1222 /////////////////////////////////////////////////////////////////////////////
1223 // OOPProviderManagerRouter
1224 /////////////////////////////////////////////////////////////////////////////
1225
1226 OOPProviderManagerRouter::OOPProviderManagerRouter(
1227 PEGASUS_INDICATION_CALLBACK_T indicationCallback,
1228 PEGASUS_RESPONSE_CHUNK_CALLBACK_T responseChunkCallback,
1229 PEGASUS_PROVIDERMODULEFAIL_CALLBACK_T providerModuleFailCallback)
1230 {
1231 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1232 "OOPProviderManagerRouter::OOPProviderManagerRouter");
1233
1234 _indicationCallback = indicationCallback;
1235 _responseChunkCallback = responseChunkCallback;
1236 _providerModuleFailCallback = providerModuleFailCallback;
1237 _subscriptionInitComplete = false;
1238
1239 PEG_METHOD_EXIT();
1240 }
1241 kumpf 1.1
1242 OOPProviderManagerRouter::~OOPProviderManagerRouter()
1243 {
1244 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1245 "OOPProviderManagerRouter::~OOPProviderManagerRouter");
1246
1247 try
1248 {
1249 // Clean up the ProviderAgentContainers
1250 AutoMutex lock(_providerAgentTableMutex);
1251 ProviderAgentTable::Iterator i = _providerAgentTable.start();
|
1252 kumpf 1.6 for (; i != 0; i++)
|
1253 kumpf 1.1 {
1254 delete i.value();
1255 }
1256 }
1257 catch (...) {}
1258
1259 PEG_METHOD_EXIT();
1260 }
1261
1262 Message* OOPProviderManagerRouter::processMessage(Message* message)
1263 {
1264 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1265 "OOPProviderManagerRouter::processMessage");
1266
1267 CIMRequestMessage* request = dynamic_cast<CIMRequestMessage *>(message);
1268 PEGASUS_ASSERT(request != 0);
1269
1270 AutoPtr<CIMResponseMessage> response;
1271
1272 //
1273 // Get the provider information from the request
1274 kumpf 1.1 //
1275 CIMInstance providerModule;
1276
1277 if ((dynamic_cast<CIMOperationRequestMessage*>(request) != 0) ||
1278 (dynamic_cast<CIMIndicationRequestMessage*>(request) != 0) ||
1279 (request->getType() == CIM_EXPORT_INDICATION_REQUEST_MESSAGE))
1280 {
1281 // Provider information is in the OperationContext
1282 ProviderIdContainer pidc = (ProviderIdContainer)
1283 request->operationContext.get(ProviderIdContainer::NAME);
1284 providerModule = pidc.getModule();
1285 }
1286 else if (request->getType() == CIM_ENABLE_MODULE_REQUEST_MESSAGE)
1287 {
1288 CIMEnableModuleRequestMessage* emReq =
1289 dynamic_cast<CIMEnableModuleRequestMessage*>(request);
1290 providerModule = emReq->providerModule;
1291 }
1292 else if (request->getType() == CIM_DISABLE_MODULE_REQUEST_MESSAGE)
1293 {
1294 CIMDisableModuleRequestMessage* dmReq =
1295 kumpf 1.1 dynamic_cast<CIMDisableModuleRequestMessage*>(request);
1296 providerModule = dmReq->providerModule;
1297 }
1298 else if ((request->getType() == CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE) ||
1299 (request->getType() ==
1300 CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE) ||
1301 (request->getType() == CIM_NOTIFY_CONFIG_CHANGE_REQUEST_MESSAGE))
1302 {
1303 // This operation is not provider-specific
1304 }
1305 else
1306 {
1307 // Unrecognized message type. This should never happen.
1308 PEGASUS_ASSERT(0);
1309 response.reset(request->buildResponse());
1310 response->cimException = PEGASUS_CIM_EXCEPTION(
1311 CIM_ERR_FAILED, "Unrecognized message type.");
1312 PEG_METHOD_EXIT();
1313 return response.release();
1314 }
1315
1316 kumpf 1.1 //
1317 // Process the request message
1318 //
1319 if (request->getType() == CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE)
1320 {
1321 // Forward the CIMStopAllProvidersRequest to all providers
1322 response.reset(_forwardRequestToAllAgents(request));
1323
1324 // Note: Do not uninitialize the ProviderAgentContainers here.
1325 // Just let the selecting thread notice when the agent connections
1326 // are closed.
1327 }
|
1328 kumpf 1.6 else if (request->getType () ==
|
1329 kumpf 1.1 CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE)
1330 {
1331 _subscriptionInitComplete = true;
1332
1333 //
|
1334 kumpf 1.6 // Forward the CIMSubscriptionInitCompleteRequestMessage to
|
1335 kumpf 1.1 // all providers
1336 //
1337 response.reset (_forwardRequestToAllAgents (request));
1338 }
1339 else if (request->getType() == CIM_NOTIFY_CONFIG_CHANGE_REQUEST_MESSAGE)
1340 {
1341 CIMNotifyConfigChangeRequestMessage* notifyRequest =
1342 dynamic_cast<CIMNotifyConfigChangeRequestMessage*>(request);
1343 PEGASUS_ASSERT(notifyRequest != 0);
1344
1345 if (notifyRequest->currentValueModified)
1346 {
1347 // Forward the CIMNotifyConfigChangeRequestMessage to all providers
1348 response.reset(_forwardRequestToAllAgents(request));
1349 }
1350 else
1351 {
1352 // No need to notify provider agents about changes to planned value
1353 response.reset(request->buildResponse());
1354 }
1355 }
1356 kumpf 1.1 else if (request->getType() == CIM_DISABLE_MODULE_REQUEST_MESSAGE)
1357 {
1358 // Fan out the request to all Provider Agent processes for this module
1359
1360 // Retrieve the provider module name
1361 String moduleName;
1362 CIMValue nameValue = providerModule.getProperty(
1363 providerModule.findProperty("Name")).getValue();
1364 nameValue.get(moduleName);
1365
1366 // Look up the Provider Agents for this module
1367 Array<ProviderAgentContainer*> paArray =
1368 _lookupProviderAgents(moduleName);
1369
1370 for (Uint32 i=0; i<paArray.size(); i++)
1371 {
1372 //
1373 // Do not start up an agent process just to disable the module
1374 //
1375 if (paArray[i]->isInitialized())
1376 {
1377 kumpf 1.1 //
1378 // Forward the request to the provider agent
1379 //
1380 response.reset(paArray[i]->processMessage(request));
1381
1382 // Note: Do not uninitialize the ProviderAgentContainer here
1383 // when a disable module operation is successful. Just let the
1384 // selecting thread notice when the agent connection is closed.
1385
1386 // Determine the success of the disable module operation
1387 CIMDisableModuleResponseMessage* dmResponse =
1388 dynamic_cast<CIMDisableModuleResponseMessage*>(
1389 response.get());
1390 PEGASUS_ASSERT(dmResponse != 0);
1391
1392 Boolean isStopped = false;
1393 for (Uint32 i=0; i < dmResponse->operationalStatus.size(); i++)
1394 {
1395 if (dmResponse->operationalStatus[i] ==
1396 CIM_MSE_OPSTATUS_VALUE_STOPPED)
1397 {
1398 kumpf 1.1 isStopped = true;
1399 break;
1400 }
1401 }
1402
1403 // If the operation is unsuccessful, stop and return the error
1404 if ((dmResponse->cimException.getCode() != CIM_ERR_SUCCESS) ||
1405 !isStopped)
1406 {
1407 break;
1408 }
1409 }
1410 }
1411
1412 // Use a default response if no Provider Agents were called
1413 if (!response.get())
1414 {
1415 response.reset(request->buildResponse());
1416
1417 CIMDisableModuleResponseMessage* dmResponse =
1418 dynamic_cast<CIMDisableModuleResponseMessage*>(response.get());
1419 kumpf 1.1 PEGASUS_ASSERT(dmResponse != 0);
1420
1421 Array<Uint16> operationalStatus;
1422 operationalStatus.append(CIM_MSE_OPSTATUS_VALUE_STOPPED);
1423 dmResponse->operationalStatus = operationalStatus;
1424 }
1425 }
1426 else if (request->getType() == CIM_ENABLE_MODULE_REQUEST_MESSAGE)
1427 {
1428 // Fan out the request to all Provider Agent processes for this module
1429
1430 // Retrieve the provider module name
1431 String moduleName;
1432 CIMValue nameValue = providerModule.getProperty(
1433 providerModule.findProperty("Name")).getValue();
1434 nameValue.get(moduleName);
1435
1436 // Look up the Provider Agents for this module
1437 Array<ProviderAgentContainer*> paArray =
1438 _lookupProviderAgents(moduleName);
1439
1440 kumpf 1.1 for (Uint32 i=0; i<paArray.size(); i++)
1441 {
1442 //
1443 // Do not start up an agent process just to enable the module
1444 //
1445 if (paArray[i]->isInitialized())
1446 {
1447 //
1448 // Forward the request to the provider agent
1449 //
1450 response.reset(paArray[i]->processMessage(request));
1451
1452 // Determine the success of the enable module operation
1453 CIMEnableModuleResponseMessage* emResponse =
1454 dynamic_cast<CIMEnableModuleResponseMessage*>(
1455 response.get());
1456 PEGASUS_ASSERT(emResponse != 0);
1457
1458 Boolean isOk = false;
1459 for (Uint32 i=0; i < emResponse->operationalStatus.size(); i++)
1460 {
1461 kumpf 1.1 if (emResponse->operationalStatus[i] ==
1462 CIM_MSE_OPSTATUS_VALUE_OK)
1463 {
1464 isOk = true;
1465 break;
1466 }
1467 }
1468
1469 // If the operation is unsuccessful, stop and return the error
1470 if ((emResponse->cimException.getCode() != CIM_ERR_SUCCESS) ||
1471 !isOk)
1472 {
1473 break;
1474 }
1475 }
1476 }
1477
1478 // Use a default response if no Provider Agents were called
1479 if (!response.get())
1480 {
1481 response.reset(request->buildResponse());
1482 kumpf 1.1
1483 CIMEnableModuleResponseMessage* emResponse =
1484 dynamic_cast<CIMEnableModuleResponseMessage*>(response.get());
1485 PEGASUS_ASSERT(emResponse != 0);
1486
1487 Array<Uint16> operationalStatus;
1488 operationalStatus.append(CIM_MSE_OPSTATUS_VALUE_OK);
1489 emResponse->operationalStatus = operationalStatus;
1490 }
1491 }
1492 else
1493 {
1494 //
1495 // Look up the Provider Agent for this module instance and requesting
1496 // user
1497 //
1498 ProviderAgentContainer* pa = _lookupProviderAgent(providerModule,
1499 request);
1500 PEGASUS_ASSERT(pa != 0);
1501
1502 //
1503 kumpf 1.1 // Forward the request to the provider agent
1504 //
1505 response.reset(pa->processMessage(request));
1506 }
1507
1508 response->syncAttributes(request);
1509
1510 PEG_METHOD_EXIT();
1511 return response.release();
1512 }
1513
1514 ProviderAgentContainer* OOPProviderManagerRouter::_lookupProviderAgent(
1515 const CIMInstance& providerModule,
1516 CIMRequestMessage* request)
1517 {
1518 // Retrieve the provider module name
1519 String moduleName;
1520 CIMValue nameValue = providerModule.getProperty(
1521 providerModule.findProperty("Name")).getValue();
1522 nameValue.get(moduleName);
1523
1524 kumpf 1.1 // Retrieve the provider user context configuration
1525 Uint16 userContext = 0;
1526 Uint32 pos = providerModule.findProperty(
1527 PEGASUS_PROPERTYNAME_MODULE_USERCONTEXT);
1528 if (pos != PEG_NOT_FOUND)
1529 {
1530 CIMValue userContextValue =
1531 providerModule.getProperty(pos).getValue();
1532 if (!userContextValue.isNull())
1533 {
1534 userContextValue.get(userContext);
1535 }
1536 }
1537
1538 if (userContext == 0)
1539 {
1540 userContext = PEGASUS_DEFAULT_PROV_USERCTXT;
1541 }
1542
1543 String userName;
1544
1545 kumpf 1.1 if (userContext == PG_PROVMODULE_USERCTXT_REQUESTOR)
1546 {
|
1547 kumpf 1.6 if (request->operationContext.contains(IdentityContainer::NAME))
|
1548 kumpf 1.1 {
1549 // User Name is in the OperationContext
1550 IdentityContainer ic = (IdentityContainer)
1551 request->operationContext.get(IdentityContainer::NAME);
1552 userName = ic.getUserName();
1553 }
1554 //else
1555 //{
1556 // If no IdentityContainer is present, default to the CIM
1557 // Server's user context
1558 //}
1559
1560 // If authentication is disabled, use the CIM Server's user context
1561 if (!userName.size())
1562 {
1563 userName = System::getEffectiveUserName();
1564 }
1565 }
1566 else if (userContext == PG_PROVMODULE_USERCTXT_DESIGNATED)
1567 {
1568 // Retrieve the provider module designated user property value
1569 kumpf 1.1 providerModule.getProperty(providerModule.findProperty(
1570 PEGASUS_PROPERTYNAME_MODULE_DESIGNATEDUSER)).getValue().
1571 get(userName);
1572 }
1573 else if (userContext == PG_PROVMODULE_USERCTXT_CIMSERVER)
1574 {
1575 userName = System::getEffectiveUserName();
1576 }
1577 else // Privileged User
1578 {
1579 PEGASUS_ASSERT(userContext == PG_PROVMODULE_USERCTXT_PRIVILEGED);
1580 userName = System::getPrivilegedUserName();
1581 }
1582
1583 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4,
1584 "Module name = " + moduleName);
1585 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL4,
1586 "User context = %hd.", userContext);
1587 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4,
1588 "User name = " + userName);
1589
1590 kumpf 1.1 ProviderAgentContainer* pa = 0;
1591 String key = moduleName + ":" + userName;
1592
1593 AutoMutex lock(_providerAgentTableMutex);
1594 if (!_providerAgentTable.lookup(key, pa))
1595 {
1596 pa = new ProviderAgentContainer(
|
1597 mike 1.6.2.8 request->sessionKey, moduleName, userName, userContext,
|
1598 kumpf 1.1 _indicationCallback, _responseChunkCallback,
1599 _providerModuleFailCallback,
1600 _subscriptionInitComplete);
1601 _providerAgentTable.insert(key, pa);
1602 }
1603 return pa;
1604 }
1605
1606 Array<ProviderAgentContainer*> OOPProviderManagerRouter::_lookupProviderAgents(
1607 const String& moduleName)
1608 {
1609 Array<ProviderAgentContainer*> paArray;
1610
1611 AutoMutex lock(_providerAgentTableMutex);
1612 for (ProviderAgentTable::Iterator i = _providerAgentTable.start(); i; i++)
1613 {
1614 if (i.value()->getModuleName() == moduleName)
1615 {
1616 paArray.append(i.value());
1617 }
1618 }
1619 kumpf 1.1 return paArray;
1620 }
1621
1622 CIMResponseMessage* OOPProviderManagerRouter::_forwardRequestToAllAgents(
1623 CIMRequestMessage* request)
1624 {
1625 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1626 "OOPProviderManagerRouter::_forwardRequestToAllAgents");
1627
1628 // Get a list of the ProviderAgentContainers. We need our own array copy
1629 // because we cannot hold the _providerAgentTableMutex while calling
1630 // _ProviderAgentContainer::processMessage().
1631 Array<ProviderAgentContainer*> paContainerArray;
1632 {
1633 AutoMutex tableLock(_providerAgentTableMutex);
1634 for (ProviderAgentTable::Iterator i = _providerAgentTable.start();
1635 i != 0; i++)
1636 {
1637 paContainerArray.append(i.value());
1638 }
1639 }
1640 kumpf 1.1
1641 CIMException responseException;
1642
1643 // Forward the request to each of the initialized provider agents
1644 for (Uint32 j = 0; j < paContainerArray.size(); j++)
1645 {
1646 ProviderAgentContainer* pa = paContainerArray[j];
1647 if (pa->isInitialized())
1648 {
1649 // Note: The ProviderAgentContainer could become uninitialized
1650 // before _ProviderAgentContainer::processMessage() processes
1651 // this request. In this case, the Provider Agent process will
1652 // (unfortunately) be started to process this message.
1653 AutoPtr<CIMResponseMessage> response;
1654 response.reset(pa->processMessage(request));
1655 if (response.get() != 0)
1656 {
1657 // If the operation failed, save the exception data
1658 if ((response->cimException.getCode() != CIM_ERR_SUCCESS) &&
1659 (responseException.getCode() == CIM_ERR_SUCCESS))
1660 {
1661 kumpf 1.1 responseException = response->cimException;
1662 }
1663 }
1664 }
1665 }
1666
1667 CIMResponseMessage* response = request->buildResponse();
1668 response->cimException = responseException;
1669
1670 PEG_METHOD_EXIT();
1671 return response;
1672 }
1673
1674 Boolean OOPProviderManagerRouter::hasActiveProviders()
1675 {
1676 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1677 "OOPProviderManagerRouter::hasActiveProviders");
1678
1679 // Iterate through the _providerAgentTable looking for initialized agents
1680 AutoMutex lock(_providerAgentTableMutex);
1681 ProviderAgentTable::Iterator i = _providerAgentTable.start();
|
1682 kumpf 1.6 for (; i != 0; i++)
|
1683 kumpf 1.1 {
1684 if (i.value()->isInitialized())
1685 {
1686 PEG_METHOD_EXIT();
1687 return true;
1688 }
1689 }
1690
1691 // No initialized Provider Agents were found
1692 PEG_METHOD_EXIT();
1693 return false;
1694 }
1695
1696 void OOPProviderManagerRouter::unloadIdleProviders()
1697 {
1698 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1699 "OOPProviderManagerRouter::unloadIdleProviders");
1700
1701 // Iterate through the _providerAgentTable unloading idle providers
1702 AutoMutex lock(_providerAgentTableMutex);
1703 ProviderAgentTable::Iterator i = _providerAgentTable.start();
|
1704 kumpf 1.6 for (; i != 0; i++)
|
1705 kumpf 1.1 {
1706 i.value()->unloadIdleProviders();
1707 }
1708
1709 PEG_METHOD_EXIT();
1710 }
1711
1712 PEGASUS_NAMESPACE_END
|