1 a.dunfey 1.22.2.1 //%2006////////////////////////////////////////////////////////////////////////
|
2 kumpf 1.1 //
|
3 karl 1.5 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 kumpf 1.1 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.5 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.7 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 a.dunfey 1.22.2.1 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 kumpf 1.1 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
15 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
18 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
|
20 karl 1.5 //
|
21 kumpf 1.1 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
22 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
24 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
27 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 // Author: Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
33 // Jenny Yu, Hewlett-Packard Company (jenny_yu@hp.com)
34 //
|
35 gs.keenan 1.8 // Modified By: Sean Keenan, Hewlett-Packard Company (sean.keenan@hp.com)
|
36 carolann.graves 1.14 // Carol Ann Krug Graves, Hewlett-Packard Company
37 // (carolann_graves@hp.com)
|
38 kumpf 1.1 //
39 //%/////////////////////////////////////////////////////////////////////////////
40
41 #include <Pegasus/Common/Config.h>
42 #include <Pegasus/Common/Constants.h>
43 #include <Pegasus/Common/AutoPtr.h>
44 #include <Pegasus/Common/ArrayInternal.h>
45 #include <Pegasus/Common/CIMMessage.h>
46 #include <Pegasus/Common/CIMMessageSerializer.h>
47 #include <Pegasus/Common/CIMMessageDeserializer.h>
48 #include <Pegasus/Common/OperationContextInternal.h>
49 #include <Pegasus/Common/System.h>
50 #include <Pegasus/Common/AnonymousPipe.h>
51 #include <Pegasus/Common/Tracer.h>
52 #include <Pegasus/Common/Logger.h>
53 #include <Pegasus/Common/Thread.h>
54 #include <Pegasus/Common/MessageQueueService.h>
55 #include <Pegasus/Config/ConfigManager.h>
56
57 #if defined (PEGASUS_OS_TYPE_WINDOWS)
|
58 gs.keenan 1.8 # include <windows.h> // For CreateProcess()
59 #elif defined (PEGASUS_OS_OS400)
60 # include <unistd.cleinc>
61 #elif defined (PEGASUS_OS_VMS)
62 # include <perror.h>
63 # include <climsgdef.h>
64 # include <stdio.h>
65 # include <stdlib.h>
66 # include <string.h>
67 # include <processes.h>
68 # include <unixio.h>
|
69 kumpf 1.1 #else
|
70 gs.keenan 1.8 # include <unistd.h> // For fork(), exec(), and _exit()
71 # include <errno.h>
|
72 kumpf 1.18 # include <sys/types.h>
73 # if defined(PEGASUS_HAS_SIGNALS)
74 # include <sys/wait.h>
75 # endif
|
76 kumpf 1.1 #endif
77
78 #include "OOPProviderManagerRouter.h"
79
80 PEGASUS_USING_STD;
81
82 PEGASUS_NAMESPACE_BEGIN
83
84 /////////////////////////////////////////////////////////////////////////////
85 // OutstandingRequestTable and OutstandingRequestEntry
86 /////////////////////////////////////////////////////////////////////////////
87
88 /**
89 An OutstandingRequestEntry represents a request message sent to a
90 Provider Agent for which no response has been received. The request
91 sender provides the message ID and a location for the response to be
92 returned, and then waits on the semaphore. When a response matching
93 the message ID is received, it is placed into the specified location
94 and the semaphore is signaled.
95 */
96 class OutstandingRequestEntry
97 kumpf 1.1 {
98 public:
99 OutstandingRequestEntry(
100 String messageId_,
|
101 a.dunfey 1.22.2.2 CIMRequestMessage* requestMessage_,
|
102 kumpf 1.1 CIMResponseMessage*& responseMessage_,
103 Semaphore* responseReady_)
104 : messageId(messageId_),
|
105 a.dunfey 1.22.2.2 requestMessage(requestMessage_),
|
106 kumpf 1.1 responseMessage(responseMessage_),
107 responseReady(responseReady_)
108 {
109 }
110
111 String messageId;
|
112 a.dunfey 1.22.2.2 CIMRequestMessage* requestMessage;
|
113 kumpf 1.1 CIMResponseMessage*& responseMessage;
114 Semaphore* responseReady;
115 };
116
117 typedef HashTable<String, OutstandingRequestEntry*, EqualFunc<String>,
118 HashFunc<String> > OutstandingRequestTable;
119
120
121 /////////////////////////////////////////////////////////////////////////////
122 // ProviderAgentContainer
123 /////////////////////////////////////////////////////////////////////////////
124
125 class ProviderAgentContainer
126 {
127 public:
128 ProviderAgentContainer(
129 const String & moduleName,
|
130 kumpf 1.6 const String & userName,
|
131 a.dunfey 1.22.2.2 PEGASUS_INDICATION_CALLBACK_T indicationCallback,
132 PEGASUS_RESPONSE_CHUNK_CALLBACK_T responseChunkCallback,
|
133 carolann.graves 1.14 Boolean subscriptionInitComplete);
|
134 kumpf 1.1
135 ~ProviderAgentContainer();
136
137 Boolean isInitialized();
138
|
139 kumpf 1.6 String getModuleName() const;
140
|
141 kumpf 1.1 CIMResponseMessage* processMessage(CIMRequestMessage* request);
142 void unloadIdleProviders();
143
144 private:
145 //
146 // Private methods
147 //
148
149 /** Unimplemented */
150 ProviderAgentContainer();
151 /** Unimplemented */
152 ProviderAgentContainer(const ProviderAgentContainer& pa);
153 /** Unimplemented */
154 ProviderAgentContainer& operator=(const ProviderAgentContainer& pa);
155
156 /**
157 Start a Provider Agent process and establish a pipe connection with it.
158 Note: The caller must lock the _agentMutex.
159 */
160 void _startAgentProcess();
161
162 kumpf 1.1 /**
163 Send initialization data to the Provider Agent.
164 Note: The caller must lock the _agentMutex.
165 */
166 void _sendInitializationData();
167
168 /**
169 Initialize the ProviderAgentContainer if it is not already
170 initialized. Initialization includes starting the Provider Agent
171 process, establishing a pipe connection with it, and starting a
172 thread to read response messages from the Provider Agent.
173
174 Note: The caller must lock the _agentMutex.
175 */
176 void _initialize();
177
178 /**
179 Uninitialize the ProviderAgentContainer if it is initialized.
180 The connection is closed and outstanding requests are completed
181 with an error result.
182
183 kumpf 1.1 Note: The caller must lock the _agentMutex.
|
184 kumpf 1.22
185 @param cleanShutdown Indicates whether the provider agent process
186 exited cleanly. A value of true indicates that responses have been
187 sent for all requests that have been processed. A value of false
188 indicates that one or more requests may have been partially processed.
189 */
190 void _uninitialize(Boolean cleanShutdown);
191
192 /**
193 Performs the processMessage work, but does not retry on a transient
194 error.
|
195 kumpf 1.1 */
|
196 kumpf 1.22 CIMResponseMessage* _processMessage(CIMRequestMessage* request);
|
197 kumpf 1.1
198 /**
199 Read and process response messages from the Provider Agent until
200 the connection is closed.
201 */
202 void _processResponses();
203 static PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL
204 _responseProcessor(void* arg);
205
206 //
207 // Private data
208 //
209
210 /**
211 The _agentMutex must be locked whenever writing to the Provider
212 Agent connection, accessing the _isInitialized flag, or changing
213 the Provider Agent state.
214 */
215 Mutex _agentMutex;
216
217 /**
218 kumpf 1.1 Name of the provider module served by this Provider Agent.
219 */
220 String _moduleName;
221
222 /**
|
223 kumpf 1.6 The user context in which this Provider Agent operates.
224 */
225 String _userName;
226
227 /**
|
228 kumpf 1.1 Callback function to which all generated indications are sent for
229 processing.
230 */
|
231 a.dunfey 1.22.2.2 PEGASUS_INDICATION_CALLBACK_T _indicationCallback;
232
233 /**
234 Callback function to which response chunks are sent for processing.
235 */
236 PEGASUS_RESPONSE_CHUNK_CALLBACK_T _responseChunkCallback;
|
237 kumpf 1.1
238 /**
239 Indicates whether the Provider Agent is active.
240 */
241 Boolean _isInitialized;
242
243 /**
244 Pipe connection used to read responses from the Provider Agent.
245 */
246 AutoPtr<AnonymousPipe> _pipeFromAgent;
247 /**
248 Pipe connection used to write requests to the Provider Agent.
249 */
250 AutoPtr<AnonymousPipe> _pipeToAgent;
251
|
252 kumpf 1.18 #if defined(PEGASUS_HAS_SIGNALS)
253 /**
254 Process ID of the active Provider Agent.
255 */
256 pid_t _pid;
257 #endif
258
|
259 kumpf 1.1 /**
260 The _outstandingRequestTable holds an entry for each request that has
261 been sent to this Provider Agent for which no response has been
262 received. Entries are added (by the writing thread) when a request
263 is sent, and are removed (by the reading thread) when the response is
264 received (or when it is determined that no response is forthcoming).
265 */
266 OutstandingRequestTable _outstandingRequestTable;
267 /**
268 The _outstandingRequestTableMutex must be locked whenever reading or
269 updating the _outstandingRequestTable.
270 */
271 Mutex _outstandingRequestTableMutex;
|
272 kumpf 1.2
273 /**
274 Holds the last provider module instance sent to the Provider Agent in
275 a ProviderIdContainer. Since the provider module instance rarely
276 changes, an optimization is used to send it only when it differs from
277 the last provider module instance sent.
278 */
279 CIMInstance _providerModuleCache;
|
280 kumpf 1.6
281 /**
282 The number of Provider Agent processes that are currently initialized
283 (active).
284 */
285 static Uint32 _numProviderProcesses;
286
287 /**
288 The _numProviderProcessesMutex must be locked whenever reading or
289 updating the _numProviderProcesses count.
290 */
291 static Mutex _numProviderProcessesMutex;
292
293 /**
294 The maximum number of Provider Agent processes that may be initialized
295 (active) at one time.
296 */
297 static Uint32 _maxProviderProcesses;
|
298 carolann.graves 1.14
299 /**
|
300 kumpf 1.22 A value indicating that a request message has not been processed.
301 A CIMResponseMessage pointer with this value indicates that the
302 corresponding CIMRequestMessage has not been processed. This is
303 used to indicate that a provider agent exited without starting to
304 process the request, and that the request should be retried.
305 */
306 static CIMResponseMessage* _REQUEST_NOT_PROCESSED;
307
308 /**
|
309 carolann.graves 1.14 Indicates whether the Indication Service has completed initialization.
310
311 For more information, please see the description of the
312 ProviderManagerRouter::_subscriptionInitComplete member variable.
313 */
314 Boolean _subscriptionInitComplete;
|
315 kumpf 1.1 };
316
|
317 kumpf 1.6 Uint32 ProviderAgentContainer::_numProviderProcesses = 0;
318 Mutex ProviderAgentContainer::_numProviderProcessesMutex;
319 Uint32 ProviderAgentContainer::_maxProviderProcesses = PEG_NOT_FOUND;
320
|
321 kumpf 1.22 // Set this to a value that no valid CIMResponseMessage* will have.
322 CIMResponseMessage* ProviderAgentContainer::_REQUEST_NOT_PROCESSED =
323 reinterpret_cast<CIMResponseMessage*>(&_REQUEST_NOT_PROCESSED);
324
|
325 kumpf 1.1 ProviderAgentContainer::ProviderAgentContainer(
326 const String & moduleName,
|
327 kumpf 1.6 const String & userName,
|
328 a.dunfey 1.22.2.2 PEGASUS_INDICATION_CALLBACK_T indicationCallback,
329 PEGASUS_RESPONSE_CHUNK_CALLBACK_T responseChunkCallback,
|
330 carolann.graves 1.14 Boolean subscriptionInitComplete)
|
331 kumpf 1.1 : _moduleName(moduleName),
|
332 kumpf 1.6 _userName(userName),
|
333 kumpf 1.1 _indicationCallback(indicationCallback),
|
334 a.dunfey 1.22.2.2 _responseChunkCallback(responseChunkCallback),
|
335 carolann.graves 1.14 _isInitialized(false),
336 _subscriptionInitComplete(subscriptionInitComplete)
|
337 kumpf 1.1 {
338 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
339 "ProviderAgentContainer::ProviderAgentContainer");
340 PEG_METHOD_EXIT();
341 }
342
343 ProviderAgentContainer::~ProviderAgentContainer()
344 {
345 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
346 "ProviderAgentContainer::~ProviderAgentContainer");
347
348 // Ensure the destructor does not throw an exception
349 try
350 {
|
351 kumpf 1.17 if (isInitialized())
352 {
353 // Stop the responseProcessor thread by closing its connection
354 _pipeFromAgent->closeReadHandle();
|
355 kumpf 1.1
|
356 kumpf 1.17 // Wait for the responseProcessor thread to exit
357 while (isInitialized())
358 {
359 pegasus_yield();
360 }
|
361 kumpf 1.1 }
362 }
363 catch (...)
364 {
365 }
366
367 PEG_METHOD_EXIT();
368 }
369
370 void ProviderAgentContainer::_startAgentProcess()
371 {
372 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
373 "ProviderAgentContainer::_startAgentProcess");
374
|
375 kumpf 1.21 //
376 // Serialize the starting of agent processes. If two agent processes are
377 // started at the same time, they may get copies of each other's pipe
378 // descriptors. If this happens, the cimserver will not get a pipe read
379 // error when one of the agent processes exits, because the pipe will
380 // still be writable by the other process. This locking control needs to
381 // cover the period from where the pipes are created to where the agent
382 // ends of the pipes are closed by the cimserver.
383 //
384 static Mutex agentStartupMutex;
385 AutoMutex lock(agentStartupMutex);
386
|
387 kumpf 1.1 AutoPtr<AnonymousPipe> pipeFromAgent(new AnonymousPipe());
388 AutoPtr<AnonymousPipe> pipeToAgent(new AnonymousPipe());
389
390 //
391 // Start a cimprovagt process for this provider module
392 //
393
394 #if defined (PEGASUS_OS_TYPE_WINDOWS)
395 //
396 // Set up members of the PROCESS_INFORMATION structure
397 //
398 PROCESS_INFORMATION piProcInfo;
399 ZeroMemory (&piProcInfo, sizeof (PROCESS_INFORMATION));
400
401 //
402 // Set up members of the STARTUPINFO structure
403 //
404 STARTUPINFO siStartInfo;
405 ZeroMemory (&siStartInfo, sizeof (STARTUPINFO));
406 siStartInfo.cb = sizeof (STARTUPINFO);
407
408 kumpf 1.1 //
409 // Generate the command line
410 //
411 char cmdLine[2048];
412 char readHandle[32];
413 char writeHandle[32];
414 pipeToAgent->exportReadHandle(readHandle);
415 pipeFromAgent->exportWriteHandle(writeHandle);
416
417 sprintf(cmdLine, "\"%s\" %s %s \"%s\"",
418 (const char*)ConfigManager::getHomedPath(
419 PEGASUS_PROVIDER_AGENT_PROC_NAME).getCString(),
420 readHandle, writeHandle, (const char*)_moduleName.getCString());
421
422 //
423 // Create the child process
424 //
425 if (!CreateProcess (
426 NULL, //
427 cmdLine, // command line
428 NULL, // process security attributes
429 kumpf 1.1 NULL, // primary thread security attributes
430 TRUE, // handles are inherited
431 0, // creation flags
432 NULL, // use parent's environment
433 NULL, // use parent's current directory
434 &siStartInfo, // STARTUPINFO
435 &piProcInfo)) // PROCESS_INFORMATION
436 {
437 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
438 "CreateProcess() failed. errno = %d.", GetLastError());
439 PEG_METHOD_EXIT();
440 throw Exception(MessageLoaderParms(
441 "ProviderManager.OOPProviderManagerRouter.CIMPROVAGT_START_FAILED",
442 "Failed to start cimprovagt \"$0\".",
443 _moduleName));
444 }
445
446 CloseHandle(piProcInfo.hProcess);
447 CloseHandle(piProcInfo.hThread);
|
448 gs.keenan 1.8
449 #elif defined (PEGASUS_OS_VMS)
450
|
451 gs.keenan 1.10 //
452 // fork and exec the child process
453 //
454 int status;
|
455 gs.keenan 1.8
|
456 gs.keenan 1.10 status = vfork ();
457 switch (status)
458 {
459 case 0:
460 try
|
461 gs.keenan 1.8 {
|
462 gs.keenan 1.10 //
463 // Execute the cimprovagt program
464 //
465 String agentCommandPath =
466 ConfigManager::getHomedPath(PEGASUS_PROVIDER_AGENT_PROC_NAME);
467 CString agentCommandPathCString = agentCommandPath.getCString();
468
469 char readHandle[32];
470 char writeHandle[32];
471 pipeToAgent->exportReadHandle(readHandle);
472 pipeFromAgent->exportWriteHandle(writeHandle);
473
474 if ((status = execl(agentCommandPathCString, agentCommandPathCString,
475 readHandle, writeHandle,
476 (const char*)_moduleName.getCString(), (char*)0)) == -1);
477 {
478 // If we're still here, there was an error
479 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
480 "execl() failed. errno = %d.", errno);
481 _exit(1);
482 }
483 gs.keenan 1.10 }
484 catch (...)
485 {
486 // There's not much we can do here in no man's land
487 try
488 {
489 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
490 "Caught exception before calling execl().");
491 }
492 catch (...)
493 {
494 }
495 _exit(1);
496 }
497 PEG_METHOD_EXIT();
498 return;
499 break;
500
501 case -1:
502 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
503 "fork() failed. errno = %d.", errno);
504 gs.keenan 1.10 PEG_METHOD_EXIT();
505 throw Exception(MessageLoaderParms(
506 "ProviderManager.OOPProviderManagerRouter.CIMPROVAGT_START_FAILED",
507 "Failed to start cimprovagt \"$0\".",
508 _moduleName));
509 break;
510
511 default:
512 // Close our copies of the agent's ends of the pipes
513 pipeToAgent->closeReadHandle();
514 pipeFromAgent->closeWriteHandle();
|
515 gs.keenan 1.8
|
516 gs.keenan 1.10 _pipeToAgent.reset(pipeToAgent.release());
517 _pipeFromAgent.reset(pipeFromAgent.release());
|
518 gs.keenan 1.8
|
519 gs.keenan 1.10 PEG_METHOD_EXIT();
520 }
|
521 chuck 1.16 #elif defined (PEGASUS_OS_OS400)
522
523 //Out of provider support for OS400 goes here when needed.
524
|
525 kumpf 1.1 #else
526 pid_t pid = fork();
527 if (pid < 0)
528 {
529 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
530 "fork() failed. errno = %d.", errno);
531 PEG_METHOD_EXIT();
532 throw Exception(MessageLoaderParms(
533 "ProviderManager.OOPProviderManagerRouter.CIMPROVAGT_START_FAILED",
534 "Failed to start cimprovagt \"$0\".",
535 _moduleName));
536 }
537 else if (pid == 0)
538 {
539 //
540 // Child side of the fork
541 //
542
543 try
544 {
545 // Close our copies of the parent's ends of the pipes
546 kumpf 1.1 pipeToAgent->closeWriteHandle();
547 pipeFromAgent->closeReadHandle();
548
549 //
550 // Execute the cimprovagt program
551 //
552 String agentCommandPath =
553 ConfigManager::getHomedPath(PEGASUS_PROVIDER_AGENT_PROC_NAME);
554 CString agentCommandPathCString = agentCommandPath.getCString();
555
556 char readHandle[32];
557 char writeHandle[32];
558 pipeToAgent->exportReadHandle(readHandle);
559 pipeFromAgent->exportWriteHandle(writeHandle);
560
|
561 kumpf 1.13 #ifndef PEGASUS_DISABLE_PROV_USERCTXT
|
562 kumpf 1.6 // Set the user context of the Provider Agent process
563 if (_userName != System::getEffectiveUserName())
564 {
565 if (!System::changeUserContext(_userName.getCString()))
566 {
567 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
568 "System::changeUserContext() failed. userName = %s.",
|
569 kumpf 1.9 (const char*)_userName.getCString());
|
570 kumpf 1.6 Logger::put_l(Logger::ERROR_LOG, System::CIMSERVER,
571 Logger::WARNING,
572 "ProviderManager.OOPProviderManagerRouter."
573 "USER_CONTEXT_CHANGE_FAILED",
574 "Unable to change user context to \"$0\".", _userName);
575 _exit(1);
576 }
577 }
|
578 kumpf 1.13 #endif
|
579 kumpf 1.6
|
580 kumpf 1.1 execl(agentCommandPathCString, agentCommandPathCString,
581 readHandle, writeHandle,
582 (const char*)_moduleName.getCString(), (char*)0);
583
584 // If we're still here, there was an error
585 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
586 "execl() failed. errno = %d.", errno);
587 _exit(1);
588 }
589 catch (...)
590 {
591 // There's not much we can do here in no man's land
592 try
593 {
594 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
595 "Caught exception before calling execl().");
596 }
597 catch (...) {}
598 _exit(1);
599 }
600 }
|
601 kumpf 1.18 # if defined(PEGASUS_HAS_SIGNALS)
602 _pid = pid;
603 # endif
|
604 kumpf 1.1 #endif
605
606 //
607 // CIM Server process
608 //
609
610 // Close our copies of the agent's ends of the pipes
611 pipeToAgent->closeReadHandle();
612 pipeFromAgent->closeWriteHandle();
613
614 _pipeToAgent.reset(pipeToAgent.release());
615 _pipeFromAgent.reset(pipeFromAgent.release());
616
|
617 gs.keenan 1.10 PEG_METHOD_EXIT();
|
618 kumpf 1.1 }
619
620 // Note: Caller must lock _agentMutex
621 void ProviderAgentContainer::_sendInitializationData()
622 {
623 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
624 "ProviderAgentContainer::_sendInitializationData");
625
626 //
627 // Gather config properties to pass to the Provider Agent
628 //
629 ConfigManager* configManager = ConfigManager::getInstance();
630 Array<Pair<String, String> > configProperties;
631
632 Array<String> configPropertyNames;
633 configManager->getAllPropertyNames(configPropertyNames, true);
634 for (Uint32 i = 0; i < configPropertyNames.size(); i++)
635 {
636 String configPropertyValue =
637 configManager->getCurrentValue(configPropertyNames[i]);
638 String configPropertyDefaultValue =
639 kumpf 1.1 configManager->getDefaultValue(configPropertyNames[i]);
640 if (configPropertyValue != configPropertyDefaultValue)
641 {
642 configProperties.append(Pair<String, String>(
643 configPropertyNames[i], configPropertyValue));
644 }
645 }
646
647 //
648 // Create a Provider Agent initialization message
649 //
650 AutoPtr<CIMInitializeProviderAgentRequestMessage> request(
651 new CIMInitializeProviderAgentRequestMessage(
652 String("0"), // messageId
653 configManager->getPegasusHome(),
654 configProperties,
655 System::bindVerbose,
|
656 carolann.graves 1.14 _subscriptionInitComplete,
|
657 kumpf 1.1 QueueIdStack()));
658
659 //
660 // Write the initialization message to the pipe
661 //
662 AnonymousPipe::Status writeStatus =
663 _pipeToAgent->writeMessage(request.get());
664
665 if (writeStatus != AnonymousPipe::STATUS_SUCCESS)
666 {
667 PEG_METHOD_EXIT();
668 throw Exception(MessageLoaderParms(
669 "ProviderManager.OOPProviderManagerRouter."
670 "CIMPROVAGT_COMMUNICATION_FAILED",
671 "Failed to communicate with cimprovagt \"$0\".",
672 _moduleName));
673 }
674
|
675 a.dunfey 1.22.2.2 // Wait for a null response from the Provider Agent indicating it has
676 // initialized successfully.
677
678 CIMMessage* message;
679 AnonymousPipe::Status readStatus;
680 do
681 {
682 readStatus = _pipeFromAgent->readMessage(message);
683 } while (readStatus == AnonymousPipe::STATUS_INTERRUPT);
684
685 if (readStatus != AnonymousPipe::STATUS_SUCCESS)
686 {
687 PEG_METHOD_EXIT();
688 throw Exception(MessageLoaderParms(
689 "ProviderManager.OOPProviderManagerRouter."
690 "CIMPROVAGT_COMMUNICATION_FAILED",
691 "Failed to communicate with cimprovagt \"$0\".",
692 _moduleName));
693 }
694
695 PEGASUS_ASSERT(message == 0);
|
696 kumpf 1.1
697 PEG_METHOD_EXIT();
698 }
699
700 // Note: Caller must lock _agentMutex
701 void ProviderAgentContainer::_initialize()
702 {
703 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
704 "ProviderAgentContainer::_initialize");
705
706 if (_isInitialized)
707 {
708 PEGASUS_ASSERT(0);
709 PEG_METHOD_EXIT();
710 return;
711 }
712
|
713 kumpf 1.6 if (_maxProviderProcesses == PEG_NOT_FOUND)
714 {
|
715 a.dunfey 1.22.2.2 String maxProviderProcesses = ConfigManager::getInstance()->
716 getCurrentValue("maxProviderProcesses");
|
717 kumpf 1.6 CString maxProviderProcessesString = maxProviderProcesses.getCString();
718 char* end = 0;
719 _maxProviderProcesses = strtol(maxProviderProcessesString, &end, 10);
720 }
721
722 {
723 AutoMutex lock(_numProviderProcessesMutex);
724 if ((_maxProviderProcesses != 0) &&
725 (_numProviderProcesses >= _maxProviderProcesses))
726 {
727 throw PEGASUS_CIM_EXCEPTION(
728 CIM_ERR_FAILED,
729 MessageLoaderParms(
730 "ProviderManager.OOPProviderManagerRouter."
731 "MAX_PROVIDER_PROCESSES_REACHED",
732 "The maximum number of cimprovagt processes has been "
733 "reached."));
734 }
735 else
736 {
737 _numProviderProcesses++;
738 kumpf 1.6 }
739 }
740
|
741 kumpf 1.1 try
742 {
743 _startAgentProcess();
744
|
745 kumpf 1.18 _isInitialized = true;
746
|
747 kumpf 1.1 _sendInitializationData();
748
749 // Start a thread to read and process responses from the Provider Agent
|
750 kumpf 1.20 ThreadStatus rtn = PEGASUS_THREAD_OK;
751 while ((rtn = MessageQueueService::get_thread_pool()->
752 allocate_and_awaken(this, _responseProcessor)) !=
753 PEGASUS_THREAD_OK)
754 {
755 if (rtn == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)
756 {
757 pegasus_yield();
758 }
759 else
760 {
761 Logger::put(
762 Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
763 "Not enough threads to process responses from the "
764 "provider agent.");
|
765 konrad.r 1.19
|
766 kumpf 1.20 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
767 "Could not allocate thread to process responses from the "
768 "provider agent.");
769
770 throw Exception(MessageLoaderParms(
771 "ProviderManager.OOPProviderManagerRouter."
772 "CIMPROVAGT_THREAD_ALLOCATION_FAILED",
773 "Failed to allocate thread for cimprovagt \"$0\".",
774 _moduleName));
775 }
776 }
|
777 kumpf 1.1 }
778 catch (...)
779 {
|
780 kumpf 1.20 // Closing the connection causes the agent process to exit
781 _pipeToAgent.reset();
782 _pipeFromAgent.reset();
783
|
784 kumpf 1.18 #if defined(PEGASUS_HAS_SIGNALS)
785 if (_isInitialized)
786 {
787 // Harvest the status of the agent process to prevent a zombie
788 Boolean keepWaiting = false;
789 do
790 {
791 pid_t status = waitpid(_pid, 0, 0);
792 if (status == -1)
793 {
794 if (errno == EINTR)
795 {
796 keepWaiting = true;
797 }
798 else
799 {
800 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
801 "ProviderAgentContainer::_initialize(): "
802 "waitpid failed; errno = %d.", errno);
803 }
804 }
805 kumpf 1.18 } while (keepWaiting);
806 }
807 #endif
808
|
809 kumpf 1.1 _isInitialized = false;
|
810 kumpf 1.6
811 {
812 AutoMutex lock(_numProviderProcessesMutex);
813 _numProviderProcesses--;
814 }
815
|
816 kumpf 1.1 PEG_METHOD_EXIT();
817 throw;
818 }
819
820 PEG_METHOD_EXIT();
821 }
822
823 Boolean ProviderAgentContainer::isInitialized()
824 {
825 AutoMutex lock(_agentMutex);
826 return _isInitialized;
827 }
828
829 // Note: Caller must lock _agentMutex
|
830 kumpf 1.22 void ProviderAgentContainer::_uninitialize(Boolean cleanShutdown)
|
831 kumpf 1.1 {
832 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
833 "ProviderAgentContainer::_uninitialize");
834
835 if (!_isInitialized)
836 {
837 PEGASUS_ASSERT(0);
838 PEG_METHOD_EXIT();
839 return;
840 }
841
842 try
843 {
844 // Close the connection with the Provider Agent
845 _pipeFromAgent.reset();
846 _pipeToAgent.reset();
847
|
848 kumpf 1.2 _providerModuleCache = CIMInstance();
849
|
850 kumpf 1.6 {
851 AutoMutex lock(_numProviderProcessesMutex);
852 _numProviderProcesses--;
853 }
854
|
855 kumpf 1.18 #if defined(PEGASUS_HAS_SIGNALS)
856 // Harvest the status of the agent process to prevent a zombie
857 Boolean keepWaiting = false;
858 do
859 {
860 pid_t status = waitpid(_pid, 0, 0);
861 if (status == -1)
862 {
863 if (errno == EINTR)
864 {
865 keepWaiting = true;
866 }
867 else
868 {
869 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
870 "ProviderAgentContainer::_uninitialize(): "
871 "waitpid failed; errno = %d.", errno);
872 }
873 }
874 } while (keepWaiting);
875 #endif
876 kumpf 1.18
|
877 kumpf 1.1 _isInitialized = false;
878
879 //
880 // Complete with null responses all outstanding requests on this
881 // connection
882 //
883 {
884 AutoMutex tableLock(_outstandingRequestTableMutex);
885
|
886 kumpf 1.22 CIMResponseMessage* response =
887 cleanShutdown ? _REQUEST_NOT_PROCESSED : 0;
888
|
889 kumpf 1.1 for (OutstandingRequestTable::Iterator i =
890 _outstandingRequestTable.start();
891 i != 0; i++)
892 {
893 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
894 String("Completing messageId \"") + i.value()->messageId +
895 "\" with a null response.");
|
896 kumpf 1.22 i.value()->responseMessage = response;
|
897 kumpf 1.1 i.value()->responseReady->signal();
898 }
899
900 _outstandingRequestTable.clear();
901 }
902 }
903 catch (...)
904 {
905 // We're uninitializing, so do not propagate the exception
906 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
907 "Ignoring _uninitialize() exception.");
908 }
909
910 PEG_METHOD_EXIT();
911 }
912
|
913 kumpf 1.6 String ProviderAgentContainer::getModuleName() const
914 {
915 return _moduleName;
916 }
917
|
918 kumpf 1.1 CIMResponseMessage* ProviderAgentContainer::processMessage(
919 CIMRequestMessage* request)
920 {
921 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
922 "ProviderAgentContainer::processMessage");
923
924 CIMResponseMessage* response;
|
925 kumpf 1.22
926 do
927 {
928 response = _processMessage(request);
|
929 a.dunfey 1.22.2.2
930 if (response == _REQUEST_NOT_PROCESSED)
931 {
932 // Check for request message types that should not be retried.
933 if ((request->getType() ==
934 CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE) ||
935 (request->getType() ==
936 CIM_NOTIFY_CONFIG_CHANGE_REQUEST_MESSAGE) ||
937 (request->getType() ==
938 CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE) ||
939 (request->getType() ==
940 CIM_DELETE_SUBSCRIPTION_REQUEST_MESSAGE))
941 {
942 response = request->buildResponse();
943 break;
944 }
945 else if (request->getType() == CIM_DISABLE_MODULE_REQUEST_MESSAGE)
946 {
947 CIMDisableModuleResponseMessage* dmResponse =
948 dynamic_cast<CIMDisableModuleResponseMessage*>(response);
949 PEGASUS_ASSERT(dmResponse != 0);
950 a.dunfey 1.22.2.2
951 Array<Uint16> operationalStatus;
952 operationalStatus.append(CIM_MSE_OPSTATUS_VALUE_STOPPED);
953 dmResponse->operationalStatus = operationalStatus;
954 break;
955 }
956 }
|
957 kumpf 1.22 } while (response == _REQUEST_NOT_PROCESSED);
958
959 PEG_METHOD_EXIT();
960 return response;
961 }
962
963 CIMResponseMessage* ProviderAgentContainer::_processMessage(
964 CIMRequestMessage* request)
965 {
966 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
967 "ProviderAgentContainer::_processMessage");
968
969 CIMResponseMessage* response;
|
970 kumpf 1.1 String originalMessageId = request->messageId;
971
|
972 kumpf 1.2 // These three variables are used for the provider module optimization.
973 // See the _providerModuleCache member description for more information.
974 AutoPtr<ProviderIdContainer> origProviderId;
975 Boolean doProviderModuleOptimization = false;
976 Boolean updateProviderModuleCache = false;
977
|
978 kumpf 1.1 try
979 {
980 // The messageId attribute is used to correlate response messages
981 // from the Provider Agent with request messages, so it is imperative
982 // that the ID is unique for each request. The incoming ID cannot be
983 // trusted to be unique, so we substitute a unique one. The memory
984 // address of the request is used as the source of a unique piece of
985 // data. (The message ID is only required to be unique while the
986 // request is outstanding.)
987 char messagePtrString[20];
988 sprintf(messagePtrString, "%p", request);
989 String uniqueMessageId = messagePtrString;
990
991 //
992 // Set up the OutstandingRequestEntry for this request
993 //
994 Semaphore waitSemaphore(0);
995 OutstandingRequestEntry outstandingRequestEntry(
|
996 a.dunfey 1.22.2.2 uniqueMessageId, request, response, &waitSemaphore);
|
997 kumpf 1.1
998 //
999 // Lock the Provider Agent Container while initializing the
1000 // agent and writing the request to the connection
1001 //
1002 {
1003 AutoMutex lock(_agentMutex);
1004
1005 //
1006 // Initialize the Provider Agent, if necessary
1007 //
1008 if (!_isInitialized)
1009 {
1010 _initialize();
1011 }
1012
1013 //
1014 // Add an entry to the OutstandingRequestTable for this request
1015 //
1016 {
1017 AutoMutex tableLock(_outstandingRequestTableMutex);
1018 kumpf 1.1
1019 _outstandingRequestTable.insert(
1020 uniqueMessageId, &outstandingRequestEntry);
1021 }
1022
|
1023 kumpf 1.2 // Get the provider module from the ProviderIdContainer to see if
1024 // we can optimize out the transmission of this instance to the
1025 // Provider Agent. (See the _providerModuleCache description.)
1026 try
1027 {
1028 ProviderIdContainer pidc = request->operationContext.get(
1029 ProviderIdContainer::NAME);
1030 origProviderId.reset(new ProviderIdContainer(
1031 pidc.getModule(), pidc.getProvider(),
1032 pidc.isRemoteNameSpace(), pidc.getRemoteInfo()));
1033 if (_providerModuleCache.isUninitialized() ||
1034 (!pidc.getModule().identical(_providerModuleCache)))
1035 {
1036 // We haven't sent this provider module instance to the
1037 // Provider Agent yet. Update our cache after we send it.
1038 updateProviderModuleCache = true;
1039 }
1040 else
1041 {
1042 // Replace the provider module in the ProviderIdContainer
1043 // with an uninitialized instance. We'll need to put the
1044 kumpf 1.2 // original one back after the message is sent.
1045 request->operationContext.set(ProviderIdContainer(
1046 CIMInstance(), pidc.getProvider(),
1047 pidc.isRemoteNameSpace(), pidc.getRemoteInfo()));
1048 doProviderModuleOptimization = true;
1049 }
1050 }
1051 catch (...)
1052 {
1053 // No ProviderIdContainer to optimize
1054 }
1055
|
1056 kumpf 1.1 //
1057 // Write the message to the pipe
1058 //
1059 try
1060 {
1061 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL3,
1062 String("Sending request to agent with messageId ") +
1063 uniqueMessageId);
1064
1065 request->messageId = uniqueMessageId;
1066 AnonymousPipe::Status writeStatus =
1067 _pipeToAgent->writeMessage(request);
1068 request->messageId = originalMessageId;
1069
|
1070 kumpf 1.2 if (doProviderModuleOptimization)
1071 {
1072 request->operationContext.set(*origProviderId.get());
1073 }
1074
|
1075 kumpf 1.1 if (writeStatus != AnonymousPipe::STATUS_SUCCESS)
1076 {
1077 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
1078 "Failed to write message to pipe. writeStatus = %d.",
1079 writeStatus);
|
1080 a.dunfey 1.22.2.2
1081 request->messageId = originalMessageId;
1082
1083 if (doProviderModuleOptimization)
1084 {
1085 request->operationContext.set(*origProviderId.get());
1086 }
1087
1088 // Remove this OutstandingRequestTable entry
1089 {
1090 AutoMutex tableLock(_outstandingRequestTableMutex);
1091 Boolean removed =
1092 _outstandingRequestTable.remove(uniqueMessageId);
1093 PEGASUS_ASSERT(removed);
1094 }
1095
1096 // A response value of _REQUEST_NOT_PROCESSED indicates
1097 // that the request was not processed by the provider
1098 // agent, so it can be retried safely.
1099 PEG_METHOD_EXIT();
1100 return _REQUEST_NOT_PROCESSED;
|
1101 kumpf 1.1 }
|
1102 kumpf 1.2
1103 if (updateProviderModuleCache)
1104 {
1105 _providerModuleCache = origProviderId->getModule();
1106 }
|
1107 kumpf 1.1 }
1108 catch (...)
1109 {
1110 request->messageId = originalMessageId;
|
1111 kumpf 1.2
1112 if (doProviderModuleOptimization)
1113 {
1114 request->operationContext.set(*origProviderId.get());
1115 }
1116
|
1117 kumpf 1.1 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
1118 "Failed to write message to pipe.");
1119 // Remove the OutstandingRequestTable entry for this request
1120 {
1121 AutoMutex tableLock(_outstandingRequestTableMutex);
1122 Boolean removed =
1123 _outstandingRequestTable.remove(uniqueMessageId);
1124 PEGASUS_ASSERT(removed);
1125 }
|
1126 a.dunfey 1.22.2.2 PEG_METHOD_EXIT();
|
1127 kumpf 1.1 throw;
1128 }
1129 }
1130
1131 //
1132 // Wait for the response
1133 //
1134 try
1135 {
1136 // Must not hold _agentMutex while waiting for the response
1137 waitSemaphore.wait();
1138 }
1139 catch (...)
1140 {
1141 // Remove the OutstandingRequestTable entry for this request
1142 {
1143 AutoMutex tableLock(_outstandingRequestTableMutex);
1144 Boolean removed =
1145 _outstandingRequestTable.remove(uniqueMessageId);
1146 PEGASUS_ASSERT(removed);
1147 }
|
1148 a.dunfey 1.22.2.2 PEG_METHOD_EXIT();
|
1149 kumpf 1.1 throw;
1150 }
1151
|
1152 kumpf 1.22 // A response value of _REQUEST_NOT_PROCESSED indicates that the
1153 // provider agent process was terminating when the request was sent.
1154 // The request was not processed by the provider agent, so it can be
1155 // retried safely.
1156 if (response == _REQUEST_NOT_PROCESSED)
1157 {
|
1158 a.dunfey 1.22.2.2 PEG_METHOD_EXIT();
|
1159 kumpf 1.22 return response;
1160 }
1161
|
1162 kumpf 1.1 // A null response is returned when an agent connection is closed
1163 // while requests remain outstanding.
1164 if (response == 0)
1165 {
1166 response = request->buildResponse();
1167 response->cimException = PEGASUS_CIM_EXCEPTION(
1168 CIM_ERR_FAILED,
1169 MessageLoaderParms(
1170 "ProviderManager.OOPProviderManagerRouter."
1171 "CIMPROVAGT_CONNECTION_LOST",
1172 "Lost connection with cimprovagt \"$0\".",
1173 _moduleName));
1174 }
1175 }
1176 catch (CIMException& e)
1177 {
1178 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
1179 String("Caught exception: ") + e.getMessage());
1180 response = request->buildResponse();
1181 response->cimException = e;
1182 }
1183 kumpf 1.1 catch (Exception& e)
1184 {
1185 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
1186 String("Caught exception: ") + e.getMessage());
1187 response = request->buildResponse();
1188 response->cimException = PEGASUS_CIM_EXCEPTION(
1189 CIM_ERR_FAILED, e.getMessage());
1190 }
1191 catch (...)
1192 {
1193 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
1194 "Caught unknown exception");
1195 response = request->buildResponse();
1196 response->cimException = PEGASUS_CIM_EXCEPTION(
1197 CIM_ERR_FAILED, String::EMPTY);
1198 }
1199
1200 response->messageId = originalMessageId;
1201
1202 PEG_METHOD_EXIT();
1203 return response;
1204 kumpf 1.1 }
1205
1206 void ProviderAgentContainer::unloadIdleProviders()
1207 {
1208 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
|
1209 carolann.graves 1.4 "ProviderAgentContainer::unloadIdleProviders");
|
1210 kumpf 1.1
1211 AutoMutex lock(_agentMutex);
1212 if (_isInitialized)
1213 {
1214 // Send a "wake up" message to the Provider Agent.
1215 // Don't bother checking whether the operation is successful.
1216 Uint32 messageLength = 0;
1217 _pipeToAgent->writeBuffer((const char*)&messageLength, sizeof(Uint32));
1218 }
1219
1220 PEG_METHOD_EXIT();
1221 }
1222
1223 void ProviderAgentContainer::_processResponses()
1224 {
1225 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1226 "ProviderAgentContainer::_processResponses");
1227
1228 //
1229 // Process responses until the pipe is closed
1230 //
1231 kumpf 1.1 while (1)
1232 {
1233 try
1234 {
1235 CIMMessage* message;
1236
1237 //
1238 // Read a response from the Provider Agent
1239 //
1240 AnonymousPipe::Status readStatus =
1241 _pipeFromAgent->readMessage(message);
1242
1243 // Ignore interrupts
1244 if (readStatus == AnonymousPipe::STATUS_INTERRUPT)
1245 {
1246 continue;
1247 }
1248
1249 // Handle an error the same way as a closed connection
1250 if ((readStatus == AnonymousPipe::STATUS_ERROR) ||
1251 (readStatus == AnonymousPipe::STATUS_CLOSED))
1252 kumpf 1.1 {
1253 AutoMutex lock(_agentMutex);
|
1254 kumpf 1.22 _uninitialize(false);
1255 return;
1256 }
1257
1258 // A null message indicates that the provider agent process has
1259 // finished its processing and is ready to exit.
1260 if (message == 0)
1261 {
1262 AutoMutex lock(_agentMutex);
1263 _uninitialize(true);
|
1264 kumpf 1.1 return;
1265 }
1266
1267 if (message->getType() == CIM_PROCESS_INDICATION_REQUEST_MESSAGE)
1268 {
1269 // Forward indications to the indication callback
1270 _indicationCallback(
1271 reinterpret_cast<CIMProcessIndicationRequestMessage*>(
1272 message));
1273 }
|
1274 a.dunfey 1.22.2.2 else if (!message->isComplete())
1275 {
1276 CIMResponseMessage* response;
1277 response = dynamic_cast<CIMResponseMessage*>(message);
1278 PEGASUS_ASSERT(response != 0);
1279
1280 // Get the OutstandingRequestEntry for this response chunk
1281 OutstandingRequestEntry* _outstandingRequestEntry = 0;
1282 {
1283 AutoMutex tableLock(_outstandingRequestTableMutex);
1284 Boolean foundEntry = _outstandingRequestTable.lookup(
1285 response->messageId, _outstandingRequestEntry);
1286 PEGASUS_ASSERT(foundEntry);
1287 }
1288
1289 // Put the original message ID into the response
1290 response->messageId =
1291 _outstandingRequestEntry->requestMessage->messageId;
1292
1293 // Call the response chunk callback to process the chunk
1294 _responseChunkCallback(
1295 a.dunfey 1.22.2.2 _outstandingRequestEntry->requestMessage, response);
1296 }
|
1297 kumpf 1.1 else
1298 {
1299 CIMResponseMessage* response;
1300 response = dynamic_cast<CIMResponseMessage*>(message);
1301 PEGASUS_ASSERT(response != 0);
1302
1303 // Give the response to the waiting OutstandingRequestEntry
1304 OutstandingRequestEntry* _outstandingRequestEntry = 0;
1305 {
1306 AutoMutex tableLock(_outstandingRequestTableMutex);
1307 Boolean foundEntry = _outstandingRequestTable.lookup(
1308 response->messageId, _outstandingRequestEntry);
1309 PEGASUS_ASSERT(foundEntry);
1310
1311 // Remove the completed request from the table
1312 Boolean removed =
1313 _outstandingRequestTable.remove(response->messageId);
1314 PEGASUS_ASSERT(removed);
1315 }
1316
1317 _outstandingRequestEntry->responseMessage = response;
1318 kumpf 1.1 _outstandingRequestEntry->responseReady->signal();
1319 }
1320 }
1321 catch (Exception& e)
1322 {
1323 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
1324 String("Ignoring exception: ") + e.getMessage());
1325 }
1326 catch (...)
1327 {
1328 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
1329 "Ignoring exception");
1330 }
1331 }
1332
1333 }
1334
1335 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL
1336 ProviderAgentContainer::_responseProcessor(void* arg)
1337 {
1338 ProviderAgentContainer* pa =
1339 kumpf 1.1 reinterpret_cast<ProviderAgentContainer*>(arg);
1340
1341 pa->_processResponses();
1342
1343 return(PEGASUS_THREAD_RETURN(0));
1344 }
1345
1346 /////////////////////////////////////////////////////////////////////////////
1347 // OOPProviderManagerRouter
1348 /////////////////////////////////////////////////////////////////////////////
1349
1350 OOPProviderManagerRouter::OOPProviderManagerRouter(
|
1351 a.dunfey 1.22.2.2 PEGASUS_INDICATION_CALLBACK_T indicationCallback,
1352 PEGASUS_RESPONSE_CHUNK_CALLBACK_T responseChunkCallback)
|
1353 kumpf 1.1 {
1354 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1355 "OOPProviderManagerRouter::OOPProviderManagerRouter");
1356
1357 _indicationCallback = indicationCallback;
|
1358 a.dunfey 1.22.2.2 _responseChunkCallback = responseChunkCallback;
|
1359 carolann.graves 1.14 _subscriptionInitComplete = false;
|
1360 kumpf 1.1
1361 PEG_METHOD_EXIT();
1362 }
1363
1364 OOPProviderManagerRouter::~OOPProviderManagerRouter()
1365 {
1366 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1367 "OOPProviderManagerRouter::~OOPProviderManagerRouter");
1368
1369 try
1370 {
1371 // Clean up the ProviderAgentContainers
1372 AutoMutex lock(_providerAgentTableMutex);
1373 ProviderAgentTable::Iterator i = _providerAgentTable.start();
1374 for(; i != 0; i++)
1375 {
1376 delete i.value();
1377 }
1378 }
1379 catch (...) {}
1380
1381 kumpf 1.1 PEG_METHOD_EXIT();
1382 }
1383
1384 Message* OOPProviderManagerRouter::processMessage(Message* message)
1385 {
1386 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1387 "OOPProviderManagerRouter::processMessage");
1388
1389 CIMRequestMessage* request = dynamic_cast<CIMRequestMessage *>(message);
1390 PEGASUS_ASSERT(request != 0);
1391
1392 AutoPtr<CIMResponseMessage> response;
1393
1394 //
1395 // Get the provider information from the request
1396 //
1397 CIMInstance providerModule;
1398
1399 if ((dynamic_cast<CIMOperationRequestMessage*>(request) != 0) ||
1400 (dynamic_cast<CIMIndicationRequestMessage*>(request) != 0) ||
1401 (request->getType() == CIM_EXPORT_INDICATION_REQUEST_MESSAGE))
1402 kumpf 1.1 {
1403 // Provider information is in the OperationContext
1404 ProviderIdContainer pidc = (ProviderIdContainer)
1405 request->operationContext.get(ProviderIdContainer::NAME);
1406 providerModule = pidc.getModule();
1407 }
1408 else if (request->getType() == CIM_ENABLE_MODULE_REQUEST_MESSAGE)
1409 {
1410 CIMEnableModuleRequestMessage* emReq =
1411 dynamic_cast<CIMEnableModuleRequestMessage*>(request);
1412 providerModule = emReq->providerModule;
1413 }
1414 else if (request->getType() == CIM_DISABLE_MODULE_REQUEST_MESSAGE)
1415 {
1416 CIMDisableModuleRequestMessage* dmReq =
1417 dynamic_cast<CIMDisableModuleRequestMessage*>(request);
1418 providerModule = dmReq->providerModule;
1419 }
1420 else if ((request->getType() == CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE) ||
|
1421 carolann.graves 1.14 (request->getType() ==
1422 CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE) ||
|
1423 kumpf 1.1 (request->getType() == CIM_NOTIFY_CONFIG_CHANGE_REQUEST_MESSAGE))
1424 {
1425 // This operation is not provider-specific
1426 }
1427 else
1428 {
1429 // Unrecognized message type. This should never happen.
1430 PEGASUS_ASSERT(0);
1431 response.reset(request->buildResponse());
1432 response->cimException = PEGASUS_CIM_EXCEPTION(
1433 CIM_ERR_FAILED, "Unrecognized message type.");
1434 PEG_METHOD_EXIT();
1435 return response.release();
1436 }
1437
1438 //
1439 // Process the request message
1440 //
1441 if (request->getType() == CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE)
1442 {
1443 // Forward the CIMStopAllProvidersRequest to all providers
1444 kumpf 1.1 response.reset(_forwardRequestToAllAgents(request));
1445
1446 // Note: Do not uninitialize the ProviderAgentContainers here.
1447 // Just let the selecting thread notice when the agent connections
1448 // are closed.
1449 }
|
1450 carolann.graves 1.14 else if (request->getType () ==
1451 CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE)
1452 {
1453 _subscriptionInitComplete = true;
1454
1455 //
1456 // Forward the CIMSubscriptionInitCompleteRequestMessage to
1457 // all providers
1458 //
1459 response.reset (_forwardRequestToAllAgents (request));
1460 }
|
1461 kumpf 1.1 else if (request->getType() == CIM_NOTIFY_CONFIG_CHANGE_REQUEST_MESSAGE)
1462 {
1463 CIMNotifyConfigChangeRequestMessage* notifyRequest =
1464 dynamic_cast<CIMNotifyConfigChangeRequestMessage*>(request);
1465 PEGASUS_ASSERT(notifyRequest != 0);
1466
1467 if (notifyRequest->currentValueModified)
1468 {
1469 // Forward the CIMNotifyConfigChangeRequestMessage to all providers
1470 response.reset(_forwardRequestToAllAgents(request));
1471 }
1472 else
1473 {
1474 // No need to notify provider agents about changes to planned value
1475 response.reset(request->buildResponse());
1476 }
1477 }
|
1478 kumpf 1.6 else if (request->getType() == CIM_DISABLE_MODULE_REQUEST_MESSAGE)
|
1479 kumpf 1.1 {
|
1480 kumpf 1.6 // Fan out the request to all Provider Agent processes for this module
1481
|
1482 kumpf 1.1 // Retrieve the provider module name
1483 String moduleName;
1484 CIMValue nameValue = providerModule.getProperty(
1485 providerModule.findProperty("Name")).getValue();
1486 nameValue.get(moduleName);
1487
|
1488 kumpf 1.6 // Look up the Provider Agents for this module
1489 Array<ProviderAgentContainer*> paArray =
1490 _lookupProviderAgents(moduleName);
|
1491 kumpf 1.1
|
1492 kumpf 1.6 for (Uint32 i=0; i<paArray.size(); i++)
|
1493 kumpf 1.1 {
1494 //
1495 // Do not start up an agent process just to disable the module
1496 //
|
1497 kumpf 1.6 if (paArray[i]->isInitialized())
1498 {
1499 //
1500 // Forward the request to the provider agent
1501 //
1502 response.reset(paArray[i]->processMessage(request));
1503
1504 // Note: Do not uninitialize the ProviderAgentContainer here
1505 // when a disable module operation is successful. Just let the
1506 // selecting thread notice when the agent connection is closed.
1507
1508 // Determine the success of the disable module operation
1509 CIMDisableModuleResponseMessage* dmResponse =
1510 dynamic_cast<CIMDisableModuleResponseMessage*>(
1511 response.get());
1512 PEGASUS_ASSERT(dmResponse != 0);
1513
1514 Boolean isStopped = false;
1515 for (Uint32 i=0; i < dmResponse->operationalStatus.size(); i++)
1516 {
1517 if (dmResponse->operationalStatus[i] ==
1518 kumpf 1.6 CIM_MSE_OPSTATUS_VALUE_STOPPED)
1519 {
1520 isStopped = true;
1521 break;
1522 }
1523 }
1524
1525 // If the operation is unsuccessful, stop and return the error
1526 if ((dmResponse->cimException.getCode() != CIM_ERR_SUCCESS) ||
1527 !isStopped)
1528 {
1529 break;
1530 }
1531 }
1532 }
1533
1534 // Use a default response if no Provider Agents were called
1535 if (!response.get())
1536 {
|
1537 kumpf 1.1 response.reset(request->buildResponse());
1538
1539 CIMDisableModuleResponseMessage* dmResponse =
1540 dynamic_cast<CIMDisableModuleResponseMessage*>(response.get());
1541 PEGASUS_ASSERT(dmResponse != 0);
1542
1543 Array<Uint16> operationalStatus;
1544 operationalStatus.append(CIM_MSE_OPSTATUS_VALUE_STOPPED);
1545 dmResponse->operationalStatus = operationalStatus;
1546 }
|
1547 kumpf 1.6 }
1548 else if (request->getType() == CIM_ENABLE_MODULE_REQUEST_MESSAGE)
1549 {
1550 // Fan out the request to all Provider Agent processes for this module
1551
1552 // Retrieve the provider module name
1553 String moduleName;
1554 CIMValue nameValue = providerModule.getProperty(
1555 providerModule.findProperty("Name")).getValue();
1556 nameValue.get(moduleName);
1557
1558 // Look up the Provider Agents for this module
1559 Array<ProviderAgentContainer*> paArray =
1560 _lookupProviderAgents(moduleName);
1561
1562 for (Uint32 i=0; i<paArray.size(); i++)
|
1563 kumpf 1.1 {
1564 //
1565 // Do not start up an agent process just to enable the module
1566 //
|
1567 kumpf 1.6 if (paArray[i]->isInitialized())
1568 {
1569 //
1570 // Forward the request to the provider agent
1571 //
1572 response.reset(paArray[i]->processMessage(request));
1573
1574 // Determine the success of the enable module operation
1575 CIMEnableModuleResponseMessage* emResponse =
1576 dynamic_cast<CIMEnableModuleResponseMessage*>(
1577 response.get());
1578 PEGASUS_ASSERT(emResponse != 0);
1579
1580 Boolean isOk = false;
1581 for (Uint32 i=0; i < emResponse->operationalStatus.size(); i++)
1582 {
1583 if (emResponse->operationalStatus[i] ==
1584 CIM_MSE_OPSTATUS_VALUE_OK)
1585 {
1586 isOk = true;
1587 break;
1588 kumpf 1.6 }
1589 }
1590
1591 // If the operation is unsuccessful, stop and return the error
1592 if ((emResponse->cimException.getCode() != CIM_ERR_SUCCESS) ||
1593 !isOk)
1594 {
1595 break;
1596 }
1597 }
1598 }
1599
1600 // Use a default response if no Provider Agents were called
1601 if (!response.get())
1602 {
|
1603 kumpf 1.1 response.reset(request->buildResponse());
1604
1605 CIMEnableModuleResponseMessage* emResponse =
1606 dynamic_cast<CIMEnableModuleResponseMessage*>(response.get());
1607 PEGASUS_ASSERT(emResponse != 0);
1608
1609 Array<Uint16> operationalStatus;
1610 operationalStatus.append(CIM_MSE_OPSTATUS_VALUE_OK);
1611 emResponse->operationalStatus = operationalStatus;
1612 }
|
1613 kumpf 1.6 }
1614 else
1615 {
1616 // Retrieve the provider module name
1617 String moduleName;
1618 CIMValue nameValue = providerModule.getProperty(
1619 providerModule.findProperty("Name")).getValue();
1620 nameValue.get(moduleName);
1621
1622 // Retrieve the provider user context configuration
1623 Uint16 userContext = 0;
1624 Uint32 pos = providerModule.findProperty(
1625 PEGASUS_PROPERTYNAME_MODULE_USERCONTEXT);
1626 if (pos != PEG_NOT_FOUND)
1627 {
|
1628 kumpf 1.12 CIMValue userContextValue =
1629 providerModule.getProperty(pos).getValue();
1630 if (!userContextValue.isNull())
1631 {
1632 userContextValue.get(userContext);
1633 }
|
1634 kumpf 1.6 }
1635
1636 if (userContext == 0)
1637 {
1638 userContext = PG_PROVMODULE_USERCTXT_PRIVILEGED;
1639 }
1640
1641 String userName;
1642
1643 if (userContext == PG_PROVMODULE_USERCTXT_REQUESTOR)
|
1644 kumpf 1.1 {
|
1645 kumpf 1.6 try
1646 {
1647 // User Name is in the OperationContext
1648 IdentityContainer ic = (IdentityContainer)
1649 request->operationContext.get(IdentityContainer::NAME);
1650 userName = ic.getUserName();
1651 }
1652 catch (Exception& e)
1653 {
1654 // If no IdentityContainer is present, default to the CIM
1655 // Server's user context
1656 }
|
1657 kumpf 1.1
|
1658 kumpf 1.6 // If authentication is disabled, use the CIM Server's user context
1659 if (!userName.size())
1660 {
1661 userName = System::getEffectiveUserName();
1662 }
1663 }
1664 else if (userContext == PG_PROVMODULE_USERCTXT_DESIGNATED)
1665 {
1666 // Retrieve the provider module name
1667 providerModule.getProperty(providerModule.findProperty(
1668 PEGASUS_PROPERTYNAME_MODULE_DESIGNATEDUSER)).getValue().
1669 get(userName);
1670 }
1671 else if (userContext == PG_PROVMODULE_USERCTXT_CIMSERVER)
1672 {
1673 userName = System::getEffectiveUserName();
1674 }
1675 else // Privileged User
1676 {
1677 PEGASUS_ASSERT(userContext == PG_PROVMODULE_USERCTXT_PRIVILEGED);
1678 userName = System::getPrivilegedUserName();
|
1679 kumpf 1.1 }
|
1680 kumpf 1.6
1681 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4,
1682 "Module name = " + moduleName);
1683 Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL4,
1684 "User context = %hd.", userContext);
1685 PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL4,
1686 "User name = " + userName);
1687
1688 // Look up the Provider Agent for this module and user
1689 ProviderAgentContainer* pa = _lookupProviderAgent(moduleName, userName);
1690 PEGASUS_ASSERT(pa != 0);
1691
1692 //
1693 // Forward the request to the provider agent
1694 //
1695 response.reset(pa->processMessage(request));
|
1696 kumpf 1.1 }
1697
1698 response->syncAttributes(request);
1699
1700 PEG_METHOD_EXIT();
1701 return response.release();
1702 }
1703
1704 ProviderAgentContainer* OOPProviderManagerRouter::_lookupProviderAgent(
|
1705 kumpf 1.6 const String& moduleName,
1706 const String& userName)
|
1707 kumpf 1.1 {
1708 ProviderAgentContainer* pa = 0;
|
1709 kumpf 1.6 String key = moduleName + ":" + userName;
|
1710 kumpf 1.1
1711 AutoMutex lock(_providerAgentTableMutex);
|
1712 kumpf 1.6 if (!_providerAgentTable.lookup(key, pa))
|
1713 kumpf 1.1 {
|
1714 kumpf 1.6 pa = new ProviderAgentContainer(
|
1715 a.dunfey 1.22.2.2 moduleName, userName, _indicationCallback, _responseChunkCallback,
|
1716 carolann.graves 1.14 _subscriptionInitComplete);
|
1717 kumpf 1.6 _providerAgentTable.insert(key, pa);
|
1718 kumpf 1.1 }
1719 return pa;
1720 }
1721
|
1722 kumpf 1.6 Array<ProviderAgentContainer*> OOPProviderManagerRouter::_lookupProviderAgents(
1723 const String& moduleName)
1724 {
1725 Array<ProviderAgentContainer*> paArray;
1726
1727 AutoMutex lock(_providerAgentTableMutex);
1728 for (ProviderAgentTable::Iterator i = _providerAgentTable.start(); i; i++)
1729 {
1730 if (i.value()->getModuleName() == moduleName)
1731 {
1732 paArray.append(i.value());
1733 }
1734 }
1735 return paArray;
1736 }
1737
|
1738 kumpf 1.1 CIMResponseMessage* OOPProviderManagerRouter::_forwardRequestToAllAgents(
1739 CIMRequestMessage* request)
1740 {
1741 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1742 "OOPProviderManagerRouter::_forwardRequestToAllAgents");
1743
1744 // Get a list of the ProviderAgentContainers. We need our own array copy
1745 // because we cannot hold the _providerAgentTableMutex while calling
1746 // _ProviderAgentContainer::processMessage().
1747 Array<ProviderAgentContainer*> paContainerArray;
1748 {
1749 AutoMutex tableLock(_providerAgentTableMutex);
1750 for (ProviderAgentTable::Iterator i = _providerAgentTable.start();
1751 i != 0; i++)
1752 {
1753 paContainerArray.append(i.value());
1754 }
1755 }
1756
1757 CIMException responseException;
1758
1759 kumpf 1.1 // Forward the request to each of the initialized provider agents
1760 for (Uint32 j = 0; j < paContainerArray.size(); j++)
1761 {
1762 ProviderAgentContainer* pa = paContainerArray[j];
1763 if (pa->isInitialized())
1764 {
1765 // Note: The ProviderAgentContainer could become uninitialized
1766 // before _ProviderAgentContainer::processMessage() processes
1767 // this request. In this case, the Provider Agent process will
1768 // (unfortunately) be started to process this message.
1769 AutoPtr<CIMResponseMessage> response;
1770 response.reset(pa->processMessage(request));
1771 if (response.get() != 0)
1772 {
1773 // If the operation failed, save the exception data
1774 if ((response->cimException.getCode() != CIM_ERR_SUCCESS) &&
1775 (responseException.getCode() == CIM_ERR_SUCCESS))
1776 {
1777 responseException = response->cimException;
1778 }
1779 }
1780 kumpf 1.1 }
1781 }
1782
1783 CIMResponseMessage* response = request->buildResponse();
1784 response->cimException = responseException;
1785
1786 PEG_METHOD_EXIT();
1787 return response;
1788 }
1789
1790 Boolean OOPProviderManagerRouter::hasActiveProviders()
1791 {
1792 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1793 "OOPProviderManagerRouter::hasActiveProviders");
1794
1795 // Iterate through the _providerAgentTable looking for initialized agents
1796 AutoMutex lock(_providerAgentTableMutex);
1797 ProviderAgentTable::Iterator i = _providerAgentTable.start();
1798 for(; i != 0; i++)
1799 {
1800 if (i.value()->isInitialized())
1801 kumpf 1.1 {
1802 PEG_METHOD_EXIT();
1803 return true;
1804 }
1805 }
1806
1807 // No initialized Provider Agents were found
1808 PEG_METHOD_EXIT();
1809 return false;
1810 }
1811
1812 void OOPProviderManagerRouter::unloadIdleProviders()
1813 {
1814 PEG_METHOD_ENTER(TRC_PROVIDERMANAGER,
1815 "OOPProviderManagerRouter::unloadIdleProviders");
1816
1817 // Iterate through the _providerAgentTable unloading idle providers
1818 AutoMutex lock(_providerAgentTableMutex);
1819 ProviderAgentTable::Iterator i = _providerAgentTable.start();
1820 for(; i != 0; i++)
1821 {
1822 kumpf 1.1 i.value()->unloadIdleProviders();
1823 }
1824
1825 PEG_METHOD_EXIT();
1826 }
1827
1828 PEGASUS_NAMESPACE_END
|