(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

   1 mday  1.53 //%////-*-c++-*-////////////////////////////////////////////////////////////////
   2 mday  1.1  //
   3 kumpf 1.54 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
   4            // The Open Group, Tivoli Systems
   5 mday  1.1  //
   6            // Permission is hereby granted, free of charge, to any person obtaining a copy
   7            // of this software and associated documentation files (the "Software"), to
   8            // deal in the Software without restriction, including without limitation the
   9            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  10            // sell copies of the Software, and to permit persons to whom the Software is
  11            // furnished to do so, subject to the following conditions:
  12 mday  1.53 //
  13 mday  1.1  // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  14            // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  15            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  16            // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  17            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  18            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  19            // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  20            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  21            //
  22            //==============================================================================
  23            //
  24            // Author: Mike Day (mdday@us.ibm.com)
  25            //
  26            // Modified By:
  27            //
  28            //%/////////////////////////////////////////////////////////////////////////////
  29            
  30            #include "MessageQueueService.h"
  31 mday  1.22 #include <Pegasus/Common/Tracer.h>
  32 mday  1.1  
  33            PEGASUS_NAMESPACE_BEGIN
  34            
  35 mday  1.15  
  36            cimom *MessageQueueService::_meta_dispatcher = 0;
  37            AtomicInt MessageQueueService::_service_count = 0;
  38            AtomicInt MessageQueueService::_xid(1);
  39 mday  1.38 Mutex MessageQueueService::_meta_dispatcher_mutex;
  40 mday  1.15 
  41 mday  1.58 static struct timeval create_time = {0, 1};
  42 mday  1.55 static struct timeval destroy_time = {5, 0};
  43 mday  1.56 static struct timeval deadlock_time = {1000, 0};
  44 mday  1.53 
  45 mday  1.55 ThreadPool *MessageQueueService::_thread_pool = 0;
  46 mday  1.53 
  47            DQueue<MessageQueueService> MessageQueueService::_polling_list(true);
  48            
  49            int MessageQueueService::kill_idle_threads(void)
  50            {
  51               static struct timeval now, last;
  52               gettimeofday(&now, NULL);
  53 mday  1.56    int dead_threads = 0;
  54 mday  1.53    
  55               if( now.tv_sec - last.tv_sec > 0 )
  56               {
  57                  gettimeofday(&last, NULL);
  58 mday  1.56       try 
  59                  {
  60            	 dead_threads =  _thread_pool->kill_dead_threads();
  61                  }
  62                  catch(IPCException& )
  63                  {
  64            
  65                  }
  66 mday  1.53    }
  67 mday  1.56    return dead_threads;
  68 mday  1.53 }
  69            
  70            PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
  71            {
  72               Thread *myself = reinterpret_cast<Thread *>(parm);
  73               
  74               DQueue<MessageQueueService> *list = reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm());
  75               while ( _stop_polling.value()  == 0 ) 
  76               {
  77                  _polling_sem.wait();
  78                  list->lock();
  79                  MessageQueueService *service = list->next(0);
  80                  while(service != NULL)
  81                  {
  82            	 if(service->_incoming.count() > 0 )
  83            	 {
  84 mday  1.55 	    _thread_pool->allocate_and_awaken(service, _req_proc);
  85 mday  1.53 //	    service->_req_proc(service);
  86            	 }
  87            	 service = list->next(service);
  88                  }
  89                  list->unlock();
  90               }
  91               myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
  92               return(0);
  93            }
  94            
  95            Thread MessageQueueService::_polling_thread(polling_routine, 
  96            					    reinterpret_cast<void *>(&_polling_list), 
  97            					    false);
  98            
  99            
 100            Semaphore MessageQueueService::_polling_sem(0);
 101            AtomicInt MessageQueueService::_stop_polling(0);
 102            
 103 mday  1.15 
 104 mday  1.1  MessageQueueService::MessageQueueService(const char *name, 
 105            					 Uint32 queueID, 
 106            					 Uint32 capabilities, 
 107            					 Uint32 mask) 
 108 mday  1.15    : Base(name, true,  queueID),
 109 mday  1.22      
 110 mday  1.1       _mask(mask),
 111 mday  1.4       _die(0),
 112 mday  1.41      _incoming(true, 0),
 113 mday  1.39      _callback(true),
 114 mday  1.7       _incoming_queue_shutdown(0),
 115 mday  1.39      _callback_ready(0),
 116                 _req_thread(_req_proc, this, false),
 117                 _callback_thread(_callback_proc, this, false)
 118 mday  1.53 
 119 mday  1.1  { 
 120 mday  1.22    _capabilities = (capabilities | module_capabilities::async);
 121               
 122 mday  1.1     _default_op_timeout.tv_sec = 30;
 123               _default_op_timeout.tv_usec = 100;
 124 mday  1.15 
 125               _meta_dispatcher_mutex.lock(pegasus_thread_self());
 126               
 127               if( _meta_dispatcher == 0 )
 128               {
 129                  PEGASUS_ASSERT( _service_count.value() == 0 );
 130                  _meta_dispatcher = new cimom();
 131                  if (_meta_dispatcher == NULL )
 132                  {
 133            	 _meta_dispatcher_mutex.unlock();
 134            	 throw NullPointer();
 135                  }
 136 mday  1.58       _thread_pool = new ThreadPool(0, "MessageQueueService", 0, 0,
 137 mday  1.55 				    create_time, destroy_time, deadlock_time);  
 138 mday  1.53       _polling_thread.run();
 139 mday  1.15    }
 140               _service_count++;
 141            
 142               if( false == register_service(name, _capabilities, _mask) )
 143               {
 144                  _meta_dispatcher_mutex.unlock();
 145                  throw BindFailed("MessageQueueService Base Unable to register with  Meta Dispatcher");
 146               }
 147               
 148 mday  1.53    _polling_list.insert_last(this);
 149               
 150 mday  1.15    _meta_dispatcher_mutex.unlock();
 151 mday  1.39 //   _callback_thread.run();
 152               
 153 mday  1.53 //   _req_thread.run();
 154 mday  1.1  }
 155            
 156 mday  1.4  
 157 mday  1.1  MessageQueueService::~MessageQueueService(void)
 158            {
 159               _die = 1;
 160 mday  1.7     if (_incoming_queue_shutdown.value() == 0 )
 161 mday  1.16    {
 162 mday  1.32       _shutdown_incoming_queue();
 163 mday  1.16    }
 164 mday  1.39    _callback_ready.signal();
 165 mday  1.53 //   _callback_thread.join();
 166 mday  1.39    
 167 mday  1.15    _meta_dispatcher_mutex.lock(pegasus_thread_self());
 168               _service_count--;
 169               if (_service_count.value() == 0 )
 170               {
 171 mday  1.53       _stop_polling++;
 172                  _polling_sem.signal();
 173                  _polling_thread.join();
 174 mday  1.15       _meta_dispatcher->_shutdown_routed_queue();
 175                  delete _meta_dispatcher;
 176 kumpf 1.45       _meta_dispatcher = 0;
 177 mday  1.53 
 178 mday  1.15    }
 179               _meta_dispatcher_mutex.unlock();
 180 mday  1.53    _polling_list.remove(this);
 181            } 
 182 mday  1.1  
 183 mday  1.7  void MessageQueueService::_shutdown_incoming_queue(void)
 184            {
 185               
 186 mday  1.9     if (_incoming_queue_shutdown.value() > 0 )
 187                  return ;
 188 mday  1.8     AsyncIoctl *msg = new AsyncIoctl(get_next_xid(),
 189            				    0, 
 190            				    _queueId, 
 191            				    _queueId, 
 192            				    true, 
 193            				    AsyncIoctl::IO_CLOSE, 
 194            				    0, 
 195            				    0);
 196 mday  1.9  
 197 mday  1.8     msg->op = get_op();
 198 mday  1.41    msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
 199               msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK 
 200            		   | ASYNC_OPFLAGS_SIMPLE_STATUS);
 201 mday  1.32    msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 202 mday  1.41 
 203 mday  1.32    msg->op->_op_dest = this;
 204 mday  1.8     msg->op->_request.insert_first(msg);
 205               
 206               _incoming.insert_last_wait(msg->op);
 207 mday  1.32 
 208 mday  1.53 //   _req_thread.join();
 209 mday  1.16    
 210 mday  1.7  }
 211            
 212 mday  1.22 
 213            
 214            void MessageQueueService::enqueue(Message *msg) throw(IPCException)
 215            {
 216 kumpf 1.28    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
 217            
 218 mday  1.22    Base::enqueue(msg);
 219 kumpf 1.28 
 220               PEG_METHOD_EXIT();
 221 mday  1.22 }
 222            
 223            
 224 mday  1.39 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_callback_proc(void *parm)
 225            {
 226               Thread *myself = reinterpret_cast<Thread *>(parm);
 227               MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
 228               AsyncOpNode *operation = 0;
 229               
 230               while ( service->_die.value() == 0 ) 
 231               {
 232                  service->_callback_ready.wait();
 233                  
 234                  service->_callback.lock();
 235                  operation = service->_callback.next(0);
 236                  while( operation != NULL)
 237                  {
 238            	 if( ASYNC_OPSTATE_COMPLETE & operation->read_state())
 239            	 {
 240            	    operation = service->_callback.remove_no_lock(operation);
 241            	    PEGASUS_ASSERT(operation != NULL);
 242            	    operation->_thread_ptr = myself;
 243            	    operation->_service_ptr = service;
 244            	    service->_handle_async_callback(operation);
 245 mday  1.39 	    break;
 246            	 }
 247            	 operation = service->_callback.next(operation);
 248                  }
 249                  service->_callback.unlock();
 250               }
 251               myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
 252               return(0);
 253            }
 254            
 255 mday  1.22 
 256 mday  1.5  PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
 257 mday  1.1  {
 258 mday  1.53    MessageQueueService *service = reinterpret_cast<MessageQueueService *>(parm);
 259 mday  1.5     // pull messages off the incoming queue and dispatch them. then 
 260               // check pending messages that are non-blocking
 261 mday  1.7     AsyncOpNode *operation = 0;
 262               
 263 mday  1.56    if ( service->_die.value() == 0 ) 
 264                {
 265 mday  1.41 	 try 
 266            	 {
 267 mday  1.53 	    operation = service->_incoming.remove_first();
 268 mday  1.41 	 }
 269            	 catch(ListClosed & )
 270            	 {
 271 mday  1.56 	    operation = 0;
 272            	    
 273            	    return(0);
 274 mday  1.41 	 }
 275            	 if( operation )
 276            	 {
 277            	    operation->_service_ptr = service;
 278            	    service->_handle_incoming_operation(operation);
 279            	 }
 280 mday  1.56     }
 281 mday  1.44 
 282 mday  1.5     return(0);
 283 mday  1.1  }
 284            
 285 mday  1.43 Uint32 MessageQueueService::get_pending_callback_count(void)
 286            {
 287               return _callback.count();
 288            }
 289            
 290            
 291            
 292 mday  1.33 void MessageQueueService::_sendwait_callback(AsyncOpNode *op, 
 293            					     MessageQueue *q, 
 294            					     void *parm)
 295            {
 296               op->_client_sem.signal();
 297            }
 298            
 299 mday  1.30 
 300            // callback function is responsible for cleaning up all resources
 301            // including op, op->_callback_node, and op->_callback_ptr
 302 mday  1.22 void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
 303            {
 304 mday  1.39    if( op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK )
 305               {
 306            
 307                  Message *msg = op->get_request();
 308                  if( msg && ( msg->getMask() & message_mask::ha_async))
 309                  {
 310            	 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_START )
 311            	 {
 312            	    AsyncLegacyOperationStart *wrapper = 
 313            	       static_cast<AsyncLegacyOperationStart *>(msg);
 314            	    msg = wrapper->get_action();
 315            	    delete wrapper;
 316            	 }
 317            	 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
 318            	 {
 319            	    AsyncModuleOperationStart *wrapper = 
 320            	       static_cast<AsyncModuleOperationStart *>(msg);
 321            	    msg = wrapper->get_action();
 322            	    delete wrapper;
 323            	 }
 324            	 else if (msg->getType() == async_messages::ASYNC_OP_START)
 325 mday  1.39 	 {
 326            	    AsyncOperationStart *wrapper = 
 327            	       static_cast<AsyncOperationStart *>(msg);
 328            	    msg = wrapper->get_action();
 329            	    delete wrapper;
 330            	 }
 331            	 delete msg;
 332                  }
 333            
 334                  msg = op->get_response();
 335                  if( msg && ( msg->getMask() & message_mask::ha_async))
 336                  {
 337            	 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT )
 338            	 {
 339            	    AsyncLegacyOperationResult *wrapper = 
 340            	       static_cast<AsyncLegacyOperationResult *>(msg);
 341            	    msg = wrapper->get_result();
 342            	    delete wrapper;
 343            	 }
 344            	 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
 345            	 {
 346 mday  1.39 	    AsyncModuleOperationResult *wrapper = 
 347            	       static_cast<AsyncModuleOperationResult *>(msg);
 348            	    msg = wrapper->get_result();
 349            	    delete wrapper;
 350            	 }
 351                  }
 352                  void (*callback)(Message *, void *, void *) = op->__async_callback;
 353                  void *handle = op->_callback_handle;
 354                  void *parm = op->_callback_parameter;
 355                  op->release();
 356                  return_op(op);
 357                  callback(msg, handle, parm);
 358               }
 359               else if( op->_flags & ASYNC_OPFLAGS_CALLBACK )
 360               {
 361                  // note that _callback_node may be different from op 
 362                  // op->_callback_response_q is a "this" pointer we can use for 
 363                  // static callback methods
 364                  op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);
 365               }
 366 mday  1.22 }
 367            
 368 mday  1.1  
 369 mday  1.34 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation) 
 370            //						     Thread *thread, 
 371            //						     MessageQueue *queue)
 372 mday  1.1  {
 373 mday  1.5     if ( operation != 0 )
 374               {
 375 mday  1.29       
 376 mday  1.22 // ATTN: optimization 
 377            // << Tue Feb 19 14:10:38 2002 mdd >>
 378 mday  1.6        operation->lock();
 379 mday  1.29             
 380 mday  1.6        Message *rq = operation->_request.next(0);
 381 mday  1.29      
 382 mday  1.31 // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>>
 383            // move this to the bottom of the loop when the majority of 
 384            // messages become async messages. 
 385            
 386 mday  1.18       // divert legacy messages to handleEnqueue
 387 mday  1.29       if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))
 388 mday  1.18       {
 389            	 rq = operation->_request.remove_first() ;
 390            	 operation->unlock();
 391            	 // delete the op node 
 392 mday  1.39 	 operation->release();
 393            	 return_op( operation);
 394 mday  1.24 
 395 mday  1.26 	 handleEnqueue(rq);
 396 mday  1.18 	 return;
 397                  }
 398            
 399 mday  1.39       if ( (operation->_flags & ASYNC_OPFLAGS_CALLBACK || 
 400            	    operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) && 
 401 mday  1.29 	   (operation->_state & ASYNC_OPSTATE_COMPLETE))
 402                  {
 403 mday  1.49 	 
 404 mday  1.29 	 operation->unlock();
 405            	 _handle_async_callback(operation);
 406                  }
 407                  else 
 408                  {
 409            	 PEGASUS_ASSERT(rq != 0 );
 410            	 // ATTN: optimization
 411            	 // << Wed Mar  6 15:00:39 2002 mdd >>
 412            	 // put thread and queue into the asyncopnode structure. 
 413 mday  1.34          //  (static_cast<AsyncMessage *>(rq))->_myself = operation->_thread_ptr;
 414                     //   (static_cast<AsyncMessage *>(rq))->_service = operation->_service_ptr;
 415            	 // done << Tue Mar 12 14:49:07 2002 mdd >>
 416 mday  1.29 	 operation->unlock();
 417            	 _handle_async_request(static_cast<AsyncRequest *>(rq));
 418                  }
 419 mday  1.5     }
 420               return;
 421 mday  1.1  }
 422            
 423            void MessageQueueService::_handle_async_request(AsyncRequest *req)
 424            {
 425 mday  1.4     if ( req != 0 )
 426               {
 427                  req->op->processing();
 428 mday  1.1     
 429 mday  1.4        Uint32 type = req->getType();
 430                  if( type == async_messages::HEARTBEAT )
 431            	 handle_heartbeat_request(req);
 432                  else if (type == async_messages::IOCTL)
 433            	 handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
 434                  else if (type == async_messages::CIMSERVICE_START)
 435            	 handle_CimServiceStart(static_cast<CimServiceStart *>(req));
 436                  else if (type == async_messages::CIMSERVICE_STOP)
 437            	 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
 438                  else if (type == async_messages::CIMSERVICE_PAUSE)
 439            	 handle_CimServicePause(static_cast<CimServicePause *>(req));
 440                  else if (type == async_messages::CIMSERVICE_RESUME)
 441            	 handle_CimServiceResume(static_cast<CimServiceResume *>(req));
 442                  else if ( type == async_messages::ASYNC_OP_START)
 443            	 handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
 444                  else 
 445                  {
 446            	 // we don't handle this request message 
 447            	 _make_response(req, async_results::CIM_NAK );
 448                  }
 449 mday  1.1     }
 450            }
 451            
 452 mday  1.17 
 453            Boolean MessageQueueService::_enqueueResponse(
 454               Message* request, 
 455               Message* response)
 456 mday  1.18    
 457 mday  1.17 {
 458 kumpf 1.37    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 459                                "MessageQueueService::_enqueueResponse");
 460 mday  1.25 
 461               if( request->getMask() & message_mask::ha_async)
 462               {
 463                  if (response->getMask() & message_mask::ha_async )
 464                  {
 465            	 _completeAsyncResponse(static_cast<AsyncRequest *>(request), 
 466            				static_cast<AsyncReply *>(response), 
 467            				ASYNC_OPSTATE_COMPLETE, 0 );
 468 kumpf 1.37          PEG_METHOD_EXIT();
 469 mday  1.25 	 return true;
 470                  }
 471               }
 472               
 473 mday  1.17    if(request->_async != 0 )
 474               {
 475                  Uint32 mask = request->_async->getMask();
 476 mday  1.18       PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request ));
 477                  
 478                  AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
 479                  AsyncOpNode *op = async->op;
 480                  request->_async = 0;
 481 mday  1.27       // this request is probably going to be deleted !!
 482                  // remove it from the op node 
 483                  op->_request.remove(request);
 484 mday  1.57     
 485 mday  1.18       
 486                  AsyncLegacyOperationResult *async_result = 
 487            	 new AsyncLegacyOperationResult( 
 488            	    async->getKey(),
 489            	    async->getRouting(),
 490            	    op,
 491            	    response);
 492                  _completeAsyncResponse(async,
 493            			     async_result,
 494            			     ASYNC_OPSTATE_COMPLETE, 
 495            			     0);
 496 kumpf 1.37       PEG_METHOD_EXIT();
 497 mday  1.18       return true;
 498 mday  1.17    }
 499 mday  1.18    
 500               // ensure that the destination queue is in response->dest
 501 kumpf 1.37    PEG_METHOD_EXIT();
 502 mday  1.24    return SendForget(response);
 503 mday  1.18    
 504 mday  1.17 }
 505            
 506 mday  1.18 void MessageQueueService::_make_response(Message *req, Uint32 code)
 507 mday  1.1  {
 508 mday  1.19    cimom::_make_response(req, code);
 509 mday  1.1  }
 510            
 511            
 512 mday  1.5  void MessageQueueService::_completeAsyncResponse(AsyncRequest *request, 
 513            						AsyncReply *reply, 
 514            						Uint32 state, 
 515            						Uint32 flag)
 516            {
 517 kumpf 1.37    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 518                                "MessageQueueService::_completeAsyncResponse");
 519            
 520 mday  1.19    cimom::_completeAsyncResponse(request, reply, state, flag);
 521 kumpf 1.37 
 522               PEG_METHOD_EXIT();
 523 mday  1.5  }
 524            
 525            
 526 mday  1.32 void MessageQueueService::_complete_op_node(AsyncOpNode *op,
 527            					    Uint32 state, 
 528            					    Uint32 flag, 
 529            					    Uint32 code)
 530            {
 531               cimom::_complete_op_node(op, state, flag, code);
 532            }
 533            
 534 mday  1.5  
 535            Boolean MessageQueueService::accept_async(AsyncOpNode *op)
 536            {
 537 mday  1.8     if (_incoming_queue_shutdown.value() > 0 )
 538                  return false;
 539               
 540 mday  1.20 // ATTN optimization remove the message checking altogether in the base 
 541            // << Mon Feb 18 14:02:20 2002 mdd >>
 542 mday  1.6     op->lock();
 543               Message *rq = op->_request.next(0);
 544 mday  1.20    Message *rp = op->_response.next(0);
 545 mday  1.6     op->unlock();
 546               
 547 mday  1.22    if(  (rq != 0 && (true == messageOK(rq))) || (rp != 0 && ( true == messageOK(rp) )) &&  
 548            	_die.value() == 0  )
 549 mday  1.5     {
 550 mday  1.6        _incoming.insert_last_wait(op);
 551 mday  1.53       _polling_sem.signal();
 552 mday  1.5        return true;
 553               }
 554 mday  1.32 //    else
 555            //    {
 556            //       if(  (rq != 0 && (true == MessageQueueService::messageOK(rq))) || 
 557            // 	   (rp != 0 && ( true == MessageQueueService::messageOK(rp) )) &&  
 558            // 	   _die.value() == 0)
 559            //       {
 560            // 	 MessageQueueService::_incoming.insert_last_wait(op);
 561            // 	 return true;
 562            //       }
 563            //    }
 564               
 565 mday  1.5     return false;
 566            }
 567            
 568            Boolean MessageQueueService::messageOK(const Message *msg)
 569            {
 570 mday  1.8     if (_incoming_queue_shutdown.value() > 0 )
 571                  return false;
 572 mday  1.18    return true;
 573            }
 574            
 575 mday  1.29 
 576            // made pure virtual 
 577            // << Wed Mar  6 15:11:31 2002 mdd >> 
 578 mday  1.25 // void MessageQueueService::handleEnqueue(Message *msg)
 579            // {
 580            //    if ( msg )
 581            //       delete msg;
 582            // }
 583 mday  1.5  
 584 mday  1.29 // made pure virtual 
 585            // << Wed Mar  6 15:11:56 2002 mdd >>
 586 mday  1.25 // void MessageQueueService::handleEnqueue(void)
 587            // {
 588            //     Message *msg = dequeue();
 589            //     handleEnqueue(msg);
 590            // }
 591 mday  1.5  
 592 mday  1.1  void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
 593            {
 594               // default action is to echo a heartbeat response 
 595               
 596               AsyncReply *reply = 
 597                  new AsyncReply(async_messages::HEARTBEAT,
 598            		     req->getKey(),
 599            		     req->getRouting(),
 600            		     0,
 601            		     req->op, 
 602            		     async_results::OK, 
 603            		     req->resp,
 604            		     false);
 605 mday  1.4     _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );
 606 mday  1.1  }
 607            
 608            
 609            void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
 610            { 
 611               ;
 612            }
 613                  
 614            void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
 615            {
 616 mday  1.8     
 617               switch( req->ctl )
 618               {
 619                  case AsyncIoctl::IO_CLOSE:
 620                  {
 621            	 // save my bearings 
 622 mday  1.53 //	 Thread *myself = req->op->_thread_ptr;
 623 mday  1.34 	 MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);
 624 mday  1.8  	 
 625            	 // respond to this message.
 626            	 _make_response(req, async_results::OK);
 627            	 // ensure we do not accept any further messages
 628            
 629            	 // ensure we don't recurse on IO_CLOSE
 630            	 if( _incoming_queue_shutdown.value() > 0 )
 631            	    break;
 632            	 
 633            	 // set the closing flag 
 634            	 service->_incoming_queue_shutdown = 1;
 635            	 // empty out the queue
 636            	 while( 1 )
 637            	 {
 638            	    AsyncOpNode *operation;
 639            	    try 
 640            	    {
 641            	       operation = service->_incoming.remove_first();
 642            	    }
 643            	    catch(IPCException & )
 644            	    {
 645 mday  1.8  	       break;
 646            	    }
 647            	    if( operation )
 648            	    {
 649 mday  1.53 //	       operation->_thread_ptr = myself;
 650 mday  1.34 	       operation->_service_ptr = service;
 651            	       service->_handle_incoming_operation(operation);
 652 mday  1.8  	    }
 653            	    else
 654            	       break;
 655            	 } // message processing loop
 656            
 657            	 // shutdown the AsyncDQueue
 658            	 service->_incoming.shutdown_queue();
 659            	 // exit the thread ! 
 660 mday  1.53 //	 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
 661 mday  1.8  	 return;
 662                  }
 663            
 664                  default:
 665            	 _make_response(req, async_results::CIM_NAK);
 666               }
 667 mday  1.1  }
 668 mday  1.8  
 669 mday  1.1  void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
 670            {
 671 mday  1.10    // clear the stoped bit and update
 672 mday  1.13    _capabilities &= (~(module_capabilities::stopped));
 673 mday  1.10    _make_response(req, async_results::OK);
 674               // now tell the meta dispatcher we are stopped 
 675               update_service(_capabilities, _mask);
 676            
 677 mday  1.1  }
 678            void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
 679            {
 680 mday  1.10    // set the stopeed bit and update
 681               _capabilities |= module_capabilities::stopped;
 682               _make_response(req, async_results::CIM_STOPPED);
 683               // now tell the meta dispatcher we are stopped 
 684               update_service(_capabilities, _mask);
 685               
 686 mday  1.1  }
 687            void MessageQueueService::handle_CimServicePause(CimServicePause *req)
 688            {
 689 mday  1.10    // set the paused bit and update
 690 mday  1.13    _capabilities |= module_capabilities::paused;
 691 mday  1.11    update_service(_capabilities, _mask);
 692 mday  1.10    _make_response(req, async_results::CIM_PAUSED);
 693               // now tell the meta dispatcher we are stopped 
 694 mday  1.1  }
 695            void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
 696            {
 697 mday  1.10    // clear the paused  bit and update
 698 mday  1.13    _capabilities &= (~(module_capabilities::paused));
 699 mday  1.11    update_service(_capabilities, _mask);
 700 mday  1.10    _make_response(req, async_results::OK);
 701               // now tell the meta dispatcher we are stopped 
 702 mday  1.1  }
 703                  
 704            void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
 705            {
 706               _make_response(req, async_results::CIM_NAK);
 707            }
 708            
 709            void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
 710            {
 711 mday  1.14    ;
 712            }
 713            
 714 mday  1.10 
 715 mday  1.14 void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)
 716            {
 717               // remove the legacy message from the request and enqueue it to its destination
 718               Uint32 result = async_results::CIM_NAK;
 719               
 720 mday  1.25    Message *legacy = req->_act;
 721 mday  1.14    if ( legacy != 0 )
 722               {
 723 mday  1.25       MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
 724 mday  1.14       if( queue != 0 )
 725                  {
 726 mday  1.25 	 if(queue->isAsync() == true )
 727            	 {
 728            	    (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
 729            	 }
 730            	 else 
 731            	 {
 732            	    // Enqueue the response:
 733            	    queue->enqueue(req->get_action());
 734            	 }
 735            	 
 736 mday  1.14 	 result = async_results::OK;
 737                  }
 738               }
 739               _make_response(req, result);
 740            }
 741            
 742            void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)
 743            {
 744               ;
 745 mday  1.1  }
 746            
 747            AsyncOpNode *MessageQueueService::get_op(void)
 748            {
 749 mday  1.4     AsyncOpNode *op = new AsyncOpNode();
 750 mday  1.1     
 751 mday  1.9     op->_state = ASYNC_OPSTATE_UNKNOWN;
 752               op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
 753 mday  1.4     
 754 mday  1.1     return op;
 755            }
 756            
 757            void MessageQueueService::return_op(AsyncOpNode *op)
 758            {
 759               PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED );
 760 mday  1.4     delete op;
 761 mday  1.1  }
 762            
 763 mday  1.18 
 764 mday  1.29 Boolean MessageQueueService::ForwardOp(AsyncOpNode *op, 
 765            				       Uint32 destination)
 766            {
 767               PEGASUS_ASSERT(op != 0 );
 768 mday  1.30    op->lock();
 769               op->_op_dest = MessageQueue::lookup(destination);
 770 mday  1.29    op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD);
 771               op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK);
 772 mday  1.30    op->unlock();
 773               if ( op->_op_dest == 0 )
 774                  return false;
 775                  
 776 mday  1.29    return  _meta_dispatcher->route_async(op);
 777            }
 778            
 779 mday  1.39  
 780 mday  1.21 Boolean MessageQueueService::SendAsync(AsyncOpNode *op, 
 781            				       Uint32 destination,
 782 mday  1.18 				       void (*callback)(AsyncOpNode *, 
 783 mday  1.32 							MessageQueue *,
 784 mday  1.30 							void *),
 785            				       MessageQueue *callback_response_q,
 786            				       void *callback_ptr)
 787 mday  1.20 { 
 788 mday  1.21    PEGASUS_ASSERT(op != 0 && callback != 0 );
 789 mday  1.18    
 790 mday  1.21    // get the queue handle for the destination
 791            
 792 mday  1.30    op->lock();
 793 mday  1.32    op->_op_dest = MessageQueue::lookup(destination); // destination of this message
 794 mday  1.22    op->_flags |= ASYNC_OPFLAGS_CALLBACK;
 795               op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 796               op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 797 mday  1.30    // initialize the callback data
 798 mday  1.32    op->_async_callback = callback;   // callback function to be executed by recpt. of response
 799               op->_callback_node = op;          // the op node
 800               op->_callback_response_q = callback_response_q;  // the queue that will receive the response
 801               op->_callback_ptr = callback_ptr;   // user data for callback
 802               op->_callback_request_q = this;     // I am the originator of this request
 803 mday  1.30    
 804               op->unlock();
 805               if(op->_op_dest == 0) 
 806                  return false;
 807 mday  1.21    
 808               return  _meta_dispatcher->route_async(op);
 809 mday  1.18 }
 810            
 811            
 812 mday  1.39 Boolean MessageQueueService::SendAsync(Message *msg, 
 813            				       Uint32 destination,
 814            				       void (*callback)(Message *response, 
 815            							void *handle, 
 816            							void *parameter),
 817            				       void *handle, 
 818            				       void *parameter)
 819            {
 820               if(msg == NULL)
 821                  return false;
 822               if(callback == NULL)
 823                  return SendForget(msg);
 824               AsyncOpNode *op = get_op();
 825 mday  1.43    msg->dest = destination;
 826 mday  1.39    if( NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
 827               {
 828                  op->release();
 829                  return_op(op);
 830                  return false;
 831               }
 832               op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
 833               op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 834               op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 835               op->__async_callback = callback;
 836               op->_callback_node = op;
 837               op->_callback_handle = handle;
 838               op->_callback_parameter = parameter;
 839               op->_callback_response_q = this;
 840               
 841            
 842               if( ! (msg->getMask() & message_mask::ha_async) )
 843               {
 844                  AsyncLegacyOperationStart *wrapper = 
 845            	 new AsyncLegacyOperationStart(get_next_xid(),
 846            				       op, 
 847 mday  1.39 				       destination, 
 848            				       msg, 
 849            				       destination);
 850               }
 851               else 
 852               {
 853                  op->_request.insert_first(msg);
 854                  (static_cast<AsyncMessage *>(msg))->op = op;
 855               }
 856               
 857               _callback.insert_last(op);
 858               return _meta_dispatcher->route_async(op);
 859            }
 860            
 861            
 862 mday  1.18 Boolean MessageQueueService::SendForget(Message *msg)
 863            {
 864            
 865 mday  1.24    
 866 mday  1.18    AsyncOpNode *op = 0;
 867 mday  1.22    Uint32 mask = msg->getMask();
 868               
 869               if (mask & message_mask::ha_async)
 870 mday  1.18    {
 871                  op = (static_cast<AsyncMessage *>(msg))->op ;
 872               }
 873 mday  1.22 
 874 mday  1.18    if( op == 0 )
 875 mday  1.20    {
 876 mday  1.18       op = get_op();
 877 mday  1.20       op->_request.insert_first(msg);
 878 mday  1.22       if (mask & message_mask::ha_async)
 879            	 (static_cast<AsyncMessage *>(msg))->op = op;
 880 mday  1.20    }
 881 mday  1.30    op->_op_dest = MessageQueue::lookup(msg->dest);
 882 mday  1.22    op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
 883 mday  1.41    op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK 
 884            		   | ASYNC_OPFLAGS_SIMPLE_STATUS);
 885 mday  1.22    op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 886 mday  1.30    if ( op->_op_dest == 0 )
 887 mday  1.39    {
 888                  op->release();
 889                  return_op(op);
 890 mday  1.21       return false;
 891 mday  1.39    }
 892 mday  1.24    
 893 mday  1.18    // now see if the meta dispatcher will take it
 894 mday  1.30    return  _meta_dispatcher->route_async(op);
 895 mday  1.18 }
 896 mday  1.2  
 897 mday  1.1  
 898 mday  1.4  AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
 899 mday  1.1  {
 900 mday  1.4     if ( request == 0 )
 901                  return 0 ;
 902 mday  1.5  
 903               Boolean destroy_op = false;
 904               
 905               if (request->op == false)
 906               {
 907                  request->op = get_op();
 908 mday  1.7        request->op->_request.insert_first(request);
 909 mday  1.5        destroy_op = true;
 910               }
 911 mday  1.4     
 912 mday  1.33    request->block = false;
 913 mday  1.35    request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
 914 mday  1.33    SendAsync(request->op, 
 915            	     request->dest,
 916 mday  1.36  	     _sendwait_callback,
 917 mday  1.33 	     this,
 918            	     (void *)0);
 919 mday  1.4     
 920 mday  1.33    request->op->_client_sem.wait();
 921 mday  1.6     request->op->lock();
 922               AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
 923               rpl->op = 0;
 924               request->op->unlock();
 925               
 926 mday  1.5     if( destroy_op == true)
 927               {
 928 mday  1.6        request->op->lock();
 929                  request->op->_request.remove(request);
 930                  request->op->_state |= ASYNC_OPSTATE_RELEASED;
 931                  request->op->unlock();
 932                  return_op(request->op);
 933 mday  1.7        request->op = 0;
 934 mday  1.5     }
 935               return rpl;
 936 mday  1.1  }
 937            
 938            
 939            Boolean MessageQueueService::register_service(String name, 
 940            					      Uint32 capabilities, 
 941            					      Uint32 mask)
 942            
 943            {
 944               RegisterCimService *msg = new RegisterCimService(get_next_xid(),
 945 mday  1.5  						    0, 
 946 mday  1.1  						    true, 
 947            						    name, 
 948            						    capabilities, 
 949            						    mask,
 950            						    _queueId);
 951 mday  1.44    msg->dest = CIMOM_Q_ID;
 952               
 953 mday  1.1     Boolean registered = false;
 954 mday  1.7     AsyncReply *reply = static_cast<AsyncReply *>(SendWait( msg ));
 955 mday  1.1     
 956 mday  1.2     if ( reply != 0 )
 957 mday  1.1     {
 958                  if(reply->getMask() & message_mask:: ha_async)
 959                  {
 960            	 if(reply->getMask() & message_mask::ha_reply)
 961            	 {
 962 mday  1.15 	    if(reply->result == async_results::OK || 
 963            	       reply->result == async_results::MODULE_ALREADY_REGISTERED )
 964 mday  1.1  	       registered = true;
 965            	 }
 966                  }
 967                  
 968 mday  1.7        delete reply; 
 969 mday  1.1     }
 970 mday  1.5     delete msg;
 971 mday  1.1     return registered;
 972            }
 973            
 974            Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
 975            {
 976               
 977               
 978               UpdateCimService *msg = new UpdateCimService(get_next_xid(), 
 979 mday  1.5  						0, 
 980 mday  1.1  						true, 
 981            						_queueId,
 982            						_capabilities, 
 983            						_mask);
 984               Boolean registered = false;
 985 mday  1.2  
 986               AsyncMessage *reply = SendWait(msg);
 987               if (reply)
 988 mday  1.1     {
 989                  if(reply->getMask() & message_mask:: ha_async)
 990                  {
 991            	 if(reply->getMask() & message_mask::ha_reply)
 992            	 {
 993 mday  1.2  	    if(static_cast<AsyncReply *>(reply)->result == async_results::OK)
 994 mday  1.1  	       registered = true;
 995            	 }
 996                  }
 997                  delete reply;
 998               }
 999 mday  1.5     delete msg;
1000 mday  1.1     return registered;
1001            }
1002            
1003            
1004            Boolean MessageQueueService::deregister_service(void)
1005            {
1006 mday  1.3  
1007 mday  1.5     _meta_dispatcher->deregister_module(_queueId);
1008               return true;
1009 mday  1.1  }
1010            
1011            
1012            void MessageQueueService::find_services(String name, 
1013            					Uint32 capabilities, 
1014            					Uint32 mask, 
1015            					Array<Uint32> *results)
1016            {
1017               
1018               if( results == 0 )
1019                  throw NullPointer();
1020 mday  1.5      
1021 mday  1.1     results->clear();
1022               
1023               FindServiceQueue *req = 
1024                  new FindServiceQueue(get_next_xid(), 
1025 mday  1.5  			   0, 
1026 mday  1.1  			   _queueId, 
1027            			   true, 
1028            			   name, 
1029            			   capabilities, 
1030            			   mask);
1031 mday  1.44    
1032               req->dest = CIMOM_Q_ID;
1033 mday  1.1     
1034 mday  1.2     AsyncMessage *reply = SendWait(req); 
1035               if(reply)
1036 mday  1.1     {
1037                  if( reply->getMask() & message_mask::ha_async)
1038                  {
1039            	 if(reply->getMask() & message_mask::ha_reply)
1040            	 {
1041            	    if(reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
1042            	    {
1043            	       if( (static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK )
1044            		  *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
1045            	    }
1046            	 }
1047                  }
1048                  delete reply;
1049               }
1050 mday  1.5     delete req;
1051 mday  1.1     return ;
1052            }
1053            
1054            void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
1055            {
1056               if(result == 0)
1057                  throw NullPointer();
1058               
1059               EnumerateService *req 
1060                  = new EnumerateService(get_next_xid(),
1061 mday  1.5  			     0, 
1062 mday  1.1  			     _queueId, 
1063            			     true, 
1064            			     queue);
1065               
1066 mday  1.2     AsyncMessage *reply = SendWait(req);
1067 mday  1.1     
1068 mday  1.2     if (reply)
1069 mday  1.1     {
1070                  Boolean found = false;
1071                  
1072                  if( reply->getMask() & message_mask::ha_async)
1073                  {
1074            	 if(reply->getMask() & message_mask::ha_reply)
1075            	 {
1076            	    if(reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
1077            	    {
1078            	       if( (static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK )
1079            	       {
1080            		  if( found == false)
1081            		  {
1082            		     found = true;
1083            		     
1084            		     result->put_name( (static_cast<EnumerateServiceResponse *>(reply))->name);
1085            		     result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
1086            		     result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
1087            		     result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
1088            		  }
1089            	       }
1090 mday  1.1  	    }
1091            	 }
1092                  }
1093                  delete reply;
1094               }
1095 mday  1.5     delete req;
1096               
1097 mday  1.1     return;
1098            }
1099            
1100            Uint32 MessageQueueService::get_next_xid(void)
1101            {
1102               _xid++;
1103               return _xid.value();
1104            }
1105            
1106            PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2