(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

   1 mday  1.1 //%////-*-c++-*-////////////////////////////////////////////////////////////////
   2           //
   3           // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM
   4           //
   5           // Permission is hereby granted, free of charge, to any person obtaining a copy
   6           // of this software and associated documentation files (the "Software"), to
   7           // deal in the Software without restriction, including without limitation the
   8           // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
   9           // sell copies of the Software, and to permit persons to whom the Software is
  10           // furnished to do so, subject to the following conditions:
  11           //
  12           // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  13           // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  14           // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  15           // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  16           // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  17           // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  18           // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  19           // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  20           //
  21           //==============================================================================
  22 mday  1.1 //
  23           // Author: Mike Day (mdday@us.ibm.com)
  24           //
  25           // Modified By:
  26           //
  27           //%/////////////////////////////////////////////////////////////////////////////
  28           
  29           #include "MessageQueueService.h"
  30 mday  1.22 #include <Pegasus/Common/Tracer.h>
  31 mday  1.1  
  32            PEGASUS_NAMESPACE_BEGIN
  33            
  34 mday  1.15  
  35            cimom *MessageQueueService::_meta_dispatcher = 0;
  36            AtomicInt MessageQueueService::_service_count = 0;
  37            AtomicInt MessageQueueService::_xid(1);
  38 mday  1.38 Mutex MessageQueueService::_meta_dispatcher_mutex;
  39 mday  1.15 
  40            
  41 mday  1.1  MessageQueueService::MessageQueueService(const char *name, 
  42            					 Uint32 queueID, 
  43            					 Uint32 capabilities, 
  44            					 Uint32 mask) 
  45 mday  1.15    : Base(name, true,  queueID),
  46 mday  1.22      
  47 mday  1.1       _mask(mask),
  48 mday  1.4       _die(0),
  49 mday  1.41      _incoming(true, 0),
  50 mday  1.39      _callback(true),
  51 mday  1.7       _incoming_queue_shutdown(0),
  52 mday  1.39      _callback_ready(0),
  53                 _req_thread(_req_proc, this, false),
  54                 _callback_thread(_callback_proc, this, false)
  55 mday  1.1  { 
  56 mday  1.22    _capabilities = (capabilities | module_capabilities::async);
  57               
  58 mday  1.1     _default_op_timeout.tv_sec = 30;
  59               _default_op_timeout.tv_usec = 100;
  60 mday  1.15 
  61               _meta_dispatcher_mutex.lock(pegasus_thread_self());
  62               
  63               if( _meta_dispatcher == 0 )
  64               {
  65                  PEGASUS_ASSERT( _service_count.value() == 0 );
  66                  _meta_dispatcher = new cimom();
  67                  if (_meta_dispatcher == NULL )
  68                  {
  69            	 _meta_dispatcher_mutex.unlock();
  70            	 
  71            	 throw NullPointer();
  72                  }
  73                  
  74               }
  75               _service_count++;
  76            
  77            
  78               if( false == register_service(name, _capabilities, _mask) )
  79               {
  80                  _meta_dispatcher_mutex.unlock();
  81 mday  1.15       throw BindFailed("MessageQueueService Base Unable to register with  Meta Dispatcher");
  82               }
  83               
  84               _meta_dispatcher_mutex.unlock();
  85 mday  1.39 //   _callback_thread.run();
  86               
  87 mday  1.4     _req_thread.run();
  88 mday  1.1  }
  89            
  90 mday  1.4  
  91 mday  1.1  MessageQueueService::~MessageQueueService(void)
  92            {
  93               _die = 1;
  94 mday  1.7     if (_incoming_queue_shutdown.value() == 0 )
  95 mday  1.16    {
  96 mday  1.32       _shutdown_incoming_queue();
  97 mday  1.16        _req_thread.join();
  98               }
  99 mday  1.39    _callback_ready.signal();
 100               _callback_thread.join();
 101               
 102 mday  1.15    _meta_dispatcher_mutex.lock(pegasus_thread_self());
 103               _service_count--;
 104               if (_service_count.value() == 0 )
 105               {
 106                  _meta_dispatcher->_shutdown_routed_queue();
 107                  delete _meta_dispatcher;
 108               }
 109               _meta_dispatcher_mutex.unlock();
 110               
 111 mday  1.1  }
 112            
 113 mday  1.15 
 114 mday  1.1  
 115 mday  1.7  void MessageQueueService::_shutdown_incoming_queue(void)
 116            {
 117               
 118 mday  1.9     if (_incoming_queue_shutdown.value() > 0 )
 119                  return ;
 120 mday  1.8     AsyncIoctl *msg = new AsyncIoctl(get_next_xid(),
 121            				    0, 
 122            				    _queueId, 
 123            				    _queueId, 
 124            				    true, 
 125            				    AsyncIoctl::IO_CLOSE, 
 126            				    0, 
 127            				    0);
 128 mday  1.9  
 129 mday  1.8     msg->op = get_op();
 130 mday  1.41    msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
 131               msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK 
 132            		   | ASYNC_OPFLAGS_SIMPLE_STATUS);
 133 mday  1.32    msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 134 mday  1.41 
 135 mday  1.32    msg->op->_op_dest = this;
 136 mday  1.8     msg->op->_request.insert_first(msg);
 137               
 138               _incoming.insert_last_wait(msg->op);
 139 mday  1.32 
 140 mday  1.16    _req_thread.join();
 141               
 142 mday  1.7  }
 143            
 144 mday  1.22 
 145            
 146            void MessageQueueService::enqueue(Message *msg) throw(IPCException)
 147            {
 148 kumpf 1.28    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
 149            
 150 mday  1.22    Base::enqueue(msg);
 151 kumpf 1.28 
 152               PEG_METHOD_EXIT();
 153 mday  1.22 }
 154            
 155            
 156 mday  1.39 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_callback_proc(void *parm)
 157            {
 158               Thread *myself = reinterpret_cast<Thread *>(parm);
 159               MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
 160               AsyncOpNode *operation = 0;
 161               
 162               while ( service->_die.value() == 0 ) 
 163               {
 164                  service->_callback_ready.wait();
 165                  
 166                  service->_callback.lock();
 167                  operation = service->_callback.next(0);
 168                  while( operation != NULL)
 169                  {
 170            	 if( ASYNC_OPSTATE_COMPLETE & operation->read_state())
 171            	 {
 172            	    operation = service->_callback.remove_no_lock(operation);
 173            	    PEGASUS_ASSERT(operation != NULL);
 174            	    operation->_thread_ptr = myself;
 175            	    operation->_service_ptr = service;
 176            	    service->_handle_async_callback(operation);
 177 mday  1.39 	    break;
 178            	 }
 179            	 operation = service->_callback.next(operation);
 180                  }
 181                  service->_callback.unlock();
 182               }
 183               myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
 184               return(0);
 185            }
 186            
 187 mday  1.22 
 188 mday  1.5  PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
 189 mday  1.1  {
 190 mday  1.5     Thread *myself = reinterpret_cast<Thread *>(parm);
 191               MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
 192            
 193               // pull messages off the incoming queue and dispatch them. then 
 194               // check pending messages that are non-blocking
 195 mday  1.7     AsyncOpNode *operation = 0;
 196               
 197 mday  1.6     while ( service->_die.value() == 0 ) 
 198 mday  1.1     {
 199 mday  1.41       if(service->_incoming.count() > 0 )
 200 mday  1.7        {
 201 mday  1.41 	 try 
 202            	 {
 203            	    operation = service->_incoming.remove_first();
 204            	 }
 205            	 catch(ListClosed & )
 206            	 {
 207            	    break;
 208            	 }
 209            	 if( operation )
 210            	 {
 211            	    operation->_thread_ptr = myself;
 212            	    operation->_service_ptr = service;
 213            	    service->_handle_incoming_operation(operation);
 214            	 }
 215 mday  1.7        }
 216 mday  1.41        if( service->_callback.count() > 0 )
 217 mday  1.7        {
 218 mday  1.41 	 service->_callback.lock(); 
 219            	 operation = service->_callback.next(0);
 220            	 while( operation != NULL )
 221            	 {
 222            	    if( ASYNC_OPSTATE_COMPLETE & operation->read_state())
 223            	    {
 224            	       operation = service->_callback.remove_no_lock(operation);
 225            	       service->_callback.unlock();
 226            	       PEGASUS_ASSERT(operation != NULL);
 227            	       operation->_thread_ptr = myself;
 228            	       operation->_service_ptr = service;
 229            	       service->_handle_async_callback(operation);
 230            	       service->_callback.lock();
 231            	       operation = service->_callback.next(operation);
 232            	       continue;
 233            	    }
 234            	    pegasus_yield();
 235            	    operation = service->_callback.next(operation);
 236            	 }
 237            	 service->_callback.unlock();
 238 mday  1.7        }
 239 mday  1.41       if( (service->_incoming.count() == 0) && (service->_callback.count() == 0 ) )
 240 mday  1.42 	 myself->sleep(1);
 241 mday  1.41       else 
 242 mday  1.42 	 myself->thread_switch();
 243               }
 244                  
 245 mday  1.5     myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
 246               return(0);
 247 mday  1.1  }
 248            
 249 mday  1.33 void MessageQueueService::_sendwait_callback(AsyncOpNode *op, 
 250            					     MessageQueue *q, 
 251            					     void *parm)
 252            {
 253               op->_client_sem.signal();
 254            }
 255            
 256 mday  1.30 
 257            // callback function is responsible for cleaning up all resources
 258            // including op, op->_callback_node, and op->_callback_ptr
 259 mday  1.22 void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
 260            {
 261 mday  1.39    if( op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK )
 262               {
 263            
 264                  Message *msg = op->get_request();
 265                  if( msg && ( msg->getMask() & message_mask::ha_async))
 266                  {
 267            	 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_START )
 268            	 {
 269            	    AsyncLegacyOperationStart *wrapper = 
 270            	       static_cast<AsyncLegacyOperationStart *>(msg);
 271            	    msg = wrapper->get_action();
 272            	    delete wrapper;
 273            	 }
 274            	 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
 275            	 {
 276            	    AsyncModuleOperationStart *wrapper = 
 277            	       static_cast<AsyncModuleOperationStart *>(msg);
 278            	    msg = wrapper->get_action();
 279            	    delete wrapper;
 280            	 }
 281            	 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
 282 mday  1.39 	 {
 283            	    AsyncModuleOperationStart *wrapper = 
 284            	       static_cast<AsyncModuleOperationStart *>(msg);
 285            	    msg = wrapper->get_action();
 286            	    delete wrapper;
 287            	 }
 288            	 else if (msg->getType() == async_messages::ASYNC_OP_START)
 289            	 {
 290            	    AsyncOperationStart *wrapper = 
 291            	       static_cast<AsyncOperationStart *>(msg);
 292            	    msg = wrapper->get_action();
 293            	    delete wrapper;
 294            	 }
 295            	 delete msg;
 296                  }
 297            
 298                  msg = op->get_response();
 299                  if( msg && ( msg->getMask() & message_mask::ha_async))
 300                  {
 301            	 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT )
 302            	 {
 303 mday  1.39 	    AsyncLegacyOperationResult *wrapper = 
 304            	       static_cast<AsyncLegacyOperationResult *>(msg);
 305            	    msg = wrapper->get_result();
 306            	    delete wrapper;
 307            	 }
 308            	 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
 309            	 {
 310            	    AsyncModuleOperationResult *wrapper = 
 311            	       static_cast<AsyncModuleOperationResult *>(msg);
 312            	    msg = wrapper->get_result();
 313            	    delete wrapper;
 314            	 }
 315                  }
 316                  void (*callback)(Message *, void *, void *) = op->__async_callback;
 317                  void *handle = op->_callback_handle;
 318                  void *parm = op->_callback_parameter;
 319                  op->release();
 320                  return_op(op);
 321                  callback(msg, handle, parm);
 322               }
 323               else if( op->_flags & ASYNC_OPFLAGS_CALLBACK )
 324 mday  1.39    {
 325                  // note that _callback_node may be different from op 
 326                  // op->_callback_response_q is a "this" pointer we can use for 
 327                  // static callback methods
 328                  op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);
 329               }
 330 mday  1.22 }
 331            
 332 mday  1.1  
 333 mday  1.34 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation) 
 334            //						     Thread *thread, 
 335            //						     MessageQueue *queue)
 336 mday  1.1  {
 337 mday  1.5     if ( operation != 0 )
 338               {
 339 mday  1.29       
 340 mday  1.22 // ATTN: optimization 
 341            // << Tue Feb 19 14:10:38 2002 mdd >>
 342 mday  1.6        operation->lock();
 343 mday  1.29             
 344 mday  1.6        Message *rq = operation->_request.next(0);
 345 mday  1.29      
 346 mday  1.31 // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>>
 347            // move this to the bottom of the loop when the majority of 
 348            // messages become async messages. 
 349            
 350 mday  1.18       // divert legacy messages to handleEnqueue
 351 mday  1.29       if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))
 352 mday  1.18       {
 353            	 rq = operation->_request.remove_first() ;
 354            	 operation->unlock();
 355            	 // delete the op node 
 356 mday  1.39 	 operation->release();
 357            	 return_op( operation);
 358 mday  1.24 
 359 mday  1.26 	 handleEnqueue(rq);
 360 mday  1.18 	 return;
 361                  }
 362            
 363 mday  1.39       if ( (operation->_flags & ASYNC_OPFLAGS_CALLBACK || 
 364            	    operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) && 
 365 mday  1.29 	   (operation->_state & ASYNC_OPSTATE_COMPLETE))
 366                  {
 367            	 operation->unlock();
 368            	 _handle_async_callback(operation);
 369                  }
 370                  else 
 371                  {
 372            	 PEGASUS_ASSERT(rq != 0 );
 373            	 // ATTN: optimization
 374            	 // << Wed Mar  6 15:00:39 2002 mdd >>
 375            	 // put thread and queue into the asyncopnode structure. 
 376 mday  1.34          //  (static_cast<AsyncMessage *>(rq))->_myself = operation->_thread_ptr;
 377                     //   (static_cast<AsyncMessage *>(rq))->_service = operation->_service_ptr;
 378            	 // done << Tue Mar 12 14:49:07 2002 mdd >>
 379 mday  1.29 	 operation->unlock();
 380            	 _handle_async_request(static_cast<AsyncRequest *>(rq));
 381                  }
 382 mday  1.5     }
 383               return;
 384 mday  1.1  }
 385            
 386            void MessageQueueService::_handle_async_request(AsyncRequest *req)
 387            {
 388 mday  1.4     if ( req != 0 )
 389               {
 390                  req->op->processing();
 391 mday  1.1     
 392 mday  1.4        Uint32 type = req->getType();
 393                  if( type == async_messages::HEARTBEAT )
 394            	 handle_heartbeat_request(req);
 395                  else if (type == async_messages::IOCTL)
 396            	 handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
 397                  else if (type == async_messages::CIMSERVICE_START)
 398            	 handle_CimServiceStart(static_cast<CimServiceStart *>(req));
 399                  else if (type == async_messages::CIMSERVICE_STOP)
 400            	 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
 401                  else if (type == async_messages::CIMSERVICE_PAUSE)
 402            	 handle_CimServicePause(static_cast<CimServicePause *>(req));
 403                  else if (type == async_messages::CIMSERVICE_RESUME)
 404            	 handle_CimServiceResume(static_cast<CimServiceResume *>(req));
 405                  else if ( type == async_messages::ASYNC_OP_START)
 406            	 handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
 407                  else 
 408                  {
 409            	 // we don't handle this request message 
 410            	 _make_response(req, async_results::CIM_NAK );
 411                  }
 412 mday  1.1     }
 413            }
 414            
 415 mday  1.17 
 416            Boolean MessageQueueService::_enqueueResponse(
 417               Message* request, 
 418               Message* response)
 419 mday  1.18    
 420 mday  1.17 {
 421 kumpf 1.37    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 422                                "MessageQueueService::_enqueueResponse");
 423 mday  1.25 
 424               if( request->getMask() & message_mask::ha_async)
 425               {
 426                  if (response->getMask() & message_mask::ha_async )
 427                  {
 428            	 _completeAsyncResponse(static_cast<AsyncRequest *>(request), 
 429            				static_cast<AsyncReply *>(response), 
 430            				ASYNC_OPSTATE_COMPLETE, 0 );
 431 kumpf 1.37          PEG_METHOD_EXIT();
 432 mday  1.25 	 return true;
 433                  }
 434               }
 435               
 436 mday  1.17    if(request->_async != 0 )
 437               {
 438                  Uint32 mask = request->_async->getMask();
 439 mday  1.18       PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request ));
 440                  
 441                  AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
 442                  AsyncOpNode *op = async->op;
 443                  request->_async = 0;
 444 mday  1.27       // this request is probably going to be deleted !!
 445                  // remove it from the op node 
 446                  op->_request.remove(request);
 447 mday  1.18       
 448                  AsyncLegacyOperationResult *async_result = 
 449            	 new AsyncLegacyOperationResult( 
 450            	    async->getKey(),
 451            	    async->getRouting(),
 452            	    op,
 453            	    response);
 454                  _completeAsyncResponse(async,
 455            			     async_result,
 456            			     ASYNC_OPSTATE_COMPLETE, 
 457            			     0);
 458 kumpf 1.37       PEG_METHOD_EXIT();
 459 mday  1.18       return true;
 460 mday  1.17    }
 461 mday  1.18    
 462               // ensure that the destination queue is in response->dest
 463 kumpf 1.37    PEG_METHOD_EXIT();
 464 mday  1.24    return SendForget(response);
 465 mday  1.18    
 466 mday  1.17 }
 467            
 468 mday  1.18 void MessageQueueService::_make_response(Message *req, Uint32 code)
 469 mday  1.1  {
 470 mday  1.19    cimom::_make_response(req, code);
 471 mday  1.1  }
 472            
 473            
 474 mday  1.5  void MessageQueueService::_completeAsyncResponse(AsyncRequest *request, 
 475            						AsyncReply *reply, 
 476            						Uint32 state, 
 477            						Uint32 flag)
 478            {
 479 kumpf 1.37    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 480                                "MessageQueueService::_completeAsyncResponse");
 481            
 482 mday  1.19    cimom::_completeAsyncResponse(request, reply, state, flag);
 483 kumpf 1.37 
 484               PEG_METHOD_EXIT();
 485 mday  1.5  }
 486            
 487            
 488 mday  1.32 void MessageQueueService::_complete_op_node(AsyncOpNode *op,
 489            					    Uint32 state, 
 490            					    Uint32 flag, 
 491            					    Uint32 code)
 492            {
 493               cimom::_complete_op_node(op, state, flag, code);
 494            }
 495            
 496 mday  1.5  
 497            Boolean MessageQueueService::accept_async(AsyncOpNode *op)
 498            {
 499 mday  1.8     if (_incoming_queue_shutdown.value() > 0 )
 500                  return false;
 501               
 502 mday  1.20 // ATTN optimization remove the message checking altogether in the base 
 503            // << Mon Feb 18 14:02:20 2002 mdd >>
 504 mday  1.6     op->lock();
 505               Message *rq = op->_request.next(0);
 506 mday  1.20    Message *rp = op->_response.next(0);
 507 mday  1.6     op->unlock();
 508               
 509 mday  1.22    if(  (rq != 0 && (true == messageOK(rq))) || (rp != 0 && ( true == messageOK(rp) )) &&  
 510            	_die.value() == 0  )
 511 mday  1.5     {
 512 mday  1.6        _incoming.insert_last_wait(op);
 513 mday  1.5        return true;
 514               }
 515 mday  1.32 //    else
 516            //    {
 517            //       if(  (rq != 0 && (true == MessageQueueService::messageOK(rq))) || 
 518            // 	   (rp != 0 && ( true == MessageQueueService::messageOK(rp) )) &&  
 519            // 	   _die.value() == 0)
 520            //       {
 521            // 	 MessageQueueService::_incoming.insert_last_wait(op);
 522            // 	 return true;
 523            //       }
 524            //    }
 525               
 526 mday  1.5     return false;
 527            }
 528            
 529            Boolean MessageQueueService::messageOK(const Message *msg)
 530            {
 531 mday  1.8     if (_incoming_queue_shutdown.value() > 0 )
 532                  return false;
 533 mday  1.18    return true;
 534            }
 535            
 536 mday  1.29 
 537            // made pure virtual 
 538            // << Wed Mar  6 15:11:31 2002 mdd >> 
 539 mday  1.25 // void MessageQueueService::handleEnqueue(Message *msg)
 540            // {
 541            //    if ( msg )
 542            //       delete msg;
 543            // }
 544 mday  1.5  
 545 mday  1.29 // made pure virtual 
 546            // << Wed Mar  6 15:11:56 2002 mdd >>
 547 mday  1.25 // void MessageQueueService::handleEnqueue(void)
 548            // {
 549            //     Message *msg = dequeue();
 550            //     handleEnqueue(msg);
 551            // }
 552 mday  1.5  
 553 mday  1.1  void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
 554            {
 555               // default action is to echo a heartbeat response 
 556               
 557               AsyncReply *reply = 
 558                  new AsyncReply(async_messages::HEARTBEAT,
 559            		     req->getKey(),
 560            		     req->getRouting(),
 561            		     0,
 562            		     req->op, 
 563            		     async_results::OK, 
 564            		     req->resp,
 565            		     false);
 566 mday  1.4     _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );
 567 mday  1.1  }
 568            
 569            
 570            void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
 571            { 
 572               ;
 573            }
 574                  
 575            void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
 576            {
 577 mday  1.8     
 578               switch( req->ctl )
 579               {
 580                  case AsyncIoctl::IO_CLOSE:
 581                  {
 582            	 // save my bearings 
 583 mday  1.34 	 Thread *myself = req->op->_thread_ptr;
 584            	 MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);
 585 mday  1.8  	 
 586            	 // respond to this message.
 587            	 _make_response(req, async_results::OK);
 588            	 // ensure we do not accept any further messages
 589            
 590            	 // ensure we don't recurse on IO_CLOSE
 591            	 if( _incoming_queue_shutdown.value() > 0 )
 592            	    break;
 593            	 
 594            	 // set the closing flag 
 595            	 service->_incoming_queue_shutdown = 1;
 596            	 // empty out the queue
 597            	 while( 1 )
 598            	 {
 599            	    AsyncOpNode *operation;
 600            	    try 
 601            	    {
 602            	       operation = service->_incoming.remove_first();
 603            	    }
 604            	    catch(IPCException & )
 605            	    {
 606 mday  1.8  	       break;
 607            	    }
 608            	    if( operation )
 609            	    {
 610 mday  1.34 	       operation->_thread_ptr = myself;
 611            	       operation->_service_ptr = service;
 612            	       service->_handle_incoming_operation(operation);
 613 mday  1.8  	    }
 614            	    else
 615            	       break;
 616            	 } // message processing loop
 617            
 618            	 // shutdown the AsyncDQueue
 619            	 service->_incoming.shutdown_queue();
 620            	 // exit the thread ! 
 621            	 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
 622            	 return;
 623                  }
 624            
 625                  default:
 626            	 _make_response(req, async_results::CIM_NAK);
 627               }
 628 mday  1.1  }
 629 mday  1.8  
 630 mday  1.1  void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
 631            {
 632 mday  1.10    // clear the stoped bit and update
 633 mday  1.13    _capabilities &= (~(module_capabilities::stopped));
 634 mday  1.10    _make_response(req, async_results::OK);
 635               // now tell the meta dispatcher we are stopped 
 636               update_service(_capabilities, _mask);
 637            
 638 mday  1.1  }
 639            void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
 640            {
 641 mday  1.10    // set the stopeed bit and update
 642               _capabilities |= module_capabilities::stopped;
 643               _make_response(req, async_results::CIM_STOPPED);
 644               // now tell the meta dispatcher we are stopped 
 645               update_service(_capabilities, _mask);
 646               
 647 mday  1.1  }
 648            void MessageQueueService::handle_CimServicePause(CimServicePause *req)
 649            {
 650 mday  1.10    // set the paused bit and update
 651 mday  1.13    _capabilities |= module_capabilities::paused;
 652 mday  1.11    update_service(_capabilities, _mask);
 653 mday  1.10    _make_response(req, async_results::CIM_PAUSED);
 654               // now tell the meta dispatcher we are stopped 
 655 mday  1.1  }
 656            void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
 657            {
 658 mday  1.10    // clear the paused  bit and update
 659 mday  1.13    _capabilities &= (~(module_capabilities::paused));
 660 mday  1.11    update_service(_capabilities, _mask);
 661 mday  1.10    _make_response(req, async_results::OK);
 662               // now tell the meta dispatcher we are stopped 
 663 mday  1.1  }
 664                  
 665            void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
 666            {
 667               _make_response(req, async_results::CIM_NAK);
 668            }
 669            
 670            void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
 671            {
 672 mday  1.14    ;
 673            }
 674            
 675 mday  1.10 
 676 mday  1.14 void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)
 677            {
 678               // remove the legacy message from the request and enqueue it to its destination
 679               Uint32 result = async_results::CIM_NAK;
 680               
 681 mday  1.25    Message *legacy = req->_act;
 682 mday  1.14    if ( legacy != 0 )
 683               {
 684 mday  1.25       MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
 685 mday  1.14       if( queue != 0 )
 686                  {
 687 mday  1.25 	 if(queue->isAsync() == true )
 688            	 {
 689            	    (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
 690            	 }
 691            	 else 
 692            	 {
 693            	    // Enqueue the response:
 694            	    queue->enqueue(req->get_action());
 695            	 }
 696            	 
 697 mday  1.14 	 result = async_results::OK;
 698                  }
 699               }
 700               _make_response(req, result);
 701            }
 702            
 703            void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)
 704            {
 705               ;
 706 mday  1.1  }
 707            
 708            AsyncOpNode *MessageQueueService::get_op(void)
 709            {
 710 mday  1.4     AsyncOpNode *op = new AsyncOpNode();
 711 mday  1.1     
 712 mday  1.9     op->_state = ASYNC_OPSTATE_UNKNOWN;
 713               op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
 714 mday  1.4     
 715 mday  1.1     return op;
 716            }
 717            
 718            void MessageQueueService::return_op(AsyncOpNode *op)
 719            {
 720               PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED );
 721 mday  1.4     delete op;
 722 mday  1.1  }
 723            
 724 mday  1.18 
 725 mday  1.29 Boolean MessageQueueService::ForwardOp(AsyncOpNode *op, 
 726            				       Uint32 destination)
 727            {
 728               PEGASUS_ASSERT(op != 0 );
 729 mday  1.30    op->lock();
 730               op->_op_dest = MessageQueue::lookup(destination);
 731 mday  1.29    op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD);
 732               op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK);
 733 mday  1.30    op->unlock();
 734               if ( op->_op_dest == 0 )
 735                  return false;
 736                  
 737 mday  1.29    return  _meta_dispatcher->route_async(op);
 738            }
 739            
 740 mday  1.39  
 741 mday  1.21 Boolean MessageQueueService::SendAsync(AsyncOpNode *op, 
 742            				       Uint32 destination,
 743 mday  1.18 				       void (*callback)(AsyncOpNode *, 
 744 mday  1.32 							MessageQueue *,
 745 mday  1.30 							void *),
 746            				       MessageQueue *callback_response_q,
 747            				       void *callback_ptr)
 748 mday  1.20 { 
 749 mday  1.21    PEGASUS_ASSERT(op != 0 && callback != 0 );
 750 mday  1.18    
 751 mday  1.21    // get the queue handle for the destination
 752            
 753 mday  1.30    op->lock();
 754 mday  1.32    op->_op_dest = MessageQueue::lookup(destination); // destination of this message
 755 mday  1.22    op->_flags |= ASYNC_OPFLAGS_CALLBACK;
 756               op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 757               op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 758 mday  1.30    // initialize the callback data
 759 mday  1.32    op->_async_callback = callback;   // callback function to be executed by recpt. of response
 760               op->_callback_node = op;          // the op node
 761               op->_callback_response_q = callback_response_q;  // the queue that will receive the response
 762               op->_callback_ptr = callback_ptr;   // user data for callback
 763               op->_callback_request_q = this;     // I am the originator of this request
 764 mday  1.30    
 765               op->unlock();
 766               if(op->_op_dest == 0) 
 767                  return false;
 768 mday  1.21    
 769               return  _meta_dispatcher->route_async(op);
 770 mday  1.18 }
 771            
 772            
 773 mday  1.39 Boolean MessageQueueService::SendAsync(Message *msg, 
 774            				       Uint32 destination,
 775            				       void (*callback)(Message *response, 
 776            							void *handle, 
 777            							void *parameter),
 778            				       void *handle, 
 779            				       void *parameter)
 780            {
 781               if(msg == NULL)
 782                  return false;
 783               if(callback == NULL)
 784                  return SendForget(msg);
 785               AsyncOpNode *op = get_op();
 786               if( NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
 787               {
 788                  op->release();
 789                  return_op(op);
 790                  return false;
 791               }
 792               op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
 793               op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 794 mday  1.39    op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 795               op->__async_callback = callback;
 796               op->_callback_node = op;
 797               op->_callback_handle = handle;
 798               op->_callback_parameter = parameter;
 799               op->_callback_response_q = this;
 800               
 801            
 802               if( ! (msg->getMask() & message_mask::ha_async) )
 803               {
 804                  AsyncLegacyOperationStart *wrapper = 
 805            	 new AsyncLegacyOperationStart(get_next_xid(),
 806            				       op, 
 807            				       destination, 
 808            				       msg, 
 809            				       destination);
 810                  msg = static_cast<Message *>(wrapper);
 811               }
 812               else 
 813               {
 814                  op->_request.insert_first(msg);
 815 mday  1.39       (static_cast<AsyncMessage *>(msg))->op = op;
 816               }
 817               
 818 mday  1.42    op->_callback_notify = _incoming.get_node_cond();
 819 mday  1.39    _callback.insert_last(op);
 820               return _meta_dispatcher->route_async(op);
 821            }
 822            
 823            
 824 mday  1.18 Boolean MessageQueueService::SendForget(Message *msg)
 825            {
 826            
 827 mday  1.24    
 828 mday  1.18    AsyncOpNode *op = 0;
 829 mday  1.22    Uint32 mask = msg->getMask();
 830               
 831               if (mask & message_mask::ha_async)
 832 mday  1.18    {
 833                  op = (static_cast<AsyncMessage *>(msg))->op ;
 834               }
 835 mday  1.22 
 836 mday  1.18    if( op == 0 )
 837 mday  1.20    {
 838 mday  1.18       op = get_op();
 839 mday  1.20       op->_request.insert_first(msg);
 840 mday  1.22       if (mask & message_mask::ha_async)
 841            	 (static_cast<AsyncMessage *>(msg))->op = op;
 842 mday  1.20    }
 843 mday  1.30    op->_op_dest = MessageQueue::lookup(msg->dest);
 844 mday  1.22    op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
 845 mday  1.41    op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK 
 846            		   | ASYNC_OPFLAGS_SIMPLE_STATUS);
 847 mday  1.22    op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 848 mday  1.30    if ( op->_op_dest == 0 )
 849 mday  1.39    {
 850                  op->release();
 851                  return_op(op);
 852 mday  1.21       return false;
 853 mday  1.39    }
 854 mday  1.24    
 855 mday  1.18    // now see if the meta dispatcher will take it
 856 mday  1.30    return  _meta_dispatcher->route_async(op);
 857 mday  1.18 }
 858 mday  1.2  
 859 mday  1.1  
 860 mday  1.4  AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
 861 mday  1.1  {
 862 mday  1.4     if ( request == 0 )
 863                  return 0 ;
 864 mday  1.5  
 865               Boolean destroy_op = false;
 866               
 867               if (request->op == false)
 868               {
 869                  request->op = get_op();
 870 mday  1.7        request->op->_request.insert_first(request);
 871 mday  1.5        destroy_op = true;
 872               }
 873 mday  1.4     
 874 mday  1.33    request->block = false;
 875 mday  1.35    request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
 876 mday  1.33    SendAsync(request->op, 
 877            	     request->dest,
 878 mday  1.36  	     _sendwait_callback,
 879 mday  1.33 	     this,
 880            	     (void *)0);
 881 mday  1.4     
 882 mday  1.33    request->op->_client_sem.wait();
 883 mday  1.6     request->op->lock();
 884               AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
 885               rpl->op = 0;
 886               request->op->unlock();
 887               
 888 mday  1.5     if( destroy_op == true)
 889               {
 890 mday  1.6        request->op->lock();
 891                  request->op->_request.remove(request);
 892                  request->op->_state |= ASYNC_OPSTATE_RELEASED;
 893                  request->op->unlock();
 894                  return_op(request->op);
 895 mday  1.7        request->op = 0;
 896 mday  1.5     }
 897               return rpl;
 898 mday  1.1  }
 899            
 900            
 901            Boolean MessageQueueService::register_service(String name, 
 902            					      Uint32 capabilities, 
 903            					      Uint32 mask)
 904            
 905            {
 906               RegisterCimService *msg = new RegisterCimService(get_next_xid(),
 907 mday  1.5  						    0, 
 908 mday  1.1  						    true, 
 909            						    name, 
 910            						    capabilities, 
 911            						    mask,
 912            						    _queueId);
 913               Boolean registered = false;
 914 mday  1.7     AsyncReply *reply = static_cast<AsyncReply *>(SendWait( msg ));
 915 mday  1.1     
 916 mday  1.2     if ( reply != 0 )
 917 mday  1.1     {
 918                  if(reply->getMask() & message_mask:: ha_async)
 919                  {
 920            	 if(reply->getMask() & message_mask::ha_reply)
 921            	 {
 922 mday  1.15 	    if(reply->result == async_results::OK || 
 923            	       reply->result == async_results::MODULE_ALREADY_REGISTERED )
 924 mday  1.1  	       registered = true;
 925            	 }
 926                  }
 927                  
 928 mday  1.7        delete reply; 
 929 mday  1.1     }
 930 mday  1.5     delete msg;
 931 mday  1.1     return registered;
 932            }
 933            
 934            Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
 935            {
 936               
 937               
 938               UpdateCimService *msg = new UpdateCimService(get_next_xid(), 
 939 mday  1.5  						0, 
 940 mday  1.1  						true, 
 941            						_queueId,
 942            						_capabilities, 
 943            						_mask);
 944               Boolean registered = false;
 945 mday  1.2  
 946               AsyncMessage *reply = SendWait(msg);
 947               if (reply)
 948 mday  1.1     {
 949                  if(reply->getMask() & message_mask:: ha_async)
 950                  {
 951            	 if(reply->getMask() & message_mask::ha_reply)
 952            	 {
 953 mday  1.2  	    if(static_cast<AsyncReply *>(reply)->result == async_results::OK)
 954 mday  1.1  	       registered = true;
 955            	 }
 956                  }
 957                  delete reply;
 958               }
 959 mday  1.5     delete msg;
 960 mday  1.1     return registered;
 961            }
 962            
 963            
 964            Boolean MessageQueueService::deregister_service(void)
 965            {
 966 mday  1.3  
 967 mday  1.5     _meta_dispatcher->deregister_module(_queueId);
 968               return true;
 969 mday  1.1  }
 970            
 971            
 972            void MessageQueueService::find_services(String name, 
 973            					Uint32 capabilities, 
 974            					Uint32 mask, 
 975            					Array<Uint32> *results)
 976            {
 977               
 978               if( results == 0 )
 979                  throw NullPointer();
 980 mday  1.5      
 981 mday  1.1     results->clear();
 982               
 983               FindServiceQueue *req = 
 984                  new FindServiceQueue(get_next_xid(), 
 985 mday  1.5  			   0, 
 986 mday  1.1  			   _queueId, 
 987            			   true, 
 988            			   name, 
 989            			   capabilities, 
 990            			   mask);
 991               
 992 mday  1.2     AsyncMessage *reply = SendWait(req); 
 993               if(reply)
 994 mday  1.1     {
 995                  if( reply->getMask() & message_mask::ha_async)
 996                  {
 997            	 if(reply->getMask() & message_mask::ha_reply)
 998            	 {
 999            	    if(reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
1000            	    {
1001            	       if( (static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK )
1002            		  *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
1003            	    }
1004            	 }
1005                  }
1006                  delete reply;
1007               }
1008 mday  1.5     delete req;
1009 mday  1.1     return ;
1010            }
1011            
1012            void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
1013            {
1014               if(result == 0)
1015                  throw NullPointer();
1016               
1017               EnumerateService *req 
1018                  = new EnumerateService(get_next_xid(),
1019 mday  1.5  			     0, 
1020 mday  1.1  			     _queueId, 
1021            			     true, 
1022            			     queue);
1023               
1024 mday  1.2     AsyncMessage *reply = SendWait(req);
1025 mday  1.1     
1026 mday  1.2     if (reply)
1027 mday  1.1     {
1028                  Boolean found = false;
1029                  
1030                  if( reply->getMask() & message_mask::ha_async)
1031                  {
1032            	 if(reply->getMask() & message_mask::ha_reply)
1033            	 {
1034            	    if(reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
1035            	    {
1036            	       if( (static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK )
1037            	       {
1038            		  if( found == false)
1039            		  {
1040            		     found = true;
1041            		     
1042            		     result->put_name( (static_cast<EnumerateServiceResponse *>(reply))->name);
1043            		     result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
1044            		     result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
1045            		     result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
1046            		  }
1047            	       }
1048 mday  1.1  	    }
1049            	 }
1050                  }
1051                  delete reply;
1052               }
1053 mday  1.5     delete req;
1054               
1055 mday  1.1     return;
1056            }
1057            
1058            Uint32 MessageQueueService::get_next_xid(void)
1059            {
1060               _xid++;
1061               return _xid.value();
1062            }
1063            
1064            PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2