(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mday  1.1 //%////-*-c++-*-////////////////////////////////////////////////////////////////
  2           //
  3           // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM
  4           //
  5           // Permission is hereby granted, free of charge, to any person obtaining a copy
  6           // of this software and associated documentation files (the "Software"), to
  7           // deal in the Software without restriction, including without limitation the
  8           // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  9           // sell copies of the Software, and to permit persons to whom the Software is
 10           // furnished to do so, subject to the following conditions:
 11           //
 12           // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 13           // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 14           // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 15           // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 16           // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 17           // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 18           // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 19           // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 20           //
 21           //==============================================================================
 22 mday  1.1 //
 23           // Author: Mike Day (mdday@us.ibm.com)
 24           //
 25           // Modified By:
 26           //
 27           //%/////////////////////////////////////////////////////////////////////////////
 28           
 29           #include "MessageQueueService.h"
 30           
 31           PEGASUS_NAMESPACE_BEGIN
 32           
 33           MessageQueueService::MessageQueueService(const char *name, 
 34           					 Uint32 queueID, 
 35           					 Uint32 capabilities, 
 36           					 Uint32 mask) 
 37 mday  1.4    : Base(name, false,  queueID),
 38 mday  1.1      _capabilities(capabilities),
 39                _mask(mask),
 40 mday  1.4      _die(0),
 41 mday  1.5      _pending(true), 
 42 mday  1.6      _incoming(true, 1000),
 43 mday  1.5      _req_thread(_req_proc, this, false)
 44 mday  1.1 { 
 45              _default_op_timeout.tv_sec = 30;
 46              _default_op_timeout.tv_usec = 100;
 47              _meta_dispatcher = static_cast<cimom *>(Base::lookup(CIMOM_Q_ID));
 48              if(_meta_dispatcher == 0 )
 49                 throw NullPointer();
 50 mday  1.4    _req_thread.run();
 51              
 52 mday  1.1 }
 53           
 54 mday  1.4 
 55 mday  1.1 MessageQueueService::~MessageQueueService(void)
 56           {
 57              _die = 1;
 58 mday  1.6    _incoming.shutdown_queue();
 59              
 60 mday  1.4    _req_thread.join();
 61 mday  1.5 
 62 mday  1.1 }
 63           
 64 mday  1.4 AtomicInt MessageQueueService::_xid(1);
 65 mday  1.1 
 66           
 67 mday  1.5 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
 68 mday  1.1 {
 69 mday  1.5    Thread *myself = reinterpret_cast<Thread *>(parm);
 70              MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
 71           
 72              // pull messages off the incoming queue and dispatch them. then 
 73              // check pending messages that are non-blocking
 74           
 75 mday  1.6    while ( service->_die.value() == 0 ) 
 76 mday  1.1    {
 77 mday  1.6       AsyncOpNode *operation = service->_incoming.remove_first_wait();
 78                 if ( operation == 0 )
 79           	 break;
 80 mday  1.5       
 81                 service->_handle_incoming_operation(operation);
 82 mday  1.1    }
 83 mday  1.5    
 84              myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
 85              return(0);
 86 mday  1.1 }
 87           
 88           
 89 mday  1.5 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)
 90 mday  1.1 {
 91 mday  1.5    if ( operation != 0 )
 92              {
 93 mday  1.6       operation->lock();
 94                 Message *rq = operation->_request.next(0);
 95                 operation->unlock();
 96                 
 97 mday  1.5       PEGASUS_ASSERT(rq != 0 );
 98                 PEGASUS_ASSERT(rq->getMask() & message_mask::ha_async );
 99                 PEGASUS_ASSERT(rq->getMask() & message_mask::ha_request);
100                 _handle_async_request(static_cast<AsyncRequest *>(rq));
101              }
102 mday  1.1    
103 mday  1.5    return;
104 mday  1.1    
105           }
106           
107           void MessageQueueService::_handle_async_request(AsyncRequest *req)
108           {
109 mday  1.4    if ( req != 0 )
110              {
111                 req->op->processing();
112 mday  1.1    
113 mday  1.4       Uint32 type = req->getType();
114                 if( type == async_messages::HEARTBEAT )
115           	 handle_heartbeat_request(req);
116                 else if (type == async_messages::IOCTL)
117           	 handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
118                 else if (type == async_messages::CIMSERVICE_START)
119           	 handle_CimServiceStart(static_cast<CimServiceStart *>(req));
120                 else if (type == async_messages::CIMSERVICE_STOP)
121           	 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
122                 else if (type == async_messages::CIMSERVICE_PAUSE)
123           	 handle_CimServicePause(static_cast<CimServicePause *>(req));
124                 else if (type == async_messages::CIMSERVICE_RESUME)
125           	 handle_CimServiceResume(static_cast<CimServiceResume *>(req));
126                 else if ( type == async_messages::ASYNC_OP_START)
127           	 handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
128                 else 
129                 {
130           	 // we don't handle this request message 
131           	 _make_response(req, async_results::CIM_NAK );
132                 }
133 mday  1.1    }
134           }
135           
136           void MessageQueueService::_make_response(AsyncRequest *req, Uint32 code)
137           {
138              AsyncReply *reply = 
139                 new AsyncReply(async_messages::REPLY,
140           		     req->getKey(),
141           		     req->getRouting(),
142           		     0,
143           		     req->op, 
144           		     code, 
145           		     req->resp,
146           		     false);
147 mday  1.4    _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );
148 mday  1.1 }
149           
150           
151 mday  1.5 void MessageQueueService::_completeAsyncResponse(AsyncRequest *request, 
152           						AsyncReply *reply, 
153           						Uint32 state, 
154           						Uint32 flag)
155           {
156              PEGASUS_ASSERT(request != 0  && reply != 0 );
157              
158              AsyncOpNode *op = request->op;
159              op->lock();
160              op->_state |= state ;
161              op->_flags |= flag;
162              gettimeofday(&(op->_updated), NULL);
163 mday  1.6    if ( false == op->_request.exists(reinterpret_cast<void *>(reply)) )
164                 op->_request.insert_last(reply);
165 mday  1.5    op->unlock();
166 mday  1.6 
167 mday  1.5    op->_client_sem.signal();
168 mday  1.6 
169 mday  1.5    
170           }
171           
172           
173           
174           Boolean MessageQueueService::accept_async(AsyncOpNode *op)
175           {
176 mday  1.6    op->lock();
177              Message *rq = op->_request.next(0);
178              op->unlock();
179              
180 mday  1.5    if( true == messageOK(rq) &&  _die.value() == 0  )
181              {
182 mday  1.6       _incoming.insert_last_wait(op);
183 mday  1.5       return true;
184              }
185              return false;
186           }
187           
188           Boolean MessageQueueService::messageOK(const Message *msg)
189           {
190              if ( msg != 0 )
191              {
192                 Uint32 mask = msg->getMask();
193                 if ( mask & message_mask::ha_async)
194           	 if ( mask & message_mask::ha_request)
195           	    return true;
196              }
197              return false;
198           }
199           
200           
201           void MessageQueueService::handleEnqueue(void)
202           {
203              Message *msg = dequeue();
204 mday  1.5    if( msg )
205              {
206                 delete msg;
207              }
208           }
209           
210 mday  1.1 void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
211           {
212              // default action is to echo a heartbeat response 
213              
214              AsyncReply *reply = 
215                 new AsyncReply(async_messages::HEARTBEAT,
216           		     req->getKey(),
217           		     req->getRouting(),
218           		     0,
219           		     req->op, 
220           		     async_results::OK, 
221           		     req->resp,
222           		     false);
223 mday  1.4    _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );
224 mday  1.1 }
225           
226           
227           void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
228           { 
229              ;
230           }
231                 
232           void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
233           {
234              _make_response(req, async_results::OK);
235           }
236           void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
237           {
238              _make_response(req, async_results::CIM_NAK);
239           }
240           void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
241           {
242              _make_response(req, async_results::CIM_NAK);
243           }
244           void MessageQueueService::handle_CimServicePause(CimServicePause *req)
245 mday  1.1 {
246              _make_response(req, async_results::CIM_NAK);
247           }
248           void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
249           {
250              _make_response(req, async_results::CIM_NAK);
251           }
252                 
253           void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
254           {
255              _make_response(req, async_results::CIM_NAK);
256           
257           }
258           
259           void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
260           {
261              ;
262           }
263           
264           AsyncOpNode *MessageQueueService::get_op(void)
265           {
266 mday  1.4    AsyncOpNode *op = new AsyncOpNode();
267 mday  1.1    
268              op->write_state(ASYNC_OPSTATE_UNKNOWN);
269 mday  1.4    op->write_flags(ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL );
270              
271 mday  1.1    return op;
272           }
273           
274           void MessageQueueService::return_op(AsyncOpNode *op)
275           {
276              PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED );
277 mday  1.4    delete op;
278 mday  1.1 }
279           
280 mday  1.2 
281 mday  1.1 
282 mday  1.4 AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
283 mday  1.1 {
284 mday  1.4    if ( request == 0 )
285                 return 0 ;
286 mday  1.5 
287              Boolean destroy_op = false;
288              
289              if (request->op == false)
290              {
291                 request->op = get_op();
292                 request->op->put_request(request);
293                 
294                 destroy_op = true;
295              }
296 mday  1.4    
297              request->block = true;
298              request->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
299              request->op->put_response(0);
300 mday  1.1    
301 mday  1.4    // first link it on our pending list
302 mday  1.5    // _pending.insert_last_wait(request->op);
303 mday  1.2    
304 mday  1.4    // now see if the meta dispatcher will take it
305 mday  1.2 
306 mday  1.4    if (true == _meta_dispatcher->route_async(request->op))
307 mday  1.1    {
308 mday  1.4       request->op->_client_sem.wait();
309 mday  1.6       PEGASUS_ASSERT(request->op->_state & ASYNC_OPSTATE_COMPLETE);
310                 
311 mday  1.1    }
312 mday  1.4    
313 mday  1.6    request->op->lock();
314              AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
315              rpl->op = 0;
316              request->op->unlock();
317              
318 mday  1.5    if( destroy_op == true)
319              {
320 mday  1.6       request->op->lock();
321                 request->op->_request.remove(request);
322                 request->op->_state |= ASYNC_OPSTATE_RELEASED;
323                 request->op->unlock();
324                 
325                 return_op(request->op);
326                 
327           //      delete request->op;
328           //      request->op = 0;
329 mday  1.5    }
330              
331              return rpl;
332 mday  1.1 }
333           
334           
335           Boolean MessageQueueService::register_service(String name, 
336           					      Uint32 capabilities, 
337           					      Uint32 mask)
338           
339           {
340              RegisterCimService *msg = new RegisterCimService(get_next_xid(),
341 mday  1.5 						    0, 
342 mday  1.1 						    true, 
343           						    name, 
344           						    capabilities, 
345           						    mask,
346           						    _queueId);
347              Boolean registered = false;
348 mday  1.2    AsyncMessage *reply = SendWait( msg );
349 mday  1.1    
350 mday  1.2    if ( reply != 0 )
351 mday  1.1    {
352                 if(reply->getMask() & message_mask:: ha_async)
353                 {
354           	 if(reply->getMask() & message_mask::ha_reply)
355           	 {
356 mday  1.2 	    if((static_cast<AsyncReply *>(reply))->result == async_results::OK)
357 mday  1.1 	       registered = true;
358           	 }
359                 }
360                 
361                 delete reply;
362              }
363 mday  1.5    delete msg;
364 mday  1.1    return registered;
365           }
366           
367           Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
368           {
369              
370              
371              UpdateCimService *msg = new UpdateCimService(get_next_xid(), 
372 mday  1.5 						0, 
373 mday  1.1 						true, 
374           						_queueId,
375           						_capabilities, 
376           						_mask);
377              Boolean registered = false;
378 mday  1.2 
379              AsyncMessage *reply = SendWait(msg);
380              if (reply)
381 mday  1.1    {
382                 if(reply->getMask() & message_mask:: ha_async)
383                 {
384           	 if(reply->getMask() & message_mask::ha_reply)
385           	 {
386 mday  1.2 	    if(static_cast<AsyncReply *>(reply)->result == async_results::OK)
387 mday  1.1 	       registered = true;
388           	 }
389                 }
390                 delete reply;
391              }
392 mday  1.5    delete msg;
393 mday  1.1    return registered;
394           }
395           
396           
397           Boolean MessageQueueService::deregister_service(void)
398           {
399 mday  1.3 
400 mday  1.5    _meta_dispatcher->deregister_module(_queueId);
401              return true;
402 mday  1.1 }
403           
404           
405           void MessageQueueService::find_services(String name, 
406           					Uint32 capabilities, 
407           					Uint32 mask, 
408           					Array<Uint32> *results)
409           {
410              
411              if( results == 0 )
412                 throw NullPointer();
413 mday  1.5     
414 mday  1.1    results->clear();
415              
416              FindServiceQueue *req = 
417                 new FindServiceQueue(get_next_xid(), 
418 mday  1.5 			   0, 
419 mday  1.1 			   _queueId, 
420           			   true, 
421           			   name, 
422           			   capabilities, 
423           			   mask);
424              
425 mday  1.2    AsyncMessage *reply = SendWait(req); 
426              if(reply)
427 mday  1.1    {
428                 if( reply->getMask() & message_mask::ha_async)
429                 {
430           	 if(reply->getMask() & message_mask::ha_reply)
431           	 {
432           	    if(reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
433           	    {
434           	       if( (static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK )
435           		  *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
436           	    }
437           	 }
438                 }
439                 delete reply;
440              }
441 mday  1.5    delete req;
442 mday  1.1    return ;
443           }
444           
445           void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
446           {
447              if(result == 0)
448                 throw NullPointer();
449              
450              EnumerateService *req 
451                 = new EnumerateService(get_next_xid(),
452 mday  1.5 			     0, 
453 mday  1.1 			     _queueId, 
454           			     true, 
455           			     queue);
456              
457 mday  1.2    AsyncMessage *reply = SendWait(req);
458 mday  1.1    
459 mday  1.2    if (reply)
460 mday  1.1    {
461                 Boolean found = false;
462                 
463                 if( reply->getMask() & message_mask::ha_async)
464                 {
465           	 if(reply->getMask() & message_mask::ha_reply)
466           	 {
467           	    if(reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
468           	    {
469           	       if( (static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK )
470           	       {
471           		  if( found == false)
472           		  {
473           		     found = true;
474           		     
475           		     result->put_name( (static_cast<EnumerateServiceResponse *>(reply))->name);
476           		     result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
477           		     result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
478           		     result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
479           		  }
480           	       }
481 mday  1.1 	    }
482           	 }
483                 }
484                 delete reply;
485              }
486 mday  1.5    delete req;
487              
488 mday  1.1    return;
489           }
490           
491           Uint32 MessageQueueService::get_next_xid(void)
492           {
493              _xid++;
494              return _xid.value();
495           }
496           
497           PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2