(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

   1 mday  1.53 //%////-*-c++-*-////////////////////////////////////////////////////////////////
   2 mday  1.1  //
   3 mday  1.53 // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM
   4 mday  1.1  //
   5            // Permission is hereby granted, free of charge, to any person obtaining a copy
   6            // of this software and associated documentation files (the "Software"), to
   7            // deal in the Software without restriction, including without limitation the
   8            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
   9            // sell copies of the Software, and to permit persons to whom the Software is
  10            // furnished to do so, subject to the following conditions:
  11 mday  1.53 //
  12 mday  1.1  // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  13            // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  14            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  15            // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  16            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  17            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  18            // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  19            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  20            //
  21            //==============================================================================
  22            //
  23            // Author: Mike Day (mdday@us.ibm.com)
  24            //
  25            // Modified By:
  26            //
  27            //%/////////////////////////////////////////////////////////////////////////////
  28            
  29            #include "MessageQueueService.h"
  30 mday  1.22 #include <Pegasus/Common/Tracer.h>
  31 mday  1.1  
  32            PEGASUS_NAMESPACE_BEGIN
  33            
  34 mday  1.15  
  35            cimom *MessageQueueService::_meta_dispatcher = 0;
  36            AtomicInt MessageQueueService::_service_count = 0;
  37            AtomicInt MessageQueueService::_xid(1);
  38 mday  1.38 Mutex MessageQueueService::_meta_dispatcher_mutex;
  39 mday  1.15 
  40 mday  1.53 static struct timeval create_time = {0, 100};
  41            static struct timeval destroy_time = {1, 0};
  42            static struct timeval deadlock_time = {10, 0};
  43            
  44            ThreadPool MessageQueueService::_thread_pool(2, "MessageQueueService", 2, 20,
  45            					     create_time, destroy_time, deadlock_time);  
  46            
  47            DQueue<MessageQueueService> MessageQueueService::_polling_list(true);
  48            
  49            int MessageQueueService::kill_idle_threads(void)
  50            {
  51               static struct timeval now, last;
  52               gettimeofday(&now, NULL);
  53               
  54               if( now.tv_sec - last.tv_sec > 0 )
  55               {
  56                  gettimeofday(&last, NULL);
  57                  return _thread_pool.kill_dead_threads();
  58               }
  59               return 0;
  60            }
  61 mday  1.53 
  62            PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
  63            {
  64               Thread *myself = reinterpret_cast<Thread *>(parm);
  65               
  66               DQueue<MessageQueueService> *list = reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm());
  67               
  68               while ( _stop_polling.value()  == 0 ) 
  69               {
  70                  _polling_sem.wait();
  71                  list->lock();
  72                  MessageQueueService *service = list->next(0);
  73                  while(service != NULL)
  74                  {
  75            	 if(service->_incoming.count() > 0 )
  76            	 {
  77            	    _thread_pool.allocate_and_awaken(service, _req_proc);
  78            	    
  79            //	    service->_req_proc(service);
  80            	 }
  81            	 
  82 mday  1.53 	 service = list->next(service);
  83                  }
  84                  list->unlock();
  85               }
  86               myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
  87               return(0);
  88            }
  89            
  90            Thread MessageQueueService::_polling_thread(polling_routine, 
  91            					    reinterpret_cast<void *>(&_polling_list), 
  92            					    false);
  93            
  94            
  95            Semaphore MessageQueueService::_polling_sem(0);
  96            AtomicInt MessageQueueService::_stop_polling(0);
  97            
  98 mday  1.15 
  99 mday  1.1  MessageQueueService::MessageQueueService(const char *name, 
 100            					 Uint32 queueID, 
 101            					 Uint32 capabilities, 
 102            					 Uint32 mask) 
 103 mday  1.15    : Base(name, true,  queueID),
 104 mday  1.22      
 105 mday  1.1       _mask(mask),
 106 mday  1.4       _die(0),
 107 mday  1.41      _incoming(true, 0),
 108 mday  1.39      _callback(true),
 109 mday  1.7       _incoming_queue_shutdown(0),
 110 mday  1.39      _callback_ready(0),
 111                 _req_thread(_req_proc, this, false),
 112                 _callback_thread(_callback_proc, this, false)
 113 mday  1.53 
 114 mday  1.1  { 
 115 mday  1.22    _capabilities = (capabilities | module_capabilities::async);
 116               
 117 mday  1.1     _default_op_timeout.tv_sec = 30;
 118               _default_op_timeout.tv_usec = 100;
 119 mday  1.15 
 120               _meta_dispatcher_mutex.lock(pegasus_thread_self());
 121               
 122               if( _meta_dispatcher == 0 )
 123               {
 124                  PEGASUS_ASSERT( _service_count.value() == 0 );
 125                  _meta_dispatcher = new cimom();
 126                  if (_meta_dispatcher == NULL )
 127                  {
 128            	 _meta_dispatcher_mutex.unlock();
 129            	 throw NullPointer();
 130                  }
 131 mday  1.53       _polling_thread.run();
 132 mday  1.15    }
 133               _service_count++;
 134            
 135               if( false == register_service(name, _capabilities, _mask) )
 136               {
 137                  _meta_dispatcher_mutex.unlock();
 138                  throw BindFailed("MessageQueueService Base Unable to register with  Meta Dispatcher");
 139               }
 140               
 141 mday  1.53    _polling_list.insert_last(this);
 142               
 143 mday  1.15    _meta_dispatcher_mutex.unlock();
 144 mday  1.39 //   _callback_thread.run();
 145               
 146 mday  1.53 //   _req_thread.run();
 147 mday  1.1  }
 148            
 149 mday  1.4  
 150 mday  1.1  MessageQueueService::~MessageQueueService(void)
 151            {
 152               _die = 1;
 153 mday  1.7     if (_incoming_queue_shutdown.value() == 0 )
 154 mday  1.16    {
 155 mday  1.32       _shutdown_incoming_queue();
 156 mday  1.16    }
 157 mday  1.39    _callback_ready.signal();
 158 mday  1.53 //   _callback_thread.join();
 159 mday  1.39    
 160 mday  1.15    _meta_dispatcher_mutex.lock(pegasus_thread_self());
 161               _service_count--;
 162               if (_service_count.value() == 0 )
 163               {
 164 mday  1.53       _stop_polling++;
 165                  _polling_sem.signal();
 166                  _polling_thread.join();
 167 mday  1.15       _meta_dispatcher->_shutdown_routed_queue();
 168                  delete _meta_dispatcher;
 169 kumpf 1.45       _meta_dispatcher = 0;
 170 mday  1.53 
 171 mday  1.15    }
 172               _meta_dispatcher_mutex.unlock();
 173 mday  1.53    _polling_list.remove(this);
 174            } 
 175 mday  1.1  
 176 mday  1.7  void MessageQueueService::_shutdown_incoming_queue(void)
 177            {
 178               
 179 mday  1.9     if (_incoming_queue_shutdown.value() > 0 )
 180                  return ;
 181 mday  1.8     AsyncIoctl *msg = new AsyncIoctl(get_next_xid(),
 182            				    0, 
 183            				    _queueId, 
 184            				    _queueId, 
 185            				    true, 
 186            				    AsyncIoctl::IO_CLOSE, 
 187            				    0, 
 188            				    0);
 189 mday  1.9  
 190 mday  1.8     msg->op = get_op();
 191 mday  1.41    msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
 192               msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK 
 193            		   | ASYNC_OPFLAGS_SIMPLE_STATUS);
 194 mday  1.32    msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 195 mday  1.41 
 196 mday  1.32    msg->op->_op_dest = this;
 197 mday  1.8     msg->op->_request.insert_first(msg);
 198               
 199               _incoming.insert_last_wait(msg->op);
 200 mday  1.32 
 201 mday  1.53 //   _req_thread.join();
 202 mday  1.16    
 203 mday  1.7  }
 204            
 205 mday  1.22 
 206            
 207            void MessageQueueService::enqueue(Message *msg) throw(IPCException)
 208            {
 209 kumpf 1.28    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
 210            
 211 mday  1.22    Base::enqueue(msg);
 212 kumpf 1.28 
 213               PEG_METHOD_EXIT();
 214 mday  1.22 }
 215            
 216            
 217 mday  1.39 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_callback_proc(void *parm)
 218            {
 219               Thread *myself = reinterpret_cast<Thread *>(parm);
 220               MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
 221               AsyncOpNode *operation = 0;
 222               
 223               while ( service->_die.value() == 0 ) 
 224               {
 225                  service->_callback_ready.wait();
 226                  
 227                  service->_callback.lock();
 228                  operation = service->_callback.next(0);
 229                  while( operation != NULL)
 230                  {
 231            	 if( ASYNC_OPSTATE_COMPLETE & operation->read_state())
 232            	 {
 233            	    operation = service->_callback.remove_no_lock(operation);
 234            	    PEGASUS_ASSERT(operation != NULL);
 235            	    operation->_thread_ptr = myself;
 236            	    operation->_service_ptr = service;
 237            	    service->_handle_async_callback(operation);
 238 mday  1.39 	    break;
 239            	 }
 240            	 operation = service->_callback.next(operation);
 241                  }
 242                  service->_callback.unlock();
 243               }
 244               myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
 245               return(0);
 246            }
 247            
 248 mday  1.22 
 249 mday  1.5  PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
 250 mday  1.1  {
 251 mday  1.53 //   Thread *myself = reinterpret_cast<Thread *>(parm);
 252            //   MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
 253               MessageQueueService *service = reinterpret_cast<MessageQueueService *>(parm);
 254 mday  1.5     // pull messages off the incoming queue and dispatch them. then 
 255               // check pending messages that are non-blocking
 256 mday  1.7     AsyncOpNode *operation = 0;
 257               
 258 mday  1.53 //    while ( service->_die.value() == 0 ) 
 259            //    {
 260 mday  1.41 	 try 
 261            	 {
 262 mday  1.53 	    operation = service->_incoming.remove_first();
 263 mday  1.41 	 }
 264            	 catch(ListClosed & )
 265            	 {
 266 mday  1.53 //	    break;
 267 mday  1.41 	 }
 268            	 if( operation )
 269            	 {
 270 mday  1.53 //	    operation->_thread_ptr = pegasus_thread_self();
 271 mday  1.41 	    operation->_service_ptr = service;
 272            	    service->_handle_incoming_operation(operation);
 273            	 }
 274 mday  1.53 //    }
 275 mday  1.44 
 276 mday  1.53 //    myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
 277 mday  1.5     return(0);
 278 mday  1.1  }
 279            
 280 mday  1.43 Uint32 MessageQueueService::get_pending_callback_count(void)
 281            {
 282               return _callback.count();
 283            }
 284            
 285            
 286            
 287 mday  1.33 void MessageQueueService::_sendwait_callback(AsyncOpNode *op, 
 288            					     MessageQueue *q, 
 289            					     void *parm)
 290            {
 291               op->_client_sem.signal();
 292            }
 293            
 294 mday  1.30 
 295            // callback function is responsible for cleaning up all resources
 296            // including op, op->_callback_node, and op->_callback_ptr
 297 mday  1.22 void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
 298            {
 299 mday  1.39    if( op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK )
 300               {
 301            
 302                  Message *msg = op->get_request();
 303                  if( msg && ( msg->getMask() & message_mask::ha_async))
 304                  {
 305            	 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_START )
 306            	 {
 307            	    AsyncLegacyOperationStart *wrapper = 
 308            	       static_cast<AsyncLegacyOperationStart *>(msg);
 309            	    msg = wrapper->get_action();
 310            	    delete wrapper;
 311            	 }
 312            	 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
 313            	 {
 314            	    AsyncModuleOperationStart *wrapper = 
 315            	       static_cast<AsyncModuleOperationStart *>(msg);
 316            	    msg = wrapper->get_action();
 317            	    delete wrapper;
 318            	 }
 319            	 else if (msg->getType() == async_messages::ASYNC_OP_START)
 320 mday  1.39 	 {
 321            	    AsyncOperationStart *wrapper = 
 322            	       static_cast<AsyncOperationStart *>(msg);
 323            	    msg = wrapper->get_action();
 324            	    delete wrapper;
 325            	 }
 326            	 delete msg;
 327                  }
 328            
 329                  msg = op->get_response();
 330                  if( msg && ( msg->getMask() & message_mask::ha_async))
 331                  {
 332            	 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT )
 333            	 {
 334            	    AsyncLegacyOperationResult *wrapper = 
 335            	       static_cast<AsyncLegacyOperationResult *>(msg);
 336            	    msg = wrapper->get_result();
 337            	    delete wrapper;
 338            	 }
 339            	 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
 340            	 {
 341 mday  1.39 	    AsyncModuleOperationResult *wrapper = 
 342            	       static_cast<AsyncModuleOperationResult *>(msg);
 343            	    msg = wrapper->get_result();
 344            	    delete wrapper;
 345            	 }
 346                  }
 347                  void (*callback)(Message *, void *, void *) = op->__async_callback;
 348                  void *handle = op->_callback_handle;
 349                  void *parm = op->_callback_parameter;
 350                  op->release();
 351                  return_op(op);
 352                  callback(msg, handle, parm);
 353               }
 354               else if( op->_flags & ASYNC_OPFLAGS_CALLBACK )
 355               {
 356                  // note that _callback_node may be different from op 
 357                  // op->_callback_response_q is a "this" pointer we can use for 
 358                  // static callback methods
 359                  op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);
 360               }
 361 mday  1.22 }
 362            
 363 mday  1.1  
 364 mday  1.34 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation) 
 365            //						     Thread *thread, 
 366            //						     MessageQueue *queue)
 367 mday  1.1  {
 368 mday  1.5     if ( operation != 0 )
 369               {
 370 mday  1.29       
 371 mday  1.22 // ATTN: optimization 
 372            // << Tue Feb 19 14:10:38 2002 mdd >>
 373 mday  1.6        operation->lock();
 374 mday  1.29             
 375 mday  1.6        Message *rq = operation->_request.next(0);
 376 mday  1.29      
 377 mday  1.31 // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>>
 378            // move this to the bottom of the loop when the majority of 
 379            // messages become async messages. 
 380            
 381 mday  1.18       // divert legacy messages to handleEnqueue
 382 mday  1.29       if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))
 383 mday  1.18       {
 384            	 rq = operation->_request.remove_first() ;
 385            	 operation->unlock();
 386            	 // delete the op node 
 387 mday  1.39 	 operation->release();
 388            	 return_op( operation);
 389 mday  1.24 
 390 mday  1.26 	 handleEnqueue(rq);
 391 mday  1.18 	 return;
 392                  }
 393            
 394 mday  1.39       if ( (operation->_flags & ASYNC_OPFLAGS_CALLBACK || 
 395            	    operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) && 
 396 mday  1.29 	   (operation->_state & ASYNC_OPSTATE_COMPLETE))
 397                  {
 398 mday  1.49 	 
 399 mday  1.29 	 operation->unlock();
 400            	 _handle_async_callback(operation);
 401                  }
 402                  else 
 403                  {
 404            	 PEGASUS_ASSERT(rq != 0 );
 405            	 // ATTN: optimization
 406            	 // << Wed Mar  6 15:00:39 2002 mdd >>
 407            	 // put thread and queue into the asyncopnode structure. 
 408 mday  1.34          //  (static_cast<AsyncMessage *>(rq))->_myself = operation->_thread_ptr;
 409                     //   (static_cast<AsyncMessage *>(rq))->_service = operation->_service_ptr;
 410            	 // done << Tue Mar 12 14:49:07 2002 mdd >>
 411 mday  1.29 	 operation->unlock();
 412            	 _handle_async_request(static_cast<AsyncRequest *>(rq));
 413                  }
 414 mday  1.5     }
 415               return;
 416 mday  1.1  }
 417            
 418            void MessageQueueService::_handle_async_request(AsyncRequest *req)
 419            {
 420 mday  1.4     if ( req != 0 )
 421               {
 422                  req->op->processing();
 423 mday  1.1     
 424 mday  1.4        Uint32 type = req->getType();
 425                  if( type == async_messages::HEARTBEAT )
 426            	 handle_heartbeat_request(req);
 427                  else if (type == async_messages::IOCTL)
 428            	 handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
 429                  else if (type == async_messages::CIMSERVICE_START)
 430            	 handle_CimServiceStart(static_cast<CimServiceStart *>(req));
 431                  else if (type == async_messages::CIMSERVICE_STOP)
 432            	 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
 433                  else if (type == async_messages::CIMSERVICE_PAUSE)
 434            	 handle_CimServicePause(static_cast<CimServicePause *>(req));
 435                  else if (type == async_messages::CIMSERVICE_RESUME)
 436            	 handle_CimServiceResume(static_cast<CimServiceResume *>(req));
 437                  else if ( type == async_messages::ASYNC_OP_START)
 438            	 handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
 439                  else 
 440                  {
 441            	 // we don't handle this request message 
 442            	 _make_response(req, async_results::CIM_NAK );
 443                  }
 444 mday  1.1     }
 445            }
 446            
 447 mday  1.17 
 448            Boolean MessageQueueService::_enqueueResponse(
 449               Message* request, 
 450               Message* response)
 451 mday  1.18    
 452 mday  1.17 {
 453 kumpf 1.37    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 454                                "MessageQueueService::_enqueueResponse");
 455 mday  1.25 
 456               if( request->getMask() & message_mask::ha_async)
 457               {
 458                  if (response->getMask() & message_mask::ha_async )
 459                  {
 460            	 _completeAsyncResponse(static_cast<AsyncRequest *>(request), 
 461            				static_cast<AsyncReply *>(response), 
 462            				ASYNC_OPSTATE_COMPLETE, 0 );
 463 kumpf 1.37          PEG_METHOD_EXIT();
 464 mday  1.25 	 return true;
 465                  }
 466               }
 467               
 468 mday  1.17    if(request->_async != 0 )
 469               {
 470                  Uint32 mask = request->_async->getMask();
 471 mday  1.18       PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request ));
 472                  
 473                  AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
 474                  AsyncOpNode *op = async->op;
 475                  request->_async = 0;
 476 mday  1.27       // this request is probably going to be deleted !!
 477                  // remove it from the op node 
 478                  op->_request.remove(request);
 479 mday  1.18       
 480                  AsyncLegacyOperationResult *async_result = 
 481            	 new AsyncLegacyOperationResult( 
 482            	    async->getKey(),
 483            	    async->getRouting(),
 484            	    op,
 485            	    response);
 486                  _completeAsyncResponse(async,
 487            			     async_result,
 488            			     ASYNC_OPSTATE_COMPLETE, 
 489            			     0);
 490 kumpf 1.37       PEG_METHOD_EXIT();
 491 mday  1.18       return true;
 492 mday  1.17    }
 493 mday  1.18    
 494               // ensure that the destination queue is in response->dest
 495 kumpf 1.37    PEG_METHOD_EXIT();
 496 mday  1.24    return SendForget(response);
 497 mday  1.18    
 498 mday  1.17 }
 499            
 500 mday  1.18 void MessageQueueService::_make_response(Message *req, Uint32 code)
 501 mday  1.1  {
 502 mday  1.19    cimom::_make_response(req, code);
 503 mday  1.1  }
 504            
 505            
 506 mday  1.5  void MessageQueueService::_completeAsyncResponse(AsyncRequest *request, 
 507            						AsyncReply *reply, 
 508            						Uint32 state, 
 509            						Uint32 flag)
 510            {
 511 kumpf 1.37    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 512                                "MessageQueueService::_completeAsyncResponse");
 513            
 514 mday  1.19    cimom::_completeAsyncResponse(request, reply, state, flag);
 515 kumpf 1.37 
 516               PEG_METHOD_EXIT();
 517 mday  1.5  }
 518            
 519            
 520 mday  1.32 void MessageQueueService::_complete_op_node(AsyncOpNode *op,
 521            					    Uint32 state, 
 522            					    Uint32 flag, 
 523            					    Uint32 code)
 524            {
 525               cimom::_complete_op_node(op, state, flag, code);
 526            }
 527            
 528 mday  1.5  
 529            Boolean MessageQueueService::accept_async(AsyncOpNode *op)
 530            {
 531 mday  1.8     if (_incoming_queue_shutdown.value() > 0 )
 532                  return false;
 533               
 534 mday  1.20 // ATTN optimization remove the message checking altogether in the base 
 535            // << Mon Feb 18 14:02:20 2002 mdd >>
 536 mday  1.6     op->lock();
 537               Message *rq = op->_request.next(0);
 538 mday  1.20    Message *rp = op->_response.next(0);
 539 mday  1.6     op->unlock();
 540               
 541 mday  1.22    if(  (rq != 0 && (true == messageOK(rq))) || (rp != 0 && ( true == messageOK(rp) )) &&  
 542            	_die.value() == 0  )
 543 mday  1.5     {
 544 mday  1.6        _incoming.insert_last_wait(op);
 545 mday  1.53       _polling_sem.signal();
 546 mday  1.5        return true;
 547               }
 548 mday  1.32 //    else
 549            //    {
 550            //       if(  (rq != 0 && (true == MessageQueueService::messageOK(rq))) || 
 551            // 	   (rp != 0 && ( true == MessageQueueService::messageOK(rp) )) &&  
 552            // 	   _die.value() == 0)
 553            //       {
 554            // 	 MessageQueueService::_incoming.insert_last_wait(op);
 555            // 	 return true;
 556            //       }
 557            //    }
 558               
 559 mday  1.5     return false;
 560            }
 561            
 562            Boolean MessageQueueService::messageOK(const Message *msg)
 563            {
 564 mday  1.8     if (_incoming_queue_shutdown.value() > 0 )
 565                  return false;
 566 mday  1.18    return true;
 567            }
 568            
 569 mday  1.29 
 570            // made pure virtual 
 571            // << Wed Mar  6 15:11:31 2002 mdd >> 
 572 mday  1.25 // void MessageQueueService::handleEnqueue(Message *msg)
 573            // {
 574            //    if ( msg )
 575            //       delete msg;
 576            // }
 577 mday  1.5  
 578 mday  1.29 // made pure virtual 
 579            // << Wed Mar  6 15:11:56 2002 mdd >>
 580 mday  1.25 // void MessageQueueService::handleEnqueue(void)
 581            // {
 582            //     Message *msg = dequeue();
 583            //     handleEnqueue(msg);
 584            // }
 585 mday  1.5  
 586 mday  1.1  void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
 587            {
 588               // default action is to echo a heartbeat response 
 589               
 590               AsyncReply *reply = 
 591                  new AsyncReply(async_messages::HEARTBEAT,
 592            		     req->getKey(),
 593            		     req->getRouting(),
 594            		     0,
 595            		     req->op, 
 596            		     async_results::OK, 
 597            		     req->resp,
 598            		     false);
 599 mday  1.4     _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );
 600 mday  1.1  }
 601            
 602            
 603            void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
 604            { 
 605               ;
 606            }
 607                  
 608            void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
 609            {
 610 mday  1.8     
 611               switch( req->ctl )
 612               {
 613                  case AsyncIoctl::IO_CLOSE:
 614                  {
 615            	 // save my bearings 
 616 mday  1.53 //	 Thread *myself = req->op->_thread_ptr;
 617 mday  1.34 	 MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);
 618 mday  1.8  	 
 619            	 // respond to this message.
 620            	 _make_response(req, async_results::OK);
 621            	 // ensure we do not accept any further messages
 622            
 623            	 // ensure we don't recurse on IO_CLOSE
 624            	 if( _incoming_queue_shutdown.value() > 0 )
 625            	    break;
 626            	 
 627            	 // set the closing flag 
 628            	 service->_incoming_queue_shutdown = 1;
 629            	 // empty out the queue
 630            	 while( 1 )
 631            	 {
 632            	    AsyncOpNode *operation;
 633            	    try 
 634            	    {
 635            	       operation = service->_incoming.remove_first();
 636            	    }
 637            	    catch(IPCException & )
 638            	    {
 639 mday  1.8  	       break;
 640            	    }
 641            	    if( operation )
 642            	    {
 643 mday  1.53 //	       operation->_thread_ptr = myself;
 644 mday  1.34 	       operation->_service_ptr = service;
 645            	       service->_handle_incoming_operation(operation);
 646 mday  1.8  	    }
 647            	    else
 648            	       break;
 649            	 } // message processing loop
 650            
 651            	 // shutdown the AsyncDQueue
 652            	 service->_incoming.shutdown_queue();
 653            	 // exit the thread ! 
 654 mday  1.53 //	 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
 655 mday  1.8  	 return;
 656                  }
 657            
 658                  default:
 659            	 _make_response(req, async_results::CIM_NAK);
 660               }
 661 mday  1.1  }
 662 mday  1.8  
 663 mday  1.1  void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
 664            {
 665 mday  1.10    // clear the stoped bit and update
 666 mday  1.13    _capabilities &= (~(module_capabilities::stopped));
 667 mday  1.10    _make_response(req, async_results::OK);
 668               // now tell the meta dispatcher we are stopped 
 669               update_service(_capabilities, _mask);
 670            
 671 mday  1.1  }
 672            void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
 673            {
 674 mday  1.10    // set the stopeed bit and update
 675               _capabilities |= module_capabilities::stopped;
 676               _make_response(req, async_results::CIM_STOPPED);
 677               // now tell the meta dispatcher we are stopped 
 678               update_service(_capabilities, _mask);
 679               
 680 mday  1.1  }
 681            void MessageQueueService::handle_CimServicePause(CimServicePause *req)
 682            {
 683 mday  1.10    // set the paused bit and update
 684 mday  1.13    _capabilities |= module_capabilities::paused;
 685 mday  1.11    update_service(_capabilities, _mask);
 686 mday  1.10    _make_response(req, async_results::CIM_PAUSED);
 687               // now tell the meta dispatcher we are stopped 
 688 mday  1.1  }
 689            void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
 690            {
 691 mday  1.10    // clear the paused  bit and update
 692 mday  1.13    _capabilities &= (~(module_capabilities::paused));
 693 mday  1.11    update_service(_capabilities, _mask);
 694 mday  1.10    _make_response(req, async_results::OK);
 695               // now tell the meta dispatcher we are stopped 
 696 mday  1.1  }
 697                  
 698            void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
 699            {
 700               _make_response(req, async_results::CIM_NAK);
 701            }
 702            
 703            void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
 704            {
 705 mday  1.14    ;
 706            }
 707            
 708 mday  1.10 
 709 mday  1.14 void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)
 710            {
 711               // remove the legacy message from the request and enqueue it to its destination
 712               Uint32 result = async_results::CIM_NAK;
 713               
 714 mday  1.25    Message *legacy = req->_act;
 715 mday  1.14    if ( legacy != 0 )
 716               {
 717 mday  1.25       MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
 718 mday  1.14       if( queue != 0 )
 719                  {
 720 mday  1.25 	 if(queue->isAsync() == true )
 721            	 {
 722            	    (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
 723            	 }
 724            	 else 
 725            	 {
 726            	    // Enqueue the response:
 727            	    queue->enqueue(req->get_action());
 728            	 }
 729            	 
 730 mday  1.14 	 result = async_results::OK;
 731                  }
 732               }
 733               _make_response(req, result);
 734            }
 735            
 736            void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)
 737            {
 738               ;
 739 mday  1.1  }
 740            
 741            AsyncOpNode *MessageQueueService::get_op(void)
 742            {
 743 mday  1.4     AsyncOpNode *op = new AsyncOpNode();
 744 mday  1.1     
 745 mday  1.9     op->_state = ASYNC_OPSTATE_UNKNOWN;
 746               op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
 747 mday  1.4     
 748 mday  1.1     return op;
 749            }
 750            
 751            void MessageQueueService::return_op(AsyncOpNode *op)
 752            {
 753               PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED );
 754 mday  1.4     delete op;
 755 mday  1.1  }
 756            
 757 mday  1.18 
 758 mday  1.29 Boolean MessageQueueService::ForwardOp(AsyncOpNode *op, 
 759            				       Uint32 destination)
 760            {
 761               PEGASUS_ASSERT(op != 0 );
 762 mday  1.30    op->lock();
 763               op->_op_dest = MessageQueue::lookup(destination);
 764 mday  1.29    op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD);
 765               op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK);
 766 mday  1.30    op->unlock();
 767               if ( op->_op_dest == 0 )
 768                  return false;
 769                  
 770 mday  1.29    return  _meta_dispatcher->route_async(op);
 771            }
 772            
 773 mday  1.39  
 774 mday  1.21 Boolean MessageQueueService::SendAsync(AsyncOpNode *op, 
 775            				       Uint32 destination,
 776 mday  1.18 				       void (*callback)(AsyncOpNode *, 
 777 mday  1.32 							MessageQueue *,
 778 mday  1.30 							void *),
 779            				       MessageQueue *callback_response_q,
 780            				       void *callback_ptr)
 781 mday  1.20 { 
 782 mday  1.21    PEGASUS_ASSERT(op != 0 && callback != 0 );
 783 mday  1.18    
 784 mday  1.21    // get the queue handle for the destination
 785            
 786 mday  1.30    op->lock();
 787 mday  1.32    op->_op_dest = MessageQueue::lookup(destination); // destination of this message
 788 mday  1.22    op->_flags |= ASYNC_OPFLAGS_CALLBACK;
 789               op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 790               op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 791 mday  1.30    // initialize the callback data
 792 mday  1.32    op->_async_callback = callback;   // callback function to be executed by recpt. of response
 793               op->_callback_node = op;          // the op node
 794               op->_callback_response_q = callback_response_q;  // the queue that will receive the response
 795               op->_callback_ptr = callback_ptr;   // user data for callback
 796               op->_callback_request_q = this;     // I am the originator of this request
 797 mday  1.30    
 798               op->unlock();
 799               if(op->_op_dest == 0) 
 800                  return false;
 801 mday  1.21    
 802               return  _meta_dispatcher->route_async(op);
 803 mday  1.18 }
 804            
 805            
 806 mday  1.39 Boolean MessageQueueService::SendAsync(Message *msg, 
 807            				       Uint32 destination,
 808            				       void (*callback)(Message *response, 
 809            							void *handle, 
 810            							void *parameter),
 811            				       void *handle, 
 812            				       void *parameter)
 813            {
 814               if(msg == NULL)
 815                  return false;
 816               if(callback == NULL)
 817                  return SendForget(msg);
 818               AsyncOpNode *op = get_op();
 819 mday  1.43    msg->dest = destination;
 820 mday  1.39    if( NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
 821               {
 822                  op->release();
 823                  return_op(op);
 824                  return false;
 825               }
 826               op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
 827               op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 828               op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 829               op->__async_callback = callback;
 830               op->_callback_node = op;
 831               op->_callback_handle = handle;
 832               op->_callback_parameter = parameter;
 833               op->_callback_response_q = this;
 834               
 835            
 836               if( ! (msg->getMask() & message_mask::ha_async) )
 837               {
 838                  AsyncLegacyOperationStart *wrapper = 
 839            	 new AsyncLegacyOperationStart(get_next_xid(),
 840            				       op, 
 841 mday  1.39 				       destination, 
 842            				       msg, 
 843            				       destination);
 844               }
 845               else 
 846               {
 847                  op->_request.insert_first(msg);
 848                  (static_cast<AsyncMessage *>(msg))->op = op;
 849               }
 850               
 851               _callback.insert_last(op);
 852               return _meta_dispatcher->route_async(op);
 853            }
 854            
 855            
 856 mday  1.18 Boolean MessageQueueService::SendForget(Message *msg)
 857            {
 858            
 859 mday  1.24    
 860 mday  1.18    AsyncOpNode *op = 0;
 861 mday  1.22    Uint32 mask = msg->getMask();
 862               
 863               if (mask & message_mask::ha_async)
 864 mday  1.18    {
 865                  op = (static_cast<AsyncMessage *>(msg))->op ;
 866               }
 867 mday  1.22 
 868 mday  1.18    if( op == 0 )
 869 mday  1.20    {
 870 mday  1.18       op = get_op();
 871 mday  1.20       op->_request.insert_first(msg);
 872 mday  1.22       if (mask & message_mask::ha_async)
 873            	 (static_cast<AsyncMessage *>(msg))->op = op;
 874 mday  1.20    }
 875 mday  1.30    op->_op_dest = MessageQueue::lookup(msg->dest);
 876 mday  1.22    op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
 877 mday  1.41    op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK 
 878            		   | ASYNC_OPFLAGS_SIMPLE_STATUS);
 879 mday  1.22    op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 880 mday  1.30    if ( op->_op_dest == 0 )
 881 mday  1.39    {
 882                  op->release();
 883                  return_op(op);
 884 mday  1.21       return false;
 885 mday  1.39    }
 886 mday  1.24    
 887 mday  1.18    // now see if the meta dispatcher will take it
 888 mday  1.30    return  _meta_dispatcher->route_async(op);
 889 mday  1.18 }
 890 mday  1.2  
 891 mday  1.1  
 892 mday  1.4  AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
 893 mday  1.1  {
 894 mday  1.4     if ( request == 0 )
 895                  return 0 ;
 896 mday  1.5  
 897               Boolean destroy_op = false;
 898               
 899               if (request->op == false)
 900               {
 901                  request->op = get_op();
 902 mday  1.7        request->op->_request.insert_first(request);
 903 mday  1.5        destroy_op = true;
 904               }
 905 mday  1.4     
 906 mday  1.33    request->block = false;
 907 mday  1.35    request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
 908 mday  1.33    SendAsync(request->op, 
 909            	     request->dest,
 910 mday  1.36  	     _sendwait_callback,
 911 mday  1.33 	     this,
 912            	     (void *)0);
 913 mday  1.4     
 914 mday  1.33    request->op->_client_sem.wait();
 915 mday  1.6     request->op->lock();
 916               AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
 917               rpl->op = 0;
 918               request->op->unlock();
 919               
 920 mday  1.5     if( destroy_op == true)
 921               {
 922 mday  1.6        request->op->lock();
 923                  request->op->_request.remove(request);
 924                  request->op->_state |= ASYNC_OPSTATE_RELEASED;
 925                  request->op->unlock();
 926                  return_op(request->op);
 927 mday  1.7        request->op = 0;
 928 mday  1.5     }
 929               return rpl;
 930 mday  1.1  }
 931            
 932            
 933            Boolean MessageQueueService::register_service(String name, 
 934            					      Uint32 capabilities, 
 935            					      Uint32 mask)
 936            
 937            {
 938               RegisterCimService *msg = new RegisterCimService(get_next_xid(),
 939 mday  1.5  						    0, 
 940 mday  1.1  						    true, 
 941            						    name, 
 942            						    capabilities, 
 943            						    mask,
 944            						    _queueId);
 945 mday  1.44    msg->dest = CIMOM_Q_ID;
 946               
 947 mday  1.1     Boolean registered = false;
 948 mday  1.7     AsyncReply *reply = static_cast<AsyncReply *>(SendWait( msg ));
 949 mday  1.1     
 950 mday  1.2     if ( reply != 0 )
 951 mday  1.1     {
 952                  if(reply->getMask() & message_mask:: ha_async)
 953                  {
 954            	 if(reply->getMask() & message_mask::ha_reply)
 955            	 {
 956 mday  1.15 	    if(reply->result == async_results::OK || 
 957            	       reply->result == async_results::MODULE_ALREADY_REGISTERED )
 958 mday  1.1  	       registered = true;
 959            	 }
 960                  }
 961                  
 962 mday  1.7        delete reply; 
 963 mday  1.1     }
 964 mday  1.5     delete msg;
 965 mday  1.1     return registered;
 966            }
 967            
 968            Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
 969            {
 970               
 971               
 972               UpdateCimService *msg = new UpdateCimService(get_next_xid(), 
 973 mday  1.5  						0, 
 974 mday  1.1  						true, 
 975            						_queueId,
 976            						_capabilities, 
 977            						_mask);
 978               Boolean registered = false;
 979 mday  1.2  
 980               AsyncMessage *reply = SendWait(msg);
 981               if (reply)
 982 mday  1.1     {
 983                  if(reply->getMask() & message_mask:: ha_async)
 984                  {
 985            	 if(reply->getMask() & message_mask::ha_reply)
 986            	 {
 987 mday  1.2  	    if(static_cast<AsyncReply *>(reply)->result == async_results::OK)
 988 mday  1.1  	       registered = true;
 989            	 }
 990                  }
 991                  delete reply;
 992               }
 993 mday  1.5     delete msg;
 994 mday  1.1     return registered;
 995            }
 996            
 997            
 998            Boolean MessageQueueService::deregister_service(void)
 999            {
1000 mday  1.3  
1001 mday  1.5     _meta_dispatcher->deregister_module(_queueId);
1002               return true;
1003 mday  1.1  }
1004            
1005            
1006            void MessageQueueService::find_services(String name, 
1007            					Uint32 capabilities, 
1008            					Uint32 mask, 
1009            					Array<Uint32> *results)
1010            {
1011               
1012               if( results == 0 )
1013                  throw NullPointer();
1014 mday  1.5      
1015 mday  1.1     results->clear();
1016               
1017               FindServiceQueue *req = 
1018                  new FindServiceQueue(get_next_xid(), 
1019 mday  1.5  			   0, 
1020 mday  1.1  			   _queueId, 
1021            			   true, 
1022            			   name, 
1023            			   capabilities, 
1024            			   mask);
1025 mday  1.44    
1026               req->dest = CIMOM_Q_ID;
1027 mday  1.1     
1028 mday  1.2     AsyncMessage *reply = SendWait(req); 
1029               if(reply)
1030 mday  1.1     {
1031                  if( reply->getMask() & message_mask::ha_async)
1032                  {
1033            	 if(reply->getMask() & message_mask::ha_reply)
1034            	 {
1035            	    if(reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
1036            	    {
1037            	       if( (static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK )
1038            		  *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
1039            	    }
1040            	 }
1041                  }
1042                  delete reply;
1043               }
1044 mday  1.5     delete req;
1045 mday  1.1     return ;
1046            }
1047            
1048            void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
1049            {
1050               if(result == 0)
1051                  throw NullPointer();
1052               
1053               EnumerateService *req 
1054                  = new EnumerateService(get_next_xid(),
1055 mday  1.5  			     0, 
1056 mday  1.1  			     _queueId, 
1057            			     true, 
1058            			     queue);
1059               
1060 mday  1.2     AsyncMessage *reply = SendWait(req);
1061 mday  1.1     
1062 mday  1.2     if (reply)
1063 mday  1.1     {
1064                  Boolean found = false;
1065                  
1066                  if( reply->getMask() & message_mask::ha_async)
1067                  {
1068            	 if(reply->getMask() & message_mask::ha_reply)
1069            	 {
1070            	    if(reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
1071            	    {
1072            	       if( (static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK )
1073            	       {
1074            		  if( found == false)
1075            		  {
1076            		     found = true;
1077            		     
1078            		     result->put_name( (static_cast<EnumerateServiceResponse *>(reply))->name);
1079            		     result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
1080            		     result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
1081            		     result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
1082            		  }
1083            	       }
1084 mday  1.1  	    }
1085            	 }
1086                  }
1087                  delete reply;
1088               }
1089 mday  1.5     delete req;
1090               
1091 mday  1.1     return;
1092            }
1093            
1094            Uint32 MessageQueueService::get_next_xid(void)
1095            {
1096               _xid++;
1097               return _xid.value();
1098            }
1099            
1100            PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2