1 karl 1.12 //%2006////////////////////////////////////////////////////////////////////////
|
2 tony 1.1 //
|
3 karl 1.6 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.4 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.6 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.7 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 karl 1.12 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 tony 1.1 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
15 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
18 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
20 //
21 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
22 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
24 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
27 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 //%/////////////////////////////////////////////////////////////////////////////
33
34 tony 1.1 #include "CIMListenerIndicationDispatcher.h"
35
36 #include <Pegasus/Common/Config.h>
37 #include <Pegasus/Common/Constants.h>
38 #include <Pegasus/Common/OperationContext.h>
39 #include <Pegasus/Common/CIMMessage.h>
40 #include <Pegasus/Common/Thread.h>
41 #include <Pegasus/Common/Tracer.h>
42
43 #include <Pegasus/Listener/List.h>
44 #include <Pegasus/Consumer/CIMIndicationConsumer.h>
|
45 kumpf 1.11 #include <Pegasus/Common/ContentLanguageList.h>
|
46 tony 1.1
47 PEGASUS_NAMESPACE_BEGIN
48
49 ///////////////////////////////////////////////////////////////////////////////
50 // CIMListenerIndicationDispatchEvent
51 ///////////////////////////////////////////////////////////////////////////////
52 class CIMListenerIndicationDispatchEvent
53 {
54 public:
|
55 chuck 1.2 CIMListenerIndicationDispatchEvent(CIMIndicationConsumer* consumer,
56 String url,
57 CIMInstance instance,
|
58 kumpf 1.11 ContentLanguageList contentLangs);
|
59 tony 1.1 ~CIMListenerIndicationDispatchEvent();
60
61 CIMIndicationConsumer* getConsumer() const;
62
63 String getURL() const;
64 CIMInstance getIndicationInstance() const;
|
65 kumpf 1.11 ContentLanguageList getContentLanguages() const;
|
66 tony 1.1
67 private:
68 CIMIndicationConsumer* _consumer;
69 String _url;
70 CIMInstance _instance;
|
71 kumpf 1.11 ContentLanguageList _contentLangs;
|
72 tony 1.1 };
73
|
74 chuck 1.2 CIMListenerIndicationDispatchEvent::CIMListenerIndicationDispatchEvent(CIMIndicationConsumer* consumer,
75 String url,
76 CIMInstance instance,
|
77 kumpf 1.11 ContentLanguageList contentLangs)
|
78 chuck 1.2 :_consumer(consumer),_url(url),_instance(instance), _contentLangs(contentLangs)
|
79 tony 1.1 {
80 }
81 CIMListenerIndicationDispatchEvent::~CIMListenerIndicationDispatchEvent()
82 {
83 }
84 CIMIndicationConsumer* CIMListenerIndicationDispatchEvent::getConsumer() const
85 {
86 return _consumer;
87 }
88 String CIMListenerIndicationDispatchEvent::getURL() const
89 {
90 return _url;
91 }
92 CIMInstance CIMListenerIndicationDispatchEvent::getIndicationInstance() const
93 {
94 return _instance;
95 }
|
96 kumpf 1.11 ContentLanguageList CIMListenerIndicationDispatchEvent::getContentLanguages() const
|
97 chuck 1.2 {
98 return _contentLangs;
99 }
|
100 tony 1.1
101 ///////////////////////////////////////////////////////////////////////////////
102 // CIMListenerIndicationDispatcherRep
103 ///////////////////////////////////////////////////////////////////////////////
104 class CIMListenerIndicationDispatcherRep
105 {
106 public:
107 CIMListenerIndicationDispatcherRep();
108 virtual ~CIMListenerIndicationDispatcherRep();
109
110 Boolean addConsumer(CIMIndicationConsumer* consumer);
111 Boolean removeConsumer(CIMIndicationConsumer* consumer);
112
113 CIMExportIndicationResponseMessage* handleIndicationRequest(CIMExportIndicationRequestMessage* request);
114
115
|
116 mike 1.13 static ThreadReturnType PEGASUS_THREAD_CDECL deliver_routine(void *param);
|
117 tony 1.1
118 private:
|
119 kumpf 1.11 void deliverIndication(String url, CIMInstance instance, ContentLanguageList contentLangs);
|
120 tony 1.1
121 ThreadPool* _thread_pool;
122 PtrList* _consumers;
123 };
124
|
125 kumpf 1.9 static struct timeval deallocateWait = {15, 0};
|
126 tony 1.1
127
128 CIMListenerIndicationDispatcherRep::CIMListenerIndicationDispatcherRep()
129 :_thread_pool(new ThreadPool(0, "ListenerIndicationDispatcher", 0, 0,
|
130 kumpf 1.9 deallocateWait))
|
131 tony 1.1 ,_consumers(new PtrList())
132 {
133
134 }
135 CIMListenerIndicationDispatcherRep::~CIMListenerIndicationDispatcherRep()
136 {
|
137 kumpf 1.9 delete _thread_pool;
138 delete _consumers;
|
139 tony 1.1 }
140
141 Boolean CIMListenerIndicationDispatcherRep::addConsumer(CIMIndicationConsumer* consumer)
142 {
143 _consumers->add(consumer);
144 return true;
145 }
146 Boolean CIMListenerIndicationDispatcherRep::removeConsumer(CIMIndicationConsumer* consumer)
147 {
148 _consumers->remove(consumer);
149 return true;
150 }
151 CIMExportIndicationResponseMessage* CIMListenerIndicationDispatcherRep::handleIndicationRequest(CIMExportIndicationRequestMessage* request)
152 {
153 PEG_METHOD_ENTER(TRC_SERVER,
154 "CIMListenerIndicationDispatcherRep::handleIndicationRequest");
155
156 CIMInstance instance = request->indicationInstance;
157 String url = request->destinationPath;
|
158 kumpf 1.11 ContentLanguageList contentLangs =((ContentLanguageListContainer)request->operationContext.
|
159 se.gupta 1.5 get(ContentLanguageListContainer::NAME)).getLanguages();
|
160 tony 1.1
|
161 chuck 1.2 deliverIndication(url,instance,contentLangs);
|
162 tony 1.1
163 // compose a response message
164 CIMException cimException;
165
166 CIMExportIndicationResponseMessage* response = new CIMExportIndicationResponseMessage(
167 request->messageId,
168 cimException,
169 request->queueIds.copyAndPop());
170
171 response->dest = request->queueIds.top();
172
173 PEG_METHOD_EXIT();
174
175 return response;
176 }
177
|
178 chuck 1.2 void CIMListenerIndicationDispatcherRep::deliverIndication(String url,
179 CIMInstance instance,
|
180 kumpf 1.11 ContentLanguageList contentLangs)
|
181 tony 1.1 {
182 // go thru all consumers and broadcast the result; should be run in seperate thread
|
183 venkat.puvvada 1.14.12.1 AutoPtr<Iterator> it(_consumers->iterator());
|
184 tony 1.1 while(it->hasNext()==true)
185 {
186 CIMIndicationConsumer* consumer = static_cast<CIMIndicationConsumer*>(it->next());
|
187 chuck 1.2 CIMListenerIndicationDispatchEvent* event = new CIMListenerIndicationDispatchEvent(
188 consumer,
189 url,
190 instance,
191 contentLangs);
|
192 konrad.r 1.10 ThreadStatus rtn = _thread_pool->allocate_and_awaken(event,deliver_routine);
193
194 if (rtn != PEGASUS_THREAD_OK)
195 {
|
196 yi.zhou 1.14 Logger::put(Logger::STANDARD_LOG, System::CIMLISTENER, Logger::TRACE,
|
197 konrad.r 1.10 "Not enough threads to allocate a worker to deliver the event. ");
198
199 Tracer::trace(TRC_SERVER, Tracer::LEVEL2,
200 "Could not allocate thread to deliver event. Instead using current thread.");
201 delete event;
202 throw Exception(MessageLoaderParms("Listener.CIMListenerIndicationDispatcher.CANNOT_ALLOCATE_THREAD",
203 "Not enough threads to allocate a worker to deliver the event."));
204 }
|
205 tony 1.1 }
206 }
|
207 mike 1.13 ThreadReturnType PEGASUS_THREAD_CDECL CIMListenerIndicationDispatcherRep::deliver_routine(void *param)
|
208 tony 1.1 {
209 CIMListenerIndicationDispatchEvent* event = static_cast<CIMListenerIndicationDispatchEvent*>(param);
210
211 if(event!=NULL)
212 {
213 CIMIndicationConsumer* consumer = event->getConsumer();
214 OperationContext context;
|
215 chuck 1.2 context.insert(ContentLanguageListContainer(event->getContentLanguages()));
|
216 tony 1.1 if(consumer)
217 {
218 consumer->consumeIndication(context,event->getURL(),event->getIndicationInstance());
219 }
220
221 delete event;
222 }
223
224 return (0);
225 }
226
227 ///////////////////////////////////////////////////////////////////////////////
228 // CIMListenerIndicationDispatcher
229 ///////////////////////////////////////////////////////////////////////////////
230 CIMListenerIndicationDispatcher::CIMListenerIndicationDispatcher()
231 :Base(PEGASUS_QUEUENAME_LISTENERINDICATIONDISPACTCHER)
232 ,_rep(new CIMListenerIndicationDispatcherRep())
233 {
234 }
235 CIMListenerIndicationDispatcher::~CIMListenerIndicationDispatcher()
236 {
237 tony 1.1 if(_rep!=NULL)
|
238 tony 1.3 delete static_cast<CIMListenerIndicationDispatcherRep*>(_rep);
|
239 tony 1.1
240 _rep=NULL;
241 }
242
243 void CIMListenerIndicationDispatcher::handleEnqueue()
244 {
245 PEG_METHOD_ENTER(TRC_SERVER, "CIMListenerIndicationDispatcher::handleEnqueue");
246
247 Message *message = dequeue();
248 if(message)
249 handleEnqueue(message);
250
251 PEG_METHOD_EXIT();
252 }
253
254 void CIMListenerIndicationDispatcher::handleEnqueue(Message* message)
255 {
256 PEG_METHOD_ENTER(TRC_SERVER, "CIMListenerIndicationDispatcher::handleEnqueue");
257
258 if(message!=NULL)
259 {
260 tony 1.1 switch (message->getType())
261 {
262 case CIM_EXPORT_INDICATION_REQUEST_MESSAGE:
263 {
264 CIMExportIndicationRequestMessage* request = (CIMExportIndicationRequestMessage*)message;
265
266 CIMExportIndicationResponseMessage* response =
267 static_cast<CIMListenerIndicationDispatcherRep*>(_rep)->handleIndicationRequest(request);
268
269 _enqueueResponse(request, response);
270 }
271 break;
272 default:
273 break;
274 }
275 delete message;
276 }
277
278 PEG_METHOD_EXIT();
279 }
280 Boolean CIMListenerIndicationDispatcher::addConsumer(CIMIndicationConsumer* consumer)
281 tony 1.1 {
282 return static_cast<CIMListenerIndicationDispatcherRep*>(_rep)->addConsumer(consumer);
283 }
284 Boolean CIMListenerIndicationDispatcher::removeConsumer(CIMIndicationConsumer* consumer)
285 {
286 return static_cast<CIMListenerIndicationDispatcherRep*>(_rep)->removeConsumer(consumer);
287 }
288
289 PEGASUS_NAMESPACE_END
|