(file) Return to CIMListenerIndicationDispatcher.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Listener

  1 martin 1.21 //%LICENSE////////////////////////////////////////////////////////////////
  2 martin 1.22 //
  3 martin 1.21 // Licensed to The Open Group (TOG) under one or more contributor license
  4             // agreements.  Refer to the OpenPegasusNOTICE.txt file distributed with
  5             // this work for additional information regarding copyright ownership.
  6             // Each contributor licenses this file to you under the OpenPegasus Open
  7             // Source License; you may not use this file except in compliance with the
  8             // License.
  9 martin 1.22 //
 10 martin 1.21 // Permission is hereby granted, free of charge, to any person obtaining a
 11             // copy of this software and associated documentation files (the "Software"),
 12             // to deal in the Software without restriction, including without limitation
 13             // the rights to use, copy, modify, merge, publish, distribute, sublicense,
 14             // and/or sell copies of the Software, and to permit persons to whom the
 15             // Software is furnished to do so, subject to the following conditions:
 16 martin 1.22 //
 17 martin 1.21 // The above copyright notice and this permission notice shall be included
 18             // in all copies or substantial portions of the Software.
 19 martin 1.22 //
 20 martin 1.21 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 21 martin 1.22 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 22 martin 1.21 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
 23             // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
 24             // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
 25             // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
 26             // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 27 martin 1.22 //
 28 martin 1.21 //////////////////////////////////////////////////////////////////////////
 29 tony   1.1  //
 30             //%/////////////////////////////////////////////////////////////////////////////
 31             
 32             #include "CIMListenerIndicationDispatcher.h"
 33             
 34             #include <Pegasus/Common/Config.h>
 35             #include <Pegasus/Common/Constants.h>
 36             #include <Pegasus/Common/OperationContext.h>
 37             #include <Pegasus/Common/CIMMessage.h>
 38             #include <Pegasus/Common/Thread.h>
 39             #include <Pegasus/Common/Tracer.h>
 40             
 41             #include <Pegasus/Listener/List.h>
 42             #include <Pegasus/Consumer/CIMIndicationConsumer.h>
 43 kumpf  1.11 #include <Pegasus/Common/ContentLanguageList.h>
 44 tony   1.1  
 45             PEGASUS_NAMESPACE_BEGIN
 46             
 47             ///////////////////////////////////////////////////////////////////////////////
 48             // CIMListenerIndicationDispatchEvent
 49             ///////////////////////////////////////////////////////////////////////////////
 50             class CIMListenerIndicationDispatchEvent
 51             {
 52             public:
 53 karl   1.17     CIMListenerIndicationDispatchEvent(CIMIndicationConsumer* consumer,
 54 chuck  1.2                                             String url,
 55                                                        CIMInstance instance,
 56 kumpf  1.11                                            ContentLanguageList contentLangs);
 57 karl   1.17     ~CIMListenerIndicationDispatchEvent();
 58 tony   1.1  
 59 karl   1.17     CIMIndicationConsumer* getConsumer() const;
 60 tony   1.1  
 61 karl   1.17     String getURL() const;
 62                 CIMInstance getIndicationInstance() const;
 63 venkat.puvvada 1.15         ContentLanguageList getContentLanguages() const;
 64 tony           1.1  
 65                     private:
 66 karl           1.17     CIMIndicationConsumer*  _consumer;
 67                         String                  _url;
 68                         CIMInstance             _instance;
 69                         ContentLanguageList     _contentLangs;
 70 tony           1.1  };
 71                     
 72 karl           1.17 CIMListenerIndicationDispatchEvent::CIMListenerIndicationDispatchEvent(
 73                         CIMIndicationConsumer* consumer,
 74                         String url,
 75                         CIMInstance instance,
 76                         ContentLanguageList contentLangs)
 77 chuck          1.2  :_consumer(consumer),_url(url),_instance(instance), _contentLangs(contentLangs)
 78 tony           1.1  {
 79                     }
 80                     CIMListenerIndicationDispatchEvent::~CIMListenerIndicationDispatchEvent()
 81                     {
 82                     }
 83                     CIMIndicationConsumer* CIMListenerIndicationDispatchEvent::getConsumer() const
 84                     {
 85 karl           1.17     return _consumer;
 86 tony           1.1  }
 87                     String CIMListenerIndicationDispatchEvent::getURL() const
 88                     {
 89 karl           1.17     return _url;
 90 tony           1.1  }
 91                     CIMInstance CIMListenerIndicationDispatchEvent::getIndicationInstance() const
 92                     {
 93 karl           1.17     return _instance;
 94 tony           1.1  }
 95 karl           1.17 ContentLanguageList
 96                         CIMListenerIndicationDispatchEvent::getContentLanguages() const
 97 chuck          1.2  {
 98 karl           1.17     return _contentLangs;
 99 chuck          1.2  }
100 tony           1.1  
101                     ///////////////////////////////////////////////////////////////////////////////
102                     // CIMListenerIndicationDispatcherRep
103                     ///////////////////////////////////////////////////////////////////////////////
104                     class CIMListenerIndicationDispatcherRep
105                     {
106                     public:
107 karl           1.17     CIMListenerIndicationDispatcherRep();
108                         virtual ~CIMListenerIndicationDispatcherRep();
109 venkat.puvvada 1.15 
110 karl           1.17     Boolean addConsumer(CIMIndicationConsumer* consumer);
111                         Boolean removeConsumer(CIMIndicationConsumer* consumer);
112 tony           1.1  
113 karl           1.17     CIMExportIndicationResponseMessage* handleIndicationRequest(
114                             CIMExportIndicationRequestMessage* request);
115 tony           1.1  
116 venkat.puvvada 1.15 
117 karl           1.17     static ThreadReturnType PEGASUS_THREAD_CDECL deliver_routine(void *param);
118 tony           1.1  
119                     private:
120 karl           1.17     void    deliverIndication(String url,
121                                                   CIMInstance instance,
122                                                   ContentLanguageList contentLangs);
123 tony           1.1  
124 karl           1.17     ThreadPool* _thread_pool;
125                         PtrList*        _consumers;
126 tony           1.1  };
127                     
128 kumpf          1.9  static struct timeval deallocateWait = {15, 0};
129 tony           1.1  
130                     
131                     CIMListenerIndicationDispatcherRep::CIMListenerIndicationDispatcherRep()
132 karl           1.17     : _thread_pool(new ThreadPool(0, "ListenerIndicationDispatcher", 0, 0,
133                         deallocateWait)), _consumers(new PtrList())
134 tony           1.1  {
135 venkat.puvvada 1.15 
136 tony           1.1  }
137                     CIMListenerIndicationDispatcherRep::~CIMListenerIndicationDispatcherRep()
138                     {
139 karl           1.17     delete _thread_pool;
140                         delete _consumers;
141 tony           1.1  }
142 venkat.puvvada 1.15 
143 karl           1.17 Boolean CIMListenerIndicationDispatcherRep::addConsumer(
144                         CIMIndicationConsumer* consumer)
145 tony           1.1  {
146 karl           1.17     _consumers->add(consumer);
147                         return true;
148 tony           1.1  }
149 karl           1.17 Boolean CIMListenerIndicationDispatcherRep::removeConsumer(
150                         CIMIndicationConsumer* consumer)
151 tony           1.1  {
152 karl           1.17     _consumers->remove(consumer);
153                         return true;
154 tony           1.1  }
155 karl           1.17 CIMExportIndicationResponseMessage*
156                     CIMListenerIndicationDispatcherRep::handleIndicationRequest(
157                         CIMExportIndicationRequestMessage* request)
158 tony           1.1  {
159 karl           1.17     PEG_METHOD_ENTER(TRC_SERVER,
160                             "CIMListenerIndicationDispatcherRep::handleIndicationRequest");
161 tony           1.1  
162 karl           1.17     CIMInstance instance = request->indicationInstance;
163                         String          url = request->destinationPath;
164                         ContentLanguageList contentLangs =
165                             ((ContentLanguageListContainer)request->operationContext.
166                                 get(ContentLanguageListContainer::NAME)).getLanguages();
167 tony           1.1  
168 karl           1.17     deliverIndication(url,instance,contentLangs);
169 tony           1.1  
170 karl           1.17     // compose a response message
171                         CIMException cimException;
172 tony           1.1  
173 karl           1.17     CIMExportIndicationResponseMessage* response =
174 kumpf          1.18         dynamic_cast<CIMExportIndicationResponseMessage*>(
175                                 request->buildResponse());
176                         response->cimException = cimException;
177 karl           1.17     response->dest = request->queueIds.top();
178 tony           1.1  
179 karl           1.17     PEG_METHOD_EXIT();
180 tony           1.1  
181 karl           1.17     return response;
182 tony           1.1  }
183                     
184 chuck          1.2  void CIMListenerIndicationDispatcherRep::deliverIndication(String url,
185 karl           1.17     CIMInstance instance,
186                         ContentLanguageList contentLangs)
187 tony           1.1  {
188 karl           1.17     // go thru all consumers and broadcast the result;
189                         // should be run in seperate thread
190 venkat.puvvada 1.15         AutoPtr<Iterator> it( _consumers->iterator() );
191                     
192 karl           1.17     while(it->hasNext()==true)
193                         {
194                             CIMIndicationConsumer* consumer =
195                                 static_cast<CIMIndicationConsumer*>(it->next());
196                             CIMListenerIndicationDispatchEvent* event =
197                                 new CIMListenerIndicationDispatchEvent(consumer,
198                                                                        url,
199                                                                        instance,
200                                                                        contentLangs);
201                             ThreadStatus rtn = _thread_pool->allocate_and_awaken(
202                                                     event,deliver_routine);
203                     
204                                 if (rtn != PEGASUS_THREAD_OK)
205                                 {
206 marek          1.19                 PEG_TRACE_CSTRING(
207                                         TRC_SERVER,
208 marek          1.20                     Tracer::LEVEL1,
209 karl           1.17                     "Could not allocate thread to deliver event."
210                                             " Instead using current thread.");
211                                 delete event;
212                                 throw Exception(MessageLoaderParms(
213                                     "Listener.CIMListenerIndicationDispatcher."
214                                         "CANNOT_ALLOCATE_THREAD",
215                                     "Not enough threads to allocate a worker to deliver the"
216                                         " event."));
217                                 }
218                         }
219                     }
220                     ThreadReturnType PEGASUS_THREAD_CDECL
221                     CIMListenerIndicationDispatcherRep::deliver_routine(void *param)
222                     {
223                         CIMListenerIndicationDispatchEvent* event =
224                             static_cast<CIMListenerIndicationDispatchEvent*>(param);
225 tony           1.1  
226 karl           1.17     if(event!=NULL)
227                         {
228                             CIMIndicationConsumer* consumer = event->getConsumer();
229                             OperationContext context;
230                                 context.insert(ContentLanguageListContainer(
231                                             event->getContentLanguages()));
232                             if(consumer)
233                             {
234                                 consumer->consumeIndication(context,
235                                     event->getURL(),event->getIndicationInstance());
236                             }
237 venkat.puvvada 1.15 
238 karl           1.17         delete event;
239                         }
240                     
241                         return (0);
242 tony           1.1  }
243                     
244                     ///////////////////////////////////////////////////////////////////////////////
245                     // CIMListenerIndicationDispatcher
246                     ///////////////////////////////////////////////////////////////////////////////
247                     CIMListenerIndicationDispatcher::CIMListenerIndicationDispatcher()
248                     :Base(PEGASUS_QUEUENAME_LISTENERINDICATIONDISPACTCHER)
249                     ,_rep(new CIMListenerIndicationDispatcherRep())
250                     {
251                     }
252                     CIMListenerIndicationDispatcher::~CIMListenerIndicationDispatcher()
253                     {
254 karl           1.17     if(_rep!=NULL)
255                             delete static_cast<CIMListenerIndicationDispatcherRep*>(_rep);
256 tony           1.1  
257 karl           1.17     _rep=NULL;
258 tony           1.1  }
259                     
260                     void CIMListenerIndicationDispatcher::handleEnqueue()
261                     {
262 karl           1.17     PEG_METHOD_ENTER(TRC_SERVER,
263                             "CIMListenerIndicationDispatcher::handleEnqueue");
264 venkat.puvvada 1.15 
265 karl           1.17     Message *message = dequeue();
266                         if(message)
267                             handleEnqueue(message);
268 venkat.puvvada 1.15 
269 karl           1.17     PEG_METHOD_EXIT();
270 tony           1.1  }
271                     
272                     void CIMListenerIndicationDispatcher::handleEnqueue(Message* message)
273                     {
274 karl           1.17     PEG_METHOD_ENTER(TRC_SERVER,
275                             "CIMListenerIndicationDispatcher::handleEnqueue");
276 venkat.puvvada 1.15 
277 karl           1.17     if(message!=NULL)
278 tony           1.1      {
279 karl           1.17         switch (message->getType())
280                             {
281                                 case CIM_EXPORT_INDICATION_REQUEST_MESSAGE:
282                                     {
283                                         CIMExportIndicationRequestMessage* request =
284                                             (CIMExportIndicationRequestMessage*)message;
285                     
286                                         CIMExportIndicationResponseMessage* response =
287                                             static_cast<CIMListenerIndicationDispatcherRep*>(_rep)->
288                                                 handleIndicationRequest(request);
289                     
290 kumpf          1.23                     MessageQueue* queue = MessageQueue::lookup(response->dest);
291                                         PEGASUS_ASSERT(queue != 0);
292                                         queue->enqueue(response);
293 karl           1.17                 }
294                                     break;
295                                 default:
296                                     break;
297                             }
298                         delete message;
299 venkat.puvvada 1.15     }
300                     
301 karl           1.17     PEG_METHOD_EXIT();
302 tony           1.1  }
303 karl           1.17 Boolean CIMListenerIndicationDispatcher::addConsumer(
304                         CIMIndicationConsumer* consumer)
305 tony           1.1  {
306 karl           1.17     return static_cast<CIMListenerIndicationDispatcherRep*>(_rep)->addConsumer(
307                                 consumer);
308 tony           1.1  }
309 karl           1.17 Boolean CIMListenerIndicationDispatcher::removeConsumer(
310                             CIMIndicationConsumer* consumer)
311 tony           1.1  {
312 karl           1.17     return static_cast<CIMListenerIndicationDispatcherRep*>
313                             (_rep)->removeConsumer(consumer);
314 tony           1.1  }
315                     
316                     PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2