(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.112 and 1.155

version 1.112, 2005/06/14 17:10:37 version 1.155, 2008/12/04 18:23:07
Line 1 
Line 1 
 //%2005////////////////////////////////////////////////////////////////////////  //%LICENSE////////////////////////////////////////////////////////////////
 // //
 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development  // Licensed to The Open Group (TOG) under one or more contributor license
 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.  // agreements.  Refer to the OpenPegasusNOTICE.txt file distributed with
 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;  // this work for additional information regarding copyright ownership.
 // IBM Corp.; EMC Corporation, The Open Group.  // Each contributor licenses this file to you under the OpenPegasus Open
 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;  // Source License; you may not use this file except in compliance with the
 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.  // License.
 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;  //
 // EMC Corporation; VERITAS Software Corporation; The Open Group.  // Permission is hereby granted, free of charge, to any person obtaining a
   // copy of this software and associated documentation files (the "Software"),
   // to deal in the Software without restriction, including without limitation
   // the rights to use, copy, modify, merge, publish, distribute, sublicense,
   // and/or sell copies of the Software, and to permit persons to whom the
   // Software is furnished to do so, subject to the following conditions:
   //
   // The above copyright notice and this permission notice shall be included
   // in all copies or substantial portions of the Software.
   //
   // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
   // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
   // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
   // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
   // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
   // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
   // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 // //
 // Permission is hereby granted, free of charge, to any person obtaining a copy  //////////////////////////////////////////////////////////////////////////
 // of this software and associated documentation files (the "Software"), to  
 // deal in the Software without restriction, including without limitation the  
 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or  
 // sell copies of the Software, and to permit persons to whom the Software is  
 // furnished to do so, subject to the following conditions:  
 //  
 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN  
 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED  
 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT  
 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR  
 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT  
 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN  
 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION  
 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.  
 //  
 //==============================================================================  
 //  
 // Author: Mike Day (mdday@us.ibm.com)  
 //  
 // Modified By:  
 //              Amit K Arora, IBM (amita@in.ibm.com) for Bug#1090,#2657  
 //              Josephine Eskaline Joyce, IBM (jojustin@in.ibm.com) for Bug#3259  
 // //
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
  
 // #include <iostream.h>  
 #include "MessageQueueService.h" #include "MessageQueueService.h"
 #include <Pegasus/Common/Tracer.h> #include <Pegasus/Common/Tracer.h>
 #include <Pegasus/Common/MessageLoader.h> //l10n  #include <Pegasus/Common/MessageLoader.h>
  
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
 cimom *MessageQueueService::_meta_dispatcher = 0; cimom *MessageQueueService::_meta_dispatcher = 0;
 AtomicInt MessageQueueService::_service_count = 0;  AtomicInt MessageQueueService::_service_count(0);
 AtomicInt MessageQueueService::_xid(1);  
 Mutex MessageQueueService::_meta_dispatcher_mutex;  
  
 static struct timeval deallocateWait = {300, 0}; static struct timeval deallocateWait = {300, 0};
  
 ThreadPool *MessageQueueService::_thread_pool = 0; ThreadPool *MessageQueueService::_thread_pool = 0;
  
 DQueue<MessageQueueService> MessageQueueService::_polling_list(true);  MessageQueueService::PollingList* MessageQueueService::_polling_list;
  
 Thread* MessageQueueService::_polling_thread = 0; Thread* MessageQueueService::_polling_thread = 0;
  
   /*
       PollingListEntry holds the service and it's status whether the service
       is dead or not. Each service creates its own PollingListEntry and added
       to the PollingList which is monitored by the polling thread. Polling thread
       monitors the service only if it's die flag is not set.
   */
   
   struct PollingListEntry : public Linkable
   {
       MessageQueueService *service;
       Boolean die;
   
       PollingListEntry(MessageQueueService *service)
           :service(service),
            die(false)
       {
       }
       ~PollingListEntry()
       {
       }
   private:
       PollingListEntry(const PollingListEntry&);
       PollingListEntry& operator = (const PollingListEntry&);
   };
   
 ThreadPool *MessageQueueService::get_thread_pool() ThreadPool *MessageQueueService::get_thread_pool()
 { {
    return _thread_pool;    return _thread_pool;
 } }
   
 // //
 // MAX_THREADS_PER_SVC_QUEUE_LIMIT  // MAX_THREADS_PER_SVC_QUEUE
 //  
 // 5000 is seriously too high a number for the limit but since  
 // previously there was no limit at all this is intended to approximate  
 // that behavior. In my testing on a unit processor system the system  
 // behaved best with a low number 2 to 5 for the MAX_THREADS_PER_SVC_QUEUE.  
 // When set to 1000 the system deadlocked with indications that were  
 // not delivered and apparently left sitting within the server in a queue.  
 // //
 // JR Wunderlich Jun 6, 2005 // JR Wunderlich Jun 6, 2005
 // //
  
 #define MAX_THREADS_PER_SVC_QUEUE_LIMIT 5000 #define MAX_THREADS_PER_SVC_QUEUE_LIMIT 5000
   #define MAX_THREADS_PER_SVC_QUEUE_DEFAULT 5
  
 Uint32 max_threads_per_svc_queue;  #ifndef MAX_THREADS_PER_SVC_QUEUE
   # define MAX_THREADS_PER_SVC_QUEUE MAX_THREADS_PER_SVC_QUEUE_DEFAULT
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL  
 MessageQueueService::kill_idle_threads(void *parm)  
 {  
   
    static struct timeval now, last = {0,0};  
    gettimeofday(&now, NULL);  
    int dead_threads = 0;  
   
    if (now.tv_sec - last.tv_sec > 120)  
    {  
       gettimeofday(&last, NULL);  
       try  
       {  
          dead_threads = MessageQueueService::_thread_pool->cleanupIdleThreads();  
       }  
       catch(...)  
       {  
   
       }  
    }  
   
 #ifdef PEGASUS_POINTER_64BIT  
    return (PEGASUS_THREAD_RETURN)(Uint64)dead_threads;  
 #elif PEGASUS_PLATFORM_AIX_RS_IBMCXX  
    return (PEGASUS_THREAD_RETURN)(unsigned long)dead_threads;  
 #else  
    return (PEGASUS_THREAD_RETURN)(Uint32)dead_threads;  
 #endif #endif
 }  
  
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)  Uint32 max_threads_per_svc_queue;
   
   ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(
       void* parm)
 { {
    Thread *myself = reinterpret_cast<Thread *>(parm);    Thread *myself = reinterpret_cast<Thread *>(parm);
    DQueue<MessageQueueService> *list = reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm());      MessageQueueService::PollingList *list =
    while (_stop_polling.value()  == 0)          reinterpret_cast<MessageQueueService::PollingList*>(myself->get_parm());
   
       while (_stop_polling.get()  == 0)
    {    {
       _polling_sem.wait();       _polling_sem.wait();
  
       if (_stop_polling.value() != 0)          if (_stop_polling.get() != 0)
       {       {
          break;          break;
       }       }
  
       list->lock();          PollingListEntry *entry = list->front();
       int list_index = 0;          ThreadStatus rtn = PEGASUS_THREAD_OK;
       MessageQueueService *service = list->next(0);  
       while(service != NULL)  
         {  
           int rtn;  
           rtn = true;  
           if (service->_incoming.count() > 0  
               && service->_die.value() == 0  
               && service->_threads <= max_threads_per_svc_queue)  
             rtn = _thread_pool->allocate_and_awaken(service, _req_proc,  
                                                         &_polling_sem);  
  
           // if no more threads available, break from processing loop          do
           if (rtn == false)  
             {             {
               service = NULL;              MessageQueueService *service = entry->service;
             }              // Note: MessageQueueService destructor sets die flag when service
           else              // gets destroyed during CIMOM shutdown. Don't monitor the service
               // if die flag set.
               if ((entry->die == false) &&
                   (service->_incoming.count() > 0) &&
                   (service->_threads.get() < max_threads_per_svc_queue))
               {
                   // The _threads count is used to track the
                   // number of active threads that have been allocated
                   // to process messages for this service.
   
                   // The _threads count MUST be incremented while
                   // the polling_routine owns the _polling_thread
                   // lock and has ownership of the service object.
   
                   service->_threads++;
                   try
             {             {
               service = list->next(service);                      rtn = _thread_pool->allocate_and_awaken(
             }                          service, _req_proc, &_polling_sem);
         }         }
       list->unlock();                  catch (...)
   
       if (_check_idle_flag.value() != 0)  
       {       {
          _check_idle_flag = 0;                      service->_threads--;
          // try to do idle thread clean up processing when system is not busy  
          // if system is busy there may not be a thread available to allocate  
          // so nothing will be done and that is OK.  
  
          _thread_pool->allocate_and_awaken(service, kill_idle_threads, &_polling_sem);                      // allocate_and_awaken should never generate an exception.
                       PEGASUS_ASSERT(0);
                   }
                   // if no more threads available, break from processing loop
                   if (rtn != PEGASUS_THREAD_OK )
                   {
                       service->_threads--;
                       PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL1,
                           "Could not allocate thread for %s.  Queue has %d "
                               "messages waiting and %d threads servicing."
                               "Skipping the service for right now. ",
                           service->getQueueName(),
                           service->_incoming.count(),
                           service->_threads.get()));
  
                       Threads::yield();
                       break;
       }       }
    }    }
    myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );              entry = list->next_of(entry);
    return(0);          } while (entry != NULL);
       }
       return ThreadReturnType(0);
 } }
  
  
 Semaphore MessageQueueService::_polling_sem(0); Semaphore MessageQueueService::_polling_sem(0);
 AtomicInt MessageQueueService::_stop_polling(0); AtomicInt MessageQueueService::_stop_polling(0);
 AtomicInt MessageQueueService::_check_idle_flag(0);  
  
  
 MessageQueueService::MessageQueueService( MessageQueueService::MessageQueueService(
    const char *name,    const char *name,
    Uint32 queueID,      Uint32 queueID)
    Uint32 capabilities,  
    Uint32 mask)  
    : Base(name, true,  queueID),    : Base(name, true,  queueID),
      _mask(mask),  
      _die(0),  
         _threads(0),         _threads(0),
      _incoming(true, 0),        _incoming(),
      _incoming_queue_shutdown(0)      _incoming_queue_shutdown(0)
 { {
       _isRunning = true;
    _capabilities = (capabilities | module_capabilities::async);  
   
    _default_op_timeout.tv_sec = 30;  
    _default_op_timeout.tv_usec = 100;  
  
    max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;    max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;
  
    // if requested threads gt MAX_THREADS_PER_SVC_QUEUE_LIMIT      // if requested thread max is out of range, then set to
    // then set to MAX_THREADS_PER_SVC_QUEUE_LIMIT      // MAX_THREADS_PER_SVC_QUEUE_LIMIT
   
    if (max_threads_per_svc_queue > MAX_THREADS_PER_SVC_QUEUE_LIMIT)  
      {  
        max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT;  
      }  
   
    // if requested threads eq 0 (unlimited)  
    // then set to MAX_THREADS_PER_SVC_QUEUE_LIMIT  
  
    if (max_threads_per_svc_queue == 0)      if ((max_threads_per_svc_queue < 1) ||
           (max_threads_per_svc_queue > MAX_THREADS_PER_SVC_QUEUE_LIMIT))
      {      {
        max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT;        max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT;
      }      }
  
    // cout << "MAX_THREADS_PER_SVC_QUEUE = " << MAX_THREADS_PER_SVC_QUEUE << endl;      PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
    // cout << "max_threads_per_svc_queue set to = " << max_threads_per_svc_queue << endl;         "max_threads_per_svc_queue set to %u.", max_threads_per_svc_queue));
   
   
    AutoMutex autoMut(_meta_dispatcher_mutex);  
  
    if (_meta_dispatcher == 0)    if (_meta_dispatcher == 0)
    {    {
       _stop_polling = 0;       _stop_polling = 0;
       PEGASUS_ASSERT(_service_count.value() == 0);          PEGASUS_ASSERT(_service_count.get() == 0);
       _meta_dispatcher = new cimom();       _meta_dispatcher = new cimom();
       if (_meta_dispatcher == NULL)  
       {  
          throw NullPointer();  
       }  
       //  _thread_pool = new ThreadPool(initial_cnt, "MessageQueueService",       //  _thread_pool = new ThreadPool(initial_cnt, "MessageQueueService",
       //   minimum_cnt, maximum_cnt, deallocateWait);       //   minimum_cnt, maximum_cnt, deallocateWait);
       //       //
Line 224 
Line 206 
    }    }
    _service_count++;    _service_count++;
  
    if (false == register_service(name, _capabilities, _mask))      // Add to the polling list
       if (!_polling_list)
    {    {
       //l10n          _polling_list = new PollingList;
       //throw BindFailedException("MessageQueueService Base Unable to register with  Meta Dispatcher");  
       MessageLoaderParms parms("Common.MessageQueueService.UNABLE_TO_REGISTER",  
          "MessageQueueService Base Unable to register with  Meta Dispatcher");  
   
       throw BindFailedException(parms);  
    }    }
       pollingListEntry = new PollingListEntry(this);
    _polling_list.insert_last(this);      _polling_list->insert_back(pollingListEntry);
   
 } }
  
  
 MessageQueueService::~MessageQueueService() MessageQueueService::~MessageQueueService()
 { {
    _die = 1;      // Close incoming queue.
       if (_incoming_queue_shutdown.get() == 0)
    if (_incoming_queue_shutdown.value() == 0)  
    {    {
       _shutdown_incoming_queue();          AsyncIoClose *msg = new AsyncIoClose(
               0,
               _queueId,
               _queueId,
               true);
           SendForget(msg);
           // Wait until our queue has been shutdown.
           while (_incoming_queue_shutdown.get() == 0)
           {
               Threads::yield();
    }    }
       }
   
       // Die now. Setting this flag to true instructs the polling thread not to
       // monitor this service.
       pollingListEntry->die = true;
  
  while (_threads.value() > 0)      // Wait until all threads processing the messages
       // for this service have completed.
       while (_threads.get() > 0)
      {      {
           pegasus_yield();          Threads::yield();
      }      }
    _polling_list.remove(this);  
    {    {
      AutoMutex autoMut(_meta_dispatcher_mutex);  
      _service_count--;      _service_count--;
      if (_service_count.value() == 0)          // If we are last service to die, delete metadispatcher.
           if (_service_count.get() == 0)
      {      {
   
       _stop_polling++;       _stop_polling++;
       _polling_sem.signal();       _polling_sem.signal();
       if (_polling_thread) {              if (_polling_thread)
               {
           _polling_thread->join();           _polling_thread->join();
           delete _polling_thread;           delete _polling_thread;
           _polling_thread = 0;           _polling_thread = 0;
       }       }
       _meta_dispatcher->_shutdown_routed_queue();  
       delete _meta_dispatcher;       delete _meta_dispatcher;
       _meta_dispatcher = 0;       _meta_dispatcher = 0;
  
       delete _thread_pool;       delete _thread_pool;
       _thread_pool = 0;       _thread_pool = 0;
      }  
    } // mutex unlocks here              // Cleanup polling list
    // Clean up in case there are extra stuff on the queue.              PollingListEntry *entry;
   while (_incoming.count())              while ((entry = _polling_list->remove_front()))
   {  
     try {  
       delete _incoming.remove_first();  
     } catch (const ListClosed &e)  
     {     {
       // If the list is closed, there is nothing we can do.                  delete entry;
       break;  
     }     }
   }   }
 } }
  
 void MessageQueueService::_shutdown_incoming_queue()      // Clean up any extra stuff on the queue.
 {      AsyncOpNode* op = 0;
    if (_incoming_queue_shutdown.value() > 0)      while ((op = _incoming.dequeue()))
       return;  
   
    AsyncIoctl *msg = new AsyncIoctl(  
       get_next_xid(),  
       0,  
       _queueId,  
       _queueId,  
       true,  
       AsyncIoctl::IO_CLOSE,  
       0,  
       0);  
   
    msg->op = get_op();  
    msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;  
    msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK  
        | ASYNC_OPFLAGS_SIMPLE_STATUS);  
    msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;  
   
    msg->op->_op_dest = this;  
    msg->op->_request.insert_first(msg);  
    try {  
      _incoming.insert_last_wait(msg->op);  
      _polling_sem.signal();  
    } catch (const ListClosed &)  
    {  
         // This means the queue has already been shut-down (happens  when there  
     // are two AsyncIoctrl::IO_CLOSE messages generated and one got first  
     // processed.  
      delete msg;  
    }  
    catch (const Permission &)  
    {    {
      delete msg;          delete op;
    }    }
 } }
  
   
   
 void MessageQueueService::enqueue(Message *msg) void MessageQueueService::enqueue(Message *msg)
 { {
    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
Line 339 
Line 291 
 } }
  
  
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(  ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(
     void * parm)     void * parm)
 { {
     MessageQueueService* service =     MessageQueueService* service =
Line 347 
Line 299 
     PEGASUS_ASSERT(service != 0);     PEGASUS_ASSERT(service != 0);
     try     try
     {     {
           if (service->pollingListEntry->die)
         if (service->_die.value() != 0)  
         {         {
             return (0);              service->_threads--;
               return 0;
         }         }
             service->_threads++;  
         // pull messages off the incoming queue and dispatch them. then         // pull messages off the incoming queue and dispatch them. then
         // check pending messages that are non-blocking         // check pending messages that are non-blocking
         AsyncOpNode *operation = 0;         AsyncOpNode *operation = 0;
Line 360 
Line 311 
         // many operations may have been queued.         // many operations may have been queued.
         do         do
         {         {
             try              operation = service->_incoming.dequeue();
             {  
                 operation = service->_incoming.remove_first();  
             }  
             catch (ListClosed &)  
             {  
                 // ATTN: This appears to be a common loop exit path.  
                 //PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,  
                 //    "Caught ListClosed exception.  Exiting _req_proc.");  
                 break;  
             }  
  
             if (operation)             if (operation)
             {             {
Line 381 
Line 322 
     }     }
     catch (const Exception& e)     catch (const Exception& e)
     {     {
         PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,          PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
             String("Caught exception: \"") + e.getMessage() +              "Caught exception: \"%s\".  Exiting _req_proc.",
                 "\".  Exiting _req_proc.");              (const char*)e.getMessage().getCString()));
     }     }
     catch (...)     catch (...)
     {     {
         PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,          PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
             "Caught unrecognized exception.  Exiting _req_proc.");             "Caught unrecognized exception.  Exiting _req_proc.");
     }     }
     service->_threads--;     service->_threads--;
     return(0);      return 0;
 } }
  
  
Line 408 
Line 349 
 // including op, op->_callback_node, and op->_callback_ptr // including op, op->_callback_node, and op->_callback_ptr
 void MessageQueueService::_handle_async_callback(AsyncOpNode *op) void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
 { {
    if (op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK)      PEGASUS_ASSERT(op->_flags == ASYNC_OPFLAGS_CALLBACK);
    {  
   
       Message *msg = op->get_request();  
       if (msg && (msg->getMask() & message_mask::ha_async))  
       {  
          if (msg->getType() == async_messages::ASYNC_LEGACY_OP_START)  
          {  
             AsyncLegacyOperationStart *wrapper =  
                static_cast<AsyncLegacyOperationStart *>(msg);  
             msg = wrapper->get_action();  
             delete wrapper;  
          }  
          else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)  
          {  
             AsyncModuleOperationStart *wrapper =  
                static_cast<AsyncModuleOperationStart *>(msg);  
             msg = wrapper->get_action();  
             delete wrapper;  
          }  
          else if (msg->getType() == async_messages::ASYNC_OP_START)  
          {  
             AsyncOperationStart *wrapper =  
                static_cast<AsyncOperationStart *>(msg);  
             msg = wrapper->get_action();  
             delete wrapper;  
          }  
          delete msg;  
       }  
   
       msg = op->get_response();  
       if (msg && (msg->getMask() & message_mask::ha_async))  
       {  
          if (msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT)  
          {  
             AsyncLegacyOperationResult *wrapper =  
                static_cast<AsyncLegacyOperationResult *>(msg);  
             msg = wrapper->get_result();  
             delete wrapper;  
          }  
          else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)  
          {  
             AsyncModuleOperationResult *wrapper =  
                static_cast<AsyncModuleOperationResult *>(msg);  
             msg = wrapper->get_result();  
             delete wrapper;  
          }  
       }  
       void (*callback)(Message *, void *, void *) = op->__async_callback;  
       void *handle = op->_callback_handle;  
       void *parm = op->_callback_parameter;  
       op->release();  
       return_op(op);  
       callback(msg, handle, parm);  
    }  
    else if (op->_flags & ASYNC_OPFLAGS_CALLBACK)  
    {  
       // note that _callback_node may be different from op       // note that _callback_node may be different from op
       // op->_callback_response_q is a "this" pointer we can use for       // op->_callback_response_q is a "this" pointer we can use for
       // static callback methods       // static callback methods
       op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);      op->_async_callback(
    }          op->_callback_node, op->_callback_response_q, op->_callback_ptr);
 } }
  
  
Line 477 
Line 362 
 { {
    if (operation != 0)    if (operation != 0)
    {    {
           Message *rq = operation->_request.get();
 // ATTN: optimization  
 // << Tue Feb 19 14:10:38 2002 mdd >>  
       operation->lock();  
   
       Message *rq = operation->_request.next(0);  
  
 // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>> // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>>
 // move this to the bottom of the loop when the majority of // move this to the bottom of the loop when the majority of
 // messages become async messages. // messages become async messages.
  
       // divert legacy messages to handleEnqueue       // divert legacy messages to handleEnqueue
       if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))          if ((rq != 0) && (!(rq->getMask() & MessageMask::ha_async)))
       {       {
          rq = operation->_request.remove_first() ;              operation->_request.release();
          operation->unlock();  
          // delete the op node          // delete the op node
          operation->release();  
          return_op(operation);          return_op(operation);
   
          handleEnqueue(rq);          handleEnqueue(rq);
          return;          return;
       }       }
  
       if ((operation->_flags & ASYNC_OPFLAGS_CALLBACK ||          if ((operation->_flags & ASYNC_OPFLAGS_CALLBACK) &&
            operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&  
           (operation->_state & ASYNC_OPSTATE_COMPLETE))           (operation->_state & ASYNC_OPSTATE_COMPLETE))
       {       {
          operation->unlock();  
          _handle_async_callback(operation);          _handle_async_callback(operation);
       }       }
       else       else
       {       {
          PEGASUS_ASSERT(rq != 0);          PEGASUS_ASSERT(rq != 0);
          operation->unlock();  
          _handle_async_request(static_cast<AsyncRequest *>(rq));          _handle_async_request(static_cast<AsyncRequest *>(rq));
       }       }
    }    }
Line 520 
Line 394 
  
 void MessageQueueService::_handle_async_request(AsyncRequest *req) void MessageQueueService::_handle_async_request(AsyncRequest *req)
 { {
    if (req != 0)      MessageType type = req->getType();
       if (type == ASYNC_IOCLOSE)
       {
           handle_AsyncIoClose(static_cast<AsyncIoClose*>(req));
       }
       else if (type == ASYNC_CIMSERVICE_START)
    {    {
       req->op->processing();  
   
       Uint32 type = req->getType();  
       if (type == async_messages::HEARTBEAT)  
          handle_heartbeat_request(req);  
       else if (type == async_messages::IOCTL)  
          handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));  
       else if (type == async_messages::CIMSERVICE_START)  
          handle_CimServiceStart(static_cast<CimServiceStart *>(req));          handle_CimServiceStart(static_cast<CimServiceStart *>(req));
       else if (type == async_messages::CIMSERVICE_STOP)      }
       else if (type == ASYNC_CIMSERVICE_STOP)
       {
          handle_CimServiceStop(static_cast<CimServiceStop *>(req));          handle_CimServiceStop(static_cast<CimServiceStop *>(req));
       else if (type == async_messages::CIMSERVICE_PAUSE)      }
          handle_CimServicePause(static_cast<CimServicePause *>(req));  
       else if (type == async_messages::CIMSERVICE_RESUME)  
          handle_CimServiceResume(static_cast<CimServiceResume *>(req));  
       else if (type == async_messages::ASYNC_OP_START)  
          handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));  
       else       else
       {       {
          // we don't handle this request message          // we don't handle this request message
          _make_response(req, async_results::CIM_NAK);          _make_response(req, async_results::CIM_NAK);
       }       }
    }    }
 }  
   
  
 Boolean MessageQueueService::_enqueueResponse( Boolean MessageQueueService::_enqueueResponse(
    Message* request,    Message* request,
    Message* response)    Message* response)
 { {
   
   STAT_COPYDISPATCHER  
   
    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
                     "MessageQueueService::_enqueueResponse");                     "MessageQueueService::_enqueueResponse");
  
    if (request->getMask() & message_mask::ha_async)      if (request->getMask() & MessageMask::ha_async)
    {    {
       if (response->getMask() & message_mask::ha_async)          if (response->getMask() & MessageMask::ha_async)
       {       {
          _completeAsyncResponse(static_cast<AsyncRequest *>(request),              _completeAsyncResponse(
                                 static_cast<AsyncReply *>(response),                  static_cast<AsyncRequest *>(request),
                                 ASYNC_OPSTATE_COMPLETE, 0);                  static_cast<AsyncReply *>(response));
   
          PEG_METHOD_EXIT();          PEG_METHOD_EXIT();
          return true;          return true;
       }       }
    }    }
  
    if (request->_async != 0)      AsyncRequest* asyncRequest =
           static_cast<AsyncRequest*>(request->get_async());
   
       if (asyncRequest != 0)
    {    {
       Uint32 mask = request->_async->getMask();          PEGASUS_ASSERT(asyncRequest->getMask() &
       PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request));              (MessageMask::ha_async | MessageMask::ha_request));
   
           AsyncOpNode* op = asyncRequest->op;
  
       AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);  
       AsyncOpNode *op = async->op;  
       request->_async = 0;  
       // the legacy request is going to be deleted by its handler       // the legacy request is going to be deleted by its handler
       // remove it from the op node       // remove it from the op node
  
       static_cast<AsyncLegacyOperationStart *>(async)->get_action();          static_cast<AsyncLegacyOperationStart *>(asyncRequest)->get_action();
  
       AsyncLegacyOperationResult *async_result =       AsyncLegacyOperationResult *async_result =
          new AsyncLegacyOperationResult(          new AsyncLegacyOperationResult(
             async->getKey(),  
             async->getRouting(),  
             op,             op,
             response);             response);
       _completeAsyncResponse(       _completeAsyncResponse(
          async,              asyncRequest,
          async_result,              async_result);
          ASYNC_OPSTATE_COMPLETE,  
          0);  
       PEG_METHOD_EXIT();       PEG_METHOD_EXIT();
       return true;       return true;
    }    }
Line 601 
Line 464 
    // ensure that the destination queue is in response->dest    // ensure that the destination queue is in response->dest
    PEG_METHOD_EXIT();    PEG_METHOD_EXIT();
    return SendForget(response);    return SendForget(response);
   
 } }
  
 void MessageQueueService::_make_response(Message *req, Uint32 code) void MessageQueueService::_make_response(Message *req, Uint32 code)
Line 609 
Line 471 
    cimom::_make_response(req, code);    cimom::_make_response(req, code);
 } }
  
   
 void MessageQueueService::_completeAsyncResponse( void MessageQueueService::_completeAsyncResponse(
     AsyncRequest *request,     AsyncRequest *request,
     AsyncReply *reply,      AsyncReply* reply)
     Uint32 state,  
     Uint32 flag)  
 { {
    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
                     "MessageQueueService::_completeAsyncResponse");                     "MessageQueueService::_completeAsyncResponse");
  
    cimom::_completeAsyncResponse(request, reply, state, flag);      cimom::_completeAsyncResponse(request, reply);
  
    PEG_METHOD_EXIT();    PEG_METHOD_EXIT();
 } }
  
  
 void MessageQueueService::_complete_op_node( void MessageQueueService::_complete_op_node(
     AsyncOpNode *op,      AsyncOpNode* op)
     Uint32 state,  
     Uint32 flag,  
     Uint32 code)  
 { {
    cimom::_complete_op_node(op, state, flag, code);      cimom::_complete_op_node(op);
 } }
  
  
 Boolean MessageQueueService::accept_async(AsyncOpNode *op) Boolean MessageQueueService::accept_async(AsyncOpNode *op)
 { {
    if (_incoming_queue_shutdown.value() > 0)      if (_incoming_queue_shutdown.get() > 0)
       return false;       return false;
    if (_polling_thread == NULL)    if (_polling_thread == NULL)
    {    {
           PEGASUS_ASSERT(_polling_list);
       _polling_thread = new Thread(       _polling_thread = new Thread(
           polling_routine,           polling_routine,
           reinterpret_cast<void *>(&_polling_list),              reinterpret_cast<void *>(_polling_list),
           false);           false);
       while (!_polling_thread->run())          ThreadStatus tr = PEGASUS_THREAD_OK;
           while ( (tr =_polling_thread->run()) != PEGASUS_THREAD_OK)
       {       {
          pegasus_yield();              if (tr == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)
                   Threads::yield();
               else
                   throw Exception(MessageLoaderParms(
                       "Common.MessageQueueService.NOT_ENOUGH_THREAD",
                       "Could not allocate thread for the polling thread."));
       }       }
    }    }
 // ATTN optimization remove the message checking altogether in the base      if (pollingListEntry->die == false)
 // << Mon Feb 18 14:02:20 2002 mdd >>      {
    op->lock();          if (_incoming.enqueue(op))
    Message *rq = op->_request.next(0);  
    Message *rp = op->_response.next(0);  
    op->unlock();  
   
    if ((rq != 0 && (true == messageOK(rq))) ||  
        (rp != 0 && (true == messageOK(rp))) && _die.value() == 0)  
    {    {
       _incoming.insert_last_wait(op);  
       _polling_sem.signal();       _polling_sem.signal();
       return true;       return true;
    }    }
    return false;  
 } }
   
 Boolean MessageQueueService::messageOK(const Message *msg)  
 {  
    if (_incoming_queue_shutdown.value() > 0)  
       return false;       return false;
    return true;  
 } }
  
 void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)  void MessageQueueService::handle_AsyncIoClose(AsyncIoClose *req)
 { {
    // default action is to echo a heartbeat response      MessageQueueService *service =
           static_cast<MessageQueueService*>(req->op->_op_dest);
    AsyncReply *reply = new AsyncReply(  
       async_messages::HEARTBEAT,  
       req->getKey(),  
       req->getRouting(),  
       0,  
       req->op,  
       async_results::OK,  
       req->resp,  
       false);  
    _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0);  
 }  
   
   
 void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)  
 {  
 }  
   
 void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)  
 {  
    switch (req->ctl)  
    {  
       case AsyncIoctl::IO_CLOSE:  
       {  
          MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);  
  
 #ifdef MESSAGEQUEUESERVICE_DEBUG #ifdef MESSAGEQUEUESERVICE_DEBUG
          PEGASUS_STD(cout) << service->getQueueName() << " Received AsyncIoctl::IO_CLOSE " << PEGASUS_STD(endl);      PEGASUS_STD(cout) << service->getQueueName() <<
           " Received AsyncIoClose " << PEGASUS_STD(endl);
 #endif #endif
       // set the closing flag, don't accept any more messages
       service->_incoming_queue_shutdown = 1;
  
          // respond to this message. this is fire and forget, so we don't need to delete anything.      // respond to this message. this is fire and forget, so we
       // don't need to delete anything.
          // this takes care of two problems that were being found          // this takes care of two problems that were being found
          // << Thu Oct  9 10:52:48 2003 mdd >>          // << Thu Oct  9 10:52:48 2003 mdd >>
           _make_response(req, async_results::OK);           _make_response(req, async_results::OK);
          // ensure we do not accept any further messages  
   
          // ensure we don't recurse on IO_CLOSE  
          if (_incoming_queue_shutdown.value() > 0)  
             break;  
   
          // set the closing flag  
          service->_incoming_queue_shutdown = 1;  
          // empty out the queue  
          while (1)  
          {  
             AsyncOpNode *operation;  
             try  
             {  
                operation = service->_incoming.remove_first();  
             }  
             catch(IPCException &)  
             {  
                break;  
             }  
             if (operation)  
             {  
                operation->_service_ptr = service;  
                service->_handle_incoming_operation(operation);  
             }  
             else  
                break;  
          } // message processing loop  
   
          // shutdown the AsyncDQueue  
          service->_incoming.shutdown_queue();  
          return;  
       }  
   
       default:  
          _make_response(req, async_results::CIM_NAK);  
    }  
 } }
  
 void MessageQueueService::handle_CimServiceStart(CimServiceStart *req) void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
 { {
   
 #ifdef MESSAGEQUEUESERVICE_DEBUG #ifdef MESSAGEQUEUESERVICE_DEBUG
    PEGASUS_STD(cout) << getQueueName() << "received START" << PEGASUS_STD(endl);      PEGASUS_STD(cout) << getQueueName() << "received START" <<
           PEGASUS_STD(endl);
 #endif #endif
       PEGASUS_ASSERT(!_isRunning);
    // clear the stoped bit and update      _isRunning = true;
    _capabilities &= (~(module_capabilities::stopped));  
    _make_response(req, async_results::OK);    _make_response(req, async_results::OK);
    // now tell the meta dispatcher we are stopped  
    update_service(_capabilities, _mask);  
   
 } }
   
 void MessageQueueService::handle_CimServiceStop(CimServiceStop *req) void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
 { {
 #ifdef MESSAGEQUEUESERVICE_DEBUG #ifdef MESSAGEQUEUESERVICE_DEBUG
    PEGASUS_STD(cout) << getQueueName() << "received STOP" << PEGASUS_STD(endl);    PEGASUS_STD(cout) << getQueueName() << "received STOP" << PEGASUS_STD(endl);
 #endif #endif
    // set the stopeed bit and update      PEGASUS_ASSERT(_isRunning);
    _capabilities |= module_capabilities::stopped;      _isRunning = false;
    _make_response(req, async_results::CIM_STOPPED);      _make_response(req, async_results::CIM_SERVICE_STOPPED);
    // now tell the meta dispatcher we are stopped  
    update_service(_capabilities, _mask);  
 }  
   
 void MessageQueueService::handle_CimServicePause(CimServicePause *req)  
 {  
    // set the paused bit and update  
    _capabilities |= module_capabilities::paused;  
    update_service(_capabilities, _mask);  
    _make_response(req, async_results::CIM_PAUSED);  
    // now tell the meta dispatcher we are stopped  
 }  
   
 void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)  
 {  
    // clear the paused  bit and update  
    _capabilities &= (~(module_capabilities::paused));  
    update_service(_capabilities, _mask);  
    _make_response(req, async_results::OK);  
    // now tell the meta dispatcher we are stopped  
 }  
   
 void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)  
 {  
    _make_response(req, async_results::CIM_NAK);  
 }  
   
 void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)  
 {  
    ;  
 }  
   
   
 void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)  
 {  
    // remove the legacy message from the request and enqueue it to its destination  
    Uint32 result = async_results::CIM_NAK;  
   
    Message *legacy = req->_act;  
    if (legacy != 0)  
    {  
       MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);  
       if (queue != 0)  
       {  
          if (queue->isAsync() == true)  
          {  
             (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);  
          }  
          else  
          {  
             // Enqueue the response:  
             queue->enqueue(req->get_action());  
          }  
   
          result = async_results::OK;  
       }  
    }  
    _make_response(req, result);  
 }  
   
 void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)  
 {  
    ;  
 } }
  
 AsyncOpNode *MessageQueueService::get_op() AsyncOpNode *MessageQueueService::get_op()
Line 842 
Line 569 
    AsyncOpNode *op = new AsyncOpNode();    AsyncOpNode *op = new AsyncOpNode();
  
    op->_state = ASYNC_OPSTATE_UNKNOWN;    op->_state = ASYNC_OPSTATE_UNKNOWN;
    op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;     op->_flags = ASYNC_OPFLAGS_UNKNOWN;
  
    return op;    return op;
 } }
  
 void MessageQueueService::return_op(AsyncOpNode *op) void MessageQueueService::return_op(AsyncOpNode *op)
 { {
    PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED);  
    delete op;    delete op;
 } }
  
  
 Boolean MessageQueueService::ForwardOp(  Boolean MessageQueueService::SendAsync(
     AsyncOpNode *op,     AsyncOpNode *op,
     Uint32 destination)      Uint32 destination,
       void (*callback)(AsyncOpNode*, MessageQueue*, void*),
       MessageQueue* callback_response_q,
       void* callback_ptr)
 { {
    PEGASUS_ASSERT(op != 0);      return _sendAsync(
    op->lock();          op,
    op->_op_dest = MessageQueue::lookup(destination);          destination,
    op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD);          callback,
    op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK);          callback_response_q,
    op->unlock();          callback_ptr,
    if (op->_op_dest == 0)          ASYNC_OPFLAGS_CALLBACK);
       return false;  
  
    return  _meta_dispatcher->route_async(op);  
 } }
  
   Boolean MessageQueueService::_sendAsync(
 Boolean MessageQueueService::SendAsync(  
     AsyncOpNode *op,     AsyncOpNode *op,
     Uint32 destination,     Uint32 destination,
     void (*callback)(AsyncOpNode *, MessageQueue *, void *),     void (*callback)(AsyncOpNode *, MessageQueue *, void *),
     MessageQueue *callback_response_q,     MessageQueue *callback_response_q,
     void *callback_ptr)      void* callback_ptr,
       Uint32 flags)
 { {
    PEGASUS_ASSERT(op != 0 && callback != 0);    PEGASUS_ASSERT(op != 0 && callback != 0);
  
    // get the queue handle for the destination      // destination of this message
       op->_op_dest = MessageQueue::lookup(destination);
    op->lock();  
    op->_op_dest = MessageQueue::lookup(destination); // destination of this message  
    op->_flags |= ASYNC_OPFLAGS_CALLBACK;  
    op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);  
    // initialize the callback data  
    op->_async_callback = callback;   // callback function to be executed by recpt. of response  
    op->_callback_node = op;          // the op node  
    op->_callback_response_q = callback_response_q;  // the queue that will receive the response  
    op->_callback_ptr = callback_ptr;   // user data for callback  
    op->_callback_request_q = this;     // I am the originator of this request  
   
    op->unlock();  
    if (op->_op_dest == 0)    if (op->_op_dest == 0)
       return false;  
   
    return  _meta_dispatcher->route_async(op);  
 }  
   
   
 Boolean MessageQueueService::SendAsync(  
     Message *msg,  
     Uint32 destination,  
     void (*callback)(Message *response, void *handle, void *parameter),  
     void *handle,  
     void *parameter)  
 { {
    if (msg == NULL)  
       return false;  
    if (callback == NULL)  
       return SendForget(msg);  
    AsyncOpNode *op = get_op();  
    msg->dest = destination;  
    if (NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))  
    {  
       op->release();  
       return_op(op);  
       return false;       return false;
    }    }
    op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;      op->_flags = flags;
    op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);      // initialize the callback data
    op->_state &= ~ASYNC_OPSTATE_COMPLETE;      // callback function to be executed by recpt. of response
    op->__async_callback = callback;      op->_async_callback = callback;
       // the op node
    op->_callback_node = op;    op->_callback_node = op;
    op->_callback_handle = handle;      // the queue that will receive the response
    op->_callback_parameter = parameter;      op->_callback_response_q = callback_response_q;
    op->_callback_response_q = this;      // user data for callback
       op->_callback_ptr = callback_ptr;
       // I am the originator of this request
       op->_callback_request_q = this;
  
    if (!(msg->getMask() & message_mask::ha_async))  
    {  
       AsyncLegacyOperationStart *wrapper = new AsyncLegacyOperationStart(  
          get_next_xid(),  
          op,  
          destination,  
          msg,  
          destination);  
    }  
    else  
    {  
       op->_request.insert_first(msg);  
       (static_cast<AsyncMessage *>(msg))->op = op;  
    }  
    return _meta_dispatcher->route_async(op);    return _meta_dispatcher->route_async(op);
 } }
  
   
 Boolean MessageQueueService::SendForget(Message *msg) Boolean MessageQueueService::SendForget(Message *msg)
 { {
    AsyncOpNode *op = 0;    AsyncOpNode *op = 0;
    Uint32 mask = msg->getMask();    Uint32 mask = msg->getMask();
  
    if (mask & message_mask::ha_async)      if (mask & MessageMask::ha_async)
    {    {
       op = (static_cast<AsyncMessage *>(msg))->op ;       op = (static_cast<AsyncMessage *>(msg))->op ;
    }    }
Line 960 
Line 642 
    if (op == 0)    if (op == 0)
    {    {
       op = get_op();       op = get_op();
       op->_request.insert_first(msg);          op->_request.reset(msg);
       if (mask & message_mask::ha_async)          if (mask & MessageMask::ha_async)
       {       {
          (static_cast<AsyncMessage *>(msg))->op = op;          (static_cast<AsyncMessage *>(msg))->op = op;
       }       }
    }    }
   
       PEGASUS_ASSERT(op->_flags == ASYNC_OPFLAGS_UNKNOWN);
       PEGASUS_ASSERT(op->_state == ASYNC_OPSTATE_UNKNOWN);
    op->_op_dest = MessageQueue::lookup(msg->dest);    op->_op_dest = MessageQueue::lookup(msg->dest);
    op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;  
    op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK  
        | ASYNC_OPFLAGS_SIMPLE_STATUS);  
    op->_state &= ~ASYNC_OPSTATE_COMPLETE;  
    if (op->_op_dest == 0)    if (op->_op_dest == 0)
    {    {
       op->release();  
       return_op(op);       return_op(op);
       return false;       return false;
    }    }
  
       op->_flags = ASYNC_OPFLAGS_FIRE_AND_FORGET;
   
    // now see if the meta dispatcher will take it    // now see if the meta dispatcher will take it
    return  _meta_dispatcher->route_async(op);    return  _meta_dispatcher->route_async(op);
 } }
Line 993 
Line 675 
    if (request->op == 0)    if (request->op == 0)
    {    {
       request->op = get_op();       request->op = get_op();
       request->op->_request.insert_first(request);          request->op->_request.reset(request);
       destroy_op = true;       destroy_op = true;
    }    }
  
       PEGASUS_ASSERT(request->op->_flags == ASYNC_OPFLAGS_UNKNOWN);
       PEGASUS_ASSERT(request->op->_state == ASYNC_OPSTATE_UNKNOWN);
   
    request->block = false;    request->block = false;
    request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;      _sendAsync(
    SendAsync(  
       request->op,       request->op,
       request->dest,       request->dest,
       _sendwait_callback,       _sendwait_callback,
       this,       this,
       (void *)0);          (void *)0,
           ASYNC_OPFLAGS_PSEUDO_CALLBACK);
  
    request->op->_client_sem.wait();    request->op->_client_sem.wait();
  
    request->op->lock();      AsyncReply* rpl = static_cast<AsyncReply *>(request->op->removeResponse());
    AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());  
    rpl->op = 0;    rpl->op = 0;
    request->op->unlock();  
  
    if (destroy_op == true)    if (destroy_op == true)
    {    {
       request->op->lock();          request->op->_request.release();
       request->op->_request.remove(request);  
       request->op->_state |= ASYNC_OPSTATE_RELEASED;  
       request->op->unlock();  
       return_op(request->op);       return_op(request->op);
       request->op = 0;       request->op = 0;
    }    }
    return rpl;    return rpl;
 } }
  
   Uint32 MessageQueueService::find_service_qid(const String &name)
 Boolean MessageQueueService::register_service(  
     String name,  
     Uint32 capabilities,  
     Uint32 mask)  
 {  
    RegisterCimService *msg = new RegisterCimService(  
       get_next_xid(),  
       0,  
       true,  
       name,  
       capabilities,  
       mask,  
       _queueId);  
    msg->dest = CIMOM_Q_ID;  
   
    Boolean registered = false;  
    AsyncReply *reply = static_cast<AsyncReply *>(SendWait(msg));  
   
    if (reply != 0)  
    {  
       if (reply->getMask() & message_mask::ha_async)  
       {  
          if (reply->getMask() & message_mask::ha_reply)  
          {          {
             if (reply->result == async_results::OK ||      MessageQueue *queue = MessageQueue::lookup((const char*)name.getCString());
                 reply->result == async_results::MODULE_ALREADY_REGISTERED)      PEGASUS_ASSERT(queue);
             {      return queue->getQueueId();
                 registered = true;  
             }  
          }  
       }  
   
       delete reply;  
    }  
    delete msg;  
    return registered;  
 }  
   
 Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)  
 {  
    UpdateCimService *msg = new UpdateCimService(  
       get_next_xid(),  
       0,  
       true,  
       _queueId,  
       _capabilities,  
       _mask);  
    Boolean registered = false;  
   
    AsyncMessage *reply = SendWait(msg);  
    if (reply)  
    {  
       if (reply->getMask() & message_mask::ha_async)  
       {  
          if (reply->getMask() & message_mask::ha_reply)  
          {  
             if (static_cast<AsyncReply *>(reply)->result == async_results::OK)  
             {  
                registered = true;  
             }  
          }  
       }  
       delete reply;  
    }  
    delete msg;  
    return registered;  
 }  
   
   
 Boolean MessageQueueService::deregister_service()  
 {  
   
    _meta_dispatcher->deregister_module(_queueId);  
    return true;  
 }  
   
   
 void MessageQueueService::find_services(  
     String name,  
     Uint32 capabilities,  
     Uint32 mask,  
     Array<Uint32> *results)  
 {  
    if (results == 0)  
    {  
       throw NullPointer();  
    }  
   
    results->clear();  
   
    FindServiceQueue *req = new FindServiceQueue(  
       get_next_xid(),  
       0,  
       _queueId,  
       true,  
       name,  
       capabilities,  
       mask);  
   
    req->dest = CIMOM_Q_ID;  
   
    AsyncMessage *reply = SendWait(req);  
    if (reply)  
    {  
       if (reply->getMask() & message_mask::ha_async)  
       {  
          if (reply->getMask() & message_mask::ha_reply)  
          {  
             if (reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)  
             {  
                if ((static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK)  
                   *results = (static_cast<FindServiceQueueResult *>(reply))->qids;  
             }  
          }  
       }  
       delete reply;  
    }  
    delete req;  
    return ;  
 }  
   
 void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)  
 {  
    if (result == 0)  
    {  
       throw NullPointer();  
    }  
   
    EnumerateService *req = new EnumerateService(  
       get_next_xid(),  
       0,  
       _queueId,  
       true,  
       queue);  
   
    AsyncMessage *reply = SendWait(req);  
   
    if (reply)  
    {  
       Boolean found = false;  
   
       if (reply->getMask() & message_mask::ha_async)  
       {  
          if (reply->getMask() & message_mask::ha_reply)  
          {  
             if (reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)  
             {  
                if ((static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK)  
                {  
                   if (found == false)  
                   {  
                      found = true;  
   
                      result->put_name((static_cast<EnumerateServiceResponse *>(reply))->name);  
                      result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);  
                      result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);  
                      result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);  
                   }  
                }  
             }  
          }  
       }  
       delete reply;  
    }  
    delete req;  
   
    return;  
 }  
   
 Uint32 MessageQueueService::get_next_xid()  
 {  
    static Mutex _monitor;  
    Uint32 value;  
    AutoMutex autoMut(_monitor);  
    _xid++;  
    value =  _xid.value();  
    return value;  
   
 } }
  
 PEGASUS_NAMESPACE_END PEGASUS_NAMESPACE_END


Legend:
Removed from v.1.112  
changed lines
  Added in v.1.155

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2