(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

   1 karl  1.88 //%2004////////////////////////////////////////////////////////////////////////
   2 mday  1.1  //
   3 karl  1.88 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
   4            // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
   5            // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
   6 karl  1.82 // IBM Corp.; EMC Corporation, The Open Group.
   7 karl  1.88 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
   8            // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   9 mday  1.1  //
  10            // Permission is hereby granted, free of charge, to any person obtaining a copy
  11            // of this software and associated documentation files (the "Software"), to
  12            // deal in the Software without restriction, including without limitation the
  13            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  14            // sell copies of the Software, and to permit persons to whom the Software is
  15            // furnished to do so, subject to the following conditions:
  16 karl  1.82 // 
  17 mday  1.1  // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  18            // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  19            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  20            // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  21            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  22            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  23            // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  24            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  25            //
  26            //==============================================================================
  27            //
  28            // Author: Mike Day (mdday@us.ibm.com)
  29            //
  30            // Modified By:
  31 a.arora 1.87 //              Amit K Arora, IBM (amita@in.ibm.com) for Bug#1090
  32 mday    1.1  //
  33              //%/////////////////////////////////////////////////////////////////////////////
  34              
  35              #include "MessageQueueService.h"
  36 mday    1.22 #include <Pegasus/Common/Tracer.h>
  37 humberto 1.71 #include <Pegasus/Common/MessageLoader.h> //l10n
  38 mday     1.1  
  39               PEGASUS_NAMESPACE_BEGIN
  40               
  41 mday     1.15  
  // Process-wide state shared by every MessageQueueService instance.
  //
  // _meta_dispatcher: the single cimom object; created by the constructor when
  // it finds this pointer null and deleted by the destructor when the service
  // count drops to zero.
  42               cimom *MessageQueueService::_meta_dispatcher = 0;
  // _service_count: incremented by the constructor, decremented by the
  // destructor.
  43               AtomicInt MessageQueueService::_service_count = 0;
  // _xid: counter starting at 1; presumably the source of the ids handed out
  // by get_next_xid() -- TODO confirm (get_next_xid is not defined here).
  44               AtomicInt MessageQueueService::_xid(1);
  // _meta_dispatcher_mutex: held by the constructor and destructor while they
  // touch _meta_dispatcher, _service_count, _thread_pool and _polling_thread.
  45 mday     1.38 Mutex MessageQueueService::_meta_dispatcher_mutex;
  46 mday     1.15 
  // Timing values passed, in this order, as the last three arguments of the
  // ThreadPool constructor. NOTE(review): their exact meaning is defined by
  // ThreadPool; the names suggest create / destroy / deadlock intervals --
  // confirm against ThreadPool before changing them.
  47 mday     1.58 static struct timeval create_time = {0, 1};
  48 mday     1.70 static struct timeval destroy_time = {300, 0}; 
  49 mday     1.59 static struct timeval deadlock_time = {0, 0};
  50 mday     1.53 
  // _thread_pool: pool created together with _meta_dispatcher; polling_routine
  // uses it to run _req_proc and kill_idle_threads.
  51 mday     1.55 ThreadPool *MessageQueueService::_thread_pool = 0;
  52 mday     1.53 
  // _polling_list: every constructed service is appended here and removed by
  // its destructor; polling_routine walks it looking for pending work.
  53               DQueue<MessageQueueService> MessageQueueService::_polling_list(true);
  54               
  // _polling_thread: the thread that runs polling_routine; started by the
  // constructor that creates the dispatcher, joined and deleted by the
  // destructor that tears it down.
  55 kumpf    1.62 Thread* MessageQueueService::_polling_thread = 0;
  56 mday     1.61 
  57 mday     1.69 ThreadPool *MessageQueueService::get_thread_pool(void)
  58               {
  59                  return _thread_pool;
  60               }
  61               
  62               PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL  MessageQueueService::kill_idle_threads(void *parm)
  63 mday     1.53 {
  64 mday     1.69 
  65                  static struct timeval now, last = {0,0};
  66 mday     1.53    gettimeofday(&now, NULL);
  67 mday     1.56    int dead_threads = 0;
  68 mday     1.53    
  69 mday     1.70    if( now.tv_sec - last.tv_sec > 120 )
  70 mday     1.53    {
  71                     gettimeofday(&last, NULL);
  72 mday     1.56       try 
  73                     {
  74 mday     1.69 	 dead_threads =  MessageQueueService::_thread_pool->kill_dead_threads();
  75 mday     1.56       }
  76 mday     1.69       catch(...)
  77 mday     1.56       {
  78               
  79                     }
  80 mday     1.53    }
  81 mday     1.69    return (PEGASUS_THREAD_RETURN)dead_threads;
  82 mday     1.53 }
  83               
  84 mday     1.68 
  85 mday     1.76 void MessageQueueService::force_shutdown(Boolean destroy_flag)
  86 mday     1.68 {
  87 mday     1.79    return;
  88                  
  89 kumpf    1.75 #ifdef MESSAGEQUEUESERVICE_DEBUG
  90 humberto 1.71 	//l10n
  91                  MessageLoaderParms parms("Common.MessageQueueService.FORCING_SHUTDOWN",
  92 humberto 1.74 			    "Forcing shutdown of CIMOM Message Router");
  93 humberto 1.71    PEGASUS_STD(cout) << MessageLoader::getMessage(parms) << PEGASUS_STD(endl);
  94 kumpf    1.75 #endif
  95 mday     1.76 
  96 kumpf    1.75 
  97 mday     1.68    MessageQueueService *svc;
  98 konrad.r 1.73    int counter = 0;   
  99 mday     1.68    _polling_list.lock();
 100                  svc = _polling_list.next(0);
 101 humberto 1.71    
 102 mday     1.68    while(svc != 0)
 103                  {
 104 kumpf    1.75 #ifdef MESSAGEQUEUESERVICE_DEBUG
 105 humberto 1.71    		//l10n - reuse same MessageLoaderParms to avoid multiple creates
 106                     	parms.msg_id = "Common.MessageQueueService.STOPPING_SERVICE";
 107                  	  	parms.default_msg = "Stopping $0"; 
 108                  	  	parms.arg0 = svc->getQueueName();
 109                  	  	PEGASUS_STD(cout) << MessageLoader::getMessage(parms) << PEGASUS_STD(endl);
 110 kumpf    1.75 #endif
 111 humberto 1.71       							
 112 mday     1.68       _polling_sem.signal();
 113                     svc->_shutdown_incoming_queue();
 114 konrad.r 1.73       counter++;
 115 mday     1.68       _polling_sem.signal();
 116                     svc = _polling_list.next(svc);
 117                  }
 118                  _polling_list.unlock();
 119 konrad.r 1.73 
 120                  _polling_sem.signal();
 121               
 122 mday     1.76    MessageQueueService::_stop_polling = 1;
 123                  
 124                  if(destroy_flag == true) 
 125                  {
 126               
 127                     svc = _polling_list.remove_last();
 128                     while(svc)
 129                     {
 130               	 delete svc;
 131               	 svc = _polling_list.remove_last();
 132                     }
 133                     
 134 konrad.r 1.73    }
 135 mday     1.68 }
 136               
 137               
 138 mday     1.53 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
 139               {
 140                  Thread *myself = reinterpret_cast<Thread *>(parm);
 141                  DQueue<MessageQueueService> *list = reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm());
 142                  while ( _stop_polling.value()  == 0 ) 
 143                  {
 144 mday     1.69       _polling_sem.wait();
 145 mday     1.61       if(_stop_polling.value() != 0 )
 146                     {
 147 kumpf    1.62          break;
 148 mday     1.61       }
 149                     
 150 mday     1.53       list->lock();
 151                     MessageQueueService *service = list->next(0);
 152                     while(service != NULL)
 153                     {
 154               	 if(service->_incoming.count() > 0 )
 155               	 {
 156 mday     1.55 	    _thread_pool->allocate_and_awaken(service, _req_proc);
 157 mday     1.53 	 }
 158               	 service = list->next(service);
 159                     }
 160                     list->unlock();
 161 mday     1.69       if(_check_idle_flag.value() != 0 )
 162                     {
 163               	 _check_idle_flag = 0;
 164 kumpf    1.84 
 165                        // If there are insufficent resources to run
 166                        // kill_idle_threads, then just return.
 167                        _thread_pool->allocate_and_awaken(service, kill_idle_threads);
 168 mday     1.69       }
 169 mday     1.53    }
 170                  myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
 171                  return(0);
 172               }
 173               
 174               
 // _polling_sem: polling_routine blocks on this; accept_async signals it after
 // queueing an operation and the destructor signals it to wake the polling
 // thread for shutdown.
 175               Semaphore MessageQueueService::_polling_sem(0);
 // _stop_polling: non-zero tells polling_routine to leave its loop; the
 // destructor of the last service increments it.
 176               AtomicInt MessageQueueService::_stop_polling(0);
 // _check_idle_flag: when non-zero, polling_routine clears it and schedules
 // kill_idle_threads on the thread pool. NOTE(review): nothing in this part
 // of the file sets it -- confirm who raises the flag.
 177 mday     1.69 AtomicInt MessageQueueService::_check_idle_flag(0);
 178 mday     1.53 
 179 mday     1.15 
 180 mday     1.1  MessageQueueService::MessageQueueService(const char *name, 
 181               					 Uint32 queueID, 
 182               					 Uint32 capabilities, 
 183               					 Uint32 mask) 
 184 mday     1.15    : Base(name, true,  queueID),
 185 mday     1.22      
 186 mday     1.1       _mask(mask),
 187 mday     1.4       _die(0),
 188 mday     1.41      _incoming(true, 0),
 189 mday     1.39      _callback(true),
 190 mday     1.7       _incoming_queue_shutdown(0),
 191 mday     1.39      _callback_ready(0),
 192                    _req_thread(_req_proc, this, false),
 193                    _callback_thread(_callback_proc, this, false)
 194 mday     1.53 
 195 mday     1.1  { 
 196 mday     1.60 
 197 mday     1.22    _capabilities = (capabilities | module_capabilities::async);
 198                  
 199 mday     1.1     _default_op_timeout.tv_sec = 30;
 200                  _default_op_timeout.tv_usec = 100;
 201 mday     1.15 
 202 a.arora  1.87    AutoMutex autoMut(_meta_dispatcher_mutex);
 203 mday     1.15    
 204                  if( _meta_dispatcher == 0 )
 205                  {
 206                     PEGASUS_ASSERT( _service_count.value() == 0 );
 207                     _meta_dispatcher = new cimom();
 208                     if (_meta_dispatcher == NULL )
 209                     {
 210 a.arora  1.87          throw NullPointer();
 211 mday     1.15       }
 212 mday     1.58       _thread_pool = new ThreadPool(0, "MessageQueueService", 0, 0,
 213 mday     1.55 				    create_time, destroy_time, deadlock_time);  
 214 mday     1.61       
 215 kumpf    1.62       _polling_thread = new Thread(polling_routine, 
 216               			           reinterpret_cast<void *>(&_polling_list), 
 217               			           false);
 218 kumpf    1.83       while (!_polling_thread->run())
 219                     {
 220                        pegasus_yield();
 221                     }
 222 mday     1.15    }
 223                  _service_count++;
 224               
 225                  if( false == register_service(name, _capabilities, _mask) )
 226                  {
 227 humberto 1.71       //l10n
 228                     //throw BindFailedException("MessageQueueService Base Unable to register with  Meta Dispatcher");
 229                     MessageLoaderParms parms("Common.MessageQueueService.UNABLE_TO_REGISTER",
 230 humberto 1.74 			       "MessageQueueService Base Unable to register with  Meta Dispatcher");
 231 humberto 1.71       						   
 232                     throw BindFailedException(parms);
 233 mday     1.15    }
 234                  
 235 mday     1.53    _polling_list.insert_last(this);
 236                  
 237 a.arora  1.87 //   _meta_dispatcher_mutex.unlock();  //Bug#1090
 238 mday     1.39 //   _callback_thread.run();
 239                  
 240 mday     1.53 //   _req_thread.run();
 241 mday     1.1  }
 242               
 243 mday     1.4  
 244 mday     1.1  MessageQueueService::~MessageQueueService(void)
 245               {
 246                  _die = 1;
 247 mday     1.76 
 248                  if (_incoming_queue_shutdown.value() == 0 )
 249                  {
 250                     _shutdown_incoming_queue();
 251                  }
 252 mday     1.39    _callback_ready.signal();
 253                  
 254 mday     1.15    {
 255 a.arora  1.87      AutoMutex autoMut(_meta_dispatcher_mutex);
 256                    _service_count--;
 257                    if (_service_count.value() == 0 )
 258                    {
 259 mday     1.76 
 260 mday     1.53       _stop_polling++;
 261                     _polling_sem.signal();
 262 kumpf    1.62       _polling_thread->join();
 263                     delete _polling_thread;
 264 kumpf    1.65       _polling_thread = 0;
 265 mday     1.15       _meta_dispatcher->_shutdown_routed_queue();
 266                     delete _meta_dispatcher;
 267 kumpf    1.45       _meta_dispatcher = 0;
 268 mday     1.76 
 269 kumpf    1.65       delete _thread_pool;
 270                     _thread_pool = 0;
 271 a.arora  1.87      }
 272                  } // mutex unlocks here
 273 mday     1.53    _polling_list.remove(this);
 274 konrad.r 1.72    // Clean up in case there are extra stuff on the queue. 
 275                 while (_incoming.count())
 276                 {
 277               	delete _incoming.remove_first();
 278                 }
 279 mday     1.53 } 
 280 mday     1.1  
 281 mday     1.7  void MessageQueueService::_shutdown_incoming_queue(void)
 282               {
 283                  
 284 mday     1.76    
 285 mday     1.9     if (_incoming_queue_shutdown.value() > 0 )
 286                     return ;
 287 mday     1.8     AsyncIoctl *msg = new AsyncIoctl(get_next_xid(),
 288               				    0, 
 289               				    _queueId, 
 290               				    _queueId, 
 291               				    true, 
 292               				    AsyncIoctl::IO_CLOSE, 
 293               				    0, 
 294               				    0);
 295 mday     1.9  
 296 mday     1.8     msg->op = get_op();
 297 mday     1.41    msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
 298                  msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK 
 299               		   | ASYNC_OPFLAGS_SIMPLE_STATUS);
 300 mday     1.32    msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 301 mday     1.41 
 302 mday     1.32    msg->op->_op_dest = this;
 303 mday     1.8     msg->op->_request.insert_first(msg);
 304                  
 305                  _incoming.insert_last_wait(msg->op);
 306 mday     1.32 
 307 mday     1.7  }
 308               
 309 mday     1.22 
 310               
 311 kumpf    1.85 void MessageQueueService::enqueue(Message *msg)
 312 mday     1.22 {
 313 kumpf    1.28    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
 314               
 315 mday     1.22    Base::enqueue(msg);
 316 kumpf    1.28 
 317                  PEG_METHOD_EXIT();
 318 mday     1.22 }
 319               
 320               
 321 mday     1.39 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_callback_proc(void *parm)
 322               {
 323                  Thread *myself = reinterpret_cast<Thread *>(parm);
 324                  MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
 325                  AsyncOpNode *operation = 0;
 326                  
 327                  while ( service->_die.value() == 0 ) 
 328                  {
 329                     service->_callback_ready.wait();
 330                     
 331                     service->_callback.lock();
 332                     operation = service->_callback.next(0);
 333                     while( operation != NULL)
 334                     {
 335               	 if( ASYNC_OPSTATE_COMPLETE & operation->read_state())
 336               	 {
 337               	    operation = service->_callback.remove_no_lock(operation);
 338               	    PEGASUS_ASSERT(operation != NULL);
 339               	    operation->_thread_ptr = myself;
 340               	    operation->_service_ptr = service;
 341               	    service->_handle_async_callback(operation);
 342 mday     1.39 	    break;
 343               	 }
 344               	 operation = service->_callback.next(operation);
 345                     }
 346                     service->_callback.unlock();
 347                  }
 348                  myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
 349                  return(0);
 350               }
 351               
 352 mday     1.22 
 353 mday     1.5  PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
 354 mday     1.1  {
 355 mday     1.53    MessageQueueService *service = reinterpret_cast<MessageQueueService *>(parm);
 356 mday     1.5     // pull messages off the incoming queue and dispatch them. then 
 357                  // check pending messages that are non-blocking
 358 mday     1.7     AsyncOpNode *operation = 0;
 359                  
 360 mday     1.56    if ( service->_die.value() == 0 ) 
 361                   {
 362 mday     1.41 	 try 
 363               	 {
 364 mday     1.53 	    operation = service->_incoming.remove_first();
 365 mday     1.41 	 }
 366               	 catch(ListClosed & )
 367               	 {
 368 mday     1.56 	    operation = 0;
 369               	    
 370               	    return(0);
 371 mday     1.41 	 }
 372               	 if( operation )
 373               	 {
 374               	    operation->_service_ptr = service;
 375               	    service->_handle_incoming_operation(operation);
 376               	 }
 377 mday     1.56     }
 378 mday     1.44 
 379 mday     1.5     return(0);
 380 mday     1.1  }
 381               
 382 mday     1.43 Uint32 MessageQueueService::get_pending_callback_count(void)
 383               {
 384                  return _callback.count();
 385               }
 386               
 387               
 388               
 389 mday     1.33 void MessageQueueService::_sendwait_callback(AsyncOpNode *op, 
 390               					     MessageQueue *q, 
 391               					     void *parm)
 392               {
 393                  op->_client_sem.signal();
 394               }
 395               
 396 mday     1.30 
 397               // callback function is responsible for cleaning up all resources
 398               // including op, op->_callback_node, and op->_callback_ptr
 399 mday     1.22 void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
 400               {
 401 mday     1.39    if( op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK )
 402                  {
 403               
 404                     Message *msg = op->get_request();
 405                     if( msg && ( msg->getMask() & message_mask::ha_async))
 406                     {
 407               	 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_START )
 408               	 {
 409               	    AsyncLegacyOperationStart *wrapper = 
 410               	       static_cast<AsyncLegacyOperationStart *>(msg);
 411               	    msg = wrapper->get_action();
 412               	    delete wrapper;
 413               	 }
 414               	 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
 415               	 {
 416               	    AsyncModuleOperationStart *wrapper = 
 417               	       static_cast<AsyncModuleOperationStart *>(msg);
 418               	    msg = wrapper->get_action();
 419               	    delete wrapper;
 420               	 }
 421               	 else if (msg->getType() == async_messages::ASYNC_OP_START)
 422 mday     1.39 	 {
 423               	    AsyncOperationStart *wrapper = 
 424               	       static_cast<AsyncOperationStart *>(msg);
 425               	    msg = wrapper->get_action();
 426               	    delete wrapper;
 427               	 }
 428               	 delete msg;
 429                     }
 430               
 431                     msg = op->get_response();
 432                     if( msg && ( msg->getMask() & message_mask::ha_async))
 433                     {
 434               	 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT )
 435               	 {
 436               	    AsyncLegacyOperationResult *wrapper = 
 437               	       static_cast<AsyncLegacyOperationResult *>(msg);
 438               	    msg = wrapper->get_result();
 439               	    delete wrapper;
 440               	 }
 441               	 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
 442               	 {
 443 mday     1.39 	    AsyncModuleOperationResult *wrapper = 
 444               	       static_cast<AsyncModuleOperationResult *>(msg);
 445               	    msg = wrapper->get_result();
 446               	    delete wrapper;
 447               	 }
 448                     }
 449                     void (*callback)(Message *, void *, void *) = op->__async_callback;
 450                     void *handle = op->_callback_handle;
 451                     void *parm = op->_callback_parameter;
 452                     op->release();
 453                     return_op(op);
 454                     callback(msg, handle, parm);
 455                  }
 456                  else if( op->_flags & ASYNC_OPFLAGS_CALLBACK )
 457                  {
 458                     // note that _callback_node may be different from op 
 459                     // op->_callback_response_q is a "this" pointer we can use for 
 460                     // static callback methods
 461                     op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);
 462                  }
 463 mday     1.22 }
 464               
 465 mday     1.1  
 466 mday     1.34 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation) 
 467               //						     Thread *thread, 
 468               //						     MessageQueue *queue)
 469 mday     1.1  {
 470 mday     1.5     if ( operation != 0 )
 471                  {
 472 mday     1.29       
 473 mday     1.22 // ATTN: optimization 
 474               // << Tue Feb 19 14:10:38 2002 mdd >>
 475 mday     1.6        operation->lock();
 476 mday     1.29             
 477 mday     1.6        Message *rq = operation->_request.next(0);
 478 mday     1.29      
 479 mday     1.31 // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>>
 480               // move this to the bottom of the loop when the majority of 
 481               // messages become async messages. 
 482               
 483 mday     1.18       // divert legacy messages to handleEnqueue
 484 mday     1.29       if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))
 485 mday     1.18       {
 486               	 rq = operation->_request.remove_first() ;
 487               	 operation->unlock();
 488               	 // delete the op node 
 489 mday     1.39 	 operation->release();
 490               	 return_op( operation);
 491 mday     1.24 
 492 mday     1.26 	 handleEnqueue(rq);
 493 mday     1.18 	 return;
 494                     }
 495               
 496 mday     1.39       if ( (operation->_flags & ASYNC_OPFLAGS_CALLBACK || 
 497               	    operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) && 
 498 mday     1.29 	   (operation->_state & ASYNC_OPSTATE_COMPLETE))
 499                     {
 500 mday     1.49 	 
 501 mday     1.29 	 operation->unlock();
 502               	 _handle_async_callback(operation);
 503                     }
 504                     else 
 505                     {
 506               	 PEGASUS_ASSERT(rq != 0 );
 507               	 operation->unlock();
 508               	 _handle_async_request(static_cast<AsyncRequest *>(rq));
 509                     }
 510 mday     1.5     }
 511                  return;
 512 mday     1.1  }
 513               
 514               void MessageQueueService::_handle_async_request(AsyncRequest *req)
 515               {
 516 mday     1.4     if ( req != 0 )
 517                  {
 518                     req->op->processing();
 519 mday     1.1     
 520 mday     1.4        Uint32 type = req->getType();
 521                     if( type == async_messages::HEARTBEAT )
 522               	 handle_heartbeat_request(req);
 523                     else if (type == async_messages::IOCTL)
 524               	 handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
 525                     else if (type == async_messages::CIMSERVICE_START)
 526               	 handle_CimServiceStart(static_cast<CimServiceStart *>(req));
 527                     else if (type == async_messages::CIMSERVICE_STOP)
 528               	 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
 529                     else if (type == async_messages::CIMSERVICE_PAUSE)
 530               	 handle_CimServicePause(static_cast<CimServicePause *>(req));
 531                     else if (type == async_messages::CIMSERVICE_RESUME)
 532               	 handle_CimServiceResume(static_cast<CimServiceResume *>(req));
 533                     else if ( type == async_messages::ASYNC_OP_START)
 534               	 handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
 535                     else 
 536                     {
 537               	 // we don't handle this request message 
 538               	 _make_response(req, async_results::CIM_NAK );
 539                     }
 540 mday     1.1     }
 541               }
 542               
 543 mday     1.17 
 544               Boolean MessageQueueService::_enqueueResponse(
 545                  Message* request, 
 546                  Message* response)
 547 mday     1.18    
 548 mday     1.17 {
 549 kumpf    1.37    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 550                                   "MessageQueueService::_enqueueResponse");
 551 mday     1.25 
 552                  if( request->getMask() & message_mask::ha_async)
 553                  {
 554                     if (response->getMask() & message_mask::ha_async )
 555                     {
 556               	 _completeAsyncResponse(static_cast<AsyncRequest *>(request), 
 557               				static_cast<AsyncReply *>(response), 
 558               				ASYNC_OPSTATE_COMPLETE, 0 );
 559 kumpf    1.37          PEG_METHOD_EXIT();
 560 mday     1.25 	 return true;
 561                     }
 562                  }
 563                  
 564 mday     1.17    if(request->_async != 0 )
 565                  {
 566                     Uint32 mask = request->_async->getMask();
 567 mday     1.18       PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request ));
 568                     
 569                     AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
 570                     AsyncOpNode *op = async->op;
 571                     request->_async = 0;
 572 mday     1.63       // the legacy request is going to be deleted by its handler
 573 mday     1.27       // remove it from the op node 
 574 mday     1.63 
 575                     static_cast<AsyncLegacyOperationStart *>(async)->get_action();
 576 mday     1.18       
 577 mday     1.63             
 578 mday     1.18       AsyncLegacyOperationResult *async_result = 
 579               	 new AsyncLegacyOperationResult( 
 580               	    async->getKey(),
 581               	    async->getRouting(),
 582               	    op,
 583               	    response);
 584                     _completeAsyncResponse(async,
 585               			     async_result,
 586               			     ASYNC_OPSTATE_COMPLETE, 
 587               			     0);
 588 kumpf    1.37       PEG_METHOD_EXIT();
 589 mday     1.18       return true;
 590 mday     1.17    }
 591 mday     1.18    
 592                  // ensure that the destination queue is in response->dest
 593 kumpf    1.37    PEG_METHOD_EXIT();
 594 mday     1.24    return SendForget(response);
 595 mday     1.18    
 596 mday     1.17 }
 597               
 598 mday     1.18 void MessageQueueService::_make_response(Message *req, Uint32 code)
 599 mday     1.1  {
 600 mday     1.19    cimom::_make_response(req, code);
 601 mday     1.1  }
 602               
 603               
 604 mday     1.5  void MessageQueueService::_completeAsyncResponse(AsyncRequest *request, 
 605               						AsyncReply *reply, 
 606               						Uint32 state, 
 607               						Uint32 flag)
 608               {
 609 kumpf    1.37    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 610                                   "MessageQueueService::_completeAsyncResponse");
 611               
 612 mday     1.19    cimom::_completeAsyncResponse(request, reply, state, flag);
 613 kumpf    1.37 
 614                  PEG_METHOD_EXIT();
 615 mday     1.5  }
 616               
 617               
 618 mday     1.32 void MessageQueueService::_complete_op_node(AsyncOpNode *op,
 619               					    Uint32 state, 
 620               					    Uint32 flag, 
 621               					    Uint32 code)
 622               {
 623                  cimom::_complete_op_node(op, state, flag, code);
 624               }
 625               
 626 mday     1.5  
 627               Boolean MessageQueueService::accept_async(AsyncOpNode *op)
 628               {
 629 mday     1.8     if (_incoming_queue_shutdown.value() > 0 )
 630                     return false;
 631                  
 632 mday     1.20 // ATTN optimization remove the message checking altogether in the base 
 633               // << Mon Feb 18 14:02:20 2002 mdd >>
 634 mday     1.6     op->lock();
 635                  Message *rq = op->_request.next(0);
 636 mday     1.20    Message *rp = op->_response.next(0);
 637 mday     1.6     op->unlock();
 638                  
 639 mday     1.22    if(  (rq != 0 && (true == messageOK(rq))) || (rp != 0 && ( true == messageOK(rp) )) &&  
 640               	_die.value() == 0  )
 641 mday     1.5     {
 642 mday     1.6        _incoming.insert_last_wait(op);
 643 mday     1.53       _polling_sem.signal();
 644 mday     1.5        return true;
 645                  }
 646                  return false;
 647               }
 648               
 649               Boolean MessageQueueService::messageOK(const Message *msg)
 650               {
 651 mday     1.8     if (_incoming_queue_shutdown.value() > 0 )
 652                     return false;
 653 mday     1.18    return true;
 654               }
 655               
 656 mday     1.1  void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
 657               {
 658                  // default action is to echo a heartbeat response 
 659                  
 660                  AsyncReply *reply = 
 661                     new AsyncReply(async_messages::HEARTBEAT,
 662               		     req->getKey(),
 663               		     req->getRouting(),
 664               		     0,
 665               		     req->op, 
 666               		     async_results::OK, 
 667               		     req->resp,
 668               		     false);
 669 mday     1.4     _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );
 670 mday     1.1  }
 671               
 672               
 673               void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
 674               { 
 675                  ;
 676               }
 677                     
 678               void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
 679               {
 680 mday     1.8     
 681                  switch( req->ctl )
 682                  {
 683                     case AsyncIoctl::IO_CLOSE:
 684                     {
 685 mday     1.76 
 686 mday     1.34 	 MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);
 687 kumpf    1.81 
 688               #ifdef MESSAGEQUEUESERVICE_DEBUG
 689 mday     1.79 	 PEGASUS_STD(cout) << service->getQueueName() << " Received AsyncIoctl::IO_CLOSE " << PEGASUS_STD(endl);
 690 kumpf    1.81 #endif
 691 mday     1.8  	 
 692 mday     1.76 	 // respond to this message. this is fire and forget, so we don't need to delete anything. 
 693               	 // this takes care of two problems that were being found 
 694               	 // << Thu Oct  9 10:52:48 2003 mdd >>
 695               	  _make_response(req, async_results::OK);
 696 mday     1.8  	 // ensure we do not accept any further messages
 697               
 698               	 // ensure we don't recurse on IO_CLOSE
 699               	 if( _incoming_queue_shutdown.value() > 0 )
 700               	    break;
 701               	 
 702               	 // set the closing flag 
 703               	 service->_incoming_queue_shutdown = 1;
 704               	 // empty out the queue
 705               	 while( 1 )
 706               	 {
 707               	    AsyncOpNode *operation;
 708               	    try 
 709               	    {
 710               	       operation = service->_incoming.remove_first();
 711               	    }
 712               	    catch(IPCException & )
 713               	    {
 714               	       break;
 715               	    }
 716               	    if( operation )
 717 mday     1.8  	    {
 718 mday     1.34 	       operation->_service_ptr = service;
 719               	       service->_handle_incoming_operation(operation);
 720 mday     1.8  	    }
 721               	    else
 722               	       break;
 723               	 } // message processing loop
 724 mday     1.76 	 
 725 mday     1.8  	 // shutdown the AsyncDQueue
 726               	 service->_incoming.shutdown_queue();
 727               	 return;
 728                     }
 729               
 730                     default:
 731               	 _make_response(req, async_results::CIM_NAK);
 732                  }
 733 mday     1.1  }
 734 mday     1.8  
 735 mday     1.1  void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
 736               {
 737 mday     1.79 
 738 kumpf    1.81 #ifdef MESSAGEQUEUESERVICE_DEBUG
 739 mday     1.79    PEGASUS_STD(cout) << getQueueName() << "received START" << PEGASUS_STD(endl);
 740 kumpf    1.81 #endif
 741 mday     1.79    
 742 mday     1.10    // clear the stoped bit and update
 743 mday     1.13    _capabilities &= (~(module_capabilities::stopped));
 744 mday     1.10    _make_response(req, async_results::OK);
 745                  // now tell the meta dispatcher we are stopped 
 746                  update_service(_capabilities, _mask);
 747               
 748 mday     1.1  }
 749               void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
 750               {
 751 kumpf    1.81 #ifdef MESSAGEQUEUESERVICE_DEBUG
 752 mday     1.79    PEGASUS_STD(cout) << getQueueName() << "received STOP" << PEGASUS_STD(endl);
 753 kumpf    1.81 #endif
 754 mday     1.10    // set the stopeed bit and update
 755                  _capabilities |= module_capabilities::stopped;
 756                  _make_response(req, async_results::CIM_STOPPED);
 757                  // now tell the meta dispatcher we are stopped 
 758                  update_service(_capabilities, _mask);
 759                  
 760 mday     1.1  }
 761               void MessageQueueService::handle_CimServicePause(CimServicePause *req)
 762               {
 763 mday     1.10    // set the paused bit and update
 764 mday     1.13    _capabilities |= module_capabilities::paused;
 765 mday     1.11    update_service(_capabilities, _mask);
 766 mday     1.10    _make_response(req, async_results::CIM_PAUSED);
 767                  // now tell the meta dispatcher we are stopped 
 768 mday     1.1  }
 769               void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
 770               {
 771 mday     1.10    // clear the paused  bit and update
 772 mday     1.13    _capabilities &= (~(module_capabilities::paused));
 773 mday     1.11    update_service(_capabilities, _mask);
 774 mday     1.10    _make_response(req, async_results::OK);
 775                  // now tell the meta dispatcher we are stopped 
 776 mday     1.1  }
 777                     
 778               void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
 779               {
 780                  _make_response(req, async_results::CIM_NAK);
 781               }
 782               
 783               void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
 784               {
 785 mday     1.14    ;
 786               }
 787               
 788 mday     1.10 
 789 mday     1.14 void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)
 790               {
 791                  // remove the legacy message from the request and enqueue it to its destination
 792                  Uint32 result = async_results::CIM_NAK;
 793                  
 794 mday     1.25    Message *legacy = req->_act;
 795 mday     1.14    if ( legacy != 0 )
 796                  {
 797 mday     1.25       MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
 798 mday     1.14       if( queue != 0 )
 799                     {
 800 mday     1.25 	 if(queue->isAsync() == true )
 801               	 {
 802               	    (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
 803               	 }
 804               	 else 
 805               	 {
 806               	    // Enqueue the response:
 807               	    queue->enqueue(req->get_action());
 808               	 }
 809               	 
 810 mday     1.14 	 result = async_results::OK;
 811                     }
 812                  }
 813                  _make_response(req, result);
 814               }
 815               
 816               void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)
 817               {
 818                  ;
 819 mday     1.1  }
 820               
 821               AsyncOpNode *MessageQueueService::get_op(void)
 822               {
 823 mday     1.4     AsyncOpNode *op = new AsyncOpNode();
 824 mday     1.1     
 825 mday     1.9     op->_state = ASYNC_OPSTATE_UNKNOWN;
 826                  op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
 827 mday     1.4     
 828 mday     1.1     return op;
 829               }
 830               
 831               void MessageQueueService::return_op(AsyncOpNode *op)
 832               {
 833                  PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED );
 834 mday     1.4     delete op;
 835 mday     1.1  }
 836               
 837 mday     1.18 
 838 mday     1.29 Boolean MessageQueueService::ForwardOp(AsyncOpNode *op, 
 839               				       Uint32 destination)
 840               {
 841                  PEGASUS_ASSERT(op != 0 );
 842 mday     1.30    op->lock();
 843                  op->_op_dest = MessageQueue::lookup(destination);
 844 mday     1.29    op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD);
 845                  op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK);
 846 mday     1.30    op->unlock();
 847                  if ( op->_op_dest == 0 )
 848                     return false;
 849                     
 850 mday     1.29    return  _meta_dispatcher->route_async(op);
 851               }
 852               
 853 mday     1.39  
 854 mday     1.21 Boolean MessageQueueService::SendAsync(AsyncOpNode *op, 
 855               				       Uint32 destination,
 856 mday     1.18 				       void (*callback)(AsyncOpNode *, 
 857 mday     1.32 							MessageQueue *,
 858 mday     1.30 							void *),
 859               				       MessageQueue *callback_response_q,
 860               				       void *callback_ptr)
 861 mday     1.20 { 
 862 mday     1.21    PEGASUS_ASSERT(op != 0 && callback != 0 );
 863 mday     1.18    
 864 mday     1.21    // get the queue handle for the destination
 865               
 866 mday     1.30    op->lock();
 867 mday     1.32    op->_op_dest = MessageQueue::lookup(destination); // destination of this message
 868 mday     1.22    op->_flags |= ASYNC_OPFLAGS_CALLBACK;
 869                  op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 870 mday     1.30    // initialize the callback data
 871 mday     1.32    op->_async_callback = callback;   // callback function to be executed by recpt. of response
 872                  op->_callback_node = op;          // the op node
 873                  op->_callback_response_q = callback_response_q;  // the queue that will receive the response
 874                  op->_callback_ptr = callback_ptr;   // user data for callback
 875                  op->_callback_request_q = this;     // I am the originator of this request
 876 mday     1.30    
 877                  op->unlock();
 878                  if(op->_op_dest == 0) 
 879                     return false;
 880 mday     1.21    
 881                  return  _meta_dispatcher->route_async(op);
 882 mday     1.18 }
 883               
 884               
 885 mday     1.39 Boolean MessageQueueService::SendAsync(Message *msg, 
 886               				       Uint32 destination,
 887               				       void (*callback)(Message *response, 
 888               							void *handle, 
 889               							void *parameter),
 890               				       void *handle, 
 891               				       void *parameter)
 892               {
 893                  if(msg == NULL)
 894                     return false;
 895                  if(callback == NULL)
 896                     return SendForget(msg);
 897                  AsyncOpNode *op = get_op();
 898 mday     1.43    msg->dest = destination;
 899 mday     1.39    if( NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
 900                  {
 901                     op->release();
 902                     return_op(op);
 903                     return false;
 904                  }
 905                  op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
 906                  op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 907                  op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 908                  op->__async_callback = callback;
 909                  op->_callback_node = op;
 910                  op->_callback_handle = handle;
 911                  op->_callback_parameter = parameter;
 912                  op->_callback_response_q = this;
 913                  
 914               
 915                  if( ! (msg->getMask() & message_mask::ha_async) )
 916                  {
 917                     AsyncLegacyOperationStart *wrapper = 
 918               	 new AsyncLegacyOperationStart(get_next_xid(),
 919               				       op, 
 920 mday     1.39 				       destination, 
 921               				       msg, 
 922               				       destination);
 923                  }
 924                  else 
 925                  {
 926                     op->_request.insert_first(msg);
 927                     (static_cast<AsyncMessage *>(msg))->op = op;
 928                  }
 929                  
 930                  _callback.insert_last(op);
 931                  return _meta_dispatcher->route_async(op);
 932               }
 933               
 934               
 935 mday     1.18 Boolean MessageQueueService::SendForget(Message *msg)
 936               {
 937               
 938 mday     1.24    
 939 mday     1.18    AsyncOpNode *op = 0;
 940 mday     1.22    Uint32 mask = msg->getMask();
 941                  
 942                  if (mask & message_mask::ha_async)
 943 mday     1.18    {
 944                     op = (static_cast<AsyncMessage *>(msg))->op ;
 945                  }
 946 mday     1.22 
 947 mday     1.18    if( op == 0 )
 948 mday     1.20    {
 949 mday     1.18       op = get_op();
 950 mday     1.20       op->_request.insert_first(msg);
 951 mday     1.22       if (mask & message_mask::ha_async)
 952               	 (static_cast<AsyncMessage *>(msg))->op = op;
 953 mday     1.20    }
 954 mday     1.30    op->_op_dest = MessageQueue::lookup(msg->dest);
 955 mday     1.22    op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
 956 mday     1.41    op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK 
 957               		   | ASYNC_OPFLAGS_SIMPLE_STATUS);
 958 mday     1.22    op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 959 mday     1.30    if ( op->_op_dest == 0 )
 960 mday     1.39    {
 961                     op->release();
 962                     return_op(op);
 963 mday     1.21       return false;
 964 mday     1.39    }
 965 mday     1.24    
 966 mday     1.18    // now see if the meta dispatcher will take it
 967 mday     1.30    return  _meta_dispatcher->route_async(op);
 968 mday     1.18 }
 969 mday     1.2  
 970 mday     1.1  
 971 mday     1.4  AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
 972 mday     1.1  {
 973 mday     1.4     if ( request == 0 )
 974                     return 0 ;
 975 mday     1.5  
 976                  Boolean destroy_op = false;
 977                  
 978 s.hills  1.80    if (request->op == 0)
 979 mday     1.5     {
 980                     request->op = get_op();
 981 mday     1.7        request->op->_request.insert_first(request);
 982 mday     1.5        destroy_op = true;
 983                  }
 984 mday     1.4     
 985 mday     1.33    request->block = false;
 986 mday     1.35    request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
 987 mday     1.33    SendAsync(request->op, 
 988               	     request->dest,
 989 mday     1.36  	     _sendwait_callback,
 990 mday     1.33 	     this,
 991               	     (void *)0);
 992 mday     1.4     
 993 mday     1.33    request->op->_client_sem.wait();
 994 mday     1.6     request->op->lock();
 995                  AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
 996                  rpl->op = 0;
 997                  request->op->unlock();
 998                  
 999 mday     1.5     if( destroy_op == true)
1000                  {
1001 mday     1.6        request->op->lock();
1002                     request->op->_request.remove(request);
1003                     request->op->_state |= ASYNC_OPSTATE_RELEASED;
1004                     request->op->unlock();
1005                     return_op(request->op);
1006 mday     1.7        request->op = 0;
1007 mday     1.5     }
1008                  return rpl;
1009 mday     1.1  }
1010               
1011               
1012               Boolean MessageQueueService::register_service(String name, 
1013               					      Uint32 capabilities, 
1014               					      Uint32 mask)
1015               
1016               {
1017                  RegisterCimService *msg = new RegisterCimService(get_next_xid(),
1018 mday     1.5  						    0, 
1019 mday     1.1  						    true, 
1020               						    name, 
1021               						    capabilities, 
1022               						    mask,
1023               						    _queueId);
1024 mday     1.44    msg->dest = CIMOM_Q_ID;
1025                  
1026 mday     1.1     Boolean registered = false;
1027 mday     1.7     AsyncReply *reply = static_cast<AsyncReply *>(SendWait( msg ));
1028 mday     1.1     
1029 mday     1.2     if ( reply != 0 )
1030 mday     1.1     {
1031                     if(reply->getMask() & message_mask:: ha_async)
1032                     {
1033               	 if(reply->getMask() & message_mask::ha_reply)
1034               	 {
1035 mday     1.15 	    if(reply->result == async_results::OK || 
1036               	       reply->result == async_results::MODULE_ALREADY_REGISTERED )
1037 mday     1.1  	       registered = true;
1038               	 }
1039                     }
1040                     
1041 mday     1.7        delete reply; 
1042 mday     1.1     }
1043 mday     1.5     delete msg;
1044 mday     1.1     return registered;
1045               }
1046               
1047               Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
1048               {
1049                  
1050                  
1051                  UpdateCimService *msg = new UpdateCimService(get_next_xid(), 
1052 mday     1.5  						0, 
1053 mday     1.1  						true, 
1054               						_queueId,
1055               						_capabilities, 
1056               						_mask);
1057                  Boolean registered = false;
1058 mday     1.2  
1059                  AsyncMessage *reply = SendWait(msg);
1060                  if (reply)
1061 mday     1.1     {
1062                     if(reply->getMask() & message_mask:: ha_async)
1063                     {
1064               	 if(reply->getMask() & message_mask::ha_reply)
1065               	 {
1066 mday     1.2  	    if(static_cast<AsyncReply *>(reply)->result == async_results::OK)
1067 mday     1.1  	       registered = true;
1068               	 }
1069                     }
1070                     delete reply;
1071                  }
1072 mday     1.5     delete msg;
1073 mday     1.1     return registered;
1074               }
1075               
1076               
1077               Boolean MessageQueueService::deregister_service(void)
1078               {
1079 mday     1.3  
1080 mday     1.5     _meta_dispatcher->deregister_module(_queueId);
1081                  return true;
1082 mday     1.1  }
1083               
1084               
1085               void MessageQueueService::find_services(String name, 
1086               					Uint32 capabilities, 
1087               					Uint32 mask, 
1088               					Array<Uint32> *results)
1089               {
1090                  
1091                  if( results == 0 )
1092                     throw NullPointer();
1093 mday     1.5      
1094 mday     1.1     results->clear();
1095                  
1096                  FindServiceQueue *req = 
1097                     new FindServiceQueue(get_next_xid(), 
1098 mday     1.5  			   0, 
1099 mday     1.1  			   _queueId, 
1100               			   true, 
1101               			   name, 
1102               			   capabilities, 
1103               			   mask);
1104 mday     1.44    
1105                  req->dest = CIMOM_Q_ID;
1106 mday     1.1     
1107 mday     1.2     AsyncMessage *reply = SendWait(req); 
1108                  if(reply)
1109 mday     1.1     {
1110                     if( reply->getMask() & message_mask::ha_async)
1111                     {
1112               	 if(reply->getMask() & message_mask::ha_reply)
1113               	 {
1114               	    if(reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
1115               	    {
1116               	       if( (static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK )
1117               		  *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
1118               	    }
1119               	 }
1120                     }
1121                     delete reply;
1122                  }
1123 mday     1.5     delete req;
1124 mday     1.1     return ;
1125               }
1126               
1127               void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
1128               {
1129                  if(result == 0)
1130                     throw NullPointer();
1131                  
1132                  EnumerateService *req 
1133                     = new EnumerateService(get_next_xid(),
1134 mday     1.5  			     0, 
1135 mday     1.1  			     _queueId, 
1136               			     true, 
1137               			     queue);
1138                  
1139 mday     1.2     AsyncMessage *reply = SendWait(req);
1140 mday     1.1     
1141 mday     1.2     if (reply)
1142 mday     1.1     {
1143                     Boolean found = false;
1144                     
1145                     if( reply->getMask() & message_mask::ha_async)
1146                     {
1147               	 if(reply->getMask() & message_mask::ha_reply)
1148               	 {
1149               	    if(reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
1150               	    {
1151               	       if( (static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK )
1152               	       {
1153               		  if( found == false)
1154               		  {
1155               		     found = true;
1156               		     
1157               		     result->put_name( (static_cast<EnumerateServiceResponse *>(reply))->name);
1158               		     result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
1159               		     result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
1160               		     result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
1161               		  }
1162               	       }
1163 mday     1.1  	    }
1164               	 }
1165                     }
1166                     delete reply;
1167                  }
1168 mday     1.5     delete req;
1169                  
1170 mday     1.1     return;
1171               }
1172               
1173               Uint32 MessageQueueService::get_next_xid(void)
1174               {
1175 mday     1.78    static Mutex _monitor;
1176                  Uint32 value;
1177 a.arora  1.87    AutoMutex autoMut(_monitor);   
1178 mday     1.1     _xid++;
1179 mday     1.78    value =  _xid.value();
1180                  return value;
1181                  
1182 mday     1.1  }
1183               
1184               PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2