(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

   1 mday  1.1 //%////-*-c++-*-////////////////////////////////////////////////////////////////
   2           //
   3           // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM
   4           //
   5           // Permission is hereby granted, free of charge, to any person obtaining a copy
   6           // of this software and associated documentation files (the "Software"), to
   7           // deal in the Software without restriction, including without limitation the
   8           // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
   9           // sell copies of the Software, and to permit persons to whom the Software is
  10           // furnished to do so, subject to the following conditions:
  11           //
  12           // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  13           // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  14           // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  15           // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  16           // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  17           // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  18           // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  19           // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  20           //
  21           //==============================================================================
  22 mday  1.1 //
  23           // Author: Mike Day (mdday@us.ibm.com)
  24           //
  25           // Modified By:
  26           //
  27           //%/////////////////////////////////////////////////////////////////////////////
  28           
  29           #include "MessageQueueService.h"
  30 mday  1.22 #include <Pegasus/Common/Tracer.h>
  31 mday  1.1  
  32            PEGASUS_NAMESPACE_BEGIN
  33            
  34 mday  1.15  
  35            cimom *MessageQueueService::_meta_dispatcher = 0;
  36            AtomicInt MessageQueueService::_service_count = 0;
  37            AtomicInt MessageQueueService::_xid(1);
  38 mday  1.38 Mutex MessageQueueService::_meta_dispatcher_mutex;
  39 mday  1.15 
  40            
  41 mday  1.1  MessageQueueService::MessageQueueService(const char *name, 
  42            					 Uint32 queueID, 
  43            					 Uint32 capabilities, 
  44            					 Uint32 mask) 
  45 mday  1.15    : Base(name, true,  queueID),
  46 mday  1.22      
  47 mday  1.1       _mask(mask),
  48 mday  1.4       _die(0),
  49 mday  1.41      _incoming(true, 0),
  50 mday  1.39      _callback(true),
  51 mday  1.7       _incoming_queue_shutdown(0),
  52 mday  1.39      _callback_ready(0),
  53                 _req_thread(_req_proc, this, false),
  54                 _callback_thread(_callback_proc, this, false)
  55 mday  1.1  { 
  56 mday  1.22    _capabilities = (capabilities | module_capabilities::async);
  57               
  58 mday  1.1     _default_op_timeout.tv_sec = 30;
  59               _default_op_timeout.tv_usec = 100;
  60 mday  1.15 
  61               _meta_dispatcher_mutex.lock(pegasus_thread_self());
  62               
  63               if( _meta_dispatcher == 0 )
  64               {
  65                  PEGASUS_ASSERT( _service_count.value() == 0 );
  66                  _meta_dispatcher = new cimom();
  67                  if (_meta_dispatcher == NULL )
  68                  {
  69            	 _meta_dispatcher_mutex.unlock();
  70            	 
  71            	 throw NullPointer();
  72                  }
  73                  
  74               }
  75               _service_count++;
  76            
  77               if( false == register_service(name, _capabilities, _mask) )
  78               {
  79                  _meta_dispatcher_mutex.unlock();
  80                  throw BindFailed("MessageQueueService Base Unable to register with  Meta Dispatcher");
  81 mday  1.15    }
  82               
  83               _meta_dispatcher_mutex.unlock();
  84 mday  1.39 //   _callback_thread.run();
  85               
  86 mday  1.4     _req_thread.run();
  87 mday  1.1  }
  88            
  89 mday  1.4  
  90 mday  1.1  MessageQueueService::~MessageQueueService(void)
  91            {
  92               _die = 1;
  93 mday  1.7     if (_incoming_queue_shutdown.value() == 0 )
  94 mday  1.16    {
  95 mday  1.32       _shutdown_incoming_queue();
  96 kumpf 1.46        _req_thread.join();  // ATTN-RK-P3-20020521: Is this redundant?
  97 mday  1.16    }
  98 mday  1.39    _callback_ready.signal();
  99               _callback_thread.join();
 100               
 101 mday  1.15    _meta_dispatcher_mutex.lock(pegasus_thread_self());
 102               _service_count--;
 103               if (_service_count.value() == 0 )
 104               {
 105                  _meta_dispatcher->_shutdown_routed_queue();
 106                  delete _meta_dispatcher;
 107 kumpf 1.45       _meta_dispatcher = 0;
 108 mday  1.15    }
 109               _meta_dispatcher_mutex.unlock();
 110               
 111 mday  1.1  }
 112            
 113 mday  1.15 
 114 mday  1.1  
 115 mday  1.7  void MessageQueueService::_shutdown_incoming_queue(void)
 116            {
 117               
 118 mday  1.9     if (_incoming_queue_shutdown.value() > 0 )
 119                  return ;
 120 mday  1.8     AsyncIoctl *msg = new AsyncIoctl(get_next_xid(),
 121            				    0, 
 122            				    _queueId, 
 123            				    _queueId, 
 124            				    true, 
 125            				    AsyncIoctl::IO_CLOSE, 
 126            				    0, 
 127            				    0);
 128 mday  1.9  
 129 mday  1.8     msg->op = get_op();
 130 mday  1.41    msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
 131               msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK 
 132            		   | ASYNC_OPFLAGS_SIMPLE_STATUS);
 133 mday  1.32    msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 134 mday  1.41 
 135 mday  1.32    msg->op->_op_dest = this;
 136 mday  1.8     msg->op->_request.insert_first(msg);
 137               
 138               _incoming.insert_last_wait(msg->op);
 139 mday  1.32 
 140 mday  1.16    _req_thread.join();
 141               
 142 mday  1.7  }
 143            
 144 mday  1.22 
 145            
 146            void MessageQueueService::enqueue(Message *msg) throw(IPCException)
 147            {
 148 kumpf 1.28    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
 149            
 150 mday  1.22    Base::enqueue(msg);
 151 kumpf 1.28 
 152               PEG_METHOD_EXIT();
 153 mday  1.22 }
 154            
 155            
 156 mday  1.39 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_callback_proc(void *parm)
 157            {
 158               Thread *myself = reinterpret_cast<Thread *>(parm);
 159               MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
 160               AsyncOpNode *operation = 0;
 161               
 162               while ( service->_die.value() == 0 ) 
 163               {
 164                  service->_callback_ready.wait();
 165                  
 166                  service->_callback.lock();
 167                  operation = service->_callback.next(0);
 168                  while( operation != NULL)
 169                  {
 170            	 if( ASYNC_OPSTATE_COMPLETE & operation->read_state())
 171            	 {
 172            	    operation = service->_callback.remove_no_lock(operation);
 173            	    PEGASUS_ASSERT(operation != NULL);
 174            	    operation->_thread_ptr = myself;
 175            	    operation->_service_ptr = service;
 176            	    service->_handle_async_callback(operation);
 177 mday  1.39 	    break;
 178            	 }
 179            	 operation = service->_callback.next(operation);
 180                  }
 181                  service->_callback.unlock();
 182               }
 183               myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
 184               return(0);
 185            }
 186            
 187 mday  1.22 
 188 mday  1.5  PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
 189 mday  1.1  {
 190 mday  1.5     Thread *myself = reinterpret_cast<Thread *>(parm);
 191               MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
 192            
 193               // pull messages off the incoming queue and dispatch them. then 
 194               // check pending messages that are non-blocking
 195 mday  1.7     AsyncOpNode *operation = 0;
 196               
 197 mday  1.6     while ( service->_die.value() == 0 ) 
 198 mday  1.1     {
 199 mday  1.41 	 try 
 200            	 {
 201 mday  1.44 	    operation = service->_incoming.remove_first_wait();
 202 mday  1.41 	 }
 203            	 catch(ListClosed & )
 204            	 {
 205            	    break;
 206            	 }
 207            	 if( operation )
 208            	 {
 209            	    operation->_thread_ptr = myself;
 210            	    operation->_service_ptr = service;
 211            	    service->_handle_incoming_operation(operation);
 212            	 }
 213 mday  1.42    }
 214 mday  1.44 
 215 mday  1.5     myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
 216               return(0);
 217 mday  1.1  }
 218            
 219 mday  1.43 Uint32 MessageQueueService::get_pending_callback_count(void)
 220            {
 221               return _callback.count();
 222            }
 223            
 224            
 225            
 226 mday  1.33 void MessageQueueService::_sendwait_callback(AsyncOpNode *op, 
 227            					     MessageQueue *q, 
 228            					     void *parm)
 229            {
 230               op->_client_sem.signal();
 231            }
 232            
 233 mday  1.30 
 234            // callback function is responsible for cleaning up all resources
 235            // including op, op->_callback_node, and op->_callback_ptr
 236 mday  1.22 void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
 237            {
 238 mday  1.39    if( op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK )
 239               {
 240            
 241                  Message *msg = op->get_request();
 242                  if( msg && ( msg->getMask() & message_mask::ha_async))
 243                  {
 244            	 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_START )
 245            	 {
 246            	    AsyncLegacyOperationStart *wrapper = 
 247            	       static_cast<AsyncLegacyOperationStart *>(msg);
 248            	    msg = wrapper->get_action();
 249            	    delete wrapper;
 250            	 }
 251            	 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
 252            	 {
 253            	    AsyncModuleOperationStart *wrapper = 
 254            	       static_cast<AsyncModuleOperationStart *>(msg);
 255            	    msg = wrapper->get_action();
 256            	    delete wrapper;
 257            	 }
 258            	 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
 259 mday  1.39 	 {
 260            	    AsyncModuleOperationStart *wrapper = 
 261            	       static_cast<AsyncModuleOperationStart *>(msg);
 262            	    msg = wrapper->get_action();
 263            	    delete wrapper;
 264            	 }
 265            	 else if (msg->getType() == async_messages::ASYNC_OP_START)
 266            	 {
 267            	    AsyncOperationStart *wrapper = 
 268            	       static_cast<AsyncOperationStart *>(msg);
 269            	    msg = wrapper->get_action();
 270            	    delete wrapper;
 271            	 }
 272            	 delete msg;
 273                  }
 274            
 275                  msg = op->get_response();
 276                  if( msg && ( msg->getMask() & message_mask::ha_async))
 277                  {
 278            	 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT )
 279            	 {
 280 mday  1.39 	    AsyncLegacyOperationResult *wrapper = 
 281            	       static_cast<AsyncLegacyOperationResult *>(msg);
 282            	    msg = wrapper->get_result();
 283            	    delete wrapper;
 284            	 }
 285            	 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
 286            	 {
 287            	    AsyncModuleOperationResult *wrapper = 
 288            	       static_cast<AsyncModuleOperationResult *>(msg);
 289            	    msg = wrapper->get_result();
 290            	    delete wrapper;
 291            	 }
 292                  }
 293                  void (*callback)(Message *, void *, void *) = op->__async_callback;
 294                  void *handle = op->_callback_handle;
 295                  void *parm = op->_callback_parameter;
 296                  op->release();
 297                  return_op(op);
 298                  callback(msg, handle, parm);
 299               }
 300               else if( op->_flags & ASYNC_OPFLAGS_CALLBACK )
 301 mday  1.39    {
 302                  // note that _callback_node may be different from op 
 303                  // op->_callback_response_q is a "this" pointer we can use for 
 304                  // static callback methods
 305                  op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);
 306               }
 307 mday  1.22 }
 308            
 309 mday  1.1  
 310 mday  1.34 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation) 
 311            //						     Thread *thread, 
 312            //						     MessageQueue *queue)
 313 mday  1.1  {
 314 mday  1.5     if ( operation != 0 )
 315               {
 316 mday  1.29       
 317 mday  1.22 // ATTN: optimization 
 318            // << Tue Feb 19 14:10:38 2002 mdd >>
 319 mday  1.6        operation->lock();
 320 mday  1.29             
 321 mday  1.6        Message *rq = operation->_request.next(0);
 322 mday  1.29      
 323 mday  1.31 // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>>
 324            // move this to the bottom of the loop when the majority of 
 325            // messages become async messages. 
 326            
 327 mday  1.18       // divert legacy messages to handleEnqueue
 328 mday  1.29       if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))
 329 mday  1.18       {
 330            	 rq = operation->_request.remove_first() ;
 331            	 operation->unlock();
 332            	 // delete the op node 
 333 mday  1.39 	 operation->release();
 334            	 return_op( operation);
 335 mday  1.24 
 336 mday  1.26 	 handleEnqueue(rq);
 337 mday  1.18 	 return;
 338                  }
 339            
 340 mday  1.39       if ( (operation->_flags & ASYNC_OPFLAGS_CALLBACK || 
 341            	    operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) && 
 342 mday  1.29 	   (operation->_state & ASYNC_OPSTATE_COMPLETE))
 343                  {
 344            	 operation->unlock();
 345            	 _handle_async_callback(operation);
 346                  }
 347                  else 
 348                  {
 349            	 PEGASUS_ASSERT(rq != 0 );
 350            	 // ATTN: optimization
 351            	 // << Wed Mar  6 15:00:39 2002 mdd >>
 352            	 // put thread and queue into the asyncopnode structure. 
 353 mday  1.34          //  (static_cast<AsyncMessage *>(rq))->_myself = operation->_thread_ptr;
 354                     //   (static_cast<AsyncMessage *>(rq))->_service = operation->_service_ptr;
 355            	 // done << Tue Mar 12 14:49:07 2002 mdd >>
 356 mday  1.29 	 operation->unlock();
 357            	 _handle_async_request(static_cast<AsyncRequest *>(rq));
 358                  }
 359 mday  1.5     }
 360               return;
 361 mday  1.1  }
 362            
 363            void MessageQueueService::_handle_async_request(AsyncRequest *req)
 364            {
 365 mday  1.4     if ( req != 0 )
 366               {
 367                  req->op->processing();
 368 mday  1.1     
 369 mday  1.4        Uint32 type = req->getType();
 370                  if( type == async_messages::HEARTBEAT )
 371            	 handle_heartbeat_request(req);
 372                  else if (type == async_messages::IOCTL)
 373            	 handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
 374                  else if (type == async_messages::CIMSERVICE_START)
 375            	 handle_CimServiceStart(static_cast<CimServiceStart *>(req));
 376                  else if (type == async_messages::CIMSERVICE_STOP)
 377            	 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
 378                  else if (type == async_messages::CIMSERVICE_PAUSE)
 379            	 handle_CimServicePause(static_cast<CimServicePause *>(req));
 380                  else if (type == async_messages::CIMSERVICE_RESUME)
 381            	 handle_CimServiceResume(static_cast<CimServiceResume *>(req));
 382                  else if ( type == async_messages::ASYNC_OP_START)
 383            	 handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
 384                  else 
 385                  {
 386            	 // we don't handle this request message 
 387            	 _make_response(req, async_results::CIM_NAK );
 388                  }
 389 mday  1.1     }
 390            }
 391            
 392 mday  1.17 
 393            Boolean MessageQueueService::_enqueueResponse(
 394               Message* request, 
 395               Message* response)
 396 mday  1.18    
 397 mday  1.17 {
 398 kumpf 1.37    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 399                                "MessageQueueService::_enqueueResponse");
 400 mday  1.25 
 401               if( request->getMask() & message_mask::ha_async)
 402               {
 403                  if (response->getMask() & message_mask::ha_async )
 404                  {
 405            	 _completeAsyncResponse(static_cast<AsyncRequest *>(request), 
 406            				static_cast<AsyncReply *>(response), 
 407            				ASYNC_OPSTATE_COMPLETE, 0 );
 408 kumpf 1.37          PEG_METHOD_EXIT();
 409 mday  1.25 	 return true;
 410                  }
 411               }
 412               
 413 mday  1.17    if(request->_async != 0 )
 414               {
 415                  Uint32 mask = request->_async->getMask();
 416 mday  1.18       PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request ));
 417                  
 418                  AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
 419                  AsyncOpNode *op = async->op;
 420                  request->_async = 0;
 421 mday  1.27       // this request is probably going to be deleted !!
 422                  // remove it from the op node 
 423                  op->_request.remove(request);
 424 mday  1.18       
 425                  AsyncLegacyOperationResult *async_result = 
 426            	 new AsyncLegacyOperationResult( 
 427            	    async->getKey(),
 428            	    async->getRouting(),
 429            	    op,
 430            	    response);
 431                  _completeAsyncResponse(async,
 432            			     async_result,
 433            			     ASYNC_OPSTATE_COMPLETE, 
 434            			     0);
 435 kumpf 1.37       PEG_METHOD_EXIT();
 436 mday  1.18       return true;
 437 mday  1.17    }
 438 mday  1.18    
 439               // ensure that the destination queue is in response->dest
 440 kumpf 1.37    PEG_METHOD_EXIT();
 441 mday  1.24    return SendForget(response);
 442 mday  1.18    
 443 mday  1.17 }
 444            
 445 mday  1.18 void MessageQueueService::_make_response(Message *req, Uint32 code)
 446 mday  1.1  {
 447 mday  1.19    cimom::_make_response(req, code);
 448 mday  1.1  }
 449            
 450            
 451 mday  1.5  void MessageQueueService::_completeAsyncResponse(AsyncRequest *request, 
 452            						AsyncReply *reply, 
 453            						Uint32 state, 
 454            						Uint32 flag)
 455            {
 456 kumpf 1.37    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 457                                "MessageQueueService::_completeAsyncResponse");
 458            
 459 mday  1.19    cimom::_completeAsyncResponse(request, reply, state, flag);
 460 kumpf 1.37 
 461               PEG_METHOD_EXIT();
 462 mday  1.5  }
 463            
 464            
 465 mday  1.32 void MessageQueueService::_complete_op_node(AsyncOpNode *op,
 466            					    Uint32 state, 
 467            					    Uint32 flag, 
 468            					    Uint32 code)
 469            {
 470               cimom::_complete_op_node(op, state, flag, code);
 471            }
 472            
 473 mday  1.5  
 474            Boolean MessageQueueService::accept_async(AsyncOpNode *op)
 475            {
 476 mday  1.8     if (_incoming_queue_shutdown.value() > 0 )
 477                  return false;
 478               
 479 mday  1.20 // ATTN optimization remove the message checking altogether in the base 
 480            // << Mon Feb 18 14:02:20 2002 mdd >>
 481 mday  1.6     op->lock();
 482               Message *rq = op->_request.next(0);
 483 mday  1.20    Message *rp = op->_response.next(0);
 484 mday  1.6     op->unlock();
 485               
 486 mday  1.22    if(  (rq != 0 && (true == messageOK(rq))) || (rp != 0 && ( true == messageOK(rp) )) &&  
 487            	_die.value() == 0  )
 488 mday  1.5     {
 489 mday  1.6        _incoming.insert_last_wait(op);
 490 mday  1.5        return true;
 491               }
 492 mday  1.32 //    else
 493            //    {
 494            //       if(  (rq != 0 && (true == MessageQueueService::messageOK(rq))) || 
 495            // 	   (rp != 0 && ( true == MessageQueueService::messageOK(rp) )) &&  
 496            // 	   _die.value() == 0)
 497            //       {
 498            // 	 MessageQueueService::_incoming.insert_last_wait(op);
 499            // 	 return true;
 500            //       }
 501            //    }
 502               
 503 mday  1.5     return false;
 504            }
 505            
 506            Boolean MessageQueueService::messageOK(const Message *msg)
 507            {
 508 mday  1.8     if (_incoming_queue_shutdown.value() > 0 )
 509                  return false;
 510 mday  1.18    return true;
 511            }
 512            
 513 mday  1.29 
 514            // made pure virtual 
 515            // << Wed Mar  6 15:11:31 2002 mdd >> 
 516 mday  1.25 // void MessageQueueService::handleEnqueue(Message *msg)
 517            // {
 518            //    if ( msg )
 519            //       delete msg;
 520            // }
 521 mday  1.5  
 522 mday  1.29 // made pure virtual 
 523            // << Wed Mar  6 15:11:56 2002 mdd >>
 524 mday  1.25 // void MessageQueueService::handleEnqueue(void)
 525            // {
 526            //     Message *msg = dequeue();
 527            //     handleEnqueue(msg);
 528            // }
 529 mday  1.5  
 530 mday  1.1  void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
 531            {
 532               // default action is to echo a heartbeat response 
 533               
 534               AsyncReply *reply = 
 535                  new AsyncReply(async_messages::HEARTBEAT,
 536            		     req->getKey(),
 537            		     req->getRouting(),
 538            		     0,
 539            		     req->op, 
 540            		     async_results::OK, 
 541            		     req->resp,
 542            		     false);
 543 mday  1.4     _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );
 544 mday  1.1  }
 545            
 546            
 547            void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
 548            { 
 549               ;
 550            }
 551                  
 552            void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
 553            {
 554 mday  1.8     
 555               switch( req->ctl )
 556               {
 557                  case AsyncIoctl::IO_CLOSE:
 558                  {
 559            	 // save my bearings 
 560 mday  1.34 	 Thread *myself = req->op->_thread_ptr;
 561            	 MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);
 562 mday  1.8  	 
 563            	 // respond to this message.
 564            	 _make_response(req, async_results::OK);
 565            	 // ensure we do not accept any further messages
 566            
 567            	 // ensure we don't recurse on IO_CLOSE
 568            	 if( _incoming_queue_shutdown.value() > 0 )
 569            	    break;
 570            	 
 571            	 // set the closing flag 
 572            	 service->_incoming_queue_shutdown = 1;
 573            	 // empty out the queue
 574            	 while( 1 )
 575            	 {
 576            	    AsyncOpNode *operation;
 577            	    try 
 578            	    {
 579            	       operation = service->_incoming.remove_first();
 580            	    }
 581            	    catch(IPCException & )
 582            	    {
 583 mday  1.8  	       break;
 584            	    }
 585            	    if( operation )
 586            	    {
 587 mday  1.34 	       operation->_thread_ptr = myself;
 588            	       operation->_service_ptr = service;
 589            	       service->_handle_incoming_operation(operation);
 590 mday  1.8  	    }
 591            	    else
 592            	       break;
 593            	 } // message processing loop
 594            
 595            	 // shutdown the AsyncDQueue
 596            	 service->_incoming.shutdown_queue();
 597            	 // exit the thread ! 
 598            	 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
 599            	 return;
 600                  }
 601            
 602                  default:
 603            	 _make_response(req, async_results::CIM_NAK);
 604               }
 605 mday  1.1  }
 606 mday  1.8  
 607 mday  1.1  void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
 608            {
 609 mday  1.10    // clear the stoped bit and update
 610 mday  1.13    _capabilities &= (~(module_capabilities::stopped));
 611 mday  1.10    _make_response(req, async_results::OK);
 612               // now tell the meta dispatcher we are stopped 
 613               update_service(_capabilities, _mask);
 614            
 615 mday  1.1  }
 616            void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
 617            {
 618 mday  1.10    // set the stopeed bit and update
 619               _capabilities |= module_capabilities::stopped;
 620               _make_response(req, async_results::CIM_STOPPED);
 621               // now tell the meta dispatcher we are stopped 
 622               update_service(_capabilities, _mask);
 623               
 624 mday  1.1  }
 625            void MessageQueueService::handle_CimServicePause(CimServicePause *req)
 626            {
 627 mday  1.10    // set the paused bit and update
 628 mday  1.13    _capabilities |= module_capabilities::paused;
 629 mday  1.11    update_service(_capabilities, _mask);
 630 mday  1.10    _make_response(req, async_results::CIM_PAUSED);
 631               // now tell the meta dispatcher we are stopped 
 632 mday  1.1  }
 633            void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
 634            {
 635 mday  1.10    // clear the paused  bit and update
 636 mday  1.13    _capabilities &= (~(module_capabilities::paused));
 637 mday  1.11    update_service(_capabilities, _mask);
 638 mday  1.10    _make_response(req, async_results::OK);
 639               // now tell the meta dispatcher we are stopped 
 640 mday  1.1  }
 641                  
 642            void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
 643            {
 644               _make_response(req, async_results::CIM_NAK);
 645            }
 646            
 647            void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
 648            {
 649 mday  1.14    ;
 650            }
 651            
 652 mday  1.10 
 653 mday  1.14 void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)
 654            {
 655               // remove the legacy message from the request and enqueue it to its destination
 656               Uint32 result = async_results::CIM_NAK;
 657               
 658 mday  1.25    Message *legacy = req->_act;
 659 mday  1.14    if ( legacy != 0 )
 660               {
 661 mday  1.25       MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
 662 mday  1.14       if( queue != 0 )
 663                  {
 664 mday  1.25 	 if(queue->isAsync() == true )
 665            	 {
 666            	    (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
 667            	 }
 668            	 else 
 669            	 {
 670            	    // Enqueue the response:
 671            	    queue->enqueue(req->get_action());
 672            	 }
 673            	 
 674 mday  1.14 	 result = async_results::OK;
 675                  }
 676               }
 677               _make_response(req, result);
 678            }
 679            
 680            void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)
 681            {
 682               ;
 683 mday  1.1  }
 684            
 685            AsyncOpNode *MessageQueueService::get_op(void)
 686            {
 687 mday  1.4     AsyncOpNode *op = new AsyncOpNode();
 688 mday  1.1     
 689 mday  1.9     op->_state = ASYNC_OPSTATE_UNKNOWN;
 690               op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
 691 mday  1.4     
 692 mday  1.1     return op;
 693            }
 694            
 695            void MessageQueueService::return_op(AsyncOpNode *op)
 696            {
 697               PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED );
 698 mday  1.4     delete op;
 699 mday  1.1  }
 700            
 701 mday  1.18 
 702 mday  1.29 Boolean MessageQueueService::ForwardOp(AsyncOpNode *op, 
 703            				       Uint32 destination)
 704            {
 705               PEGASUS_ASSERT(op != 0 );
 706 mday  1.30    op->lock();
 707               op->_op_dest = MessageQueue::lookup(destination);
 708 mday  1.29    op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD);
 709               op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK);
 710 mday  1.30    op->unlock();
 711               if ( op->_op_dest == 0 )
 712                  return false;
 713                  
 714 mday  1.29    return  _meta_dispatcher->route_async(op);
 715            }
 716            
 717 mday  1.39  
 718 mday  1.21 Boolean MessageQueueService::SendAsync(AsyncOpNode *op, 
 719            				       Uint32 destination,
 720 mday  1.18 				       void (*callback)(AsyncOpNode *, 
 721 mday  1.32 							MessageQueue *,
 722 mday  1.30 							void *),
 723            				       MessageQueue *callback_response_q,
 724            				       void *callback_ptr)
 725 mday  1.20 { 
 726 mday  1.21    PEGASUS_ASSERT(op != 0 && callback != 0 );
 727 mday  1.18    
 728 mday  1.21    // get the queue handle for the destination
 729            
 730 mday  1.30    op->lock();
 731 mday  1.32    op->_op_dest = MessageQueue::lookup(destination); // destination of this message
 732 mday  1.22    op->_flags |= ASYNC_OPFLAGS_CALLBACK;
 733               op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 734               op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 735 mday  1.30    // initialize the callback data
 736 mday  1.32    op->_async_callback = callback;   // callback function to be executed by recpt. of response
 737               op->_callback_node = op;          // the op node
 738               op->_callback_response_q = callback_response_q;  // the queue that will receive the response
 739               op->_callback_ptr = callback_ptr;   // user data for callback
 740               op->_callback_request_q = this;     // I am the originator of this request
 741 mday  1.30    
 742               op->unlock();
 743               if(op->_op_dest == 0) 
 744                  return false;
 745 mday  1.21    
 746               return  _meta_dispatcher->route_async(op);
 747 mday  1.18 }
 748            
 749            
 750 mday  1.39 Boolean MessageQueueService::SendAsync(Message *msg, 
 751            				       Uint32 destination,
 752            				       void (*callback)(Message *response, 
 753            							void *handle, 
 754            							void *parameter),
 755            				       void *handle, 
 756            				       void *parameter)
 757            {
 758               if(msg == NULL)
 759                  return false;
 760               if(callback == NULL)
 761                  return SendForget(msg);
 762               AsyncOpNode *op = get_op();
 763 mday  1.43    msg->dest = destination;
 764 mday  1.39    if( NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
 765               {
 766                  op->release();
 767                  return_op(op);
 768                  return false;
 769               }
 770               op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
 771               op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 772               op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 773               op->__async_callback = callback;
 774               op->_callback_node = op;
 775               op->_callback_handle = handle;
 776               op->_callback_parameter = parameter;
 777               op->_callback_response_q = this;
 778               
 779            
 780               if( ! (msg->getMask() & message_mask::ha_async) )
 781               {
 782                  AsyncLegacyOperationStart *wrapper = 
 783            	 new AsyncLegacyOperationStart(get_next_xid(),
 784            				       op, 
 785 mday  1.39 				       destination, 
 786            				       msg, 
 787            				       destination);
 788               }
 789               else 
 790               {
 791                  op->_request.insert_first(msg);
 792                  (static_cast<AsyncMessage *>(msg))->op = op;
 793               }
 794               
 795               _callback.insert_last(op);
 796               return _meta_dispatcher->route_async(op);
 797            }
 798            
 799            
 800 mday  1.18 Boolean MessageQueueService::SendForget(Message *msg)
 801            {
 802            
 803 mday  1.24    
 804 mday  1.18    AsyncOpNode *op = 0;
 805 mday  1.22    Uint32 mask = msg->getMask();
 806               
 807               if (mask & message_mask::ha_async)
 808 mday  1.18    {
 809                  op = (static_cast<AsyncMessage *>(msg))->op ;
 810               }
 811 mday  1.22 
 812 mday  1.18    if( op == 0 )
 813 mday  1.20    {
 814 mday  1.18       op = get_op();
 815 mday  1.20       op->_request.insert_first(msg);
 816 mday  1.22       if (mask & message_mask::ha_async)
 817            	 (static_cast<AsyncMessage *>(msg))->op = op;
 818 mday  1.20    }
 819 mday  1.30    op->_op_dest = MessageQueue::lookup(msg->dest);
 820 mday  1.22    op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
 821 mday  1.41    op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK 
 822            		   | ASYNC_OPFLAGS_SIMPLE_STATUS);
 823 mday  1.22    op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 824 mday  1.30    if ( op->_op_dest == 0 )
 825 mday  1.39    {
 826                  op->release();
 827                  return_op(op);
 828 mday  1.21       return false;
 829 mday  1.39    }
 830 mday  1.24    
 831 mday  1.18    // now see if the meta dispatcher will take it
 832 mday  1.30    return  _meta_dispatcher->route_async(op);
 833 mday  1.18 }
 834 mday  1.2  
 835 mday  1.1  
 836 mday  1.4  AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
 837 mday  1.1  {
 838 mday  1.4     if ( request == 0 )
 839                  return 0 ;
 840 mday  1.5  
 841               Boolean destroy_op = false;
 842               
 843               if (request->op == false)
 844               {
 845                  request->op = get_op();
 846 mday  1.7        request->op->_request.insert_first(request);
 847 mday  1.5        destroy_op = true;
 848               }
 849 mday  1.4     
 850 mday  1.33    request->block = false;
 851 mday  1.35    request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
 852 mday  1.33    SendAsync(request->op, 
 853            	     request->dest,
 854 mday  1.36  	     _sendwait_callback,
 855 mday  1.33 	     this,
 856            	     (void *)0);
 857 mday  1.4     
 858 mday  1.33    request->op->_client_sem.wait();
 859 mday  1.6     request->op->lock();
 860               AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
 861               rpl->op = 0;
 862               request->op->unlock();
 863               
 864 mday  1.5     if( destroy_op == true)
 865               {
 866 mday  1.6        request->op->lock();
 867                  request->op->_request.remove(request);
 868                  request->op->_state |= ASYNC_OPSTATE_RELEASED;
 869                  request->op->unlock();
 870                  return_op(request->op);
 871 mday  1.7        request->op = 0;
 872 mday  1.5     }
 873               return rpl;
 874 mday  1.1  }
 875            
 876            
 877            Boolean MessageQueueService::register_service(String name, 
 878            					      Uint32 capabilities, 
 879            					      Uint32 mask)
 880            
 881            {
 882               RegisterCimService *msg = new RegisterCimService(get_next_xid(),
 883 mday  1.5  						    0, 
 884 mday  1.1  						    true, 
 885            						    name, 
 886            						    capabilities, 
 887            						    mask,
 888            						    _queueId);
 889 mday  1.44    msg->dest = CIMOM_Q_ID;
 890               
 891 mday  1.1     Boolean registered = false;
 892 mday  1.7     AsyncReply *reply = static_cast<AsyncReply *>(SendWait( msg ));
 893 mday  1.1     
 894 mday  1.2     if ( reply != 0 )
 895 mday  1.1     {
 896                  if(reply->getMask() & message_mask:: ha_async)
 897                  {
 898            	 if(reply->getMask() & message_mask::ha_reply)
 899            	 {
 900 mday  1.15 	    if(reply->result == async_results::OK || 
 901            	       reply->result == async_results::MODULE_ALREADY_REGISTERED )
 902 mday  1.1  	       registered = true;
 903            	 }
 904                  }
 905                  
 906 mday  1.7        delete reply; 
 907 mday  1.1     }
 908 mday  1.5     delete msg;
 909 mday  1.1     return registered;
 910            }
 911            
 912            Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
 913            {
 914               
 915               
 916               UpdateCimService *msg = new UpdateCimService(get_next_xid(), 
 917 mday  1.5  						0, 
 918 mday  1.1  						true, 
 919            						_queueId,
 920            						_capabilities, 
 921            						_mask);
 922               Boolean registered = false;
 923 mday  1.2  
 924               AsyncMessage *reply = SendWait(msg);
 925               if (reply)
 926 mday  1.1     {
 927                  if(reply->getMask() & message_mask:: ha_async)
 928                  {
 929            	 if(reply->getMask() & message_mask::ha_reply)
 930            	 {
 931 mday  1.2  	    if(static_cast<AsyncReply *>(reply)->result == async_results::OK)
 932 mday  1.1  	       registered = true;
 933            	 }
 934                  }
 935                  delete reply;
 936               }
 937 mday  1.5     delete msg;
 938 mday  1.1     return registered;
 939            }
 940            
 941            
 942            Boolean MessageQueueService::deregister_service(void)
 943            {
 944 mday  1.3  
 945 mday  1.5     _meta_dispatcher->deregister_module(_queueId);
 946               return true;
 947 mday  1.1  }
 948            
 949            
 950            void MessageQueueService::find_services(String name, 
 951            					Uint32 capabilities, 
 952            					Uint32 mask, 
 953            					Array<Uint32> *results)
 954            {
 955               
 956               if( results == 0 )
 957                  throw NullPointer();
 958 mday  1.5      
 959 mday  1.1     results->clear();
 960               
 961               FindServiceQueue *req = 
 962                  new FindServiceQueue(get_next_xid(), 
 963 mday  1.5  			   0, 
 964 mday  1.1  			   _queueId, 
 965            			   true, 
 966            			   name, 
 967            			   capabilities, 
 968            			   mask);
 969 mday  1.44    
 970               req->dest = CIMOM_Q_ID;
 971 mday  1.1     
 972 mday  1.2     AsyncMessage *reply = SendWait(req); 
 973               if(reply)
 974 mday  1.1     {
 975                  if( reply->getMask() & message_mask::ha_async)
 976                  {
 977            	 if(reply->getMask() & message_mask::ha_reply)
 978            	 {
 979            	    if(reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
 980            	    {
 981            	       if( (static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK )
 982            		  *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
 983            	    }
 984            	 }
 985                  }
 986                  delete reply;
 987               }
 988 mday  1.5     delete req;
 989 mday  1.1     return ;
 990            }
 991            
 992            void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
 993            {
 994               if(result == 0)
 995                  throw NullPointer();
 996               
 997               EnumerateService *req 
 998                  = new EnumerateService(get_next_xid(),
 999 mday  1.5  			     0, 
1000 mday  1.1  			     _queueId, 
1001            			     true, 
1002            			     queue);
1003               
1004 mday  1.2     AsyncMessage *reply = SendWait(req);
1005 mday  1.1     
1006 mday  1.2     if (reply)
1007 mday  1.1     {
1008                  Boolean found = false;
1009                  
1010                  if( reply->getMask() & message_mask::ha_async)
1011                  {
1012            	 if(reply->getMask() & message_mask::ha_reply)
1013            	 {
1014            	    if(reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
1015            	    {
1016            	       if( (static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK )
1017            	       {
1018            		  if( found == false)
1019            		  {
1020            		     found = true;
1021            		     
1022            		     result->put_name( (static_cast<EnumerateServiceResponse *>(reply))->name);
1023            		     result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
1024            		     result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
1025            		     result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
1026            		  }
1027            	       }
1028 mday  1.1  	    }
1029            	 }
1030                  }
1031                  delete reply;
1032               }
1033 mday  1.5     delete req;
1034               
1035 mday  1.1     return;
1036            }
1037            
1038            Uint32 MessageQueueService::get_next_xid(void)
1039            {
1040               _xid++;
1041               return _xid.value();
1042            }
1043            
1044            PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2