(file) Return to CIMListenerIndicationDispatcher.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Listener

  1 karl  1.12 //%2006////////////////////////////////////////////////////////////////////////
  2 tony  1.1  //
  3 karl  1.6  // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4            // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5            // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6 karl  1.4  // IBM Corp.; EMC Corporation, The Open Group.
  7 karl  1.6  // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8            // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9 karl  1.7  // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10            // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11 karl  1.12 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 12            // EMC Corporation; Symantec Corporation; The Open Group.
 13 tony  1.1  //
 14            // Permission is hereby granted, free of charge, to any person obtaining a copy
 15            // of this software and associated documentation files (the "Software"), to
 16            // deal in the Software without restriction, including without limitation the
 17            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 18            // sell copies of the Software, and to permit persons to whom the Software is
 19            // furnished to do so, subject to the following conditions:
 20            // 
 21            // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 22            // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 23            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 24            // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 25            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 26            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 27            // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 28            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 29            //
 30            //==============================================================================
 31            //
 32            //%/////////////////////////////////////////////////////////////////////////////
 33            
 34 tony  1.1  #include "CIMListenerIndicationDispatcher.h"
 35            
 36            #include <Pegasus/Common/Config.h>
 37            #include <Pegasus/Common/Constants.h>
 38            #include <Pegasus/Common/OperationContext.h>
 39            #include <Pegasus/Common/CIMMessage.h>
 40            #include <Pegasus/Common/Thread.h>
 41            #include <Pegasus/Common/Tracer.h>
 42            
 43            #include <Pegasus/Listener/List.h>
 44            #include <Pegasus/Consumer/CIMIndicationConsumer.h>
 45 kumpf 1.11 #include <Pegasus/Common/ContentLanguageList.h>
 46 tony  1.1  
 47            PEGASUS_NAMESPACE_BEGIN
 48            
 49            ///////////////////////////////////////////////////////////////////////////////
 50            // CIMListenerIndicationDispatchEvent
 51            ///////////////////////////////////////////////////////////////////////////////
 52            class CIMListenerIndicationDispatchEvent
 53            {
 54            public:
 55 chuck 1.2  	CIMListenerIndicationDispatchEvent(CIMIndicationConsumer* consumer,
 56                                                       String url,
 57                                                       CIMInstance instance,
 58 kumpf 1.11                                            ContentLanguageList contentLangs);
 59 tony  1.1  	~CIMListenerIndicationDispatchEvent();
 60            
 61            	CIMIndicationConsumer* getConsumer() const;
 62            
 63            	String getURL() const;
 64            	CIMInstance getIndicationInstance() const;
 65 kumpf 1.11         ContentLanguageList getContentLanguages() const; 
 66 tony  1.1  
 67            private:
 68            	CIMIndicationConsumer*	_consumer;
 69            	String									_url;
 70            	CIMInstance							_instance;
 71 kumpf 1.11         ContentLanguageList                    _contentLangs;
 72 tony  1.1  };
 73            
 74 chuck 1.2  CIMListenerIndicationDispatchEvent::CIMListenerIndicationDispatchEvent(CIMIndicationConsumer* consumer,
 75                                                                                   String url,
 76                                                                                   CIMInstance instance,
 77 kumpf 1.11                                                                        ContentLanguageList contentLangs)
 78 chuck 1.2  :_consumer(consumer),_url(url),_instance(instance), _contentLangs(contentLangs)
 79 tony  1.1  {
 80            }
 81            CIMListenerIndicationDispatchEvent::~CIMListenerIndicationDispatchEvent()
 82            {
 83            }
 84            CIMIndicationConsumer* CIMListenerIndicationDispatchEvent::getConsumer() const
 85            {
 86            	return _consumer;
 87            }
 88            String CIMListenerIndicationDispatchEvent::getURL() const
 89            {
 90            	return _url;
 91            }
 92            CIMInstance CIMListenerIndicationDispatchEvent::getIndicationInstance() const
 93            {
 94            	return _instance;
 95            }
 96 kumpf 1.11 ContentLanguageList CIMListenerIndicationDispatchEvent::getContentLanguages() const
 97 chuck 1.2  {
 98            	return _contentLangs;
 99            }
100 tony  1.1  
101            ///////////////////////////////////////////////////////////////////////////////
102            // CIMListenerIndicationDispatcherRep
103            ///////////////////////////////////////////////////////////////////////////////
104            class CIMListenerIndicationDispatcherRep
105            {
106            public:
107            	CIMListenerIndicationDispatcherRep();
108            	virtual ~CIMListenerIndicationDispatcherRep();
109            	
110            	Boolean addConsumer(CIMIndicationConsumer* consumer);
111            	Boolean removeConsumer(CIMIndicationConsumer* consumer);
112            
113            	CIMExportIndicationResponseMessage* handleIndicationRequest(CIMExportIndicationRequestMessage* request);
114            
115            	
116 mike  1.13 	static ThreadReturnType PEGASUS_THREAD_CDECL deliver_routine(void *param);
117 tony  1.1  
118            private:
119 kumpf 1.11 	void	deliverIndication(String url, CIMInstance instance, ContentLanguageList contentLangs);
120 tony  1.1  
121            	ThreadPool* _thread_pool;
122            	PtrList*		_consumers;
123            };
124            
// Interval of 15 seconds, 0 microseconds, passed as the last argument when the
// dispatcher's ThreadPool is constructed.
// NOTE(review): presumably the idle time after which pool threads are
// deallocated -- confirm against the ThreadPool documentation.
static struct timeval deallocateWait = { 15, 0 };
126 tony  1.1  
127            
128            CIMListenerIndicationDispatcherRep::CIMListenerIndicationDispatcherRep()
129            :_thread_pool(new ThreadPool(0, "ListenerIndicationDispatcher", 0, 0,
130 kumpf 1.9  	deallocateWait))
131 tony  1.1  ,_consumers(new PtrList())
132            {
133            	      
134            }
135            CIMListenerIndicationDispatcherRep::~CIMListenerIndicationDispatcherRep()
136            {
137 kumpf 1.9  	delete _thread_pool;
138            	delete _consumers;
139 tony  1.1  }
140            	
141            Boolean CIMListenerIndicationDispatcherRep::addConsumer(CIMIndicationConsumer* consumer)
142            {
143            	_consumers->add(consumer);
144            	return true;
145            }
146            Boolean CIMListenerIndicationDispatcherRep::removeConsumer(CIMIndicationConsumer* consumer)
147            {
148            	_consumers->remove(consumer);
149            	return true;
150            }
151            CIMExportIndicationResponseMessage* CIMListenerIndicationDispatcherRep::handleIndicationRequest(CIMExportIndicationRequestMessage* request)
152            {
153            	PEG_METHOD_ENTER(TRC_SERVER,
154            		"CIMListenerIndicationDispatcherRep::handleIndicationRequest");
155            
156            	CIMInstance instance = request->indicationInstance;
157            	String			url = request->destinationPath;
158 kumpf 1.11     ContentLanguageList contentLangs =((ContentLanguageListContainer)request->operationContext.
159 se.gupta 1.5  			                                    get(ContentLanguageListContainer::NAME)).getLanguages();
160 tony     1.1  
161 chuck    1.2  	deliverIndication(url,instance,contentLangs);
162 tony     1.1  
163               	// compose a response message  
164               	CIMException cimException;
165               
166               	CIMExportIndicationResponseMessage* response = new CIMExportIndicationResponseMessage(
167               		request->messageId,
168               		cimException,
169               		request->queueIds.copyAndPop());
170               
171               	response->dest = request->queueIds.top();
172               
173               	PEG_METHOD_EXIT();
174               
175               	return response;
176               }
177               
178 chuck    1.2  void CIMListenerIndicationDispatcherRep::deliverIndication(String url,
179                                                                          CIMInstance instance,
180 kumpf    1.11                                                            ContentLanguageList contentLangs)
181 tony     1.1  {
182               	// go thru all consumers and broadcast the result; should be run in seperate thread
183 venkat.puvvada 1.14.12.1 	AutoPtr<Iterator> it(_consumers->iterator());
184 tony           1.1       	while(it->hasNext()==true)
185                          	{
186                          		CIMIndicationConsumer* consumer = static_cast<CIMIndicationConsumer*>(it->next());
187 chuck          1.2       		CIMListenerIndicationDispatchEvent* event = new CIMListenerIndicationDispatchEvent(
188                                                                                                               consumer,
189                                                                                                               url,
190                                                                                                               instance,
191                                                                                                               contentLangs);
192 konrad.r       1.10      		ThreadStatus rtn = _thread_pool->allocate_and_awaken(event,deliver_routine);
193                          		
194                              		if (rtn != PEGASUS_THREAD_OK) 
195                              		{
196 yi.zhou        1.14      	    	    Logger::put(Logger::STANDARD_LOG, System::CIMLISTENER, Logger::TRACE,
197 konrad.r       1.10      				"Not enough threads to allocate a worker to deliver the event. ");
198                           
199                          	    	    Tracer::trace(TRC_SERVER, Tracer::LEVEL2,
200                          				"Could not allocate thread to deliver event. Instead using current thread.");
201                          		    delete event;
202                          		    throw Exception(MessageLoaderParms("Listener.CIMListenerIndicationDispatcher.CANNOT_ALLOCATE_THREAD",
203                          					"Not enough threads to allocate a worker to deliver the event."));
204                              		}
205 tony           1.1       	}
206                          }
207 mike           1.13      ThreadReturnType PEGASUS_THREAD_CDECL CIMListenerIndicationDispatcherRep::deliver_routine(void *param)
208 tony           1.1       {
209                          	CIMListenerIndicationDispatchEvent* event = static_cast<CIMListenerIndicationDispatchEvent*>(param);
210                          
211                          	if(event!=NULL)
212                          	{
213                          		CIMIndicationConsumer* consumer = event->getConsumer();
214                          		OperationContext context;
215 chuck          1.2       	        context.insert(ContentLanguageListContainer(event->getContentLanguages()));
216 tony           1.1       		if(consumer)
217                          		{
218                          			consumer->consumeIndication(context,event->getURL(),event->getIndicationInstance());
219                          		}
220                          
221                          		delete event;
222                          	}
223                          		
224                          	return (0);
225                          }
226                          
227                          ///////////////////////////////////////////////////////////////////////////////
228                          // CIMListenerIndicationDispatcher
229                          ///////////////////////////////////////////////////////////////////////////////
230                          CIMListenerIndicationDispatcher::CIMListenerIndicationDispatcher()
231                          :Base(PEGASUS_QUEUENAME_LISTENERINDICATIONDISPACTCHER)
232                          ,_rep(new CIMListenerIndicationDispatcherRep())
233                          {
234                          }
235                          CIMListenerIndicationDispatcher::~CIMListenerIndicationDispatcher()
236                          {
237 tony           1.1       	if(_rep!=NULL)
238 tony           1.3       		delete static_cast<CIMListenerIndicationDispatcherRep*>(_rep);
239 tony           1.1       
240                          	_rep=NULL;
241                          }
242                          
243                          void CIMListenerIndicationDispatcher::handleEnqueue()
244                          {
245                          	PEG_METHOD_ENTER(TRC_SERVER, "CIMListenerIndicationDispatcher::handleEnqueue");
246                          	
247                          	Message *message = dequeue();
248                          	if(message)
249                          		handleEnqueue(message);
250                          	
251                          	PEG_METHOD_EXIT();
252                          }
253                          
254                          void CIMListenerIndicationDispatcher::handleEnqueue(Message* message)
255                          {
256                          	PEG_METHOD_ENTER(TRC_SERVER, "CIMListenerIndicationDispatcher::handleEnqueue");
257                          	
258                          	if(message!=NULL)
259                          	{
260 tony           1.1       		switch (message->getType())
261                              {
262                          			case CIM_EXPORT_INDICATION_REQUEST_MESSAGE:
263                          				{
264                          					CIMExportIndicationRequestMessage* request = (CIMExportIndicationRequestMessage*)message;
265                          
266                          					CIMExportIndicationResponseMessage* response = 
267                          						static_cast<CIMListenerIndicationDispatcherRep*>(_rep)->handleIndicationRequest(request);
268                          
269                          					_enqueueResponse(request, response);
270                          				}
271                          				break;
272                          		default:
273                          			break;
274                              }	
275                              delete message;
276                          	}	
277                             
278                          	PEG_METHOD_EXIT();
279                          }
280                          Boolean CIMListenerIndicationDispatcher::addConsumer(CIMIndicationConsumer* consumer)
281 tony           1.1       {
282                          	return static_cast<CIMListenerIndicationDispatcherRep*>(_rep)->addConsumer(consumer);
283                          }
284                          Boolean CIMListenerIndicationDispatcher::removeConsumer(CIMIndicationConsumer* consumer)
285                          {
286                          	return static_cast<CIMListenerIndicationDispatcherRep*>(_rep)->removeConsumer(consumer);
287                          }
288                          
289                          PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2