(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

   1 mday  1.53 //%////-*-c++-*-////////////////////////////////////////////////////////////////
   2 mday  1.1  //
   3 kumpf 1.54 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
   4            // The Open Group, Tivoli Systems
   5 mday  1.1  //
   6            // Permission is hereby granted, free of charge, to any person obtaining a copy
   7            // of this software and associated documentation files (the "Software"), to
   8            // deal in the Software without restriction, including without limitation the
   9            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  10            // sell copies of the Software, and to permit persons to whom the Software is
  11            // furnished to do so, subject to the following conditions:
  12 mday  1.53 //
  13 mday  1.1  // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  14            // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  15            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  16            // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  17            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  18            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  19            // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  20            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  21            //
  22            //==============================================================================
  23            //
  24            // Author: Mike Day (mdday@us.ibm.com)
  25            //
  26            // Modified By:
  27            //
  28            //%/////////////////////////////////////////////////////////////////////////////
  29            
  30            #include "MessageQueueService.h"
  31 mday  1.22 #include <Pegasus/Common/Tracer.h>
  32 mday  1.1  
  33            PEGASUS_NAMESPACE_BEGIN
  34            
  35 mday  1.15  
  36            cimom *MessageQueueService::_meta_dispatcher = 0;
  37            AtomicInt MessageQueueService::_service_count = 0;
  38            AtomicInt MessageQueueService::_xid(1);
  39 mday  1.38 Mutex MessageQueueService::_meta_dispatcher_mutex;
  40 mday  1.15 
  41 mday  1.58 static struct timeval create_time = {0, 1};
  42 mday  1.64.2.2 static struct timeval destroy_time = {15, 0};
  43 mday  1.59     static struct timeval deadlock_time = {0, 0};
  44 mday  1.53     
  45 mday  1.55     ThreadPool *MessageQueueService::_thread_pool = 0;
  46 mday  1.53     
  47                DQueue<MessageQueueService> MessageQueueService::_polling_list(true);
  48                
  49 kumpf 1.62     Thread* MessageQueueService::_polling_thread = 0;
  50 mday  1.61     
  51                
  52 mday  1.64.2.2 int MessageQueueService::kill_idle_threads(void)
  53 mday  1.53     {
  54 mday  1.64.2.2    static struct timeval now, last;
  55 mday  1.53        gettimeofday(&now, NULL);
  56 mday  1.56        int dead_threads = 0;
  57 mday  1.53        
  58 mday  1.64.2.2    if( now.tv_sec - last.tv_sec > 0 )
  59 mday  1.53        {
  60                      gettimeofday(&last, NULL);
  61 mday  1.56           try 
  62                      {
  63 mday  1.64.2.2 	 dead_threads =  _thread_pool->kill_dead_threads();
  64 mday  1.56           }
  65 mday  1.64.2.2       catch(IPCException& )
  66 mday  1.56           {
  67                
  68                      }
  69 mday  1.53        }
  70 mday  1.64.2.2    return dead_threads;
  71 mday  1.53     }
  72                
  73                PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
  74                {
  75                   Thread *myself = reinterpret_cast<Thread *>(parm);
  76                   DQueue<MessageQueueService> *list = reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm());
  77                   while ( _stop_polling.value()  == 0 ) 
  78                   {
  79                      _polling_sem.wait();
  80 mday  1.61           if(_stop_polling.value() != 0 )
  81                      {
  82 kumpf 1.62              break;
  83 mday  1.61           }
  84 mday  1.64.2.2       
  85 mday  1.53           list->lock();
  86                      MessageQueueService *service = list->next(0);
  87                      while(service != NULL)
  88                      {
  89                	 if(service->_incoming.count() > 0 )
  90                	 {
  91 mday  1.55     	    _thread_pool->allocate_and_awaken(service, _req_proc);
  92 mday  1.64.2.2 //	    service->_req_proc(service);
  93 mday  1.53     	 }
  94                	 service = list->next(service);
  95                      }
  96                      list->unlock();
  97                   }
  98                   myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
  99                   return(0);
 100                }
 101                
 102                
 103                Semaphore MessageQueueService::_polling_sem(0);
 104                AtomicInt MessageQueueService::_stop_polling(0);
 105                
 106 mday  1.15     
 107 mday  1.1      MessageQueueService::MessageQueueService(const char *name, 
 108                					 Uint32 queueID, 
 109                					 Uint32 capabilities, 
 110                					 Uint32 mask) 
 111 mday  1.15        : Base(name, true,  queueID),
 112 mday  1.22          
 113 mday  1.1           _mask(mask),
 114 mday  1.4           _die(0),
 115 mday  1.41          _incoming(true, 0),
 116 mday  1.39          _callback(true),
 117 mday  1.7           _incoming_queue_shutdown(0),
 118 mday  1.39          _callback_ready(0),
 119                     _req_thread(_req_proc, this, false),
 120                     _callback_thread(_callback_proc, this, false)
 121 mday  1.64.2.2 
 122 mday  1.1      { 
 123 mday  1.60     
 124 mday  1.22        _capabilities = (capabilities | module_capabilities::async);
 125                   
 126 mday  1.1         _default_op_timeout.tv_sec = 30;
 127                   _default_op_timeout.tv_usec = 100;
 128 mday  1.15     
 129                   _meta_dispatcher_mutex.lock(pegasus_thread_self());
 130                   
 131                   if( _meta_dispatcher == 0 )
 132                   {
 133                      PEGASUS_ASSERT( _service_count.value() == 0 );
 134                      _meta_dispatcher = new cimom();
 135                      if (_meta_dispatcher == NULL )
 136                      {
 137                	 _meta_dispatcher_mutex.unlock();
 138                	 throw NullPointer();
 139                      }
 140 mday  1.58           _thread_pool = new ThreadPool(0, "MessageQueueService", 0, 0,
 141 mday  1.55     				    create_time, destroy_time, deadlock_time);  
 142 mday  1.61           
 143 kumpf 1.62           _polling_thread = new Thread(polling_routine, 
 144                			           reinterpret_cast<void *>(&_polling_list), 
 145                			           false);
 146                      _polling_thread->run();
 147 mday  1.15        }
 148                   _service_count++;
 149                
 150                   if( false == register_service(name, _capabilities, _mask) )
 151                   {
 152                      _meta_dispatcher_mutex.unlock();
 153 mday  1.64.2.2       throw BindFailed("MessageQueueService Base Unable to register with  Meta Dispatcher");
 154 mday  1.15        }
 155                   
 156 mday  1.53        _polling_list.insert_last(this);
 157                   
 158 mday  1.15        _meta_dispatcher_mutex.unlock();
 159 mday  1.39     //   _callback_thread.run();
 160                   
 161 mday  1.53     //   _req_thread.run();
 162 mday  1.1      }
 163                
 164 mday  1.4      
 165 mday  1.1      MessageQueueService::~MessageQueueService(void)
 166                {
 167                   _die = 1;
 168 mday  1.7         if (_incoming_queue_shutdown.value() == 0 )
 169 mday  1.16        {
 170 mday  1.32           _shutdown_incoming_queue();
 171 mday  1.16        }
 172 mday  1.39        _callback_ready.signal();
 173 mday  1.53     //   _callback_thread.join();
 174 mday  1.39        
 175 mday  1.15        _meta_dispatcher_mutex.lock(pegasus_thread_self());
 176                   _service_count--;
 177                   if (_service_count.value() == 0 )
 178                   {
 179 mday  1.53           _stop_polling++;
 180                      _polling_sem.signal();
 181 kumpf 1.62           _polling_thread->join();
 182                      delete _polling_thread;
 183 mday  1.15           _meta_dispatcher->_shutdown_routed_queue();
 184                      delete _meta_dispatcher;
 185 kumpf 1.45           _meta_dispatcher = 0;
 186 mday  1.64.2.2 
 187 mday  1.15        }
 188                   _meta_dispatcher_mutex.unlock();
 189 mday  1.53        _polling_list.remove(this);
 190                } 
 191 mday  1.1      
 192 mday  1.7      void MessageQueueService::_shutdown_incoming_queue(void)
 193                {
 194                   
 195 mday  1.9         if (_incoming_queue_shutdown.value() > 0 )
 196                      return ;
 197 mday  1.8         AsyncIoctl *msg = new AsyncIoctl(get_next_xid(),
 198                				    0, 
 199                				    _queueId, 
 200                				    _queueId, 
 201                				    true, 
 202                				    AsyncIoctl::IO_CLOSE, 
 203                				    0, 
 204                				    0);
 205 mday  1.9      
 206 mday  1.8         msg->op = get_op();
 207 mday  1.41        msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
 208                   msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK 
 209                		   | ASYNC_OPFLAGS_SIMPLE_STATUS);
 210 mday  1.32        msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 211 mday  1.41     
 212 mday  1.32        msg->op->_op_dest = this;
 213 mday  1.8         msg->op->_request.insert_first(msg);
 214                   
 215                   _incoming.insert_last_wait(msg->op);
 216 mday  1.32     
 217 mday  1.53     //   _req_thread.join();
 218 mday  1.16        
 219 mday  1.7      }
 220                
 221 mday  1.22     
 222                
 223                void MessageQueueService::enqueue(Message *msg) throw(IPCException)
 224                {
 225 kumpf 1.28        PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
 226                
 227 mday  1.22        Base::enqueue(msg);
 228 kumpf 1.28     
 229                   PEG_METHOD_EXIT();
 230 mday  1.22     }
 231                
 232                
 233 mday  1.39     PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_callback_proc(void *parm)
 234                {
 235                   Thread *myself = reinterpret_cast<Thread *>(parm);
 236                   MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
 237                   AsyncOpNode *operation = 0;
 238                   
 239                   while ( service->_die.value() == 0 ) 
 240                   {
 241                      service->_callback_ready.wait();
 242                      
 243                      service->_callback.lock();
 244                      operation = service->_callback.next(0);
 245                      while( operation != NULL)
 246                      {
 247                	 if( ASYNC_OPSTATE_COMPLETE & operation->read_state())
 248                	 {
 249                	    operation = service->_callback.remove_no_lock(operation);
 250                	    PEGASUS_ASSERT(operation != NULL);
 251                	    operation->_thread_ptr = myself;
 252                	    operation->_service_ptr = service;
 253                	    service->_handle_async_callback(operation);
 254 mday  1.39     	    break;
 255                	 }
 256                	 operation = service->_callback.next(operation);
 257                      }
 258                      service->_callback.unlock();
 259                   }
 260                   myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
 261                   return(0);
 262                }
 263                
 264 mday  1.22     
 265 mday  1.5      PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
 266 mday  1.1      {
 267 mday  1.53        MessageQueueService *service = reinterpret_cast<MessageQueueService *>(parm);
 268 mday  1.5         // pull messages off the incoming queue and dispatch them. then 
 269                   // check pending messages that are non-blocking
 270 mday  1.7         AsyncOpNode *operation = 0;
 271                   
 272 mday  1.56        if ( service->_die.value() == 0 ) 
 273                    {
 274 mday  1.41     	 try 
 275                	 {
 276 mday  1.53     	    operation = service->_incoming.remove_first();
 277 mday  1.41     	 }
 278                	 catch(ListClosed & )
 279                	 {
 280 mday  1.56     	    operation = 0;
 281                	    
 282                	    return(0);
 283 mday  1.41     	 }
 284                	 if( operation )
 285                	 {
 286                	    operation->_service_ptr = service;
 287                	    service->_handle_incoming_operation(operation);
 288                	 }
 289 mday  1.56         }
 290 mday  1.44     
 291 mday  1.5         return(0);
 292 mday  1.1      }
 293                
 294 mday  1.43     Uint32 MessageQueueService::get_pending_callback_count(void)
 295                {
 296                   return _callback.count();
 297                }
 298                
 299                
 300                
 301 mday  1.33     void MessageQueueService::_sendwait_callback(AsyncOpNode *op, 
 302                					     MessageQueue *q, 
 303                					     void *parm)
 304                {
 305                   op->_client_sem.signal();
 306                }
 307                
 308 mday  1.30     
 309                // callback function is responsible for cleaning up all resources
 310                // including op, op->_callback_node, and op->_callback_ptr
 311 mday  1.22     void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
 312                {
 313 mday  1.39        if( op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK )
 314                   {
 315                
 316                      Message *msg = op->get_request();
 317                      if( msg && ( msg->getMask() & message_mask::ha_async))
 318                      {
 319                	 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_START )
 320                	 {
 321                	    AsyncLegacyOperationStart *wrapper = 
 322                	       static_cast<AsyncLegacyOperationStart *>(msg);
 323                	    msg = wrapper->get_action();
 324                	    delete wrapper;
 325                	 }
 326                	 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
 327                	 {
 328                	    AsyncModuleOperationStart *wrapper = 
 329                	       static_cast<AsyncModuleOperationStart *>(msg);
 330                	    msg = wrapper->get_action();
 331                	    delete wrapper;
 332                	 }
 333                	 else if (msg->getType() == async_messages::ASYNC_OP_START)
 334 mday  1.39     	 {
 335                	    AsyncOperationStart *wrapper = 
 336                	       static_cast<AsyncOperationStart *>(msg);
 337                	    msg = wrapper->get_action();
 338                	    delete wrapper;
 339                	 }
 340                	 delete msg;
 341                      }
 342                
 343                      msg = op->get_response();
 344                      if( msg && ( msg->getMask() & message_mask::ha_async))
 345                      {
 346                	 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT )
 347                	 {
 348                	    AsyncLegacyOperationResult *wrapper = 
 349                	       static_cast<AsyncLegacyOperationResult *>(msg);
 350                	    msg = wrapper->get_result();
 351                	    delete wrapper;
 352                	 }
 353                	 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
 354                	 {
 355 mday  1.39     	    AsyncModuleOperationResult *wrapper = 
 356                	       static_cast<AsyncModuleOperationResult *>(msg);
 357                	    msg = wrapper->get_result();
 358                	    delete wrapper;
 359                	 }
 360                      }
 361                      void (*callback)(Message *, void *, void *) = op->__async_callback;
 362                      void *handle = op->_callback_handle;
 363                      void *parm = op->_callback_parameter;
 364                      op->release();
 365                      return_op(op);
 366                      callback(msg, handle, parm);
 367                   }
 368                   else if( op->_flags & ASYNC_OPFLAGS_CALLBACK )
 369                   {
 370                      // note that _callback_node may be different from op 
 371                      // op->_callback_response_q is a "this" pointer we can use for 
 372                      // static callback methods
 373                      op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);
 374                   }
 375 mday  1.22     }
 376                
 377 mday  1.1      
 378 mday  1.34     void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation) 
 379                //						     Thread *thread, 
 380                //						     MessageQueue *queue)
 381 mday  1.1      {
 382 mday  1.5         if ( operation != 0 )
 383                   {
 384 mday  1.29           
 385 mday  1.22     // ATTN: optimization 
 386                // << Tue Feb 19 14:10:38 2002 mdd >>
 387 mday  1.6            operation->lock();
 388 mday  1.29                 
 389 mday  1.6            Message *rq = operation->_request.next(0);
 390 mday  1.29          
 391 mday  1.31     // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>>
 392                // move this to the bottom of the loop when the majority of 
 393                // messages become async messages. 
 394                
 395 mday  1.18           // divert legacy messages to handleEnqueue
 396 mday  1.29           if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))
 397 mday  1.18           {
 398                	 rq = operation->_request.remove_first() ;
 399                	 operation->unlock();
 400                	 // delete the op node 
 401 mday  1.39     	 operation->release();
 402                	 return_op( operation);
 403 mday  1.24     
 404 mday  1.26     	 handleEnqueue(rq);
 405 mday  1.18     	 return;
 406                      }
 407                
 408 mday  1.39           if ( (operation->_flags & ASYNC_OPFLAGS_CALLBACK || 
 409                	    operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) && 
 410 mday  1.29     	   (operation->_state & ASYNC_OPSTATE_COMPLETE))
 411                      {
 412 mday  1.49     	 
 413 mday  1.29     	 operation->unlock();
 414                	 _handle_async_callback(operation);
 415                      }
 416                      else 
 417                      {
 418                	 PEGASUS_ASSERT(rq != 0 );
 419                	 // ATTN: optimization
 420                	 // << Wed Mar  6 15:00:39 2002 mdd >>
 421                	 // put thread and queue into the asyncopnode structure. 
 422 mday  1.34              //  (static_cast<AsyncMessage *>(rq))->_myself = operation->_thread_ptr;
 423                         //   (static_cast<AsyncMessage *>(rq))->_service = operation->_service_ptr;
 424                	 // done << Tue Mar 12 14:49:07 2002 mdd >>
 425 mday  1.29     	 operation->unlock();
 426                	 _handle_async_request(static_cast<AsyncRequest *>(rq));
 427                      }
 428 mday  1.5         }
 429                   return;
 430 mday  1.1      }
 431                
 432                void MessageQueueService::_handle_async_request(AsyncRequest *req)
 433                {
 434 mday  1.4         if ( req != 0 )
 435                   {
 436                      req->op->processing();
 437 mday  1.1         
 438 mday  1.4            Uint32 type = req->getType();
 439                      if( type == async_messages::HEARTBEAT )
 440                	 handle_heartbeat_request(req);
 441                      else if (type == async_messages::IOCTL)
 442                	 handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
 443                      else if (type == async_messages::CIMSERVICE_START)
 444                	 handle_CimServiceStart(static_cast<CimServiceStart *>(req));
 445                      else if (type == async_messages::CIMSERVICE_STOP)
 446                	 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
 447                      else if (type == async_messages::CIMSERVICE_PAUSE)
 448                	 handle_CimServicePause(static_cast<CimServicePause *>(req));
 449                      else if (type == async_messages::CIMSERVICE_RESUME)
 450                	 handle_CimServiceResume(static_cast<CimServiceResume *>(req));
 451                      else if ( type == async_messages::ASYNC_OP_START)
 452                	 handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
 453                      else 
 454                      {
 455                	 // we don't handle this request message 
 456                	 _make_response(req, async_results::CIM_NAK );
 457                      }
 458 mday  1.1         }
 459                }
 460                
 461 mday  1.17     
 462                Boolean MessageQueueService::_enqueueResponse(
 463                   Message* request, 
 464                   Message* response)
 465 mday  1.18        
 466 mday  1.17     {
 467 kumpf 1.37        PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 468                                    "MessageQueueService::_enqueueResponse");
 469 mday  1.25     
 470                   if( request->getMask() & message_mask::ha_async)
 471                   {
 472                      if (response->getMask() & message_mask::ha_async )
 473                      {
 474                	 _completeAsyncResponse(static_cast<AsyncRequest *>(request), 
 475                				static_cast<AsyncReply *>(response), 
 476                				ASYNC_OPSTATE_COMPLETE, 0 );
 477 kumpf 1.37              PEG_METHOD_EXIT();
 478 mday  1.25     	 return true;
 479                      }
 480                   }
 481                   
 482 mday  1.17        if(request->_async != 0 )
 483                   {
 484                      Uint32 mask = request->_async->getMask();
 485 mday  1.18           PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request ));
 486                      
 487                      AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
 488                      AsyncOpNode *op = async->op;
 489                      request->_async = 0;
 490 mday  1.63           // the legacy request is going to be deleted by its handler
 491 mday  1.27           // remove it from the op node 
 492 mday  1.63     
 493                      static_cast<AsyncLegacyOperationStart *>(async)->get_action();
 494 mday  1.18           
 495 mday  1.63                 
 496 mday  1.18           AsyncLegacyOperationResult *async_result = 
 497                	 new AsyncLegacyOperationResult( 
 498                	    async->getKey(),
 499                	    async->getRouting(),
 500                	    op,
 501                	    response);
 502                      _completeAsyncResponse(async,
 503                			     async_result,
 504                			     ASYNC_OPSTATE_COMPLETE, 
 505                			     0);
 506 kumpf 1.37           PEG_METHOD_EXIT();
 507 mday  1.18           return true;
 508 mday  1.17        }
 509 mday  1.18        
 510                   // ensure that the destination queue is in response->dest
 511 kumpf 1.37        PEG_METHOD_EXIT();
 512 mday  1.24        return SendForget(response);
 513 mday  1.18        
 514 mday  1.17     }
 515                
 516 mday  1.18     void MessageQueueService::_make_response(Message *req, Uint32 code)
 517 mday  1.1      {
 518 mday  1.19        cimom::_make_response(req, code);
 519 mday  1.1      }
 520                
 521                
 522 mday  1.5      void MessageQueueService::_completeAsyncResponse(AsyncRequest *request, 
 523                						AsyncReply *reply, 
 524                						Uint32 state, 
 525                						Uint32 flag)
 526                {
 527 kumpf 1.37        PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 528                                    "MessageQueueService::_completeAsyncResponse");
 529                
 530 mday  1.19        cimom::_completeAsyncResponse(request, reply, state, flag);
 531 kumpf 1.37     
 532                   PEG_METHOD_EXIT();
 533 mday  1.5      }
 534                
 535                
 536 mday  1.32     void MessageQueueService::_complete_op_node(AsyncOpNode *op,
 537                					    Uint32 state, 
 538                					    Uint32 flag, 
 539                					    Uint32 code)
 540                {
 541                   cimom::_complete_op_node(op, state, flag, code);
 542                }
 543                
 544 mday  1.5      
 545                Boolean MessageQueueService::accept_async(AsyncOpNode *op)
 546                {
 547 mday  1.8         if (_incoming_queue_shutdown.value() > 0 )
 548                      return false;
 549                   
 550 mday  1.20     // ATTN optimization remove the message checking altogether in the base 
 551                // << Mon Feb 18 14:02:20 2002 mdd >>
 552 mday  1.64.2.2    op->lock();
 553 mday  1.6         Message *rq = op->_request.next(0);
 554 mday  1.20        Message *rp = op->_response.next(0);
 555 mday  1.64.2.2    op->unlock();
 556 mday  1.6         
 557 mday  1.22        if(  (rq != 0 && (true == messageOK(rq))) || (rp != 0 && ( true == messageOK(rp) )) &&  
 558                	_die.value() == 0  )
 559 mday  1.5         {
 560 mday  1.6            _incoming.insert_last_wait(op);
 561 mday  1.53           _polling_sem.signal();
 562 mday  1.5            return true;
 563                   }
 564 mday  1.32     //    else
 565                //    {
 566                //       if(  (rq != 0 && (true == MessageQueueService::messageOK(rq))) || 
 567                // 	   (rp != 0 && ( true == MessageQueueService::messageOK(rp) )) &&  
 568                // 	   _die.value() == 0)
 569                //       {
 570                // 	 MessageQueueService::_incoming.insert_last_wait(op);
 571                // 	 return true;
 572                //       }
 573                //    }
 574                   
 575 mday  1.5         return false;
 576                }
 577                
 578                Boolean MessageQueueService::messageOK(const Message *msg)
 579                {
 580 mday  1.8         if (_incoming_queue_shutdown.value() > 0 )
 581                      return false;
 582 mday  1.18        return true;
 583                }
 584                
 585 mday  1.29     
 586                // made pure virtual 
 587                // << Wed Mar  6 15:11:31 2002 mdd >> 
 588 mday  1.25     // void MessageQueueService::handleEnqueue(Message *msg)
 589                // {
 590                //    if ( msg )
 591                //       delete msg;
 592                // }
 593 mday  1.5      
 594 mday  1.29     // made pure virtual 
 595                // << Wed Mar  6 15:11:56 2002 mdd >>
 596 mday  1.25     // void MessageQueueService::handleEnqueue(void)
 597                // {
 598                //     Message *msg = dequeue();
 599                //     handleEnqueue(msg);
 600                // }
 601 mday  1.5      
 602 mday  1.1      void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
 603                {
 604                   // default action is to echo a heartbeat response 
 605                   
 606                   AsyncReply *reply = 
 607                      new AsyncReply(async_messages::HEARTBEAT,
 608                		     req->getKey(),
 609                		     req->getRouting(),
 610                		     0,
 611                		     req->op, 
 612                		     async_results::OK, 
 613                		     req->resp,
 614                		     false);
 615 mday  1.4         _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );
 616 mday  1.1      }
 617                
 618                
 619                void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
 620                { 
 621                   ;
 622                }
 623                      
 624                void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
 625                {
 626 mday  1.8         
 627                   switch( req->ctl )
 628                   {
 629                      case AsyncIoctl::IO_CLOSE:
 630                      {
 631                	 // save my bearings 
 632 mday  1.53     //	 Thread *myself = req->op->_thread_ptr;
 633 mday  1.34     	 MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);
 634 mday  1.8      	 
 635                	 // respond to this message.
 636                	 _make_response(req, async_results::OK);
 637                	 // ensure we do not accept any further messages
 638                
 639                	 // ensure we don't recurse on IO_CLOSE
 640                	 if( _incoming_queue_shutdown.value() > 0 )
 641                	    break;
 642                	 
 643                	 // set the closing flag 
 644                	 service->_incoming_queue_shutdown = 1;
 645                	 // empty out the queue
 646                	 while( 1 )
 647                	 {
 648                	    AsyncOpNode *operation;
 649                	    try 
 650                	    {
 651                	       operation = service->_incoming.remove_first();
 652                	    }
 653                	    catch(IPCException & )
 654                	    {
 655 mday  1.8      	       break;
 656                	    }
 657                	    if( operation )
 658                	    {
 659 mday  1.53     //	       operation->_thread_ptr = myself;
 660 mday  1.34     	       operation->_service_ptr = service;
 661                	       service->_handle_incoming_operation(operation);
 662 mday  1.8      	    }
 663                	    else
 664                	       break;
 665                	 } // message processing loop
 666                
 667                	 // shutdown the AsyncDQueue
 668                	 service->_incoming.shutdown_queue();
 669                	 // exit the thread ! 
 670 mday  1.53     //	 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
 671 mday  1.8      	 return;
 672                      }
 673                
 674                      default:
 675                	 _make_response(req, async_results::CIM_NAK);
 676                   }
 677 mday  1.1      }
 678 mday  1.8      
 679 mday  1.1      void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
 680                {
 681 mday  1.10        // clear the stoped bit and update
 682 mday  1.13        _capabilities &= (~(module_capabilities::stopped));
 683 mday  1.10        _make_response(req, async_results::OK);
 684                   // now tell the meta dispatcher we are stopped 
 685                   update_service(_capabilities, _mask);
 686                
 687 mday  1.1      }
 688                void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
 689                {
 690 mday  1.10        // set the stopeed bit and update
 691                   _capabilities |= module_capabilities::stopped;
 692                   _make_response(req, async_results::CIM_STOPPED);
 693                   // now tell the meta dispatcher we are stopped 
 694                   update_service(_capabilities, _mask);
 695                   
 696 mday  1.1      }
 697                void MessageQueueService::handle_CimServicePause(CimServicePause *req)
 698                {
 699 mday  1.10        // set the paused bit and update
 700 mday  1.13        _capabilities |= module_capabilities::paused;
 701 mday  1.11        update_service(_capabilities, _mask);
 702 mday  1.10        _make_response(req, async_results::CIM_PAUSED);
 703                   // now tell the meta dispatcher we are stopped 
 704 mday  1.1      }
 705                void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
 706                {
 707 mday  1.10        // clear the paused  bit and update
 708 mday  1.13        _capabilities &= (~(module_capabilities::paused));
 709 mday  1.11        update_service(_capabilities, _mask);
 710 mday  1.10        _make_response(req, async_results::OK);
 711                   // now tell the meta dispatcher we are stopped 
 712 mday  1.1      }
 713                      
 714                void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
 715                {
 716                   _make_response(req, async_results::CIM_NAK);
 717                }
 718                
 719                void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
 720                {
 721 mday  1.14        ;
 722                }
 723                
 724 mday  1.10     
 725 mday  1.14     void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)
 726                {
 727                   // remove the legacy message from the request and enqueue it to its destination
 728                   Uint32 result = async_results::CIM_NAK;
 729                   
 730 mday  1.25        Message *legacy = req->_act;
 731 mday  1.14        if ( legacy != 0 )
 732                   {
 733 mday  1.25           MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
 734 mday  1.14           if( queue != 0 )
 735                      {
 736 mday  1.25     	 if(queue->isAsync() == true )
 737                	 {
 738                	    (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
 739                	 }
 740                	 else 
 741                	 {
 742                	    // Enqueue the response:
 743                	    queue->enqueue(req->get_action());
 744                	 }
 745                	 
 746 mday  1.14     	 result = async_results::OK;
 747                      }
 748                   }
 749                   _make_response(req, result);
 750                }
 751                
 752                void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)
 753                {
 754                   ;
 755 mday  1.1      }
 756                
 757                AsyncOpNode *MessageQueueService::get_op(void)
 758                {
 759 mday  1.4         AsyncOpNode *op = new AsyncOpNode();
 760 mday  1.1         
 761 mday  1.9         op->_state = ASYNC_OPSTATE_UNKNOWN;
 762                   op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
 763 mday  1.4         
 764 mday  1.1         return op;
 765                }
 766                
 767                void MessageQueueService::return_op(AsyncOpNode *op)
 768                {
 769                   PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED );
 770 mday  1.4         delete op;
 771 mday  1.1      }
 772                
 773 mday  1.18     
 774 mday  1.29     Boolean MessageQueueService::ForwardOp(AsyncOpNode *op, 
 775                				       Uint32 destination)
 776                {
 777                   PEGASUS_ASSERT(op != 0 );
 778 mday  1.30        op->lock();
 779                   op->_op_dest = MessageQueue::lookup(destination);
 780 mday  1.29        op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD);
 781                   op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK);
 782 mday  1.30        op->unlock();
 783                   if ( op->_op_dest == 0 )
 784                      return false;
 785                      
 786 mday  1.29        return  _meta_dispatcher->route_async(op);
 787                }
 788                
 789 mday  1.39      
 790 mday  1.21     Boolean MessageQueueService::SendAsync(AsyncOpNode *op, 
 791                				       Uint32 destination,
 792 mday  1.18     				       void (*callback)(AsyncOpNode *, 
 793 mday  1.32     							MessageQueue *,
 794 mday  1.30     							void *),
 795                				       MessageQueue *callback_response_q,
 796                				       void *callback_ptr)
 797 mday  1.20     { 
 798 mday  1.21        PEGASUS_ASSERT(op != 0 && callback != 0 );
 799 mday  1.18        
 800 mday  1.21        // get the queue handle for the destination
 801                
 802 mday  1.30        op->lock();
 803 mday  1.32        op->_op_dest = MessageQueue::lookup(destination); // destination of this message
 804 mday  1.22        op->_flags |= ASYNC_OPFLAGS_CALLBACK;
 805                   op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 806                   op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 807 mday  1.30        // initialize the callback data
 808 mday  1.32        op->_async_callback = callback;   // callback function to be executed by recpt. of response
 809                   op->_callback_node = op;          // the op node
 810                   op->_callback_response_q = callback_response_q;  // the queue that will receive the response
 811                   op->_callback_ptr = callback_ptr;   // user data for callback
 812                   op->_callback_request_q = this;     // I am the originator of this request
 813 mday  1.30        
 814                   op->unlock();
 815                   if(op->_op_dest == 0) 
 816                      return false;
 817 mday  1.21        
 818                   return  _meta_dispatcher->route_async(op);
 819 mday  1.18     }
 820                
 821                
 822 mday  1.39     Boolean MessageQueueService::SendAsync(Message *msg, 
 823                				       Uint32 destination,
 824                				       void (*callback)(Message *response, 
 825                							void *handle, 
 826                							void *parameter),
 827                				       void *handle, 
 828                				       void *parameter)
 829                {
 830                   if(msg == NULL)
 831                      return false;
 832                   if(callback == NULL)
 833                      return SendForget(msg);
 834                   AsyncOpNode *op = get_op();
 835 mday  1.43        msg->dest = destination;
 836 mday  1.39        if( NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
 837                   {
 838                      op->release();
 839                      return_op(op);
 840                      return false;
 841                   }
 842                   op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
 843                   op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 844                   op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 845                   op->__async_callback = callback;
 846                   op->_callback_node = op;
 847                   op->_callback_handle = handle;
 848                   op->_callback_parameter = parameter;
 849                   op->_callback_response_q = this;
 850                   
 851                
 852                   if( ! (msg->getMask() & message_mask::ha_async) )
 853                   {
 854                      AsyncLegacyOperationStart *wrapper = 
 855                	 new AsyncLegacyOperationStart(get_next_xid(),
 856                				       op, 
 857 mday  1.39     				       destination, 
 858                				       msg, 
 859                				       destination);
 860                   }
 861                   else 
 862                   {
 863                      op->_request.insert_first(msg);
 864                      (static_cast<AsyncMessage *>(msg))->op = op;
 865                   }
 866                   
 867                   _callback.insert_last(op);
 868                   return _meta_dispatcher->route_async(op);
 869                }
 870                
 871                
 872 mday  1.18     Boolean MessageQueueService::SendForget(Message *msg)
 873                {
 874                
 875 mday  1.24        
 876 mday  1.18        AsyncOpNode *op = 0;
 877 mday  1.22        Uint32 mask = msg->getMask();
 878                   
 879                   if (mask & message_mask::ha_async)
 880 mday  1.18        {
 881                      op = (static_cast<AsyncMessage *>(msg))->op ;
 882                   }
 883 mday  1.22     
 884 mday  1.18        if( op == 0 )
 885 mday  1.20        {
 886 mday  1.18           op = get_op();
 887 mday  1.20           op->_request.insert_first(msg);
 888 mday  1.22           if (mask & message_mask::ha_async)
 889                	 (static_cast<AsyncMessage *>(msg))->op = op;
 890 mday  1.20        }
 891 mday  1.30        op->_op_dest = MessageQueue::lookup(msg->dest);
 892 mday  1.22        op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
 893 mday  1.41        op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK 
 894                		   | ASYNC_OPFLAGS_SIMPLE_STATUS);
 895 mday  1.22        op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 896 mday  1.30        if ( op->_op_dest == 0 )
 897 mday  1.39        {
 898                      op->release();
 899                      return_op(op);
 900 mday  1.21           return false;
 901 mday  1.39        }
 902 mday  1.24        
 903 mday  1.18        // now see if the meta dispatcher will take it
 904 mday  1.30        return  _meta_dispatcher->route_async(op);
 905 mday  1.18     }
 906 mday  1.2      
 907 mday  1.1      
 908 mday  1.4      AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
 909 mday  1.1      {
 910 mday  1.4         if ( request == 0 )
 911                      return 0 ;
 912 mday  1.5      
 913                   Boolean destroy_op = false;
 914                   
 915                   if (request->op == false)
 916                   {
 917                      request->op = get_op();
 918 mday  1.7            request->op->_request.insert_first(request);
 919 mday  1.5            destroy_op = true;
 920                   }
 921 mday  1.4         
 922 mday  1.33        request->block = false;
 923 mday  1.35        request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
 924 mday  1.33        SendAsync(request->op, 
 925                	     request->dest,
 926 mday  1.36      	     _sendwait_callback,
 927 mday  1.33     	     this,
 928                	     (void *)0);
 929 mday  1.4         
 930 mday  1.33        request->op->_client_sem.wait();
 931 mday  1.6         request->op->lock();
 932                   AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
 933                   rpl->op = 0;
 934                   request->op->unlock();
 935                   
 936 mday  1.5         if( destroy_op == true)
 937                   {
 938 mday  1.6            request->op->lock();
 939                      request->op->_request.remove(request);
 940                      request->op->_state |= ASYNC_OPSTATE_RELEASED;
 941                      request->op->unlock();
 942                      return_op(request->op);
 943 mday  1.7            request->op = 0;
 944 mday  1.5         }
 945                   return rpl;
 946 mday  1.1      }
 947                
 948                
 949                Boolean MessageQueueService::register_service(String name, 
 950                					      Uint32 capabilities, 
 951                					      Uint32 mask)
 952                
 953                {
 954                   RegisterCimService *msg = new RegisterCimService(get_next_xid(),
 955 mday  1.5      						    0, 
 956 mday  1.1      						    true, 
 957                						    name, 
 958                						    capabilities, 
 959                						    mask,
 960                						    _queueId);
 961 mday  1.44        msg->dest = CIMOM_Q_ID;
 962                   
 963 mday  1.1         Boolean registered = false;
 964 mday  1.7         AsyncReply *reply = static_cast<AsyncReply *>(SendWait( msg ));
 965 mday  1.1         
 966 mday  1.2         if ( reply != 0 )
 967 mday  1.1         {
 968                      if(reply->getMask() & message_mask:: ha_async)
 969                      {
 970                	 if(reply->getMask() & message_mask::ha_reply)
 971                	 {
 972 mday  1.15     	    if(reply->result == async_results::OK || 
 973                	       reply->result == async_results::MODULE_ALREADY_REGISTERED )
 974 mday  1.1      	       registered = true;
 975                	 }
 976                      }
 977                      
 978 mday  1.7            delete reply; 
 979 mday  1.1         }
 980 mday  1.5         delete msg;
 981 mday  1.1         return registered;
 982                }
 983                
 984                Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
 985                {
 986                   
 987                   
 988                   UpdateCimService *msg = new UpdateCimService(get_next_xid(), 
 989 mday  1.5      						0, 
 990 mday  1.1      						true, 
 991                						_queueId,
 992                						_capabilities, 
 993                						_mask);
 994                   Boolean registered = false;
 995 mday  1.2      
 996                   AsyncMessage *reply = SendWait(msg);
 997                   if (reply)
 998 mday  1.1         {
 999                      if(reply->getMask() & message_mask:: ha_async)
1000                      {
1001                	 if(reply->getMask() & message_mask::ha_reply)
1002                	 {
1003 mday  1.2      	    if(static_cast<AsyncReply *>(reply)->result == async_results::OK)
1004 mday  1.1      	       registered = true;
1005                	 }
1006                      }
1007                      delete reply;
1008                   }
1009 mday  1.5         delete msg;
1010 mday  1.1         return registered;
1011                }
1012                
1013                
1014                Boolean MessageQueueService::deregister_service(void)
1015                {
1016 mday  1.3      
1017 mday  1.5         _meta_dispatcher->deregister_module(_queueId);
1018                   return true;
1019 mday  1.1      }
1020                
1021                
1022                void MessageQueueService::find_services(String name, 
1023                					Uint32 capabilities, 
1024                					Uint32 mask, 
1025                					Array<Uint32> *results)
1026                {
1027                   
1028                   if( results == 0 )
1029                      throw NullPointer();
1030 mday  1.5          
1031 mday  1.1         results->clear();
1032                   
1033                   FindServiceQueue *req = 
1034                      new FindServiceQueue(get_next_xid(), 
1035 mday  1.5      			   0, 
1036 mday  1.1      			   _queueId, 
1037                			   true, 
1038                			   name, 
1039                			   capabilities, 
1040                			   mask);
1041 mday  1.44        
1042                   req->dest = CIMOM_Q_ID;
1043 mday  1.1         
1044 mday  1.2         AsyncMessage *reply = SendWait(req); 
1045                   if(reply)
1046 mday  1.1         {
1047                      if( reply->getMask() & message_mask::ha_async)
1048                      {
1049                	 if(reply->getMask() & message_mask::ha_reply)
1050                	 {
1051                	    if(reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
1052                	    {
1053                	       if( (static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK )
1054                		  *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
1055                	    }
1056                	 }
1057                      }
1058                      delete reply;
1059                   }
1060 mday  1.5         delete req;
1061 mday  1.1         return ;
1062                }
1063                
1064                void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
1065                {
1066                   if(result == 0)
1067                      throw NullPointer();
1068                   
1069                   EnumerateService *req 
1070                      = new EnumerateService(get_next_xid(),
1071 mday  1.5      			     0, 
1072 mday  1.1      			     _queueId, 
1073                			     true, 
1074                			     queue);
1075                   
1076 mday  1.2         AsyncMessage *reply = SendWait(req);
1077 mday  1.1         
1078 mday  1.2         if (reply)
1079 mday  1.1         {
1080                      Boolean found = false;
1081                      
1082                      if( reply->getMask() & message_mask::ha_async)
1083                      {
1084                	 if(reply->getMask() & message_mask::ha_reply)
1085                	 {
1086                	    if(reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
1087                	    {
1088                	       if( (static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK )
1089                	       {
1090                		  if( found == false)
1091                		  {
1092                		     found = true;
1093                		     
1094                		     result->put_name( (static_cast<EnumerateServiceResponse *>(reply))->name);
1095                		     result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
1096                		     result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
1097                		     result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
1098                		  }
1099                	       }
1100 mday  1.1      	    }
1101                	 }
1102                      }
1103                      delete reply;
1104                   }
1105 mday  1.5         delete req;
1106                   
1107 mday  1.1         return;
1108                }
1109                
1110                Uint32 MessageQueueService::get_next_xid(void)
1111                {
1112                   _xid++;
1113                   return _xid.value();
1114                }
1115                
1116                PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2