(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

   1 karl  1.82 //%2003////////////////////////////////////////////////////////////////////////
   2 mday  1.1  //
   3 karl  1.82 // Copyright (c) 2000, 2001, 2002  BMC Software, Hewlett-Packard Development
   4            // Company, L. P., IBM Corp., The Open Group, Tivoli Systems.
   5            // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L. P.;
   6            // IBM Corp.; EMC Corporation, The Open Group.
   7 mday  1.1  //
   8            // Permission is hereby granted, free of charge, to any person obtaining a copy
   9            // of this software and associated documentation files (the "Software"), to
  10            // deal in the Software without restriction, including without limitation the
  11            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  12            // sell copies of the Software, and to permit persons to whom the Software is
  13            // furnished to do so, subject to the following conditions:
  14 karl  1.82 // 
  15 mday  1.1  // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  16            // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  17            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  18            // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  19            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  20            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  21            // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  22            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  23            //
  24            //==============================================================================
  25            //
  26            // Author: Mike Day (mdday@us.ibm.com)
  27            //
  28            // Modified By:
  29            //
  30            //%/////////////////////////////////////////////////////////////////////////////
  31            
  32            #include "MessageQueueService.h"
  33 mday  1.22 #include <Pegasus/Common/Tracer.h>
  34 humberto 1.71 #include <Pegasus/Common/MessageLoader.h> //l10n
  35 mday     1.1  
  36               PEGASUS_NAMESPACE_BEGIN
  37               
  38 mday     1.15  
  39               cimom *MessageQueueService::_meta_dispatcher = 0;
  40               AtomicInt MessageQueueService::_service_count = 0;
  41               AtomicInt MessageQueueService::_xid(1);
  42 mday     1.38 Mutex MessageQueueService::_meta_dispatcher_mutex;
  43 mday     1.15 
  44 mday     1.58 static struct timeval create_time = {0, 1};
  45 mday     1.70 static struct timeval destroy_time = {300, 0}; 
  46 mday     1.59 static struct timeval deadlock_time = {0, 0};
  47 mday     1.53 
  48 mday     1.55 ThreadPool *MessageQueueService::_thread_pool = 0;
  49 mday     1.53 
  50               DQueue<MessageQueueService> MessageQueueService::_polling_list(true);
  51               
  52 kumpf    1.62 Thread* MessageQueueService::_polling_thread = 0;
  53 mday     1.61 
  54 mday     1.69 ThreadPool *MessageQueueService::get_thread_pool(void)
  55               {
  56                  return _thread_pool;
  57               }
  58               
  59               void unload_idle_providers(void);
  60 mday     1.61 
  61 mday     1.69 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL  MessageQueueService::kill_idle_threads(void *parm)
  62 mday     1.53 {
  63 mday     1.69 
  64                  static struct timeval now, last = {0,0};
  65 mday     1.53    gettimeofday(&now, NULL);
  66 mday     1.56    int dead_threads = 0;
  67 mday     1.53    
  68 mday     1.70    if( now.tv_sec - last.tv_sec > 120 )
  69 mday     1.53    {
  70                     gettimeofday(&last, NULL);
  71 mday     1.56       try 
  72                     {
  73 mday     1.69 	 dead_threads =  MessageQueueService::_thread_pool->kill_dead_threads();
  74 mday     1.56       }
  75 mday     1.69       catch(...)
  76 mday     1.56       {
  77               
  78                     }
  79 mday     1.53    }
  80 mday     1.69    exit_thread((PEGASUS_THREAD_RETURN)dead_threads);
  81                  return (PEGASUS_THREAD_RETURN)dead_threads;
  82 mday     1.53 }
  83               
  84 mday     1.68 
  85 mday     1.76 void MessageQueueService::force_shutdown(Boolean destroy_flag)
  86 mday     1.68 {
  87 mday     1.79    return;
  88                  
  89 kumpf    1.75 #ifdef MESSAGEQUEUESERVICE_DEBUG
  90 humberto 1.71 	//l10n
  91                  MessageLoaderParms parms("Common.MessageQueueService.FORCING_SHUTDOWN",
  92 humberto 1.74 			    "Forcing shutdown of CIMOM Message Router");
  93 humberto 1.71    PEGASUS_STD(cout) << MessageLoader::getMessage(parms) << PEGASUS_STD(endl);
  94 kumpf    1.75 #endif
  95 mday     1.76 
  96 kumpf    1.75 
  97 mday     1.68    MessageQueueService *svc;
  98 konrad.r 1.73    int counter = 0;   
  99 mday     1.68    _polling_list.lock();
 100                  svc = _polling_list.next(0);
 101 humberto 1.71    
 102 mday     1.68    while(svc != 0)
 103                  {
 104 kumpf    1.75 #ifdef MESSAGEQUEUESERVICE_DEBUG
 105 humberto 1.71    		//l10n - reuse same MessageLoaderParms to avoid multiple creates
 106                     	parms.msg_id = "Common.MessageQueueService.STOPPING_SERVICE";
 107                  	  	parms.default_msg = "Stopping $0"; 
 108                  	  	parms.arg0 = svc->getQueueName();
 109                  	  	PEGASUS_STD(cout) << MessageLoader::getMessage(parms) << PEGASUS_STD(endl);
 110 kumpf    1.75 #endif
 111 humberto 1.71       							
 112 mday     1.68       _polling_sem.signal();
 113                     svc->_shutdown_incoming_queue();
 114 konrad.r 1.73       counter++;
 115 mday     1.68       _polling_sem.signal();
 116                     svc = _polling_list.next(svc);
 117                  }
 118                  _polling_list.unlock();
 119 konrad.r 1.73 
 120                  _polling_sem.signal();
 121               
 122 mday     1.76    MessageQueueService::_stop_polling = 1;
 123                  
 124                  if(destroy_flag == true) 
 125                  {
 126               
 127                     svc = _polling_list.remove_last();
 128                     while(svc)
 129                     {
 130               	 delete svc;
 131               	 svc = _polling_list.remove_last();
 132                     }
 133                     
 134 konrad.r 1.73    }
 135 mday     1.68 }
 136               
 137               
 138 mday     1.53 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
 139               {
 140                  Thread *myself = reinterpret_cast<Thread *>(parm);
 141                  DQueue<MessageQueueService> *list = reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm());
 142                  while ( _stop_polling.value()  == 0 ) 
 143                  {
 144 mday     1.69       _polling_sem.wait();
 145 mday     1.61       if(_stop_polling.value() != 0 )
 146                     {
 147 kumpf    1.62          break;
 148 mday     1.61       }
 149                     
 150 mday     1.53       list->lock();
 151                     MessageQueueService *service = list->next(0);
 152                     while(service != NULL)
 153                     {
 154               	 if(service->_incoming.count() > 0 )
 155               	 {
 156 mday     1.55 	    _thread_pool->allocate_and_awaken(service, _req_proc);
 157 mday     1.53 	 }
 158               	 service = list->next(service);
 159                     }
 160                     list->unlock();
 161 mday     1.69       if(_check_idle_flag.value() != 0 )
 162                     {
 163               	 _check_idle_flag = 0;
 164               	 Thread th(kill_idle_threads, 0, true);
 165               	 th.run();
 166                     }
 167 mday     1.53    }
 168                  myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
 169                  return(0);
 170               }
 171               
 172               
 173               Semaphore MessageQueueService::_polling_sem(0);
 174               AtomicInt MessageQueueService::_stop_polling(0);
 175 mday     1.69 AtomicInt MessageQueueService::_check_idle_flag(0);
 176 mday     1.53 
 177 mday     1.15 
 178 mday     1.1  MessageQueueService::MessageQueueService(const char *name, 
 179               					 Uint32 queueID, 
 180               					 Uint32 capabilities, 
 181 mday     1.82.6.1 					 Uint32 mask, 
 182                   					 int threads) 
 183 mday     1.15        : Base(name, true,  queueID),
 184 mday     1.22          
 185 mday     1.1           _mask(mask),
 186 mday     1.4           _die(0),
 187 mday     1.41          _incoming(true, 0),
 188 mday     1.39          _callback(true),
 189 mday     1.7           _incoming_queue_shutdown(0),
 190 mday     1.39          _callback_ready(0),
 191                        _req_thread(_req_proc, this, false),
 192                        _callback_thread(_callback_proc, this, false)
 193 mday     1.53     
 194 mday     1.1      { 
 195 mday     1.60     
 196 mday     1.22        _capabilities = (capabilities | module_capabilities::async);
 197                      
 198 mday     1.1         _default_op_timeout.tv_sec = 30;
 199                      _default_op_timeout.tv_usec = 100;
 200 mday     1.15     
 201                      _meta_dispatcher_mutex.lock(pegasus_thread_self());
 202                      
 203                      if( _meta_dispatcher == 0 )
 204                      {
 205                         PEGASUS_ASSERT( _service_count.value() == 0 );
 206                         _meta_dispatcher = new cimom();
 207                         if (_meta_dispatcher == NULL )
 208                         {
 209                   	 _meta_dispatcher_mutex.unlock();
 210                   	 throw NullPointer();
 211                         }
 212 mday     1.82.6.1       _thread_pool = new ThreadPool(0, "MessageQueueService", 0, threads,
 213 mday     1.55     				    create_time, destroy_time, deadlock_time);  
 214 mday     1.61           
 215 kumpf    1.62           _polling_thread = new Thread(polling_routine, 
 216                   			           reinterpret_cast<void *>(&_polling_list), 
 217                   			           false);
 218                         _polling_thread->run();
 219 mday     1.15        }
 220                      _service_count++;
 221                   
 222                      if( false == register_service(name, _capabilities, _mask) )
 223                      {
 224                         _meta_dispatcher_mutex.unlock();
 225 humberto 1.71           //l10n
 226                         //throw BindFailedException("MessageQueueService Base Unable to register with  Meta Dispatcher");
 227                         MessageLoaderParms parms("Common.MessageQueueService.UNABLE_TO_REGISTER",
 228 humberto 1.74     			       "MessageQueueService Base Unable to register with  Meta Dispatcher");
 229 humberto 1.71           						   
 230                         throw BindFailedException(parms);
 231 mday     1.15        }
 232                      
 233 mday     1.53        _polling_list.insert_last(this);
 234                      
 235 mday     1.15        _meta_dispatcher_mutex.unlock();
 236 mday     1.39     //   _callback_thread.run();
 237                      
 238 mday     1.53     //   _req_thread.run();
 239 mday     1.1      }
 240                   
 241 mday     1.4      
 242 mday     1.1      MessageQueueService::~MessageQueueService(void)
 243                   {
 244                      _die = 1;
 245 mday     1.76     
 246                      if (_incoming_queue_shutdown.value() == 0 )
 247                      {
 248                         _shutdown_incoming_queue();
 249                      }
 250 mday     1.39        _callback_ready.signal();
 251                      
 252 mday     1.15        _meta_dispatcher_mutex.lock(pegasus_thread_self());
 253                      _service_count--;
 254                      if (_service_count.value() == 0 )
 255                      {
 256 mday     1.76     
 257 mday     1.53           _stop_polling++;
 258                         _polling_sem.signal();
 259 kumpf    1.62           _polling_thread->join();
 260                         delete _polling_thread;
 261 kumpf    1.65           _polling_thread = 0;
 262 mday     1.15           _meta_dispatcher->_shutdown_routed_queue();
 263                         delete _meta_dispatcher;
 264 kumpf    1.45           _meta_dispatcher = 0;
 265 mday     1.76     
 266 kumpf    1.65           delete _thread_pool;
 267                         _thread_pool = 0;
 268 mday     1.15        }
 269                      _meta_dispatcher_mutex.unlock();
 270 mday     1.53        _polling_list.remove(this);
 271 konrad.r 1.72        // Clean up in case there are extra stuff on the queue. 
 272                     while (_incoming.count())
 273                     {
 274                   	delete _incoming.remove_first();
 275                     }
 276 mday     1.53     } 
 277 mday     1.1      
 278 mday     1.7      void MessageQueueService::_shutdown_incoming_queue(void)
 279                   {
 280                      
 281 mday     1.76        
 282 mday     1.9         if (_incoming_queue_shutdown.value() > 0 )
 283                         return ;
 284 mday     1.8         AsyncIoctl *msg = new AsyncIoctl(get_next_xid(),
 285                   				    0, 
 286                   				    _queueId, 
 287                   				    _queueId, 
 288                   				    true, 
 289                   				    AsyncIoctl::IO_CLOSE, 
 290                   				    0, 
 291                   				    0);
 292 mday     1.9      
 293 mday     1.8         msg->op = get_op();
 294 mday     1.41        msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
 295                      msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK 
 296                   		   | ASYNC_OPFLAGS_SIMPLE_STATUS);
 297 mday     1.32        msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 298 mday     1.41     
 299 mday     1.32        msg->op->_op_dest = this;
 300 mday     1.8         msg->op->_request.insert_first(msg);
 301                      
 302                      _incoming.insert_last_wait(msg->op);
 303 mday     1.32     
 304 mday     1.7      }
 305                   
 306 mday     1.22     
 307                   
 308                   void MessageQueueService::enqueue(Message *msg) throw(IPCException)
 309                   {
 310 kumpf    1.28        PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
 311                   
 312 mday     1.22        Base::enqueue(msg);
 313 kumpf    1.28     
 314                      PEG_METHOD_EXIT();
 315 mday     1.22     }
 316                   
 317                   
 318 mday     1.39     PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_callback_proc(void *parm)
 319                   {
 320                      Thread *myself = reinterpret_cast<Thread *>(parm);
 321                      MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
 322                      AsyncOpNode *operation = 0;
 323                      
 324                      while ( service->_die.value() == 0 ) 
 325                      {
 326                         service->_callback_ready.wait();
 327                         
 328                         service->_callback.lock();
 329                         operation = service->_callback.next(0);
 330                         while( operation != NULL)
 331                         {
 332                   	 if( ASYNC_OPSTATE_COMPLETE & operation->read_state())
 333                   	 {
 334                   	    operation = service->_callback.remove_no_lock(operation);
 335                   	    PEGASUS_ASSERT(operation != NULL);
 336                   	    operation->_thread_ptr = myself;
 337                   	    operation->_service_ptr = service;
 338                   	    service->_handle_async_callback(operation);
 339 mday     1.39     	    break;
 340                   	 }
 341                   	 operation = service->_callback.next(operation);
 342                         }
 343                         service->_callback.unlock();
 344                      }
 345                      myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
 346                      return(0);
 347                   }
 348                   
 349 mday     1.22     
 350 mday     1.5      PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
 351 mday     1.1      {
 352 mday     1.53        MessageQueueService *service = reinterpret_cast<MessageQueueService *>(parm);
 353 mday     1.5         // pull messages off the incoming queue and dispatch them. then 
 354                      // check pending messages that are non-blocking
 355 mday     1.7         AsyncOpNode *operation = 0;
 356                      
 357 mday     1.56        if ( service->_die.value() == 0 ) 
 358                       {
 359 mday     1.41     	 try 
 360                   	 {
 361 mday     1.53     	    operation = service->_incoming.remove_first();
 362 mday     1.41     	 }
 363                   	 catch(ListClosed & )
 364                   	 {
 365 mday     1.56     	    operation = 0;
 366                   	    
 367                   	    return(0);
 368 mday     1.41     	 }
 369                   	 if( operation )
 370                   	 {
 371                   	    operation->_service_ptr = service;
 372                   	    service->_handle_incoming_operation(operation);
 373                   	 }
 374 mday     1.56         }
 375 mday     1.44     
 376 mday     1.5         return(0);
 377 mday     1.1      }
 378                   
 379 mday     1.43     Uint32 MessageQueueService::get_pending_callback_count(void)
 380                   {
 381                      return _callback.count();
 382                   }
 383                   
 384                   
 385                   
 386 mday     1.33     void MessageQueueService::_sendwait_callback(AsyncOpNode *op, 
 387                   					     MessageQueue *q, 
 388                   					     void *parm)
 389                   {
 390                      op->_client_sem.signal();
 391                   }
 392                   
 393 mday     1.30     
 394                   // callback function is responsible for cleaning up all resources
 395                   // including op, op->_callback_node, and op->_callback_ptr
 396 mday     1.22     void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
 397                   {
 398 mday     1.39        if( op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK )
 399                      {
 400                   
 401                         Message *msg = op->get_request();
 402                         if( msg && ( msg->getMask() & message_mask::ha_async))
 403                         {
 404                   	 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_START )
 405                   	 {
 406                   	    AsyncLegacyOperationStart *wrapper = 
 407                   	       static_cast<AsyncLegacyOperationStart *>(msg);
 408                   	    msg = wrapper->get_action();
 409                   	    delete wrapper;
 410                   	 }
 411                   	 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
 412                   	 {
 413                   	    AsyncModuleOperationStart *wrapper = 
 414                   	       static_cast<AsyncModuleOperationStart *>(msg);
 415                   	    msg = wrapper->get_action();
 416                   	    delete wrapper;
 417                   	 }
 418                   	 else if (msg->getType() == async_messages::ASYNC_OP_START)
 419 mday     1.39     	 {
 420                   	    AsyncOperationStart *wrapper = 
 421                   	       static_cast<AsyncOperationStart *>(msg);
 422                   	    msg = wrapper->get_action();
 423                   	    delete wrapper;
 424                   	 }
 425                   	 delete msg;
 426                         }
 427                   
 428                         msg = op->get_response();
 429                         if( msg && ( msg->getMask() & message_mask::ha_async))
 430                         {
 431                   	 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT )
 432                   	 {
 433                   	    AsyncLegacyOperationResult *wrapper = 
 434                   	       static_cast<AsyncLegacyOperationResult *>(msg);
 435                   	    msg = wrapper->get_result();
 436                   	    delete wrapper;
 437                   	 }
 438                   	 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
 439                   	 {
 440 mday     1.39     	    AsyncModuleOperationResult *wrapper = 
 441                   	       static_cast<AsyncModuleOperationResult *>(msg);
 442                   	    msg = wrapper->get_result();
 443                   	    delete wrapper;
 444                   	 }
 445                         }
 446                         void (*callback)(Message *, void *, void *) = op->__async_callback;
 447                         void *handle = op->_callback_handle;
 448                         void *parm = op->_callback_parameter;
 449                         op->release();
 450                         return_op(op);
 451                         callback(msg, handle, parm);
 452                      }
 453                      else if( op->_flags & ASYNC_OPFLAGS_CALLBACK )
 454                      {
 455                         // note that _callback_node may be different from op 
 456                         // op->_callback_response_q is a "this" pointer we can use for 
 457                         // static callback methods
 458                         op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);
 459                      }
 460 mday     1.22     }
 461                   
 462 mday     1.1      
 463 mday     1.34     void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation) 
 464                   //						     Thread *thread, 
 465                   //						     MessageQueue *queue)
 466 mday     1.1      {
 467 mday     1.5         if ( operation != 0 )
 468                      {
 469 mday     1.29           
 470 mday     1.22     // ATTN: optimization 
 471                   // << Tue Feb 19 14:10:38 2002 mdd >>
 472 mday     1.6            operation->lock();
 473 mday     1.29                 
 474 mday     1.6            Message *rq = operation->_request.next(0);
 475 mday     1.29          
 476 mday     1.31     // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>>
 477                   // move this to the bottom of the loop when the majority of 
 478                   // messages become async messages. 
 479                   
 480 mday     1.18           // divert legacy messages to handleEnqueue
 481 mday     1.29           if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))
 482 mday     1.18           {
 483                   	 rq = operation->_request.remove_first() ;
 484                   	 operation->unlock();
 485                   	 // delete the op node 
 486 mday     1.39     	 operation->release();
 487                   	 return_op( operation);
 488 mday     1.24     
 489 mday     1.26     	 handleEnqueue(rq);
 490 mday     1.18     	 return;
 491                         }
 492                   
 493 mday     1.39           if ( (operation->_flags & ASYNC_OPFLAGS_CALLBACK || 
 494                   	    operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) && 
 495 mday     1.29     	   (operation->_state & ASYNC_OPSTATE_COMPLETE))
 496                         {
 497 mday     1.49     	 
 498 mday     1.29     	 operation->unlock();
 499                   	 _handle_async_callback(operation);
 500                         }
 501                         else 
 502                         {
 503                   	 PEGASUS_ASSERT(rq != 0 );
 504                   	 operation->unlock();
 505                   	 _handle_async_request(static_cast<AsyncRequest *>(rq));
 506                         }
 507 mday     1.5         }
 508                      return;
 509 mday     1.1      }
 510                   
 511                   void MessageQueueService::_handle_async_request(AsyncRequest *req)
 512                   {
 513 mday     1.4         if ( req != 0 )
 514                      {
 515                         req->op->processing();
 516 mday     1.1         
 517 mday     1.4            Uint32 type = req->getType();
 518                         if( type == async_messages::HEARTBEAT )
 519                   	 handle_heartbeat_request(req);
 520                         else if (type == async_messages::IOCTL)
 521                   	 handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
 522                         else if (type == async_messages::CIMSERVICE_START)
 523                   	 handle_CimServiceStart(static_cast<CimServiceStart *>(req));
 524                         else if (type == async_messages::CIMSERVICE_STOP)
 525                   	 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
 526                         else if (type == async_messages::CIMSERVICE_PAUSE)
 527                   	 handle_CimServicePause(static_cast<CimServicePause *>(req));
 528                         else if (type == async_messages::CIMSERVICE_RESUME)
 529                   	 handle_CimServiceResume(static_cast<CimServiceResume *>(req));
 530                         else if ( type == async_messages::ASYNC_OP_START)
 531                   	 handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
 532                         else 
 533                         {
 534                   	 // we don't handle this request message 
 535                   	 _make_response(req, async_results::CIM_NAK );
 536                         }
 537 mday     1.1         }
 538                   }
 539                   
 540 mday     1.17     
 541                   Boolean MessageQueueService::_enqueueResponse(
 542                      Message* request, 
 543                      Message* response)
 544 mday     1.18        
 545 mday     1.17     {
 546 kumpf    1.37        PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 547                                       "MessageQueueService::_enqueueResponse");
 548 mday     1.25     
 549                      if( request->getMask() & message_mask::ha_async)
 550                      {
 551                         if (response->getMask() & message_mask::ha_async )
 552                         {
 553                   	 _completeAsyncResponse(static_cast<AsyncRequest *>(request), 
 554                   				static_cast<AsyncReply *>(response), 
 555                   				ASYNC_OPSTATE_COMPLETE, 0 );
 556 kumpf    1.37              PEG_METHOD_EXIT();
 557 mday     1.25     	 return true;
 558                         }
 559                      }
 560                      
 561 mday     1.17        if(request->_async != 0 )
 562                      {
 563                         Uint32 mask = request->_async->getMask();
 564 mday     1.18           PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request ));
 565                         
 566                         AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
 567                         AsyncOpNode *op = async->op;
 568                         request->_async = 0;
 569 mday     1.63           // the legacy request is going to be deleted by its handler
 570 mday     1.27           // remove it from the op node 
 571 mday     1.63     
 572                         static_cast<AsyncLegacyOperationStart *>(async)->get_action();
 573 mday     1.18           
 574 mday     1.63                 
 575 mday     1.18           AsyncLegacyOperationResult *async_result = 
 576                   	 new AsyncLegacyOperationResult( 
 577                   	    async->getKey(),
 578                   	    async->getRouting(),
 579                   	    op,
 580                   	    response);
 581                         _completeAsyncResponse(async,
 582                   			     async_result,
 583                   			     ASYNC_OPSTATE_COMPLETE, 
 584                   			     0);
 585 kumpf    1.37           PEG_METHOD_EXIT();
 586 mday     1.18           return true;
 587 mday     1.17        }
 588 mday     1.18        
 589                      // ensure that the destination queue is in response->dest
 590 kumpf    1.37        PEG_METHOD_EXIT();
 591 mday     1.24        return SendForget(response);
 592 mday     1.18        
 593 mday     1.17     }
 594                   
 595 mday     1.18     void MessageQueueService::_make_response(Message *req, Uint32 code)
 596 mday     1.1      {
 597 mday     1.19        cimom::_make_response(req, code);
 598 mday     1.1      }
 599                   
 600                   
 601 mday     1.5      void MessageQueueService::_completeAsyncResponse(AsyncRequest *request, 
 602                   						AsyncReply *reply, 
 603                   						Uint32 state, 
 604                   						Uint32 flag)
 605                   {
 606 kumpf    1.37        PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 607                                       "MessageQueueService::_completeAsyncResponse");
 608                   
 609 mday     1.19        cimom::_completeAsyncResponse(request, reply, state, flag);
 610 kumpf    1.37     
 611                      PEG_METHOD_EXIT();
 612 mday     1.5      }
 613                   
 614                   
 615 mday     1.32     void MessageQueueService::_complete_op_node(AsyncOpNode *op,
 616                   					    Uint32 state, 
 617                   					    Uint32 flag, 
 618                   					    Uint32 code)
 619                   {
 620                      cimom::_complete_op_node(op, state, flag, code);
 621                   }
 622                   
 623 mday     1.5      
 624                   Boolean MessageQueueService::accept_async(AsyncOpNode *op)
 625                   {
 626 mday     1.8         if (_incoming_queue_shutdown.value() > 0 )
 627                         return false;
 628                      
 629 mday     1.20     // ATTN optimization remove the message checking altogether in the base 
 630                   // << Mon Feb 18 14:02:20 2002 mdd >>
 631 mday     1.6         op->lock();
 632                      Message *rq = op->_request.next(0);
 633 mday     1.20        Message *rp = op->_response.next(0);
 634 mday     1.6         op->unlock();
 635                      
 636 mday     1.22        if(  (rq != 0 && (true == messageOK(rq))) || (rp != 0 && ( true == messageOK(rp) )) &&  
 637                   	_die.value() == 0  )
 638 mday     1.5         {
 639 mday     1.6            _incoming.insert_last_wait(op);
 640 mday     1.53           _polling_sem.signal();
 641 mday     1.5            return true;
 642                      }
 643                      return false;
 644                   }
 645                   
 646                   Boolean MessageQueueService::messageOK(const Message *msg)
 647                   {
 648 mday     1.8         if (_incoming_queue_shutdown.value() > 0 )
 649                         return false;
 650 mday     1.18        return true;
 651                   }
 652                   
 653 mday     1.1      void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
 654                   {
 655                      // default action is to echo a heartbeat response 
 656                      
 657                      AsyncReply *reply = 
 658                         new AsyncReply(async_messages::HEARTBEAT,
 659                   		     req->getKey(),
 660                   		     req->getRouting(),
 661                   		     0,
 662                   		     req->op, 
 663                   		     async_results::OK, 
 664                   		     req->resp,
 665                   		     false);
 666 mday     1.4         _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );
 667 mday     1.1      }
 668                   
 669                   
 670                   void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
 671                   { 
 672                      ;
 673                   }
 674                         
 675                   void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
 676                   {
 677 mday     1.8         
 678                      switch( req->ctl )
 679                      {
 680                         case AsyncIoctl::IO_CLOSE:
 681                         {
 682 mday     1.76     
 683 mday     1.34     	 MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);
 684 kumpf    1.81     
 685                   #ifdef MESSAGEQUEUESERVICE_DEBUG
 686 mday     1.79     	 PEGASUS_STD(cout) << service->getQueueName() << " Received AsyncIoctl::IO_CLOSE " << PEGASUS_STD(endl);
 687 kumpf    1.81     #endif
 688 mday     1.8      	 
 689 mday     1.76     	 // respond to this message. this is fire and forget, so we don't need to delete anything. 
 690                   	 // this takes care of two problems that were being found 
 691                   	 // << Thu Oct  9 10:52:48 2003 mdd >>
 692                   	  _make_response(req, async_results::OK);
 693 mday     1.8      	 // ensure we do not accept any further messages
 694                   
 695                   	 // ensure we don't recurse on IO_CLOSE
 696                   	 if( _incoming_queue_shutdown.value() > 0 )
 697                   	    break;
 698                   	 
 699                   	 // set the closing flag 
 700                   	 service->_incoming_queue_shutdown = 1;
 701                   	 // empty out the queue
 702                   	 while( 1 )
 703                   	 {
 704                   	    AsyncOpNode *operation;
 705                   	    try 
 706                   	    {
 707                   	       operation = service->_incoming.remove_first();
 708                   	    }
 709                   	    catch(IPCException & )
 710                   	    {
 711                   	       break;
 712                   	    }
 713                   	    if( operation )
 714 mday     1.8      	    {
 715 mday     1.34     	       operation->_service_ptr = service;
 716                   	       service->_handle_incoming_operation(operation);
 717 mday     1.8      	    }
 718                   	    else
 719                   	       break;
 720                   	 } // message processing loop
 721 mday     1.76     	 
 722 mday     1.8      	 // shutdown the AsyncDQueue
 723                   	 service->_incoming.shutdown_queue();
 724                   	 return;
 725                         }
 726                   
 727                         default:
 728                   	 _make_response(req, async_results::CIM_NAK);
 729                      }
 730 mday     1.1      }
 731 mday     1.8      
 732 mday     1.1      void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
 733                   {
 734 mday     1.79     
 735 kumpf    1.81     #ifdef MESSAGEQUEUESERVICE_DEBUG
 736 mday     1.79        PEGASUS_STD(cout) << getQueueName() << "received START" << PEGASUS_STD(endl);
 737 kumpf    1.81     #endif
 738 mday     1.79        
 739 mday     1.10        // clear the stoped bit and update
 740 mday     1.13        _capabilities &= (~(module_capabilities::stopped));
 741 mday     1.10        _make_response(req, async_results::OK);
 742                      // now tell the meta dispatcher we are stopped 
 743                      update_service(_capabilities, _mask);
 744                   
 745 mday     1.1      }
 746                   void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
 747                   {
 748 kumpf    1.81     #ifdef MESSAGEQUEUESERVICE_DEBUG
 749 mday     1.79        PEGASUS_STD(cout) << getQueueName() << "received STOP" << PEGASUS_STD(endl);
 750 kumpf    1.81     #endif
 751 mday     1.10        // set the stopeed bit and update
 752                      _capabilities |= module_capabilities::stopped;
 753                      _make_response(req, async_results::CIM_STOPPED);
 754                      // now tell the meta dispatcher we are stopped 
 755                      update_service(_capabilities, _mask);
 756                      
 757 mday     1.1      }
 758                   void MessageQueueService::handle_CimServicePause(CimServicePause *req)
 759                   {
 760 mday     1.10        // set the paused bit and update
 761 mday     1.13        _capabilities |= module_capabilities::paused;
 762 mday     1.11        update_service(_capabilities, _mask);
 763 mday     1.10        _make_response(req, async_results::CIM_PAUSED);
 764                      // now tell the meta dispatcher we are stopped 
 765 mday     1.1      }
 766                   void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
 767                   {
 768 mday     1.10        // clear the paused  bit and update
 769 mday     1.13        _capabilities &= (~(module_capabilities::paused));
 770 mday     1.11        update_service(_capabilities, _mask);
 771 mday     1.10        _make_response(req, async_results::OK);
 772                      // now tell the meta dispatcher we are stopped 
 773 mday     1.1      }
 774                         
 775                   void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
 776                   {
 777                      _make_response(req, async_results::CIM_NAK);
 778                   }
 779                   
 780                   void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
 781                   {
 782 mday     1.14        ;
 783                   }
 784                   
 785 mday     1.10     
 786 mday     1.14     void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)
 787                   {
 788                      // remove the legacy message from the request and enqueue it to its destination
 789                      Uint32 result = async_results::CIM_NAK;
 790                      
 791 mday     1.25        Message *legacy = req->_act;
 792 mday     1.14        if ( legacy != 0 )
 793                      {
 794 mday     1.25           MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
 795 mday     1.14           if( queue != 0 )
 796                         {
 797 mday     1.25     	 if(queue->isAsync() == true )
 798                   	 {
 799                   	    (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
 800                   	 }
 801                   	 else 
 802                   	 {
 803                   	    // Enqueue the response:
 804                   	    queue->enqueue(req->get_action());
 805                   	 }
 806                   	 
 807 mday     1.14     	 result = async_results::OK;
 808                         }
 809                      }
 810                      _make_response(req, result);
 811                   }
 812                   
 813                   void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)
 814                   {
 815                      ;
 816 mday     1.1      }
 817                   
 818                   AsyncOpNode *MessageQueueService::get_op(void)
 819                   {
 820 mday     1.4         AsyncOpNode *op = new AsyncOpNode();
 821 mday     1.1         
 822 mday     1.9         op->_state = ASYNC_OPSTATE_UNKNOWN;
 823                      op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
 824 mday     1.4         
 825 mday     1.1         return op;
 826                   }
 827                   
 828                   void MessageQueueService::return_op(AsyncOpNode *op)
 829                   {
 830                      PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED );
 831 mday     1.4         delete op;
 832 mday     1.1      }
 833                   
 834 mday     1.18     
 835 mday     1.29     Boolean MessageQueueService::ForwardOp(AsyncOpNode *op, 
 836                   				       Uint32 destination)
 837                   {
 838                      PEGASUS_ASSERT(op != 0 );
 839 mday     1.30        op->lock();
 840                      op->_op_dest = MessageQueue::lookup(destination);
 841 mday     1.29        op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD);
 842                      op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK);
 843 mday     1.30        op->unlock();
 844                      if ( op->_op_dest == 0 )
 845                         return false;
 846                         
 847 mday     1.29        return  _meta_dispatcher->route_async(op);
 848                   }
 849                   
 850 mday     1.39      
 851 mday     1.21     Boolean MessageQueueService::SendAsync(AsyncOpNode *op, 
 852                   				       Uint32 destination,
 853 mday     1.18     				       void (*callback)(AsyncOpNode *, 
 854 mday     1.32     							MessageQueue *,
 855 mday     1.30     							void *),
 856                   				       MessageQueue *callback_response_q,
 857                   				       void *callback_ptr)
 858 mday     1.20     { 
 859 mday     1.21        PEGASUS_ASSERT(op != 0 && callback != 0 );
 860 mday     1.18        
 861 mday     1.21        // get the queue handle for the destination
 862                   
 863 mday     1.30        op->lock();
 864 mday     1.32        op->_op_dest = MessageQueue::lookup(destination); // destination of this message
 865 mday     1.22        op->_flags |= ASYNC_OPFLAGS_CALLBACK;
 866                      op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 867                      op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 868 mday     1.30        // initialize the callback data
 869 mday     1.32        op->_async_callback = callback;   // callback function to be executed by recpt. of response
 870                      op->_callback_node = op;          // the op node
 871                      op->_callback_response_q = callback_response_q;  // the queue that will receive the response
 872                      op->_callback_ptr = callback_ptr;   // user data for callback
 873                      op->_callback_request_q = this;     // I am the originator of this request
 874 mday     1.30        
 875                      op->unlock();
 876                      if(op->_op_dest == 0) 
 877                         return false;
 878 mday     1.21        
 879                      return  _meta_dispatcher->route_async(op);
 880 mday     1.18     }
 881                   
 882                   
 883 mday     1.39     Boolean MessageQueueService::SendAsync(Message *msg, 
 884                   				       Uint32 destination,
 885                   				       void (*callback)(Message *response, 
 886                   							void *handle, 
 887                   							void *parameter),
 888                   				       void *handle, 
 889                   				       void *parameter)
 890                   {
 891                      if(msg == NULL)
 892                         return false;
 893                      if(callback == NULL)
 894                         return SendForget(msg);
 895                      AsyncOpNode *op = get_op();
 896 mday     1.43        msg->dest = destination;
 897 mday     1.39        if( NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
 898                      {
 899                         op->release();
 900                         return_op(op);
 901                         return false;
 902                      }
 903                      op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
 904                      op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 905                      op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 906                      op->__async_callback = callback;
 907                      op->_callback_node = op;
 908                      op->_callback_handle = handle;
 909                      op->_callback_parameter = parameter;
 910                      op->_callback_response_q = this;
 911                      
 912                   
 913                      if( ! (msg->getMask() & message_mask::ha_async) )
 914                      {
 915                         AsyncLegacyOperationStart *wrapper = 
 916                   	 new AsyncLegacyOperationStart(get_next_xid(),
 917                   				       op, 
 918 mday     1.39     				       destination, 
 919                   				       msg, 
 920                   				       destination);
 921                      }
 922                      else 
 923                      {
 924                         op->_request.insert_first(msg);
 925                         (static_cast<AsyncMessage *>(msg))->op = op;
 926                      }
 927                      
 928                      _callback.insert_last(op);
 929                      return _meta_dispatcher->route_async(op);
 930                   }
 931                   
 932                   
 933 mday     1.18     Boolean MessageQueueService::SendForget(Message *msg)
 934                   {
 935                   
 936 mday     1.24        
 937 mday     1.18        AsyncOpNode *op = 0;
 938 mday     1.22        Uint32 mask = msg->getMask();
 939                      
 940                      if (mask & message_mask::ha_async)
 941 mday     1.18        {
 942                         op = (static_cast<AsyncMessage *>(msg))->op ;
 943                      }
 944 mday     1.22     
 945 mday     1.18        if( op == 0 )
 946 mday     1.20        {
 947 mday     1.18           op = get_op();
 948 mday     1.20           op->_request.insert_first(msg);
 949 mday     1.22           if (mask & message_mask::ha_async)
 950                   	 (static_cast<AsyncMessage *>(msg))->op = op;
 951 mday     1.20        }
 952 mday     1.30        op->_op_dest = MessageQueue::lookup(msg->dest);
 953 mday     1.22        op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
 954 mday     1.41        op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK 
 955                   		   | ASYNC_OPFLAGS_SIMPLE_STATUS);
 956 mday     1.22        op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 957 mday     1.30        if ( op->_op_dest == 0 )
 958 mday     1.39        {
 959                         op->release();
 960                         return_op(op);
 961 mday     1.21           return false;
 962 mday     1.39        }
 963 mday     1.24        
 964 mday     1.18        // now see if the meta dispatcher will take it
 965 mday     1.30        return  _meta_dispatcher->route_async(op);
 966 mday     1.18     }
 967 mday     1.2      
 968 mday     1.1      
 969 mday     1.4      AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
 970 mday     1.1      {
 971 mday     1.4         if ( request == 0 )
 972                         return 0 ;
 973 mday     1.5      
 974                      Boolean destroy_op = false;
 975                      
 976 s.hills  1.80        if (request->op == 0)
 977 mday     1.5         {
 978                         request->op = get_op();
 979 mday     1.7            request->op->_request.insert_first(request);
 980 mday     1.5            destroy_op = true;
 981                      }
 982 mday     1.4         
 983 mday     1.33        request->block = false;
 984 mday     1.35        request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
 985 mday     1.33        SendAsync(request->op, 
 986                   	     request->dest,
 987 mday     1.36      	     _sendwait_callback,
 988 mday     1.33     	     this,
 989                   	     (void *)0);
 990 mday     1.4         
 991 mday     1.33        request->op->_client_sem.wait();
 992 mday     1.6         request->op->lock();
 993                      AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
 994                      rpl->op = 0;
 995                      request->op->unlock();
 996                      
 997 mday     1.5         if( destroy_op == true)
 998                      {
 999 mday     1.6            request->op->lock();
1000                         request->op->_request.remove(request);
1001                         request->op->_state |= ASYNC_OPSTATE_RELEASED;
1002                         request->op->unlock();
1003                         return_op(request->op);
1004 mday     1.7            request->op = 0;
1005 mday     1.5         }
1006                      return rpl;
1007 mday     1.1      }
1008                   
1009                   
1010                   Boolean MessageQueueService::register_service(String name, 
1011                   					      Uint32 capabilities, 
1012                   					      Uint32 mask)
1013                   
1014                   {
1015                      RegisterCimService *msg = new RegisterCimService(get_next_xid(),
1016 mday     1.5      						    0, 
1017 mday     1.1      						    true, 
1018                   						    name, 
1019                   						    capabilities, 
1020                   						    mask,
1021                   						    _queueId);
1022 mday     1.44        msg->dest = CIMOM_Q_ID;
1023                      
1024 mday     1.1         Boolean registered = false;
1025 mday     1.7         AsyncReply *reply = static_cast<AsyncReply *>(SendWait( msg ));
1026 mday     1.1         
1027 mday     1.2         if ( reply != 0 )
1028 mday     1.1         {
1029                         if(reply->getMask() & message_mask:: ha_async)
1030                         {
1031                   	 if(reply->getMask() & message_mask::ha_reply)
1032                   	 {
1033 mday     1.15     	    if(reply->result == async_results::OK || 
1034                   	       reply->result == async_results::MODULE_ALREADY_REGISTERED )
1035 mday     1.1      	       registered = true;
1036                   	 }
1037                         }
1038                         
1039 mday     1.7            delete reply; 
1040 mday     1.1         }
1041 mday     1.5         delete msg;
1042 mday     1.1         return registered;
1043                   }
1044                   
1045                   Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
1046                   {
1047                      
1048                      
1049                      UpdateCimService *msg = new UpdateCimService(get_next_xid(), 
1050 mday     1.5      						0, 
1051 mday     1.1      						true, 
1052                   						_queueId,
1053                   						_capabilities, 
1054                   						_mask);
1055                      Boolean registered = false;
1056 mday     1.2      
1057                      AsyncMessage *reply = SendWait(msg);
1058                      if (reply)
1059 mday     1.1         {
1060                         if(reply->getMask() & message_mask:: ha_async)
1061                         {
1062                   	 if(reply->getMask() & message_mask::ha_reply)
1063                   	 {
1064 mday     1.2      	    if(static_cast<AsyncReply *>(reply)->result == async_results::OK)
1065 mday     1.1      	       registered = true;
1066                   	 }
1067                         }
1068                         delete reply;
1069                      }
1070 mday     1.5         delete msg;
1071 mday     1.1         return registered;
1072                   }
1073                   
1074                   
1075                   Boolean MessageQueueService::deregister_service(void)
1076                   {
1077 mday     1.3      
1078 mday     1.5         _meta_dispatcher->deregister_module(_queueId);
1079                      return true;
1080 mday     1.1      }
1081                   
1082                   
1083                   void MessageQueueService::find_services(String name, 
1084                   					Uint32 capabilities, 
1085                   					Uint32 mask, 
1086                   					Array<Uint32> *results)
1087                   {
1088                      
1089                      if( results == 0 )
1090                         throw NullPointer();
1091 mday     1.5          
1092 mday     1.1         results->clear();
1093                      
1094                      FindServiceQueue *req = 
1095                         new FindServiceQueue(get_next_xid(), 
1096 mday     1.5      			   0, 
1097 mday     1.1      			   _queueId, 
1098                   			   true, 
1099                   			   name, 
1100                   			   capabilities, 
1101                   			   mask);
1102 mday     1.44        
1103                      req->dest = CIMOM_Q_ID;
1104 mday     1.1         
1105 mday     1.2         AsyncMessage *reply = SendWait(req); 
1106                      if(reply)
1107 mday     1.1         {
1108                         if( reply->getMask() & message_mask::ha_async)
1109                         {
1110                   	 if(reply->getMask() & message_mask::ha_reply)
1111                   	 {
1112                   	    if(reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
1113                   	    {
1114                   	       if( (static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK )
1115                   		  *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
1116                   	    }
1117                   	 }
1118                         }
1119                         delete reply;
1120                      }
1121 mday     1.5         delete req;
1122 mday     1.1         return ;
1123                   }
1124                   
1125                   void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
1126                   {
1127                      if(result == 0)
1128                         throw NullPointer();
1129                      
1130                      EnumerateService *req 
1131                         = new EnumerateService(get_next_xid(),
1132 mday     1.5      			     0, 
1133 mday     1.1      			     _queueId, 
1134                   			     true, 
1135                   			     queue);
1136                      
1137 mday     1.2         AsyncMessage *reply = SendWait(req);
1138 mday     1.1         
1139 mday     1.2         if (reply)
1140 mday     1.1         {
1141                         Boolean found = false;
1142                         
1143                         if( reply->getMask() & message_mask::ha_async)
1144                         {
1145                   	 if(reply->getMask() & message_mask::ha_reply)
1146                   	 {
1147                   	    if(reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
1148                   	    {
1149                   	       if( (static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK )
1150                   	       {
1151                   		  if( found == false)
1152                   		  {
1153                   		     found = true;
1154                   		     
1155                   		     result->put_name( (static_cast<EnumerateServiceResponse *>(reply))->name);
1156                   		     result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
1157                   		     result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
1158                   		     result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
1159                   		  }
1160                   	       }
1161 mday     1.1      	    }
1162                   	 }
1163                         }
1164                         delete reply;
1165                      }
1166 mday     1.5         delete req;
1167                      
1168 mday     1.1         return;
1169                   }
1170                   
1171                   Uint32 MessageQueueService::get_next_xid(void)
1172                   {
1173 mday     1.78        static Mutex _monitor;
1174                      Uint32 value;
1175                      _monitor.lock(pegasus_thread_self());
1176                      
1177 mday     1.1         _xid++;
1178 mday     1.78        value =  _xid.value();
1179                      _monitor.unlock();
1180                      return value;
1181                      
1182 mday     1.1      }
1183                   
1184                   PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2