1 kumpf 1.1 //%2006////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
7 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
9 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
11 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
13 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
15 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
18 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
20 //
21 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
22 kumpf 1.1 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
24 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
27 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 //%/////////////////////////////////////////////////////////////////////////////
33
34 #include <Pegasus/Common/Signal.h>
35 #include <Pegasus/Common/Config.h>
36 #include <Pegasus/Common/Constants.h>
37 #include <Pegasus/Common/AutoPtr.h>
38 #include <Pegasus/Common/ArrayInternal.h>
39 #include <Pegasus/Common/CIMMessage.h>
40 #include <Pegasus/Common/CIMMessageSerializer.h>
41 #include <Pegasus/Common/CIMMessageDeserializer.h>
42 #include <Pegasus/Common/OperationContextInternal.h>
43 kumpf 1.1 #include <Pegasus/Common/System.h>
44 #include <Pegasus/Common/AnonymousPipe.h>
45 #include <Pegasus/Common/Tracer.h>
46 #include <Pegasus/Common/Logger.h>
47 #include <Pegasus/Common/Thread.h>
48 #include <Pegasus/Common/MessageQueueService.h>
49 #include <Pegasus/Config/ConfigManager.h>
50
51 #if defined (PEGASUS_OS_TYPE_WINDOWS)
52 # include <windows.h> // For CreateProcess()
53 #elif defined (PEGASUS_OS_OS400)
54 # include <unistd.cleinc>
55 #elif defined (PEGASUS_OS_VMS)
56 # include <perror.h>
57 # include <climsgdef.h>
58 # include <stdio.h>
59 # include <stdlib.h>
60 # include <string.h>
61 # include <processes.h>
62 # include <unixio.h>
63 #else
64 kumpf 1.1 # include <unistd.h> // For fork(), exec(), and _exit()
65 # include <errno.h>
66 # include <sys/types.h>
|
67 kumpf 1.5 # include <sys/resource.h>
|
68 kumpf 1.1 # if defined(PEGASUS_HAS_SIGNALS)
69 # include <sys/wait.h>
70 # endif
71 #endif
72
73 #include "OOPProviderManagerRouter.h"
74
75 PEGASUS_USING_STD;
76
77 PEGASUS_NAMESPACE_BEGIN
78
79 /////////////////////////////////////////////////////////////////////////////
80 // OutstandingRequestTable and OutstandingRequestEntry
81 /////////////////////////////////////////////////////////////////////////////
82
83 /**
84 An OutstandingRequestEntry represents a request message sent to a
85 Provider Agent for which no response has been received. The request
86 sender provides the message ID and a location for the response to be
87 returned, and then waits on the semaphore. When a response matching
88 the message ID is received, it is placed into the specified location
89 kumpf 1.1 and the semaphore is signaled.
90 */
91 class OutstandingRequestEntry
92 {
93 public:
94 OutstandingRequestEntry(
95 String originalMessageId_,
96 CIMRequestMessage* requestMessage_,
97 CIMResponseMessage*& responseMessage_,
98 Semaphore* responseReady_)
99 : originalMessageId(originalMessageId_),
100 requestMessage(requestMessage_),
101 responseMessage(responseMessage_),
102 responseReady(responseReady_)
103 {
104 }
105
106 /**
107 A unique value is substituted as the request messageId attribute to
108 allow responses to be definitively correllated with requests.
109 The original messageId value is stored here to avoid a race condition
110 kumpf 1.1 between the processing of a response chunk and the resetting of the
111 original messageId in the request message.
112 */
113 String originalMessageId;
114 CIMRequestMessage* requestMessage;
115 CIMResponseMessage*& responseMessage;
116 Semaphore* responseReady;
117 };
118
119 typedef HashTable<String, OutstandingRequestEntry*, EqualFunc<String>,
120 HashFunc<String> > OutstandingRequestTable;
121
122
123 /////////////////////////////////////////////////////////////////////////////
124 // ProviderAgentContainer
125 /////////////////////////////////////////////////////////////////////////////
126
127 class ProviderAgentContainer
128 {
129 public:
130 ProviderAgentContainer(
131 kumpf 1.1 const String & moduleName,
132 const String & userName,
133 Uint16 userContext,
134 PEGASUS_INDICATION_CALLBACK_T indicationCallback,
135 PEGASUS_RESPONSE_CHUNK_CALLBACK_T responseChunkCallback,
136 PEGASUS_PROVIDERMODULEFAIL_CALLBACK_T providerModuleFailCallback,
137 Boolean subscriptionInitComplete);
138
139 ~ProviderAgentContainer();
140
141 Boolean isInitialized();
142
143 String getModuleName() const;
144
145 CIMResponseMessage* processMessage(CIMRequestMessage* request);
146 void unloadIdleProviders();
147
148 private:
149 //
150 // Private methods
151 //
152 kumpf 1.1
153 /** Unimplemented */
154 ProviderAgentContainer();
155 /** Unimplemented */
156 ProviderAgentContainer(const ProviderAgentContainer& pa);
157 /** Unimplemented */
158 ProviderAgentContainer& operator=(const ProviderAgentContainer& pa);
159
160 /**
161 Start a Provider Agent process and establish a pipe connection with it.
162 Note: The caller must lock the _agentMutex.
163 */
164 void _startAgentProcess();
165
166 /**
167 Send initialization data to the Provider Agent.
168 Note: The caller must lock the _agentMutex.
169 */
170 void _sendInitializationData();
171
172 /**
173 kumpf 1.1 Initialize the ProviderAgentContainer if it is not already
174 initialized. Initialization includes starting the Provider Agent
175 process, establishing a pipe connection with it, and starting a
176 thread to read response messages from the Provider Agent.
177
178 Note: The caller must lock the _agentMutex.
179 */
180 void _initialize();
181
182 /**
183 Uninitialize the ProviderAgentContainer if it is initialized.
184 The connection is closed and outstanding requests are completed
185 with an error result.
186
187 @param cleanShutdown Indicates whether the provider agent process
188 exited cleanly. A value of true indicates that responses have been
189 sent for all requests that have been processed. A value of false
190 indicates that one or more requests may have been partially processed.
191 */
192 void _uninitialize(Boolean cleanShutdown);
193
194 kumpf 1.1 /**
195 Performs the processMessage work, but does not retry on a transient
196 error.
197 */
198 CIMResponseMessage* _processMessage(CIMRequestMessage* request);
199
200 /**
201 Read and process response messages from the Provider Agent until
202 the connection is closed.
203 */
204 void _processResponses();
205 static ThreadReturnType PEGASUS_THREAD_CDECL
206 _responseProcessor(void* arg);
207
208 //
209 // Private data
210 //
211
212 /**
213 The _agentMutex must be locked whenever writing to the Provider
214 Agent connection, accessing the _isInitialized flag, or changing
215 kumpf 1.1 the Provider Agent state.
216 */
217 Mutex _agentMutex;
218
219 /**
220 Name of the provider module served by this Provider Agent.
221 */
222 String _moduleName;
223
224 /**
225 The user context in which this Provider Agent operates.
226 */
227 String _userName;
228
229 /**
230 User Context setting of the provider module served by this Provider
231 Agent.
232 */
233 Uint16 _userContext;
234
235 /**
236 kumpf 1.1 Callback function to which all generated indications are sent for
237 processing.
238 */
239 PEGASUS_INDICATION_CALLBACK_T _indicationCallback;
240
241 /**
242 Callback function to which response chunks are sent for processing.
243 */
244 PEGASUS_RESPONSE_CHUNK_CALLBACK_T _responseChunkCallback;
245
246 /**
247 Callback function to be called upon detection of failure of a
248 provider module.
249 */
250 PEGASUS_PROVIDERMODULEFAIL_CALLBACK_T _providerModuleFailCallback;
251
252 /**
253 Indicates whether the Provider Agent is active.
254 */
255 Boolean _isInitialized;
256
257 kumpf 1.1 /**
258 Pipe connection used to read responses from the Provider Agent.
259 */
260 AutoPtr<AnonymousPipe> _pipeFromAgent;
261 /**
262 Pipe connection used to write requests to the Provider Agent.
263 */
264 AutoPtr<AnonymousPipe> _pipeToAgent;
265
266 #if defined(PEGASUS_HAS_SIGNALS)
267 /**
268 Process ID of the active Provider Agent.
269 */
270 pid_t _pid;
271 #endif
272
273 /**
274 The _outstandingRequestTable holds an entry for each request that has
275 been sent to this Provider Agent for which no response has been
276 received. Entries are added (by the writing thread) when a request
277 is sent, and are removed (by the reading thread) when the response is
278 kumpf 1.1 received (or when it is determined that no response is forthcoming).
279 */
280 OutstandingRequestTable _outstandingRequestTable;
281 /**
282 The _outstandingRequestTableMutex must be locked whenever reading or
283 updating the _outstandingRequestTable.
284 */
285 Mutex _outstandingRequestTableMutex;
286
287 /**
288 Holds the last provider module instance sent to the Provider Agent in
289 a ProviderIdContainer. Since the provider module instance rarely
290 changes, an optimization is used to send it only when it differs from
291 the last provider module instance sent.
292 */
293 CIMInstance _providerModuleCache;
294
295 /**
296 The number of Provider Agent processes that are currently initialized
297 (active).
298 */
299 kumpf 1.1 static Uint32 _numProviderProcesses;
300
301 /**
302 The _numProviderProcessesMutex must be locked whenever reading or
303 updating the _numProviderProcesses count.
304 */
305 static Mutex _numProviderProcessesMutex;
306
307 /**
308 The maximum number of Provider Agent processes that may be initialized
309 (active) at one time.
310 */
311 static Uint32 _maxProviderProcesses;
312
313 /**
314 A value indicating that a request message has not been processed.
315 A CIMResponseMessage pointer with this value indicates that the
316 corresponding CIMRequestMessage has not been processed. This is
317 used to indicate that a provider agent exited without starting to
318 process the request, and that the request should be retried.
319 */
320 kumpf 1.1 static CIMResponseMessage* _REQUEST_NOT_PROCESSED;
321
322 /**
323 Indicates whether the Indication Service has completed initialization.
324
325 For more information, please see the description of the
326 ProviderManagerRouter::_subscriptionInitComplete member variable.
327 */
328 Boolean _subscriptionInitComplete;
329 };
330
331 Uint32 ProviderAgentContainer::_numProviderProcesses = 0;
332 Mutex ProviderAgentContainer::_numProviderProcessesMutex;
333 Uint32 ProviderAgentContainer::_maxProviderProcesses = PEG_NOT_FOUND;
334
335 // Set this to a value that no valid CIMResponseMessage* will have.
336 CIMResponseMessage* ProviderAgentContainer::_REQUEST_NOT_PROCESSED =
337 reinterpret_cast<CIMResponseMessage*>(&_REQUEST_NOT_PROCESSED);
338
339 ProviderAgentContainer::ProviderAgentContainer(
340 const String & moduleName,
341 kumpf 1.1 const String & userName,
342 Uint16 userContext,
343 PEGASUS_INDICATION_CALLBACK_T indicationCallback,
344 PEGASUS_RESPONSE_CHUNK_CALLBACK_T responseChunkCallback,
345 PEGASUS_PROVIDERMODULEFAIL_CALLBACK_T providerModuleFailCallback,
346 Boolean subscriptionInitComplete)
347 : _moduleName(moduleName),
348 _userName(userName),
349 _userContext(userContext),
350 _indicationCallback(indicationCallback),
351 _responseChunkCallback(responseChunkCallback),
352 _providerModuleFailCallback(providerModuleFailCallback),
353 _isInitialized(false),
354 _subscriptionInitComplete(subscriptionInitComplete)
355 {
356 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
357 "ProviderAgentContainer::ProviderAgentContainer");
358 PEG_METHOD_EXIT();
359 }
360
361 ProviderAgentContainer::~ProviderAgentContainer()
362 kumpf 1.1 {
363 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
364 "ProviderAgentContainer::~ProviderAgentContainer");
365
366 // Ensure the destructor does not throw an exception
367 try
368 {
369 if (isInitialized())
370 {
371 // Stop the responseProcessor thread by closing its connection
372 _pipeFromAgent->closeReadHandle();
373
374 // Wait for the responseProcessor thread to exit
375 while (isInitialized())
376 {
377 Threads::yield();
378 }
379 }
380 }
381 catch (...)
382 {
383 kumpf 1.1 }
384
385 PEG_METHOD_EXIT();
386 }
387
388 void ProviderAgentContainer::_startAgentProcess()
389 {
390 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
391 "ProviderAgentContainer::_startAgentProcess");
392
393 //
394 // Serialize the starting of agent processes. If two agent processes are
395 // started at the same time, they may get copies of each other's pipe
396 // descriptors. If this happens, the cimserver will not get a pipe read
397 // error when one of the agent processes exits, because the pipe will
398 // still be writable by the other process. This locking control needs to
399 // cover the period from where the pipes are created to where the agent
400 // ends of the pipes are closed by the cimserver.
401 //
402 static Mutex agentStartupMutex;
403 AutoMutex lock(agentStartupMutex);
404 kumpf 1.1
405 AutoPtr<AnonymousPipe> pipeFromAgent(new AnonymousPipe());
406 AutoPtr<AnonymousPipe> pipeToAgent(new AnonymousPipe());
407
408 //
409 // Start a cimprovagt process for this provider module
410 //
411
412 #if defined (PEGASUS_OS_TYPE_WINDOWS)
413 //
414 // Set up members of the PROCESS_INFORMATION structure
415 //
416 PROCESS_INFORMATION piProcInfo;
417 ZeroMemory (&piProcInfo, sizeof (PROCESS_INFORMATION));
418
419 //
420 // Set up members of the STARTUPINFO structure
421 //
422 STARTUPINFO siStartInfo;
423 ZeroMemory (&siStartInfo, sizeof (STARTUPINFO));
424 siStartInfo.cb = sizeof (STARTUPINFO);
425 kumpf 1.1
426 //
427 // Generate the command line
428 //
429 char cmdLine[2048];
430 char readHandle[32];
431 char writeHandle[32];
432 pipeToAgent->exportReadHandle(readHandle);
433 pipeFromAgent->exportWriteHandle(writeHandle);
434
435 sprintf(cmdLine, "\"%s\" %s %s \"%s\"",
436 (const char*)ConfigManager::getHomedPath(
437 PEGASUS_PROVIDER_AGENT_PROC_NAME).getCString(),
438 readHandle, writeHandle, (const char*)_moduleName.getCString());
439
440 //
441 // Create the child process
442 //
443 if (!CreateProcess (
444 NULL, //
445 cmdLine, // command line
446 kumpf 1.1 NULL, // process security attributes
447 NULL, // primary thread security attributes
448 TRUE, // handles are inherited
449 0, // creation flags
450 NULL, // use parent's environment
451 NULL, // use parent's current directory
452 &siStartInfo, // STARTUPINFO
453 &piProcInfo)) // PROCESS_INFORMATION
454 {
455 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
456 "CreateProcess() failed. errno = %d.", GetLastError());
457 PEG_METHOD_EXIT();
458 throw Exception(MessageLoaderParms(
459 "ProviderManager.OOPProviderManagerRouter.CIMPROVAGT_START_FAILED",
460 "Failed to start cimprovagt \"$0\".",
461 _moduleName));
462 }
463
464 CloseHandle(piProcInfo.hProcess);
465 CloseHandle(piProcInfo.hThread);
466
467 kumpf 1.1 #elif defined (PEGASUS_OS_VMS)
468
469 //
470 // fork and exec the child process
471 //
472 int status;
473
474 status = vfork ();
475 switch (status)
476 {
477 case 0:
478 try
479 {
480 //
481 // Execute the cimprovagt program
482 //
483 String agentCommandPath =
484 ConfigManager::getHomedPath(PEGASUS_PROVIDER_AGENT_PROC_NAME);
485 CString agentCommandPathCString = agentCommandPath.getCString();
486
487 char readHandle[32];
488 kumpf 1.1 char writeHandle[32];
489 pipeToAgent->exportReadHandle(readHandle);
490 pipeFromAgent->exportWriteHandle(writeHandle);
491
492 if ((status = execl(agentCommandPathCString, agentCommandPathCString,
493 readHandle, writeHandle,
494 (const char*)_moduleName.getCString(), (char*)0)) == -1);
495 {
496 // If we're still here, there was an error
497 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
498 "execl() failed. errno = %d.", errno);
499 _exit(1);
500 }
501 }
502 catch (...)
503 {
504 // There's not much we can do here in no man's land
505 try
506 {
507 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
508 "Caught exception before calling execl().");
509 kumpf 1.1 }
|
510 kumpf 1.6 catch (...)
|
511 kumpf 1.1 {
512 }
513 _exit(1);
514 }
515 PEG_METHOD_EXIT();
516 return;
517 break;
518
519 case -1:
520 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
521 "fork() failed. errno = %d.", errno);
522 PEG_METHOD_EXIT();
523 throw Exception(MessageLoaderParms(
524 "ProviderManager.OOPProviderManagerRouter.CIMPROVAGT_START_FAILED",
525 "Failed to start cimprovagt \"$0\".",
526 _moduleName));
527 break;
528
529 default:
530 // Close our copies of the agent's ends of the pipes
531 pipeToAgent->closeReadHandle();
532 kumpf 1.1 pipeFromAgent->closeWriteHandle();
533
534 _pipeToAgent.reset(pipeToAgent.release());
535 _pipeFromAgent.reset(pipeFromAgent.release());
536
537 PEG_METHOD_EXIT();
538 }
539 #elif defined (PEGASUS_OS_OS400)
540
541 //Out of process provider support for OS400 goes here when needed.
542
543 #else
544
545 # ifndef PEGASUS_DISABLE_PROV_USERCTXT
546 // Get and save the effective user name and the uid/gid for the user
547 // context of the agent process
|
548 kumpf 1.6
|
549 kumpf 1.1 String effectiveUserName = System::getEffectiveUserName();
550 PEGASUS_UID_T newUid = (PEGASUS_UID_T) -1;
551 PEGASUS_GID_T newGid = (PEGASUS_GID_T) -1;
552 if (_userName != effectiveUserName)
553 {
554 if (!System::lookupUserId(_userName.getCString(), newUid, newGid))
555 {
556 throw PEGASUS_CIM_EXCEPTION_L(
557 CIM_ERR_FAILED,
558 MessageLoaderParms(
559 "ProviderManager.OOPProviderManagerRouter."
560 "USER_CONTEXT_CHANGE_FAILED",
561 "Unable to change user context to \"$0\".", _userName));
562 }
563 }
564 # endif
565
566 pid_t pid = fork();
567 if (pid < 0)
568 {
569 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
570 kumpf 1.1 "fork() failed. errno = %d.", errno);
571 PEG_METHOD_EXIT();
572 throw Exception(MessageLoaderParms(
573 "ProviderManager.OOPProviderManagerRouter.CIMPROVAGT_START_FAILED",
574 "Failed to start cimprovagt \"$0\".",
575 _moduleName));
576 }
577 else if (pid == 0)
578 {
579 //
580 // Child side of the fork
581 //
582
583 try
584 {
585 // Close our copies of the parent's ends of the pipes
586 pipeToAgent->closeWriteHandle();
587 pipeFromAgent->closeReadHandle();
588
589 //
590 // Execute the cimprovagt program
591 kumpf 1.1 //
592 String agentCommandPath =
593 ConfigManager::getHomedPath(PEGASUS_PROVIDER_AGENT_PROC_NAME);
594 CString agentCommandPathCString = agentCommandPath.getCString();
595
596 char readHandle[32];
597 char writeHandle[32];
598 pipeToAgent->exportReadHandle(readHandle);
599 pipeFromAgent->exportWriteHandle(writeHandle);
600
601 # ifndef PEGASUS_DISABLE_PROV_USERCTXT
602 // Set the user context of the Provider Agent process
603 if (_userName != effectiveUserName)
604 {
605 if (!System::changeUserContext(newUid, newGid))
606 {
607 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
608 "System::changeUserContext() failed. userName = %s.",
609 (const char*)_userName.getCString());
610 Logger::put_l(Logger::ERROR_LOG, System::CIMSERVER,
611 Logger::WARNING,
612 kumpf 1.1 "ProviderManager.OOPProviderManagerRouter."
613 "USER_CONTEXT_CHANGE_FAILED",
614 "Unable to change user context to \"$0\".", _userName);
615 _exit(1);
616 }
617 }
618 # endif
619
|
620 kumpf 1.5 // Close all file descriptors except stdin/stdout/stderr
621 // and the pipe handles needed by the Provider Agent process.
622
623 Uint32 readFd = atoi(readHandle);
624 Uint32 writeFd = atoi(writeHandle);
625 struct rlimit fileLimit;
626
627 if (getrlimit(RLIMIT_NOFILE, &fileLimit) == 0)
628 {
629 Uint32 maxFd = (Uint32)fileLimit.rlim_cur;
630 for (Uint32 i = 3; i < maxFd - 1; i++)
631 {
632 if ((i != readFd) && (i != writeFd))
633 {
634 close(i);
635 }
636 }
637 }
638
|
639 kumpf 1.1 execl(agentCommandPathCString, agentCommandPathCString,
640 readHandle, writeHandle,
641 (const char*)_moduleName.getCString(), (char*)0);
642
643 // If we're still here, there was an error
644 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
645 "execl() failed. errno = %d.", errno);
646 _exit(1);
647 }
648 catch (...)
649 {
650 // There's not much we can do here in no man's land
651 try
652 {
653 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
654 "Caught exception before calling execl().");
655 }
656 catch (...) {}
657 _exit(1);
658 }
659 }
660 kumpf 1.1 # if defined(PEGASUS_HAS_SIGNALS)
661 _pid = pid;
662 # endif
663 #endif
664
665 //
666 // CIM Server process
667 //
668
669 // Close our copies of the agent's ends of the pipes
670 pipeToAgent->closeReadHandle();
671 pipeFromAgent->closeWriteHandle();
672
673 _pipeToAgent.reset(pipeToAgent.release());
674 _pipeFromAgent.reset(pipeFromAgent.release());
675
676 PEG_METHOD_EXIT();
677 }
678
679 // Note: Caller must lock _agentMutex
680 void ProviderAgentContainer::_sendInitializationData()
681 kumpf 1.1 {
682 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
683 "ProviderAgentContainer::_sendInitializationData");
684
685 //
686 // Gather config properties to pass to the Provider Agent
687 //
688 ConfigManager* configManager = ConfigManager::getInstance();
689 Array<Pair<String, String> > configProperties;
690
691 Array<String> configPropertyNames;
692 configManager->getAllPropertyNames(configPropertyNames, true);
693 for (Uint32 i = 0; i < configPropertyNames.size(); i++)
694 {
695 String configPropertyValue =
696 configManager->getCurrentValue(configPropertyNames[i]);
697 String configPropertyDefaultValue =
698 configManager->getDefaultValue(configPropertyNames[i]);
699 if (configPropertyValue != configPropertyDefaultValue)
700 {
701 configProperties.append(Pair<String, String>(
702 kumpf 1.1 configPropertyNames[i], configPropertyValue));
703 }
704 }
705
706 //
707 // Create a Provider Agent initialization message
708 //
709 AutoPtr<CIMInitializeProviderAgentRequestMessage> request(
710 new CIMInitializeProviderAgentRequestMessage(
711 String("0"), // messageId
712 configManager->getPegasusHome(),
713 configProperties,
714 System::bindVerbose,
715 _subscriptionInitComplete,
716 QueueIdStack()));
717
718 //
719 // Write the initialization message to the pipe
720 //
721 AnonymousPipe::Status writeStatus =
722 _pipeToAgent->writeMessage(request.get());
723 kumpf 1.1
724 if (writeStatus != AnonymousPipe::STATUS_SUCCESS)
725 {
726 PEG_METHOD_EXIT();
727 throw Exception(MessageLoaderParms(
728 "ProviderManager.OOPProviderManagerRouter."
729 "CIMPROVAGT_COMMUNICATION_FAILED",
730 "Failed to communicate with cimprovagt \"$0\".",
731 _moduleName));
732 }
733
734 // Wait for a null response from the Provider Agent indicating it has
735 // initialized successfully.
736
737 CIMMessage* message;
738 AnonymousPipe::Status readStatus;
739 do
740 {
741 readStatus = _pipeFromAgent->readMessage(message);
742 } while (readStatus == AnonymousPipe::STATUS_INTERRUPT);
743
744 kumpf 1.1 if (readStatus != AnonymousPipe::STATUS_SUCCESS)
745 {
746 PEG_METHOD_EXIT();
747 throw Exception(MessageLoaderParms(
748 "ProviderManager.OOPProviderManagerRouter."
749 "CIMPROVAGT_COMMUNICATION_FAILED",
750 "Failed to communicate with cimprovagt \"$0\".",
751 _moduleName));
752 }
753
754 PEGASUS_ASSERT(message == 0);
755
756 PEG_METHOD_EXIT();
757 }
758
759 // Note: Caller must lock _agentMutex
760 void ProviderAgentContainer::_initialize()
761 {
762 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
763 "ProviderAgentContainer::_initialize");
764
765 kumpf 1.1 if (_isInitialized)
766 {
767 PEGASUS_ASSERT(0);
768 PEG_METHOD_EXIT();
769 return;
770 }
771
772 if (_maxProviderProcesses == PEG_NOT_FOUND)
773 {
774 String maxProviderProcesses = ConfigManager::getInstance()->
775 getCurrentValue("maxProviderProcesses");
776 CString maxProviderProcessesString = maxProviderProcesses.getCString();
777 char* end = 0;
778 _maxProviderProcesses = strtol(maxProviderProcessesString, &end, 10);
779 }
780
781 {
782 AutoMutex lock(_numProviderProcessesMutex);
783 if ((_maxProviderProcesses != 0) &&
784 (_numProviderProcesses >= _maxProviderProcesses))
785 {
786 kumpf 1.1 throw PEGASUS_CIM_EXCEPTION(
787 CIM_ERR_FAILED,
788 MessageLoaderParms(
789 "ProviderManager.OOPProviderManagerRouter."
790 "MAX_PROVIDER_PROCESSES_REACHED",
791 "The maximum number of cimprovagt processes has been "
792 "reached."));
793 }
794 else
795 {
796 _numProviderProcesses++;
797 }
798 }
799
800 try
801 {
802 _startAgentProcess();
803
804 _isInitialized = true;
805
806 _sendInitializationData();
807 kumpf 1.1
808 // Start a thread to read and process responses from the Provider Agent
809 ThreadStatus rtn = PEGASUS_THREAD_OK;
810 while ((rtn = MessageQueueService::get_thread_pool()->
811 allocate_and_awaken(this, _responseProcessor)) !=
812 PEGASUS_THREAD_OK)
813 {
814 if (rtn == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)
815 {
816 Threads::yield();
817 }
818 else
819 {
820 Logger::put(
821 Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
822 "Not enough threads to process responses from the "
823 "provider agent.");
|
824 kumpf 1.6
|
825 kumpf 1.1 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
826 "Could not allocate thread to process responses from the "
827 "provider agent.");
828
829 throw Exception(MessageLoaderParms(
830 "ProviderManager.OOPProviderManagerRouter."
831 "CIMPROVAGT_THREAD_ALLOCATION_FAILED",
832 "Failed to allocate thread for cimprovagt \"$0\".",
833 _moduleName));
834 }
835 }
836 }
837 catch (...)
838 {
839 // Closing the connection causes the agent process to exit
840 _pipeToAgent.reset();
841 _pipeFromAgent.reset();
842
843 #if defined(PEGASUS_HAS_SIGNALS)
844 if (_isInitialized)
845 {
846 kumpf 1.1 // Harvest the status of the agent process to prevent a zombie
|
847 kumpf 1.4 pid_t status = 0;
|
848 kumpf 1.1 do
849 {
|
850 kumpf 1.4 status = waitpid(_pid, 0, 0);
851 } while ((status == -1) && (errno == EINTR));
852
853 if (status == -1)
854 {
855 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
856 "ProviderAgentContainer::_initialize(): "
857 "waitpid failed; errno = %d.", errno);
858 }
|
859 kumpf 1.1 }
860 #endif
861
862 _isInitialized = false;
863
864 {
865 AutoMutex lock(_numProviderProcessesMutex);
866 _numProviderProcesses--;
867 }
868
869 PEG_METHOD_EXIT();
870 throw;
871 }
872
873 PEG_METHOD_EXIT();
874 }
875
876 Boolean ProviderAgentContainer::isInitialized()
877 {
878 AutoMutex lock(_agentMutex);
879 return _isInitialized;
880 kumpf 1.1 }
881
882 void ProviderAgentContainer::_uninitialize(Boolean cleanShutdown)
883 {
884 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
885 "ProviderAgentContainer::_uninitialize");
886
|
887 kumpf 1.7 #if defined(PEGASUS_HAS_SIGNALS)
888 pid_t pid;
889 #endif
|
890 kumpf 1.1
891 try
892 {
|
893 kumpf 1.7 AutoMutex lock(_agentMutex);
894
895 PEGASUS_ASSERT(_isInitialized);
896
|
897 kumpf 1.1 // Close the connection with the Provider Agent
898 _pipeFromAgent.reset();
899 _pipeToAgent.reset();
900
901 _providerModuleCache = CIMInstance();
902
903 {
904 AutoMutex lock(_numProviderProcessesMutex);
905 _numProviderProcesses--;
906 }
907
|
908 kumpf 1.7 _isInitialized = false;
909
|
910 kumpf 1.1 #if defined(PEGASUS_HAS_SIGNALS)
|
911 kumpf 1.7 // Save the _pid so we can use it after we've released the _agentMutex
912 pid = _pid;
|
913 kumpf 1.1 #endif
914
915 //
916 // Complete with null responses all outstanding requests on this
917 // connection
918 //
919 {
920 AutoMutex tableLock(_outstandingRequestTableMutex);
921
922 CIMResponseMessage* response =
923 cleanShutdown ? _REQUEST_NOT_PROCESSED : 0;
924
925 for (OutstandingRequestTable::Iterator i =
926 _outstandingRequestTable.start();
927 i != 0; i++)
928 {
929 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
930 String("Completing messageId \"") + i.key() +
931 "\" with a null response.");
932 i.value()->responseMessage = response;
933 i.value()->responseReady->signal();
934 kumpf 1.1 }
935
936 _outstandingRequestTable.clear();
|
937 kumpf 1.3 }
|
938 kumpf 1.1
|
939 kumpf 1.3 //
940 // If not a clean shutdown, call the provider module failure callback
941 //
942 if (!cleanShutdown)
943 {
|
944 kumpf 1.1 //
|
945 kumpf 1.3 // Call the provider module failure callback to communicate
946 // the failure to the Provider Manager Service. The Provider
947 // Manager Service will inform the Indication Service.
|
948 kumpf 1.1 //
|
949 kumpf 1.3 _providerModuleFailCallback(_moduleName, _userName, _userContext);
|
950 kumpf 1.1 }
951 }
952 catch (...)
953 {
954 // We're uninitializing, so do not propagate the exception
955 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
956 "Ignoring _uninitialize() exception.");
957 }
958
|
959 kumpf 1.7 #if defined(PEGASUS_HAS_SIGNALS)
960 // Harvest the status of the agent process to prevent a zombie. Do not
961 // hold the _agentMutex during this operation.
962 pid_t status = 0;
963 do
964 {
965 status = waitpid(pid, 0, 0);
966 } while ((status == -1) && (errno == EINTR));
967
968 if (status == -1)
969 {
970 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
971 "ProviderAgentContainer::_uninitialize(): "
972 "waitpid failed; errno = %d.", errno);
973 }
974 #endif
975
|
976 kumpf 1.1 PEG_METHOD_EXIT();
977 }
978
979 String ProviderAgentContainer::getModuleName() const
980 {
981 return _moduleName;
982 }
983
984 CIMResponseMessage* ProviderAgentContainer::processMessage(
985 CIMRequestMessage* request)
986 {
987 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
988 "ProviderAgentContainer::processMessage");
989
990 CIMResponseMessage* response;
991
992 do
993 {
994 response = _processMessage(request);
995
996 if (response == _REQUEST_NOT_PROCESSED)
997 kumpf 1.1 {
998 // Check for request message types that should not be retried.
999 if ((request->getType() ==
1000 CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE) ||
1001 (request->getType() ==
1002 CIM_NOTIFY_CONFIG_CHANGE_REQUEST_MESSAGE) ||
1003 (request->getType() ==
1004 CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE) ||
1005 (request->getType() ==
1006 CIM_DELETE_SUBSCRIPTION_REQUEST_MESSAGE))
1007 {
1008 response = request->buildResponse();
1009 break;
1010 }
1011 else if (request->getType() == CIM_DISABLE_MODULE_REQUEST_MESSAGE)
1012 {
1013 CIMDisableModuleResponseMessage* dmResponse =
1014 dynamic_cast<CIMDisableModuleResponseMessage*>(response);
1015 PEGASUS_ASSERT(dmResponse != 0);
1016
1017 Array<Uint16> operationalStatus;
1018 kumpf 1.1 operationalStatus.append(CIM_MSE_OPSTATUS_VALUE_STOPPED);
1019 dmResponse->operationalStatus = operationalStatus;
1020 break;
1021 }
1022 }
1023 } while (response == _REQUEST_NOT_PROCESSED);
1024
|
1025 kumpf 1.2 if (request->getType() == CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE)
1026 {
1027 _subscriptionInitComplete = true;
1028 }
1029
|
1030 kumpf 1.1 PEG_METHOD_EXIT();
1031 return response;
1032 }
1033
1034 CIMResponseMessage* ProviderAgentContainer::_processMessage(
1035 CIMRequestMessage* request)
1036 {
1037 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1038 "ProviderAgentContainer::_processMessage");
1039
1040 CIMResponseMessage* response;
1041 String originalMessageId = request->messageId;
1042
1043 // These three variables are used for the provider module optimization.
1044 // See the _providerModuleCache member description for more information.
1045 AutoPtr<ProviderIdContainer> origProviderId;
1046 Boolean doProviderModuleOptimization = false;
1047 Boolean updateProviderModuleCache = false;
1048
1049 try
1050 {
1051 kumpf 1.1 // The messageId attribute is used to correlate response messages
1052 // from the Provider Agent with request messages, so it is imperative
1053 // that the ID is unique for each request. The incoming ID cannot be
1054 // trusted to be unique, so we substitute a unique one. The memory
1055 // address of the request is used as the source of a unique piece of
1056 // data. (The message ID is only required to be unique while the
1057 // request is outstanding.)
1058 char messagePtrString[20];
1059 sprintf(messagePtrString, "%p", request);
1060 String uniqueMessageId = messagePtrString;
1061
1062 //
1063 // Set up the OutstandingRequestEntry for this request
1064 //
1065 Semaphore waitSemaphore(0);
1066 OutstandingRequestEntry outstandingRequestEntry(
1067 originalMessageId, request, response, &waitSemaphore);
1068
1069 //
1070 // Lock the Provider Agent Container while initializing the
1071 // agent and writing the request to the connection
1072 kumpf 1.1 //
1073 {
1074 AutoMutex lock(_agentMutex);
1075
1076 //
1077 // Initialize the Provider Agent, if necessary
1078 //
1079 if (!_isInitialized)
1080 {
1081 _initialize();
1082 }
1083
1084 //
1085 // Add an entry to the OutstandingRequestTable for this request
1086 //
1087 {
1088 AutoMutex tableLock(_outstandingRequestTableMutex);
1089
1090 _outstandingRequestTable.insert(
1091 uniqueMessageId, &outstandingRequestEntry);
1092 }
1093 kumpf 1.1
1094 // Get the provider module from the ProviderIdContainer to see if
1095 // we can optimize out the transmission of this instance to the
1096 // Provider Agent. (See the _providerModuleCache description.)
|
1097 kumpf 1.6 if (request->operationContext.contains(ProviderIdContainer::NAME))
|
1098 kumpf 1.1 {
1099 ProviderIdContainer pidc = request->operationContext.get(
1100 ProviderIdContainer::NAME);
1101 origProviderId.reset(new ProviderIdContainer(
1102 pidc.getModule(), pidc.getProvider(),
1103 pidc.isRemoteNameSpace(), pidc.getRemoteInfo()));
1104 if (_providerModuleCache.isUninitialized() ||
1105 (!pidc.getModule().identical(_providerModuleCache)))
1106 {
1107 // We haven't sent this provider module instance to the
1108 // Provider Agent yet. Update our cache after we send it.
1109 updateProviderModuleCache = true;
1110 }
1111 else
1112 {
1113 // Replace the provider module in the ProviderIdContainer
1114 // with an uninitialized instance. We'll need to put the
1115 // original one back after the message is sent.
1116 request->operationContext.set(ProviderIdContainer(
1117 CIMInstance(), pidc.getProvider(),
1118 pidc.isRemoteNameSpace(), pidc.getRemoteInfo()));
1119 kumpf 1.1 doProviderModuleOptimization = true;
1120 }
1121 }
1122
1123 //
1124 // Write the message to the pipe
1125 //
1126 try
1127 {
1128 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL3,
1129 String("Sending request to agent with messageId ") +
1130 uniqueMessageId);
1131
1132 request->messageId = uniqueMessageId;
1133 AnonymousPipe::Status writeStatus =
1134 _pipeToAgent->writeMessage(request);
1135 request->messageId = originalMessageId;
1136
1137 if (doProviderModuleOptimization)
1138 {
1139 request->operationContext.set(*origProviderId.get());
1140 kumpf 1.1 }
1141
1142 if (writeStatus != AnonymousPipe::STATUS_SUCCESS)
1143 {
1144 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
1145 "Failed to write message to pipe. writeStatus = %d.",
1146 writeStatus);
1147
1148 request->messageId = originalMessageId;
1149
1150 if (doProviderModuleOptimization)
1151 {
1152 request->operationContext.set(*origProviderId.get());
1153 }
1154
1155 // Remove this OutstandingRequestTable entry
1156 {
1157 AutoMutex tableLock(_outstandingRequestTableMutex);
1158 Boolean removed =
1159 _outstandingRequestTable.remove(uniqueMessageId);
1160 PEGASUS_ASSERT(removed);
1161 kumpf 1.1 }
1162
1163 // A response value of _REQUEST_NOT_PROCESSED indicates
1164 // that the request was not processed by the provider
1165 // agent, so it can be retried safely.
1166 PEG_METHOD_EXIT();
1167 return _REQUEST_NOT_PROCESSED;
1168 }
1169
1170 if (updateProviderModuleCache)
1171 {
1172 _providerModuleCache = origProviderId->getModule();
1173 }
1174 }
1175 catch (...)
1176 {
1177 request->messageId = originalMessageId;
1178
1179 if (doProviderModuleOptimization)
1180 {
1181 request->operationContext.set(*origProviderId.get());
1182 kumpf 1.1 }
1183
1184 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
1185 "Failed to write message to pipe.");
1186 // Remove the OutstandingRequestTable entry for this request
1187 {
1188 AutoMutex tableLock(_outstandingRequestTableMutex);
1189 Boolean removed =
1190 _outstandingRequestTable.remove(uniqueMessageId);
1191 PEGASUS_ASSERT(removed);
1192 }
1193 PEG_METHOD_EXIT();
1194 throw;
1195 }
1196 }
1197
1198 //
1199 // Wait for the response
1200 //
1201 try
1202 {
1203 kumpf 1.1 // Must not hold _agentMutex while waiting for the response
1204 waitSemaphore.wait();
1205 }
1206 catch (...)
1207 {
1208 // Remove the OutstandingRequestTable entry for this request
1209 {
1210 AutoMutex tableLock(_outstandingRequestTableMutex);
1211 Boolean removed =
1212 _outstandingRequestTable.remove(uniqueMessageId);
1213 PEGASUS_ASSERT(removed);
1214 }
1215 PEG_METHOD_EXIT();
1216 throw;
1217 }
1218
1219 // A response value of _REQUEST_NOT_PROCESSED indicates that the
1220 // provider agent process was terminating when the request was sent.
|
1221 kumpf 1.6 // The request was not processed by the provider agent, so it can be
|
1222 kumpf 1.1 // retried safely.
1223 if (response == _REQUEST_NOT_PROCESSED)
1224 {
1225 PEG_METHOD_EXIT();
1226 return response;
1227 }
1228
1229 // A null response is returned when an agent connection is closed
1230 // while requests remain outstanding.
1231 if (response == 0)
1232 {
1233 response = request->buildResponse();
1234 response->cimException = PEGASUS_CIM_EXCEPTION(
1235 CIM_ERR_FAILED,
1236 MessageLoaderParms(
1237 "ProviderManager.OOPProviderManagerRouter."
1238 "CIMPROVAGT_CONNECTION_LOST",
1239 "Lost connection with cimprovagt \"$0\".",
1240 _moduleName));
1241 }
1242 }
1243 kumpf 1.1 catch (CIMException& e)
1244 {
1245 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
1246 String("Caught exception: ") + e.getMessage());
1247 response = request->buildResponse();
1248 response->cimException = e;
1249 }
1250 catch (Exception& e)
1251 {
1252 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
1253 String("Caught exception: ") + e.getMessage());
1254 response = request->buildResponse();
1255 response->cimException = PEGASUS_CIM_EXCEPTION(
1256 CIM_ERR_FAILED, e.getMessage());
1257 }
1258 catch (...)
1259 {
1260 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
1261 "Caught unknown exception");
1262 response = request->buildResponse();
1263 response->cimException = PEGASUS_CIM_EXCEPTION(
1264 kumpf 1.1 CIM_ERR_FAILED, String::EMPTY);
1265 }
1266
1267 response->messageId = originalMessageId;
1268
1269 PEG_METHOD_EXIT();
1270 return response;
1271 }
1272
1273 void ProviderAgentContainer::unloadIdleProviders()
1274 {
1275 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1276 "ProviderAgentContainer::unloadIdleProviders");
1277
1278 AutoMutex lock(_agentMutex);
1279 if (_isInitialized)
1280 {
1281 // Send a "wake up" message to the Provider Agent.
1282 // Don't bother checking whether the operation is successful.
1283 Uint32 messageLength = 0;
1284 _pipeToAgent->writeBuffer((const char*)&messageLength, sizeof(Uint32));
1285 kumpf 1.1 }
1286
1287 PEG_METHOD_EXIT();
1288 }
1289
1290 void ProviderAgentContainer::_processResponses()
1291 {
1292 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1293 "ProviderAgentContainer::_processResponses");
1294
1295 //
1296 // Process responses until the pipe is closed
1297 //
1298 while (1)
1299 {
1300 try
1301 {
1302 CIMMessage* message;
1303
1304 //
1305 // Read a response from the Provider Agent
1306 kumpf 1.1 //
1307 AnonymousPipe::Status readStatus =
1308 _pipeFromAgent->readMessage(message);
1309
1310 // Ignore interrupts
1311 if (readStatus == AnonymousPipe::STATUS_INTERRUPT)
1312 {
1313 continue;
1314 }
1315
1316 // Handle an error the same way as a closed connection
1317 if ((readStatus == AnonymousPipe::STATUS_ERROR) ||
1318 (readStatus == AnonymousPipe::STATUS_CLOSED))
1319 {
1320 _uninitialize(false);
1321 return;
1322 }
1323
1324 // A null message indicates that the provider agent process has
1325 // finished its processing and is ready to exit.
1326 if (message == 0)
1327 kumpf 1.1 {
1328 _uninitialize(true);
1329 return;
1330 }
1331
1332 if (message->getType() == CIM_PROCESS_INDICATION_REQUEST_MESSAGE)
1333 {
1334 // Forward indications to the indication callback
1335 _indicationCallback(
1336 reinterpret_cast<CIMProcessIndicationRequestMessage*>(
1337 message));
1338 }
1339 else if (!message->isComplete())
1340 {
1341 CIMResponseMessage* response;
1342 response = dynamic_cast<CIMResponseMessage*>(message);
1343 PEGASUS_ASSERT(response != 0);
1344
1345 // Get the OutstandingRequestEntry for this response chunk
1346 OutstandingRequestEntry* _outstandingRequestEntry = 0;
1347 {
1348 kumpf 1.1 AutoMutex tableLock(_outstandingRequestTableMutex);
1349 Boolean foundEntry = _outstandingRequestTable.lookup(
1350 response->messageId, _outstandingRequestEntry);
1351 PEGASUS_ASSERT(foundEntry);
1352 }
1353
1354 // Put the original message ID into the response
1355 response->messageId =
1356 _outstandingRequestEntry->originalMessageId;
1357
1358 // Call the response chunk callback to process the chunk
1359 _responseChunkCallback(
1360 _outstandingRequestEntry->requestMessage, response);
1361 }
1362 else
1363 {
1364 CIMResponseMessage* response;
1365 response = dynamic_cast<CIMResponseMessage*>(message);
1366 PEGASUS_ASSERT(response != 0);
1367
1368 // Give the response to the waiting OutstandingRequestEntry
1369 kumpf 1.1 OutstandingRequestEntry* _outstandingRequestEntry = 0;
1370 {
1371 AutoMutex tableLock(_outstandingRequestTableMutex);
1372 Boolean foundEntry = _outstandingRequestTable.lookup(
1373 response->messageId, _outstandingRequestEntry);
1374 PEGASUS_ASSERT(foundEntry);
1375
1376 // Remove the completed request from the table
1377 Boolean removed =
1378 _outstandingRequestTable.remove(response->messageId);
1379 PEGASUS_ASSERT(removed);
1380 }
1381
1382 _outstandingRequestEntry->responseMessage = response;
1383 _outstandingRequestEntry->responseReady->signal();
1384 }
1385 }
1386 catch (Exception& e)
1387 {
1388 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
1389 String("Ignoring exception: ") + e.getMessage());
1390 kumpf 1.1 }
1391 catch (...)
1392 {
1393 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
1394 "Ignoring exception");
1395 }
1396 }
1397
1398 }
1399
1400 ThreadReturnType PEGASUS_THREAD_CDECL
1401 ProviderAgentContainer::_responseProcessor(void* arg)
1402 {
1403 ProviderAgentContainer* pa =
1404 reinterpret_cast<ProviderAgentContainer*>(arg);
1405
1406 pa->_processResponses();
1407
|
1408 kumpf 1.6 return ThreadReturnType(0);
|
1409 kumpf 1.1 }
1410
1411 /////////////////////////////////////////////////////////////////////////////
1412 // OOPProviderManagerRouter
1413 /////////////////////////////////////////////////////////////////////////////
1414
1415 OOPProviderManagerRouter::OOPProviderManagerRouter(
1416 PEGASUS_INDICATION_CALLBACK_T indicationCallback,
1417 PEGASUS_RESPONSE_CHUNK_CALLBACK_T responseChunkCallback,
1418 PEGASUS_PROVIDERMODULEFAIL_CALLBACK_T providerModuleFailCallback)
1419 {
1420 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1421 "OOPProviderManagerRouter::OOPProviderManagerRouter");
1422
1423 _indicationCallback = indicationCallback;
1424 _responseChunkCallback = responseChunkCallback;
1425 _providerModuleFailCallback = providerModuleFailCallback;
1426 _subscriptionInitComplete = false;
1427
1428 PEG_METHOD_EXIT();
1429 }
1430 kumpf 1.1
1431 OOPProviderManagerRouter::~OOPProviderManagerRouter()
1432 {
1433 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1434 "OOPProviderManagerRouter::~OOPProviderManagerRouter");
1435
1436 try
1437 {
1438 // Clean up the ProviderAgentContainers
1439 AutoMutex lock(_providerAgentTableMutex);
1440 ProviderAgentTable::Iterator i = _providerAgentTable.start();
|
1441 kumpf 1.6 for (; i != 0; i++)
|
1442 kumpf 1.1 {
1443 delete i.value();
1444 }
1445 }
1446 catch (...) {}
1447
1448 PEG_METHOD_EXIT();
1449 }
1450
1451 Message* OOPProviderManagerRouter::processMessage(Message* message)
1452 {
1453 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1454 "OOPProviderManagerRouter::processMessage");
1455
1456 CIMRequestMessage* request = dynamic_cast<CIMRequestMessage *>(message);
1457 PEGASUS_ASSERT(request != 0);
1458
1459 AutoPtr<CIMResponseMessage> response;
1460
1461 //
1462 // Get the provider information from the request
1463 kumpf 1.1 //
1464 CIMInstance providerModule;
1465
1466 if ((dynamic_cast<CIMOperationRequestMessage*>(request) != 0) ||
1467 (dynamic_cast<CIMIndicationRequestMessage*>(request) != 0) ||
1468 (request->getType() == CIM_EXPORT_INDICATION_REQUEST_MESSAGE))
1469 {
1470 // Provider information is in the OperationContext
1471 ProviderIdContainer pidc = (ProviderIdContainer)
1472 request->operationContext.get(ProviderIdContainer::NAME);
1473 providerModule = pidc.getModule();
1474 }
1475 else if (request->getType() == CIM_ENABLE_MODULE_REQUEST_MESSAGE)
1476 {
1477 CIMEnableModuleRequestMessage* emReq =
1478 dynamic_cast<CIMEnableModuleRequestMessage*>(request);
1479 providerModule = emReq->providerModule;
1480 }
1481 else if (request->getType() == CIM_DISABLE_MODULE_REQUEST_MESSAGE)
1482 {
1483 CIMDisableModuleRequestMessage* dmReq =
1484 kumpf 1.1 dynamic_cast<CIMDisableModuleRequestMessage*>(request);
1485 providerModule = dmReq->providerModule;
1486 }
1487 else if ((request->getType() == CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE) ||
1488 (request->getType() ==
1489 CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE) ||
1490 (request->getType() == CIM_NOTIFY_CONFIG_CHANGE_REQUEST_MESSAGE))
1491 {
1492 // This operation is not provider-specific
1493 }
1494 else
1495 {
1496 // Unrecognized message type. This should never happen.
1497 PEGASUS_ASSERT(0);
1498 response.reset(request->buildResponse());
1499 response->cimException = PEGASUS_CIM_EXCEPTION(
1500 CIM_ERR_FAILED, "Unrecognized message type.");
1501 PEG_METHOD_EXIT();
1502 return response.release();
1503 }
1504
1505 kumpf 1.1 //
1506 // Process the request message
1507 //
1508 if (request->getType() == CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE)
1509 {
1510 // Forward the CIMStopAllProvidersRequest to all providers
1511 response.reset(_forwardRequestToAllAgents(request));
1512
1513 // Note: Do not uninitialize the ProviderAgentContainers here.
1514 // Just let the selecting thread notice when the agent connections
1515 // are closed.
1516 }
|
1517 kumpf 1.6 else if (request->getType () ==
|
1518 kumpf 1.1 CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE)
1519 {
1520 _subscriptionInitComplete = true;
1521
1522 //
|
1523 kumpf 1.6 // Forward the CIMSubscriptionInitCompleteRequestMessage to
|
1524 kumpf 1.1 // all providers
1525 //
1526 response.reset (_forwardRequestToAllAgents (request));
1527 }
1528 else if (request->getType() == CIM_NOTIFY_CONFIG_CHANGE_REQUEST_MESSAGE)
1529 {
1530 CIMNotifyConfigChangeRequestMessage* notifyRequest =
1531 dynamic_cast<CIMNotifyConfigChangeRequestMessage*>(request);
1532 PEGASUS_ASSERT(notifyRequest != 0);
1533
1534 if (notifyRequest->currentValueModified)
1535 {
1536 // Forward the CIMNotifyConfigChangeRequestMessage to all providers
1537 response.reset(_forwardRequestToAllAgents(request));
1538 }
1539 else
1540 {
1541 // No need to notify provider agents about changes to planned value
1542 response.reset(request->buildResponse());
1543 }
1544 }
1545 kumpf 1.1 else if (request->getType() == CIM_DISABLE_MODULE_REQUEST_MESSAGE)
1546 {
1547 // Fan out the request to all Provider Agent processes for this module
1548
1549 // Retrieve the provider module name
1550 String moduleName;
1551 CIMValue nameValue = providerModule.getProperty(
1552 providerModule.findProperty("Name")).getValue();
1553 nameValue.get(moduleName);
1554
1555 // Look up the Provider Agents for this module
1556 Array<ProviderAgentContainer*> paArray =
1557 _lookupProviderAgents(moduleName);
1558
1559 for (Uint32 i=0; i<paArray.size(); i++)
1560 {
1561 //
1562 // Do not start up an agent process just to disable the module
1563 //
1564 if (paArray[i]->isInitialized())
1565 {
1566 kumpf 1.1 //
1567 // Forward the request to the provider agent
1568 //
1569 response.reset(paArray[i]->processMessage(request));
1570
1571 // Note: Do not uninitialize the ProviderAgentContainer here
1572 // when a disable module operation is successful. Just let the
1573 // selecting thread notice when the agent connection is closed.
1574
1575 // Determine the success of the disable module operation
1576 CIMDisableModuleResponseMessage* dmResponse =
1577 dynamic_cast<CIMDisableModuleResponseMessage*>(
1578 response.get());
1579 PEGASUS_ASSERT(dmResponse != 0);
1580
1581 Boolean isStopped = false;
1582 for (Uint32 i=0; i < dmResponse->operationalStatus.size(); i++)
1583 {
1584 if (dmResponse->operationalStatus[i] ==
1585 CIM_MSE_OPSTATUS_VALUE_STOPPED)
1586 {
1587 kumpf 1.1 isStopped = true;
1588 break;
1589 }
1590 }
1591
1592 // If the operation is unsuccessful, stop and return the error
1593 if ((dmResponse->cimException.getCode() != CIM_ERR_SUCCESS) ||
1594 !isStopped)
1595 {
1596 break;
1597 }
1598 }
1599 }
1600
1601 // Use a default response if no Provider Agents were called
1602 if (!response.get())
1603 {
1604 response.reset(request->buildResponse());
1605
1606 CIMDisableModuleResponseMessage* dmResponse =
1607 dynamic_cast<CIMDisableModuleResponseMessage*>(response.get());
1608 kumpf 1.1 PEGASUS_ASSERT(dmResponse != 0);
1609
1610 Array<Uint16> operationalStatus;
1611 operationalStatus.append(CIM_MSE_OPSTATUS_VALUE_STOPPED);
1612 dmResponse->operationalStatus = operationalStatus;
1613 }
1614 }
1615 else if (request->getType() == CIM_ENABLE_MODULE_REQUEST_MESSAGE)
1616 {
1617 // Fan out the request to all Provider Agent processes for this module
1618
1619 // Retrieve the provider module name
1620 String moduleName;
1621 CIMValue nameValue = providerModule.getProperty(
1622 providerModule.findProperty("Name")).getValue();
1623 nameValue.get(moduleName);
1624
1625 // Look up the Provider Agents for this module
1626 Array<ProviderAgentContainer*> paArray =
1627 _lookupProviderAgents(moduleName);
1628
1629 kumpf 1.1 for (Uint32 i=0; i<paArray.size(); i++)
1630 {
1631 //
1632 // Do not start up an agent process just to enable the module
1633 //
1634 if (paArray[i]->isInitialized())
1635 {
1636 //
1637 // Forward the request to the provider agent
1638 //
1639 response.reset(paArray[i]->processMessage(request));
1640
1641 // Determine the success of the enable module operation
1642 CIMEnableModuleResponseMessage* emResponse =
1643 dynamic_cast<CIMEnableModuleResponseMessage*>(
1644 response.get());
1645 PEGASUS_ASSERT(emResponse != 0);
1646
1647 Boolean isOk = false;
1648 for (Uint32 i=0; i < emResponse->operationalStatus.size(); i++)
1649 {
1650 kumpf 1.1 if (emResponse->operationalStatus[i] ==
1651 CIM_MSE_OPSTATUS_VALUE_OK)
1652 {
1653 isOk = true;
1654 break;
1655 }
1656 }
1657
1658 // If the operation is unsuccessful, stop and return the error
1659 if ((emResponse->cimException.getCode() != CIM_ERR_SUCCESS) ||
1660 !isOk)
1661 {
1662 break;
1663 }
1664 }
1665 }
1666
1667 // Use a default response if no Provider Agents were called
1668 if (!response.get())
1669 {
1670 response.reset(request->buildResponse());
1671 kumpf 1.1
1672 CIMEnableModuleResponseMessage* emResponse =
1673 dynamic_cast<CIMEnableModuleResponseMessage*>(response.get());
1674 PEGASUS_ASSERT(emResponse != 0);
1675
1676 Array<Uint16> operationalStatus;
1677 operationalStatus.append(CIM_MSE_OPSTATUS_VALUE_OK);
1678 emResponse->operationalStatus = operationalStatus;
1679 }
1680 }
1681 else
1682 {
1683 //
1684 // Look up the Provider Agent for this module instance and requesting
1685 // user
1686 //
1687 ProviderAgentContainer* pa = _lookupProviderAgent(providerModule,
1688 request);
1689 PEGASUS_ASSERT(pa != 0);
1690
1691 //
1692 kumpf 1.1 // Forward the request to the provider agent
1693 //
1694 response.reset(pa->processMessage(request));
1695 }
1696
1697 response->syncAttributes(request);
1698
1699 PEG_METHOD_EXIT();
1700 return response.release();
1701 }
1702
1703 ProviderAgentContainer* OOPProviderManagerRouter::_lookupProviderAgent(
1704 const CIMInstance& providerModule,
1705 CIMRequestMessage* request)
1706 {
1707 // Retrieve the provider module name
1708 String moduleName;
1709 CIMValue nameValue = providerModule.getProperty(
1710 providerModule.findProperty("Name")).getValue();
1711 nameValue.get(moduleName);
1712
1713 kumpf 1.1 // Retrieve the provider user context configuration
1714 Uint16 userContext = 0;
1715 Uint32 pos = providerModule.findProperty(
1716 PEGASUS_PROPERTYNAME_MODULE_USERCONTEXT);
1717 if (pos != PEG_NOT_FOUND)
1718 {
1719 CIMValue userContextValue =
1720 providerModule.getProperty(pos).getValue();
1721 if (!userContextValue.isNull())
1722 {
1723 userContextValue.get(userContext);
1724 }
1725 }
1726
1727 if (userContext == 0)
1728 {
1729 userContext = PEGASUS_DEFAULT_PROV_USERCTXT;
1730 }
1731
1732 String userName;
1733
1734 kumpf 1.1 if (userContext == PG_PROVMODULE_USERCTXT_REQUESTOR)
1735 {
|
1736 kumpf 1.6 if (request->operationContext.contains(IdentityContainer::NAME))
|
1737 kumpf 1.1 {
1738 // User Name is in the OperationContext
1739 IdentityContainer ic = (IdentityContainer)
1740 request->operationContext.get(IdentityContainer::NAME);
1741 userName = ic.getUserName();
1742 }
1743 //else
1744 //{
1745 // If no IdentityContainer is present, default to the CIM
1746 // Server's user context
1747 //}
1748
1749 // If authentication is disabled, use the CIM Server's user context
1750 if (!userName.size())
1751 {
1752 userName = System::getEffectiveUserName();
1753 }
1754 }
1755 else if (userContext == PG_PROVMODULE_USERCTXT_DESIGNATED)
1756 {
1757 // Retrieve the provider module designated user property value
1758 kumpf 1.1 providerModule.getProperty(providerModule.findProperty(
1759 PEGASUS_PROPERTYNAME_MODULE_DESIGNATEDUSER)).getValue().
1760 get(userName);
1761 }
1762 else if (userContext == PG_PROVMODULE_USERCTXT_CIMSERVER)
1763 {
1764 userName = System::getEffectiveUserName();
1765 }
1766 else // Privileged User
1767 {
1768 PEGASUS_ASSERT(userContext == PG_PROVMODULE_USERCTXT_PRIVILEGED);
1769 userName = System::getPrivilegedUserName();
1770 }
1771
1772 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4,
1773 "Module name = " + moduleName);
1774 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL4,
1775 "User context = %hd.", userContext);
1776 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4,
1777 "User name = " + userName);
1778
1779 kumpf 1.1 ProviderAgentContainer* pa = 0;
1780 String key = moduleName + ":" + userName;
1781
1782 AutoMutex lock(_providerAgentTableMutex);
1783 if (!_providerAgentTable.lookup(key, pa))
1784 {
1785 pa = new ProviderAgentContainer(
1786 moduleName, userName, userContext,
1787 _indicationCallback, _responseChunkCallback,
1788 _providerModuleFailCallback,
1789 _subscriptionInitComplete);
1790 _providerAgentTable.insert(key, pa);
1791 }
1792 return pa;
1793 }
1794
1795 Array<ProviderAgentContainer*> OOPProviderManagerRouter::_lookupProviderAgents(
1796 const String& moduleName)
1797 {
1798 Array<ProviderAgentContainer*> paArray;
1799
1800 kumpf 1.1 AutoMutex lock(_providerAgentTableMutex);
1801 for (ProviderAgentTable::Iterator i = _providerAgentTable.start(); i; i++)
1802 {
1803 if (i.value()->getModuleName() == moduleName)
1804 {
1805 paArray.append(i.value());
1806 }
1807 }
1808 return paArray;
1809 }
1810
1811 CIMResponseMessage* OOPProviderManagerRouter::_forwardRequestToAllAgents(
1812 CIMRequestMessage* request)
1813 {
1814 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1815 "OOPProviderManagerRouter::_forwardRequestToAllAgents");
1816
1817 // Get a list of the ProviderAgentContainers. We need our own array copy
1818 // because we cannot hold the _providerAgentTableMutex while calling
1819 // _ProviderAgentContainer::processMessage().
1820 Array<ProviderAgentContainer*> paContainerArray;
1821 kumpf 1.1 {
1822 AutoMutex tableLock(_providerAgentTableMutex);
1823 for (ProviderAgentTable::Iterator i = _providerAgentTable.start();
1824 i != 0; i++)
1825 {
1826 paContainerArray.append(i.value());
1827 }
1828 }
1829
1830 CIMException responseException;
1831
1832 // Forward the request to each of the initialized provider agents
1833 for (Uint32 j = 0; j < paContainerArray.size(); j++)
1834 {
1835 ProviderAgentContainer* pa = paContainerArray[j];
1836 if (pa->isInitialized())
1837 {
1838 // Note: The ProviderAgentContainer could become uninitialized
1839 // before _ProviderAgentContainer::processMessage() processes
1840 // this request. In this case, the Provider Agent process will
1841 // (unfortunately) be started to process this message.
1842 kumpf 1.1 AutoPtr<CIMResponseMessage> response;
1843 response.reset(pa->processMessage(request));
1844 if (response.get() != 0)
1845 {
1846 // If the operation failed, save the exception data
1847 if ((response->cimException.getCode() != CIM_ERR_SUCCESS) &&
1848 (responseException.getCode() == CIM_ERR_SUCCESS))
1849 {
1850 responseException = response->cimException;
1851 }
1852 }
1853 }
1854 }
1855
1856 CIMResponseMessage* response = request->buildResponse();
1857 response->cimException = responseException;
1858
1859 PEG_METHOD_EXIT();
1860 return response;
1861 }
1862
1863 kumpf 1.1 void OOPProviderManagerRouter::unloadIdleProviders()
1864 {
1865 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1866 "OOPProviderManagerRouter::unloadIdleProviders");
1867
|
1868 kumpf 1.7 // Get a list of the ProviderAgentContainers. We need our own array copy
1869 // because we cannot hold the _providerAgentTableMutex while calling
1870 // ProviderAgentContainer::unloadIdleProviders().
1871 Array<ProviderAgentContainer*> paContainerArray;
1872 {
1873 AutoMutex tableLock(_providerAgentTableMutex);
1874 for (ProviderAgentTable::Iterator i = _providerAgentTable.start();
1875 i != 0; i++)
1876 {
1877 paContainerArray.append(i.value());
1878 }
1879 }
1880
|
1881 kumpf 1.1 // Iterate through the _providerAgentTable unloading idle providers
|
1882 kumpf 1.7 for (Uint32 j = 0; j < paContainerArray.size(); j++)
|
1883 kumpf 1.1 {
|
1884 kumpf 1.7 paContainerArray[j]->unloadIdleProviders();
|
1885 kumpf 1.1 }
1886
1887 PEG_METHOD_EXIT();
1888 }
1889
1890 PEGASUS_NAMESPACE_END
|