(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

   1 karl  1.91 //%2005////////////////////////////////////////////////////////////////////////
   2 mday  1.1  //
   3 karl  1.88 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
   4            // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
   5            // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
   6 karl  1.82 // IBM Corp.; EMC Corporation, The Open Group.
   7 karl  1.88 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
   8            // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   9 karl  1.91 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  10            // EMC Corporation; VERITAS Software Corporation; The Open Group.
  11 mday  1.1  //
  12            // Permission is hereby granted, free of charge, to any person obtaining a copy
  13            // of this software and associated documentation files (the "Software"), to
  14            // deal in the Software without restriction, including without limitation the
  15            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  16            // sell copies of the Software, and to permit persons to whom the Software is
  17            // furnished to do so, subject to the following conditions:
  18 karl  1.82 // 
  19 mday  1.1  // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  20            // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  21            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  22            // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  23            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  24            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  25            // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  26            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  27            //
  28            //==============================================================================
  29            //
  30            // Author: Mike Day (mdday@us.ibm.com)
  31            //
  32            // Modified By:
  33 a.arora 1.93 //              Amit K Arora, IBM (amita@in.ibm.com) for Bug#1090,#2657
  34 joyce.j 1.101 //              Josephine Eskaline Joyce, IBM (jojustin@in.ibm.com) for Bug#3259
  35 jim.wunderlich 1.114 //              Jim Wunderlich (Jim_Wunderlich@prodigy.net)
  36 mday           1.1   //
  37                      //%/////////////////////////////////////////////////////////////////////////////
  38                      
  39 jim.wunderlich 1.111 // #include <iostream.h>
  40 mday           1.1   #include "MessageQueueService.h"
  41 mday           1.22  #include <Pegasus/Common/Tracer.h>
  42 humberto       1.71  #include <Pegasus/Common/MessageLoader.h> //l10n
  43 mday           1.1   
  44                      PEGASUS_NAMESPACE_BEGIN
  45                      
  46 mday           1.15  cimom *MessageQueueService::_meta_dispatcher = 0;
  47                      AtomicInt MessageQueueService::_service_count = 0;
  48                      AtomicInt MessageQueueService::_xid(1);
  49 mday           1.38  Mutex MessageQueueService::_meta_dispatcher_mutex;
  50 mday           1.15  
  51 kumpf          1.104 static struct timeval deallocateWait = {300, 0};
  52 mday           1.53  
  53 mday           1.55  ThreadPool *MessageQueueService::_thread_pool = 0;
  54 mday           1.53  
  55                      DQueue<MessageQueueService> MessageQueueService::_polling_list(true);
  56                      
  57 kumpf          1.62  Thread* MessageQueueService::_polling_thread = 0;
  58 mday           1.61  
  59 kumpf          1.104 ThreadPool *MessageQueueService::get_thread_pool()
  60 mday           1.69  {
  61                         return _thread_pool;
  62                      }
  63 jim.wunderlich 1.110 //
  64                      // MAX_THREADS_PER_SVC_QUEUE_LIMIT
  65                      //
  66                      // JR Wunderlich Jun 6, 2005
  67                      //
  68                      
  69                      #define MAX_THREADS_PER_SVC_QUEUE_LIMIT 5000 
  70 jim.wunderlich 1.114 #define MAX_THREADS_PER_SVC_QUEUE_DEFAULT 5
  71 jim.wunderlich 1.110 
  72                      Uint32 max_threads_per_svc_queue;
  73 mday           1.69  
  74 kumpf          1.104 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL
  75                      MessageQueueService::kill_idle_threads(void *parm)
  76 mday           1.53  {
  77 mday           1.69  
  78                         static struct timeval now, last = {0,0};
  79 mday           1.53     gettimeofday(&now, NULL);
  80 mday           1.56     int dead_threads = 0;
  81 kumpf          1.104 
  82                         if (now.tv_sec - last.tv_sec > 120)
  83 mday           1.53     {
  84                            gettimeofday(&last, NULL);
  85 kumpf          1.104       try
  86 mday           1.56        {
  87 kumpf          1.104          dead_threads = MessageQueueService::_thread_pool->cleanupIdleThreads();
  88 mday           1.56        }
  89 mday           1.69        catch(...)
  90 mday           1.56        {
  91                      
  92                            }
  93 mday           1.53     }
  94 jenny.yu       1.92  
  95 w.otsuka       1.94  #ifdef PEGASUS_POINTER_64BIT
  96 jenny.yu       1.92     return (PEGASUS_THREAD_RETURN)(Uint64)dead_threads;
  97                      #elif PEGASUS_PLATFORM_AIX_RS_IBMCXX
  98                         return (PEGASUS_THREAD_RETURN)(unsigned long)dead_threads;
  99                      #else
 100                         return (PEGASUS_THREAD_RETURN)(Uint32)dead_threads;
 101                      #endif
 102 mday           1.53  }
 103                      
 104                      PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
 105                      {
 106                         Thread *myself = reinterpret_cast<Thread *>(parm);
 107                         DQueue<MessageQueueService> *list = reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm());
 108 kumpf          1.104    while (_stop_polling.value()  == 0)
 109 mday           1.53     {
 110 baggio.br      1.100       _polling_sem.wait();
 111                      
 112 kumpf          1.104       if (_stop_polling.value() != 0)
 113 mday           1.61        {
 114 kumpf          1.62           break;
 115 mday           1.61        }
 116 kumpf          1.104 
 117 denise.eckstein 1.116       // The polling_routine thread must hold the lock on the
 118                             // _polling_thread list while processing incoming messages.
 119                             // This lock is used to give this thread ownership of
 120                             // services on the _polling_routine list.
 121                       
 122                             // This is necessary to avoid confict with other threads
 123                             // processing the _polling_list
 124                             // (e.g., MessageQueueServer::~MessageQueueService).
 125                       
 126 mday            1.53        list->lock();
 127                             MessageQueueService *service = list->next(0);
 128 denise.eckstein 1.116       ThreadStatus rtn = PEGASUS_THREAD_OK;
 129                             while (service != NULL)
 130                             {
 131                                 if ((service->_incoming.count() > 0) &&
 132                                     (service->_die.value() == 0) &&
 133                                     (service->_threads < max_threads_per_svc_queue))
 134                                 {
 135                                    // The _threads count is used to track the 
 136                                    // number of active threads that have been allocated
 137                                    // to process messages for this service.
 138                       
 139                                    // The _threads count MUST be incremented while
 140                                    // the polling_routine owns the _polling_thread
 141                                    // lock and has ownership of the service object.
 142                       
 143                                    service->_threads++;
 144                                    try
 145                                    {
 146                                        rtn = _thread_pool->allocate_and_awaken(
 147                                             service, _req_proc, &_polling_sem);
 148                                    }
 149 denise.eckstein 1.116              catch (...)
 150                                    {
 151                                        service->_threads--;
 152                       
 153                                        // allocate_and_awaken should never generate an exception.
 154                                        PEGASUS_ASSERT(0);
 155                                    }
 156                                    // if no more threads available, break from processing loop
 157                                    if (rtn != PEGASUS_THREAD_OK )
 158                                    {
 159                                        service->_threads--;
 160                                        Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
 161                                           "Not enough threads to process this request. Skipping.");
 162                       
 163                                        Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
 164                                           "Could not allocate thread for %s. " \
 165                                           "Queue has %d messages waiting and %d threads servicing." \
 166                                           "Skipping the service for right now. ",
 167                                           service->getQueueName(),
 168                                           service->_incoming.count(),
 169                                           service->_threads.value());
 170 denise.eckstein 1.116 
 171                                        pegasus_yield();
 172                                        service = NULL;
 173                                     } 
 174                                 }
 175                                 if (service != NULL) 
 176                                 {
 177                                    service = list->next(service);
 178                                 }
 179                             }
 180 mday            1.53        list->unlock();
 181 jim.wunderlich  1.110 
 182 kumpf           1.104       if (_check_idle_flag.value() != 0)
 183 mday            1.69        {
 184 kumpf           1.104          _check_idle_flag = 0;
 185 denise.eckstein 1.116          // try to do idle thread clean up processing when system is not busy
 186                                // if system is busy there may not be a thread available to allocate 
 187                                // so nothing will be done and that is OK. 
 188 kumpf           1.84  
 189 konrad.r        1.113          if ( _thread_pool->allocate_and_awaken(service, kill_idle_threads, &_polling_sem) != PEGASUS_THREAD_OK)
 190 denise.eckstein 1.116          {
 191                                       Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
 192                                               "Not enough threads to kill idle threads. What an irony.");
 193 konrad.r        1.113 
 194 denise.eckstein 1.116                 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
 195                                               "Could not allocate thread to kill idle threads." \
 196                                               "Skipping. ");
 197                                }
 198                       
 199                                
 200 mday            1.69        }
 201 mday            1.53     }
 202                          myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
 203                          return(0);
 204                       }
 205                       
 206                       
 207                       Semaphore MessageQueueService::_polling_sem(0);
 208                       AtomicInt MessageQueueService::_stop_polling(0);
 209 mday            1.69  AtomicInt MessageQueueService::_check_idle_flag(0);
 210 mday            1.53  
 211 mday            1.15  
 212 kumpf           1.104 MessageQueueService::MessageQueueService(
 213                          const char *name,
 214                          Uint32 queueID,
 215                          Uint32 capabilities,
 216                          Uint32 mask)
 217 mday            1.15     : Base(name, true,  queueID),
 218 mday            1.1        _mask(mask),
 219 mday            1.4        _die(0),
 220 denise.eckstein 1.116      _threads(0),
 221 mday            1.41       _incoming(true, 0),
 222 konrad.r        1.106      _incoming_queue_shutdown(0)
 223 kumpf           1.104 {
 224 jim.wunderlich  1.110 
 225 kumpf           1.104    _capabilities = (capabilities | module_capabilities::async);
 226 mday            1.53  
 227 mday            1.1      _default_op_timeout.tv_sec = 30;
 228                          _default_op_timeout.tv_usec = 100;
 229 mday            1.15  
 230 jim.wunderlich  1.110    max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;
 231                       
 232                          // if requested threads gt MAX_THREADS_PER_SVC_QUEUE_LIMIT
 233                          // then set to MAX_THREADS_PER_SVC_QUEUE_LIMIT
 234                       
 235                          if (max_threads_per_svc_queue > MAX_THREADS_PER_SVC_QUEUE_LIMIT)
 236                            {
 237                              max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT;
 238                            }
 239                       
 240                          // if requested threads eq 0 (unlimited) 
 241                          // then set to MAX_THREADS_PER_SVC_QUEUE_LIMIT
 242                       
 243                          if (max_threads_per_svc_queue == 0)
 244                            {
 245 jim.wunderlich  1.114        max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_DEFAULT;
 246 jim.wunderlich  1.110      }
 247                       
 248                          // cout << "MAX_THREADS_PER_SVC_QUEUE = " << MAX_THREADS_PER_SVC_QUEUE << endl;
 249                          // cout << "max_threads_per_svc_queue set to = " << max_threads_per_svc_queue << endl;
 250                          
 251                       
 252 a.arora         1.87     AutoMutex autoMut(_meta_dispatcher_mutex);
 253 kumpf           1.104 
 254                          if (_meta_dispatcher == 0)
 255 mday            1.15     {
 256 joyce.j         1.101       _stop_polling = 0;
 257 kumpf           1.104       PEGASUS_ASSERT(_service_count.value() == 0);
 258 mday            1.15        _meta_dispatcher = new cimom();
 259 kumpf           1.104       if (_meta_dispatcher == NULL)
 260 mday            1.15        {
 261 a.arora         1.87           throw NullPointer();
 262 mday            1.15        }
 263 jim.wunderlich  1.110       //  _thread_pool = new ThreadPool(initial_cnt, "MessageQueueService",
 264                             //   minimum_cnt, maximum_cnt, deallocateWait); 
 265                             //
 266 kumpf           1.99        _thread_pool =
 267 kumpf           1.104           new ThreadPool(0, "MessageQueueService", 0, 0, deallocateWait);
 268 mday            1.15     }
 269                          _service_count++;
 270                       
 271 kumpf           1.104    if (false == register_service(name, _capabilities, _mask))
 272 mday            1.15     {
 273 humberto        1.71        //l10n
 274                             //throw BindFailedException("MessageQueueService Base Unable to register with  Meta Dispatcher");
 275                             MessageLoaderParms parms("Common.MessageQueueService.UNABLE_TO_REGISTER",
 276 kumpf           1.104          "MessageQueueService Base Unable to register with  Meta Dispatcher");
 277                       
 278 humberto        1.71        throw BindFailedException(parms);
 279 mday            1.15     }
 280 kumpf           1.104 
 281 mday            1.53     _polling_list.insert_last(this);
 282 kumpf           1.104 
 283 mday            1.1   }
 284                       
 285 mday            1.4   
 286 kumpf           1.104 MessageQueueService::~MessageQueueService()
 287 mday            1.1   {
 288                          _die = 1;
 289 mday            1.76  
 290 denise.eckstein 1.116    // The polling_routine locks the _polling_list while
 291                          // processing the incoming messages for services on the 
 292                          // list.  Deleting the service from the _polling_list
 293                          // prior to processing, avoids synchronization issues
 294                          // with the _polling_routine.
 295                       
 296                          _polling_list.remove(this);
 297                       
 298                          // ATTN: The code for closing the _incoming queue
 299                          // is not working correctly. In OpenPegasus 2.5,
 300                          // execution of the following code is very timing
 301                          // dependent. This needs to be fix.
 302                          // See Bug 4079 for details. 
 303 kumpf           1.104    if (_incoming_queue_shutdown.value() == 0)
 304 mday            1.76     {
 305 denise.eckstein 1.116        _shutdown_incoming_queue();
 306                          }
 307 konrad.r        1.107 
 308 denise.eckstein 1.116    // Wait until all threads processing the messages
 309                          // for this service have completed.
 310                       
 311                          while (_threads.value() > 0)
 312                          {
 313                             pegasus_yield();
 314 mday            1.76     }
 315 kumpf           1.104 
 316 mday            1.15     {
 317 a.arora         1.87       AutoMutex autoMut(_meta_dispatcher_mutex);
 318                            _service_count--;
 319 kumpf           1.104      if (_service_count.value() == 0)
 320 a.arora         1.87       {
 321 mday            1.76  
 322 mday            1.53        _stop_polling++;
 323                             _polling_sem.signal();
 324 kumpf           1.104       if (_polling_thread) {
 325                                 _polling_thread->join();
 326                                 delete _polling_thread;
 327                                 _polling_thread = 0;
 328                             }
 329 mday            1.15        _meta_dispatcher->_shutdown_routed_queue();
 330                             delete _meta_dispatcher;
 331 kumpf           1.45        _meta_dispatcher = 0;
 332 mday            1.76  
 333 kumpf           1.65        delete _thread_pool;
 334                             _thread_pool = 0;
 335 a.arora         1.87       }
 336                          } // mutex unlocks here
 337 kumpf           1.104    // Clean up in case there are extra stuff on the queue.
 338 konrad.r        1.72    while (_incoming.count())
 339                         {
 340 konrad.r        1.109     try { 
 341                             delete _incoming.remove_first();
 342                           } catch (const ListClosed &e)
 343                           {
 344                             // If the list is closed, there is nothing we can do. 
 345                             break;
 346                           }
 347 konrad.r        1.72    }
 348 kumpf           1.104 }
 349 mday            1.1   
 350 kumpf           1.104 void MessageQueueService::_shutdown_incoming_queue()
 351 mday            1.7   {
 352 kumpf           1.104    if (_incoming_queue_shutdown.value() > 0)
 353                             return;
 354                       
 355                          AsyncIoctl *msg = new AsyncIoctl(
 356                             get_next_xid(),
 357                             0,
 358                             _queueId,
 359                             _queueId,
 360                             true,
 361                             AsyncIoctl::IO_CLOSE,
 362                             0,
 363                             0);
 364 mday            1.9   
 365 mday            1.8      msg->op = get_op();
 366 mday            1.41     msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
 367 kumpf           1.104    msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
 368                              | ASYNC_OPFLAGS_SIMPLE_STATUS);
 369 mday            1.32     msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 370 mday            1.41  
 371 mday            1.32     msg->op->_op_dest = this;
 372 mday            1.8      msg->op->_request.insert_first(msg);
 373 konrad.r        1.108    try {
 374                            _incoming.insert_last_wait(msg->op);
 375                            _polling_sem.signal();
 376                          } catch (const ListClosed &) 
 377                          { 
 378 denise.eckstein 1.116         // This means the queue has already been shut-down (happens  when there
 379 konrad.r        1.108     // are two AsyncIoctrl::IO_CLOSE messages generated and one got first
 380                           // processed.
 381                            delete msg; 
 382                          }
 383 konrad.r        1.112    catch (const Permission &)
 384                          {
 385                            delete msg;
 386                          }
 387 mday            1.7   }
 388                       
 389 mday            1.22  
 390                       
 391 kumpf           1.85  void MessageQueueService::enqueue(Message *msg)
 392 mday            1.22  {
 393 kumpf           1.28     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
 394                       
 395 mday            1.22     Base::enqueue(msg);
 396 kumpf           1.28  
 397                          PEG_METHOD_EXIT();
 398 mday            1.22  }
 399                       
 400                       
 401 kumpf           1.103 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(
 402                           void * parm)
 403                       {
 404 konrad.r        1.107     MessageQueueService* service =
 405                                   reinterpret_cast<MessageQueueService*>(parm);
 406                           PEGASUS_ASSERT(service != 0);
 407 kumpf           1.103     try
 408                           {
 409                       
 410                               if (service->_die.value() != 0)
 411                               {
 412 denise.eckstein 1.116             service->_threads--;
 413 kumpf           1.103             return (0);
 414                               }
 415                               // pull messages off the incoming queue and dispatch them. then
 416                               // check pending messages that are non-blocking
 417                               AsyncOpNode *operation = 0;
 418                       
 419                               // many operations may have been queued.
 420                               do
 421                               {
 422                                   try
 423                                   {
 424                                       operation = service->_incoming.remove_first();
 425                                   }
 426                                   catch (ListClosed &)
 427                                   {
 428 kumpf           1.104                 // ATTN: This appears to be a common loop exit path.
 429                                       //PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 430                                       //    "Caught ListClosed exception.  Exiting _req_proc.");
 431 kumpf           1.103                 break;
 432                                   }
 433                       
 434                                   if (operation)
 435                                   {
 436                                      operation->_service_ptr = service;
 437                                      service->_handle_incoming_operation(operation);
 438                                   }
 439                               } while (operation);
 440                           }
 441                           catch (const Exception& e)
 442                           {
 443                               PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 444                                   String("Caught exception: \"") + e.getMessage() +
 445                                       "\".  Exiting _req_proc.");
 446                           }
 447                           catch (...)
 448                           {
 449                               PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 450                                   "Caught unrecognized exception.  Exiting _req_proc.");
 451                           }
 452 konrad.r        1.107     service->_threads--;
 453 kumpf           1.103     return(0);
 454 mday            1.1   }
 455                       
 456 mday            1.43  
 457 kumpf           1.104 void MessageQueueService::_sendwait_callback(
 458                           AsyncOpNode *op,
 459                           MessageQueue *q,
 460                           void *parm)
 461 mday            1.33  {
 462                          op->_client_sem.signal();
 463                       }
 464                       
 465 mday            1.30  
 466                       // callback function is responsible for cleaning up all resources
 467                       // including op, op->_callback_node, and op->_callback_ptr
 468 mday            1.22  void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
 469                       {
 470 kumpf           1.104    if (op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK)
 471 mday            1.39     {
 472                       
 473                             Message *msg = op->get_request();
 474 kumpf           1.104       if (msg && (msg->getMask() & message_mask::ha_async))
 475 mday            1.39        {
 476 kumpf           1.104          if (msg->getType() == async_messages::ASYNC_LEGACY_OP_START)
 477                                {
 478                                   AsyncLegacyOperationStart *wrapper =
 479                                      static_cast<AsyncLegacyOperationStart *>(msg);
 480                                   msg = wrapper->get_action();
 481                                   delete wrapper;
 482                                }
 483                                else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
 484                                {
 485                                   AsyncModuleOperationStart *wrapper =
 486                                      static_cast<AsyncModuleOperationStart *>(msg);
 487                                   msg = wrapper->get_action();
 488                                   delete wrapper;
 489                                }
 490                                else if (msg->getType() == async_messages::ASYNC_OP_START)
 491                                {
 492                                   AsyncOperationStart *wrapper =
 493                                      static_cast<AsyncOperationStart *>(msg);
 494                                   msg = wrapper->get_action();
 495                                   delete wrapper;
 496                                }
 497 kumpf           1.104          delete msg;
 498 mday            1.39        }
 499                       
 500                             msg = op->get_response();
 501 kumpf           1.104       if (msg && (msg->getMask() & message_mask::ha_async))
 502 mday            1.39        {
 503 kumpf           1.104          if (msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT)
 504                                {
 505                                   AsyncLegacyOperationResult *wrapper =
 506                                      static_cast<AsyncLegacyOperationResult *>(msg);
 507                                   msg = wrapper->get_result();
 508                                   delete wrapper;
 509                                }
 510                                else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
 511                                {
 512                                   AsyncModuleOperationResult *wrapper =
 513                                      static_cast<AsyncModuleOperationResult *>(msg);
 514                                   msg = wrapper->get_result();
 515                                   delete wrapper;
 516                                }
 517 mday            1.39        }
 518                             void (*callback)(Message *, void *, void *) = op->__async_callback;
 519                             void *handle = op->_callback_handle;
 520                             void *parm = op->_callback_parameter;
 521                             op->release();
 522                             return_op(op);
 523                             callback(msg, handle, parm);
 524                          }
 525 kumpf           1.104    else if (op->_flags & ASYNC_OPFLAGS_CALLBACK)
 526 mday            1.39     {
 527 kumpf           1.104       // note that _callback_node may be different from op
 528                             // op->_callback_response_q is a "this" pointer we can use for
 529 mday            1.39        // static callback methods
 530                             op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);
 531                          }
 532 mday            1.22  }
 533                       
 534 mday            1.1   
 535 kumpf           1.104 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)
 536 mday            1.1   {
 537 kumpf           1.104    if (operation != 0)
 538 mday            1.5      {
 539 kumpf           1.104 
 540                       // ATTN: optimization
 541 mday            1.22  // << Tue Feb 19 14:10:38 2002 mdd >>
 542 mday            1.6         operation->lock();
 543 kumpf           1.104 
 544 mday            1.6         Message *rq = operation->_request.next(0);
 545 kumpf           1.104 
 546 mday            1.31  // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>>
 547 kumpf           1.104 // move this to the bottom of the loop when the majority of
 548                       // messages become async messages.
 549 mday            1.31  
 550 mday            1.18        // divert legacy messages to handleEnqueue
 551 mday            1.29        if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))
 552 mday            1.18        {
 553 kumpf           1.104          rq = operation->_request.remove_first() ;
 554                                operation->unlock();
 555                                // delete the op node
 556                                operation->release();
 557                                return_op(operation);
 558 mday            1.24  
 559 kumpf           1.104          handleEnqueue(rq);
 560                                return;
 561 mday            1.18        }
 562                       
 563 kumpf           1.104       if ((operation->_flags & ASYNC_OPFLAGS_CALLBACK ||
 564                                  operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&
 565                                 (operation->_state & ASYNC_OPSTATE_COMPLETE))
 566 mday            1.29        {
 567 kumpf           1.104          operation->unlock();
 568                                _handle_async_callback(operation);
 569 mday            1.29        }
 570 kumpf           1.104       else
 571 mday            1.29        {
 572 kumpf           1.104          PEGASUS_ASSERT(rq != 0);
 573                                operation->unlock();
 574                                _handle_async_request(static_cast<AsyncRequest *>(rq));
 575 mday            1.29        }
 576 mday            1.5      }
 577                          return;
 578 mday            1.1   }
 579                       
 580                       void MessageQueueService::_handle_async_request(AsyncRequest *req)
 581                       {
 582 kumpf           1.104    if (req != 0)
 583 mday            1.4      {
 584                             req->op->processing();
 585 kumpf           1.104 
 586 mday            1.4         Uint32 type = req->getType();
 587 kumpf           1.104       if (type == async_messages::HEARTBEAT)
 588                                handle_heartbeat_request(req);
 589 mday            1.4         else if (type == async_messages::IOCTL)
 590 kumpf           1.104          handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
 591 mday            1.4         else if (type == async_messages::CIMSERVICE_START)
 592 kumpf           1.104          handle_CimServiceStart(static_cast<CimServiceStart *>(req));
 593 mday            1.4         else if (type == async_messages::CIMSERVICE_STOP)
 594 kumpf           1.104          handle_CimServiceStop(static_cast<CimServiceStop *>(req));
 595 mday            1.4         else if (type == async_messages::CIMSERVICE_PAUSE)
 596 kumpf           1.104          handle_CimServicePause(static_cast<CimServicePause *>(req));
 597 mday            1.4         else if (type == async_messages::CIMSERVICE_RESUME)
 598 kumpf           1.104          handle_CimServiceResume(static_cast<CimServiceResume *>(req));
 599                             else if (type == async_messages::ASYNC_OP_START)
 600                                handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
 601                             else
 602 mday            1.4         {
 603 kumpf           1.104          // we don't handle this request message
 604                                _make_response(req, async_results::CIM_NAK);
 605 mday            1.4         }
 606 mday            1.1      }
 607                       }
 608                       
 609 mday            1.17  
 610                       Boolean MessageQueueService::_enqueueResponse(
 611 kumpf           1.104    Message* request,
 612 mday            1.17     Message* response)
 613                       {
 614 w.white         1.102 
 615                         STAT_COPYDISPATCHER
 616                       
 617 kumpf           1.37     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 618                                           "MessageQueueService::_enqueueResponse");
 619 mday            1.25  
 620 kumpf           1.104    if (request->getMask() & message_mask::ha_async)
 621 mday            1.25     {
 622 kumpf           1.104       if (response->getMask() & message_mask::ha_async)
 623 mday            1.25        {
 624 kumpf           1.104          _completeAsyncResponse(static_cast<AsyncRequest *>(request),
 625                                                       static_cast<AsyncReply *>(response),
 626                                                       ASYNC_OPSTATE_COMPLETE, 0);
 627 kumpf           1.37           PEG_METHOD_EXIT();
 628 kumpf           1.104          return true;
 629 mday            1.25        }
 630                          }
 631 kumpf           1.104 
 632                          if (request->_async != 0)
 633 mday            1.17     {
 634                             Uint32 mask = request->_async->getMask();
 635 kumpf           1.104       PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request));
 636                       
 637 mday            1.18        AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
 638                             AsyncOpNode *op = async->op;
 639                             request->_async = 0;
 640 mday            1.63        // the legacy request is going to be deleted by its handler
 641 kumpf           1.104       // remove it from the op node
 642 mday            1.63  
 643                             static_cast<AsyncLegacyOperationStart *>(async)->get_action();
 644 kumpf           1.104 
 645                             AsyncLegacyOperationResult *async_result =
 646                                new AsyncLegacyOperationResult(
 647                                   async->getKey(),
 648                                   async->getRouting(),
 649                                   op,
 650                                   response);
 651                             _completeAsyncResponse(
 652                                async,
 653                                async_result,
 654                                ASYNC_OPSTATE_COMPLETE,
 655                                0);
 656 kumpf           1.37        PEG_METHOD_EXIT();
 657 mday            1.18        return true;
 658 mday            1.17     }
 659 kumpf           1.104 
 660 mday            1.18     // ensure that the destination queue is in response->dest
 661 kumpf           1.37     PEG_METHOD_EXIT();
 662 mday            1.24     return SendForget(response);
 663 kumpf           1.104 
 664 mday            1.17  }
 665                       
 666 mday            1.18  void MessageQueueService::_make_response(Message *req, Uint32 code)
 667 mday            1.1   {
 668 mday            1.19     cimom::_make_response(req, code);
 669 mday            1.1   }
 670                       
 671                       
 672 kumpf           1.104 void MessageQueueService::_completeAsyncResponse(
 673                           AsyncRequest *request,
 674                           AsyncReply *reply,
 675                           Uint32 state,
 676                           Uint32 flag)
 677 mday            1.5   {
 678 kumpf           1.37     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 679                                           "MessageQueueService::_completeAsyncResponse");
 680                       
 681 mday            1.19     cimom::_completeAsyncResponse(request, reply, state, flag);
 682 kumpf           1.37  
 683                          PEG_METHOD_EXIT();
 684 mday            1.5   }
 685                       
 686                       
 687 kumpf           1.104 void MessageQueueService::_complete_op_node(
 688                           AsyncOpNode *op,
 689                           Uint32 state,
 690                           Uint32 flag,
 691                           Uint32 code)
 692 mday            1.32  {
 693                          cimom::_complete_op_node(op, state, flag, code);
 694                       }
 695                       
 696 mday            1.5   
 697                       Boolean MessageQueueService::accept_async(AsyncOpNode *op)
 698                       {
 699 kumpf           1.104    if (_incoming_queue_shutdown.value() > 0)
 700 mday            1.8         return false;
 701 kumpf           1.104    if (_polling_thread == NULL)
 702                          {
 703                             _polling_thread = new Thread(
 704                                 polling_routine,
 705                                 reinterpret_cast<void *>(&_polling_list),
 706                                 false);
 707 konrad.r        1.113       ThreadStatus tr = PEGASUS_THREAD_OK;
 708                             while ( (tr =_polling_thread->run()) != PEGASUS_THREAD_OK)
 709 a.arora         1.93        {
 710 denise.eckstein 1.116         if (tr == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)
 711 konrad.r        1.113            pegasus_yield();
 712 denise.eckstein 1.116         else
 713                                  throw Exception(MessageLoaderParms("Common.MessageQueueService.NOT_ENOUGH_THREAD",
 714                                               "Could not allocate thread for the polling thread."));
 715 a.arora         1.93        }
 716 kumpf           1.104    }
 717                       // ATTN optimization remove the message checking altogether in the base
 718 mday            1.20  // << Mon Feb 18 14:02:20 2002 mdd >>
 719 mday            1.6      op->lock();
 720                          Message *rq = op->_request.next(0);
 721 mday            1.20     Message *rp = op->_response.next(0);
 722 mday            1.6      op->unlock();
 723 kumpf           1.104 
 724                          if ((rq != 0 && (true == messageOK(rq))) ||
 725                              (rp != 0 && (true == messageOK(rp))) && _die.value() == 0)
 726 mday            1.5      {
 727 mday            1.6         _incoming.insert_last_wait(op);
 728 mday            1.53        _polling_sem.signal();
 729 mday            1.5         return true;
 730                          }
 731                          return false;
 732                       }
 733                       
 734                       Boolean MessageQueueService::messageOK(const Message *msg)
 735                       {
 736 kumpf           1.104    if (_incoming_queue_shutdown.value() > 0)
 737 mday            1.8         return false;
 738 mday            1.18     return true;
 739                       }
 740                       
 741 mday            1.1   void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
 742                       {
 743 kumpf           1.104    // default action is to echo a heartbeat response
 744                       
 745                          AsyncReply *reply = new AsyncReply(
 746                             async_messages::HEARTBEAT,
 747                             req->getKey(),
 748                             req->getRouting(),
 749                             0,
 750                             req->op,
 751                             async_results::OK,
 752                             req->resp,
 753                             false);
 754                          _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0);
 755 mday            1.1   }
 756                       
 757                       
 758                       void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
 759 kumpf           1.104 {
 760 mday            1.1   }
 761 kumpf           1.104 
 762 mday            1.1   void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
 763                       {
 764 kumpf           1.104    switch (req->ctl)
 765 mday            1.8      {
 766                             case AsyncIoctl::IO_CLOSE:
 767                             {
 768 kumpf           1.104          MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);
 769 kumpf           1.81  
 770                       #ifdef MESSAGEQUEUESERVICE_DEBUG
 771 kumpf           1.104          PEGASUS_STD(cout) << service->getQueueName() << " Received AsyncIoctl::IO_CLOSE " << PEGASUS_STD(endl);
 772 kumpf           1.81  #endif
 773 kumpf           1.104 
 774                                // respond to this message. this is fire and forget, so we don't need to delete anything.
 775                                // this takes care of two problems that were being found
 776                                // << Thu Oct  9 10:52:48 2003 mdd >>
 777                                 _make_response(req, async_results::OK);
 778                                // ensure we do not accept any further messages
 779                       
 780                                // ensure we don't recurse on IO_CLOSE
 781                                if (_incoming_queue_shutdown.value() > 0)
 782                                   break;
 783                       
 784                                // set the closing flag
 785                                service->_incoming_queue_shutdown = 1;
 786                                // empty out the queue
 787                                while (1)
 788                                {
 789                                   AsyncOpNode *operation;
 790                                   try
 791                                   {
 792                                      operation = service->_incoming.remove_first();
 793                                   }
 794 kumpf           1.104             catch(IPCException &)
 795                                   {
 796                                      break;
 797                                   }
 798                                   if (operation)
 799                                   {
 800                                      operation->_service_ptr = service;
 801                                      service->_handle_incoming_operation(operation);
 802                                   }
 803                                   else
 804                                      break;
 805                                } // message processing loop
 806                       
 807                                // shutdown the AsyncDQueue
 808                                service->_incoming.shutdown_queue();
 809                                return;
 810 mday            1.8         }
 811                       
 812                             default:
 813 kumpf           1.104          _make_response(req, async_results::CIM_NAK);
 814 mday            1.8      }
 815 mday            1.1   }
 816 mday            1.8   
 817 mday            1.1   void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
 818                       {
 819 mday            1.79  
 820 kumpf           1.81  #ifdef MESSAGEQUEUESERVICE_DEBUG
 821 mday            1.79     PEGASUS_STD(cout) << getQueueName() << "received START" << PEGASUS_STD(endl);
 822 kumpf           1.81  #endif
 823 kumpf           1.104 
 824 mday            1.10     // clear the stoped bit and update
 825 mday            1.13     _capabilities &= (~(module_capabilities::stopped));
 826 mday            1.10     _make_response(req, async_results::OK);
 827 kumpf           1.104    // now tell the meta dispatcher we are stopped
 828 mday            1.10     update_service(_capabilities, _mask);
 829                       
 830 mday            1.1   }
 831                       void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
 832                       {
 833 kumpf           1.81  #ifdef MESSAGEQUEUESERVICE_DEBUG
 834 mday            1.79     PEGASUS_STD(cout) << getQueueName() << "received STOP" << PEGASUS_STD(endl);
 835 kumpf           1.81  #endif
 836 mday            1.10     // set the stopeed bit and update
 837                          _capabilities |= module_capabilities::stopped;
 838                          _make_response(req, async_results::CIM_STOPPED);
 839 kumpf           1.104    // now tell the meta dispatcher we are stopped
 840 mday            1.10     update_service(_capabilities, _mask);
 841 mday            1.1   }
 842 kumpf           1.104 
 843 mday            1.1   void MessageQueueService::handle_CimServicePause(CimServicePause *req)
 844                       {
 845 mday            1.10     // set the paused bit and update
 846 mday            1.13     _capabilities |= module_capabilities::paused;
 847 mday            1.11     update_service(_capabilities, _mask);
 848 mday            1.10     _make_response(req, async_results::CIM_PAUSED);
 849 kumpf           1.104    // now tell the meta dispatcher we are stopped
 850 mday            1.1   }
 851 kumpf           1.104 
 852 mday            1.1   void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
 853                       {
 854 mday            1.10     // clear the paused  bit and update
 855 mday            1.13     _capabilities &= (~(module_capabilities::paused));
 856 mday            1.11     update_service(_capabilities, _mask);
 857 mday            1.10     _make_response(req, async_results::OK);
 858 kumpf           1.104    // now tell the meta dispatcher we are stopped
 859 mday            1.1   }
 860 kumpf           1.104 
 861 mday            1.1   void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
 862                       {
 863                          _make_response(req, async_results::CIM_NAK);
 864                       }
 865                       
 866                       void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
 867                       {
 868 mday            1.14     ;
 869                       }
 870                       
 871 mday            1.10  
 872 mday            1.14  void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)
 873                       {
 874                          // remove the legacy message from the request and enqueue it to its destination
 875                          Uint32 result = async_results::CIM_NAK;
 876 kumpf           1.104 
 877 mday            1.25     Message *legacy = req->_act;
 878 kumpf           1.104    if (legacy != 0)
 879 mday            1.14     {
 880 mday            1.25        MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
 881 kumpf           1.104       if (queue != 0)
 882 mday            1.14        {
 883 kumpf           1.104          if (queue->isAsync() == true)
 884                                {
 885                                   (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
 886                                }
 887                                else
 888                                {
 889                                   // Enqueue the response:
 890                                   queue->enqueue(req->get_action());
 891                                }
 892                       
 893                                result = async_results::OK;
 894 mday            1.14        }
 895                          }
 896                          _make_response(req, result);
 897                       }
 898                       
 899                       void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)
 900                       {
 901                          ;
 902 mday            1.1   }
 903                       
 904 kumpf           1.104 AsyncOpNode *MessageQueueService::get_op()
 905 mday            1.1   {
 906 mday            1.4      AsyncOpNode *op = new AsyncOpNode();
 907 kumpf           1.104 
 908 mday            1.9      op->_state = ASYNC_OPSTATE_UNKNOWN;
 909                          op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
 910 kumpf           1.104 
 911 mday            1.1      return op;
 912                       }
 913                       
 914                       void MessageQueueService::return_op(AsyncOpNode *op)
 915                       {
 916 kumpf           1.104    PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED);
 917 mday            1.4      delete op;
 918 mday            1.1   }
 919                       
 920 mday            1.18  
 921 kumpf           1.104 Boolean MessageQueueService::ForwardOp(
 922                           AsyncOpNode *op,
 923                           Uint32 destination)
 924 mday            1.29  {
 925 kumpf           1.104    PEGASUS_ASSERT(op != 0);
 926 mday            1.30     op->lock();
 927                          op->_op_dest = MessageQueue::lookup(destination);
 928 mday            1.29     op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD);
 929                          op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK);
 930 mday            1.30     op->unlock();
 931 kumpf           1.104    if (op->_op_dest == 0)
 932 mday            1.30        return false;
 933 kumpf           1.104 
 934 mday            1.29     return  _meta_dispatcher->route_async(op);
 935                       }
 936                       
 937 kumpf           1.104 
 938                       Boolean MessageQueueService::SendAsync(
 939                           AsyncOpNode *op,
 940                           Uint32 destination,
 941                           void (*callback)(AsyncOpNode *, MessageQueue *, void *),
 942                           MessageQueue *callback_response_q,
 943                           void *callback_ptr)
 944                       {
 945                          PEGASUS_ASSERT(op != 0 && callback != 0);
 946                       
 947 mday            1.21     // get the queue handle for the destination
 948                       
 949 mday            1.30     op->lock();
 950 mday            1.32     op->_op_dest = MessageQueue::lookup(destination); // destination of this message
 951 mday            1.22     op->_flags |= ASYNC_OPFLAGS_CALLBACK;
 952                          op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 953 mday            1.30     // initialize the callback data
 954 mday            1.32     op->_async_callback = callback;   // callback function to be executed by recpt. of response
 955                          op->_callback_node = op;          // the op node
 956                          op->_callback_response_q = callback_response_q;  // the queue that will receive the response
 957                          op->_callback_ptr = callback_ptr;   // user data for callback
 958                          op->_callback_request_q = this;     // I am the originator of this request
 959 kumpf           1.104 
 960 mday            1.30     op->unlock();
 961 kumpf           1.104    if (op->_op_dest == 0)
 962 mday            1.30        return false;
 963 kumpf           1.104 
 964 mday            1.21     return  _meta_dispatcher->route_async(op);
 965 mday            1.18  }
 966                       
 967                       
 968 kumpf           1.104 Boolean MessageQueueService::SendAsync(
 969                           Message *msg,
 970                           Uint32 destination,
 971                           void (*callback)(Message *response, void *handle, void *parameter),
 972                           void *handle,
 973                           void *parameter)
 974 mday            1.39  {
 975 kumpf           1.104    if (msg == NULL)
 976 mday            1.39        return false;
 977 kumpf           1.104    if (callback == NULL)
 978 mday            1.39        return SendForget(msg);
 979                          AsyncOpNode *op = get_op();
 980 mday            1.43     msg->dest = destination;
 981 kumpf           1.104    if (NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
 982 mday            1.39     {
 983                             op->release();
 984                             return_op(op);
 985                             return false;
 986                          }
 987                          op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
 988                          op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 989                          op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 990                          op->__async_callback = callback;
 991                          op->_callback_node = op;
 992                          op->_callback_handle = handle;
 993                          op->_callback_parameter = parameter;
 994                          op->_callback_response_q = this;
 995                       
 996 kumpf           1.104    if (!(msg->getMask() & message_mask::ha_async))
 997 mday            1.39     {
 998 kumpf           1.104       AsyncLegacyOperationStart *wrapper = new AsyncLegacyOperationStart(
 999                                get_next_xid(),
1000                                op,
1001                                destination,
1002                                msg,
1003                                destination);
1004 mday            1.39     }
1005 kumpf           1.104    else
1006 mday            1.39     {
1007                             op->_request.insert_first(msg);
1008                             (static_cast<AsyncMessage *>(msg))->op = op;
1009                          }
1010                          return _meta_dispatcher->route_async(op);
1011                       }
1012                       
1013                       
1014 mday            1.18  Boolean MessageQueueService::SendForget(Message *msg)
1015                       {
1016                          AsyncOpNode *op = 0;
1017 mday            1.22     Uint32 mask = msg->getMask();
1018 kumpf           1.104 
1019 mday            1.22     if (mask & message_mask::ha_async)
1020 mday            1.18     {
1021                             op = (static_cast<AsyncMessage *>(msg))->op ;
1022                          }
1023 mday            1.22  
1024 kumpf           1.104    if (op == 0)
1025 mday            1.20     {
1026 mday            1.18        op = get_op();
1027 mday            1.20        op->_request.insert_first(msg);
1028 mday            1.22        if (mask & message_mask::ha_async)
1029 kumpf           1.104       {
1030                                (static_cast<AsyncMessage *>(msg))->op = op;
1031                             }
1032 mday            1.20     }
1033 mday            1.30     op->_op_dest = MessageQueue::lookup(msg->dest);
1034 mday            1.22     op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
1035 kumpf           1.104    op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
1036                              | ASYNC_OPFLAGS_SIMPLE_STATUS);
1037 mday            1.22     op->_state &= ~ASYNC_OPSTATE_COMPLETE;
1038 kumpf           1.104    if (op->_op_dest == 0)
1039 mday            1.39     {
1040                             op->release();
1041                             return_op(op);
1042 mday            1.21        return false;
1043 mday            1.39     }
1044 kumpf           1.104 
1045 mday            1.18     // now see if the meta dispatcher will take it
1046 mday            1.30     return  _meta_dispatcher->route_async(op);
1047 mday            1.18  }
1048 mday            1.2   
1049 mday            1.1   
1050 mday            1.4   AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
1051 mday            1.1   {
1052 kumpf           1.104    if (request == 0)
1053 mday            1.4         return 0 ;
1054 mday            1.5   
1055                          Boolean destroy_op = false;
1056 kumpf           1.104 
1057 s.hills         1.80     if (request->op == 0)
1058 mday            1.5      {
1059                             request->op = get_op();
1060 mday            1.7         request->op->_request.insert_first(request);
1061 mday            1.5         destroy_op = true;
1062                          }
1063 kumpf           1.104 
1064 mday            1.33     request->block = false;
1065 mday            1.35     request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
1066 kumpf           1.104    SendAsync(
1067                             request->op,
1068                             request->dest,
1069                             _sendwait_callback,
1070                             this,
1071                             (void *)0);
1072                       
1073 baggio.br       1.100    request->op->_client_sem.wait();
1074                       
1075 mday            1.6      request->op->lock();
1076                          AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
1077                          rpl->op = 0;
1078                          request->op->unlock();
1079 kumpf           1.104 
1080                          if (destroy_op == true)
1081 mday            1.5      {
1082 mday            1.6         request->op->lock();
1083                             request->op->_request.remove(request);
1084                             request->op->_state |= ASYNC_OPSTATE_RELEASED;
1085                             request->op->unlock();
1086                             return_op(request->op);
1087 mday            1.7         request->op = 0;
1088 mday            1.5      }
1089                          return rpl;
1090 mday            1.1   }
1091                       
1092                       
1093 kumpf           1.104 Boolean MessageQueueService::register_service(
1094                           String name,
1095                           Uint32 capabilities,
1096                           Uint32 mask)
1097                       {
1098                          RegisterCimService *msg = new RegisterCimService(
1099                             get_next_xid(),
1100                             0,
1101                             true,
1102                             name,
1103                             capabilities,
1104                             mask,
1105                             _queueId);
1106                          msg->dest = CIMOM_Q_ID;
1107 mday            1.1   
1108                          Boolean registered = false;
1109 kumpf           1.104    AsyncReply *reply = static_cast<AsyncReply *>(SendWait(msg));
1110                       
1111                          if (reply != 0)
1112                          {
1113                             if (reply->getMask() & message_mask::ha_async)
1114                             {
1115                                if (reply->getMask() & message_mask::ha_reply)
1116                                {
1117                                   if (reply->result == async_results::OK ||
1118                                       reply->result == async_results::MODULE_ALREADY_REGISTERED)
1119                                   {
1120                                       registered = true;
1121                                   }
1122                                }
1123 mday            1.1         }
1124 kumpf           1.104 
1125                             delete reply;
1126 mday            1.1      }
1127 mday            1.5      delete msg;
1128 mday            1.1      return registered;
1129                       }
1130                       
1131                       Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
1132                       {
1133 kumpf           1.104    UpdateCimService *msg = new UpdateCimService(
1134                             get_next_xid(),
1135                             0,
1136                             true,
1137                             _queueId,
1138                             _capabilities,
1139                             _mask);
1140 mday            1.1      Boolean registered = false;
1141 mday            1.2   
1142                          AsyncMessage *reply = SendWait(msg);
1143                          if (reply)
1144 mday            1.1      {
1145 kumpf           1.104       if (reply->getMask() & message_mask::ha_async)
1146 mday            1.1         {
1147 kumpf           1.104          if (reply->getMask() & message_mask::ha_reply)
1148                                {
1149                                   if (static_cast<AsyncReply *>(reply)->result == async_results::OK)
1150                                   {
1151                                      registered = true;
1152                                   }
1153                                }
1154 mday            1.1         }
1155                             delete reply;
1156                          }
1157 mday            1.5      delete msg;
1158 mday            1.1      return registered;
1159                       }
1160                       
1161                       
1162 kumpf           1.104 Boolean MessageQueueService::deregister_service()
1163 mday            1.1   {
1164 mday            1.3   
1165 mday            1.5      _meta_dispatcher->deregister_module(_queueId);
1166                          return true;
1167 mday            1.1   }
1168                       
1169                       
1170 kumpf           1.104 void MessageQueueService::find_services(
1171                           String name,
1172                           Uint32 capabilities,
1173                           Uint32 mask,
1174                           Array<Uint32> *results)
1175 mday            1.1   {
1176 kumpf           1.104    if (results == 0)
1177                          {
1178 mday            1.1         throw NullPointer();
1179 kumpf           1.104    }
1180                       
1181 mday            1.1      results->clear();
1182 kumpf           1.104 
1183                          FindServiceQueue *req = new FindServiceQueue(
1184                             get_next_xid(),
1185                             0,
1186                             _queueId,
1187                             true,
1188                             name,
1189                             capabilities,
1190                             mask);
1191                       
1192 mday            1.44     req->dest = CIMOM_Q_ID;
1193 kumpf           1.104 
1194                          AsyncMessage *reply = SendWait(req);
1195                          if (reply)
1196                          {
1197                             if (reply->getMask() & message_mask::ha_async)
1198                             {
1199                                if (reply->getMask() & message_mask::ha_reply)
1200                                {
1201                                   if (reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
1202                                   {
1203                                      if ((static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK)
1204                                         *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
1205                                   }
1206                                }
1207 mday            1.1         }
1208                             delete reply;
1209                          }
1210 mday            1.5      delete req;
1211 mday            1.1      return ;
1212                       }
1213                       
1214                       void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
1215                       {
1216 kumpf           1.104    if (result == 0)
1217                          {
1218 mday            1.1         throw NullPointer();
1219 kumpf           1.104    }
1220                       
1221                          EnumerateService *req = new EnumerateService(
1222                             get_next_xid(),
1223                             0,
1224                             _queueId,
1225                             true,
1226                             queue);
1227                       
1228 mday            1.2      AsyncMessage *reply = SendWait(req);
1229 kumpf           1.104 
1230 mday            1.2      if (reply)
1231 mday            1.1      {
1232                             Boolean found = false;
1233 kumpf           1.104 
1234                             if (reply->getMask() & message_mask::ha_async)
1235 mday            1.1         {
1236 kumpf           1.104          if (reply->getMask() & message_mask::ha_reply)
1237                                {
1238                                   if (reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
1239                                   {
1240                                      if ((static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK)
1241                                      {
1242                                         if (found == false)
1243                                         {
1244                                            found = true;
1245                       
1246                                            result->put_name((static_cast<EnumerateServiceResponse *>(reply))->name);
1247                                            result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
1248                                            result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
1249                                            result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
1250                                         }
1251                                      }
1252                                   }
1253                                }
1254 mday            1.1         }
1255                             delete reply;
1256                          }
1257 mday            1.5      delete req;
1258 kumpf           1.104 
1259 mday            1.1      return;
1260                       }
1261                       
1262 kumpf           1.104 Uint32 MessageQueueService::get_next_xid()
1263 mday            1.1   {
1264 mday            1.78     static Mutex _monitor;
1265                          Uint32 value;
1266 kumpf           1.104    AutoMutex autoMut(_monitor);
1267 mday            1.1      _xid++;
1268 mday            1.78     value =  _xid.value();
1269                          return value;
1270 kumpf           1.104 
1271 mday            1.1   }
1272                       
1273                       PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2