1 kumpf 1.1 //%2006////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
7 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
9 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
11 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
13 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
15 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
18 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
20 //
21 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
22 kumpf 1.1 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
24 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
27 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 //%/////////////////////////////////////////////////////////////////////////////
33
34 #include <Pegasus/Common/Signal.h>
35 #include <Pegasus/Common/Config.h>
36 #include <Pegasus/Common/Constants.h>
37 #include <Pegasus/Common/AutoPtr.h>
38 #include <Pegasus/Common/ArrayInternal.h>
39 #include <Pegasus/Common/CIMMessage.h>
40 #include <Pegasus/Common/CIMMessageSerializer.h>
41 #include <Pegasus/Common/CIMMessageDeserializer.h>
42 #include <Pegasus/Common/OperationContextInternal.h>
43 kumpf 1.1 #include <Pegasus/Common/System.h>
44 #include <Pegasus/Common/AnonymousPipe.h>
45 #include <Pegasus/Common/Tracer.h>
46 #include <Pegasus/Common/Logger.h>
47 #include <Pegasus/Common/Thread.h>
48 #include <Pegasus/Common/MessageQueueService.h>
49 #include <Pegasus/Config/ConfigManager.h>
50
51 #if defined (PEGASUS_OS_TYPE_WINDOWS)
52 # include <windows.h> // For CreateProcess()
53 #elif defined (PEGASUS_OS_OS400)
54 # include <unistd.cleinc>
55 #elif defined (PEGASUS_OS_VMS)
56 # include <perror.h>
57 # include <climsgdef.h>
58 # include <stdio.h>
59 # include <stdlib.h>
60 # include <string.h>
61 # include <processes.h>
62 # include <unixio.h>
63 #else
64 kumpf 1.1 # include <unistd.h> // For fork(), exec(), and _exit()
65 # include <errno.h>
66 # include <sys/types.h>
|
67 kumpf 1.5 # include <sys/resource.h>
|
68 kumpf 1.1 # if defined(PEGASUS_HAS_SIGNALS)
69 # include <sys/wait.h>
70 # endif
71 #endif
72
73 #include "OOPProviderManagerRouter.h"
74
75 PEGASUS_USING_STD;
76
77 PEGASUS_NAMESPACE_BEGIN
78
79 /////////////////////////////////////////////////////////////////////////////
80 // OutstandingRequestTable and OutstandingRequestEntry
81 /////////////////////////////////////////////////////////////////////////////
82
83 /**
84 An OutstandingRequestEntry represents a request message sent to a
85 Provider Agent for which no response has been received. The request
86 sender provides the message ID and a location for the response to be
87 returned, and then waits on the semaphore. When a response matching
88 the message ID is received, it is placed into the specified location
89 kumpf 1.1 and the semaphore is signaled.
90 */
91 class OutstandingRequestEntry
92 {
93 public:
94 OutstandingRequestEntry(
95 String originalMessageId_,
96 CIMRequestMessage* requestMessage_,
97 CIMResponseMessage*& responseMessage_,
98 Semaphore* responseReady_)
99 : originalMessageId(originalMessageId_),
100 requestMessage(requestMessage_),
101 responseMessage(responseMessage_),
102 responseReady(responseReady_)
103 {
104 }
105
106 /**
107 A unique value is substituted as the request messageId attribute to
108 allow responses to be definitively correllated with requests.
109 The original messageId value is stored here to avoid a race condition
110 kumpf 1.1 between the processing of a response chunk and the resetting of the
111 original messageId in the request message.
112 */
113 String originalMessageId;
114 CIMRequestMessage* requestMessage;
115 CIMResponseMessage*& responseMessage;
116 Semaphore* responseReady;
117 };
118
119 typedef HashTable<String, OutstandingRequestEntry*, EqualFunc<String>,
120 HashFunc<String> > OutstandingRequestTable;
121
122
123 /////////////////////////////////////////////////////////////////////////////
124 // ProviderAgentContainer
125 /////////////////////////////////////////////////////////////////////////////
126
127 class ProviderAgentContainer
128 {
129 public:
130 ProviderAgentContainer(
131 kumpf 1.1 const String & moduleName,
132 const String & userName,
133 Uint16 userContext,
134 PEGASUS_INDICATION_CALLBACK_T indicationCallback,
135 PEGASUS_RESPONSE_CHUNK_CALLBACK_T responseChunkCallback,
136 PEGASUS_PROVIDERMODULEFAIL_CALLBACK_T providerModuleFailCallback,
137 Boolean subscriptionInitComplete);
138
139 ~ProviderAgentContainer();
140
141 Boolean isInitialized();
142
143 String getModuleName() const;
144
145 CIMResponseMessage* processMessage(CIMRequestMessage* request);
146 void unloadIdleProviders();
147
148 private:
149 //
150 // Private methods
151 //
152 kumpf 1.1
153 /** Unimplemented */
154 ProviderAgentContainer();
155 /** Unimplemented */
156 ProviderAgentContainer(const ProviderAgentContainer& pa);
157 /** Unimplemented */
158 ProviderAgentContainer& operator=(const ProviderAgentContainer& pa);
159
160 /**
161 Start a Provider Agent process and establish a pipe connection with it.
162 Note: The caller must lock the _agentMutex.
163 */
164 void _startAgentProcess();
165
166 /**
167 Send initialization data to the Provider Agent.
168 Note: The caller must lock the _agentMutex.
169 */
170 void _sendInitializationData();
171
172 /**
173 kumpf 1.1 Initialize the ProviderAgentContainer if it is not already
174 initialized. Initialization includes starting the Provider Agent
175 process, establishing a pipe connection with it, and starting a
176 thread to read response messages from the Provider Agent.
177
178 Note: The caller must lock the _agentMutex.
179 */
180 void _initialize();
181
182 /**
183 Uninitialize the ProviderAgentContainer if it is initialized.
184 The connection is closed and outstanding requests are completed
185 with an error result.
186
187 Note: The caller must lock the _agentMutex.
188
189 @param cleanShutdown Indicates whether the provider agent process
190 exited cleanly. A value of true indicates that responses have been
191 sent for all requests that have been processed. A value of false
192 indicates that one or more requests may have been partially processed.
193 */
194 kumpf 1.1 void _uninitialize(Boolean cleanShutdown);
195
196 /**
197 Performs the processMessage work, but does not retry on a transient
198 error.
199 */
200 CIMResponseMessage* _processMessage(CIMRequestMessage* request);
201
202 /**
203 Read and process response messages from the Provider Agent until
204 the connection is closed.
205 */
206 void _processResponses();
207 static ThreadReturnType PEGASUS_THREAD_CDECL
208 _responseProcessor(void* arg);
209
210 //
211 // Private data
212 //
213
214 /**
215 kumpf 1.1 The _agentMutex must be locked whenever writing to the Provider
216 Agent connection, accessing the _isInitialized flag, or changing
217 the Provider Agent state.
218 */
219 Mutex _agentMutex;
220
221 /**
222 Name of the provider module served by this Provider Agent.
223 */
224 String _moduleName;
225
226 /**
227 The user context in which this Provider Agent operates.
228 */
229 String _userName;
230
231 /**
232 User Context setting of the provider module served by this Provider
233 Agent.
234 */
235 Uint16 _userContext;
236 kumpf 1.1
237 /**
238 Callback function to which all generated indications are sent for
239 processing.
240 */
241 PEGASUS_INDICATION_CALLBACK_T _indicationCallback;
242
243 /**
244 Callback function to which response chunks are sent for processing.
245 */
246 PEGASUS_RESPONSE_CHUNK_CALLBACK_T _responseChunkCallback;
247
248 /**
249 Callback function to be called upon detection of failure of a
250 provider module.
251 */
252 PEGASUS_PROVIDERMODULEFAIL_CALLBACK_T _providerModuleFailCallback;
253
254 /**
255 Indicates whether the Provider Agent is active.
256 */
257 kumpf 1.1 Boolean _isInitialized;
258
259 /**
260 Pipe connection used to read responses from the Provider Agent.
261 */
262 AutoPtr<AnonymousPipe> _pipeFromAgent;
263 /**
264 Pipe connection used to write requests to the Provider Agent.
265 */
266 AutoPtr<AnonymousPipe> _pipeToAgent;
267
268 #if defined(PEGASUS_HAS_SIGNALS)
269 /**
270 Process ID of the active Provider Agent.
271 */
272 pid_t _pid;
273 #endif
274
275 /**
276 The _outstandingRequestTable holds an entry for each request that has
277 been sent to this Provider Agent for which no response has been
278 kumpf 1.1 received. Entries are added (by the writing thread) when a request
279 is sent, and are removed (by the reading thread) when the response is
280 received (or when it is determined that no response is forthcoming).
281 */
282 OutstandingRequestTable _outstandingRequestTable;
283 /**
284 The _outstandingRequestTableMutex must be locked whenever reading or
285 updating the _outstandingRequestTable.
286 */
287 Mutex _outstandingRequestTableMutex;
288
289 /**
290 Holds the last provider module instance sent to the Provider Agent in
291 a ProviderIdContainer. Since the provider module instance rarely
292 changes, an optimization is used to send it only when it differs from
293 the last provider module instance sent.
294 */
295 CIMInstance _providerModuleCache;
296
297 /**
298 The number of Provider Agent processes that are currently initialized
299 kumpf 1.1 (active).
300 */
301 static Uint32 _numProviderProcesses;
302
303 /**
304 The _numProviderProcessesMutex must be locked whenever reading or
305 updating the _numProviderProcesses count.
306 */
307 static Mutex _numProviderProcessesMutex;
308
309 /**
310 The maximum number of Provider Agent processes that may be initialized
311 (active) at one time.
312 */
313 static Uint32 _maxProviderProcesses;
314
315 /**
316 A value indicating that a request message has not been processed.
317 A CIMResponseMessage pointer with this value indicates that the
318 corresponding CIMRequestMessage has not been processed. This is
319 used to indicate that a provider agent exited without starting to
320 kumpf 1.1 process the request, and that the request should be retried.
321 */
322 static CIMResponseMessage* _REQUEST_NOT_PROCESSED;
323
324 /**
325 Indicates whether the Indication Service has completed initialization.
326
327 For more information, please see the description of the
328 ProviderManagerRouter::_subscriptionInitComplete member variable.
329 */
330 Boolean _subscriptionInitComplete;
331 };
332
333 Uint32 ProviderAgentContainer::_numProviderProcesses = 0;
334 Mutex ProviderAgentContainer::_numProviderProcessesMutex;
335 Uint32 ProviderAgentContainer::_maxProviderProcesses = PEG_NOT_FOUND;
336
337 // Set this to a value that no valid CIMResponseMessage* will have.
338 CIMResponseMessage* ProviderAgentContainer::_REQUEST_NOT_PROCESSED =
339 reinterpret_cast<CIMResponseMessage*>(&_REQUEST_NOT_PROCESSED);
340
341 kumpf 1.1 ProviderAgentContainer::ProviderAgentContainer(
342 const String & moduleName,
343 const String & userName,
344 Uint16 userContext,
345 PEGASUS_INDICATION_CALLBACK_T indicationCallback,
346 PEGASUS_RESPONSE_CHUNK_CALLBACK_T responseChunkCallback,
347 PEGASUS_PROVIDERMODULEFAIL_CALLBACK_T providerModuleFailCallback,
348 Boolean subscriptionInitComplete)
349 : _moduleName(moduleName),
350 _userName(userName),
351 _userContext(userContext),
352 _indicationCallback(indicationCallback),
353 _responseChunkCallback(responseChunkCallback),
354 _providerModuleFailCallback(providerModuleFailCallback),
355 _isInitialized(false),
356 _subscriptionInitComplete(subscriptionInitComplete)
357 {
358 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
359 "ProviderAgentContainer::ProviderAgentContainer");
360 PEG_METHOD_EXIT();
361 }
362 kumpf 1.1
363 ProviderAgentContainer::~ProviderAgentContainer()
364 {
365 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
366 "ProviderAgentContainer::~ProviderAgentContainer");
367
368 // Ensure the destructor does not throw an exception
369 try
370 {
371 if (isInitialized())
372 {
373 // Stop the responseProcessor thread by closing its connection
374 _pipeFromAgent->closeReadHandle();
375
376 // Wait for the responseProcessor thread to exit
377 while (isInitialized())
378 {
379 Threads::yield();
380 }
381 }
382 }
383 kumpf 1.1 catch (...)
384 {
385 }
386
387 PEG_METHOD_EXIT();
388 }
389
390 void ProviderAgentContainer::_startAgentProcess()
391 {
392 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
393 "ProviderAgentContainer::_startAgentProcess");
394
395 //
396 // Serialize the starting of agent processes. If two agent processes are
397 // started at the same time, they may get copies of each other's pipe
398 // descriptors. If this happens, the cimserver will not get a pipe read
399 // error when one of the agent processes exits, because the pipe will
400 // still be writable by the other process. This locking control needs to
401 // cover the period from where the pipes are created to where the agent
402 // ends of the pipes are closed by the cimserver.
403 //
404 kumpf 1.1 static Mutex agentStartupMutex;
405 AutoMutex lock(agentStartupMutex);
406
407 AutoPtr<AnonymousPipe> pipeFromAgent(new AnonymousPipe());
408 AutoPtr<AnonymousPipe> pipeToAgent(new AnonymousPipe());
409
410 //
411 // Start a cimprovagt process for this provider module
412 //
413
414 #if defined (PEGASUS_OS_TYPE_WINDOWS)
415 //
416 // Set up members of the PROCESS_INFORMATION structure
417 //
418 PROCESS_INFORMATION piProcInfo;
419 ZeroMemory (&piProcInfo, sizeof (PROCESS_INFORMATION));
420
421 //
422 // Set up members of the STARTUPINFO structure
423 //
424 STARTUPINFO siStartInfo;
425 kumpf 1.1 ZeroMemory (&siStartInfo, sizeof (STARTUPINFO));
426 siStartInfo.cb = sizeof (STARTUPINFO);
427
428 //
429 // Generate the command line
430 //
431 char cmdLine[2048];
432 char readHandle[32];
433 char writeHandle[32];
434 pipeToAgent->exportReadHandle(readHandle);
435 pipeFromAgent->exportWriteHandle(writeHandle);
436
437 sprintf(cmdLine, "\"%s\" %s %s \"%s\"",
438 (const char*)ConfigManager::getHomedPath(
439 PEGASUS_PROVIDER_AGENT_PROC_NAME).getCString(),
440 readHandle, writeHandle, (const char*)_moduleName.getCString());
441
442 //
443 // Create the child process
444 //
445 if (!CreateProcess (
446 kumpf 1.1 NULL, //
447 cmdLine, // command line
448 NULL, // process security attributes
449 NULL, // primary thread security attributes
450 TRUE, // handles are inherited
451 0, // creation flags
452 NULL, // use parent's environment
453 NULL, // use parent's current directory
454 &siStartInfo, // STARTUPINFO
455 &piProcInfo)) // PROCESS_INFORMATION
456 {
457 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
458 "CreateProcess() failed. errno = %d.", GetLastError());
459 PEG_METHOD_EXIT();
460 throw Exception(MessageLoaderParms(
461 "ProviderManager.OOPProviderManagerRouter.CIMPROVAGT_START_FAILED",
462 "Failed to start cimprovagt \"$0\".",
463 _moduleName));
464 }
465
466 CloseHandle(piProcInfo.hProcess);
467 kumpf 1.1 CloseHandle(piProcInfo.hThread);
468
469 #elif defined (PEGASUS_OS_VMS)
470
471 //
472 // fork and exec the child process
473 //
474 int status;
475
476 status = vfork ();
477 switch (status)
478 {
479 case 0:
480 try
481 {
482 //
483 // Execute the cimprovagt program
484 //
485 String agentCommandPath =
486 ConfigManager::getHomedPath(PEGASUS_PROVIDER_AGENT_PROC_NAME);
487 CString agentCommandPathCString = agentCommandPath.getCString();
488 kumpf 1.1
489 char readHandle[32];
490 char writeHandle[32];
491 pipeToAgent->exportReadHandle(readHandle);
492 pipeFromAgent->exportWriteHandle(writeHandle);
493
494 if ((status = execl(agentCommandPathCString, agentCommandPathCString,
495 readHandle, writeHandle,
496 (const char*)_moduleName.getCString(), (char*)0)) == -1);
497 {
498 // If we're still here, there was an error
499 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
500 "execl() failed. errno = %d.", errno);
501 _exit(1);
502 }
503 }
504 catch (...)
505 {
506 // There's not much we can do here in no man's land
507 try
508 {
509 kumpf 1.1 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
510 "Caught exception before calling execl().");
511 }
512 catch (...)
513 {
514 }
515 _exit(1);
516 }
517 PEG_METHOD_EXIT();
518 return;
519 break;
520
521 case -1:
522 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
523 "fork() failed. errno = %d.", errno);
524 PEG_METHOD_EXIT();
525 throw Exception(MessageLoaderParms(
526 "ProviderManager.OOPProviderManagerRouter.CIMPROVAGT_START_FAILED",
527 "Failed to start cimprovagt \"$0\".",
528 _moduleName));
529 break;
530 kumpf 1.1
531 default:
532 // Close our copies of the agent's ends of the pipes
533 pipeToAgent->closeReadHandle();
534 pipeFromAgent->closeWriteHandle();
535
536 _pipeToAgent.reset(pipeToAgent.release());
537 _pipeFromAgent.reset(pipeFromAgent.release());
538
539 PEG_METHOD_EXIT();
540 }
541 #elif defined (PEGASUS_OS_OS400)
542
543 //Out of process provider support for OS400 goes here when needed.
544
545 #else
546
547 # ifndef PEGASUS_DISABLE_PROV_USERCTXT
548 // Get and save the effective user name and the uid/gid for the user
549 // context of the agent process
550
551 kumpf 1.1 String effectiveUserName = System::getEffectiveUserName();
552 PEGASUS_UID_T newUid = (PEGASUS_UID_T) -1;
553 PEGASUS_GID_T newGid = (PEGASUS_GID_T) -1;
554 if (_userName != effectiveUserName)
555 {
556 if (!System::lookupUserId(_userName.getCString(), newUid, newGid))
557 {
558 throw PEGASUS_CIM_EXCEPTION_L(
559 CIM_ERR_FAILED,
560 MessageLoaderParms(
561 "ProviderManager.OOPProviderManagerRouter."
562 "USER_CONTEXT_CHANGE_FAILED",
563 "Unable to change user context to \"$0\".", _userName));
564 }
565 }
566 # endif
567
568 pid_t pid = fork();
569 if (pid < 0)
570 {
571 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
572 kumpf 1.1 "fork() failed. errno = %d.", errno);
573 PEG_METHOD_EXIT();
574 throw Exception(MessageLoaderParms(
575 "ProviderManager.OOPProviderManagerRouter.CIMPROVAGT_START_FAILED",
576 "Failed to start cimprovagt \"$0\".",
577 _moduleName));
578 }
579 else if (pid == 0)
580 {
581 //
582 // Child side of the fork
583 //
584
585 try
586 {
587 // Close our copies of the parent's ends of the pipes
588 pipeToAgent->closeWriteHandle();
589 pipeFromAgent->closeReadHandle();
590
591 //
592 // Execute the cimprovagt program
593 kumpf 1.1 //
594 String agentCommandPath =
595 ConfigManager::getHomedPath(PEGASUS_PROVIDER_AGENT_PROC_NAME);
596 CString agentCommandPathCString = agentCommandPath.getCString();
597
598 char readHandle[32];
599 char writeHandle[32];
600 pipeToAgent->exportReadHandle(readHandle);
601 pipeFromAgent->exportWriteHandle(writeHandle);
602
603 # ifndef PEGASUS_DISABLE_PROV_USERCTXT
604 // Set the user context of the Provider Agent process
605 if (_userName != effectiveUserName)
606 {
607 if (!System::changeUserContext(newUid, newGid))
608 {
609 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
610 "System::changeUserContext() failed. userName = %s.",
611 (const char*)_userName.getCString());
612 Logger::put_l(Logger::ERROR_LOG, System::CIMSERVER,
613 Logger::WARNING,
614 kumpf 1.1 "ProviderManager.OOPProviderManagerRouter."
615 "USER_CONTEXT_CHANGE_FAILED",
616 "Unable to change user context to \"$0\".", _userName);
617 _exit(1);
618 }
619 }
620 # endif
621
|
622 kumpf 1.5 // Close all file descriptors except stdin/stdout/stderr
623 // and the pipe handles needed by the Provider Agent process.
624
625 Uint32 readFd = atoi(readHandle);
626 Uint32 writeFd = atoi(writeHandle);
627 struct rlimit fileLimit;
628
629 if (getrlimit(RLIMIT_NOFILE, &fileLimit) == 0)
630 {
631 Uint32 maxFd = (Uint32)fileLimit.rlim_cur;
632 for (Uint32 i = 3; i < maxFd - 1; i++)
633 {
634 if ((i != readFd) && (i != writeFd))
635 {
636 close(i);
637 }
638 }
639 }
640
|
641 kumpf 1.1 execl(agentCommandPathCString, agentCommandPathCString,
642 readHandle, writeHandle,
643 (const char*)_moduleName.getCString(), (char*)0);
644
645 // If we're still here, there was an error
646 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
647 "execl() failed. errno = %d.", errno);
648 _exit(1);
649 }
650 catch (...)
651 {
652 // There's not much we can do here in no man's land
653 try
654 {
655 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
656 "Caught exception before calling execl().");
657 }
658 catch (...) {}
659 _exit(1);
660 }
661 }
662 kumpf 1.1 # if defined(PEGASUS_HAS_SIGNALS)
663 _pid = pid;
664 # endif
665 #endif
666
667 //
668 // CIM Server process
669 //
670
671 // Close our copies of the agent's ends of the pipes
672 pipeToAgent->closeReadHandle();
673 pipeFromAgent->closeWriteHandle();
674
675 _pipeToAgent.reset(pipeToAgent.release());
676 _pipeFromAgent.reset(pipeFromAgent.release());
677
678 PEG_METHOD_EXIT();
679 }
680
681 // Note: Caller must lock _agentMutex
682 void ProviderAgentContainer::_sendInitializationData()
683 kumpf 1.1 {
684 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
685 "ProviderAgentContainer::_sendInitializationData");
686
687 //
688 // Gather config properties to pass to the Provider Agent
689 //
690 ConfigManager* configManager = ConfigManager::getInstance();
691 Array<Pair<String, String> > configProperties;
692
693 Array<String> configPropertyNames;
694 configManager->getAllPropertyNames(configPropertyNames, true);
695 for (Uint32 i = 0; i < configPropertyNames.size(); i++)
696 {
697 String configPropertyValue =
698 configManager->getCurrentValue(configPropertyNames[i]);
699 String configPropertyDefaultValue =
700 configManager->getDefaultValue(configPropertyNames[i]);
701 if (configPropertyValue != configPropertyDefaultValue)
702 {
703 configProperties.append(Pair<String, String>(
704 kumpf 1.1 configPropertyNames[i], configPropertyValue));
705 }
706 }
707
708 //
709 // Create a Provider Agent initialization message
710 //
711 AutoPtr<CIMInitializeProviderAgentRequestMessage> request(
712 new CIMInitializeProviderAgentRequestMessage(
713 String("0"), // messageId
714 configManager->getPegasusHome(),
715 configProperties,
716 System::bindVerbose,
717 _subscriptionInitComplete,
718 QueueIdStack()));
719
720 //
721 // Write the initialization message to the pipe
722 //
723 AnonymousPipe::Status writeStatus =
724 _pipeToAgent->writeMessage(request.get());
725 kumpf 1.1
726 if (writeStatus != AnonymousPipe::STATUS_SUCCESS)
727 {
728 PEG_METHOD_EXIT();
729 throw Exception(MessageLoaderParms(
730 "ProviderManager.OOPProviderManagerRouter."
731 "CIMPROVAGT_COMMUNICATION_FAILED",
732 "Failed to communicate with cimprovagt \"$0\".",
733 _moduleName));
734 }
735
736 // Wait for a null response from the Provider Agent indicating it has
737 // initialized successfully.
738
739 CIMMessage* message;
740 AnonymousPipe::Status readStatus;
741 do
742 {
743 readStatus = _pipeFromAgent->readMessage(message);
744 } while (readStatus == AnonymousPipe::STATUS_INTERRUPT);
745
746 kumpf 1.1 if (readStatus != AnonymousPipe::STATUS_SUCCESS)
747 {
748 PEG_METHOD_EXIT();
749 throw Exception(MessageLoaderParms(
750 "ProviderManager.OOPProviderManagerRouter."
751 "CIMPROVAGT_COMMUNICATION_FAILED",
752 "Failed to communicate with cimprovagt \"$0\".",
753 _moduleName));
754 }
755
756 PEGASUS_ASSERT(message == 0);
757
758 PEG_METHOD_EXIT();
759 }
760
761 // Note: Caller must lock _agentMutex
762 void ProviderAgentContainer::_initialize()
763 {
764 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
765 "ProviderAgentContainer::_initialize");
766
767 kumpf 1.1 if (_isInitialized)
768 {
769 PEGASUS_ASSERT(0);
770 PEG_METHOD_EXIT();
771 return;
772 }
773
774 if (_maxProviderProcesses == PEG_NOT_FOUND)
775 {
776 String maxProviderProcesses = ConfigManager::getInstance()->
777 getCurrentValue("maxProviderProcesses");
778 CString maxProviderProcessesString = maxProviderProcesses.getCString();
779 char* end = 0;
780 _maxProviderProcesses = strtol(maxProviderProcessesString, &end, 10);
781 }
782
783 {
784 AutoMutex lock(_numProviderProcessesMutex);
785 if ((_maxProviderProcesses != 0) &&
786 (_numProviderProcesses >= _maxProviderProcesses))
787 {
788 kumpf 1.1 throw PEGASUS_CIM_EXCEPTION(
789 CIM_ERR_FAILED,
790 MessageLoaderParms(
791 "ProviderManager.OOPProviderManagerRouter."
792 "MAX_PROVIDER_PROCESSES_REACHED",
793 "The maximum number of cimprovagt processes has been "
794 "reached."));
795 }
796 else
797 {
798 _numProviderProcesses++;
799 }
800 }
801
802 try
803 {
804 _startAgentProcess();
805
806 _isInitialized = true;
807
808 _sendInitializationData();
809 kumpf 1.1
810 // Start a thread to read and process responses from the Provider Agent
811 ThreadStatus rtn = PEGASUS_THREAD_OK;
812 while ((rtn = MessageQueueService::get_thread_pool()->
813 allocate_and_awaken(this, _responseProcessor)) !=
814 PEGASUS_THREAD_OK)
815 {
816 if (rtn == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)
817 {
818 Threads::yield();
819 }
820 else
821 {
822 Logger::put(
823 Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
824 "Not enough threads to process responses from the "
825 "provider agent.");
826
827 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
828 "Could not allocate thread to process responses from the "
829 "provider agent.");
830 kumpf 1.1
831 throw Exception(MessageLoaderParms(
832 "ProviderManager.OOPProviderManagerRouter."
833 "CIMPROVAGT_THREAD_ALLOCATION_FAILED",
834 "Failed to allocate thread for cimprovagt \"$0\".",
835 _moduleName));
836 }
837 }
838 }
839 catch (...)
840 {
841 // Closing the connection causes the agent process to exit
842 _pipeToAgent.reset();
843 _pipeFromAgent.reset();
844
845 #if defined(PEGASUS_HAS_SIGNALS)
846 if (_isInitialized)
847 {
848 // Harvest the status of the agent process to prevent a zombie
|
849 kumpf 1.4 pid_t status = 0;
|
850 kumpf 1.1 do
851 {
|
852 kumpf 1.4 status = waitpid(_pid, 0, 0);
853 } while ((status == -1) && (errno == EINTR));
854
855 if (status == -1)
856 {
857 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
858 "ProviderAgentContainer::_initialize(): "
859 "waitpid failed; errno = %d.", errno);
860 }
|
861 kumpf 1.1 }
862 #endif
863
864 _isInitialized = false;
865
866 {
867 AutoMutex lock(_numProviderProcessesMutex);
868 _numProviderProcesses--;
869 }
870
871 PEG_METHOD_EXIT();
872 throw;
873 }
874
875 PEG_METHOD_EXIT();
876 }
877
878 Boolean ProviderAgentContainer::isInitialized()
879 {
880 AutoMutex lock(_agentMutex);
881 return _isInitialized;
882 kumpf 1.1 }
883
884 // Note: Caller must lock _agentMutex
885 void ProviderAgentContainer::_uninitialize(Boolean cleanShutdown)
886 {
887 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
888 "ProviderAgentContainer::_uninitialize");
889
890 if (!_isInitialized)
891 {
892 PEGASUS_ASSERT(0);
893 PEG_METHOD_EXIT();
894 return;
895 }
896
897 try
898 {
899 // Close the connection with the Provider Agent
900 _pipeFromAgent.reset();
901 _pipeToAgent.reset();
902
903 kumpf 1.1 _providerModuleCache = CIMInstance();
904
905 {
906 AutoMutex lock(_numProviderProcessesMutex);
907 _numProviderProcesses--;
908 }
909
910 #if defined(PEGASUS_HAS_SIGNALS)
911 // Harvest the status of the agent process to prevent a zombie
|
912 kumpf 1.4 pid_t status = 0;
|
913 kumpf 1.1 do
914 {
|
915 kumpf 1.4 status = waitpid(_pid, 0, 0);
916 } while ((status == -1) && (errno == EINTR));
917
918 if (status == -1)
919 {
920 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
921 "ProviderAgentContainer::_uninitialize(): "
922 "waitpid failed; errno = %d.", errno);
923 }
|
924 kumpf 1.1 #endif
925
926 _isInitialized = false;
927
928 //
929 // Complete with null responses all outstanding requests on this
930 // connection
931 //
932 {
933 AutoMutex tableLock(_outstandingRequestTableMutex);
934
935 CIMResponseMessage* response =
936 cleanShutdown ? _REQUEST_NOT_PROCESSED : 0;
937
938 for (OutstandingRequestTable::Iterator i =
939 _outstandingRequestTable.start();
940 i != 0; i++)
941 {
942 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
943 String("Completing messageId \"") + i.key() +
944 "\" with a null response.");
945 kumpf 1.1 i.value()->responseMessage = response;
946 i.value()->responseReady->signal();
947 }
948
949 _outstandingRequestTable.clear();
|
950 kumpf 1.3 }
|
951 kumpf 1.1
|
952 kumpf 1.3 //
953 // If not a clean shutdown, call the provider module failure callback
954 //
955 if (!cleanShutdown)
956 {
|
957 kumpf 1.1 //
|
958 kumpf 1.3 // Call the provider module failure callback to communicate
959 // the failure to the Provider Manager Service. The Provider
960 // Manager Service will inform the Indication Service.
|
961 kumpf 1.1 //
|
962 kumpf 1.3 _providerModuleFailCallback(_moduleName, _userName, _userContext);
|
963 kumpf 1.1 }
964 }
965 catch (...)
966 {
967 // We're uninitializing, so do not propagate the exception
968 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
969 "Ignoring _uninitialize() exception.");
970 }
971
972 PEG_METHOD_EXIT();
973 }
974
975 String ProviderAgentContainer::getModuleName() const
976 {
977 return _moduleName;
978 }
979
980 CIMResponseMessage* ProviderAgentContainer::processMessage(
981 CIMRequestMessage* request)
982 {
983 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
984 kumpf 1.1 "ProviderAgentContainer::processMessage");
985
986 CIMResponseMessage* response;
987
988 do
989 {
990 response = _processMessage(request);
991
992 if (response == _REQUEST_NOT_PROCESSED)
993 {
994 // Check for request message types that should not be retried.
995 if ((request->getType() ==
996 CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE) ||
997 (request->getType() ==
998 CIM_NOTIFY_CONFIG_CHANGE_REQUEST_MESSAGE) ||
999 (request->getType() ==
1000 CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE) ||
1001 (request->getType() ==
1002 CIM_DELETE_SUBSCRIPTION_REQUEST_MESSAGE))
1003 {
1004 response = request->buildResponse();
1005 kumpf 1.1 break;
1006 }
1007 else if (request->getType() == CIM_DISABLE_MODULE_REQUEST_MESSAGE)
1008 {
1009 CIMDisableModuleResponseMessage* dmResponse =
1010 dynamic_cast<CIMDisableModuleResponseMessage*>(response);
1011 PEGASUS_ASSERT(dmResponse != 0);
1012
1013 Array<Uint16> operationalStatus;
1014 operationalStatus.append(CIM_MSE_OPSTATUS_VALUE_STOPPED);
1015 dmResponse->operationalStatus = operationalStatus;
1016 break;
1017 }
1018 }
1019 } while (response == _REQUEST_NOT_PROCESSED);
1020
|
1021 kumpf 1.2 if (request->getType() == CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE)
1022 {
1023 _subscriptionInitComplete = true;
1024 }
1025
|
1026 kumpf 1.1 PEG_METHOD_EXIT();
1027 return response;
1028 }
1029
1030 CIMResponseMessage* ProviderAgentContainer::_processMessage(
1031 CIMRequestMessage* request)
1032 {
1033 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1034 "ProviderAgentContainer::_processMessage");
1035
1036 CIMResponseMessage* response;
1037 String originalMessageId = request->messageId;
1038
1039 // These three variables are used for the provider module optimization.
1040 // See the _providerModuleCache member description for more information.
1041 AutoPtr<ProviderIdContainer> origProviderId;
1042 Boolean doProviderModuleOptimization = false;
1043 Boolean updateProviderModuleCache = false;
1044
1045 try
1046 {
1047 kumpf 1.1 // The messageId attribute is used to correlate response messages
1048 // from the Provider Agent with request messages, so it is imperative
1049 // that the ID is unique for each request. The incoming ID cannot be
1050 // trusted to be unique, so we substitute a unique one. The memory
1051 // address of the request is used as the source of a unique piece of
1052 // data. (The message ID is only required to be unique while the
1053 // request is outstanding.)
1054 char messagePtrString[20];
1055 sprintf(messagePtrString, "%p", request);
1056 String uniqueMessageId = messagePtrString;
1057
1058 //
1059 // Set up the OutstandingRequestEntry for this request
1060 //
1061 Semaphore waitSemaphore(0);
1062 OutstandingRequestEntry outstandingRequestEntry(
1063 originalMessageId, request, response, &waitSemaphore);
1064
1065 //
1066 // Lock the Provider Agent Container while initializing the
1067 // agent and writing the request to the connection
1068 kumpf 1.1 //
1069 {
1070 AutoMutex lock(_agentMutex);
1071
1072 //
1073 // Initialize the Provider Agent, if necessary
1074 //
1075 if (!_isInitialized)
1076 {
1077 _initialize();
1078 }
1079
1080 //
1081 // Add an entry to the OutstandingRequestTable for this request
1082 //
1083 {
1084 AutoMutex tableLock(_outstandingRequestTableMutex);
1085
1086 _outstandingRequestTable.insert(
1087 uniqueMessageId, &outstandingRequestEntry);
1088 }
1089 kumpf 1.1
1090 // Get the provider module from the ProviderIdContainer to see if
1091 // we can optimize out the transmission of this instance to the
1092 // Provider Agent. (See the _providerModuleCache description.)
1093 if(request->operationContext.contains(ProviderIdContainer::NAME))
1094 {
1095 ProviderIdContainer pidc = request->operationContext.get(
1096 ProviderIdContainer::NAME);
1097 origProviderId.reset(new ProviderIdContainer(
1098 pidc.getModule(), pidc.getProvider(),
1099 pidc.isRemoteNameSpace(), pidc.getRemoteInfo()));
1100 if (_providerModuleCache.isUninitialized() ||
1101 (!pidc.getModule().identical(_providerModuleCache)))
1102 {
1103 // We haven't sent this provider module instance to the
1104 // Provider Agent yet. Update our cache after we send it.
1105 updateProviderModuleCache = true;
1106 }
1107 else
1108 {
1109 // Replace the provider module in the ProviderIdContainer
1110 kumpf 1.1 // with an uninitialized instance. We'll need to put the
1111 // original one back after the message is sent.
1112 request->operationContext.set(ProviderIdContainer(
1113 CIMInstance(), pidc.getProvider(),
1114 pidc.isRemoteNameSpace(), pidc.getRemoteInfo()));
1115 doProviderModuleOptimization = true;
1116 }
1117 }
1118
1119 //
1120 // Write the message to the pipe
1121 //
1122 try
1123 {
1124 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL3,
1125 String("Sending request to agent with messageId ") +
1126 uniqueMessageId);
1127
1128 request->messageId = uniqueMessageId;
1129 AnonymousPipe::Status writeStatus =
1130 _pipeToAgent->writeMessage(request);
1131 kumpf 1.1 request->messageId = originalMessageId;
1132
1133 if (doProviderModuleOptimization)
1134 {
1135 request->operationContext.set(*origProviderId.get());
1136 }
1137
1138 if (writeStatus != AnonymousPipe::STATUS_SUCCESS)
1139 {
1140 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
1141 "Failed to write message to pipe. writeStatus = %d.",
1142 writeStatus);
1143
1144 request->messageId = originalMessageId;
1145
1146 if (doProviderModuleOptimization)
1147 {
1148 request->operationContext.set(*origProviderId.get());
1149 }
1150
1151 // Remove this OutstandingRequestTable entry
1152 kumpf 1.1 {
1153 AutoMutex tableLock(_outstandingRequestTableMutex);
1154 Boolean removed =
1155 _outstandingRequestTable.remove(uniqueMessageId);
1156 PEGASUS_ASSERT(removed);
1157 }
1158
1159 // A response value of _REQUEST_NOT_PROCESSED indicates
1160 // that the request was not processed by the provider
1161 // agent, so it can be retried safely.
1162 PEG_METHOD_EXIT();
1163 return _REQUEST_NOT_PROCESSED;
1164 }
1165
1166 if (updateProviderModuleCache)
1167 {
1168 _providerModuleCache = origProviderId->getModule();
1169 }
1170 }
1171 catch (...)
1172 {
1173 kumpf 1.1 request->messageId = originalMessageId;
1174
1175 if (doProviderModuleOptimization)
1176 {
1177 request->operationContext.set(*origProviderId.get());
1178 }
1179
1180 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
1181 "Failed to write message to pipe.");
1182 // Remove the OutstandingRequestTable entry for this request
1183 {
1184 AutoMutex tableLock(_outstandingRequestTableMutex);
1185 Boolean removed =
1186 _outstandingRequestTable.remove(uniqueMessageId);
1187 PEGASUS_ASSERT(removed);
1188 }
1189 PEG_METHOD_EXIT();
1190 throw;
1191 }
1192 }
1193
1194 kumpf 1.1 //
1195 // Wait for the response
1196 //
1197 try
1198 {
1199 // Must not hold _agentMutex while waiting for the response
1200 waitSemaphore.wait();
1201 }
1202 catch (...)
1203 {
1204 // Remove the OutstandingRequestTable entry for this request
1205 {
1206 AutoMutex tableLock(_outstandingRequestTableMutex);
1207 Boolean removed =
1208 _outstandingRequestTable.remove(uniqueMessageId);
1209 PEGASUS_ASSERT(removed);
1210 }
1211 PEG_METHOD_EXIT();
1212 throw;
1213 }
1214
1215 kumpf 1.1 // A response value of _REQUEST_NOT_PROCESSED indicates that the
1216 // provider agent process was terminating when the request was sent.
1217 // The request was not processed by the provider agent, so it can be
1218 // retried safely.
1219 if (response == _REQUEST_NOT_PROCESSED)
1220 {
1221 PEG_METHOD_EXIT();
1222 return response;
1223 }
1224
1225 // A null response is returned when an agent connection is closed
1226 // while requests remain outstanding.
1227 if (response == 0)
1228 {
1229 response = request->buildResponse();
1230 response->cimException = PEGASUS_CIM_EXCEPTION(
1231 CIM_ERR_FAILED,
1232 MessageLoaderParms(
1233 "ProviderManager.OOPProviderManagerRouter."
1234 "CIMPROVAGT_CONNECTION_LOST",
1235 "Lost connection with cimprovagt \"$0\".",
1236 kumpf 1.1 _moduleName));
1237 }
1238 }
1239 catch (CIMException& e)
1240 {
1241 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
1242 String("Caught exception: ") + e.getMessage());
1243 response = request->buildResponse();
1244 response->cimException = e;
1245 }
1246 catch (Exception& e)
1247 {
1248 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
1249 String("Caught exception: ") + e.getMessage());
1250 response = request->buildResponse();
1251 response->cimException = PEGASUS_CIM_EXCEPTION(
1252 CIM_ERR_FAILED, e.getMessage());
1253 }
1254 catch (...)
1255 {
1256 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
1257 kumpf 1.1 "Caught unknown exception");
1258 response = request->buildResponse();
1259 response->cimException = PEGASUS_CIM_EXCEPTION(
1260 CIM_ERR_FAILED, String::EMPTY);
1261 }
1262
1263 response->messageId = originalMessageId;
1264
1265 PEG_METHOD_EXIT();
1266 return response;
1267 }
1268
1269 void ProviderAgentContainer::unloadIdleProviders()
1270 {
1271 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1272 "ProviderAgentContainer::unloadIdleProviders");
1273
1274 AutoMutex lock(_agentMutex);
1275 if (_isInitialized)
1276 {
1277 // Send a "wake up" message to the Provider Agent.
1278 kumpf 1.1 // Don't bother checking whether the operation is successful.
1279 Uint32 messageLength = 0;
1280 _pipeToAgent->writeBuffer((const char*)&messageLength, sizeof(Uint32));
1281 }
1282
1283 PEG_METHOD_EXIT();
1284 }
1285
1286 void ProviderAgentContainer::_processResponses()
1287 {
1288 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1289 "ProviderAgentContainer::_processResponses");
1290
1291 //
1292 // Process responses until the pipe is closed
1293 //
1294 while (1)
1295 {
1296 try
1297 {
1298 CIMMessage* message;
1299 kumpf 1.1
1300 //
1301 // Read a response from the Provider Agent
1302 //
1303 AnonymousPipe::Status readStatus =
1304 _pipeFromAgent->readMessage(message);
1305
1306 // Ignore interrupts
1307 if (readStatus == AnonymousPipe::STATUS_INTERRUPT)
1308 {
1309 continue;
1310 }
1311
1312 // Handle an error the same way as a closed connection
1313 if ((readStatus == AnonymousPipe::STATUS_ERROR) ||
1314 (readStatus == AnonymousPipe::STATUS_CLOSED))
1315 {
1316 AutoMutex lock(_agentMutex);
1317 _uninitialize(false);
1318 return;
1319 }
1320 kumpf 1.1
1321 // A null message indicates that the provider agent process has
1322 // finished its processing and is ready to exit.
1323 if (message == 0)
1324 {
1325 AutoMutex lock(_agentMutex);
1326 _uninitialize(true);
1327 return;
1328 }
1329
1330 if (message->getType() == CIM_PROCESS_INDICATION_REQUEST_MESSAGE)
1331 {
1332 // Forward indications to the indication callback
1333 _indicationCallback(
1334 reinterpret_cast<CIMProcessIndicationRequestMessage*>(
1335 message));
1336 }
1337 else if (!message->isComplete())
1338 {
1339 CIMResponseMessage* response;
1340 response = dynamic_cast<CIMResponseMessage*>(message);
1341 kumpf 1.1 PEGASUS_ASSERT(response != 0);
1342
1343 // Get the OutstandingRequestEntry for this response chunk
1344 OutstandingRequestEntry* _outstandingRequestEntry = 0;
1345 {
1346 AutoMutex tableLock(_outstandingRequestTableMutex);
1347 Boolean foundEntry = _outstandingRequestTable.lookup(
1348 response->messageId, _outstandingRequestEntry);
1349 PEGASUS_ASSERT(foundEntry);
1350 }
1351
1352 // Put the original message ID into the response
1353 response->messageId =
1354 _outstandingRequestEntry->originalMessageId;
1355
1356 // Call the response chunk callback to process the chunk
1357 _responseChunkCallback(
1358 _outstandingRequestEntry->requestMessage, response);
1359 }
1360 else
1361 {
1362 kumpf 1.1 CIMResponseMessage* response;
1363 response = dynamic_cast<CIMResponseMessage*>(message);
1364 PEGASUS_ASSERT(response != 0);
1365
1366 // Give the response to the waiting OutstandingRequestEntry
1367 OutstandingRequestEntry* _outstandingRequestEntry = 0;
1368 {
1369 AutoMutex tableLock(_outstandingRequestTableMutex);
1370 Boolean foundEntry = _outstandingRequestTable.lookup(
1371 response->messageId, _outstandingRequestEntry);
1372 PEGASUS_ASSERT(foundEntry);
1373
1374 // Remove the completed request from the table
1375 Boolean removed =
1376 _outstandingRequestTable.remove(response->messageId);
1377 PEGASUS_ASSERT(removed);
1378 }
1379
1380 _outstandingRequestEntry->responseMessage = response;
1381 _outstandingRequestEntry->responseReady->signal();
1382 }
1383 kumpf 1.1 }
1384 catch (Exception& e)
1385 {
1386 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
1387 String("Ignoring exception: ") + e.getMessage());
1388 }
1389 catch (...)
1390 {
1391 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
1392 "Ignoring exception");
1393 }
1394 }
1395
1396 }
1397
1398 ThreadReturnType PEGASUS_THREAD_CDECL
1399 ProviderAgentContainer::_responseProcessor(void* arg)
1400 {
1401 ProviderAgentContainer* pa =
1402 reinterpret_cast<ProviderAgentContainer*>(arg);
1403
1404 kumpf 1.1 pa->_processResponses();
1405
1406 return(ThreadReturnType(0));
1407 }
1408
1409 /////////////////////////////////////////////////////////////////////////////
1410 // OOPProviderManagerRouter
1411 /////////////////////////////////////////////////////////////////////////////
1412
1413 OOPProviderManagerRouter::OOPProviderManagerRouter(
1414 PEGASUS_INDICATION_CALLBACK_T indicationCallback,
1415 PEGASUS_RESPONSE_CHUNK_CALLBACK_T responseChunkCallback,
1416 PEGASUS_PROVIDERMODULEFAIL_CALLBACK_T providerModuleFailCallback)
1417 {
1418 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1419 "OOPProviderManagerRouter::OOPProviderManagerRouter");
1420
1421 _indicationCallback = indicationCallback;
1422 _responseChunkCallback = responseChunkCallback;
1423 _providerModuleFailCallback = providerModuleFailCallback;
1424 _subscriptionInitComplete = false;
1425 kumpf 1.1
1426 PEG_METHOD_EXIT();
1427 }
1428
1429 OOPProviderManagerRouter::~OOPProviderManagerRouter()
1430 {
1431 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1432 "OOPProviderManagerRouter::~OOPProviderManagerRouter");
1433
1434 try
1435 {
1436 // Clean up the ProviderAgentContainers
1437 AutoMutex lock(_providerAgentTableMutex);
1438 ProviderAgentTable::Iterator i = _providerAgentTable.start();
1439 for(; i != 0; i++)
1440 {
1441 delete i.value();
1442 }
1443 }
1444 catch (...) {}
1445
1446 kumpf 1.1 PEG_METHOD_EXIT();
1447 }
1448
1449 Message* OOPProviderManagerRouter::processMessage(Message* message)
1450 {
1451 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1452 "OOPProviderManagerRouter::processMessage");
1453
1454 CIMRequestMessage* request = dynamic_cast<CIMRequestMessage *>(message);
1455 PEGASUS_ASSERT(request != 0);
1456
1457 AutoPtr<CIMResponseMessage> response;
1458
1459 //
1460 // Get the provider information from the request
1461 //
1462 CIMInstance providerModule;
1463
1464 if ((dynamic_cast<CIMOperationRequestMessage*>(request) != 0) ||
1465 (dynamic_cast<CIMIndicationRequestMessage*>(request) != 0) ||
1466 (request->getType() == CIM_EXPORT_INDICATION_REQUEST_MESSAGE))
1467 kumpf 1.1 {
1468 // Provider information is in the OperationContext
1469 ProviderIdContainer pidc = (ProviderIdContainer)
1470 request->operationContext.get(ProviderIdContainer::NAME);
1471 providerModule = pidc.getModule();
1472 }
1473 else if (request->getType() == CIM_ENABLE_MODULE_REQUEST_MESSAGE)
1474 {
1475 CIMEnableModuleRequestMessage* emReq =
1476 dynamic_cast<CIMEnableModuleRequestMessage*>(request);
1477 providerModule = emReq->providerModule;
1478 }
1479 else if (request->getType() == CIM_DISABLE_MODULE_REQUEST_MESSAGE)
1480 {
1481 CIMDisableModuleRequestMessage* dmReq =
1482 dynamic_cast<CIMDisableModuleRequestMessage*>(request);
1483 providerModule = dmReq->providerModule;
1484 }
1485 else if ((request->getType() == CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE) ||
1486 (request->getType() ==
1487 CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE) ||
1488 kumpf 1.1 (request->getType() == CIM_NOTIFY_CONFIG_CHANGE_REQUEST_MESSAGE))
1489 {
1490 // This operation is not provider-specific
1491 }
1492 else
1493 {
1494 // Unrecognized message type. This should never happen.
1495 PEGASUS_ASSERT(0);
1496 response.reset(request->buildResponse());
1497 response->cimException = PEGASUS_CIM_EXCEPTION(
1498 CIM_ERR_FAILED, "Unrecognized message type.");
1499 PEG_METHOD_EXIT();
1500 return response.release();
1501 }
1502
1503 //
1504 // Process the request message
1505 //
1506 if (request->getType() == CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE)
1507 {
1508 // Forward the CIMStopAllProvidersRequest to all providers
1509 kumpf 1.1 response.reset(_forwardRequestToAllAgents(request));
1510
1511 // Note: Do not uninitialize the ProviderAgentContainers here.
1512 // Just let the selecting thread notice when the agent connections
1513 // are closed.
1514 }
1515 else if (request->getType () ==
1516 CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE)
1517 {
1518 _subscriptionInitComplete = true;
1519
1520 //
1521 // Forward the CIMSubscriptionInitCompleteRequestMessage to
1522 // all providers
1523 //
1524 response.reset (_forwardRequestToAllAgents (request));
1525 }
1526 else if (request->getType() == CIM_NOTIFY_CONFIG_CHANGE_REQUEST_MESSAGE)
1527 {
1528 CIMNotifyConfigChangeRequestMessage* notifyRequest =
1529 dynamic_cast<CIMNotifyConfigChangeRequestMessage*>(request);
1530 kumpf 1.1 PEGASUS_ASSERT(notifyRequest != 0);
1531
1532 if (notifyRequest->currentValueModified)
1533 {
1534 // Forward the CIMNotifyConfigChangeRequestMessage to all providers
1535 response.reset(_forwardRequestToAllAgents(request));
1536 }
1537 else
1538 {
1539 // No need to notify provider agents about changes to planned value
1540 response.reset(request->buildResponse());
1541 }
1542 }
1543 else if (request->getType() == CIM_DISABLE_MODULE_REQUEST_MESSAGE)
1544 {
1545 // Fan out the request to all Provider Agent processes for this module
1546
1547 // Retrieve the provider module name
1548 String moduleName;
1549 CIMValue nameValue = providerModule.getProperty(
1550 providerModule.findProperty("Name")).getValue();
1551 kumpf 1.1 nameValue.get(moduleName);
1552
1553 // Look up the Provider Agents for this module
1554 Array<ProviderAgentContainer*> paArray =
1555 _lookupProviderAgents(moduleName);
1556
1557 for (Uint32 i=0; i<paArray.size(); i++)
1558 {
1559 //
1560 // Do not start up an agent process just to disable the module
1561 //
1562 if (paArray[i]->isInitialized())
1563 {
1564 //
1565 // Forward the request to the provider agent
1566 //
1567 response.reset(paArray[i]->processMessage(request));
1568
1569 // Note: Do not uninitialize the ProviderAgentContainer here
1570 // when a disable module operation is successful. Just let the
1571 // selecting thread notice when the agent connection is closed.
1572 kumpf 1.1
1573 // Determine the success of the disable module operation
1574 CIMDisableModuleResponseMessage* dmResponse =
1575 dynamic_cast<CIMDisableModuleResponseMessage*>(
1576 response.get());
1577 PEGASUS_ASSERT(dmResponse != 0);
1578
1579 Boolean isStopped = false;
1580 for (Uint32 i=0; i < dmResponse->operationalStatus.size(); i++)
1581 {
1582 if (dmResponse->operationalStatus[i] ==
1583 CIM_MSE_OPSTATUS_VALUE_STOPPED)
1584 {
1585 isStopped = true;
1586 break;
1587 }
1588 }
1589
1590 // If the operation is unsuccessful, stop and return the error
1591 if ((dmResponse->cimException.getCode() != CIM_ERR_SUCCESS) ||
1592 !isStopped)
1593 kumpf 1.1 {
1594 break;
1595 }
1596 }
1597 }
1598
1599 // Use a default response if no Provider Agents were called
1600 if (!response.get())
1601 {
1602 response.reset(request->buildResponse());
1603
1604 CIMDisableModuleResponseMessage* dmResponse =
1605 dynamic_cast<CIMDisableModuleResponseMessage*>(response.get());
1606 PEGASUS_ASSERT(dmResponse != 0);
1607
1608 Array<Uint16> operationalStatus;
1609 operationalStatus.append(CIM_MSE_OPSTATUS_VALUE_STOPPED);
1610 dmResponse->operationalStatus = operationalStatus;
1611 }
1612 }
1613 else if (request->getType() == CIM_ENABLE_MODULE_REQUEST_MESSAGE)
1614 kumpf 1.1 {
1615 // Fan out the request to all Provider Agent processes for this module
1616
1617 // Retrieve the provider module name
1618 String moduleName;
1619 CIMValue nameValue = providerModule.getProperty(
1620 providerModule.findProperty("Name")).getValue();
1621 nameValue.get(moduleName);
1622
1623 // Look up the Provider Agents for this module
1624 Array<ProviderAgentContainer*> paArray =
1625 _lookupProviderAgents(moduleName);
1626
1627 for (Uint32 i=0; i<paArray.size(); i++)
1628 {
1629 //
1630 // Do not start up an agent process just to enable the module
1631 //
1632 if (paArray[i]->isInitialized())
1633 {
1634 //
1635 kumpf 1.1 // Forward the request to the provider agent
1636 //
1637 response.reset(paArray[i]->processMessage(request));
1638
1639 // Determine the success of the enable module operation
1640 CIMEnableModuleResponseMessage* emResponse =
1641 dynamic_cast<CIMEnableModuleResponseMessage*>(
1642 response.get());
1643 PEGASUS_ASSERT(emResponse != 0);
1644
1645 Boolean isOk = false;
1646 for (Uint32 i=0; i < emResponse->operationalStatus.size(); i++)
1647 {
1648 if (emResponse->operationalStatus[i] ==
1649 CIM_MSE_OPSTATUS_VALUE_OK)
1650 {
1651 isOk = true;
1652 break;
1653 }
1654 }
1655
1656 kumpf 1.1 // If the operation is unsuccessful, stop and return the error
1657 if ((emResponse->cimException.getCode() != CIM_ERR_SUCCESS) ||
1658 !isOk)
1659 {
1660 break;
1661 }
1662 }
1663 }
1664
1665 // Use a default response if no Provider Agents were called
1666 if (!response.get())
1667 {
1668 response.reset(request->buildResponse());
1669
1670 CIMEnableModuleResponseMessage* emResponse =
1671 dynamic_cast<CIMEnableModuleResponseMessage*>(response.get());
1672 PEGASUS_ASSERT(emResponse != 0);
1673
1674 Array<Uint16> operationalStatus;
1675 operationalStatus.append(CIM_MSE_OPSTATUS_VALUE_OK);
1676 emResponse->operationalStatus = operationalStatus;
1677 kumpf 1.1 }
1678 }
1679 else
1680 {
1681 //
1682 // Look up the Provider Agent for this module instance and requesting
1683 // user
1684 //
1685 ProviderAgentContainer* pa = _lookupProviderAgent(providerModule,
1686 request);
1687 PEGASUS_ASSERT(pa != 0);
1688
1689 //
1690 // Forward the request to the provider agent
1691 //
1692 response.reset(pa->processMessage(request));
1693 }
1694
1695 response->syncAttributes(request);
1696
1697 PEG_METHOD_EXIT();
1698 kumpf 1.1 return response.release();
1699 }
1700
1701 ProviderAgentContainer* OOPProviderManagerRouter::_lookupProviderAgent(
1702 const CIMInstance& providerModule,
1703 CIMRequestMessage* request)
1704 {
1705 // Retrieve the provider module name
1706 String moduleName;
1707 CIMValue nameValue = providerModule.getProperty(
1708 providerModule.findProperty("Name")).getValue();
1709 nameValue.get(moduleName);
1710
1711 // Retrieve the provider user context configuration
1712 Uint16 userContext = 0;
1713 Uint32 pos = providerModule.findProperty(
1714 PEGASUS_PROPERTYNAME_MODULE_USERCONTEXT);
1715 if (pos != PEG_NOT_FOUND)
1716 {
1717 CIMValue userContextValue =
1718 providerModule.getProperty(pos).getValue();
1719 kumpf 1.1 if (!userContextValue.isNull())
1720 {
1721 userContextValue.get(userContext);
1722 }
1723 }
1724
1725 if (userContext == 0)
1726 {
1727 userContext = PEGASUS_DEFAULT_PROV_USERCTXT;
1728 }
1729
1730 String userName;
1731
1732 if (userContext == PG_PROVMODULE_USERCTXT_REQUESTOR)
1733 {
1734 if(request->operationContext.contains(IdentityContainer::NAME))
1735 {
1736 // User Name is in the OperationContext
1737 IdentityContainer ic = (IdentityContainer)
1738 request->operationContext.get(IdentityContainer::NAME);
1739 userName = ic.getUserName();
1740 kumpf 1.1 }
1741 //else
1742 //{
1743 // If no IdentityContainer is present, default to the CIM
1744 // Server's user context
1745 //}
1746
1747 // If authentication is disabled, use the CIM Server's user context
1748 if (!userName.size())
1749 {
1750 userName = System::getEffectiveUserName();
1751 }
1752 }
1753 else if (userContext == PG_PROVMODULE_USERCTXT_DESIGNATED)
1754 {
1755 // Retrieve the provider module designated user property value
1756 providerModule.getProperty(providerModule.findProperty(
1757 PEGASUS_PROPERTYNAME_MODULE_DESIGNATEDUSER)).getValue().
1758 get(userName);
1759 }
1760 else if (userContext == PG_PROVMODULE_USERCTXT_CIMSERVER)
1761 kumpf 1.1 {
1762 userName = System::getEffectiveUserName();
1763 }
1764 else // Privileged User
1765 {
1766 PEGASUS_ASSERT(userContext == PG_PROVMODULE_USERCTXT_PRIVILEGED);
1767 userName = System::getPrivilegedUserName();
1768 }
1769
1770 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4,
1771 "Module name = " + moduleName);
1772 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL4,
1773 "User context = %hd.", userContext);
1774 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4,
1775 "User name = " + userName);
1776
1777 ProviderAgentContainer* pa = 0;
1778 String key = moduleName + ":" + userName;
1779
1780 AutoMutex lock(_providerAgentTableMutex);
1781 if (!_providerAgentTable.lookup(key, pa))
1782 kumpf 1.1 {
1783 pa = new ProviderAgentContainer(
1784 moduleName, userName, userContext,
1785 _indicationCallback, _responseChunkCallback,
1786 _providerModuleFailCallback,
1787 _subscriptionInitComplete);
1788 _providerAgentTable.insert(key, pa);
1789 }
1790 return pa;
1791 }
1792
1793 Array<ProviderAgentContainer*> OOPProviderManagerRouter::_lookupProviderAgents(
1794 const String& moduleName)
1795 {
1796 Array<ProviderAgentContainer*> paArray;
1797
1798 AutoMutex lock(_providerAgentTableMutex);
1799 for (ProviderAgentTable::Iterator i = _providerAgentTable.start(); i; i++)
1800 {
1801 if (i.value()->getModuleName() == moduleName)
1802 {
1803 kumpf 1.1 paArray.append(i.value());
1804 }
1805 }
1806 return paArray;
1807 }
1808
1809 CIMResponseMessage* OOPProviderManagerRouter::_forwardRequestToAllAgents(
1810 CIMRequestMessage* request)
1811 {
1812 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1813 "OOPProviderManagerRouter::_forwardRequestToAllAgents");
1814
1815 // Get a list of the ProviderAgentContainers. We need our own array copy
1816 // because we cannot hold the _providerAgentTableMutex while calling
1817 // _ProviderAgentContainer::processMessage().
1818 Array<ProviderAgentContainer*> paContainerArray;
1819 {
1820 AutoMutex tableLock(_providerAgentTableMutex);
1821 for (ProviderAgentTable::Iterator i = _providerAgentTable.start();
1822 i != 0; i++)
1823 {
1824 kumpf 1.1 paContainerArray.append(i.value());
1825 }
1826 }
1827
1828 CIMException responseException;
1829
1830 // Forward the request to each of the initialized provider agents
1831 for (Uint32 j = 0; j < paContainerArray.size(); j++)
1832 {
1833 ProviderAgentContainer* pa = paContainerArray[j];
1834 if (pa->isInitialized())
1835 {
1836 // Note: The ProviderAgentContainer could become uninitialized
1837 // before _ProviderAgentContainer::processMessage() processes
1838 // this request. In this case, the Provider Agent process will
1839 // (unfortunately) be started to process this message.
1840 AutoPtr<CIMResponseMessage> response;
1841 response.reset(pa->processMessage(request));
1842 if (response.get() != 0)
1843 {
1844 // If the operation failed, save the exception data
1845 kumpf 1.1 if ((response->cimException.getCode() != CIM_ERR_SUCCESS) &&
1846 (responseException.getCode() == CIM_ERR_SUCCESS))
1847 {
1848 responseException = response->cimException;
1849 }
1850 }
1851 }
1852 }
1853
1854 CIMResponseMessage* response = request->buildResponse();
1855 response->cimException = responseException;
1856
1857 PEG_METHOD_EXIT();
1858 return response;
1859 }
1860
1861 Boolean OOPProviderManagerRouter::hasActiveProviders()
1862 {
1863 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1864 "OOPProviderManagerRouter::hasActiveProviders");
1865
1866 kumpf 1.1 // Iterate through the _providerAgentTable looking for initialized agents
1867 AutoMutex lock(_providerAgentTableMutex);
1868 ProviderAgentTable::Iterator i = _providerAgentTable.start();
1869 for(; i != 0; i++)
1870 {
1871 if (i.value()->isInitialized())
1872 {
1873 PEG_METHOD_EXIT();
1874 return true;
1875 }
1876 }
1877
1878 // No initialized Provider Agents were found
1879 PEG_METHOD_EXIT();
1880 return false;
1881 }
1882
1883 void OOPProviderManagerRouter::unloadIdleProviders()
1884 {
1885 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1886 "OOPProviderManagerRouter::unloadIdleProviders");
1887 kumpf 1.1
1888 // Iterate through the _providerAgentTable unloading idle providers
1889 AutoMutex lock(_providerAgentTableMutex);
1890 ProviderAgentTable::Iterator i = _providerAgentTable.start();
1891 for(; i != 0; i++)
1892 {
1893 i.value()->unloadIdleProviders();
1894 }
1895
1896 PEG_METHOD_EXIT();
1897 }
1898
1899 PEGASUS_NAMESPACE_END
|