1 karl 1.12 //%2006////////////////////////////////////////////////////////////////////////
|
2 tony 1.1 //
|
3 karl 1.6 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.4 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.6 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.7 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 karl 1.12 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 tony 1.1 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
15 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
18 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
20 //
21 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
22 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
24 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
27 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 //
33 // Author: Dong Xiang, EMC Corporation (xiang_dong@emc.com)
34 tony 1.1 //
|
35 se.gupta 1.5 // Modified By: Seema Gupta (gseema@in.ibm.com) for PEP135
|
36 tony 1.1 //
37 //%/////////////////////////////////////////////////////////////////////////////
38
39 #include "CIMListenerIndicationDispatcher.h"
40
41 #include <Pegasus/Common/Config.h>
42 #include <Pegasus/Common/Constants.h>
43 #include <Pegasus/Common/OperationContext.h>
44 #include <Pegasus/Common/CIMMessage.h>
45 #include <Pegasus/Common/Thread.h>
46 #include <Pegasus/Common/Tracer.h>
47
48 #include <Pegasus/Listener/List.h>
49 #include <Pegasus/Consumer/CIMIndicationConsumer.h>
|
50 kumpf 1.11 #include <Pegasus/Common/ContentLanguageList.h>
|
51 tony 1.1
52 PEGASUS_NAMESPACE_BEGIN
53
54 ///////////////////////////////////////////////////////////////////////////////
55 // CIMListenerIndicationDispatchEvent
56 ///////////////////////////////////////////////////////////////////////////////
57 class CIMListenerIndicationDispatchEvent
58 {
59 public:
|
60 chuck 1.2 CIMListenerIndicationDispatchEvent(CIMIndicationConsumer* consumer,
61 String url,
62 CIMInstance instance,
|
63 kumpf 1.11 ContentLanguageList contentLangs);
|
64 tony 1.1 ~CIMListenerIndicationDispatchEvent();
65
66 CIMIndicationConsumer* getConsumer() const;
67
68 String getURL() const;
69 CIMInstance getIndicationInstance() const;
|
70 kumpf 1.11 ContentLanguageList getContentLanguages() const;
|
71 tony 1.1
72 private:
73 CIMIndicationConsumer* _consumer;
74 String _url;
75 CIMInstance _instance;
|
76 kumpf 1.11 ContentLanguageList _contentLangs;
|
77 tony 1.1 };
78
|
79 chuck 1.2 CIMListenerIndicationDispatchEvent::CIMListenerIndicationDispatchEvent(CIMIndicationConsumer* consumer,
80 String url,
81 CIMInstance instance,
|
82 kumpf 1.11 ContentLanguageList contentLangs)
|
83 chuck 1.2 :_consumer(consumer),_url(url),_instance(instance), _contentLangs(contentLangs)
|
84 tony 1.1 {
85 }
86 CIMListenerIndicationDispatchEvent::~CIMListenerIndicationDispatchEvent()
87 {
88 }
89 CIMIndicationConsumer* CIMListenerIndicationDispatchEvent::getConsumer() const
90 {
91 return _consumer;
92 }
93 String CIMListenerIndicationDispatchEvent::getURL() const
94 {
95 return _url;
96 }
97 CIMInstance CIMListenerIndicationDispatchEvent::getIndicationInstance() const
98 {
99 return _instance;
100 }
|
101 kumpf 1.11 ContentLanguageList CIMListenerIndicationDispatchEvent::getContentLanguages() const
|
102 chuck 1.2 {
103 return _contentLangs;
104 }
|
105 tony 1.1
106 ///////////////////////////////////////////////////////////////////////////////
107 // CIMListenerIndicationDispatcherRep
108 ///////////////////////////////////////////////////////////////////////////////
109 class CIMListenerIndicationDispatcherRep
110 {
111 public:
112 CIMListenerIndicationDispatcherRep();
113 virtual ~CIMListenerIndicationDispatcherRep();
114
115 Boolean addConsumer(CIMIndicationConsumer* consumer);
116 Boolean removeConsumer(CIMIndicationConsumer* consumer);
117
118 CIMExportIndicationResponseMessage* handleIndicationRequest(CIMExportIndicationRequestMessage* request);
119
120
|
121 mike 1.13 static ThreadReturnType PEGASUS_THREAD_CDECL deliver_routine(void *param);
|
122 tony 1.1
123 private:
|
124 kumpf 1.11 void deliverIndication(String url, CIMInstance instance, ContentLanguageList contentLangs);
|
125 tony 1.1
126 ThreadPool* _thread_pool;
127 PtrList* _consumers;
128 };
129
|
130 kumpf 1.9 static struct timeval deallocateWait = {15, 0};
|
131 tony 1.1
132
133 CIMListenerIndicationDispatcherRep::CIMListenerIndicationDispatcherRep()
134 :_thread_pool(new ThreadPool(0, "ListenerIndicationDispatcher", 0, 0,
|
135 kumpf 1.9 deallocateWait))
|
136 tony 1.1 ,_consumers(new PtrList())
137 {
138
139 }
140 CIMListenerIndicationDispatcherRep::~CIMListenerIndicationDispatcherRep()
141 {
|
142 kumpf 1.9 delete _thread_pool;
143 delete _consumers;
|
144 tony 1.1 }
145
146 Boolean CIMListenerIndicationDispatcherRep::addConsumer(CIMIndicationConsumer* consumer)
147 {
148 _consumers->add(consumer);
149 return true;
150 }
151 Boolean CIMListenerIndicationDispatcherRep::removeConsumer(CIMIndicationConsumer* consumer)
152 {
153 _consumers->remove(consumer);
154 return true;
155 }
156 CIMExportIndicationResponseMessage* CIMListenerIndicationDispatcherRep::handleIndicationRequest(CIMExportIndicationRequestMessage* request)
157 {
158 PEG_METHOD_ENTER(TRC_SERVER,
159 "CIMListenerIndicationDispatcherRep::handleIndicationRequest");
160
161 CIMInstance instance = request->indicationInstance;
162 String url = request->destinationPath;
|
163 kumpf 1.11 ContentLanguageList contentLangs =((ContentLanguageListContainer)request->operationContext.
|
164 se.gupta 1.5 get(ContentLanguageListContainer::NAME)).getLanguages();
|
165 tony 1.1
|
166 chuck 1.2 deliverIndication(url,instance,contentLangs);
|
167 tony 1.1
168 // compose a response message
169 CIMException cimException;
170
171 CIMExportIndicationResponseMessage* response = new CIMExportIndicationResponseMessage(
172 request->messageId,
173 cimException,
174 request->queueIds.copyAndPop());
175
176 response->dest = request->queueIds.top();
177
178 PEG_METHOD_EXIT();
179
180 return response;
181 }
182
|
183 chuck 1.2 void CIMListenerIndicationDispatcherRep::deliverIndication(String url,
184 CIMInstance instance,
|
185 kumpf 1.11 ContentLanguageList contentLangs)
|
186 tony 1.1 {
187 // go thru all consumers and broadcast the result; should be run in seperate thread
188 Iterator* it = _consumers->iterator();
189 while(it->hasNext()==true)
190 {
191 CIMIndicationConsumer* consumer = static_cast<CIMIndicationConsumer*>(it->next());
|
192 chuck 1.2 CIMListenerIndicationDispatchEvent* event = new CIMListenerIndicationDispatchEvent(
193 consumer,
194 url,
195 instance,
196 contentLangs);
|
197 konrad.r 1.10 ThreadStatus rtn = _thread_pool->allocate_and_awaken(event,deliver_routine);
198
199 if (rtn != PEGASUS_THREAD_OK)
200 {
201 Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
202 "Not enough threads to allocate a worker to deliver the event. ");
203
204 Tracer::trace(TRC_SERVER, Tracer::LEVEL2,
205 "Could not allocate thread to deliver event. Instead using current thread.");
206 delete event;
207 throw Exception(MessageLoaderParms("Listener.CIMListenerIndicationDispatcher.CANNOT_ALLOCATE_THREAD",
208 "Not enough threads to allocate a worker to deliver the event."));
209 }
|
210 tony 1.1 }
211 }
|
212 mike 1.13 ThreadReturnType PEGASUS_THREAD_CDECL CIMListenerIndicationDispatcherRep::deliver_routine(void *param)
|
213 tony 1.1 {
214 CIMListenerIndicationDispatchEvent* event = static_cast<CIMListenerIndicationDispatchEvent*>(param);
215
216 if(event!=NULL)
217 {
218 CIMIndicationConsumer* consumer = event->getConsumer();
219 OperationContext context;
|
220 chuck 1.2 context.insert(ContentLanguageListContainer(event->getContentLanguages()));
|
221 tony 1.1 if(consumer)
222 {
223 consumer->consumeIndication(context,event->getURL(),event->getIndicationInstance());
224 }
225
226 delete event;
227 }
228
229 return (0);
230 }
231
232 ///////////////////////////////////////////////////////////////////////////////
233 // CIMListenerIndicationDispatcher
234 ///////////////////////////////////////////////////////////////////////////////
235 CIMListenerIndicationDispatcher::CIMListenerIndicationDispatcher()
236 :Base(PEGASUS_QUEUENAME_LISTENERINDICATIONDISPACTCHER)
237 ,_rep(new CIMListenerIndicationDispatcherRep())
238 {
239 }
240 CIMListenerIndicationDispatcher::~CIMListenerIndicationDispatcher()
241 {
242 tony 1.1 if(_rep!=NULL)
|
243 tony 1.3 delete static_cast<CIMListenerIndicationDispatcherRep*>(_rep);
|
244 tony 1.1
245 _rep=NULL;
246 }
247
248 void CIMListenerIndicationDispatcher::handleEnqueue()
249 {
250 PEG_METHOD_ENTER(TRC_SERVER, "CIMListenerIndicationDispatcher::handleEnqueue");
251
252 Message *message = dequeue();
253 if(message)
254 handleEnqueue(message);
255
256 PEG_METHOD_EXIT();
257 }
258
259 void CIMListenerIndicationDispatcher::handleEnqueue(Message* message)
260 {
261 PEG_METHOD_ENTER(TRC_SERVER, "CIMListenerIndicationDispatcher::handleEnqueue");
262
263 if(message!=NULL)
264 {
265 tony 1.1 switch (message->getType())
266 {
267 case CIM_EXPORT_INDICATION_REQUEST_MESSAGE:
268 {
269 CIMExportIndicationRequestMessage* request = (CIMExportIndicationRequestMessage*)message;
270
271 CIMExportIndicationResponseMessage* response =
272 static_cast<CIMListenerIndicationDispatcherRep*>(_rep)->handleIndicationRequest(request);
273
274 _enqueueResponse(request, response);
275 }
276 break;
277 default:
278 break;
279 }
280 delete message;
281 }
282
283 PEG_METHOD_EXIT();
284 }
285 Boolean CIMListenerIndicationDispatcher::addConsumer(CIMIndicationConsumer* consumer)
286 tony 1.1 {
287 return static_cast<CIMListenerIndicationDispatcherRep*>(_rep)->addConsumer(consumer);
288 }
289 Boolean CIMListenerIndicationDispatcher::removeConsumer(CIMIndicationConsumer* consumer)
290 {
291 return static_cast<CIMListenerIndicationDispatcherRep*>(_rep)->removeConsumer(consumer);
292 }
293
294 PEGASUS_NAMESPACE_END
|