(file) Return to CIMListenerIndicationDispatcher.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Listener

  1 tony  1.1 //%/////////////////////////////////////////////////////////////////////////////
  2           //
  3           // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
  4           // The Open Group, Tivoli Systems
  5           //
  6           // Permission is hereby granted, free of charge, to any person obtaining a copy
  7           // of this software and associated documentation files (the "Software"), to
  8           // deal in the Software without restriction, including without limitation the
  9           // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 10           // sell copies of the Software, and to permit persons to whom the Software is
 11           // furnished to do so, subject to the following conditions:
 12           // 
 13           // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 14           // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 15           // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 16           // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 17           // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 18           // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 19           // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 20           // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 21           //
 22 tony  1.1 //==============================================================================
 23           //
 24           //
 25           // Author: Dong Xiang, EMC Corporation (xiang_dong@emc.com)
 26           //
 27           // Modified By:
 28           //
 29           //%/////////////////////////////////////////////////////////////////////////////
 30           
 31           #include "CIMListenerIndicationDispatcher.h"
 32           
 33           #include <Pegasus/Common/Config.h>
 34           #include <Pegasus/Common/Constants.h>
 35           #include <Pegasus/Common/OperationContext.h>
 36           #include <Pegasus/Common/CIMMessage.h>
 37           #include <Pegasus/Common/Thread.h>
 38           #include <Pegasus/Common/Tracer.h>
 39           
 40           #include <Pegasus/Listener/List.h>
 41           #include <Pegasus/Consumer/CIMIndicationConsumer.h>
 42           
 43 tony  1.1 PEGASUS_NAMESPACE_BEGIN
 44           
 45           ///////////////////////////////////////////////////////////////////////////////
 46           // CIMListenerIndicationDispatchEvent
 47           ///////////////////////////////////////////////////////////////////////////////
 48           class CIMListenerIndicationDispatchEvent
 49           {
 50           public:
 51           	CIMListenerIndicationDispatchEvent(CIMIndicationConsumer* consumer, String url, CIMInstance instance);
 52           	~CIMListenerIndicationDispatchEvent();
 53           
 54           	CIMIndicationConsumer* getConsumer() const;
 55           
 56           	String getURL() const;
 57           	CIMInstance getIndicationInstance() const;
 58           
 59           private:
 60           	CIMIndicationConsumer*	_consumer;
 61           	String									_url;
 62           	CIMInstance							_instance;
 63           };
 64 tony  1.1 
 65           CIMListenerIndicationDispatchEvent::CIMListenerIndicationDispatchEvent(CIMIndicationConsumer* consumer, String url, CIMInstance instance)
 66           :_consumer(consumer),_url(url),_instance(instance)
 67           {
 68           }
 69           CIMListenerIndicationDispatchEvent::~CIMListenerIndicationDispatchEvent()
 70           {
 71           }
 72           CIMIndicationConsumer* CIMListenerIndicationDispatchEvent::getConsumer() const
 73           {
 74           	return _consumer;
 75           }
 76           String CIMListenerIndicationDispatchEvent::getURL() const
 77           {
 78           	return _url;
 79           }
 80           CIMInstance CIMListenerIndicationDispatchEvent::getIndicationInstance() const
 81           {
 82           	return _instance;
 83           }
 84           
 85 tony  1.1 ///////////////////////////////////////////////////////////////////////////////
 86           // CIMListenerIndicationDispatcherRep
 87           ///////////////////////////////////////////////////////////////////////////////
 88           class CIMListenerIndicationDispatcherRep
 89           {
 90           public:
 91           	CIMListenerIndicationDispatcherRep();
 92           	virtual ~CIMListenerIndicationDispatcherRep();
 93           	
 94           	Boolean addConsumer(CIMIndicationConsumer* consumer);
 95           	Boolean removeConsumer(CIMIndicationConsumer* consumer);
 96           
 97           	CIMExportIndicationResponseMessage* handleIndicationRequest(CIMExportIndicationRequestMessage* request);
 98           
 99           	
100           	static PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL deliver_routine(void *param);
101           
102           private:
103           	void	deliverIndication(String url, CIMInstance instance);
104           
105           	ThreadPool* _thread_pool;
106 tony  1.1 	PtrList*		_consumers;
107           };
108           
// Timing values handed to the ThreadPool constructor by
// CIMListenerIndicationDispatcherRep. Each initializer is
// { tv_sec, tv_usec }. How the pool interprets each value is defined by
// ThreadPool, not here.
static timeval create_time = { 0, 1 };     // 0 s, 1 us
static timeval destroy_time = { 15, 0 };   // 15 s
static timeval deadlock_time = { 0, 0 };   // zero
112           
113           
114           CIMListenerIndicationDispatcherRep::CIMListenerIndicationDispatcherRep()
115           :_thread_pool(new ThreadPool(0, "ListenerIndicationDispatcher", 0, 0,
116           	create_time, destroy_time, deadlock_time))
117           ,_consumers(new PtrList())
118           {
119           	      
120           }
121           CIMListenerIndicationDispatcherRep::~CIMListenerIndicationDispatcherRep()
122           {
123           	if(_thread_pool!=NULL)
124           	{
125           		 _thread_pool->kill_dead_threads();
126           		 delete _thread_pool;
127 tony  1.1   }
128           	if(_consumers!=NULL)
129           		delete _consumers;
130           }
131           	
132           Boolean CIMListenerIndicationDispatcherRep::addConsumer(CIMIndicationConsumer* consumer)
133           {
134           	_consumers->add(consumer);
135           	return true;
136           }
137           Boolean CIMListenerIndicationDispatcherRep::removeConsumer(CIMIndicationConsumer* consumer)
138           {
139           	_consumers->remove(consumer);
140           	return true;
141           }
142           CIMExportIndicationResponseMessage* CIMListenerIndicationDispatcherRep::handleIndicationRequest(CIMExportIndicationRequestMessage* request)
143           {
144           	PEG_METHOD_ENTER(TRC_SERVER,
145           		"CIMListenerIndicationDispatcherRep::handleIndicationRequest");
146           
147           	CIMInstance instance = request->indicationInstance;
148 tony  1.1 	String			url = request->destinationPath;
149           
150           	deliverIndication(url,instance);
151           
152           	// compose a response message  
153           	CIMException cimException;
154           
155           	CIMExportIndicationResponseMessage* response = new CIMExportIndicationResponseMessage(
156           		request->messageId,
157           		cimException,
158           		request->queueIds.copyAndPop());
159           
160           	response->dest = request->queueIds.top();
161           
162           	PEG_METHOD_EXIT();
163           
164           	return response;
165           }
166           
167           void CIMListenerIndicationDispatcherRep::deliverIndication(String url, CIMInstance instance)
168           {
169 tony  1.1 	// go thru all consumers and broadcast the result; should be run in seperate thread
170           	Iterator* it = _consumers->iterator();
171           	while(it->hasNext()==true)
172           	{
173           		CIMIndicationConsumer* consumer = static_cast<CIMIndicationConsumer*>(it->next());
174           		CIMListenerIndicationDispatchEvent* event = new CIMListenerIndicationDispatchEvent(consumer,url,instance);
175           		_thread_pool->allocate_and_awaken(event,deliver_routine);
176           	}
177           }
178           PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL CIMListenerIndicationDispatcherRep::deliver_routine(void *param)
179           {
180           	CIMListenerIndicationDispatchEvent* event = static_cast<CIMListenerIndicationDispatchEvent*>(param);
181           
182           	if(event!=NULL)
183           	{
184           		CIMIndicationConsumer* consumer = event->getConsumer();
185           		OperationContext context;
186           		if(consumer)
187           		{
188           			consumer->consumeIndication(context,event->getURL(),event->getIndicationInstance());
189           		}
190 tony  1.1 
191           		delete event;
192           	}
193           		
194           	return (0);
195           }
196           
197           ///////////////////////////////////////////////////////////////////////////////
198           // CIMListenerIndicationDispatcher
199           ///////////////////////////////////////////////////////////////////////////////
200           CIMListenerIndicationDispatcher::CIMListenerIndicationDispatcher()
201           :Base(PEGASUS_QUEUENAME_LISTENERINDICATIONDISPACTCHER)
202           ,_rep(new CIMListenerIndicationDispatcherRep())
203           {
204           }
205           CIMListenerIndicationDispatcher::~CIMListenerIndicationDispatcher()
206           {
207           	if(_rep!=NULL)
208           		delete _rep;
209           
210           	_rep=NULL;
211 tony  1.1 }
212           
213           void CIMListenerIndicationDispatcher::handleEnqueue()
214           {
215           	PEG_METHOD_ENTER(TRC_SERVER, "CIMListenerIndicationDispatcher::handleEnqueue");
216           	
217           	Message *message = dequeue();
218           	if(message)
219           		handleEnqueue(message);
220           	
221           	PEG_METHOD_EXIT();
222           }
223           
224           void CIMListenerIndicationDispatcher::handleEnqueue(Message* message)
225           {
226           	PEG_METHOD_ENTER(TRC_SERVER, "CIMListenerIndicationDispatcher::handleEnqueue");
227           	
228           	if(message!=NULL)
229           	{
230           		switch (message->getType())
231               {
232 tony  1.1 			case CIM_EXPORT_INDICATION_REQUEST_MESSAGE:
233           				{
234           					CIMExportIndicationRequestMessage* request = (CIMExportIndicationRequestMessage*)message;
235           
236           					CIMExportIndicationResponseMessage* response = 
237           						static_cast<CIMListenerIndicationDispatcherRep*>(_rep)->handleIndicationRequest(request);
238           
239           					_enqueueResponse(request, response);
240           				}
241           				break;
242           		default:
243           			break;
244               }	
245               delete message;
246           	}	
247              
248           	PEG_METHOD_EXIT();
249           }
250           Boolean CIMListenerIndicationDispatcher::addConsumer(CIMIndicationConsumer* consumer)
251           {
252           	return static_cast<CIMListenerIndicationDispatcherRep*>(_rep)->addConsumer(consumer);
253 tony  1.1 }
254           Boolean CIMListenerIndicationDispatcher::removeConsumer(CIMIndicationConsumer* consumer)
255           {
256           	return static_cast<CIMListenerIndicationDispatcherRep*>(_rep)->removeConsumer(consumer);
257           }
258           
259           PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2