1 karl 1.23 //%2006////////////////////////////////////////////////////////////////////////
|
2 kumpf 1.1 //
|
3 karl 1.5 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 kumpf 1.1 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.5 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.7 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 karl 1.23 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 kumpf 1.1 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
15 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
18 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
|
20 karl 1.5 //
|
21 kumpf 1.1 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
22 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
24 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
27 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 // Author: Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
33 // Jenny Yu, Hewlett-Packard Company (jenny_yu@hp.com)
34 //
|
35 gs.keenan 1.8 // Modified By: Sean Keenan, Hewlett-Packard Company (sean.keenan@hp.com)
|
36 carolann.graves 1.14 // Carol Ann Krug Graves, Hewlett-Packard Company
37 // (carolann_graves@hp.com)
|
38 kumpf 1.1 //
39 //%/////////////////////////////////////////////////////////////////////////////
40
41 #include <Pegasus/Common/Config.h>
42 #include <Pegasus/Common/Constants.h>
43 #include <Pegasus/Common/AutoPtr.h>
44 #include <Pegasus/Common/ArrayInternal.h>
45 #include <Pegasus/Common/CIMMessage.h>
46 #include <Pegasus/Common/CIMMessageSerializer.h>
47 #include <Pegasus/Common/CIMMessageDeserializer.h>
48 #include <Pegasus/Common/OperationContextInternal.h>
49 #include <Pegasus/Common/System.h>
50 #include <Pegasus/Common/AnonymousPipe.h>
51 #include <Pegasus/Common/Tracer.h>
52 #include <Pegasus/Common/Logger.h>
53 #include <Pegasus/Common/Thread.h>
54 #include <Pegasus/Common/MessageQueueService.h>
55 #include <Pegasus/Config/ConfigManager.h>
56
57 #if defined (PEGASUS_OS_TYPE_WINDOWS)
|
58 gs.keenan 1.8 # include <windows.h> // For CreateProcess()
59 #elif defined (PEGASUS_OS_OS400)
60 # include <unistd.cleinc>
61 #elif defined (PEGASUS_OS_VMS)
62 # include <perror.h>
63 # include <climsgdef.h>
64 # include <stdio.h>
65 # include <stdlib.h>
66 # include <string.h>
67 # include <processes.h>
68 # include <unixio.h>
|
69 kumpf 1.1 #else
|
70 gs.keenan 1.8 # include <unistd.h> // For fork(), exec(), and _exit()
71 # include <errno.h>
|
72 kumpf 1.18 # include <sys/types.h>
73 # if defined(PEGASUS_HAS_SIGNALS)
74 # include <sys/wait.h>
75 # endif
|
76 kumpf 1.1 #endif
77
78 #include "OOPProviderManagerRouter.h"
79
80 PEGASUS_USING_STD;
81
82 PEGASUS_NAMESPACE_BEGIN
83
84 /////////////////////////////////////////////////////////////////////////////
85 // OutstandingRequestTable and OutstandingRequestEntry
86 /////////////////////////////////////////////////////////////////////////////
87
88 /**
89 An OutstandingRequestEntry represents a request message sent to a
90 Provider Agent for which no response has been received. The request
91 sender provides the message ID and a location for the response to be
92 returned, and then waits on the semaphore. When a response matching
93 the message ID is received, it is placed into the specified location
94 and the semaphore is signaled.
95 */
96 class OutstandingRequestEntry
97 kumpf 1.1 {
98 public:
99 OutstandingRequestEntry(
100 String messageId_,
|
101 kumpf 1.26 CIMRequestMessage* requestMessage_,
|
102 kumpf 1.1 CIMResponseMessage*& responseMessage_,
103 Semaphore* responseReady_)
104 : messageId(messageId_),
|
105 kumpf 1.26 requestMessage(requestMessage_),
|
106 kumpf 1.1 responseMessage(responseMessage_),
107 responseReady(responseReady_)
108 {
109 }
110
111 String messageId;
|
112 kumpf 1.26 CIMRequestMessage* requestMessage;
|
113 kumpf 1.1 CIMResponseMessage*& responseMessage;
114 Semaphore* responseReady;
115 };
116
117 typedef HashTable<String, OutstandingRequestEntry*, EqualFunc<String>,
118 HashFunc<String> > OutstandingRequestTable;
119
120
121 /////////////////////////////////////////////////////////////////////////////
122 // ProviderAgentContainer
123 /////////////////////////////////////////////////////////////////////////////
124
125 class ProviderAgentContainer
126 {
127 public:
128 ProviderAgentContainer(
129 const String & moduleName,
|
130 kumpf 1.6 const String & userName,
|
131 kumpf 1.26 PEGASUS_INDICATION_CALLBACK_T indicationCallback,
132 PEGASUS_RESPONSE_CHUNK_CALLBACK_T responseChunkCallback,
|
133 carolann.graves 1.14 Boolean subscriptionInitComplete);
|
134 kumpf 1.1
135 ~ProviderAgentContainer();
136
137 Boolean isInitialized();
138
|
139 kumpf 1.6 String getModuleName() const;
140
|
141 kumpf 1.1 CIMResponseMessage* processMessage(CIMRequestMessage* request);
142 void unloadIdleProviders();
143
144 private:
145 //
146 // Private methods
147 //
148
149 /** Unimplemented */
150 ProviderAgentContainer();
151 /** Unimplemented */
152 ProviderAgentContainer(const ProviderAgentContainer& pa);
153 /** Unimplemented */
154 ProviderAgentContainer& operator=(const ProviderAgentContainer& pa);
155
156 /**
157 Start a Provider Agent process and establish a pipe connection with it.
158 Note: The caller must lock the _agentMutex.
159 */
160 void _startAgentProcess();
161
162 kumpf 1.1 /**
163 Send initialization data to the Provider Agent.
164 Note: The caller must lock the _agentMutex.
165 */
166 void _sendInitializationData();
167
168 /**
169 Initialize the ProviderAgentContainer if it is not already
170 initialized. Initialization includes starting the Provider Agent
171 process, establishing a pipe connection with it, and starting a
172 thread to read response messages from the Provider Agent.
173
174 Note: The caller must lock the _agentMutex.
175 */
176 void _initialize();
177
178 /**
179 Uninitialize the ProviderAgentContainer if it is initialized.
180 The connection is closed and outstanding requests are completed
181 with an error result.
182
183 kumpf 1.1 Note: The caller must lock the _agentMutex.
|
184 kumpf 1.22
185 @param cleanShutdown Indicates whether the provider agent process
186 exited cleanly. A value of true indicates that responses have been
187 sent for all requests that have been processed. A value of false
188 indicates that one or more requests may have been partially processed.
189 */
190 void _uninitialize(Boolean cleanShutdown);
191
192 /**
193 Performs the processMessage work, but does not retry on a transient
194 error.
|
195 kumpf 1.1 */
|
196 kumpf 1.22 CIMResponseMessage* _processMessage(CIMRequestMessage* request);
|
197 kumpf 1.1
198 /**
199 Read and process response messages from the Provider Agent until
200 the connection is closed.
201 */
202 void _processResponses();
203 static PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL
204 _responseProcessor(void* arg);
205
206 //
207 // Private data
208 //
209
210 /**
211 The _agentMutex must be locked whenever writing to the Provider
212 Agent connection, accessing the _isInitialized flag, or changing
213 the Provider Agent state.
214 */
215 Mutex _agentMutex;
216
217 /**
218 kumpf 1.1 Name of the provider module served by this Provider Agent.
219 */
220 String _moduleName;
221
222 /**
|
223 kumpf 1.6 The user context in which this Provider Agent operates.
224 */
225 String _userName;
226
227 /**
|
228 kumpf 1.1 Callback function to which all generated indications are sent for
229 processing.
230 */
|
231 kumpf 1.26 PEGASUS_INDICATION_CALLBACK_T _indicationCallback;
232
233 /**
234 Callback function to which response chunks are sent for processing.
235 */
236 PEGASUS_RESPONSE_CHUNK_CALLBACK_T _responseChunkCallback;
|
237 kumpf 1.1
238 /**
239 Indicates whether the Provider Agent is active.
240 */
241 Boolean _isInitialized;
242
243 /**
244 Pipe connection used to read responses from the Provider Agent.
245 */
246 AutoPtr<AnonymousPipe> _pipeFromAgent;
247 /**
248 Pipe connection used to write requests to the Provider Agent.
249 */
250 AutoPtr<AnonymousPipe> _pipeToAgent;
251
|
252 kumpf 1.18 #if defined(PEGASUS_HAS_SIGNALS)
253 /**
254 Process ID of the active Provider Agent.
255 */
256 pid_t _pid;
257 #endif
258
|
259 kumpf 1.1 /**
260 The _outstandingRequestTable holds an entry for each request that has
261 been sent to this Provider Agent for which no response has been
262 received. Entries are added (by the writing thread) when a request
263 is sent, and are removed (by the reading thread) when the response is
264 received (or when it is determined that no response is forthcoming).
265 */
266 OutstandingRequestTable _outstandingRequestTable;
267 /**
268 The _outstandingRequestTableMutex must be locked whenever reading or
269 updating the _outstandingRequestTable.
270 */
271 Mutex _outstandingRequestTableMutex;
|
272 kumpf 1.2
273 /**
274 Holds the last provider module instance sent to the Provider Agent in
275 a ProviderIdContainer. Since the provider module instance rarely
276 changes, an optimization is used to send it only when it differs from
277 the last provider module instance sent.
278 */
279 CIMInstance _providerModuleCache;
|
280 kumpf 1.6
281 /**
282 The number of Provider Agent processes that are currently initialized
283 (active).
284 */
285 static Uint32 _numProviderProcesses;
286
287 /**
288 The _numProviderProcessesMutex must be locked whenever reading or
289 updating the _numProviderProcesses count.
290 */
291 static Mutex _numProviderProcessesMutex;
292
293 /**
294 The maximum number of Provider Agent processes that may be initialized
295 (active) at one time.
296 */
297 static Uint32 _maxProviderProcesses;
|
298 carolann.graves 1.14
299 /**
|
300 kumpf 1.22 A value indicating that a request message has not been processed.
301 A CIMResponseMessage pointer with this value indicates that the
302 corresponding CIMRequestMessage has not been processed. This is
303 used to indicate that a provider agent exited without starting to
304 process the request, and that the request should be retried.
305 */
306 static CIMResponseMessage* _REQUEST_NOT_PROCESSED;
307
308 /**
|
309 carolann.graves 1.14 Indicates whether the Indication Service has completed initialization.
310
311 For more information, please see the description of the
312 ProviderManagerRouter::_subscriptionInitComplete member variable.
313 */
314 Boolean _subscriptionInitComplete;
|
315 kumpf 1.1 };
316
|
317 kumpf 1.6 Uint32 ProviderAgentContainer::_numProviderProcesses = 0;
318 Mutex ProviderAgentContainer::_numProviderProcessesMutex;
319 Uint32 ProviderAgentContainer::_maxProviderProcesses = PEG_NOT_FOUND;
320
|
321 kumpf 1.22 // Set this to a value that no valid CIMResponseMessage* will have.
322 CIMResponseMessage* ProviderAgentContainer::_REQUEST_NOT_PROCESSED =
323 reinterpret_cast<CIMResponseMessage*>(&_REQUEST_NOT_PROCESSED);
324
|
325 kumpf 1.1 ProviderAgentContainer::ProviderAgentContainer(
326 const String & moduleName,
|
327 kumpf 1.6 const String & userName,
|
328 kumpf 1.26 PEGASUS_INDICATION_CALLBACK_T indicationCallback,
329 PEGASUS_RESPONSE_CHUNK_CALLBACK_T responseChunkCallback,
|
330 carolann.graves 1.14 Boolean subscriptionInitComplete)
|
331 kumpf 1.1 : _moduleName(moduleName),
|
332 kumpf 1.6 _userName(userName),
|
333 kumpf 1.1 _indicationCallback(indicationCallback),
|
334 kumpf 1.26 _responseChunkCallback(responseChunkCallback),
|
335 carolann.graves 1.14 _isInitialized(false),
336 _subscriptionInitComplete(subscriptionInitComplete)
|
337 kumpf 1.1 {
338 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
339 "ProviderAgentContainer::ProviderAgentContainer");
340 PEG_METHOD_EXIT();
341 }
342
343 ProviderAgentContainer::~ProviderAgentContainer()
344 {
345 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
346 "ProviderAgentContainer::~ProviderAgentContainer");
347
348 // Ensure the destructor does not throw an exception
349 try
350 {
|
351 kumpf 1.17 if (isInitialized())
352 {
353 // Stop the responseProcessor thread by closing its connection
354 _pipeFromAgent->closeReadHandle();
|
355 kumpf 1.1
|
356 kumpf 1.17 // Wait for the responseProcessor thread to exit
357 while (isInitialized())
358 {
359 pegasus_yield();
360 }
|
361 kumpf 1.1 }
362 }
363 catch (...)
364 {
365 }
366
367 PEG_METHOD_EXIT();
368 }
369
370 void ProviderAgentContainer::_startAgentProcess()
371 {
372 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
373 "ProviderAgentContainer::_startAgentProcess");
374
|
375 kumpf 1.21 //
376 // Serialize the starting of agent processes. If two agent processes are
377 // started at the same time, they may get copies of each other's pipe
378 // descriptors. If this happens, the cimserver will not get a pipe read
379 // error when one of the agent processes exits, because the pipe will
380 // still be writable by the other process. This locking control needs to
381 // cover the period from where the pipes are created to where the agent
382 // ends of the pipes are closed by the cimserver.
383 //
384 static Mutex agentStartupMutex;
385 AutoMutex lock(agentStartupMutex);
386
|
387 kumpf 1.1 AutoPtr<AnonymousPipe> pipeFromAgent(new AnonymousPipe());
388 AutoPtr<AnonymousPipe> pipeToAgent(new AnonymousPipe());
389
390 //
391 // Start a cimprovagt process for this provider module
392 //
393
394 #if defined (PEGASUS_OS_TYPE_WINDOWS)
395 //
396 // Set up members of the PROCESS_INFORMATION structure
397 //
398 PROCESS_INFORMATION piProcInfo;
399 ZeroMemory (&piProcInfo, sizeof (PROCESS_INFORMATION));
400
401 //
402 // Set up members of the STARTUPINFO structure
403 //
404 STARTUPINFO siStartInfo;
405 ZeroMemory (&siStartInfo, sizeof (STARTUPINFO));
406 siStartInfo.cb = sizeof (STARTUPINFO);
407
408 kumpf 1.1 //
409 // Generate the command line
410 //
411 char cmdLine[2048];
412 char readHandle[32];
413 char writeHandle[32];
414 pipeToAgent->exportReadHandle(readHandle);
415 pipeFromAgent->exportWriteHandle(writeHandle);
416
417 sprintf(cmdLine, "\"%s\" %s %s \"%s\"",
418 (const char*)ConfigManager::getHomedPath(
419 PEGASUS_PROVIDER_AGENT_PROC_NAME).getCString(),
420 readHandle, writeHandle, (const char*)_moduleName.getCString());
421
422 //
423 // Create the child process
424 //
425 if (!CreateProcess (
426 NULL, //
427 cmdLine, // command line
428 NULL, // process security attributes
429 kumpf 1.1 NULL, // primary thread security attributes
430 TRUE, // handles are inherited
431 0, // creation flags
432 NULL, // use parent's environment
433 NULL, // use parent's current directory
434 &siStartInfo, // STARTUPINFO
435 &piProcInfo)) // PROCESS_INFORMATION
436 {
437 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
438 "CreateProcess() failed. errno = %d.", GetLastError());
439 PEG_METHOD_EXIT();
440 throw Exception(MessageLoaderParms(
441 "ProviderManager.OOPProviderManagerRouter.CIMPROVAGT_START_FAILED",
442 "Failed to start cimprovagt \"$0\".",
443 _moduleName));
444 }
445
446 CloseHandle(piProcInfo.hProcess);
447 CloseHandle(piProcInfo.hThread);
|
448 gs.keenan 1.8
449 #elif defined (PEGASUS_OS_VMS)
450
|
451 gs.keenan 1.10 //
452 // fork and exec the child process
453 //
454 int status;
|
455 gs.keenan 1.8
|
456 gs.keenan 1.10 status = vfork ();
457 switch (status)
458 {
459 case 0:
460 try
|
461 gs.keenan 1.8 {
|
462 gs.keenan 1.10 //
463 // Execute the cimprovagt program
464 //
465 String agentCommandPath =
466 ConfigManager::getHomedPath(PEGASUS_PROVIDER_AGENT_PROC_NAME);
467 CString agentCommandPathCString = agentCommandPath.getCString();
468
469 char readHandle[32];
470 char writeHandle[32];
471 pipeToAgent->exportReadHandle(readHandle);
472 pipeFromAgent->exportWriteHandle(writeHandle);
473
474 if ((status = execl(agentCommandPathCString, agentCommandPathCString,
475 readHandle, writeHandle,
476 (const char*)_moduleName.getCString(), (char*)0)) == -1);
477 {
478 // If we're still here, there was an error
479 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
480 "execl() failed. errno = %d.", errno);
481 _exit(1);
482 }
483 gs.keenan 1.10 }
484 catch (...)
485 {
486 // There's not much we can do here in no man's land
487 try
488 {
489 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
490 "Caught exception before calling execl().");
491 }
492 catch (...)
493 {
494 }
495 _exit(1);
496 }
497 PEG_METHOD_EXIT();
498 return;
499 break;
500
501 case -1:
502 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
503 "fork() failed. errno = %d.", errno);
504 gs.keenan 1.10 PEG_METHOD_EXIT();
505 throw Exception(MessageLoaderParms(
506 "ProviderManager.OOPProviderManagerRouter.CIMPROVAGT_START_FAILED",
507 "Failed to start cimprovagt \"$0\".",
508 _moduleName));
509 break;
510
511 default:
512 // Close our copies of the agent's ends of the pipes
513 pipeToAgent->closeReadHandle();
514 pipeFromAgent->closeWriteHandle();
|
515 gs.keenan 1.8
|
516 gs.keenan 1.10 _pipeToAgent.reset(pipeToAgent.release());
517 _pipeFromAgent.reset(pipeFromAgent.release());
|
518 gs.keenan 1.8
|
519 gs.keenan 1.10 PEG_METHOD_EXIT();
520 }
|
521 chuck 1.16 #elif defined (PEGASUS_OS_OS400)
522
523 //Out of provider support for OS400 goes here when needed.
524
|
525 kumpf 1.1 #else
526 pid_t pid = fork();
527 if (pid < 0)
528 {
529 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
530 "fork() failed. errno = %d.", errno);
531 PEG_METHOD_EXIT();
532 throw Exception(MessageLoaderParms(
533 "ProviderManager.OOPProviderManagerRouter.CIMPROVAGT_START_FAILED",
534 "Failed to start cimprovagt \"$0\".",
535 _moduleName));
536 }
537 else if (pid == 0)
538 {
539 //
540 // Child side of the fork
541 //
542
543 try
544 {
545 // Close our copies of the parent's ends of the pipes
546 kumpf 1.1 pipeToAgent->closeWriteHandle();
547 pipeFromAgent->closeReadHandle();
548
549 //
550 // Execute the cimprovagt program
551 //
552 String agentCommandPath =
553 ConfigManager::getHomedPath(PEGASUS_PROVIDER_AGENT_PROC_NAME);
554 CString agentCommandPathCString = agentCommandPath.getCString();
555
556 char readHandle[32];
557 char writeHandle[32];
558 pipeToAgent->exportReadHandle(readHandle);
559 pipeFromAgent->exportWriteHandle(writeHandle);
560
|
561 kumpf 1.13 #ifndef PEGASUS_DISABLE_PROV_USERCTXT
|
562 kumpf 1.6 // Set the user context of the Provider Agent process
563 if (_userName != System::getEffectiveUserName())
564 {
565 if (!System::changeUserContext(_userName.getCString()))
566 {
567 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
568 "System::changeUserContext() failed. userName = %s.",
|
569 kumpf 1.9 (const char*)_userName.getCString());
|
570 kumpf 1.6 Logger::put_l(Logger::ERROR_LOG, System::CIMSERVER,
571 Logger::WARNING,
572 "ProviderManager.OOPProviderManagerRouter."
573 "USER_CONTEXT_CHANGE_FAILED",
574 "Unable to change user context to \"$0\".", _userName);
575 _exit(1);
576 }
577 }
|
578 kumpf 1.13 #endif
|
579 kumpf 1.6
|
580 kumpf 1.1 execl(agentCommandPathCString, agentCommandPathCString,
581 readHandle, writeHandle,
582 (const char*)_moduleName.getCString(), (char*)0);
583
584 // If we're still here, there was an error
585 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
586 "execl() failed. errno = %d.", errno);
587 _exit(1);
588 }
589 catch (...)
590 {
591 // There's not much we can do here in no man's land
592 try
593 {
594 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
595 "Caught exception before calling execl().");
596 }
597 catch (...) {}
598 _exit(1);
599 }
600 }
|
601 kumpf 1.18 # if defined(PEGASUS_HAS_SIGNALS)
602 _pid = pid;
603 # endif
|
604 kumpf 1.1 #endif
605
606 //
607 // CIM Server process
608 //
609
610 // Close our copies of the agent's ends of the pipes
611 pipeToAgent->closeReadHandle();
612 pipeFromAgent->closeWriteHandle();
613
614 _pipeToAgent.reset(pipeToAgent.release());
615 _pipeFromAgent.reset(pipeFromAgent.release());
616
|
617 gs.keenan 1.10 PEG_METHOD_EXIT();
|
618 kumpf 1.1 }
619
620 // Note: Caller must lock _agentMutex
621 void ProviderAgentContainer::_sendInitializationData()
622 {
623 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
624 "ProviderAgentContainer::_sendInitializationData");
625
626 //
627 // Gather config properties to pass to the Provider Agent
628 //
629 ConfigManager* configManager = ConfigManager::getInstance();
630 Array<Pair<String, String> > configProperties;
631
632 Array<String> configPropertyNames;
633 configManager->getAllPropertyNames(configPropertyNames, true);
634 for (Uint32 i = 0; i < configPropertyNames.size(); i++)
635 {
636 String configPropertyValue =
637 configManager->getCurrentValue(configPropertyNames[i]);
638 String configPropertyDefaultValue =
639 kumpf 1.1 configManager->getDefaultValue(configPropertyNames[i]);
640 if (configPropertyValue != configPropertyDefaultValue)
641 {
642 configProperties.append(Pair<String, String>(
643 configPropertyNames[i], configPropertyValue));
644 }
645 }
646
647 //
648 // Create a Provider Agent initialization message
649 //
650 AutoPtr<CIMInitializeProviderAgentRequestMessage> request(
651 new CIMInitializeProviderAgentRequestMessage(
652 String("0"), // messageId
653 configManager->getPegasusHome(),
654 configProperties,
655 System::bindVerbose,
|
656 carolann.graves 1.14 _subscriptionInitComplete,
|
657 kumpf 1.1 QueueIdStack()));
658
659 //
660 // Write the initialization message to the pipe
661 //
662 AnonymousPipe::Status writeStatus =
663 _pipeToAgent->writeMessage(request.get());
664
665 if (writeStatus != AnonymousPipe::STATUS_SUCCESS)
666 {
667 PEG_METHOD_EXIT();
668 throw Exception(MessageLoaderParms(
669 "ProviderManager.OOPProviderManagerRouter."
670 "CIMPROVAGT_COMMUNICATION_FAILED",
671 "Failed to communicate with cimprovagt \"$0\".",
672 _moduleName));
673 }
674
|
675 kumpf 1.25 // Wait for a null response from the Provider Agent indicating it has
676 // initialized successfully.
677
678 CIMMessage* message;
679 AnonymousPipe::Status readStatus;
680 do
681 {
682 readStatus = _pipeFromAgent->readMessage(message);
683 } while (readStatus == AnonymousPipe::STATUS_INTERRUPT);
684
685 if (readStatus != AnonymousPipe::STATUS_SUCCESS)
686 {
687 PEG_METHOD_EXIT();
688 throw Exception(MessageLoaderParms(
689 "ProviderManager.OOPProviderManagerRouter."
690 "CIMPROVAGT_COMMUNICATION_FAILED",
691 "Failed to communicate with cimprovagt \"$0\".",
692 _moduleName));
693 }
694
695 PEGASUS_ASSERT(message == 0);
|
696 kumpf 1.1
697 PEG_METHOD_EXIT();
698 }
699
700 // Note: Caller must lock _agentMutex
701 void ProviderAgentContainer::_initialize()
702 {
703 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
704 "ProviderAgentContainer::_initialize");
705
706 if (_isInitialized)
707 {
708 PEGASUS_ASSERT(0);
709 PEG_METHOD_EXIT();
710 return;
711 }
712
|
713 kumpf 1.6 if (_maxProviderProcesses == PEG_NOT_FOUND)
714 {
|
715 kumpf 1.25 String maxProviderProcesses = ConfigManager::getInstance()->
716 getCurrentValue("maxProviderProcesses");
|
717 kumpf 1.6 CString maxProviderProcessesString = maxProviderProcesses.getCString();
718 char* end = 0;
719 _maxProviderProcesses = strtol(maxProviderProcessesString, &end, 10);
720 }
721
722 {
723 AutoMutex lock(_numProviderProcessesMutex);
724 if ((_maxProviderProcesses != 0) &&
725 (_numProviderProcesses >= _maxProviderProcesses))
726 {
727 throw PEGASUS_CIM_EXCEPTION(
728 CIM_ERR_FAILED,
729 MessageLoaderParms(
730 "ProviderManager.OOPProviderManagerRouter."
731 "MAX_PROVIDER_PROCESSES_REACHED",
732 "The maximum number of cimprovagt processes has been "
733 "reached."));
734 }
735 else
736 {
737 _numProviderProcesses++;
738 kumpf 1.6 }
739 }
740
|
741 kumpf 1.1 try
742 {
743 _startAgentProcess();
744
|
745 kumpf 1.18 _isInitialized = true;
746
|
747 kumpf 1.1 _sendInitializationData();
748
749 // Start a thread to read and process responses from the Provider Agent
|
750 kumpf 1.20 ThreadStatus rtn = PEGASUS_THREAD_OK;
751 while ((rtn = MessageQueueService::get_thread_pool()->
752 allocate_and_awaken(this, _responseProcessor)) !=
753 PEGASUS_THREAD_OK)
754 {
755 if (rtn == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)
756 {
757 pegasus_yield();
758 }
759 else
760 {
761 Logger::put(
762 Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
763 "Not enough threads to process responses from the "
764 "provider agent.");
|
765 konrad.r 1.19
|
766 kumpf 1.20 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
767 "Could not allocate thread to process responses from the "
768 "provider agent.");
769
770 throw Exception(MessageLoaderParms(
771 "ProviderManager.OOPProviderManagerRouter."
772 "CIMPROVAGT_THREAD_ALLOCATION_FAILED",
773 "Failed to allocate thread for cimprovagt \"$0\".",
774 _moduleName));
775 }
776 }
|
777 kumpf 1.1 }
778 catch (...)
779 {
|
780 kumpf 1.20 // Closing the connection causes the agent process to exit
781 _pipeToAgent.reset();
782 _pipeFromAgent.reset();
783
|
784 kumpf 1.18 #if defined(PEGASUS_HAS_SIGNALS)
785 if (_isInitialized)
786 {
787 // Harvest the status of the agent process to prevent a zombie
788 Boolean keepWaiting = false;
789 do
790 {
791 pid_t status = waitpid(_pid, 0, 0);
792 if (status == -1)
793 {
794 if (errno == EINTR)
795 {
796 keepWaiting = true;
797 }
798 else
799 {
800 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
801 "ProviderAgentContainer::_initialize(): "
802 "waitpid failed; errno = %d.", errno);
803 }
804 }
805 kumpf 1.18 } while (keepWaiting);
806 }
807 #endif
808
|
809 kumpf 1.1 _isInitialized = false;
|
810 kumpf 1.6
811 {
812 AutoMutex lock(_numProviderProcessesMutex);
813 _numProviderProcesses--;
814 }
815
|
816 kumpf 1.1 PEG_METHOD_EXIT();
817 throw;
818 }
819
820 PEG_METHOD_EXIT();
821 }
822
823 Boolean ProviderAgentContainer::isInitialized()
824 {
825 AutoMutex lock(_agentMutex);
826 return _isInitialized;
827 }
828
829 // Note: Caller must lock _agentMutex
|
830 kumpf 1.22 void ProviderAgentContainer::_uninitialize(Boolean cleanShutdown)
|
831 kumpf 1.1 {
832 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
833 "ProviderAgentContainer::_uninitialize");
834
835 if (!_isInitialized)
836 {
837 PEGASUS_ASSERT(0);
838 PEG_METHOD_EXIT();
839 return;
840 }
841
842 try
843 {
844 // Close the connection with the Provider Agent
845 _pipeFromAgent.reset();
846 _pipeToAgent.reset();
847
|
848 kumpf 1.2 _providerModuleCache = CIMInstance();
849
|
850 kumpf 1.6 {
851 AutoMutex lock(_numProviderProcessesMutex);
852 _numProviderProcesses--;
853 }
854
|
855 kumpf 1.18 #if defined(PEGASUS_HAS_SIGNALS)
856 // Harvest the status of the agent process to prevent a zombie
857 Boolean keepWaiting = false;
858 do
859 {
860 pid_t status = waitpid(_pid, 0, 0);
861 if (status == -1)
862 {
863 if (errno == EINTR)
864 {
865 keepWaiting = true;
866 }
867 else
868 {
869 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
870 "ProviderAgentContainer::_uninitialize(): "
871 "waitpid failed; errno = %d.", errno);
872 }
873 }
874 } while (keepWaiting);
875 #endif
876 kumpf 1.18
|
877 kumpf 1.1 _isInitialized = false;
878
879 //
880 // Complete with null responses all outstanding requests on this
881 // connection
882 //
883 {
|
884 carolann.graves 1.27 //
885 // If not a clean shutdown, log a warning message in case module
886 // included indication providers
887 //
888 if (!cleanShutdown)
889 {
890 Logger::put_l(
891 Logger::STANDARD_LOG, System::CIMSERVER, Logger::WARNING,
892 "ProviderManager.OOPProviderManagerRouter."
893 "OOP_PROVIDER_MODULE_FAILURE_DETECTED",
894 "A failure was detected in provider module $0. The"
895 " generation of indications by providers in this module"
896 " may be affected. To ensure these providers are"
897 " serving active subscriptions, disable and then"
898 " re-enable this module using the cimprovider command.",
899 _moduleName);
900 }
901
|
902 kumpf 1.1 AutoMutex tableLock(_outstandingRequestTableMutex);
903
|
904 kumpf 1.22 CIMResponseMessage* response =
905 cleanShutdown ? _REQUEST_NOT_PROCESSED : 0;
906
|
907 kumpf 1.1 for (OutstandingRequestTable::Iterator i =
908 _outstandingRequestTable.start();
909 i != 0; i++)
910 {
911 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
912 String("Completing messageId \"") + i.value()->messageId +
913 "\" with a null response.");
|
914 kumpf 1.22 i.value()->responseMessage = response;
|
915 kumpf 1.1 i.value()->responseReady->signal();
916 }
917
918 _outstandingRequestTable.clear();
919 }
920 }
921 catch (...)
922 {
923 // We're uninitializing, so do not propagate the exception
924 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
925 "Ignoring _uninitialize() exception.");
926 }
927
928 PEG_METHOD_EXIT();
929 }
930
|
931 kumpf 1.6 String ProviderAgentContainer::getModuleName() const
932 {
933 return _moduleName;
934 }
935
|
936 kumpf 1.1 CIMResponseMessage* ProviderAgentContainer::processMessage(
937 CIMRequestMessage* request)
938 {
939 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
940 "ProviderAgentContainer::processMessage");
941
942 CIMResponseMessage* response;
|
943 kumpf 1.22
944 do
945 {
946 response = _processMessage(request);
|
947 kumpf 1.25
948 if (response == _REQUEST_NOT_PROCESSED)
949 {
950 // Check for request message types that should not be retried.
951 if ((request->getType() ==
952 CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE) ||
953 (request->getType() ==
954 CIM_NOTIFY_CONFIG_CHANGE_REQUEST_MESSAGE) ||
955 (request->getType() ==
956 CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE) ||
957 (request->getType() ==
958 CIM_DELETE_SUBSCRIPTION_REQUEST_MESSAGE))
959 {
960 response = request->buildResponse();
961 break;
962 }
963 else if (request->getType() == CIM_DISABLE_MODULE_REQUEST_MESSAGE)
964 {
965 CIMDisableModuleResponseMessage* dmResponse =
966 dynamic_cast<CIMDisableModuleResponseMessage*>(response);
967 PEGASUS_ASSERT(dmResponse != 0);
968 kumpf 1.25
969 Array<Uint16> operationalStatus;
970 operationalStatus.append(CIM_MSE_OPSTATUS_VALUE_STOPPED);
971 dmResponse->operationalStatus = operationalStatus;
972 break;
973 }
974 }
|
975 kumpf 1.22 } while (response == _REQUEST_NOT_PROCESSED);
976
977 PEG_METHOD_EXIT();
978 return response;
979 }
980
981 CIMResponseMessage* ProviderAgentContainer::_processMessage(
982 CIMRequestMessage* request)
983 {
984 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
985 "ProviderAgentContainer::_processMessage");
986
987 CIMResponseMessage* response;
|
988 kumpf 1.1 String originalMessageId = request->messageId;
989
|
990 kumpf 1.2 // These three variables are used for the provider module optimization.
991 // See the _providerModuleCache member description for more information.
992 AutoPtr<ProviderIdContainer> origProviderId;
993 Boolean doProviderModuleOptimization = false;
994 Boolean updateProviderModuleCache = false;
995
|
996 kumpf 1.1 try
997 {
998 // The messageId attribute is used to correlate response messages
999 // from the Provider Agent with request messages, so it is imperative
1000 // that the ID is unique for each request. The incoming ID cannot be
1001 // trusted to be unique, so we substitute a unique one. The memory
1002 // address of the request is used as the source of a unique piece of
1003 // data. (The message ID is only required to be unique while the
1004 // request is outstanding.)
1005 char messagePtrString[20];
1006 sprintf(messagePtrString, "%p", request);
1007 String uniqueMessageId = messagePtrString;
1008
1009 //
1010 // Set up the OutstandingRequestEntry for this request
1011 //
1012 Semaphore waitSemaphore(0);
1013 OutstandingRequestEntry outstandingRequestEntry(
|
1014 kumpf 1.26 uniqueMessageId, request, response, &waitSemaphore);
|
1015 kumpf 1.1
1016 //
1017 // Lock the Provider Agent Container while initializing the
1018 // agent and writing the request to the connection
1019 //
1020 {
1021 AutoMutex lock(_agentMutex);
1022
1023 //
1024 // Initialize the Provider Agent, if necessary
1025 //
1026 if (!_isInitialized)
1027 {
1028 _initialize();
1029 }
1030
1031 //
1032 // Add an entry to the OutstandingRequestTable for this request
1033 //
1034 {
1035 AutoMutex tableLock(_outstandingRequestTableMutex);
1036 kumpf 1.1
1037 _outstandingRequestTable.insert(
1038 uniqueMessageId, &outstandingRequestEntry);
1039 }
1040
|
1041 kumpf 1.2 // Get the provider module from the ProviderIdContainer to see if
1042 // we can optimize out the transmission of this instance to the
1043 // Provider Agent. (See the _providerModuleCache description.)
1044 try
1045 {
1046 ProviderIdContainer pidc = request->operationContext.get(
1047 ProviderIdContainer::NAME);
1048 origProviderId.reset(new ProviderIdContainer(
1049 pidc.getModule(), pidc.getProvider(),
1050 pidc.isRemoteNameSpace(), pidc.getRemoteInfo()));
1051 if (_providerModuleCache.isUninitialized() ||
1052 (!pidc.getModule().identical(_providerModuleCache)))
1053 {
1054 // We haven't sent this provider module instance to the
1055 // Provider Agent yet. Update our cache after we send it.
1056 updateProviderModuleCache = true;
1057 }
1058 else
1059 {
1060 // Replace the provider module in the ProviderIdContainer
1061 // with an uninitialized instance. We'll need to put the
1062 kumpf 1.2 // original one back after the message is sent.
1063 request->operationContext.set(ProviderIdContainer(
1064 CIMInstance(), pidc.getProvider(),
1065 pidc.isRemoteNameSpace(), pidc.getRemoteInfo()));
1066 doProviderModuleOptimization = true;
1067 }
1068 }
1069 catch (...)
1070 {
1071 // No ProviderIdContainer to optimize
1072 }
1073
|
1074 kumpf 1.1 //
1075 // Write the message to the pipe
1076 //
1077 try
1078 {
1079 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL3,
1080 String("Sending request to agent with messageId ") +
1081 uniqueMessageId);
1082
1083 request->messageId = uniqueMessageId;
1084 AnonymousPipe::Status writeStatus =
1085 _pipeToAgent->writeMessage(request);
1086 request->messageId = originalMessageId;
1087
|
1088 kumpf 1.2 if (doProviderModuleOptimization)
1089 {
1090 request->operationContext.set(*origProviderId.get());
1091 }
1092
|
1093 kumpf 1.1 if (writeStatus != AnonymousPipe::STATUS_SUCCESS)
1094 {
1095 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
1096 "Failed to write message to pipe. writeStatus = %d.",
1097 writeStatus);
|
1098 kumpf 1.25
1099 request->messageId = originalMessageId;
1100
1101 if (doProviderModuleOptimization)
1102 {
1103 request->operationContext.set(*origProviderId.get());
1104 }
1105
1106 // Remove this OutstandingRequestTable entry
1107 {
1108 AutoMutex tableLock(_outstandingRequestTableMutex);
1109 Boolean removed =
1110 _outstandingRequestTable.remove(uniqueMessageId);
1111 PEGASUS_ASSERT(removed);
1112 }
1113
1114 // A response value of _REQUEST_NOT_PROCESSED indicates
1115 // that the request was not processed by the provider
1116 // agent, so it can be retried safely.
1117 PEG_METHOD_EXIT();
1118 return _REQUEST_NOT_PROCESSED;
|
1119 kumpf 1.1 }
|
1120 kumpf 1.2
1121 if (updateProviderModuleCache)
1122 {
1123 _providerModuleCache = origProviderId->getModule();
1124 }
|
1125 kumpf 1.1 }
1126 catch (...)
1127 {
1128 request->messageId = originalMessageId;
|
1129 kumpf 1.2
1130 if (doProviderModuleOptimization)
1131 {
1132 request->operationContext.set(*origProviderId.get());
1133 }
1134
|
1135 kumpf 1.1 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
1136 "Failed to write message to pipe.");
1137 // Remove the OutstandingRequestTable entry for this request
1138 {
1139 AutoMutex tableLock(_outstandingRequestTableMutex);
1140 Boolean removed =
1141 _outstandingRequestTable.remove(uniqueMessageId);
1142 PEGASUS_ASSERT(removed);
1143 }
|
1144 kumpf 1.25 PEG_METHOD_EXIT();
|
1145 kumpf 1.1 throw;
1146 }
1147 }
1148
1149 //
1150 // Wait for the response
1151 //
1152 try
1153 {
1154 // Must not hold _agentMutex while waiting for the response
1155 waitSemaphore.wait();
1156 }
1157 catch (...)
1158 {
1159 // Remove the OutstandingRequestTable entry for this request
1160 {
1161 AutoMutex tableLock(_outstandingRequestTableMutex);
1162 Boolean removed =
1163 _outstandingRequestTable.remove(uniqueMessageId);
1164 PEGASUS_ASSERT(removed);
1165 }
|
1166 kumpf 1.25 PEG_METHOD_EXIT();
|
1167 kumpf 1.1 throw;
1168 }
1169
|
1170 kumpf 1.22 // A response value of _REQUEST_NOT_PROCESSED indicates that the
1171 // provider agent process was terminating when the request was sent.
1172 // The request was not processed by the provider agent, so it can be
1173 // retried safely.
1174 if (response == _REQUEST_NOT_PROCESSED)
1175 {
|
1176 kumpf 1.25 PEG_METHOD_EXIT();
|
1177 kumpf 1.22 return response;
1178 }
1179
|
1180 kumpf 1.1 // A null response is returned when an agent connection is closed
1181 // while requests remain outstanding.
1182 if (response == 0)
1183 {
1184 response = request->buildResponse();
1185 response->cimException = PEGASUS_CIM_EXCEPTION(
1186 CIM_ERR_FAILED,
1187 MessageLoaderParms(
1188 "ProviderManager.OOPProviderManagerRouter."
1189 "CIMPROVAGT_CONNECTION_LOST",
1190 "Lost connection with cimprovagt \"$0\".",
1191 _moduleName));
1192 }
1193 }
1194 catch (CIMException& e)
1195 {
1196 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
1197 String("Caught exception: ") + e.getMessage());
1198 response = request->buildResponse();
1199 response->cimException = e;
1200 }
1201 kumpf 1.1 catch (Exception& e)
1202 {
1203 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
1204 String("Caught exception: ") + e.getMessage());
1205 response = request->buildResponse();
1206 response->cimException = PEGASUS_CIM_EXCEPTION(
1207 CIM_ERR_FAILED, e.getMessage());
1208 }
1209 catch (...)
1210 {
1211 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
1212 "Caught unknown exception");
1213 response = request->buildResponse();
1214 response->cimException = PEGASUS_CIM_EXCEPTION(
1215 CIM_ERR_FAILED, String::EMPTY);
1216 }
1217
1218 response->messageId = originalMessageId;
1219
1220 PEG_METHOD_EXIT();
1221 return response;
1222 kumpf 1.1 }
1223
1224 void ProviderAgentContainer::unloadIdleProviders()
1225 {
1226 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
|
1227 carolann.graves 1.4 "ProviderAgentContainer::unloadIdleProviders");
|
1228 kumpf 1.1
1229 AutoMutex lock(_agentMutex);
1230 if (_isInitialized)
1231 {
1232 // Send a "wake up" message to the Provider Agent.
1233 // Don't bother checking whether the operation is successful.
1234 Uint32 messageLength = 0;
1235 _pipeToAgent->writeBuffer((const char*)&messageLength, sizeof(Uint32));
1236 }
1237
1238 PEG_METHOD_EXIT();
1239 }
1240
1241 void ProviderAgentContainer::_processResponses()
1242 {
1243 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1244 "ProviderAgentContainer::_processResponses");
1245
1246 //
1247 // Process responses until the pipe is closed
1248 //
1249 kumpf 1.1 while (1)
1250 {
1251 try
1252 {
1253 CIMMessage* message;
1254
1255 //
1256 // Read a response from the Provider Agent
1257 //
1258 AnonymousPipe::Status readStatus =
1259 _pipeFromAgent->readMessage(message);
1260
1261 // Ignore interrupts
1262 if (readStatus == AnonymousPipe::STATUS_INTERRUPT)
1263 {
1264 continue;
1265 }
1266
1267 // Handle an error the same way as a closed connection
1268 if ((readStatus == AnonymousPipe::STATUS_ERROR) ||
1269 (readStatus == AnonymousPipe::STATUS_CLOSED))
1270 kumpf 1.1 {
1271 AutoMutex lock(_agentMutex);
|
1272 kumpf 1.22 _uninitialize(false);
1273 return;
1274 }
1275
1276 // A null message indicates that the provider agent process has
1277 // finished its processing and is ready to exit.
1278 if (message == 0)
1279 {
1280 AutoMutex lock(_agentMutex);
1281 _uninitialize(true);
|
1282 kumpf 1.1 return;
1283 }
1284
1285 if (message->getType() == CIM_PROCESS_INDICATION_REQUEST_MESSAGE)
1286 {
1287 // Forward indications to the indication callback
1288 _indicationCallback(
1289 reinterpret_cast<CIMProcessIndicationRequestMessage*>(
1290 message));
1291 }
|
1292 kumpf 1.26 else if (!message->isComplete())
1293 {
1294 CIMResponseMessage* response;
1295 response = dynamic_cast<CIMResponseMessage*>(message);
1296 PEGASUS_ASSERT(response != 0);
1297
1298 // Get the OutstandingRequestEntry for this response chunk
1299 OutstandingRequestEntry* _outstandingRequestEntry = 0;
1300 {
1301 AutoMutex tableLock(_outstandingRequestTableMutex);
1302 Boolean foundEntry = _outstandingRequestTable.lookup(
1303 response->messageId, _outstandingRequestEntry);
1304 PEGASUS_ASSERT(foundEntry);
1305 }
1306
1307 // Put the original message ID into the response
1308 response->messageId =
1309 _outstandingRequestEntry->requestMessage->messageId;
1310
1311 // Call the response chunk callback to process the chunk
1312 _responseChunkCallback(
1313 kumpf 1.26 _outstandingRequestEntry->requestMessage, response);
1314 }
|
1315 kumpf 1.1 else
1316 {
1317 CIMResponseMessage* response;
1318 response = dynamic_cast<CIMResponseMessage*>(message);
1319 PEGASUS_ASSERT(response != 0);
1320
1321 // Give the response to the waiting OutstandingRequestEntry
1322 OutstandingRequestEntry* _outstandingRequestEntry = 0;
1323 {
1324 AutoMutex tableLock(_outstandingRequestTableMutex);
1325 Boolean foundEntry = _outstandingRequestTable.lookup(
1326 response->messageId, _outstandingRequestEntry);
1327 PEGASUS_ASSERT(foundEntry);
1328
1329 // Remove the completed request from the table
1330 Boolean removed =
1331 _outstandingRequestTable.remove(response->messageId);
1332 PEGASUS_ASSERT(removed);
1333 }
1334
1335 _outstandingRequestEntry->responseMessage = response;
1336 kumpf 1.1 _outstandingRequestEntry->responseReady->signal();
1337 }
1338 }
1339 catch (Exception& e)
1340 {
1341 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
1342 String("Ignoring exception: ") + e.getMessage());
1343 }
1344 catch (...)
1345 {
1346 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
1347 "Ignoring exception");
1348 }
1349 }
1350
1351 }
1352
1353 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL
1354 ProviderAgentContainer::_responseProcessor(void* arg)
1355 {
1356 ProviderAgentContainer* pa =
1357 kumpf 1.1 reinterpret_cast<ProviderAgentContainer*>(arg);
1358
1359 pa->_processResponses();
1360
1361 return(PEGASUS_THREAD_RETURN(0));
1362 }
1363
1364 /////////////////////////////////////////////////////////////////////////////
1365 // OOPProviderManagerRouter
1366 /////////////////////////////////////////////////////////////////////////////
1367
1368 OOPProviderManagerRouter::OOPProviderManagerRouter(
|
1369 kumpf 1.26 PEGASUS_INDICATION_CALLBACK_T indicationCallback,
1370 PEGASUS_RESPONSE_CHUNK_CALLBACK_T responseChunkCallback)
|
1371 kumpf 1.1 {
1372 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1373 "OOPProviderManagerRouter::OOPProviderManagerRouter");
1374
1375 _indicationCallback = indicationCallback;
|
1376 kumpf 1.26 _responseChunkCallback = responseChunkCallback;
|
1377 carolann.graves 1.14 _subscriptionInitComplete = false;
|
1378 kumpf 1.1
1379 PEG_METHOD_EXIT();
1380 }
1381
1382 OOPProviderManagerRouter::~OOPProviderManagerRouter()
1383 {
1384 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1385 "OOPProviderManagerRouter::~OOPProviderManagerRouter");
1386
1387 try
1388 {
1389 // Clean up the ProviderAgentContainers
1390 AutoMutex lock(_providerAgentTableMutex);
1391 ProviderAgentTable::Iterator i = _providerAgentTable.start();
1392 for(; i != 0; i++)
1393 {
1394 delete i.value();
1395 }
1396 }
1397 catch (...) {}
1398
1399 kumpf 1.1 PEG_METHOD_EXIT();
1400 }
1401
1402 Message* OOPProviderManagerRouter::processMessage(Message* message)
1403 {
1404 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1405 "OOPProviderManagerRouter::processMessage");
1406
1407 CIMRequestMessage* request = dynamic_cast<CIMRequestMessage *>(message);
1408 PEGASUS_ASSERT(request != 0);
1409
1410 AutoPtr<CIMResponseMessage> response;
1411
1412 //
1413 // Get the provider information from the request
1414 //
1415 CIMInstance providerModule;
1416
1417 if ((dynamic_cast<CIMOperationRequestMessage*>(request) != 0) ||
1418 (dynamic_cast<CIMIndicationRequestMessage*>(request) != 0) ||
1419 (request->getType() == CIM_EXPORT_INDICATION_REQUEST_MESSAGE))
1420 kumpf 1.1 {
1421 // Provider information is in the OperationContext
1422 ProviderIdContainer pidc = (ProviderIdContainer)
1423 request->operationContext.get(ProviderIdContainer::NAME);
1424 providerModule = pidc.getModule();
1425 }
1426 else if (request->getType() == CIM_ENABLE_MODULE_REQUEST_MESSAGE)
1427 {
1428 CIMEnableModuleRequestMessage* emReq =
1429 dynamic_cast<CIMEnableModuleRequestMessage*>(request);
1430 providerModule = emReq->providerModule;
1431 }
1432 else if (request->getType() == CIM_DISABLE_MODULE_REQUEST_MESSAGE)
1433 {
1434 CIMDisableModuleRequestMessage* dmReq =
1435 dynamic_cast<CIMDisableModuleRequestMessage*>(request);
1436 providerModule = dmReq->providerModule;
1437 }
1438 else if ((request->getType() == CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE) ||
|
1439 carolann.graves 1.14 (request->getType() ==
1440 CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE) ||
|
1441 kumpf 1.1 (request->getType() == CIM_NOTIFY_CONFIG_CHANGE_REQUEST_MESSAGE))
1442 {
1443 // This operation is not provider-specific
1444 }
1445 else
1446 {
1447 // Unrecognized message type. This should never happen.
1448 PEGASUS_ASSERT(0);
1449 response.reset(request->buildResponse());
1450 response->cimException = PEGASUS_CIM_EXCEPTION(
1451 CIM_ERR_FAILED, "Unrecognized message type.");
1452 PEG_METHOD_EXIT();
1453 return response.release();
1454 }
1455
1456 //
1457 // Process the request message
1458 //
1459 if (request->getType() == CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE)
1460 {
1461 // Forward the CIMStopAllProvidersRequest to all providers
1462 kumpf 1.1 response.reset(_forwardRequestToAllAgents(request));
1463
1464 // Note: Do not uninitialize the ProviderAgentContainers here.
1465 // Just let the selecting thread notice when the agent connections
1466 // are closed.
1467 }
|
1468 carolann.graves 1.14 else if (request->getType () ==
1469 CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE)
1470 {
1471 _subscriptionInitComplete = true;
1472
1473 //
1474 // Forward the CIMSubscriptionInitCompleteRequestMessage to
1475 // all providers
1476 //
1477 response.reset (_forwardRequestToAllAgents (request));
1478 }
|
1479 kumpf 1.1 else if (request->getType() == CIM_NOTIFY_CONFIG_CHANGE_REQUEST_MESSAGE)
1480 {
1481 CIMNotifyConfigChangeRequestMessage* notifyRequest =
1482 dynamic_cast<CIMNotifyConfigChangeRequestMessage*>(request);
1483 PEGASUS_ASSERT(notifyRequest != 0);
1484
1485 if (notifyRequest->currentValueModified)
1486 {
1487 // Forward the CIMNotifyConfigChangeRequestMessage to all providers
1488 response.reset(_forwardRequestToAllAgents(request));
1489 }
1490 else
1491 {
1492 // No need to notify provider agents about changes to planned value
1493 response.reset(request->buildResponse());
1494 }
1495 }
|
1496 kumpf 1.6 else if (request->getType() == CIM_DISABLE_MODULE_REQUEST_MESSAGE)
|
1497 kumpf 1.1 {
|
1498 kumpf 1.6 // Fan out the request to all Provider Agent processes for this module
1499
|
1500 kumpf 1.1 // Retrieve the provider module name
1501 String moduleName;
1502 CIMValue nameValue = providerModule.getProperty(
1503 providerModule.findProperty("Name")).getValue();
1504 nameValue.get(moduleName);
1505
|
1506 kumpf 1.6 // Look up the Provider Agents for this module
1507 Array<ProviderAgentContainer*> paArray =
1508 _lookupProviderAgents(moduleName);
|
1509 kumpf 1.1
|
1510 kumpf 1.6 for (Uint32 i=0; i<paArray.size(); i++)
|
1511 kumpf 1.1 {
1512 //
1513 // Do not start up an agent process just to disable the module
1514 //
|
1515 kumpf 1.6 if (paArray[i]->isInitialized())
1516 {
1517 //
1518 // Forward the request to the provider agent
1519 //
1520 response.reset(paArray[i]->processMessage(request));
1521
1522 // Note: Do not uninitialize the ProviderAgentContainer here
1523 // when a disable module operation is successful. Just let the
1524 // selecting thread notice when the agent connection is closed.
1525
1526 // Determine the success of the disable module operation
1527 CIMDisableModuleResponseMessage* dmResponse =
1528 dynamic_cast<CIMDisableModuleResponseMessage*>(
1529 response.get());
1530 PEGASUS_ASSERT(dmResponse != 0);
1531
1532 Boolean isStopped = false;
1533 for (Uint32 i=0; i < dmResponse->operationalStatus.size(); i++)
1534 {
1535 if (dmResponse->operationalStatus[i] ==
1536 kumpf 1.6 CIM_MSE_OPSTATUS_VALUE_STOPPED)
1537 {
1538 isStopped = true;
1539 break;
1540 }
1541 }
1542
1543 // If the operation is unsuccessful, stop and return the error
1544 if ((dmResponse->cimException.getCode() != CIM_ERR_SUCCESS) ||
1545 !isStopped)
1546 {
1547 break;
1548 }
1549 }
1550 }
1551
1552 // Use a default response if no Provider Agents were called
1553 if (!response.get())
1554 {
|
1555 kumpf 1.1 response.reset(request->buildResponse());
1556
1557 CIMDisableModuleResponseMessage* dmResponse =
1558 dynamic_cast<CIMDisableModuleResponseMessage*>(response.get());
1559 PEGASUS_ASSERT(dmResponse != 0);
1560
1561 Array<Uint16> operationalStatus;
1562 operationalStatus.append(CIM_MSE_OPSTATUS_VALUE_STOPPED);
1563 dmResponse->operationalStatus = operationalStatus;
1564 }
|
1565 kumpf 1.6 }
1566 else if (request->getType() == CIM_ENABLE_MODULE_REQUEST_MESSAGE)
1567 {
1568 // Fan out the request to all Provider Agent processes for this module
1569
1570 // Retrieve the provider module name
1571 String moduleName;
1572 CIMValue nameValue = providerModule.getProperty(
1573 providerModule.findProperty("Name")).getValue();
1574 nameValue.get(moduleName);
1575
1576 // Look up the Provider Agents for this module
1577 Array<ProviderAgentContainer*> paArray =
1578 _lookupProviderAgents(moduleName);
1579
1580 for (Uint32 i=0; i<paArray.size(); i++)
|
1581 kumpf 1.1 {
1582 //
1583 // Do not start up an agent process just to enable the module
1584 //
|
1585 kumpf 1.6 if (paArray[i]->isInitialized())
1586 {
1587 //
1588 // Forward the request to the provider agent
1589 //
1590 response.reset(paArray[i]->processMessage(request));
1591
1592 // Determine the success of the enable module operation
1593 CIMEnableModuleResponseMessage* emResponse =
1594 dynamic_cast<CIMEnableModuleResponseMessage*>(
1595 response.get());
1596 PEGASUS_ASSERT(emResponse != 0);
1597
1598 Boolean isOk = false;
1599 for (Uint32 i=0; i < emResponse->operationalStatus.size(); i++)
1600 {
1601 if (emResponse->operationalStatus[i] ==
1602 CIM_MSE_OPSTATUS_VALUE_OK)
1603 {
1604 isOk = true;
1605 break;
1606 kumpf 1.6 }
1607 }
1608
1609 // If the operation is unsuccessful, stop and return the error
1610 if ((emResponse->cimException.getCode() != CIM_ERR_SUCCESS) ||
1611 !isOk)
1612 {
1613 break;
1614 }
1615 }
1616 }
1617
1618 // Use a default response if no Provider Agents were called
1619 if (!response.get())
1620 {
|
1621 kumpf 1.1 response.reset(request->buildResponse());
1622
1623 CIMEnableModuleResponseMessage* emResponse =
1624 dynamic_cast<CIMEnableModuleResponseMessage*>(response.get());
1625 PEGASUS_ASSERT(emResponse != 0);
1626
1627 Array<Uint16> operationalStatus;
1628 operationalStatus.append(CIM_MSE_OPSTATUS_VALUE_OK);
1629 emResponse->operationalStatus = operationalStatus;
1630 }
|
1631 kumpf 1.6 }
1632 else
1633 {
1634 // Retrieve the provider module name
1635 String moduleName;
1636 CIMValue nameValue = providerModule.getProperty(
1637 providerModule.findProperty("Name")).getValue();
1638 nameValue.get(moduleName);
1639
1640 // Retrieve the provider user context configuration
1641 Uint16 userContext = 0;
1642 Uint32 pos = providerModule.findProperty(
1643 PEGASUS_PROPERTYNAME_MODULE_USERCONTEXT);
1644 if (pos != PEG_NOT_FOUND)
1645 {
|
1646 kumpf 1.12 CIMValue userContextValue =
1647 providerModule.getProperty(pos).getValue();
1648 if (!userContextValue.isNull())
1649 {
1650 userContextValue.get(userContext);
1651 }
|
1652 kumpf 1.6 }
1653
1654 if (userContext == 0)
1655 {
1656 userContext = PG_PROVMODULE_USERCTXT_PRIVILEGED;
1657 }
1658
1659 String userName;
1660
1661 if (userContext == PG_PROVMODULE_USERCTXT_REQUESTOR)
|
1662 kumpf 1.1 {
|
1663 kumpf 1.6 try
1664 {
1665 // User Name is in the OperationContext
1666 IdentityContainer ic = (IdentityContainer)
1667 request->operationContext.get(IdentityContainer::NAME);
1668 userName = ic.getUserName();
1669 }
1670 catch (Exception& e)
1671 {
1672 // If no IdentityContainer is present, default to the CIM
1673 // Server's user context
1674 }
|
1675 kumpf 1.1
|
1676 kumpf 1.6 // If authentication is disabled, use the CIM Server's user context
1677 if (!userName.size())
1678 {
1679 userName = System::getEffectiveUserName();
1680 }
1681 }
1682 else if (userContext == PG_PROVMODULE_USERCTXT_DESIGNATED)
1683 {
1684 // Retrieve the provider module name
1685 providerModule.getProperty(providerModule.findProperty(
1686 PEGASUS_PROPERTYNAME_MODULE_DESIGNATEDUSER)).getValue().
1687 get(userName);
1688 }
1689 else if (userContext == PG_PROVMODULE_USERCTXT_CIMSERVER)
1690 {
1691 userName = System::getEffectiveUserName();
1692 }
1693 else // Privileged User
1694 {
1695 PEGASUS_ASSERT(userContext == PG_PROVMODULE_USERCTXT_PRIVILEGED);
1696 userName = System::getPrivilegedUserName();
|
1697 kumpf 1.1 }
|
1698 kumpf 1.6
1699 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4,
1700 "Module name = " + moduleName);
1701 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL4,
1702 "User context = %hd.", userContext);
1703 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4,
1704 "User name = " + userName);
1705
1706 // Look up the Provider Agent for this module and user
1707 ProviderAgentContainer* pa = _lookupProviderAgent(moduleName, userName);
1708 PEGASUS_ASSERT(pa != 0);
1709
1710 //
1711 // Forward the request to the provider agent
1712 //
1713 response.reset(pa->processMessage(request));
|
1714 kumpf 1.1 }
1715
1716 response->syncAttributes(request);
1717
1718 PEG_METHOD_EXIT();
1719 return response.release();
1720 }
1721
1722 ProviderAgentContainer* OOPProviderManagerRouter::_lookupProviderAgent(
|
1723 kumpf 1.6 const String& moduleName,
1724 const String& userName)
|
1725 kumpf 1.1 {
1726 ProviderAgentContainer* pa = 0;
|
1727 kumpf 1.6 String key = moduleName + ":" + userName;
|
1728 kumpf 1.1
1729 AutoMutex lock(_providerAgentTableMutex);
|
1730 kumpf 1.6 if (!_providerAgentTable.lookup(key, pa))
|
1731 kumpf 1.1 {
|
1732 kumpf 1.6 pa = new ProviderAgentContainer(
|
1733 kumpf 1.26 moduleName, userName, _indicationCallback, _responseChunkCallback,
|
1734 carolann.graves 1.14 _subscriptionInitComplete);
|
1735 kumpf 1.6 _providerAgentTable.insert(key, pa);
|
1736 kumpf 1.1 }
1737 return pa;
1738 }
1739
|
1740 kumpf 1.6 Array<ProviderAgentContainer*> OOPProviderManagerRouter::_lookupProviderAgents(
1741 const String& moduleName)
1742 {
1743 Array<ProviderAgentContainer*> paArray;
1744
1745 AutoMutex lock(_providerAgentTableMutex);
1746 for (ProviderAgentTable::Iterator i = _providerAgentTable.start(); i; i++)
1747 {
1748 if (i.value()->getModuleName() == moduleName)
1749 {
1750 paArray.append(i.value());
1751 }
1752 }
1753 return paArray;
1754 }
1755
|
1756 kumpf 1.1 CIMResponseMessage* OOPProviderManagerRouter::_forwardRequestToAllAgents(
1757 CIMRequestMessage* request)
1758 {
1759 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1760 "OOPProviderManagerRouter::_forwardRequestToAllAgents");
1761
1762 // Get a list of the ProviderAgentContainers. We need our own array copy
1763 // because we cannot hold the _providerAgentTableMutex while calling
1764 // _ProviderAgentContainer::processMessage().
1765 Array<ProviderAgentContainer*> paContainerArray;
1766 {
1767 AutoMutex tableLock(_providerAgentTableMutex);
1768 for (ProviderAgentTable::Iterator i = _providerAgentTable.start();
1769 i != 0; i++)
1770 {
1771 paContainerArray.append(i.value());
1772 }
1773 }
1774
1775 CIMException responseException;
1776
1777 kumpf 1.1 // Forward the request to each of the initialized provider agents
1778 for (Uint32 j = 0; j < paContainerArray.size(); j++)
1779 {
1780 ProviderAgentContainer* pa = paContainerArray[j];
1781 if (pa->isInitialized())
1782 {
1783 // Note: The ProviderAgentContainer could become uninitialized
1784 // before _ProviderAgentContainer::processMessage() processes
1785 // this request. In this case, the Provider Agent process will
1786 // (unfortunately) be started to process this message.
1787 AutoPtr<CIMResponseMessage> response;
1788 response.reset(pa->processMessage(request));
1789 if (response.get() != 0)
1790 {
1791 // If the operation failed, save the exception data
1792 if ((response->cimException.getCode() != CIM_ERR_SUCCESS) &&
1793 (responseException.getCode() == CIM_ERR_SUCCESS))
1794 {
1795 responseException = response->cimException;
1796 }
1797 }
1798 kumpf 1.1 }
1799 }
1800
1801 CIMResponseMessage* response = request->buildResponse();
1802 response->cimException = responseException;
1803
1804 PEG_METHOD_EXIT();
1805 return response;
1806 }
1807
1808 Boolean OOPProviderManagerRouter::hasActiveProviders()
1809 {
1810 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1811 "OOPProviderManagerRouter::hasActiveProviders");
1812
1813 // Iterate through the _providerAgentTable looking for initialized agents
1814 AutoMutex lock(_providerAgentTableMutex);
1815 ProviderAgentTable::Iterator i = _providerAgentTable.start();
1816 for(; i != 0; i++)
1817 {
1818 if (i.value()->isInitialized())
1819 kumpf 1.1 {
1820 PEG_METHOD_EXIT();
1821 return true;
1822 }
1823 }
1824
1825 // No initialized Provider Agents were found
1826 PEG_METHOD_EXIT();
1827 return false;
1828 }
1829
1830 void OOPProviderManagerRouter::unloadIdleProviders()
1831 {
1832 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1833 "OOPProviderManagerRouter::unloadIdleProviders");
1834
1835 // Iterate through the _providerAgentTable unloading idle providers
1836 AutoMutex lock(_providerAgentTableMutex);
1837 ProviderAgentTable::Iterator i = _providerAgentTable.start();
1838 for(; i != 0; i++)
1839 {
1840 kumpf 1.1 i.value()->unloadIdleProviders();
1841 }
1842
1843 PEG_METHOD_EXIT();
1844 }
1845
1846 PEGASUS_NAMESPACE_END
|