(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

   1 w.otsuka 1.88.2.1 //%2005////////////////////////////////////////////////////////////////////////
   2 mday     1.1      //
   3 karl     1.88     // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
   4                   // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
   5                   // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
   6 karl     1.82     // IBM Corp.; EMC Corporation, The Open Group.
   7 karl     1.88     // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
   8                   // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   9 w.otsuka 1.88.2.1 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  10                   // EMC Corporation; VERITAS Software Corporation; The Open Group.
  11 mday     1.1      //
  12                   // Permission is hereby granted, free of charge, to any person obtaining a copy
  13                   // of this software and associated documentation files (the "Software"), to
  14                   // deal in the Software without restriction, including without limitation the
  15                   // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  16                   // sell copies of the Software, and to permit persons to whom the Software is
  17                   // furnished to do so, subject to the following conditions:
  18 karl     1.82     // 
  19 mday     1.1      // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  20                   // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  21                   // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  22                   // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  23                   // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  24                   // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  25                   // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  26                   // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  27                   //
  28                   //==============================================================================
  29                   //
  30                   // Author: Mike Day (mdday@us.ibm.com)
  31                   //
  32                   // Modified By:
  33 a.arora  1.87     //              Amit K Arora, IBM (amita@in.ibm.com) for Bug#1090
  34 mday     1.1      //
  35                   //%/////////////////////////////////////////////////////////////////////////////
  36                   
  37                   #include "MessageQueueService.h"
  38 mday     1.22     #include <Pegasus/Common/Tracer.h>
  39 humberto 1.71     #include <Pegasus/Common/MessageLoader.h> //l10n
  40 mday     1.1      
  41                   PEGASUS_NAMESPACE_BEGIN
  42                   
  43 mday     1.15      
  44                   cimom *MessageQueueService::_meta_dispatcher = 0;
  45                   AtomicInt MessageQueueService::_service_count = 0;
  46                   AtomicInt MessageQueueService::_xid(1);
  47 mday     1.38     Mutex MessageQueueService::_meta_dispatcher_mutex;
  48 mday     1.15     
  49 mday     1.58     static struct timeval create_time = {0, 1};
  50 mday     1.70     static struct timeval destroy_time = {300, 0}; 
  51 mday     1.59     static struct timeval deadlock_time = {0, 0};
  52 mday     1.53     
  53 mday     1.55     ThreadPool *MessageQueueService::_thread_pool = 0;
  54 mday     1.53     
  55 kumpf    1.88.2.7 DQueue<MessageQueueService>* MessageQueueService::_polling_list = 0;
  56 mday     1.53     
  57 kumpf    1.62     Thread* MessageQueueService::_polling_thread = 0;
  58 mday     1.61     
  59 mday     1.69     ThreadPool *MessageQueueService::get_thread_pool(void)
  60                   {
  61                      return _thread_pool;
  62                   }
  63                   
  64 denise.eckstein 1.88.2.4 //
  65                          // MAX_THREADS_PER_SVC_QUEUE_LIMIT
  66                          //
  67                          // JR Wunderlich Jun 6, 2005
  68                          //
  69                          
  70                          #define MAX_THREADS_PER_SVC_QUEUE_LIMIT 5000
  71                          #define MAX_THREADS_PER_SVC_QUEUE_DEFAULT 5
  72                          
  73                          Uint32 max_threads_per_svc_queue;
  74                          
  75 mday            1.69     PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL  MessageQueueService::kill_idle_threads(void *parm)
  76 mday            1.53     {
  77 mday            1.69     
  78                             static struct timeval now, last = {0,0};
  79 mday            1.53        gettimeofday(&now, NULL);
  80 mday            1.56        int dead_threads = 0;
  81 mday            1.53        
  82 mday            1.70        if( now.tv_sec - last.tv_sec > 120 )
  83 mday            1.53        {
  84                                gettimeofday(&last, NULL);
  85 mday            1.56           try 
  86                                {
  87 mday            1.69     	 dead_threads =  MessageQueueService::_thread_pool->kill_dead_threads();
  88 mday            1.56           }
  89 mday            1.69           catch(...)
  90 mday            1.56           {
  91                          
  92                                }
  93 mday            1.53        }
  94 w.otsuka        1.88.2.1 
  95                          #ifdef PEGASUS_POINTER_64BIT
  96                             return (PEGASUS_THREAD_RETURN)(Uint64)dead_threads;
  97                          #elif PEGASUS_PLATFORM_AIX_RS_IBMCXX
  98                             return (PEGASUS_THREAD_RETURN)(unsigned long)dead_threads;
  99                          #else
 100                             return (PEGASUS_THREAD_RETURN)(Uint32)dead_threads;
 101                          #endif
 102 mday            1.53     }
 103                          
 104 mday            1.68     
 105 mday            1.76     void MessageQueueService::force_shutdown(Boolean destroy_flag)
 106 mday            1.68     {
 107 mday            1.79        return;
 108                             
 109 kumpf           1.75     #ifdef MESSAGEQUEUESERVICE_DEBUG
 110 humberto        1.71     	//l10n
 111                             MessageLoaderParms parms("Common.MessageQueueService.FORCING_SHUTDOWN",
 112 humberto        1.74     			    "Forcing shutdown of CIMOM Message Router");
 113 humberto        1.71        PEGASUS_STD(cout) << MessageLoader::getMessage(parms) << PEGASUS_STD(endl);
 114 kumpf           1.75     #endif
 115 mday            1.76     
 116 kumpf           1.75     
 117 mday            1.68        MessageQueueService *svc;
 118 konrad.r        1.73        int counter = 0;   
 119 kumpf           1.88.2.7    _polling_list->lock();
 120                             svc = _polling_list->next(0);
 121 humberto        1.71        
 122 mday            1.68        while(svc != 0)
 123                             {
 124 kumpf           1.75     #ifdef MESSAGEQUEUESERVICE_DEBUG
 125 humberto        1.71        		//l10n - reuse same MessageLoaderParms to avoid multiple creates
 126                                	parms.msg_id = "Common.MessageQueueService.STOPPING_SERVICE";
 127                             	  	parms.default_msg = "Stopping $0"; 
 128                             	  	parms.arg0 = svc->getQueueName();
 129                             	  	PEGASUS_STD(cout) << MessageLoader::getMessage(parms) << PEGASUS_STD(endl);
 130 kumpf           1.75     #endif
 131 humberto        1.71           							
 132 kumpf           1.88.2.7       _polling_sem->signal();
 133 mday            1.68           svc->_shutdown_incoming_queue();
 134 konrad.r        1.73           counter++;
 135 kumpf           1.88.2.7       _polling_sem->signal();
 136                                svc = _polling_list->next(svc);
 137 mday            1.68        }
 138 kumpf           1.88.2.7    _polling_list->unlock();
 139 konrad.r        1.73     
 140 kumpf           1.88.2.7    _polling_sem->signal();
 141 konrad.r        1.73     
 142 kumpf           1.88.2.7    *MessageQueueService::_stop_polling = 1;
 143 mday            1.76        
 144                             if(destroy_flag == true) 
 145                             {
 146                          
 147 kumpf           1.88.2.7       svc = _polling_list->remove_last();
 148 mday            1.76           while(svc)
 149                                {
 150                          	 delete svc;
 151 kumpf           1.88.2.7 	 svc = _polling_list->remove_last();
 152 mday            1.76           }
 153                                
 154 konrad.r        1.73        }
 155 mday            1.68     }
 156                          
 157                          
 158 mday            1.53     PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
 159                          {
 160                             Thread *myself = reinterpret_cast<Thread *>(parm);
 161                             DQueue<MessageQueueService> *list = reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm());
 162 kumpf           1.88.2.7    while ( _stop_polling->value()  == 0 ) 
 163 mday            1.53        {
 164 kumpf           1.88.2.7       _polling_sem->wait();
 165                                if(_stop_polling->value() != 0 )
 166 mday            1.61           {
 167 kumpf           1.62              break;
 168 mday            1.61           }
 169 denise.eckstein 1.88.2.5    
 170                                // The polling_routine thread must hold the lock on the
 171                                // _polling_thread list while processing incoming messages.
 172                                // This lock is used to give this thread ownership of
 173                                // services on the _polling_routine list.
 174                          
 175                                // This is necessary to avoid confict with other threads
 176                                // processing the _polling_list
 177                                // (e.g., MessageQueueServer::~MessageQueueService).
 178                             
 179 mday            1.53           list->lock();
 180                                MessageQueueService *service = list->next(0);
 181 denise.eckstein 1.88.2.5       ThreadStatus rtn = PEGASUS_THREAD_OK;
 182                                while (service != NULL)
 183 mday            1.53           {
 184 denise.eckstein 1.88.2.5           if ((service->_incoming.count() > 0) &&
 185                                        (service->_die.value() == 0) &&
 186                                        (service->_threads < max_threads_per_svc_queue))
 187                                    {
 188                                       // The _threads count is used to track the 
 189                                       // number of active threads that have been allocated
 190                                       // to process messages for this service.
 191                          
 192                                       // The _threads count MUST be incremented while
 193                                       // the polling_routine owns the _polling_thread
 194                                       // lock and has ownership of the service object.
 195                          
 196                                       service->_threads++;
 197                                       try
 198                                       {
 199                                           rtn = _thread_pool->allocate_and_awaken(
 200 kumpf           1.88.2.7                       service, _req_proc, _polling_sem);
 201 denise.eckstein 1.88.2.5              }
 202                                       catch (...)
 203                                       {
 204                                           service->_threads--;
 205                          
 206                                           // allocate_and_awaken should never generate an exception.
 207                                           PEGASUS_ASSERT(0);
 208                                       }
 209                                       // if no more threads available, break from processing loop
 210                                       if (rtn != PEGASUS_THREAD_OK )
 211                                       {
 212                                           service->_threads--;
 213                                           Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
 214                                              "Not enough threads to process this request. Skipping.");
 215                          
 216                                           Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
 217                                              "Could not allocate thread for %s. " \
 218                                              "Queue has %d messages waiting and %d threads servicing." \
 219                                              "Skipping the service for right now. ",
 220                                              service->getQueueName(),
 221                                              service->_incoming.count(),
 222 denise.eckstein 1.88.2.5                     service->_threads.value());
 223                          
 224                                           pegasus_yield();
 225                                           service = NULL;
 226                                        } 
 227                                    }
 228                                    if (service != NULL) 
 229                                    {
 230                                       service = list->next(service);
 231                                    }
 232 mday            1.53           }
 233                                list->unlock();
 234 denise.eckstein 1.88.2.5 
 235 kumpf           1.88.2.7       if(_check_idle_flag->value() != 0 )
 236 mday            1.69           {
 237 kumpf           1.88.2.7 	 *_check_idle_flag = 0;
 238 kumpf           1.84     
 239 denise.eckstein 1.88.2.3          // try to do idle thread clean up processing when system is not busy
 240                                   // if system is busy there may not be a thread available to allocate
 241                                   // so nothing will be done and that is OK.
 242                          
 243                                   if ( _thread_pool->allocate_and_awaken(service, kill_idle_threads,
 244 kumpf           1.88.2.7               _polling_sem) != PEGASUS_THREAD_OK)
 245 denise.eckstein 1.88.2.3          {
 246                                       Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
 247                                          "Not enough threads to kill idle threads. What an irony.");
 248                          
 249                                       Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
 250                                          "Could not allocate thread to kill idle threads." \
 251                                          "Skipping. ");
 252                                   }
 253 mday            1.69           }
 254 mday            1.53        }
 255                             myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
 256                             return(0);
 257                          }
 258                          
 259                          
 260 kumpf           1.88.2.7 Semaphore* MessageQueueService::_polling_sem = 0;
 261                          AtomicInt* MessageQueueService::_stop_polling = 0;
 262                          AtomicInt* MessageQueueService::_check_idle_flag = 0;
 263 mday            1.53     
 264 mday            1.15     
 265 mday            1.1      MessageQueueService::MessageQueueService(const char *name, 
 266                          					 Uint32 queueID, 
 267                          					 Uint32 capabilities, 
 268                          					 Uint32 mask) 
 269 mday            1.15        : Base(name, true,  queueID),
 270 mday            1.22          
 271 mday            1.1           _mask(mask),
 272 mday            1.4           _die(0),
 273 denise.eckstein 1.88.2.3      _threads(0),
 274 mday            1.41          _incoming(true, 0),
 275 mday            1.39          _callback(true),
 276 mday            1.7           _incoming_queue_shutdown(0),
 277 mday            1.39          _callback_ready(0),
 278                               _req_thread(_req_proc, this, false),
 279                               _callback_thread(_callback_proc, this, false)
 280 mday            1.53     
 281 mday            1.1      { 
 282 mday            1.60     
 283 mday            1.22        _capabilities = (capabilities | module_capabilities::async);
 284                             
 285 mday            1.1         _default_op_timeout.tv_sec = 30;
 286                             _default_op_timeout.tv_usec = 100;
 287 mday            1.15     
 288 denise.eckstein 1.88.2.4    max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;
 289                          
 290                             // if requested threads gt MAX_THREADS_PER_SVC_QUEUE_LIMIT
 291                             // then set to MAX_THREADS_PER_SVC_QUEUE_LIMIT
 292                          
 293                             if (max_threads_per_svc_queue > MAX_THREADS_PER_SVC_QUEUE_LIMIT)
 294                             {
 295                                max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT;
 296                             }
 297                          
 298                             // if requested threads eq 0 (unlimited)
 299                             // then set to MAX_THREADS_PER_SVC_QUEUE_LIMIT
 300                          
 301                             if (max_threads_per_svc_queue == 0)
 302                             {
 303                                max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_DEFAULT;
 304                             }
 305                          
 306                             // cout << "MAX_THREADS_PER_SVC_QUEUE = " << MAX_THREADS_PER_SVC_QUEUE << endl;
 307                             // cout << "max_threads_per_svc_queue set to = " << max_threads_per_svc_queue << endl;
 308                          
 309 a.arora         1.87        AutoMutex autoMut(_meta_dispatcher_mutex);
 310 mday            1.15        
 311                             if( _meta_dispatcher == 0 )
 312                             {
 313 kumpf           1.88.2.7       // Instantiate the common objects
 314                                _polling_list = new DQueue<MessageQueueService>(true);
 315                                _stop_polling = new AtomicInt(0);
 316                                _polling_sem = new Semaphore(0);
 317                                _check_idle_flag = new AtomicInt(0);
 318                          
 319                                *_stop_polling = 0;
 320 mday            1.15           PEGASUS_ASSERT( _service_count.value() == 0 );
 321                                _meta_dispatcher = new cimom();
 322                                if (_meta_dispatcher == NULL )
 323                                {
 324 a.arora         1.87              throw NullPointer();
 325 mday            1.15           }
 326 mday            1.58           _thread_pool = new ThreadPool(0, "MessageQueueService", 0, 0,
 327 mday            1.55     				    create_time, destroy_time, deadlock_time);  
 328 mday            1.61           
 329 kumpf           1.62           _polling_thread = new Thread(polling_routine, 
 330 kumpf           1.88.2.7 			           reinterpret_cast<void *>(_polling_list), 
 331 kumpf           1.62     			           false);
 332 kumpf           1.83           while (!_polling_thread->run())
 333                                {
 334                                   pegasus_yield();
 335                                }
 336 mday            1.15        }
 337                             _service_count++;
 338                          
 339                             if( false == register_service(name, _capabilities, _mask) )
 340                             {
 341 humberto        1.71           //l10n
 342                                //throw BindFailedException("MessageQueueService Base Unable to register with  Meta Dispatcher");
 343                                MessageLoaderParms parms("Common.MessageQueueService.UNABLE_TO_REGISTER",
 344 humberto        1.74     			       "MessageQueueService Base Unable to register with  Meta Dispatcher");
 345 humberto        1.71           						   
 346                                throw BindFailedException(parms);
 347 mday            1.15        }
 348                             
 349 kumpf           1.88.2.7    _polling_list->insert_last(this);
 350 mday            1.53        
 351 a.arora         1.87     //   _meta_dispatcher_mutex.unlock();  //Bug#1090
 352 mday            1.39     //   _callback_thread.run();
 353                             
 354 mday            1.53     //   _req_thread.run();
 355 mday            1.1      }
 356                          
 357 mday            1.4      
 358 mday            1.1      MessageQueueService::~MessageQueueService(void)
 359                          {
 360                             _die = 1;
 361 mday            1.76     
 362 denise.eckstein 1.88.2.5    // The polling_routine locks the _polling_list while
 363                             // processing the incoming messages for services on the 
 364                             // list.  Deleting the service from the _polling_list
 365                             // prior to processing, avoids synchronization issues
 366                             // with the _polling_routine.
 367                          
 368 kumpf           1.88.2.7    _polling_list->remove(this);
 369 denise.eckstein 1.88.2.5 
 370                             _callback_ready.signal();
 371                          
 372                             // ATTN: The code for closing the _incoming queue
 373                             // is not working correctly. In OpenPegasus 2.4,
 374                             // execution of the following code is very timing
 375                             // dependent. This needs to be fix.
 376                             // See Bug 4079 for details. 
 377                             if (_incoming_queue_shutdown.value() == 0)
 378 mday            1.76        {
 379 denise.eckstein 1.88.2.5        _shutdown_incoming_queue();
 380 mday            1.76        }
 381 denise.eckstein 1.88.2.3 
 382 denise.eckstein 1.88.2.5    // Wait until all threads processing the messages
 383                             // for this service have completed.
 384                          
 385                             while (_threads.value() > 0)
 386                             {
 387                                pegasus_yield();
 388                             }
 389 denise.eckstein 1.88.2.3 
 390 mday            1.15        {
 391 a.arora         1.87          AutoMutex autoMut(_meta_dispatcher_mutex);
 392                               _service_count--;
 393                               if (_service_count.value() == 0 )
 394                               {
 395 mday            1.76     
 396 kumpf           1.88.2.7       (*_stop_polling)++;
 397                                _polling_sem->signal();
 398 kumpf           1.62           _polling_thread->join();
 399                                delete _polling_thread;
 400 kumpf           1.65           _polling_thread = 0;
 401 mday            1.15           _meta_dispatcher->_shutdown_routed_queue();
 402                                delete _meta_dispatcher;
 403 kumpf           1.45           _meta_dispatcher = 0;
 404 mday            1.76     
 405 kumpf           1.65           delete _thread_pool;
 406                                _thread_pool = 0;
 407 kumpf           1.88.2.7 
 408                                // Clean up the common objects
 409                                delete _check_idle_flag;
 410                                delete _polling_sem;
 411                                delete _stop_polling;
 412                                delete _polling_list;
 413 a.arora         1.87          }
 414                             } // mutex unlocks here
 415 konrad.r        1.72        // Clean up in case there are extra stuff on the queue. 
 416 denise.eckstein 1.88.2.3    while (_incoming.count()) 
 417                             { 
 418                                 try 
 419                                 { 
 420                                    delete _incoming.remove_first(); 
 421                                 }
 422                                 catch (const ListClosed &e) 
 423                                 { 
 424                                    // If the list is closed, there is nothing we can do. 
 425                                    break; 
 426                                 } 
 427                             } 
 428 mday            1.53     } 
 429 mday            1.1      
 430 mday            1.7      void MessageQueueService::_shutdown_incoming_queue(void)
 431                          {
 432                             
 433 mday            1.76        
 434 mday            1.9         if (_incoming_queue_shutdown.value() > 0 )
 435                                return ;
 436 mday            1.8         AsyncIoctl *msg = new AsyncIoctl(get_next_xid(),
 437                          				    0, 
 438                          				    _queueId, 
 439                          				    _queueId, 
 440                          				    true, 
 441                          				    AsyncIoctl::IO_CLOSE, 
 442                          				    0, 
 443                          				    0);
 444 mday            1.9      
 445 mday            1.8         msg->op = get_op();
 446 mday            1.41        msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
 447                             msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK 
 448                          		   | ASYNC_OPFLAGS_SIMPLE_STATUS);
 449 mday            1.32        msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 450 mday            1.41     
 451 mday            1.32        msg->op->_op_dest = this;
 452 mday            1.8         msg->op->_request.insert_first(msg);
 453 mday            1.32     
 454 denise.eckstein 1.88.2.3    try 
 455                             {
 456                                 _incoming.insert_last_wait(msg->op);
 457 kumpf           1.88.2.7        _polling_sem->signal(); 
 458 denise.eckstein 1.88.2.3    } 
 459                             catch (const ListClosed &) 
 460                             { 
 461                                 // This means the queue has already been shut-down (happens  when there 
 462                                 // are two AsyncIoctrl::IO_CLOSE messages generated and one got first 
 463                                 // processed. 
 464                                 delete msg; 
 465                             } 
 466                             catch (const Permission &) 
 467                             { 
 468                                 delete msg; 
 469                             } 
 470 mday            1.7      }
 471                          
 472 mday            1.22     
 473                          
 474 kumpf           1.85     void MessageQueueService::enqueue(Message *msg)
 475 mday            1.22     {
 476 kumpf           1.28        PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
 477                          
 478 mday            1.22        Base::enqueue(msg);
 479 kumpf           1.28     
 480                             PEG_METHOD_EXIT();
 481 mday            1.22     }
 482                          
 483                          
 484 mday            1.39     PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_callback_proc(void *parm)
 485                          {
 486                             Thread *myself = reinterpret_cast<Thread *>(parm);
 487                             MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
 488                             AsyncOpNode *operation = 0;
 489                             
 490                             while ( service->_die.value() == 0 ) 
 491                             {
 492                                service->_callback_ready.wait();
 493                                
 494                                service->_callback.lock();
 495                                operation = service->_callback.next(0);
 496                                while( operation != NULL)
 497                                {
 498                          	 if( ASYNC_OPSTATE_COMPLETE & operation->read_state())
 499                          	 {
 500                          	    operation = service->_callback.remove_no_lock(operation);
 501                          	    PEGASUS_ASSERT(operation != NULL);
 502                          	    operation->_thread_ptr = myself;
 503                          	    operation->_service_ptr = service;
 504                          	    service->_handle_async_callback(operation);
 505 mday            1.39     	    break;
 506                          	 }
 507                          	 operation = service->_callback.next(operation);
 508                                }
 509                                service->_callback.unlock();
 510                             }
 511                             myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
 512                             return(0);
 513                          }
 514                          
 515 mday            1.22     
 516 mday            1.5      PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
 517 mday            1.1      {
 518 mday            1.53        MessageQueueService *service = reinterpret_cast<MessageQueueService *>(parm);
 519 mday            1.5         // pull messages off the incoming queue and dispatch them. then 
 520                             // check pending messages that are non-blocking
 521 mday            1.7         AsyncOpNode *operation = 0;
 522                             
 523 mday            1.56        if ( service->_die.value() == 0 ) 
 524                              {
 525 mday            1.41     	 try 
 526                          	 {
 527 mday            1.53     	    operation = service->_incoming.remove_first();
 528 mday            1.41     	 }
 529                          	 catch(ListClosed & )
 530                          	 {
 531 mday            1.56     	    operation = 0;
 532 denise.eckstein 1.88.2.3             service->_threads--;
 533 mday            1.56     	    return(0);
 534 mday            1.41     	 }
 535                          	 if( operation )
 536                          	 {
 537                          	    operation->_service_ptr = service;
 538                          	    service->_handle_incoming_operation(operation);
 539                          	 }
 540 mday            1.56         }
 541 denise.eckstein 1.88.2.3    service->_threads--;
 542 mday            1.5         return(0);
 543 mday            1.1      }
 544                          
 545 mday            1.43     Uint32 MessageQueueService::get_pending_callback_count(void)
 546                          {
 547                             return _callback.count();
 548                          }
 549                          
 550                          
 551                          
 552 mday            1.33     void MessageQueueService::_sendwait_callback(AsyncOpNode *op, 
 553                          					     MessageQueue *q, 
 554                          					     void *parm)
 555                          {
 556                             op->_client_sem.signal();
 557                          }
 558                          
 559 mday            1.30     
 560                          // callback function is responsible for cleaning up all resources
 561                          // including op, op->_callback_node, and op->_callback_ptr
 562 mday            1.22     void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
 563                          {
 564 mday            1.39        if( op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK )
 565                             {
 566                          
 567                                Message *msg = op->get_request();
 568                                if( msg && ( msg->getMask() & message_mask::ha_async))
 569                                {
 570                          	 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_START )
 571                          	 {
 572                          	    AsyncLegacyOperationStart *wrapper = 
 573                          	       static_cast<AsyncLegacyOperationStart *>(msg);
 574                          	    msg = wrapper->get_action();
 575                          	    delete wrapper;
 576                          	 }
 577                          	 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
 578                          	 {
 579                          	    AsyncModuleOperationStart *wrapper = 
 580                          	       static_cast<AsyncModuleOperationStart *>(msg);
 581                          	    msg = wrapper->get_action();
 582                          	    delete wrapper;
 583                          	 }
 584                          	 else if (msg->getType() == async_messages::ASYNC_OP_START)
 585 mday            1.39     	 {
 586                          	    AsyncOperationStart *wrapper = 
 587                          	       static_cast<AsyncOperationStart *>(msg);
 588                          	    msg = wrapper->get_action();
 589                          	    delete wrapper;
 590                          	 }
 591                          	 delete msg;
 592                                }
 593                          
 594                                msg = op->get_response();
 595                                if( msg && ( msg->getMask() & message_mask::ha_async))
 596                                {
 597                          	 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT )
 598                          	 {
 599                          	    AsyncLegacyOperationResult *wrapper = 
 600                          	       static_cast<AsyncLegacyOperationResult *>(msg);
 601                          	    msg = wrapper->get_result();
 602                          	    delete wrapper;
 603                          	 }
 604                          	 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
 605                          	 {
 606 mday            1.39     	    AsyncModuleOperationResult *wrapper = 
 607                          	       static_cast<AsyncModuleOperationResult *>(msg);
 608                          	    msg = wrapper->get_result();
 609                          	    delete wrapper;
 610                          	 }
 611                                }
 612                                void (*callback)(Message *, void *, void *) = op->__async_callback;
 613                                void *handle = op->_callback_handle;
 614                                void *parm = op->_callback_parameter;
 615                                op->release();
 616                                return_op(op);
 617                                callback(msg, handle, parm);
 618                             }
 619                             else if( op->_flags & ASYNC_OPFLAGS_CALLBACK )
 620                             {
 621                                // note that _callback_node may be different from op 
 622                                // op->_callback_response_q is a "this" pointer we can use for 
 623                                // static callback methods
 624                                op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);
 625                             }
 626 mday            1.22     }
 627                          
 628 mday            1.1      
 629 mday            1.34     void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation) 
 630                          //						     Thread *thread, 
 631                          //						     MessageQueue *queue)
 632 mday            1.1      {
 633 mday            1.5         if ( operation != 0 )
 634                             {
 635 mday            1.29           
 636 mday            1.22     // ATTN: optimization 
 637                          // << Tue Feb 19 14:10:38 2002 mdd >>
 638 mday            1.6            operation->lock();
 639 mday            1.29                 
 640 mday            1.6            Message *rq = operation->_request.next(0);
 641 mday            1.29          
 642 mday            1.31     // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>>
 643                          // move this to the bottom of the loop when the majority of 
 644                          // messages become async messages. 
 645                          
 646 mday            1.18           // divert legacy messages to handleEnqueue
 647 mday            1.29           if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))
 648 mday            1.18           {
 649                          	 rq = operation->_request.remove_first() ;
 650                          	 operation->unlock();
 651                          	 // delete the op node 
 652 mday            1.39     	 operation->release();
 653                          	 return_op( operation);
 654 mday            1.24     
 655 mday            1.26     	 handleEnqueue(rq);
 656 mday            1.18     	 return;
 657                                }
 658                          
 659 mday            1.39           if ( (operation->_flags & ASYNC_OPFLAGS_CALLBACK || 
 660                          	    operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) && 
 661 mday            1.29     	   (operation->_state & ASYNC_OPSTATE_COMPLETE))
 662                                {
 663 mday            1.49     	 
 664 mday            1.29     	 operation->unlock();
 665                          	 _handle_async_callback(operation);
 666                                }
 667                                else 
 668                                {
 669                          	 PEGASUS_ASSERT(rq != 0 );
 670                          	 operation->unlock();
 671                          	 _handle_async_request(static_cast<AsyncRequest *>(rq));
 672                                }
 673 mday            1.5         }
 674                             return;
 675 mday            1.1      }
 676                          
 677                          void MessageQueueService::_handle_async_request(AsyncRequest *req)
 678                          {
 679 mday            1.4         if ( req != 0 )
 680                             {
 681                                req->op->processing();
 682 mday            1.1         
 683 mday            1.4            Uint32 type = req->getType();
 684                                if( type == async_messages::HEARTBEAT )
 685                          	 handle_heartbeat_request(req);
 686                                else if (type == async_messages::IOCTL)
 687                          	 handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
 688                                else if (type == async_messages::CIMSERVICE_START)
 689                          	 handle_CimServiceStart(static_cast<CimServiceStart *>(req));
 690                                else if (type == async_messages::CIMSERVICE_STOP)
 691                          	 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
 692                                else if (type == async_messages::CIMSERVICE_PAUSE)
 693                          	 handle_CimServicePause(static_cast<CimServicePause *>(req));
 694                                else if (type == async_messages::CIMSERVICE_RESUME)
 695                          	 handle_CimServiceResume(static_cast<CimServiceResume *>(req));
 696                                else if ( type == async_messages::ASYNC_OP_START)
 697                          	 handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
 698                                else 
 699                                {
 700                          	 // we don't handle this request message 
 701                          	 _make_response(req, async_results::CIM_NAK );
 702                                }
 703 mday            1.1         }
 704                          }
 705                          
 706 mday            1.17     
 707                          Boolean MessageQueueService::_enqueueResponse(
 708                             Message* request, 
 709                             Message* response)
 710 mday            1.18        
 711 mday            1.17     {
 712 kumpf           1.37        PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 713                                              "MessageQueueService::_enqueueResponse");
 714 mday            1.25     
 715                             if( request->getMask() & message_mask::ha_async)
 716                             {
 717                                if (response->getMask() & message_mask::ha_async )
 718                                {
 719                          	 _completeAsyncResponse(static_cast<AsyncRequest *>(request), 
 720                          				static_cast<AsyncReply *>(response), 
 721                          				ASYNC_OPSTATE_COMPLETE, 0 );
 722 kumpf           1.37              PEG_METHOD_EXIT();
 723 mday            1.25     	 return true;
 724                                }
 725                             }
 726                             
 727 mday            1.17        if(request->_async != 0 )
 728                             {
 729                                Uint32 mask = request->_async->getMask();
 730 mday            1.18           PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request ));
 731                                
 732                                AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
 733                                AsyncOpNode *op = async->op;
 734                                request->_async = 0;
 735 mday            1.63           // the legacy request is going to be deleted by its handler
 736 mday            1.27           // remove it from the op node 
 737 mday            1.63     
 738                                static_cast<AsyncLegacyOperationStart *>(async)->get_action();
 739 mday            1.18           
 740 mday            1.63                 
 741 mday            1.18           AsyncLegacyOperationResult *async_result = 
 742                          	 new AsyncLegacyOperationResult( 
 743                          	    async->getKey(),
 744                          	    async->getRouting(),
 745                          	    op,
 746                          	    response);
 747                                _completeAsyncResponse(async,
 748                          			     async_result,
 749                          			     ASYNC_OPSTATE_COMPLETE, 
 750                          			     0);
 751 kumpf           1.37           PEG_METHOD_EXIT();
 752 mday            1.18           return true;
 753 mday            1.17        }
 754 mday            1.18        
 755                             // ensure that the destination queue is in response->dest
 756 kumpf           1.37        PEG_METHOD_EXIT();
 757 mday            1.24        return SendForget(response);
 758 mday            1.18        
 759 mday            1.17     }
 760                          
 761 mday            1.18     void MessageQueueService::_make_response(Message *req, Uint32 code)
 762 mday            1.1      {
 763 mday            1.19        cimom::_make_response(req, code);
 764 mday            1.1      }
 765                          
 766                          
 767 mday            1.5      void MessageQueueService::_completeAsyncResponse(AsyncRequest *request, 
 768                          						AsyncReply *reply, 
 769                          						Uint32 state, 
 770                          						Uint32 flag)
 771                          {
 772 kumpf           1.37        PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 773                                              "MessageQueueService::_completeAsyncResponse");
 774                          
 775 mday            1.19        cimom::_completeAsyncResponse(request, reply, state, flag);
 776 kumpf           1.37     
 777                             PEG_METHOD_EXIT();
 778 mday            1.5      }
 779                          
 780                          
 781 mday            1.32     void MessageQueueService::_complete_op_node(AsyncOpNode *op,
 782                          					    Uint32 state, 
 783                          					    Uint32 flag, 
 784                          					    Uint32 code)
 785                          {
 786                             cimom::_complete_op_node(op, state, flag, code);
 787                          }
 788                          
 789 mday            1.5      
 790                          Boolean MessageQueueService::accept_async(AsyncOpNode *op)
 791                          {
 792 mday            1.8         if (_incoming_queue_shutdown.value() > 0 )
 793 denise.eckstein 1.88.2.6       return true;
 794 mday            1.8         
 795 mday            1.20     // ATTN optimization remove the message checking altogether in the base 
 796                          // << Mon Feb 18 14:02:20 2002 mdd >>
 797 mday            1.6         op->lock();
 798                             Message *rq = op->_request.next(0);
 799 mday            1.20        Message *rp = op->_response.next(0);
 800 mday            1.6         op->unlock();
 801                             
 802 mday            1.22        if(  (rq != 0 && (true == messageOK(rq))) || (rp != 0 && ( true == messageOK(rp) )) &&  
 803                          	_die.value() == 0  )
 804 mday            1.5         {
 805 mday            1.6            _incoming.insert_last_wait(op);
 806 kumpf           1.88.2.7       _polling_sem->signal();
 807 mday            1.5            return true;
 808                             }
 809                             return false;
 810                          }
 811                          
 812                          Boolean MessageQueueService::messageOK(const Message *msg)
 813                          {
 814 mday            1.8         if (_incoming_queue_shutdown.value() > 0 )
 815                                return false;
 816 mday            1.18        return true;
 817                          }
 818                          
 819 mday            1.1      void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
 820                          {
 821                             // default action is to echo a heartbeat response 
 822                             
 823                             AsyncReply *reply = 
 824                                new AsyncReply(async_messages::HEARTBEAT,
 825                          		     req->getKey(),
 826                          		     req->getRouting(),
 827                          		     0,
 828                          		     req->op, 
 829                          		     async_results::OK, 
 830                          		     req->resp,
 831                          		     false);
 832 mday            1.4         _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );
 833 mday            1.1      }
 834                          
 835                          
 836                          void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
 837                          { 
 838                             ;
 839                          }
 840                                
 841                          void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
 842                          {
 843 mday            1.8         
 844                             switch( req->ctl )
 845                             {
 846                                case AsyncIoctl::IO_CLOSE:
 847                                {
 848 mday            1.76     
 849 mday            1.34     	 MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);
 850 kumpf           1.81     
 851                          #ifdef MESSAGEQUEUESERVICE_DEBUG
 852 mday            1.79     	 PEGASUS_STD(cout) << service->getQueueName() << " Received AsyncIoctl::IO_CLOSE " << PEGASUS_STD(endl);
 853 kumpf           1.81     #endif
 854 mday            1.8      	 
 855 mday            1.76     	 // respond to this message. this is fire and forget, so we don't need to delete anything. 
 856                          	 // this takes care of two problems that were being found 
 857                          	 // << Thu Oct  9 10:52:48 2003 mdd >>
 858                          	  _make_response(req, async_results::OK);
 859 mday            1.8      	 // ensure we do not accept any further messages
 860                          
 861                          	 // ensure we don't recurse on IO_CLOSE
 862                          	 if( _incoming_queue_shutdown.value() > 0 )
 863                          	    break;
 864                          	 
 865                          	 // set the closing flag 
 866                          	 service->_incoming_queue_shutdown = 1;
 867                          	 // empty out the queue
 868                          	 while( 1 )
 869                          	 {
 870                          	    AsyncOpNode *operation;
 871                          	    try 
 872                          	    {
 873                          	       operation = service->_incoming.remove_first();
 874                          	    }
 875                          	    catch(IPCException & )
 876                          	    {
 877                          	       break;
 878                          	    }
 879                          	    if( operation )
 880 mday            1.8      	    {
 881 mday            1.34     	       operation->_service_ptr = service;
 882                          	       service->_handle_incoming_operation(operation);
 883 mday            1.8      	    }
 884                          	    else
 885                          	       break;
 886                          	 } // message processing loop
 887 mday            1.76     	 
 888 mday            1.8      	 // shutdown the AsyncDQueue
 889                          	 service->_incoming.shutdown_queue();
 890                          	 return;
 891                                }
 892                          
 893                                default:
 894                          	 _make_response(req, async_results::CIM_NAK);
 895                             }
 896 mday            1.1      }
 897 mday            1.8      
 898 mday            1.1      void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
 899                          {
 900 mday            1.79     
 901 kumpf           1.81     #ifdef MESSAGEQUEUESERVICE_DEBUG
 902 mday            1.79        PEGASUS_STD(cout) << getQueueName() << "received START" << PEGASUS_STD(endl);
 903 kumpf           1.81     #endif
 904 mday            1.79        
 905 mday            1.10        // clear the stoped bit and update
 906 mday            1.13        _capabilities &= (~(module_capabilities::stopped));
 907 mday            1.10        _make_response(req, async_results::OK);
 908                             // now tell the meta dispatcher we are stopped 
 909                             update_service(_capabilities, _mask);
 910                          
 911 mday            1.1      }
 912                          void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
 913                          {
 914 kumpf           1.81     #ifdef MESSAGEQUEUESERVICE_DEBUG
 915 mday            1.79        PEGASUS_STD(cout) << getQueueName() << "received STOP" << PEGASUS_STD(endl);
 916 kumpf           1.81     #endif
 917 mday            1.10        // set the stopeed bit and update
 918                             _capabilities |= module_capabilities::stopped;
 919                             _make_response(req, async_results::CIM_STOPPED);
 920                             // now tell the meta dispatcher we are stopped 
 921                             update_service(_capabilities, _mask);
 922                             
 923 mday            1.1      }
 924                          void MessageQueueService::handle_CimServicePause(CimServicePause *req)
 925                          {
 926 mday            1.10        // set the paused bit and update
 927 mday            1.13        _capabilities |= module_capabilities::paused;
 928 mday            1.11        update_service(_capabilities, _mask);
 929 mday            1.10        _make_response(req, async_results::CIM_PAUSED);
 930                             // now tell the meta dispatcher we are stopped 
 931 mday            1.1      }
 932                          void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
 933                          {
 934 mday            1.10        // clear the paused  bit and update
 935 mday            1.13        _capabilities &= (~(module_capabilities::paused));
 936 mday            1.11        update_service(_capabilities, _mask);
 937 mday            1.10        _make_response(req, async_results::OK);
 938                             // now tell the meta dispatcher we are stopped 
 939 mday            1.1      }
 940                                
 941                          void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
 942                          {
 943                             _make_response(req, async_results::CIM_NAK);
 944                          }
 945                          
 946                          void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
 947                          {
 948 mday            1.14        ;
 949                          }
 950                          
 951 mday            1.10     
 952 mday            1.14     void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)
 953                          {
 954                             // remove the legacy message from the request and enqueue it to its destination
 955                             Uint32 result = async_results::CIM_NAK;
 956                             
 957 mday            1.25        Message *legacy = req->_act;
 958 mday            1.14        if ( legacy != 0 )
 959                             {
 960 mday            1.25           MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
 961 mday            1.14           if( queue != 0 )
 962                                {
 963 mday            1.25     	 if(queue->isAsync() == true )
 964                          	 {
 965                          	    (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
 966                          	 }
 967                          	 else 
 968                          	 {
 969                          	    // Enqueue the response:
 970                          	    queue->enqueue(req->get_action());
 971                          	 }
 972                          	 
 973 mday            1.14     	 result = async_results::OK;
 974                                }
 975                             }
 976                             _make_response(req, result);
 977                          }
 978                          
 979                          void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)
 980                          {
 981                             ;
 982 mday            1.1      }
 983                          
 984                          AsyncOpNode *MessageQueueService::get_op(void)
 985                          {
 986 mday            1.4         AsyncOpNode *op = new AsyncOpNode();
 987 mday            1.1         
 988 mday            1.9         op->_state = ASYNC_OPSTATE_UNKNOWN;
 989                             op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
 990 mday            1.4         
 991 mday            1.1         return op;
 992                          }
 993                          
 994                          void MessageQueueService::return_op(AsyncOpNode *op)
 995                          {
 996                             PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED );
 997 mday            1.4         delete op;
 998 mday            1.1      }
 999                          
1000 mday            1.18     
1001 mday            1.29     Boolean MessageQueueService::ForwardOp(AsyncOpNode *op, 
1002                          				       Uint32 destination)
1003                          {
1004                             PEGASUS_ASSERT(op != 0 );
1005 mday            1.30        op->lock();
1006                             op->_op_dest = MessageQueue::lookup(destination);
1007 mday            1.29        op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD);
1008                             op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK);
1009 mday            1.30        op->unlock();
1010                             if ( op->_op_dest == 0 )
1011                                return false;
1012                                
1013 mday            1.29        return  _meta_dispatcher->route_async(op);
1014                          }
1015                          
1016 mday            1.39      
1017 mday            1.21     Boolean MessageQueueService::SendAsync(AsyncOpNode *op, 
1018                          				       Uint32 destination,
1019 mday            1.18     				       void (*callback)(AsyncOpNode *, 
1020 mday            1.32     							MessageQueue *,
1021 mday            1.30     							void *),
1022                          				       MessageQueue *callback_response_q,
1023                          				       void *callback_ptr)
1024 mday            1.20     { 
1025 mday            1.21        PEGASUS_ASSERT(op != 0 && callback != 0 );
1026 mday            1.18        
1027 mday            1.21        // get the queue handle for the destination
1028                          
1029 mday            1.30        op->lock();
1030 mday            1.32        op->_op_dest = MessageQueue::lookup(destination); // destination of this message
1031 mday            1.22        op->_flags |= ASYNC_OPFLAGS_CALLBACK;
1032                             op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
1033                             op->_state &= ~ASYNC_OPSTATE_COMPLETE;
1034 mday            1.30        // initialize the callback data
1035 mday            1.32        op->_async_callback = callback;   // callback function to be executed by recpt. of response
1036                             op->_callback_node = op;          // the op node
1037                             op->_callback_response_q = callback_response_q;  // the queue that will receive the response
1038                             op->_callback_ptr = callback_ptr;   // user data for callback
1039                             op->_callback_request_q = this;     // I am the originator of this request
1040 mday            1.30        
1041                             op->unlock();
1042                             if(op->_op_dest == 0) 
1043                                return false;
1044 mday            1.21        
1045                             return  _meta_dispatcher->route_async(op);
1046 mday            1.18     }
1047                          
1048                          
1049 mday            1.39     Boolean MessageQueueService::SendAsync(Message *msg, 
1050                          				       Uint32 destination,
1051                          				       void (*callback)(Message *response, 
1052                          							void *handle, 
1053                          							void *parameter),
1054                          				       void *handle, 
1055                          				       void *parameter)
1056                          {
1057                             if(msg == NULL)
1058                                return false;
1059                             if(callback == NULL)
1060                                return SendForget(msg);
1061                             AsyncOpNode *op = get_op();
1062 mday            1.43        msg->dest = destination;
1063 mday            1.39        if( NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
1064                             {
1065                                op->release();
1066                                return_op(op);
1067                                return false;
1068                             }
1069                             op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
1070                             op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
1071                             op->_state &= ~ASYNC_OPSTATE_COMPLETE;
1072                             op->__async_callback = callback;
1073                             op->_callback_node = op;
1074                             op->_callback_handle = handle;
1075                             op->_callback_parameter = parameter;
1076                             op->_callback_response_q = this;
1077                             
1078                          
1079                             if( ! (msg->getMask() & message_mask::ha_async) )
1080                             {
1081                                AsyncLegacyOperationStart *wrapper = 
1082                          	 new AsyncLegacyOperationStart(get_next_xid(),
1083                          				       op, 
1084 mday            1.39     				       destination, 
1085                          				       msg, 
1086                          				       destination);
1087                             }
1088                             else 
1089                             {
1090                                op->_request.insert_first(msg);
1091                                (static_cast<AsyncMessage *>(msg))->op = op;
1092                             }
1093                             
1094                             _callback.insert_last(op);
1095                             return _meta_dispatcher->route_async(op);
1096                          }
1097                          
1098                          
1099 mday            1.18     Boolean MessageQueueService::SendForget(Message *msg)
1100                          {
1101                          
1102 mday            1.24        
1103 mday            1.18        AsyncOpNode *op = 0;
1104 mday            1.22        Uint32 mask = msg->getMask();
1105                             
1106                             if (mask & message_mask::ha_async)
1107 mday            1.18        {
1108                                op = (static_cast<AsyncMessage *>(msg))->op ;
1109                             }
1110 mday            1.22     
1111 mday            1.18        if( op == 0 )
1112 mday            1.20        {
1113 mday            1.18           op = get_op();
1114 mday            1.20           op->_request.insert_first(msg);
1115 mday            1.22           if (mask & message_mask::ha_async)
1116                          	 (static_cast<AsyncMessage *>(msg))->op = op;
1117 mday            1.20        }
1118 mday            1.30        op->_op_dest = MessageQueue::lookup(msg->dest);
1119 mday            1.22        op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
1120 mday            1.41        op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK 
1121                          		   | ASYNC_OPFLAGS_SIMPLE_STATUS);
1122 mday            1.22        op->_state &= ~ASYNC_OPSTATE_COMPLETE;
1123 mday            1.30        if ( op->_op_dest == 0 )
1124 mday            1.39        {
1125                                op->release();
1126                                return_op(op);
1127 mday            1.21           return false;
1128 mday            1.39        }
1129 mday            1.24        
1130 mday            1.18        // now see if the meta dispatcher will take it
1131 mday            1.30        return  _meta_dispatcher->route_async(op);
1132 mday            1.18     }
1133 mday            1.2      
1134 mday            1.1      
1135 mday            1.4      AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
1136 mday            1.1      {
1137 mday            1.4         if ( request == 0 )
1138                                return 0 ;
1139 mday            1.5      
1140                             Boolean destroy_op = false;
1141                             
1142 s.hills         1.80        if (request->op == 0)
1143 mday            1.5         {
1144                                request->op = get_op();
1145 mday            1.7            request->op->_request.insert_first(request);
1146 mday            1.5            destroy_op = true;
1147                             }
1148 mday            1.4         
1149 mday            1.33        request->block = false;
1150 mday            1.35        request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
1151 mday            1.33        SendAsync(request->op, 
1152                          	     request->dest,
1153 mday            1.36      	     _sendwait_callback,
1154 mday            1.33     	     this,
1155                          	     (void *)0);
1156 mday            1.4         
1157 mday            1.33        request->op->_client_sem.wait();
1158 mday            1.6         request->op->lock();
1159                             AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
1160                             rpl->op = 0;
1161                             request->op->unlock();
1162                             
1163 mday            1.5         if( destroy_op == true)
1164                             {
1165 mday            1.6            request->op->lock();
1166                                request->op->_request.remove(request);
1167                                request->op->_state |= ASYNC_OPSTATE_RELEASED;
1168                                request->op->unlock();
1169                                return_op(request->op);
1170 mday            1.7            request->op = 0;
1171 mday            1.5         }
1172                             return rpl;
1173 mday            1.1      }
1174                          
1175                          
1176                          Boolean MessageQueueService::register_service(String name, 
1177                          					      Uint32 capabilities, 
1178                          					      Uint32 mask)
1179                          
1180                          {
1181                             RegisterCimService *msg = new RegisterCimService(get_next_xid(),
1182 mday            1.5      						    0, 
1183 mday            1.1      						    true, 
1184                          						    name, 
1185                          						    capabilities, 
1186                          						    mask,
1187                          						    _queueId);
1188 mday            1.44        msg->dest = CIMOM_Q_ID;
1189                             
1190 mday            1.1         Boolean registered = false;
1191 mday            1.7         AsyncReply *reply = static_cast<AsyncReply *>(SendWait( msg ));
1192 mday            1.1         
1193 mday            1.2         if ( reply != 0 )
1194 mday            1.1         {
1195                                if(reply->getMask() & message_mask:: ha_async)
1196                                {
1197                          	 if(reply->getMask() & message_mask::ha_reply)
1198                          	 {
1199 mday            1.15     	    if(reply->result == async_results::OK || 
1200                          	       reply->result == async_results::MODULE_ALREADY_REGISTERED )
1201 mday            1.1      	       registered = true;
1202                          	 }
1203                                }
1204                                
1205 mday            1.7            delete reply; 
1206 mday            1.1         }
1207 mday            1.5         delete msg;
1208 mday            1.1         return registered;
1209                          }
1210                          
1211                          Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
1212                          {
1213                             
1214                             
1215                             UpdateCimService *msg = new UpdateCimService(get_next_xid(), 
1216 mday            1.5      						0, 
1217 mday            1.1      						true, 
1218                          						_queueId,
1219                          						_capabilities, 
1220                          						_mask);
1221                             Boolean registered = false;
1222 mday            1.2      
1223                             AsyncMessage *reply = SendWait(msg);
1224                             if (reply)
1225 mday            1.1         {
1226                                if(reply->getMask() & message_mask:: ha_async)
1227                                {
1228                          	 if(reply->getMask() & message_mask::ha_reply)
1229                          	 {
1230 mday            1.2      	    if(static_cast<AsyncReply *>(reply)->result == async_results::OK)
1231 mday            1.1      	       registered = true;
1232                          	 }
1233                                }
1234                                delete reply;
1235                             }
1236 mday            1.5         delete msg;
1237 mday            1.1         return registered;
1238                          }
1239                          
1240                          
1241                          Boolean MessageQueueService::deregister_service(void)
1242                          {
1243 mday            1.3      
1244 mday            1.5         _meta_dispatcher->deregister_module(_queueId);
1245                             return true;
1246 mday            1.1      }
1247                          
1248                          
1249                          void MessageQueueService::find_services(String name, 
1250                          					Uint32 capabilities, 
1251                          					Uint32 mask, 
1252                          					Array<Uint32> *results)
1253                          {
1254                             
1255                             if( results == 0 )
1256                                throw NullPointer();
1257 mday            1.5          
1258 mday            1.1         results->clear();
1259                             
1260                             FindServiceQueue *req = 
1261                                new FindServiceQueue(get_next_xid(), 
1262 mday            1.5      			   0, 
1263 mday            1.1      			   _queueId, 
1264                          			   true, 
1265                          			   name, 
1266                          			   capabilities, 
1267                          			   mask);
1268 mday            1.44        
1269                             req->dest = CIMOM_Q_ID;
1270 mday            1.1         
1271 mday            1.2         AsyncMessage *reply = SendWait(req); 
1272                             if(reply)
1273 mday            1.1         {
1274                                if( reply->getMask() & message_mask::ha_async)
1275                                {
1276                          	 if(reply->getMask() & message_mask::ha_reply)
1277                          	 {
1278                          	    if(reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
1279                          	    {
1280                          	       if( (static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK )
1281                          		  *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
1282                          	    }
1283                          	 }
1284                                }
1285                                delete reply;
1286                             }
1287 mday            1.5         delete req;
1288 mday            1.1         return ;
1289                          }
1290                          
1291                          void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
1292                          {
1293                             if(result == 0)
1294                                throw NullPointer();
1295                             
1296                             EnumerateService *req 
1297                                = new EnumerateService(get_next_xid(),
1298 mday            1.5      			     0, 
1299 mday            1.1      			     _queueId, 
1300                          			     true, 
1301                          			     queue);
1302                             
1303 mday            1.2         AsyncMessage *reply = SendWait(req);
1304 mday            1.1         
1305 mday            1.2         if (reply)
1306 mday            1.1         {
1307                                Boolean found = false;
1308                                
1309                                if( reply->getMask() & message_mask::ha_async)
1310                                {
1311                          	 if(reply->getMask() & message_mask::ha_reply)
1312                          	 {
1313                          	    if(reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
1314                          	    {
1315                          	       if( (static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK )
1316                          	       {
1317                          		  if( found == false)
1318                          		  {
1319                          		     found = true;
1320                          		     
1321                          		     result->put_name( (static_cast<EnumerateServiceResponse *>(reply))->name);
1322                          		     result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
1323                          		     result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
1324                          		     result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
1325                          		  }
1326                          	       }
1327 mday            1.1      	    }
1328                          	 }
1329                                }
1330                                delete reply;
1331                             }
1332 mday            1.5         delete req;
1333                             
1334 mday            1.1         return;
1335                          }
1336                          
1337                          Uint32 MessageQueueService::get_next_xid(void)
1338                          {
1339 mday            1.78        static Mutex _monitor;
1340                             Uint32 value;
1341 a.arora         1.87        AutoMutex autoMut(_monitor);   
1342 mday            1.1         _xid++;
1343 mday            1.78        value =  _xid.value();
1344                             return value;
1345                             
1346 mday            1.1      }
1347                          
1348                          PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2