1 karl 1.5 //%2004////////////////////////////////////////////////////////////////////////
|
2 kumpf 1.1 //
|
3 karl 1.5 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 kumpf 1.1 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.5 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 kumpf 1.1 //
10 // Permission is hereby granted, free of charge, to any person obtaining a copy
11 // of this software and associated documentation files (the "Software"), to
12 // deal in the Software without restriction, including without limitation the
13 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
14 // sell copies of the Software, and to permit persons to whom the Software is
15 // furnished to do so, subject to the following conditions:
|
16 karl 1.5 //
|
17 kumpf 1.1 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
18 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
19 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
20 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
21 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
22 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
23 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
24 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
25 //
26 //==============================================================================
27 //
28 // Author: Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
29 // Jenny Yu, Hewlett-Packard Company (jenny_yu@hp.com)
30 //
31 // Modified By:
32 //
33 //%/////////////////////////////////////////////////////////////////////////////
34
35 #include <Pegasus/Common/Config.h>
36 #include <Pegasus/Common/Constants.h>
37 #include <Pegasus/Common/AutoPtr.h>
38 kumpf 1.1 #include <Pegasus/Common/ArrayInternal.h>
39 #include <Pegasus/Common/CIMMessage.h>
40 #include <Pegasus/Common/CIMMessageSerializer.h>
41 #include <Pegasus/Common/CIMMessageDeserializer.h>
42 #include <Pegasus/Common/OperationContextInternal.h>
43 #include <Pegasus/Common/System.h>
44 #include <Pegasus/Common/AnonymousPipe.h>
45 #include <Pegasus/Common/Tracer.h>
46 #include <Pegasus/Common/Logger.h>
47 #include <Pegasus/Common/Thread.h>
48 #include <Pegasus/Common/MessageQueueService.h>
49 #include <Pegasus/Config/ConfigManager.h>
50
51 #if defined (PEGASUS_OS_TYPE_WINDOWS)
52 #include <windows.h> // For CreateProcess()
53 #else
54 # if defined (PEGASUS_OS_OS400)
55 # include <unistd.cleinc>
56 # else
57 # include <unistd.h> // For fork(), exec(), and _exit()
58 # endif
59 kumpf 1.1 #include <errno.h>
60 #endif
61
62 #include "OOPProviderManagerRouter.h"
63
64 PEGASUS_USING_STD;
65
66 PEGASUS_NAMESPACE_BEGIN
67
68 /////////////////////////////////////////////////////////////////////////////
69 // OutstandingRequestTable and OutstandingRequestEntry
70 /////////////////////////////////////////////////////////////////////////////
71
72 /**
73 An OutstandingRequestEntry represents a request message sent to a
74 Provider Agent for which no response has been received. The request
75 sender provides the message ID and a location for the response to be
76 returned, and then waits on the semaphore. When a response matching
77 the message ID is received, it is placed into the specified location
78 and the semaphore is signaled.
79 */
80 kumpf 1.1 class OutstandingRequestEntry
81 {
82 public:
83 OutstandingRequestEntry(
84 String messageId_,
85 CIMResponseMessage*& responseMessage_,
86 Semaphore* responseReady_)
87 : messageId(messageId_),
88 responseMessage(responseMessage_),
89 responseReady(responseReady_)
90 {
91 }
92
93 String messageId;
94 CIMResponseMessage*& responseMessage;
95 Semaphore* responseReady;
96 };
97
// Hash table keyed by a String and holding OutstandingRequestEntry pointers.
// The table stores raw pointers only; it does not own the entries.
98 typedef HashTable<String, OutstandingRequestEntry*, EqualFunc<String>,
99 HashFunc<String> > OutstandingRequestTable;
100
101 kumpf 1.1
102 /////////////////////////////////////////////////////////////////////////////
103 // ProviderAgentContainer
104 /////////////////////////////////////////////////////////////////////////////
105
106 class ProviderAgentContainer
107 {
108 public:
109 ProviderAgentContainer(
110 const String & moduleName,
111 PEGASUS_INDICATION_CALLBACK indicationCallback);
112
113 ~ProviderAgentContainer();
114
115 Boolean isInitialized();
116
117 CIMResponseMessage* processMessage(CIMRequestMessage* request);
118 void unloadIdleProviders();
119
120 private:
121 //
122 kumpf 1.1 // Private methods
123 //
124
125 /** Unimplemented */
126 ProviderAgentContainer();
127 /** Unimplemented */
128 ProviderAgentContainer(const ProviderAgentContainer& pa);
129 /** Unimplemented */
130 ProviderAgentContainer& operator=(const ProviderAgentContainer& pa);
131
132 /**
133 Start a Provider Agent process and establish a pipe connection with it.
134 Note: The caller must lock the _agentMutex.
135 */
136 void _startAgentProcess();
137
138 /**
139 Send initialization data to the Provider Agent.
140 Note: The caller must lock the _agentMutex.
141 */
142 void _sendInitializationData();
143 kumpf 1.1
144 /**
145 Initialize the ProviderAgentContainer if it is not already
146 initialized. Initialization includes starting the Provider Agent
147 process, establishing a pipe connection with it, and starting a
148 thread to read response messages from the Provider Agent.
149
150 Note: The caller must lock the _agentMutex.
151 */
152 void _initialize();
153
154 /**
155 Uninitialize the ProviderAgentContainer if it is initialized.
156 The connection is closed and outstanding requests are completed
157 with an error result.
158
159 Note: The caller must lock the _agentMutex.
160 */
161 void _uninitialize();
162
163 /**
164 kumpf 1.1 Read and process response messages from the Provider Agent until
165 the connection is closed.
166 */
167 void _processResponses();
168 static PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL
169 _responseProcessor(void* arg);
170
171 //
172 // Private data
173 //
174
175 /**
176 The _agentMutex must be locked whenever writing to the Provider
177 Agent connection, accessing the _isInitialized flag, or changing
178 the Provider Agent state.
179 */
180 Mutex _agentMutex;
181
182 /**
183 Name of the provider module served by this Provider Agent.
184 */
185 kumpf 1.1 String _moduleName;
186
187 /**
188 Callback function to which all generated indications are sent for
189 processing.
190 */
191 PEGASUS_INDICATION_CALLBACK _indicationCallback;
192
193 /**
194 Indicates whether the Provider Agent is active.
195 */
196 Boolean _isInitialized;
197
198 /**
199 Pipe connection used to read responses from the Provider Agent.
200 */
201 AutoPtr<AnonymousPipe> _pipeFromAgent;
202 /**
203 Pipe connection used to write requests to the Provider Agent.
204 */
205 AutoPtr<AnonymousPipe> _pipeToAgent;
206 kumpf 1.1
207 /**
208 The _outstandingRequestTable holds an entry for each request that has
209 been sent to this Provider Agent for which no response has been
210 received. Entries are added (by the writing thread) when a request
211 is sent, and are removed (by the reading thread) when the response is
212 received (or when it is determined that no response is forthcoming).
213 */
214 OutstandingRequestTable _outstandingRequestTable;
215 /**
216 The _outstandingRequestTableMutex must be locked whenever reading or
217 updating the _outstandingRequestTable.
218 */
219 Mutex _outstandingRequestTableMutex;
|
220 kumpf 1.2
221 /**
222 Holds the last provider module instance sent to the Provider Agent in
223 a ProviderIdContainer. Since the provider module instance rarely
224 changes, an optimization is used to send it only when it differs from
225 the last provider module instance sent.
226 */
227 CIMInstance _providerModuleCache;
|
228 kumpf 1.1 };
229
230 ProviderAgentContainer::ProviderAgentContainer(
231 const String & moduleName,
232 PEGASUS_INDICATION_CALLBACK indicationCallback)
233 : _moduleName(moduleName),
234 _indicationCallback(indicationCallback),
235 _isInitialized(false)
236 {
237 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
238 "ProviderAgentContainer::ProviderAgentContainer");
239 PEG_METHOD_EXIT();
240 }
241
242 ProviderAgentContainer::~ProviderAgentContainer()
243 {
244 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
245 "ProviderAgentContainer::~ProviderAgentContainer");
246
247 // Ensure the destructor does not throw an exception
248 try
249 kumpf 1.1 {
250 // Stop the responseProcessor thread by closing its connection
251 _pipeFromAgent->closeReadHandle();
252
253 // Wait for the responseProcessor thread to exit
254 while (isInitialized())
255 {
256 pegasus_yield();
257 }
258 }
259 catch (...)
260 {
261 }
262
263 PEG_METHOD_EXIT();
264 }
265
266 void ProviderAgentContainer::_startAgentProcess()
267 {
268 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
269 "ProviderAgentContainer::_startAgentProcess");
270 kumpf 1.1
271 AutoPtr<AnonymousPipe> pipeFromAgent(new AnonymousPipe());
272 AutoPtr<AnonymousPipe> pipeToAgent(new AnonymousPipe());
273
274 //
275 // Start a cimprovagt process for this provider module
276 //
277
278 #if defined (PEGASUS_OS_TYPE_WINDOWS)
279 //
280 // Set up members of the PROCESS_INFORMATION structure
281 //
282 PROCESS_INFORMATION piProcInfo;
283 ZeroMemory (&piProcInfo, sizeof (PROCESS_INFORMATION));
284
285 //
286 // Set up members of the STARTUPINFO structure
287 //
288 STARTUPINFO siStartInfo;
289 ZeroMemory (&siStartInfo, sizeof (STARTUPINFO));
290 siStartInfo.cb = sizeof (STARTUPINFO);
291 kumpf 1.1
292 //
293 // Generate the command line
294 //
295 char cmdLine[2048];
296 char readHandle[32];
297 char writeHandle[32];
298 pipeToAgent->exportReadHandle(readHandle);
299 pipeFromAgent->exportWriteHandle(writeHandle);
300
301 sprintf(cmdLine, "\"%s\" %s %s \"%s\"",
302 (const char*)ConfigManager::getHomedPath(
303 PEGASUS_PROVIDER_AGENT_PROC_NAME).getCString(),
304 readHandle, writeHandle, (const char*)_moduleName.getCString());
305
306 //
307 // Create the child process
308 //
309 if (!CreateProcess (
310 NULL, //
311 cmdLine, // command line
312 kumpf 1.1 NULL, // process security attributes
313 NULL, // primary thread security attributes
314 TRUE, // handles are inherited
315 0, // creation flags
316 NULL, // use parent's environment
317 NULL, // use parent's current directory
318 &siStartInfo, // STARTUPINFO
319 &piProcInfo)) // PROCESS_INFORMATION
320 {
321 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
322 "CreateProcess() failed. errno = %d.", GetLastError());
323 PEG_METHOD_EXIT();
324 throw Exception(MessageLoaderParms(
325 "ProviderManager.OOPProviderManagerRouter.CIMPROVAGT_START_FAILED",
326 "Failed to start cimprovagt \"$0\".",
327 _moduleName));
328 }
329
330 CloseHandle(piProcInfo.hProcess);
331 CloseHandle(piProcInfo.hThread);
332 #else
333 kumpf 1.1 pid_t pid = fork();
334 if (pid < 0)
335 {
336 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
337 "fork() failed. errno = %d.", errno);
338 PEG_METHOD_EXIT();
339 throw Exception(MessageLoaderParms(
340 "ProviderManager.OOPProviderManagerRouter.CIMPROVAGT_START_FAILED",
341 "Failed to start cimprovagt \"$0\".",
342 _moduleName));
343 }
344 else if (pid == 0)
345 {
346 //
347 // Child side of the fork
348 //
349
350 try
351 {
352 // Close our copies of the parent's ends of the pipes
353 pipeToAgent->closeWriteHandle();
354 kumpf 1.1 pipeFromAgent->closeReadHandle();
355
356 //
357 // Execute the cimprovagt program
358 //
359 String agentCommandPath =
360 ConfigManager::getHomedPath(PEGASUS_PROVIDER_AGENT_PROC_NAME);
361 CString agentCommandPathCString = agentCommandPath.getCString();
362
363 char readHandle[32];
364 char writeHandle[32];
365 pipeToAgent->exportReadHandle(readHandle);
366 pipeFromAgent->exportWriteHandle(writeHandle);
367
368 execl(agentCommandPathCString, agentCommandPathCString,
369 readHandle, writeHandle,
370 (const char*)_moduleName.getCString(), (char*)0);
371
372 // If we're still here, there was an error
373 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
374 "execl() failed. errno = %d.", errno);
375 kumpf 1.1 _exit(1);
376 }
377 catch (...)
378 {
379 // There's not much we can do here in no man's land
380 try
381 {
382 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
383 "Caught exception before calling execl().");
384 }
385 catch (...) {}
386 _exit(1);
387 }
388 }
389 #endif
390
391 //
392 // CIM Server process
393 //
394
395 // Close our copies of the agent's ends of the pipes
396 kumpf 1.1 pipeToAgent->closeReadHandle();
397 pipeFromAgent->closeWriteHandle();
398
399 _pipeToAgent.reset(pipeToAgent.release());
400 _pipeFromAgent.reset(pipeFromAgent.release());
401
402 PEG_METHOD_EXIT();
403 }
404
405 // Note: Caller must lock _agentMutex
406 void ProviderAgentContainer::_sendInitializationData()
407 {
408 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
409 "ProviderAgentContainer::_sendInitializationData");
410
411 //
412 // Gather config properties to pass to the Provider Agent
413 //
414 ConfigManager* configManager = ConfigManager::getInstance();
415 Array<Pair<String, String> > configProperties;
416
417 kumpf 1.1 Array<String> configPropertyNames;
418 configManager->getAllPropertyNames(configPropertyNames, true);
419 for (Uint32 i = 0; i < configPropertyNames.size(); i++)
420 {
421 String configPropertyValue =
422 configManager->getCurrentValue(configPropertyNames[i]);
423 String configPropertyDefaultValue =
424 configManager->getDefaultValue(configPropertyNames[i]);
425 if (configPropertyValue != configPropertyDefaultValue)
426 {
427 configProperties.append(Pair<String, String>(
428 configPropertyNames[i], configPropertyValue));
429 }
430 }
431
432 //
433 // Create a Provider Agent initialization message
434 //
435 AutoPtr<CIMInitializeProviderAgentRequestMessage> request(
436 new CIMInitializeProviderAgentRequestMessage(
437 String("0"), // messageId
438 kumpf 1.1 configManager->getPegasusHome(),
439 configProperties,
440 System::bindVerbose,
441 QueueIdStack()));
442
443 //
444 // Write the initialization message to the pipe
445 //
446 AnonymousPipe::Status writeStatus =
447 _pipeToAgent->writeMessage(request.get());
448
449 if (writeStatus != AnonymousPipe::STATUS_SUCCESS)
450 {
451 PEG_METHOD_EXIT();
452 throw Exception(MessageLoaderParms(
453 "ProviderManager.OOPProviderManagerRouter."
454 "CIMPROVAGT_COMMUNICATION_FAILED",
455 "Failed to communicate with cimprovagt \"$0\".",
456 _moduleName));
457 }
458
459 kumpf 1.1 // Do not wait for a response from the Provider Agent. (It isn't coming.)
460
461 PEG_METHOD_EXIT();
462 }
463
464 // Note: Caller must lock _agentMutex
465 void ProviderAgentContainer::_initialize()
466 {
467 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
468 "ProviderAgentContainer::_initialize");
469
470 if (_isInitialized)
471 {
472 PEGASUS_ASSERT(0);
473 PEG_METHOD_EXIT();
474 return;
475 }
476
477 try
478 {
479 _startAgentProcess();
480 kumpf 1.1
481 _sendInitializationData();
482
483 _isInitialized = true;
484
485 // Start a thread to read and process responses from the Provider Agent
|
486 denise.eckstein 1.5.2.1 ThreadStatus rtn = PEGASUS_THREAD_OK;
487 while ((rtn = MessageQueueService::get_thread_pool()->
488 allocate_and_awaken(this, _responseProcessor)) !=
489 PEGASUS_THREAD_OK)
|
490 kumpf 1.1 {
|
491 denise.eckstein 1.5.2.1 if (rtn == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)
492 {
493 pegasus_yield();
494 }
495 else
496 {
497 Logger::put(
498 Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
499 "Not enough threads to process responses from the "
500 "provider agent.");
501
502 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
503 "Could not allocate thread to process responses from the "
504 "provider agent.");
505
506 throw Exception(MessageLoaderParms(
507 "ProviderManager.OOPProviderManagerRouter."
508 "CIMPROVAGT_THREAD_ALLOCATION_FAILED",
509 "Failed to allocate thread for cimprovagt \"$0\".",
510 _moduleName));
511 }
|
512 kumpf 1.1 }
513 }
514 catch (...)
515 {
516 _isInitialized = false;
517 _pipeToAgent.reset();
518 _pipeFromAgent.reset();
519 PEG_METHOD_EXIT();
520 throw;
521 }
522
523 PEG_METHOD_EXIT();
524 }
525
526 Boolean ProviderAgentContainer::isInitialized()
527 {
528 AutoMutex lock(_agentMutex);
529 return _isInitialized;
530 }
531
532 // Note: Caller must lock _agentMutex
533 kumpf 1.1 void ProviderAgentContainer::_uninitialize()
534 {
535 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
536 "ProviderAgentContainer::_uninitialize");
537
538 if (!_isInitialized)
539 {
540 PEGASUS_ASSERT(0);
541 PEG_METHOD_EXIT();
542 return;
543 }
544
545 try
546 {
547 // Close the connection with the Provider Agent
548 _pipeFromAgent.reset();
549 _pipeToAgent.reset();
550
|
551 kumpf 1.2 _providerModuleCache = CIMInstance();
552
|
553 kumpf 1.1 _isInitialized = false;
554
555 //
556 // Complete with null responses all outstanding requests on this
557 // connection
558 //
559 {
560 AutoMutex tableLock(_outstandingRequestTableMutex);
561
562 for (OutstandingRequestTable::Iterator i =
563 _outstandingRequestTable.start();
564 i != 0; i++)
565 {
566 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
567 String("Completing messageId \"") + i.value()->messageId +
568 "\" with a null response.");
569 i.value()->responseMessage = 0;
570 i.value()->responseReady->signal();
571 }
572
573 _outstandingRequestTable.clear();
574 kumpf 1.1 }
575 }
576 catch (...)
577 {
578 // We're uninitializing, so do not propagate the exception
579 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
580 "Ignoring _uninitialize() exception.");
581 }
582
583 PEG_METHOD_EXIT();
584 }
585
// Sends a request to the Provider Agent and blocks until the matching
// response has been delivered, returning that response.
//
// The request is temporarily given a message ID derived from its own
// address so the response can be matched in the _outstandingRequestTable;
// the caller's ID is put back on the request after the write and is set on
// the returned response.  The _agentMutex is held only while initializing
// the agent and writing the request, not while waiting for the response.
//
// Exceptions raised inside the try block are caught here and turned into an
// error response built with request->buildResponse().  A null response
// (delivered when the connection closes) is replaced by a
// "Lost connection" CIM_ERR_FAILED response.
586 CIMResponseMessage* ProviderAgentContainer::processMessage(
587 CIMRequestMessage* request)
588 {
589 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
590 "ProviderAgentContainer::processMessage");
591
// Assigned through the OutstandingRequestEntry's reference by the thread
// that completes the request, or by one of the catch handlers below.
592 CIMResponseMessage* response;
593 String originalMessageId = request->messageId;
594
|
595 kumpf 1.2 // These three variables are used for the provider module optimization.
596 // See the _providerModuleCache member description for more information.
597 AutoPtr<ProviderIdContainer> origProviderId;
598 Boolean doProviderModuleOptimization = false;
599 Boolean updateProviderModuleCache = false;
600
|
601 kumpf 1.1 try
602 {
603 // The messageId attribute is used to correlate response messages
604 // from the Provider Agent with request messages, so it is imperative
605 // that the ID is unique for each request. The incoming ID cannot be
606 // trusted to be unique, so we substitute a unique one. The memory
607 // address of the request is used as the source of a unique piece of
608 // data. (The message ID is only required to be unique while the
609 // request is outstanding.)
610 char messagePtrString[20];
611 sprintf(messagePtrString, "%p", request);
612 String uniqueMessageId = messagePtrString;
613
614 //
615 // Set up the OutstandingRequestEntry for this request
616 //
// The entry lives on this thread's stack; the table only holds a pointer
// to it, so it must be removed from the table before this scope is left.
617 Semaphore waitSemaphore(0);
618 OutstandingRequestEntry outstandingRequestEntry(
619 uniqueMessageId, response, &waitSemaphore);
620
621 //
622 kumpf 1.1 // Lock the Provider Agent Container while initializing the
623 // agent and writing the request to the connection
624 //
625 {
626 AutoMutex lock(_agentMutex);
627
628 //
629 // Initialize the Provider Agent, if necessary
630 //
631 if (!_isInitialized)
632 {
633 _initialize();
634 }
635
636 //
637 // Add an entry to the OutstandingRequestTable for this request
638 //
639 {
640 AutoMutex tableLock(_outstandingRequestTableMutex);
641
642 _outstandingRequestTable.insert(
643 kumpf 1.1 uniqueMessageId, &outstandingRequestEntry);
644 }
645
|
646 kumpf 1.2 // Get the provider module from the ProviderIdContainer to see if
647 // we can optimize out the transmission of this instance to the
648 // Provider Agent. (See the _providerModuleCache description.)
649 try
650 {
651 ProviderIdContainer pidc = request->operationContext.get(
652 ProviderIdContainer::NAME);
653 origProviderId.reset(new ProviderIdContainer(
654 pidc.getModule(), pidc.getProvider(),
655 pidc.isRemoteNameSpace(), pidc.getRemoteInfo()));
656 if (_providerModuleCache.isUninitialized() ||
657 (!pidc.getModule().identical(_providerModuleCache)))
658 {
659 // We haven't sent this provider module instance to the
660 // Provider Agent yet. Update our cache after we send it.
661 updateProviderModuleCache = true;
662 }
663 else
664 {
665 // Replace the provider module in the ProviderIdContainer
666 // with an uninitialized instance. We'll need to put the
667 kumpf 1.2 // original one back after the message is sent.
668 request->operationContext.set(ProviderIdContainer(
669 CIMInstance(), pidc.getProvider(),
670 pidc.isRemoteNameSpace(), pidc.getRemoteInfo()));
671 doProviderModuleOptimization = true;
672 }
673 }
674 catch (...)
675 {
676 // No ProviderIdContainer to optimize
677 }
678
|
679 kumpf 1.1 //
680 // Write the message to the pipe
681 //
682 try
683 {
684 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL3,
685 String("Sending request to agent with messageId ") +
686 uniqueMessageId);
687
// The unique ID is on the request only for the duration of the write.
688 request->messageId = uniqueMessageId;
689 AnonymousPipe::Status writeStatus =
690 _pipeToAgent->writeMessage(request);
691 request->messageId = originalMessageId;
692
|
693 kumpf 1.2 if (doProviderModuleOptimization)
694 {
695 request->operationContext.set(*origProviderId.get());
696 }
697
|
698 kumpf 1.1 if (writeStatus != AnonymousPipe::STATUS_SUCCESS)
699 {
700 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
701 "Failed to write message to pipe. writeStatus = %d.",
702 writeStatus);
703 throw Exception(MessageLoaderParms(
704 "ProviderManager.OOPProviderManagerRouter."
705 "CIMPROVAGT_COMMUNICATION_FAILED",
706 "Failed to communicate with cimprovagt \"$0\".",
707 _moduleName));
708 }
|
709 kumpf 1.2
// The cache is updated only after the write has succeeded.
710 if (updateProviderModuleCache)
711 {
712 _providerModuleCache = origProviderId->getModule();
713 }
|
714 kumpf 1.1 }
715 catch (...)
716 {
// Restore the request to the state the caller handed in before rethrowing.
717 request->messageId = originalMessageId;
|
718 kumpf 1.2
719 if (doProviderModuleOptimization)
720 {
721 request->operationContext.set(*origProviderId.get());
722 }
723
|
724 kumpf 1.1 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
725 "Failed to write message to pipe.");
726 // Remove the OutstandingRequestTable entry for this request
727 {
728 AutoMutex tableLock(_outstandingRequestTableMutex);
729 Boolean removed =
730 _outstandingRequestTable.remove(uniqueMessageId);
731 PEGASUS_ASSERT(removed);
732 }
733 throw;
734 }
735 }
736
737 //
738 // Wait for the response
739 //
740 try
741 {
742 // Must not hold _agentMutex while waiting for the response
743 waitSemaphore.wait();
744 }
745 kumpf 1.1 catch (...)
746 {
747 // Remove the OutstandingRequestTable entry for this request
748 {
749 AutoMutex tableLock(_outstandingRequestTableMutex);
750 Boolean removed =
751 _outstandingRequestTable.remove(uniqueMessageId);
752 PEGASUS_ASSERT(removed);
753 }
754 throw;
755 }
756
757 // A null response is returned when an agent connection is closed
758 // while requests remain outstanding.
759 if (response == 0)
760 {
761 response = request->buildResponse();
762 response->cimException = PEGASUS_CIM_EXCEPTION(
763 CIM_ERR_FAILED,
764 MessageLoaderParms(
765 "ProviderManager.OOPProviderManagerRouter."
766 kumpf 1.1 "CIMPROVAGT_CONNECTION_LOST",
767 "Lost connection with cimprovagt \"$0\".",
768 _moduleName));
769 }
770 }
// Failures from the block above are reported to the caller as an error
// response rather than as an exception.
771 catch (CIMException& e)
772 {
773 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
774 String("Caught exception: ") + e.getMessage());
775 response = request->buildResponse();
776 response->cimException = e;
777 }
778 catch (Exception& e)
779 {
780 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
781 String("Caught exception: ") + e.getMessage());
782 response = request->buildResponse();
783 response->cimException = PEGASUS_CIM_EXCEPTION(
784 CIM_ERR_FAILED, e.getMessage());
785 }
786 catch (...)
787 kumpf 1.1 {
788 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
789 "Caught unknown exception");
790 response = request->buildResponse();
791 response->cimException = PEGASUS_CIM_EXCEPTION(
792 CIM_ERR_FAILED, String::EMPTY);
793 }
794
// Make the response carry the caller's original message ID.
795 response->messageId = originalMessageId;
796
797 PEG_METHOD_EXIT();
798 return response;
799 }
800
801 void ProviderAgentContainer::unloadIdleProviders()
802 {
803 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
|
804 carolann.graves 1.4 "ProviderAgentContainer::unloadIdleProviders");
|
805 kumpf 1.1
806 AutoMutex lock(_agentMutex);
807 if (_isInitialized)
808 {
809 // Send a "wake up" message to the Provider Agent.
810 // Don't bother checking whether the operation is successful.
811 Uint32 messageLength = 0;
812 _pipeToAgent->writeBuffer((const char*)&messageLength, sizeof(Uint32));
813 }
814
815 PEG_METHOD_EXIT();
816 }
817
818 void ProviderAgentContainer::_processResponses()
819 {
820 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
821 "ProviderAgentContainer::_processResponses");
822
823 //
824 // Process responses until the pipe is closed
825 //
826 kumpf 1.1 while (1)
827 {
828 try
829 {
830 CIMMessage* message;
831
832 //
833 // Read a response from the Provider Agent
834 //
835 AnonymousPipe::Status readStatus =
836 _pipeFromAgent->readMessage(message);
837
838 // Ignore interrupts
839 if (readStatus == AnonymousPipe::STATUS_INTERRUPT)
840 {
841 continue;
842 }
843
844 // Handle an error the same way as a closed connection
845 if ((readStatus == AnonymousPipe::STATUS_ERROR) ||
846 (readStatus == AnonymousPipe::STATUS_CLOSED))
847 kumpf 1.1 {
848 AutoMutex lock(_agentMutex);
849 _uninitialize();
850 return;
851 }
852
853 if (message->getType() == CIM_PROCESS_INDICATION_REQUEST_MESSAGE)
854 {
855 // Forward indications to the indication callback
856 _indicationCallback(
857 reinterpret_cast<CIMProcessIndicationRequestMessage*>(
858 message));
859 }
860 else
861 {
862 CIMResponseMessage* response;
863 response = dynamic_cast<CIMResponseMessage*>(message);
864 PEGASUS_ASSERT(response != 0);
865
866 // Give the response to the waiting OutstandingRequestEntry
867 OutstandingRequestEntry* _outstandingRequestEntry = 0;
868 kumpf 1.1 {
869 AutoMutex tableLock(_outstandingRequestTableMutex);
870 Boolean foundEntry = _outstandingRequestTable.lookup(
871 response->messageId, _outstandingRequestEntry);
872 PEGASUS_ASSERT(foundEntry);
873
874 // Remove the completed request from the table
875 Boolean removed =
876 _outstandingRequestTable.remove(response->messageId);
877 PEGASUS_ASSERT(removed);
878 }
879
880 _outstandingRequestEntry->responseMessage = response;
881 _outstandingRequestEntry->responseReady->signal();
882 }
883 }
884 catch (Exception& e)
885 {
886 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
887 String("Ignoring exception: ") + e.getMessage());
888 }
889 kumpf 1.1 catch (...)
890 {
891 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
892 "Ignoring exception");
893 }
894 }
895
896 PEG_METHOD_EXIT();
897 }
898
899 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL
900 ProviderAgentContainer::_responseProcessor(void* arg)
901 {
902 ProviderAgentContainer* pa =
903 reinterpret_cast<ProviderAgentContainer*>(arg);
904
905 pa->_processResponses();
906
907 return(PEGASUS_THREAD_RETURN(0));
908 }
909
910 kumpf 1.1 /////////////////////////////////////////////////////////////////////////////
911 // OOPProviderManagerRouter
912 /////////////////////////////////////////////////////////////////////////////
913
914 OOPProviderManagerRouter::OOPProviderManagerRouter(
915 PEGASUS_INDICATION_CALLBACK indicationCallback)
916 {
917 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
918 "OOPProviderManagerRouter::OOPProviderManagerRouter");
919
920 _indicationCallback = indicationCallback;
921
922 PEG_METHOD_EXIT();
923 }
924
925 OOPProviderManagerRouter::~OOPProviderManagerRouter()
926 {
927 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
928 "OOPProviderManagerRouter::~OOPProviderManagerRouter");
929
930 try
931 kumpf 1.1 {
932 // Clean up the ProviderAgentContainers
933 AutoMutex lock(_providerAgentTableMutex);
934 ProviderAgentTable::Iterator i = _providerAgentTable.start();
935 for(; i != 0; i++)
936 {
937 delete i.value();
938 }
939 }
940 catch (...) {}
941
942 PEG_METHOD_EXIT();
943 }
944
945 // Private, unimplemented constructor
946 OOPProviderManagerRouter::OOPProviderManagerRouter()
947 {
948 }
949
950 // Private, unimplemented constructor
951 OOPProviderManagerRouter::OOPProviderManagerRouter(
952 kumpf 1.1 const OOPProviderManagerRouter&)
|
953 david.dillard 1.3 : ProviderManagerRouter(*this)
|
954 kumpf 1.1 {
955 }
956
957 // Private, unimplemented assignment operator
958 OOPProviderManagerRouter& OOPProviderManagerRouter::operator=(
959 const OOPProviderManagerRouter&)
960 {
961 return *this;
962 }
963
964 Message* OOPProviderManagerRouter::processMessage(Message* message)
965 {
966 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
967 "OOPProviderManagerRouter::processMessage");
968
969 CIMRequestMessage* request = dynamic_cast<CIMRequestMessage *>(message);
970 PEGASUS_ASSERT(request != 0);
971
972 AutoPtr<CIMResponseMessage> response;
973
974 //
975 kumpf 1.1 // Get the provider information from the request
976 //
977 CIMInstance providerModule;
978
979 if ((dynamic_cast<CIMOperationRequestMessage*>(request) != 0) ||
980 (dynamic_cast<CIMIndicationRequestMessage*>(request) != 0) ||
981 (request->getType() == CIM_EXPORT_INDICATION_REQUEST_MESSAGE))
982 {
983 // Provider information is in the OperationContext
984 ProviderIdContainer pidc = (ProviderIdContainer)
985 request->operationContext.get(ProviderIdContainer::NAME);
986 providerModule = pidc.getModule();
987 }
988 else if (request->getType() == CIM_ENABLE_MODULE_REQUEST_MESSAGE)
989 {
990 CIMEnableModuleRequestMessage* emReq =
991 dynamic_cast<CIMEnableModuleRequestMessage*>(request);
992 providerModule = emReq->providerModule;
993 }
994 else if (request->getType() == CIM_DISABLE_MODULE_REQUEST_MESSAGE)
995 {
996 kumpf 1.1 CIMDisableModuleRequestMessage* dmReq =
997 dynamic_cast<CIMDisableModuleRequestMessage*>(request);
998 providerModule = dmReq->providerModule;
999 }
1000 else if ((request->getType() == CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE) ||
1001 (request->getType() == CIM_NOTIFY_CONFIG_CHANGE_REQUEST_MESSAGE))
1002 {
1003 // This operation is not provider-specific
1004 }
1005 else
1006 {
1007 // Unrecognized message type. This should never happen.
1008 PEGASUS_ASSERT(0);
1009 response.reset(request->buildResponse());
1010 response->cimException = PEGASUS_CIM_EXCEPTION(
1011 CIM_ERR_FAILED, "Unrecognized message type.");
1012 PEG_METHOD_EXIT();
1013 return response.release();
1014 }
1015
1016 //
1017 kumpf 1.1 // Process the request message
1018 //
1019 if (request->getType() == CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE)
1020 {
1021 // Forward the CIMStopAllProvidersRequest to all providers
1022 response.reset(_forwardRequestToAllAgents(request));
1023
1024 // Note: Do not uninitialize the ProviderAgentContainers here.
1025 // Just let the selecting thread notice when the agent connections
1026 // are closed.
1027 }
1028 else if (request->getType() == CIM_NOTIFY_CONFIG_CHANGE_REQUEST_MESSAGE)
1029 {
1030 CIMNotifyConfigChangeRequestMessage* notifyRequest =
1031 dynamic_cast<CIMNotifyConfigChangeRequestMessage*>(request);
1032 PEGASUS_ASSERT(notifyRequest != 0);
1033
1034 if (notifyRequest->currentValueModified)
1035 {
1036 // Forward the CIMNotifyConfigChangeRequestMessage to all providers
1037 response.reset(_forwardRequestToAllAgents(request));
1038 kumpf 1.1 }
1039 else
1040 {
1041 // No need to notify provider agents about changes to planned value
1042 response.reset(request->buildResponse());
1043 }
1044 }
1045 else
1046 {
1047 // Retrieve the provider module name
1048 String moduleName;
1049 CIMValue nameValue = providerModule.getProperty(
1050 providerModule.findProperty("Name")).getValue();
1051 nameValue.get(moduleName);
1052
1053 // Look up the Provider Agent for this module
1054 ProviderAgentContainer * pa = _lookupProviderAgent(moduleName);
1055 PEGASUS_ASSERT(pa != 0);
1056
1057 // Determine whether the Provider Agent has been initialized
1058 Boolean paInitialized = pa->isInitialized();
1059 kumpf 1.1
1060 if ((request->getType() == CIM_DISABLE_MODULE_REQUEST_MESSAGE) &&
1061 !paInitialized)
1062 {
1063 //
1064 // Do not start up an agent process just to disable the module
1065 //
1066 response.reset(request->buildResponse());
1067
1068 CIMDisableModuleResponseMessage* dmResponse =
1069 dynamic_cast<CIMDisableModuleResponseMessage*>(response.get());
1070 PEGASUS_ASSERT(dmResponse != 0);
1071
1072 Array<Uint16> operationalStatus;
1073 operationalStatus.append(CIM_MSE_OPSTATUS_VALUE_STOPPED);
1074 dmResponse->operationalStatus = operationalStatus;
1075 }
1076 else if ((request->getType() == CIM_ENABLE_MODULE_REQUEST_MESSAGE) &&
1077 !paInitialized)
1078 {
1079 //
1080 kumpf 1.1 // Do not start up an agent process just to enable the module
1081 //
1082 response.reset(request->buildResponse());
1083
1084 CIMEnableModuleResponseMessage* emResponse =
1085 dynamic_cast<CIMEnableModuleResponseMessage*>(response.get());
1086 PEGASUS_ASSERT(emResponse != 0);
1087
1088 Array<Uint16> operationalStatus;
1089 operationalStatus.append(CIM_MSE_OPSTATUS_VALUE_OK);
1090 emResponse->operationalStatus = operationalStatus;
1091 }
1092 else
1093 {
1094 //
1095 // Forward the request to the provider agent
1096 //
1097 response.reset(pa->processMessage(request));
1098
1099 // Note: Do not uninitialize the ProviderAgentContainer here when
1100 // a disable module operation is successful.) Just let the
1101 kumpf 1.1 // selecting thread notice when the agent connection is closed.
1102 }
1103 }
1104
1105 response->syncAttributes(request);
1106
1107 PEG_METHOD_EXIT();
1108 return response.release();
1109 }
1110
1111 ProviderAgentContainer* OOPProviderManagerRouter::_lookupProviderAgent(
1112 const String& moduleName)
1113 {
1114 ProviderAgentContainer* pa = 0;
1115
1116 AutoMutex lock(_providerAgentTableMutex);
1117 if (!_providerAgentTable.lookup(moduleName, pa))
1118 {
1119 pa = new ProviderAgentContainer(moduleName, _indicationCallback);
1120 _providerAgentTable.insert(moduleName, pa);
1121 }
1122 kumpf 1.1 return pa;
1123 }
1124
1125 CIMResponseMessage* OOPProviderManagerRouter::_forwardRequestToAllAgents(
1126 CIMRequestMessage* request)
1127 {
1128 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1129 "OOPProviderManagerRouter::_forwardRequestToAllAgents");
1130
1131 // Get a list of the ProviderAgentContainers. We need our own array copy
1132 // because we cannot hold the _providerAgentTableMutex while calling
1133 // _ProviderAgentContainer::processMessage().
1134 Array<ProviderAgentContainer*> paContainerArray;
1135 {
1136 AutoMutex tableLock(_providerAgentTableMutex);
1137 for (ProviderAgentTable::Iterator i = _providerAgentTable.start();
1138 i != 0; i++)
1139 {
1140 paContainerArray.append(i.value());
1141 }
1142 }
1143 kumpf 1.1
1144 CIMException responseException;
1145
1146 // Forward the request to each of the initialized provider agents
1147 for (Uint32 j = 0; j < paContainerArray.size(); j++)
1148 {
1149 ProviderAgentContainer* pa = paContainerArray[j];
1150 if (pa->isInitialized())
1151 {
1152 // Note: The ProviderAgentContainer could become uninitialized
1153 // before _ProviderAgentContainer::processMessage() processes
1154 // this request. In this case, the Provider Agent process will
1155 // (unfortunately) be started to process this message.
1156 AutoPtr<CIMResponseMessage> response;
1157 response.reset(pa->processMessage(request));
1158 if (response.get() != 0)
1159 {
1160 // If the operation failed, save the exception data
1161 if ((response->cimException.getCode() != CIM_ERR_SUCCESS) &&
1162 (responseException.getCode() == CIM_ERR_SUCCESS))
1163 {
1164 kumpf 1.1 responseException = response->cimException;
1165 }
1166 }
1167 }
1168 }
1169
1170 CIMResponseMessage* response = request->buildResponse();
1171 response->cimException = responseException;
1172
1173 PEG_METHOD_EXIT();
1174 return response;
1175 }
1176
1177 Boolean OOPProviderManagerRouter::hasActiveProviders()
1178 {
1179 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1180 "OOPProviderManagerRouter::hasActiveProviders");
1181
1182 // Iterate through the _providerAgentTable looking for initialized agents
1183 AutoMutex lock(_providerAgentTableMutex);
1184 ProviderAgentTable::Iterator i = _providerAgentTable.start();
1185 kumpf 1.1 for(; i != 0; i++)
1186 {
1187 if (i.value()->isInitialized())
1188 {
1189 PEG_METHOD_EXIT();
1190 return true;
1191 }
1192 }
1193
1194 // No initialized Provider Agents were found
1195 PEG_METHOD_EXIT();
1196 return false;
1197 }
1198
1199 void OOPProviderManagerRouter::unloadIdleProviders()
1200 {
1201 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1202 "OOPProviderManagerRouter::unloadIdleProviders");
1203
1204 // Iterate through the _providerAgentTable unloading idle providers
1205 AutoMutex lock(_providerAgentTableMutex);
1206 kumpf 1.1 ProviderAgentTable::Iterator i = _providerAgentTable.start();
1207 for(; i != 0; i++)
1208 {
1209 i.value()->unloadIdleProviders();
1210 }
1211
1212 PEG_METHOD_EXIT();
1213 }
1214
1215 PEGASUS_NAMESPACE_END
|