//%/////////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
// The Open Group, Tivoli Systems
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
// ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
//
//==============================================================================
//
//
// Author: Dong Xiang, EMC Corporation (xiang_dong@emc.com)
//
// Modified By:
//
//%/////////////////////////////////////////////////////////////////////////////
30
31 #include "CIMListenerIndicationDispatcher.h"
32
33 #include <Pegasus/Common/Config.h>
34 #include <Pegasus/Common/Constants.h>
35 #include <Pegasus/Common/OperationContext.h>
36 #include <Pegasus/Common/CIMMessage.h>
37 #include <Pegasus/Common/Thread.h>
38 #include <Pegasus/Common/Tracer.h>
39
40 #include <Pegasus/Listener/List.h>
41 #include <Pegasus/Consumer/CIMIndicationConsumer.h>
42
43 tony 1.1 PEGASUS_NAMESPACE_BEGIN
44
///////////////////////////////////////////////////////////////////////////////
// CIMListenerIndicationDispatchEvent
///////////////////////////////////////////////////////////////////////////////
48 class CIMListenerIndicationDispatchEvent
49 {
50 public:
51 CIMListenerIndicationDispatchEvent(CIMIndicationConsumer* consumer, String url, CIMInstance instance);
52 ~CIMListenerIndicationDispatchEvent();
53
54 CIMIndicationConsumer* getConsumer() const;
55
56 String getURL() const;
57 CIMInstance getIndicationInstance() const;
58
59 private:
60 CIMIndicationConsumer* _consumer;
61 String _url;
62 CIMInstance _instance;
63 };
64 tony 1.1
65 CIMListenerIndicationDispatchEvent::CIMListenerIndicationDispatchEvent(CIMIndicationConsumer* consumer, String url, CIMInstance instance)
66 :_consumer(consumer),_url(url),_instance(instance)
67 {
68 }
69 CIMListenerIndicationDispatchEvent::~CIMListenerIndicationDispatchEvent()
70 {
71 }
72 CIMIndicationConsumer* CIMListenerIndicationDispatchEvent::getConsumer() const
73 {
74 return _consumer;
75 }
76 String CIMListenerIndicationDispatchEvent::getURL() const
77 {
78 return _url;
79 }
80 CIMInstance CIMListenerIndicationDispatchEvent::getIndicationInstance() const
81 {
82 return _instance;
83 }
84
///////////////////////////////////////////////////////////////////////////////
// CIMListenerIndicationDispatcherRep
///////////////////////////////////////////////////////////////////////////////
88 class CIMListenerIndicationDispatcherRep
89 {
90 public:
91 CIMListenerIndicationDispatcherRep();
92 virtual ~CIMListenerIndicationDispatcherRep();
93
94 Boolean addConsumer(CIMIndicationConsumer* consumer);
95 Boolean removeConsumer(CIMIndicationConsumer* consumer);
96
97 CIMExportIndicationResponseMessage* handleIndicationRequest(CIMExportIndicationRequestMessage* request);
98
99
100 static PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL deliver_routine(void *param);
101
102 private:
103 void deliverIndication(String url, CIMInstance instance);
104
105 ThreadPool* _thread_pool;
106 tony 1.1 PtrList* _consumers;
107 };
108
// Timing values passed to the ThreadPool constructor by
// CIMListenerIndicationDispatcherRep. Initializers are given in
// { tv_sec, tv_usec } order -- presumably thread creation wait, idle-thread
// destroy and deadlock-detection intervals; confirm against ThreadPool.
static struct timeval create_time   = { 0, 1 };
static struct timeval destroy_time  = { 15, 0 };
static struct timeval deadlock_time = { 0, 0 };
112
113
114 CIMListenerIndicationDispatcherRep::CIMListenerIndicationDispatcherRep()
115 :_thread_pool(new ThreadPool(0, "ListenerIndicationDispatcher", 0, 0,
116 create_time, destroy_time, deadlock_time))
117 ,_consumers(new PtrList())
118 {
119
120 }
121 CIMListenerIndicationDispatcherRep::~CIMListenerIndicationDispatcherRep()
122 {
123 if(_thread_pool!=NULL)
124 {
125 _thread_pool->kill_dead_threads();
126 delete _thread_pool;
127 tony 1.1 }
128 if(_consumers!=NULL)
129 delete _consumers;
130 }
131
132 Boolean CIMListenerIndicationDispatcherRep::addConsumer(CIMIndicationConsumer* consumer)
133 {
134 _consumers->add(consumer);
135 return true;
136 }
137 Boolean CIMListenerIndicationDispatcherRep::removeConsumer(CIMIndicationConsumer* consumer)
138 {
139 _consumers->remove(consumer);
140 return true;
141 }
142 CIMExportIndicationResponseMessage* CIMListenerIndicationDispatcherRep::handleIndicationRequest(CIMExportIndicationRequestMessage* request)
143 {
144 PEG_METHOD_ENTER(TRC_SERVER,
145 "CIMListenerIndicationDispatcherRep::handleIndicationRequest");
146
147 CIMInstance instance = request->indicationInstance;
148 tony 1.1 String url = request->destinationPath;
149
150 deliverIndication(url,instance);
151
152 // compose a response message
153 CIMException cimException;
154
155 CIMExportIndicationResponseMessage* response = new CIMExportIndicationResponseMessage(
156 request->messageId,
157 cimException,
158 request->queueIds.copyAndPop());
159
160 response->dest = request->queueIds.top();
161
162 PEG_METHOD_EXIT();
163
164 return response;
165 }
166
167 void CIMListenerIndicationDispatcherRep::deliverIndication(String url, CIMInstance instance)
168 {
169 tony 1.1 // go thru all consumers and broadcast the result; should be run in seperate thread
170 Iterator* it = _consumers->iterator();
171 while(it->hasNext()==true)
172 {
173 CIMIndicationConsumer* consumer = static_cast<CIMIndicationConsumer*>(it->next());
174 CIMListenerIndicationDispatchEvent* event = new CIMListenerIndicationDispatchEvent(consumer,url,instance);
175 _thread_pool->allocate_and_awaken(event,deliver_routine);
176 }
177 }
178 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL CIMListenerIndicationDispatcherRep::deliver_routine(void *param)
179 {
180 CIMListenerIndicationDispatchEvent* event = static_cast<CIMListenerIndicationDispatchEvent*>(param);
181
182 if(event!=NULL)
183 {
184 CIMIndicationConsumer* consumer = event->getConsumer();
185 OperationContext context;
186 if(consumer)
187 {
188 consumer->consumeIndication(context,event->getURL(),event->getIndicationInstance());
189 }
190 tony 1.1
191 delete event;
192 }
193
194 return (0);
195 }
196
///////////////////////////////////////////////////////////////////////////////
// CIMListenerIndicationDispatcher
///////////////////////////////////////////////////////////////////////////////
200 CIMListenerIndicationDispatcher::CIMListenerIndicationDispatcher()
201 :Base(PEGASUS_QUEUENAME_LISTENERINDICATIONDISPACTCHER)
202 ,_rep(new CIMListenerIndicationDispatcherRep())
203 {
204 }
205 CIMListenerIndicationDispatcher::~CIMListenerIndicationDispatcher()
206 {
207 if(_rep!=NULL)
208 delete _rep;
209
210 _rep=NULL;
211 tony 1.1 }
212
213 void CIMListenerIndicationDispatcher::handleEnqueue()
214 {
215 PEG_METHOD_ENTER(TRC_SERVER, "CIMListenerIndicationDispatcher::handleEnqueue");
216
217 Message *message = dequeue();
218 if(message)
219 handleEnqueue(message);
220
221 PEG_METHOD_EXIT();
222 }
223
224 void CIMListenerIndicationDispatcher::handleEnqueue(Message* message)
225 {
226 PEG_METHOD_ENTER(TRC_SERVER, "CIMListenerIndicationDispatcher::handleEnqueue");
227
228 if(message!=NULL)
229 {
230 switch (message->getType())
231 {
232 tony 1.1 case CIM_EXPORT_INDICATION_REQUEST_MESSAGE:
233 {
234 CIMExportIndicationRequestMessage* request = (CIMExportIndicationRequestMessage*)message;
235
236 CIMExportIndicationResponseMessage* response =
237 static_cast<CIMListenerIndicationDispatcherRep*>(_rep)->handleIndicationRequest(request);
238
239 _enqueueResponse(request, response);
240 }
241 break;
242 default:
243 break;
244 }
245 delete message;
246 }
247
248 PEG_METHOD_EXIT();
249 }
250 Boolean CIMListenerIndicationDispatcher::addConsumer(CIMIndicationConsumer* consumer)
251 {
252 return static_cast<CIMListenerIndicationDispatcherRep*>(_rep)->addConsumer(consumer);
253 tony 1.1 }
254 Boolean CIMListenerIndicationDispatcher::removeConsumer(CIMIndicationConsumer* consumer)
255 {
256 return static_cast<CIMListenerIndicationDispatcherRep*>(_rep)->removeConsumer(consumer);
257 }
258
259 PEGASUS_NAMESPACE_END
|