1 martin 1.54 //%LICENSE////////////////////////////////////////////////////////////////
|
2 martin 1.55 //
|
3 martin 1.54 // Licensed to The Open Group (TOG) under one or more contributor license
4 // agreements. Refer to the OpenPegasusNOTICE.txt file distributed with
5 // this work for additional information regarding copyright ownership.
6 // Each contributor licenses this file to you under the OpenPegasus Open
7 // Source License; you may not use this file except in compliance with the
8 // License.
|
9 martin 1.55 //
|
10 martin 1.54 // Permission is hereby granted, free of charge, to any person obtaining a
11 // copy of this software and associated documentation files (the "Software"),
12 // to deal in the Software without restriction, including without limitation
13 // the rights to use, copy, modify, merge, publish, distribute, sublicense,
14 // and/or sell copies of the Software, and to permit persons to whom the
15 // Software is furnished to do so, subject to the following conditions:
|
16 martin 1.55 //
|
17 martin 1.54 // The above copyright notice and this permission notice shall be included
18 // in all copies or substantial portions of the Software.
|
19 martin 1.55 //
|
20 martin 1.54 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
21 martin 1.55 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
22 martin 1.54 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
23 // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
24 // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
25 // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
26 // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
27 martin 1.55 //
|
28 martin 1.54 //////////////////////////////////////////////////////////////////////////
|
29 mike 1.2 //
30 //%/////////////////////////////////////////////////////////////////////////////
31
32 #include <Pegasus/Common/Config.h>
|
33 kumpf 1.15 #include <Pegasus/Common/PegasusVersion.h>
|
34 kumpf 1.19 #include <Pegasus/Common/Tracer.h>
|
35 kumpf 1.26 #include <Pegasus/Server/CIMOperationRequestDispatcher.h>
|
36 mike 1.2
37 #include "CIMExportRequestDispatcher.h"
38
39 PEGASUS_USING_STD;
40
41 PEGASUS_NAMESPACE_BEGIN
42
|
43 kumpf 1.6 CIMExportRequestDispatcher::CIMExportRequestDispatcher()
|
44 carolann.graves 1.43 : Base(PEGASUS_QUEUENAME_EXPORTREQDISPATCHER)
|
45 kumpf 1.6 {
|
46 kumpf 1.19 PEG_METHOD_ENTER(TRC_EXP_REQUEST_DISP,
47 "CIMExportRequestDispatcher::CIMExportRequestDispatcher");
|
48 mday 1.7
|
49 kumpf 1.19 PEG_METHOD_EXIT();
|
50 mike 1.2 }
51
52 CIMExportRequestDispatcher::~CIMExportRequestDispatcher()
53 {
|
54 kumpf 1.19 PEG_METHOD_ENTER(TRC_EXP_REQUEST_DISP,
55 "CIMExportRequestDispatcher::~CIMExportRequestDispatcher");
|
56 mike 1.2
|
57 kumpf 1.19 PEG_METHOD_EXIT();
|
58 mike 1.2 }
59
|
60 kumpf 1.8 void CIMExportRequestDispatcher::_handle_async_request(AsyncRequest *req)
|
61 mike 1.2 {
|
62 kumpf 1.19 PEG_METHOD_ENTER(TRC_EXP_REQUEST_DISP,
63 "CIMExportRequestDispatcher::_handle_async_request");
64
|
65 kumpf 1.45 PEGASUS_ASSERT(req != 0 && req->op != 0);
|
66 kumpf 1.29
|
67 kumpf 1.47 if (req->getType() == ASYNC_CIMSERVICE_STOP)
|
68 kumpf 1.8 {
69 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
70 }
|
71 kumpf 1.47 else if (req->getType() == ASYNC_ASYNC_LEGACY_OP_START)
|
72 kumpf 1.8 {
|
73 kumpf 1.29 Message *legacy =
74 (static_cast<AsyncLegacyOperationStart *>(req)->get_action());
75 if (legacy->getType() == CIM_EXPORT_INDICATION_REQUEST_MESSAGE)
76 {
77 Message* legacy_response = _handleExportIndicationRequest(
78 (CIMExportIndicationRequestMessage*) legacy);
|
79 marek 1.57 // constructor puts itself into a linked list, DO NOT remove the new
80 new AsyncLegacyOperationResult(req->op, legacy_response);
|
81 kumpf 1.29
|
82 venkat.puvvada 1.53 _complete_op_node(req->op);
|
83 kumpf 1.29 delete legacy;
84 }
85 else
86 {
|
87 marek 1.49 PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
88 kumpf 1.29 "CIMExportRequestDispatcher::_handle_async_request got "
|
89 kumpf 1.47 "unexpected legacy message type '%s'",
90 MessageTypeToString(legacy->getType())));
|
91 kumpf 1.45 _make_response(req, async_results::CIM_NAK);
|
92 kumpf 1.29 delete legacy;
93 }
|
94 kumpf 1.8 }
95 else
|
96 kumpf 1.19 {
|
97 kumpf 1.8 Base::_handle_async_request(req);
|
98 kumpf 1.19 }
99 PEG_METHOD_EXIT();
|
100 kumpf 1.8 }
|
101 mike 1.2
|
102 mday 1.9 void CIMExportRequestDispatcher::handleEnqueue(Message* message)
|
103 kumpf 1.8 {
|
104 kumpf 1.45 PEG_METHOD_ENTER(TRC_EXP_REQUEST_DISP,
105 "CIMExportRequestDispatcher::handleEnqueue");
|
106 mike 1.2
|
107 carolann.graves 1.42 PEGASUS_ASSERT(message != 0);
|
108 chip 1.11
|
109 kumpf 1.8 switch (message->getType())
|
110 mike 1.2 {
|
111 kumpf 1.45 case CIM_EXPORT_INDICATION_REQUEST_MESSAGE:
|
112 kumpf 1.29 {
113 CIMExportIndicationResponseMessage* response =
114 _handleExportIndicationRequest(
115 (CIMExportIndicationRequestMessage*) message);
|
116 j.alex 1.36
|
117 marek 1.44 PEG_TRACE((
|
118 j.alex 1.36 TRC_HTTP,
|
119 marek 1.49 Tracer::LEVEL4,
|
120 kumpf 1.45 "_CIMExportRequestDispatcher::handleEnqueue(message) - "
121 "message->getCloseConnect() returned %d",
|
122 marek 1.44 message->getCloseConnect()));
|
123 j.alex 1.36
124 response->setCloseConnect(message->getCloseConnect());
125
|
126 sahana.prabhakar 1.56 MessageQueue* queue = MessageQueue::lookup(response->dest);
127 PEGASUS_ASSERT(queue != 0);
128
129 queue->enqueue(response);
|
130 kumpf 1.45 break;
|
131 kumpf 1.29 }
|
132 kumpf 1.8
133 default:
|
134 dl.meetei 1.58 PEGASUS_UNREACHABLE(PEGASUS_ASSERT(0);)
|
135 kumpf 1.8 break;
|
136 mike 1.2 }
|
137 kumpf 1.8 delete message;
|
138 mike 1.2
|
139 kumpf 1.19 PEG_METHOD_EXIT();
|
140 mike 1.2 }
|
141 mday 1.10
142 void CIMExportRequestDispatcher::handleEnqueue()
143 {
|
144 kumpf 1.48 PEG_METHOD_ENTER(TRC_EXP_REQUEST_DISP,
145 "CIMExportRequestDispatcher::handleEnqueue");
146
147 // It is important to handle the enqueued message on a separate thread,
148 // because this method is likely to be processing on a central (Monitor)
149 // thread and handling the message will likely include a call to a
150 // provider. The thread is launched here rather than at a lower level
151 // because async messages are handled differently and the
152 // _handleExportIndicationRequest message does not have enough context
153 // to manage the difference.
154
155 ThreadStatus rtn = PEGASUS_THREAD_OK;
156 while ((rtn = _thread_pool->allocate_and_awaken(
157 (void *)this,
158 CIMExportRequestDispatcher::_handleEnqueueOnThread)) !=
159 PEGASUS_THREAD_OK)
160 {
161 if (rtn != PEGASUS_THREAD_INSUFFICIENT_RESOURCES)
162 {
|
163 marek 1.49 PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
164 kumpf 1.48 "Could not allocate thread for %s.",
165 getQueueName()));
166 break;
167 }
|
168 kumpf 1.19
|
169 kumpf 1.48 Threads::yield();
170 }
|
171 chip 1.11
|
172 kumpf 1.48 PEG_METHOD_EXIT();
|
173 mday 1.10 }
174
|
175 kumpf 1.48 // Note: This method should not throw an exception. It is used as a thread
176 // entry point, and any exceptions thrown are ignored.
177 ThreadReturnType PEGASUS_THREAD_CDECL
178 CIMExportRequestDispatcher::_handleEnqueueOnThread(void* arg)
179 {
180 PEG_METHOD_ENTER(TRC_EXP_REQUEST_DISP,
181 "CIMExportRequestDispatcher::_handleEnqueueOnThread");
182
183 PEGASUS_ASSERT(arg != 0);
184
185 CIMExportRequestDispatcher* dispatcher =
186 reinterpret_cast<CIMExportRequestDispatcher*>(arg);
187 PEGASUS_ASSERT(dispatcher != 0);
188
189 Message* message = dispatcher->dequeue();
190 if (message)
191 {
192 dispatcher->handleEnqueue(message);
193 }
194
195 PEG_METHOD_EXIT();
196 kumpf 1.48 return ThreadReturnType(0);
197 }
|
198 mike 1.2
|
199 kumpf 1.29 CIMExportIndicationResponseMessage*
200 CIMExportRequestDispatcher::_handleExportIndicationRequest(
|
201 mike 1.2 CIMExportIndicationRequestMessage* request)
202 {
|
203 kumpf 1.19 PEG_METHOD_ENTER(TRC_EXP_REQUEST_DISP,
|
204 kumpf 1.45 "CIMExportRequestDispatcher::_handleExportIndicationRequest");
|
205 kumpf 1.19
|
206 mike 1.2 OperationContext context;
207
|
208 kumpf 1.12 CIMException cimException;
|
209 mike 1.2
|
210 venkat.puvvada 1.51 Uint32 serviceId =find_service_qid(PEGASUS_QUEUENAME_PROVIDERMANAGER_CPP);
|
211 kumpf 1.26
|
212 marek 1.49 PEG_TRACE ((TRC_INDICATION_RECEIPT, Tracer::LEVEL4,
|
213 w.otsuka 1.46 "%s Indication %s received in export server for destination %s",
214 (const char*)(request->indicationInstance.getClassName().getString().
215 getCString()),
216 (const char*)(request->messageId.getCString()),
217 (const char*)(request->destinationPath.getCString())));
|
218 kumpf 1.45 AsyncOpNode* op = this->get_op();
|
219 kumpf 1.26
220 AsyncLegacyOperationStart * asyncRequest =
221 new AsyncLegacyOperationStart(
|
222 kumpf 1.45 op,
|
223 venkat.puvvada 1.51 serviceId,
|
224 venkat.puvvada 1.50 new CIMExportIndicationRequestMessage(*request));
|
225 kumpf 1.26
|
226 venkat.puvvada 1.51 asyncRequest->dest = serviceId;
|
227 kumpf 1.26
|
228 kumpf 1.29 //SendAsync(op,
229 // serviceIds[0],
230 // CIMExportRequestDispatcher::_forwardRequestCallback,
231 // this,
232 // (void *)request->queueIds.top());
233 AsyncReply *asyncReply = SendWait(asyncRequest);
234
235 CIMExportIndicationResponseMessage* response =
236 reinterpret_cast<CIMExportIndicationResponseMessage *>(
237 (static_cast<AsyncLegacyOperationResult *>(
238 asyncReply))->get_result());
239 response->dest = request->queueIds.top();
240
241 delete asyncReply; // Recipient deletes request
242 this->return_op(op);
243
|
244 kumpf 1.19 PEG_METHOD_EXIT();
|
245 kumpf 1.29 return response;
|
246 mike 1.2 }
247
248 PEGASUS_NAMESPACE_END
|