(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

   1 karl  1.91 //%2005////////////////////////////////////////////////////////////////////////
   2 mday  1.1  //
   3 karl  1.88 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
   4            // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
   5            // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
   6 karl  1.82 // IBM Corp.; EMC Corporation, The Open Group.
   7 karl  1.88 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
   8            // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   9 karl  1.91 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  10            // EMC Corporation; VERITAS Software Corporation; The Open Group.
  11 mday  1.1  //
  12            // Permission is hereby granted, free of charge, to any person obtaining a copy
  13            // of this software and associated documentation files (the "Software"), to
  14            // deal in the Software without restriction, including without limitation the
  15            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  16            // sell copies of the Software, and to permit persons to whom the Software is
  17            // furnished to do so, subject to the following conditions:
  18 karl  1.82 // 
  19 mday  1.1  // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  20            // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  21            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  22            // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  23            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  24            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  25            // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  26            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  27            //
  28            //==============================================================================
  29            //
  30            // Author: Mike Day (mdday@us.ibm.com)
  31            //
  32            // Modified By:
  33 a.arora 1.93 //              Amit K Arora, IBM (amita@in.ibm.com) for Bug#1090,#2657
  34 joyce.j 1.101 //              Josephine Eskaline Joyce, IBM (jojustin@in.ibm.com) for Bug#3259
  35 mday    1.1   //
  36               //%/////////////////////////////////////////////////////////////////////////////
  37               
  38               #include "MessageQueueService.h"
  39 mday    1.22  #include <Pegasus/Common/Tracer.h>
  40 humberto 1.71  #include <Pegasus/Common/MessageLoader.h> //l10n
  41 mday     1.1   
  42                PEGASUS_NAMESPACE_BEGIN
  43                
  44 mday     1.15   
  45                cimom *MessageQueueService::_meta_dispatcher = 0;
  46                AtomicInt MessageQueueService::_service_count = 0;
  47                AtomicInt MessageQueueService::_xid(1);
  48 mday     1.38  Mutex MessageQueueService::_meta_dispatcher_mutex;
  49 mday     1.15  
  50 kumpf    1.99  static struct timeval deallocateWait = {300, 0}; 
  51 mday     1.53  
  52 mday     1.55  ThreadPool *MessageQueueService::_thread_pool = 0;
  53 mday     1.53  
  54                DQueue<MessageQueueService> MessageQueueService::_polling_list(true);
  55                
  56 kumpf    1.62  Thread* MessageQueueService::_polling_thread = 0;
  57 mday     1.61  
  58 mday     1.69  ThreadPool *MessageQueueService::get_thread_pool(void)
  59                {
  60                   return _thread_pool;
  61                }
  62                
  63                PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL  MessageQueueService::kill_idle_threads(void *parm)
  64 mday     1.53  {
  65 mday     1.69  
  66                   static struct timeval now, last = {0,0};
  67 mday     1.53     gettimeofday(&now, NULL);
  68 mday     1.56     int dead_threads = 0;
  69 mday     1.53     
  70 mday     1.70     if( now.tv_sec - last.tv_sec > 120 )
  71 mday     1.53     {
  72                      gettimeofday(&last, NULL);
  73 mday     1.56        try 
  74                      {
  75 kumpf    1.99  	 dead_threads = MessageQueueService::_thread_pool->cleanupIdleThreads();
  76 mday     1.56        }
  77 mday     1.69        catch(...)
  78 mday     1.56        {
  79                
  80                      }
  81 mday     1.53     }
  82 jenny.yu 1.92  
  83 w.otsuka 1.94  #ifdef PEGASUS_POINTER_64BIT
  84 jenny.yu 1.92     return (PEGASUS_THREAD_RETURN)(Uint64)dead_threads;
  85                #elif PEGASUS_PLATFORM_AIX_RS_IBMCXX
  86                   return (PEGASUS_THREAD_RETURN)(unsigned long)dead_threads;
  87                #else
  88                   return (PEGASUS_THREAD_RETURN)(Uint32)dead_threads;
  89                #endif
  90 mday     1.53  }
  91                
  92                PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
  93                {
  94                   Thread *myself = reinterpret_cast<Thread *>(parm);
  95                   DQueue<MessageQueueService> *list = reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm());
  96                   while ( _stop_polling.value()  == 0 ) 
  97                   {
  98 baggio.br 1.100       _polling_sem.wait();
  99                 
 100 mday      1.61        if(_stop_polling.value() != 0 )
 101                       {
 102 kumpf     1.62           break;
 103 mday      1.61        }
 104                       
 105 mday      1.53        list->lock();
 106                       MessageQueueService *service = list->next(0);
 107                       while(service != NULL)
 108                       {
 109                 	 if(service->_incoming.count() > 0 )
 110                 	 {
 111 mday      1.55  	    _thread_pool->allocate_and_awaken(service, _req_proc);
 112 mday      1.53  	 }
 113                 	 service = list->next(service);
 114                       }
 115                       list->unlock();
 116 mday      1.69        if(_check_idle_flag.value() != 0 )
 117                       {
 118                 	 _check_idle_flag = 0;
 119 kumpf     1.84  
 120                          // If there are insufficent resources to run
 121                          // kill_idle_threads, then just return.
 122                          _thread_pool->allocate_and_awaken(service, kill_idle_threads);
 123 mday      1.69        }
 124 mday      1.53     }
 125                    myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
 126                    return(0);
 127                 }
 128                 
 129                 
 130                 Semaphore MessageQueueService::_polling_sem(0);
 131                 AtomicInt MessageQueueService::_stop_polling(0);
 132 mday      1.69  AtomicInt MessageQueueService::_check_idle_flag(0);
 133 mday      1.53  
 134 mday      1.15  
 135 mday      1.1   MessageQueueService::MessageQueueService(const char *name, 
 136                 					 Uint32 queueID, 
 137                 					 Uint32 capabilities, 
 138                 					 Uint32 mask) 
 139 mday      1.15     : Base(name, true,  queueID),
 140 mday      1.22       
 141 mday      1.1        _mask(mask),
 142 mday      1.4        _die(0),
 143 mday      1.41       _incoming(true, 0),
 144 mday      1.39       _callback(true),
 145 mday      1.7        _incoming_queue_shutdown(0),
 146 mday      1.39       _callback_ready(0),
 147                      _req_thread(_req_proc, this, false),
 148                      _callback_thread(_callback_proc, this, false)
 149 mday      1.53  
 150 mday      1.1   { 
 151 mday      1.60  
 152 mday      1.22     _capabilities = (capabilities | module_capabilities::async);
 153                    
 154 mday      1.1      _default_op_timeout.tv_sec = 30;
 155                    _default_op_timeout.tv_usec = 100;
 156 mday      1.15  
 157 a.arora   1.87     AutoMutex autoMut(_meta_dispatcher_mutex);
 158 mday      1.15     
 159                    if( _meta_dispatcher == 0 )
 160                    {
 161 joyce.j   1.101       _stop_polling = 0;
 162 mday      1.15        PEGASUS_ASSERT( _service_count.value() == 0 );
 163                       _meta_dispatcher = new cimom();
 164                       if (_meta_dispatcher == NULL )
 165                       {
 166 a.arora   1.87           throw NullPointer();
 167 mday      1.15        }
 168 kumpf     1.99        _thread_pool =
 169                           new ThreadPool(0, "MessageQueueService", 0, 0, deallocateWait);  
 170 mday      1.15     }
 171                    _service_count++;
 172                 
 173                    if( false == register_service(name, _capabilities, _mask) )
 174                    {
 175 humberto  1.71        //l10n
 176                       //throw BindFailedException("MessageQueueService Base Unable to register with  Meta Dispatcher");
 177                       MessageLoaderParms parms("Common.MessageQueueService.UNABLE_TO_REGISTER",
 178 humberto  1.74  			       "MessageQueueService Base Unable to register with  Meta Dispatcher");
 179 humberto  1.71        						   
 180                       throw BindFailedException(parms);
 181 mday      1.15     }
 182                    
 183 mday      1.53     _polling_list.insert_last(this);
 184                    
 185 a.arora   1.87  //   _meta_dispatcher_mutex.unlock();  //Bug#1090
 186 mday      1.39  //   _callback_thread.run();
 187                    
 188 mday      1.53  //   _req_thread.run();
 189 mday      1.1   }
 190                 
 191 mday      1.4   
 192 mday      1.1   MessageQueueService::~MessageQueueService(void)
 193                 {
 194                    _die = 1;
 195 mday      1.76  
 196                    if (_incoming_queue_shutdown.value() == 0 )
 197                    {
 198                       _shutdown_incoming_queue();
 199                    }
 200 mday      1.39     _callback_ready.signal();
 201                    
 202 mday      1.15     {
 203 a.arora   1.87       AutoMutex autoMut(_meta_dispatcher_mutex);
 204                      _service_count--;
 205                      if (_service_count.value() == 0 )
 206                      {
 207 mday      1.76  
 208 mday      1.53        _stop_polling++;
 209                       _polling_sem.signal();
 210 a.arora   1.93  	  if (_polling_thread) {
 211                       	_polling_thread->join();
 212                       	delete _polling_thread;
 213                       	_polling_thread = 0;
 214                 	  }
 215 mday      1.15        _meta_dispatcher->_shutdown_routed_queue();
 216                       delete _meta_dispatcher;
 217 kumpf     1.45        _meta_dispatcher = 0;
 218 mday      1.76  
 219 kumpf     1.65        delete _thread_pool;
 220                       _thread_pool = 0;
 221 a.arora   1.87       }
 222                    } // mutex unlocks here
 223 mday      1.53     _polling_list.remove(this);
 224 konrad.r  1.72     // Clean up in case there are extra stuff on the queue. 
 225                   while (_incoming.count())
 226                   {
 227                 	delete _incoming.remove_first();
 228                   }
 229 mday      1.53  } 
 230 mday      1.1   
 231 mday      1.7   void MessageQueueService::_shutdown_incoming_queue(void)
 232                 {
 233                    
 234 mday      1.76     
 235 mday      1.9      if (_incoming_queue_shutdown.value() > 0 )
 236                       return ;
 237 mday      1.8      AsyncIoctl *msg = new AsyncIoctl(get_next_xid(),
 238                 				    0, 
 239                 				    _queueId, 
 240                 				    _queueId, 
 241                 				    true, 
 242                 				    AsyncIoctl::IO_CLOSE, 
 243                 				    0, 
 244                 				    0);
 245 mday      1.9   
 246 mday      1.8      msg->op = get_op();
 247 mday      1.41     msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
 248                    msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK 
 249                 		   | ASYNC_OPFLAGS_SIMPLE_STATUS);
 250 mday      1.32     msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 251 mday      1.41  
 252 mday      1.32     msg->op->_op_dest = this;
 253 mday      1.8      msg->op->_request.insert_first(msg);
 254                    
 255                    _incoming.insert_last_wait(msg->op);
 256 mday      1.32  
 257 mday      1.7   }
 258                 
 259 mday      1.22  
 260                 
 261 kumpf     1.85  void MessageQueueService::enqueue(Message *msg)
 262 mday      1.22  {
 263 kumpf     1.28     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
 264                 
 265 mday      1.22     Base::enqueue(msg);
 266 kumpf     1.28  
 267                    PEG_METHOD_EXIT();
 268 mday      1.22  }
 269                 
 270                 
 271 mday      1.39  PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_callback_proc(void *parm)
 272                 {
 273                    Thread *myself = reinterpret_cast<Thread *>(parm);
 274                    MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
 275                    AsyncOpNode *operation = 0;
 276                    
 277                    while ( service->_die.value() == 0 ) 
 278                    {
 279 baggio.br 1.100       service->_callback_ready.wait();
 280 mday      1.39        
 281                       service->_callback.lock();
 282                       operation = service->_callback.next(0);
 283                       while( operation != NULL)
 284                       {
 285                 	 if( ASYNC_OPSTATE_COMPLETE & operation->read_state())
 286                 	 {
 287                 	    operation = service->_callback.remove_no_lock(operation);
 288                 	    PEGASUS_ASSERT(operation != NULL);
 289                 	    operation->_thread_ptr = myself;
 290                 	    operation->_service_ptr = service;
 291                 	    service->_handle_async_callback(operation);
 292                 	    break;
 293                 	 }
 294                 	 operation = service->_callback.next(operation);
 295                       }
 296                       service->_callback.unlock();
 297                    }
 298                    myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
 299                    return(0);
 300                 }
 301 mday      1.39  
 302 mday      1.22  
 303 mday      1.5   PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
 304 mday      1.1   {
 305 mday      1.53     MessageQueueService *service = reinterpret_cast<MessageQueueService *>(parm);
 306 brian.campbell 1.90  
 307                         if ( service->_die.value() != 0)
 308                      		 return (0);
 309                      
 310 mday           1.5      // pull messages off the incoming queue and dispatch them. then 
 311                         // check pending messages that are non-blocking
 312 mday           1.7      AsyncOpNode *operation = 0;
 313 brian.campbell 1.90  
 314                      	 // many operations may have been queued.
 315                      	 do
 316 mday           1.41  	 {
 317 brian.campbell 1.90  		 try 
 318                      		 {
 319                      			 operation = service->_incoming.remove_first();
 320                      		 }
 321                      		 catch(ListClosed &)
 322                      		 {
 323                      			 break;
 324                      		 }
 325                      
 326                      		 if (operation)
 327                      		 {
 328                      			 operation->_service_ptr = service;
 329                      			 service->_handle_incoming_operation(operation);
 330                      		 }
 331                      	 } while (operation);
 332 mday           1.44  
 333 mday           1.5      return(0);
 334 mday           1.1   }
 335                      
 336 mday           1.43  Uint32 MessageQueueService::get_pending_callback_count(void)
 337                      {
 338                         return _callback.count();
 339                      }
 340                      
 341                      
 342                      
 343 mday           1.33  void MessageQueueService::_sendwait_callback(AsyncOpNode *op, 
 344                      					     MessageQueue *q, 
 345                      					     void *parm)
 346                      {
 347                         op->_client_sem.signal();
 348                      }
 349                      
 350 mday           1.30  
 351                      // callback function is responsible for cleaning up all resources
 352                      // including op, op->_callback_node, and op->_callback_ptr
 353 mday           1.22  void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
 354                      {
 355 mday           1.39     if( op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK )
 356                         {
 357                      
 358                            Message *msg = op->get_request();
 359                            if( msg && ( msg->getMask() & message_mask::ha_async))
 360                            {
 361                      	 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_START )
 362                      	 {
 363                      	    AsyncLegacyOperationStart *wrapper = 
 364                      	       static_cast<AsyncLegacyOperationStart *>(msg);
 365                      	    msg = wrapper->get_action();
 366                      	    delete wrapper;
 367                      	 }
 368                      	 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
 369                      	 {
 370                      	    AsyncModuleOperationStart *wrapper = 
 371                      	       static_cast<AsyncModuleOperationStart *>(msg);
 372                      	    msg = wrapper->get_action();
 373                      	    delete wrapper;
 374                      	 }
 375                      	 else if (msg->getType() == async_messages::ASYNC_OP_START)
 376 mday           1.39  	 {
 377                      	    AsyncOperationStart *wrapper = 
 378                      	       static_cast<AsyncOperationStart *>(msg);
 379                      	    msg = wrapper->get_action();
 380                      	    delete wrapper;
 381                      	 }
 382                      	 delete msg;
 383                            }
 384                      
 385                            msg = op->get_response();
 386                            if( msg && ( msg->getMask() & message_mask::ha_async))
 387                            {
 388                      	 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT )
 389                      	 {
 390                      	    AsyncLegacyOperationResult *wrapper = 
 391                      	       static_cast<AsyncLegacyOperationResult *>(msg);
 392                      	    msg = wrapper->get_result();
 393                      	    delete wrapper;
 394                      	 }
 395                      	 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
 396                      	 {
 397 mday           1.39  	    AsyncModuleOperationResult *wrapper = 
 398                      	       static_cast<AsyncModuleOperationResult *>(msg);
 399                      	    msg = wrapper->get_result();
 400                      	    delete wrapper;
 401                      	 }
 402                            }
 403                            void (*callback)(Message *, void *, void *) = op->__async_callback;
 404                            void *handle = op->_callback_handle;
 405                            void *parm = op->_callback_parameter;
 406                            op->release();
 407                            return_op(op);
 408                            callback(msg, handle, parm);
 409                         }
 410                         else if( op->_flags & ASYNC_OPFLAGS_CALLBACK )
 411                         {
 412                            // note that _callback_node may be different from op 
 413                            // op->_callback_response_q is a "this" pointer we can use for 
 414                            // static callback methods
 415                            op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);
 416                         }
 417 mday           1.22  }
 418                      
 419 mday           1.1   
 420 mday           1.34  void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation) 
 421                      //						     Thread *thread, 
 422                      //						     MessageQueue *queue)
 423 mday           1.1   {
 424 mday           1.5      if ( operation != 0 )
 425                         {
 426 mday           1.29        
 427 mday           1.22  // ATTN: optimization 
 428                      // << Tue Feb 19 14:10:38 2002 mdd >>
 429 mday           1.6         operation->lock();
 430 mday           1.29              
 431 mday           1.6         Message *rq = operation->_request.next(0);
 432 mday           1.29       
 433 mday           1.31  // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>>
 434                      // move this to the bottom of the loop when the majority of 
 435                      // messages become async messages. 
 436                      
 437 mday           1.18        // divert legacy messages to handleEnqueue
 438 mday           1.29        if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))
 439 mday           1.18        {
 440                      	 rq = operation->_request.remove_first() ;
 441                      	 operation->unlock();
 442                      	 // delete the op node 
 443 mday           1.39  	 operation->release();
 444                      	 return_op( operation);
 445 mday           1.24  
 446 mday           1.26  	 handleEnqueue(rq);
 447 mday           1.18  	 return;
 448                            }
 449                      
 450 mday           1.39        if ( (operation->_flags & ASYNC_OPFLAGS_CALLBACK || 
 451                      	    operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) && 
 452 mday           1.29  	   (operation->_state & ASYNC_OPSTATE_COMPLETE))
 453                            {
 454 mday           1.49  	 
 455 mday           1.29  	 operation->unlock();
 456                      	 _handle_async_callback(operation);
 457                            }
 458                            else 
 459                            {
 460                      	 PEGASUS_ASSERT(rq != 0 );
 461                      	 operation->unlock();
 462                      	 _handle_async_request(static_cast<AsyncRequest *>(rq));
 463                            }
 464 mday           1.5      }
 465                         return;
 466 mday           1.1   }
 467                      
 468                      void MessageQueueService::_handle_async_request(AsyncRequest *req)
 469                      {
 470 mday           1.4      if ( req != 0 )
 471                         {
 472                            req->op->processing();
 473 mday           1.1      
 474 mday           1.4         Uint32 type = req->getType();
 475                            if( type == async_messages::HEARTBEAT )
 476                      	 handle_heartbeat_request(req);
 477                            else if (type == async_messages::IOCTL)
 478                      	 handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
 479                            else if (type == async_messages::CIMSERVICE_START)
 480                      	 handle_CimServiceStart(static_cast<CimServiceStart *>(req));
 481                            else if (type == async_messages::CIMSERVICE_STOP)
 482                      	 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
 483                            else if (type == async_messages::CIMSERVICE_PAUSE)
 484                      	 handle_CimServicePause(static_cast<CimServicePause *>(req));
 485                            else if (type == async_messages::CIMSERVICE_RESUME)
 486                      	 handle_CimServiceResume(static_cast<CimServiceResume *>(req));
 487                            else if ( type == async_messages::ASYNC_OP_START)
 488                      	 handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
 489                            else 
 490                            {
 491                      	 // we don't handle this request message 
 492                      	 _make_response(req, async_results::CIM_NAK );
 493                            }
 494 mday           1.1      }
 495                      }
 496                      
 497 mday           1.17  
 498                      Boolean MessageQueueService::_enqueueResponse(
 499                         Message* request, 
 500                         Message* response)
 501 mday           1.18     
 502 mday           1.17  {
 503 w.white        1.102 
 504                        STAT_COPYDISPATCHER
 505                      
 506 kumpf          1.37     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 507                                          "MessageQueueService::_enqueueResponse");
 508 mday           1.25  
 509                         if( request->getMask() & message_mask::ha_async)
 510                         {
 511                            if (response->getMask() & message_mask::ha_async )
 512                            {
 513                      	 _completeAsyncResponse(static_cast<AsyncRequest *>(request), 
 514                      				static_cast<AsyncReply *>(response), 
 515                      				ASYNC_OPSTATE_COMPLETE, 0 );
 516 kumpf          1.37           PEG_METHOD_EXIT();
 517 mday           1.25  	 return true;
 518                            }
 519                         }
 520                         
 521 mday           1.17     if(request->_async != 0 )
 522                         {
 523                            Uint32 mask = request->_async->getMask();
 524 mday           1.18        PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request ));
 525                            
 526                            AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
 527                            AsyncOpNode *op = async->op;
 528                            request->_async = 0;
 529 mday           1.63        // the legacy request is going to be deleted by its handler
 530 mday           1.27        // remove it from the op node 
 531 mday           1.63  
 532                            static_cast<AsyncLegacyOperationStart *>(async)->get_action();
 533 mday           1.18        
 534 mday           1.63              
 535 mday           1.18        AsyncLegacyOperationResult *async_result = 
 536                      	 new AsyncLegacyOperationResult( 
 537                      	    async->getKey(),
 538                      	    async->getRouting(),
 539                      	    op,
 540                      	    response);
 541                            _completeAsyncResponse(async,
 542                      			     async_result,
 543                      			     ASYNC_OPSTATE_COMPLETE, 
 544                      			     0);
 545 kumpf          1.37        PEG_METHOD_EXIT();
 546 mday           1.18        return true;
 547 mday           1.17     }
 548 mday           1.18     
 549                         // ensure that the destination queue is in response->dest
 550 kumpf          1.37     PEG_METHOD_EXIT();
 551 mday           1.24     return SendForget(response);
 552 mday           1.18     
 553 mday           1.17  }
 554                      
 555 mday           1.18  void MessageQueueService::_make_response(Message *req, Uint32 code)
 556 mday           1.1   {
 557 mday           1.19     cimom::_make_response(req, code);
 558 mday           1.1   }
 559                      
 560                      
 561 mday           1.5   void MessageQueueService::_completeAsyncResponse(AsyncRequest *request, 
 562                      						AsyncReply *reply, 
 563                      						Uint32 state, 
 564                      						Uint32 flag)
 565                      {
 566 kumpf          1.37     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 567                                          "MessageQueueService::_completeAsyncResponse");
 568                      
 569 mday           1.19     cimom::_completeAsyncResponse(request, reply, state, flag);
 570 kumpf          1.37  
 571                         PEG_METHOD_EXIT();
 572 mday           1.5   }
 573                      
 574                      
 575 mday           1.32  void MessageQueueService::_complete_op_node(AsyncOpNode *op,
 576                      					    Uint32 state, 
 577                      					    Uint32 flag, 
 578                      					    Uint32 code)
 579                      {
 580                         cimom::_complete_op_node(op, state, flag, code);
 581                      }
 582                      
 583 mday           1.5   
 584                      Boolean MessageQueueService::accept_async(AsyncOpNode *op)
 585                      {
 586 mday           1.8      if (_incoming_queue_shutdown.value() > 0 )
 587                            return false;
 588 a.arora        1.93     if (_polling_thread == NULL)  {
 589                            _polling_thread = new Thread(polling_routine, 
 590                      			           reinterpret_cast<void *>(&_polling_list), 
 591                      			           false);
 592                            while (!_polling_thread->run())
 593                            {
 594                               pegasus_yield();
 595                            }
 596                      	}
 597 mday           1.20  // ATTN optimization remove the message checking altogether in the base 
 598                      // << Mon Feb 18 14:02:20 2002 mdd >>
 599 mday           1.6      op->lock();
 600                         Message *rq = op->_request.next(0);
 601 mday           1.20     Message *rp = op->_response.next(0);
 602 mday           1.6      op->unlock();
 603                         
 604 mday           1.22     if(  (rq != 0 && (true == messageOK(rq))) || (rp != 0 && ( true == messageOK(rp) )) &&  
 605                      	_die.value() == 0  )
 606 mday           1.5      {
 607 mday           1.6         _incoming.insert_last_wait(op);
 608 mday           1.53        _polling_sem.signal();
 609 mday           1.5         return true;
 610                         }
 611                         return false;
 612                      }
 613                      
 614                      Boolean MessageQueueService::messageOK(const Message *msg)
 615                      {
 616 mday           1.8      if (_incoming_queue_shutdown.value() > 0 )
 617                            return false;
 618 mday           1.18     return true;
 619                      }
 620                      
 621 mday           1.1   void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
 622                      {
 623                         // default action is to echo a heartbeat response 
 624                         
 625                         AsyncReply *reply = 
 626                            new AsyncReply(async_messages::HEARTBEAT,
 627                      		     req->getKey(),
 628                      		     req->getRouting(),
 629                      		     0,
 630                      		     req->op, 
 631                      		     async_results::OK, 
 632                      		     req->resp,
 633                      		     false);
 634 mday           1.4      _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );
 635 mday           1.1   }
 636                      
 637                      
 638                      void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
 639                      { 
 640                         ;
 641                      }
 642                            
 643                      void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
 644                      {
 645 mday           1.8      
 646                         switch( req->ctl )
 647                         {
 648                            case AsyncIoctl::IO_CLOSE:
 649                            {
 650 mday           1.76  
 651 mday           1.34  	 MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);
 652 kumpf          1.81  
 653                      #ifdef MESSAGEQUEUESERVICE_DEBUG
 654 mday           1.79  	 PEGASUS_STD(cout) << service->getQueueName() << " Received AsyncIoctl::IO_CLOSE " << PEGASUS_STD(endl);
 655 kumpf          1.81  #endif
 656 mday           1.8   	 
 657 mday           1.76  	 // respond to this message. this is fire and forget, so we don't need to delete anything. 
 658                      	 // this takes care of two problems that were being found 
 659                      	 // << Thu Oct  9 10:52:48 2003 mdd >>
 660                      	  _make_response(req, async_results::OK);
 661 mday           1.8   	 // ensure we do not accept any further messages
 662                      
 663                      	 // ensure we don't recurse on IO_CLOSE
 664                      	 if( _incoming_queue_shutdown.value() > 0 )
 665                      	    break;
 666                      	 
 667                      	 // set the closing flag 
 668                      	 service->_incoming_queue_shutdown = 1;
 669                      	 // empty out the queue
 670                      	 while( 1 )
 671                      	 {
 672                      	    AsyncOpNode *operation;
 673                      	    try 
 674                      	    {
 675                      	       operation = service->_incoming.remove_first();
 676                      	    }
 677                      	    catch(IPCException & )
 678                      	    {
 679                      	       break;
 680                      	    }
 681                      	    if( operation )
 682 mday           1.8   	    {
 683 mday           1.34  	       operation->_service_ptr = service;
 684                      	       service->_handle_incoming_operation(operation);
 685 mday           1.8   	    }
 686                      	    else
 687                      	       break;
 688                      	 } // message processing loop
 689 mday           1.76  	 
 690 mday           1.8   	 // shutdown the AsyncDQueue
 691                      	 service->_incoming.shutdown_queue();
 692                      	 return;
 693                            }
 694                      
 695                            default:
 696                      	 _make_response(req, async_results::CIM_NAK);
 697                         }
 698 mday           1.1   }
 699 mday           1.8   
 700 mday           1.1   void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
 701                      {
 702 mday           1.79  
 703 kumpf          1.81  #ifdef MESSAGEQUEUESERVICE_DEBUG
 704 mday           1.79     PEGASUS_STD(cout) << getQueueName() << "received START" << PEGASUS_STD(endl);
 705 kumpf          1.81  #endif
 706 mday           1.79     
 707 mday           1.10     // clear the stoped bit and update
 708 mday           1.13     _capabilities &= (~(module_capabilities::stopped));
 709 mday           1.10     _make_response(req, async_results::OK);
 710                         // now tell the meta dispatcher we are stopped 
 711                         update_service(_capabilities, _mask);
 712                      
 713 mday           1.1   }
 714                      void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
 715                      {
 716 kumpf          1.81  #ifdef MESSAGEQUEUESERVICE_DEBUG
 717 mday           1.79     PEGASUS_STD(cout) << getQueueName() << "received STOP" << PEGASUS_STD(endl);
 718 kumpf          1.81  #endif
 719 mday           1.10     // set the stopeed bit and update
 720                         _capabilities |= module_capabilities::stopped;
 721                         _make_response(req, async_results::CIM_STOPPED);
 722                         // now tell the meta dispatcher we are stopped 
 723                         update_service(_capabilities, _mask);
 724                         
 725 mday           1.1   }
 726                      void MessageQueueService::handle_CimServicePause(CimServicePause *req)
 727                      {
 728 mday           1.10     // set the paused bit and update
 729 mday           1.13     _capabilities |= module_capabilities::paused;
 730 mday           1.11     update_service(_capabilities, _mask);
 731 mday           1.10     _make_response(req, async_results::CIM_PAUSED);
 732                         // now tell the meta dispatcher we are stopped 
 733 mday           1.1   }
 734                      void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
 735                      {
 736 mday           1.10     // clear the paused  bit and update
 737 mday           1.13     _capabilities &= (~(module_capabilities::paused));
 738 mday           1.11     update_service(_capabilities, _mask);
 739 mday           1.10     _make_response(req, async_results::OK);
 740                         // now tell the meta dispatcher we are stopped 
 741 mday           1.1   }
 742                            
 743                      void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
 744                      {
 745                         _make_response(req, async_results::CIM_NAK);
 746                      }
 747                      
 748                      void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
 749                      {
 750 mday           1.14     ;
 751                      }
 752                      
 753 mday           1.10  
 754 mday           1.14  void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)
 755                      {
 756                         // remove the legacy message from the request and enqueue it to its destination
 757                         Uint32 result = async_results::CIM_NAK;
 758                         
 759 mday           1.25     Message *legacy = req->_act;
 760 mday           1.14     if ( legacy != 0 )
 761                         {
 762 mday           1.25        MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
 763 mday           1.14        if( queue != 0 )
 764                            {
 765 mday           1.25  	 if(queue->isAsync() == true )
 766                      	 {
 767                      	    (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
 768                      	 }
 769                      	 else 
 770                      	 {
 771                      	    // Enqueue the response:
 772                      	    queue->enqueue(req->get_action());
 773                      	 }
 774                      	 
 775 mday           1.14  	 result = async_results::OK;
 776                            }
 777                         }
 778                         _make_response(req, result);
 779                      }
 780                      
 781                      void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)
 782                      {
 783                         ;
 784 mday           1.1   }
 785                      
 786                      AsyncOpNode *MessageQueueService::get_op(void)
 787                      {
 788 mday           1.4      AsyncOpNode *op = new AsyncOpNode();
 789 mday           1.1      
 790 mday           1.9      op->_state = ASYNC_OPSTATE_UNKNOWN;
 791                         op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
 792 mday           1.4      
 793 mday           1.1      return op;
 794                      }
 795                      
 796                      void MessageQueueService::return_op(AsyncOpNode *op)
 797                      {
 798                         PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED );
 799 mday           1.4      delete op;
 800 mday           1.1   }
 801                      
 802 mday           1.18  
 803 mday           1.29  Boolean MessageQueueService::ForwardOp(AsyncOpNode *op, 
 804                      				       Uint32 destination)
 805                      {
 806                         PEGASUS_ASSERT(op != 0 );
 807 mday           1.30     op->lock();
 808                         op->_op_dest = MessageQueue::lookup(destination);
 809 mday           1.29     op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD);
 810                         op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK);
 811 mday           1.30     op->unlock();
 812                         if ( op->_op_dest == 0 )
 813                            return false;
 814                            
 815 mday           1.29     return  _meta_dispatcher->route_async(op);
 816                      }
 817                      
 818 mday           1.39   
 819 mday           1.21  Boolean MessageQueueService::SendAsync(AsyncOpNode *op, 
 820                      				       Uint32 destination,
 821 mday           1.18  				       void (*callback)(AsyncOpNode *, 
 822 mday           1.32  							MessageQueue *,
 823 mday           1.30  							void *),
 824                      				       MessageQueue *callback_response_q,
 825                      				       void *callback_ptr)
 826 mday           1.20  { 
 827 mday           1.21     PEGASUS_ASSERT(op != 0 && callback != 0 );
 828 mday           1.18     
 829 mday           1.21     // get the queue handle for the destination
 830                      
 831 mday           1.30     op->lock();
 832 mday           1.32     op->_op_dest = MessageQueue::lookup(destination); // destination of this message
 833 mday           1.22     op->_flags |= ASYNC_OPFLAGS_CALLBACK;
 834                         op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 835 mday           1.30     // initialize the callback data
 836 mday           1.32     op->_async_callback = callback;   // callback function to be executed by recpt. of response
 837                         op->_callback_node = op;          // the op node
 838                         op->_callback_response_q = callback_response_q;  // the queue that will receive the response
 839                         op->_callback_ptr = callback_ptr;   // user data for callback
 840                         op->_callback_request_q = this;     // I am the originator of this request
 841 mday           1.30     
 842                         op->unlock();
 843                         if(op->_op_dest == 0) 
 844                            return false;
 845 mday           1.21     
 846                         return  _meta_dispatcher->route_async(op);
 847 mday           1.18  }
 848                      
 849                      
 850 mday           1.39  Boolean MessageQueueService::SendAsync(Message *msg, 
 851                      				       Uint32 destination,
 852                      				       void (*callback)(Message *response, 
 853                      							void *handle, 
 854                      							void *parameter),
 855                      				       void *handle, 
 856                      				       void *parameter)
 857                      {
 858                         if(msg == NULL)
 859                            return false;
 860                         if(callback == NULL)
 861                            return SendForget(msg);
 862                         AsyncOpNode *op = get_op();
 863 mday           1.43     msg->dest = destination;
 864 mday           1.39     if( NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
 865                         {
 866                            op->release();
 867                            return_op(op);
 868                            return false;
 869                         }
 870                         op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
 871                         op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 872                         op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 873                         op->__async_callback = callback;
 874                         op->_callback_node = op;
 875                         op->_callback_handle = handle;
 876                         op->_callback_parameter = parameter;
 877                         op->_callback_response_q = this;
 878                         
 879                      
 880                         if( ! (msg->getMask() & message_mask::ha_async) )
 881                         {
 882                            AsyncLegacyOperationStart *wrapper = 
 883                      	 new AsyncLegacyOperationStart(get_next_xid(),
 884                      				       op, 
 885 mday           1.39  				       destination, 
 886                      				       msg, 
 887                      				       destination);
 888                         }
 889                         else 
 890                         {
 891                            op->_request.insert_first(msg);
 892                            (static_cast<AsyncMessage *>(msg))->op = op;
 893                         }
 894                         
 895                         _callback.insert_last(op);
 896                         return _meta_dispatcher->route_async(op);
 897                      }
 898                      
 899                      
 900 mday           1.18  Boolean MessageQueueService::SendForget(Message *msg)
 901                      {
 902                      
 903 mday           1.24     
 904 mday           1.18     AsyncOpNode *op = 0;
 905 mday           1.22     Uint32 mask = msg->getMask();
 906                         
 907                         if (mask & message_mask::ha_async)
 908 mday           1.18     {
 909                            op = (static_cast<AsyncMessage *>(msg))->op ;
 910                         }
 911 mday           1.22  
 912 mday           1.18     if( op == 0 )
 913 mday           1.20     {
 914 mday           1.18        op = get_op();
 915 mday           1.20        op->_request.insert_first(msg);
 916 mday           1.22        if (mask & message_mask::ha_async)
 917                      	 (static_cast<AsyncMessage *>(msg))->op = op;
 918 mday           1.20     }
 919 mday           1.30     op->_op_dest = MessageQueue::lookup(msg->dest);
 920 mday           1.22     op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
 921 mday           1.41     op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK 
 922                      		   | ASYNC_OPFLAGS_SIMPLE_STATUS);
 923 mday           1.22     op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 924 mday           1.30     if ( op->_op_dest == 0 )
 925 mday           1.39     {
 926                            op->release();
 927                            return_op(op);
 928 mday           1.21        return false;
 929 mday           1.39     }
 930 mday           1.24     
 931 mday           1.18     // now see if the meta dispatcher will take it
 932 mday           1.30     return  _meta_dispatcher->route_async(op);
 933 mday           1.18  }
 934 mday           1.2   
 935 mday           1.1   
 936 mday           1.4   AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
 937 mday           1.1   {
 938 mday           1.4      if ( request == 0 )
 939                            return 0 ;
 940 mday           1.5   
 941                         Boolean destroy_op = false;
 942                         
 943 s.hills        1.80     if (request->op == 0)
 944 mday           1.5      {
 945                            request->op = get_op();
 946 mday           1.7         request->op->_request.insert_first(request);
 947 mday           1.5         destroy_op = true;
 948                         }
 949 mday           1.4      
 950 mday           1.33     request->block = false;
 951 mday           1.35     request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
 952 mday           1.33     SendAsync(request->op, 
 953                      	     request->dest,
 954 mday           1.36   	     _sendwait_callback,
 955 mday           1.33  	     this,
 956                      	     (void *)0);
 957 mday           1.4      
 958 baggio.br      1.100    request->op->_client_sem.wait();
 959                      
 960 mday           1.6      request->op->lock();
 961                         AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
 962                         rpl->op = 0;
 963                         request->op->unlock();
 964                         
 965 mday           1.5      if( destroy_op == true)
 966                         {
 967 mday           1.6         request->op->lock();
 968                            request->op->_request.remove(request);
 969                            request->op->_state |= ASYNC_OPSTATE_RELEASED;
 970                            request->op->unlock();
 971                            return_op(request->op);
 972 mday           1.7         request->op = 0;
 973 mday           1.5      }
 974                         return rpl;
 975 mday           1.1   }
 976                      
 977                      
 978                      Boolean MessageQueueService::register_service(String name, 
 979                      					      Uint32 capabilities, 
 980                      					      Uint32 mask)
 981                      
 982                      {
 983                         RegisterCimService *msg = new RegisterCimService(get_next_xid(),
 984 mday           1.5   						    0, 
 985 mday           1.1   						    true, 
 986                      						    name, 
 987                      						    capabilities, 
 988                      						    mask,
 989                      						    _queueId);
 990 mday           1.44     msg->dest = CIMOM_Q_ID;
 991                         
 992 mday           1.1      Boolean registered = false;
 993 mday           1.7      AsyncReply *reply = static_cast<AsyncReply *>(SendWait( msg ));
 994 mday           1.1      
 995 mday           1.2      if ( reply != 0 )
 996 mday           1.1      {
 997                            if(reply->getMask() & message_mask:: ha_async)
 998                            {
 999                      	 if(reply->getMask() & message_mask::ha_reply)
1000                      	 {
1001 mday           1.15  	    if(reply->result == async_results::OK || 
1002                      	       reply->result == async_results::MODULE_ALREADY_REGISTERED )
1003 mday           1.1   	       registered = true;
1004                      	 }
1005                            }
1006                            
1007 mday           1.7         delete reply; 
1008 mday           1.1      }
1009 mday           1.5      delete msg;
1010 mday           1.1      return registered;
1011                      }
1012                      
1013                      Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
1014                      {
1015                         
1016                         
1017                         UpdateCimService *msg = new UpdateCimService(get_next_xid(), 
1018 mday           1.5   						0, 
1019 mday           1.1   						true, 
1020                      						_queueId,
1021                      						_capabilities, 
1022                      						_mask);
1023                         Boolean registered = false;
1024 mday           1.2   
1025                         AsyncMessage *reply = SendWait(msg);
1026                         if (reply)
1027 mday           1.1      {
1028                            if(reply->getMask() & message_mask:: ha_async)
1029                            {
1030                      	 if(reply->getMask() & message_mask::ha_reply)
1031                      	 {
1032 mday           1.2   	    if(static_cast<AsyncReply *>(reply)->result == async_results::OK)
1033 mday           1.1   	       registered = true;
1034                      	 }
1035                            }
1036                            delete reply;
1037                         }
1038 mday           1.5      delete msg;
1039 mday           1.1      return registered;
1040                      }
1041                      
1042                      
1043                      Boolean MessageQueueService::deregister_service(void)
1044                      {
1045 mday           1.3   
1046 mday           1.5      _meta_dispatcher->deregister_module(_queueId);
1047                         return true;
1048 mday           1.1   }
1049                      
1050                      
1051                      void MessageQueueService::find_services(String name, 
1052                      					Uint32 capabilities, 
1053                      					Uint32 mask, 
1054                      					Array<Uint32> *results)
1055                      {
1056                         
1057                         if( results == 0 )
1058                            throw NullPointer();
1059 mday           1.5       
1060 mday           1.1      results->clear();
1061                         
1062                         FindServiceQueue *req = 
1063                            new FindServiceQueue(get_next_xid(), 
1064 mday           1.5   			   0, 
1065 mday           1.1   			   _queueId, 
1066                      			   true, 
1067                      			   name, 
1068                      			   capabilities, 
1069                      			   mask);
1070 mday           1.44     
1071                         req->dest = CIMOM_Q_ID;
1072 mday           1.1      
1073 mday           1.2      AsyncMessage *reply = SendWait(req); 
1074                         if(reply)
1075 mday           1.1      {
1076                            if( reply->getMask() & message_mask::ha_async)
1077                            {
1078                      	 if(reply->getMask() & message_mask::ha_reply)
1079                      	 {
1080                      	    if(reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
1081                      	    {
1082                      	       if( (static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK )
1083                      		  *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
1084                      	    }
1085                      	 }
1086                            }
1087                            delete reply;
1088                         }
1089 mday           1.5      delete req;
1090 mday           1.1      return ;
1091                      }
1092                      
1093                      void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
1094                      {
1095                         if(result == 0)
1096                            throw NullPointer();
1097                         
1098                         EnumerateService *req 
1099                            = new EnumerateService(get_next_xid(),
1100 mday           1.5   			     0, 
1101 mday           1.1   			     _queueId, 
1102                      			     true, 
1103                      			     queue);
1104                         
1105 mday           1.2      AsyncMessage *reply = SendWait(req);
1106 mday           1.1      
1107 mday           1.2      if (reply)
1108 mday           1.1      {
1109                            Boolean found = false;
1110                            
1111                            if( reply->getMask() & message_mask::ha_async)
1112                            {
1113                      	 if(reply->getMask() & message_mask::ha_reply)
1114                      	 {
1115                      	    if(reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
1116                      	    {
1117                      	       if( (static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK )
1118                      	       {
1119                      		  if( found == false)
1120                      		  {
1121                      		     found = true;
1122                      		     
1123                      		     result->put_name( (static_cast<EnumerateServiceResponse *>(reply))->name);
1124                      		     result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
1125                      		     result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
1126                      		     result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
1127                      		  }
1128                      	       }
1129 mday           1.1   	    }
1130                      	 }
1131                            }
1132                            delete reply;
1133                         }
1134 mday           1.5      delete req;
1135                         
1136 mday           1.1      return;
1137                      }
1138                      
1139                      Uint32 MessageQueueService::get_next_xid(void)
1140                      {
1141 mday           1.78     static Mutex _monitor;
1142                         Uint32 value;
1143 a.arora        1.87     AutoMutex autoMut(_monitor);   
1144 mday           1.1      _xid++;
1145 mday           1.78     value =  _xid.value();
1146                         return value;
1147                         
1148 mday           1.1   }
1149                      
1150                      PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2