(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.13 and 1.157

version 1.13, 2002/02/06 15:15:35 version 1.157, 2008/12/09 18:14:20
Line 1 
Line 1 
 //%////-*-c++-*-////////////////////////////////////////////////////////////////  //%LICENSE////////////////////////////////////////////////////////////////
 // //
 // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM  // Licensed to The Open Group (TOG) under one or more contributor license
   // agreements.  Refer to the OpenPegasusNOTICE.txt file distributed with
   // this work for additional information regarding copyright ownership.
   // Each contributor licenses this file to you under the OpenPegasus Open
   // Source License; you may not use this file except in compliance with the
   // License.
 // //
 // Permission is hereby granted, free of charge, to any person obtaining a copy  // Permission is hereby granted, free of charge, to any person obtaining a
 // of this software and associated documentation files (the "Software"), to  // copy of this software and associated documentation files (the "Software"),
 // deal in the Software without restriction, including without limitation the  // to deal in the Software without restriction, including without limitation
 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or  // the rights to use, copy, modify, merge, publish, distribute, sublicense,
 // sell copies of the Software, and to permit persons to whom the Software is  // and/or sell copies of the Software, and to permit persons to whom the
 // furnished to do so, subject to the following conditions:  // Software is furnished to do so, subject to the following conditions:
 // //
 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN  // The above copyright notice and this permission notice shall be included
 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED  // in all copies or substantial portions of the Software.
 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT  
 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR  
 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT  
 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN  
 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION  
 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.  
 // //
 //==============================================================================  // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
   // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
   // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
   // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
   // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
   // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
   // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 // //
 // Author: Mike Day (mdday@us.ibm.com)  //////////////////////////////////////////////////////////////////////////
 //  
 // Modified By:  
 // //
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
  
 #include "MessageQueueService.h" #include "MessageQueueService.h"
   #include <Pegasus/Common/Tracer.h>
 PEGASUS_USING_STD;  #include <Pegasus/Common/MessageLoader.h>
  
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
 MessageQueueService::MessageQueueService(const char *name,  cimom *MessageQueueService::_meta_dispatcher = 0;
                                          Uint32 queueID,  AtomicInt MessageQueueService::_service_count(0);
                                          Uint32 capabilities,  Mutex MessageQueueService::_meta_dispatcher_mutex;
                                          Uint32 mask)  
    : Base(name, false,  queueID),  
      _capabilities(capabilities),  
      _mask(mask),  
      _die(0),  
      _pending(true),  
      _incoming(true, 1000),  
      _incoming_queue_shutdown(0),  
      _req_thread(_req_proc, this, false)  
 {  
    _default_op_timeout.tv_sec = 30;  
    _default_op_timeout.tv_usec = 100;  
    _meta_dispatcher = static_cast<cimom *>(Base::lookup(CIMOM_Q_ID));  
    if(_meta_dispatcher == 0 )  
       throw NullPointer();  
    _req_thread.run();  
  
 }  static struct timeval deallocateWait = {300, 0};
  
   ThreadPool *MessageQueueService::_thread_pool = 0;
  
 MessageQueueService::~MessageQueueService(void)  MessageQueueService::PollingList* MessageQueueService::_polling_list;
 {  Boolean MessageQueueService::_monitoring_polling_list = false;
    _die = 1;  
    if (_incoming_queue_shutdown.value() == 0 )  
        _incoming.shutdown_queue();  
  
    _req_thread.join();  Thread* MessageQueueService::_polling_thread = 0;
  
 }  /*
       PollingListEntry holds the service and it's status whether the service
 AtomicInt MessageQueueService::_xid(1);      is dead or not. Each service creates its own PollingListEntry and added
       to the PollingList which is monitored by the polling thread. Polling thread
       monitors the service only if it's die flag is not set.
   */
  
 void MessageQueueService::_shutdown_incoming_queue(void)  struct PollingListEntry : public Linkable
 { {
       MessageQueueService *service;
       Boolean die;
  
    if (_incoming_queue_shutdown.value() > 0 )      PollingListEntry(MessageQueueService *service)
       return ;          :service(service),
            die(false)
       {
       }
       ~PollingListEntry()
       {
       }
   private:
       PollingListEntry(const PollingListEntry&);
       PollingListEntry& operator = (const PollingListEntry&);
   };
  
    AsyncIoctl *msg = new AsyncIoctl(get_next_xid(),  ThreadPool *MessageQueueService::get_thread_pool()
                                     0,  {
                                     _queueId,     return _thread_pool;
                                     _queueId,  }
                                     true,  
                                     AsyncIoctl::IO_CLOSE,  
                                     0,  
                                     0);  
  
    msg->op = get_op();  //
    msg->op->_request.insert_first(msg);  // MAX_THREADS_PER_SVC_QUEUE
   //
   // JR Wunderlich Jun 6, 2005
   //
  
   #define MAX_THREADS_PER_SVC_QUEUE_LIMIT 5000
   #define MAX_THREADS_PER_SVC_QUEUE_DEFAULT 5
  
   #ifndef MAX_THREADS_PER_SVC_QUEUE
   # define MAX_THREADS_PER_SVC_QUEUE MAX_THREADS_PER_SVC_QUEUE_DEFAULT
   #endif
  
    _incoming.insert_last_wait(msg->op);  Uint32 max_threads_per_svc_queue;
    msg->op->_client_sem.wait();  
  
    msg->op->lock();  ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(
    AsyncReply * reply = static_cast<AsyncReply *>(msg->op->_response.remove_first());      void* parm)
    reply->op = 0;  {
    msg->op->unlock();      Thread *myself = reinterpret_cast<Thread *>(parm);
    delete reply;      MessageQueueService::PollingList *list =
           reinterpret_cast<MessageQueueService::PollingList*>(myself->get_parm());
  
    msg->op->_request.remove(msg);      while (_stop_polling.get()  == 0)
    msg->op->_state |= ASYNC_OPSTATE_RELEASED;      {
    return_op(msg->op);          _polling_sem.wait();
  
    msg->op = 0;          if (_stop_polling.get() != 0)
    delete msg;          {
               break;
 } }
  
           PollingListEntry *entry = list->front();
           ThreadStatus rtn = PEGASUS_THREAD_OK;
  
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)          // Setting this flag to 'true' will prevent race condition between
 {          // the polling thread and thread executing the MessageQueueService
    Thread *myself = reinterpret_cast<Thread *>(parm);          // destructor.
    MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());          PEGASUS_ASSERT(_monitoring_polling_list == false);
           _monitoring_polling_list = true;
    // pull messages off the incoming queue and dispatch them. then  
    // check pending messages that are non-blocking          do
    AsyncOpNode *operation = 0;          {
               MessageQueueService *service = entry->service;
               // Note: MessageQueueService destructor sets die flag when service
               // gets destroyed during CIMOM shutdown. Don't monitor the service
               // if die flag set.
               if ((entry->die == false) &&
                   (service->_incoming.count() > 0) &&
                   (service->_threads.get() < max_threads_per_svc_queue))
               {
                   // The _threads count is used to track the
                   // number of active threads that have been allocated
                   // to process messages for this service.
  
    while ( service->_die.value() == 0 )                  service->_threads++;
    {  
       try       try
       {       {
          operation = service->_incoming.remove_first_wait();                      rtn = _thread_pool->allocate_and_awaken(
                           service, _req_proc, &_polling_sem);
       }       }
       catch(ListClosed & )                  catch (...)
       {       {
          break;                      service->_threads--;
   
                       // allocate_and_awaken should never generate an exception.
                       PEGASUS_ASSERT(0);
       }       }
       if( operation )                  // if no more threads available, break from processing loop
                   if (rtn != PEGASUS_THREAD_OK )
       {       {
                       service->_threads--;
                       PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL1,
                           "Could not allocate thread for %s.  Queue has %d "
                               "messages waiting and %d threads servicing."
                               "Skipping the service for right now. ",
                           service->getQueueName(),
                           service->_incoming.count(),
                           service->_threads.get()));
  
          service->_handle_incoming_operation(operation, myself, service);                      Threads::yield();
                       break;
       }       }
    }    }
               entry = list->next_of(entry);
    myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );          } while (entry != NULL);
    return(0);          _monitoring_polling_list = false;
       }
       return ThreadReturnType(0);
 } }
  
  
 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation,  Semaphore MessageQueueService::_polling_sem(0);
                                                      Thread *thread,  AtomicInt MessageQueueService::_stop_polling(0);
                                                      MessageQueue *queue)  
 {  
    if ( operation != 0 )  MessageQueueService::MessageQueueService(
       const char* name,
       Uint32 queueID)
       : Base(name, true,  queueID),
         _threads(0),
         _incoming(),
         _incoming_queue_shutdown(0)
    {    {
       operation->lock();      _isRunning = true;
       Message *rq = operation->_request.next(0);  
       operation->unlock();  
  
       PEGASUS_ASSERT(rq != 0 );      max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;
       PEGASUS_ASSERT(rq->getMask() & message_mask::ha_async );  
       PEGASUS_ASSERT(rq->getMask() & message_mask::ha_request);  
       static_cast<AsyncMessage *>(rq)->_myself = thread;  
       static_cast<AsyncMessage *>(rq)->_service = queue;  
       _handle_async_request(static_cast<AsyncRequest *>(rq));  
    }  
  
    return;      // if requested thread max is out of range, then set to
       // MAX_THREADS_PER_SVC_QUEUE_LIMIT
  
       if ((max_threads_per_svc_queue < 1) ||
           (max_threads_per_svc_queue > MAX_THREADS_PER_SVC_QUEUE_LIMIT))
       {
           max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT;
 } }
  
 void MessageQueueService::_handle_async_request(AsyncRequest *req)      PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
 {         "max_threads_per_svc_queue set to %u.", max_threads_per_svc_queue));
    if ( req != 0 )  
    {  
       req->op->processing();  
  
       Uint32 type = req->getType();      AutoMutex mtx(_meta_dispatcher_mutex);
       if( type == async_messages::HEARTBEAT )  
          handle_heartbeat_request(req);      if (_meta_dispatcher == 0)
       else if (type == async_messages::IOCTL)  
          handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));  
       else if (type == async_messages::CIMSERVICE_START)  
          handle_CimServiceStart(static_cast<CimServiceStart *>(req));  
       else if (type == async_messages::CIMSERVICE_STOP)  
          handle_CimServiceStop(static_cast<CimServiceStop *>(req));  
       else if (type == async_messages::CIMSERVICE_PAUSE)  
       {       {
          handle_CimServicePause(static_cast<CimServicePause *>(req));          _stop_polling = 0;
           PEGASUS_ASSERT(_service_count.get() == 0);
           _meta_dispatcher = new cimom();
   
           //  _thread_pool = new ThreadPool(initial_cnt, "MessageQueueService",
           //   minimum_cnt, maximum_cnt, deallocateWait);
           //
           _thread_pool =
               new ThreadPool(0, "MessageQueueService", 0, 0, deallocateWait);
       }       }
       _service_count++;
  
       else if (type == async_messages::CIMSERVICE_RESUME)      // Add to the polling list
          handle_CimServiceResume(static_cast<CimServiceResume *>(req));      if (!_polling_list)
       else if ( type == async_messages::ASYNC_OP_START)  
          handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));  
       else  
       {       {
          // we don't handle this request message          _polling_list = new PollingList;
          _make_response(req, async_results::CIM_NAK );  
       }  
    }    }
       pollingListEntry = new PollingListEntry(this);
       _polling_list->insert_back(pollingListEntry);
 } }
  
 void MessageQueueService::_make_response(AsyncRequest *req, Uint32 code)  
   MessageQueueService::~MessageQueueService()
 { {
    AsyncReply *reply =      // Close incoming queue.
       new AsyncReply(async_messages::REPLY,      if (_incoming_queue_shutdown.get() == 0)
                      req->getKey(),      {
                      req->getRouting(),          AsyncIoClose *msg = new AsyncIoClose(
                      0,                      0,
                      req->op,              _queueId,
                      code,              _queueId,
                      req->resp,              true);
                      false);          SendForget(msg);
    _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );          // Wait until our queue has been shutdown.
           while (_incoming_queue_shutdown.get() == 0)
           {
               Threads::yield();
           }
 } }
  
       // Die now. Setting this flag to true instructs the polling thread not to
       // monitor this service.
       pollingListEntry->die = true;
  
 void MessageQueueService::_completeAsyncResponse(AsyncRequest *request,      // Wait until all threads processing the messages
                                                 AsyncReply *reply,      // for this service have completed.
                                                 Uint32 state,      while (_threads.get() > 0)
                                                 Uint32 flag)  
 { {
    PEGASUS_ASSERT(request != 0  && reply != 0 );          Threads::yield();
   
    AsyncOpNode *op = request->op;  
    op->lock();  
    op->_state |= state ;  
    op->_flags |= flag;  
    gettimeofday(&(op->_updated), NULL);  
    if ( false == op->_response.exists(reinterpret_cast<void *>(reply)) )  
       op->_response.insert_last(reply);  
    op->unlock();  
   
    op->_client_sem.signal();  
   
   
 } }
  
       // Wait until monitoring the polling list is done.
       while (_monitoring_polling_list)
       {
           Threads::yield();
       }
  
       {
           AutoMutex mtx(_meta_dispatcher_mutex);
  
 Boolean MessageQueueService::accept_async(AsyncOpNode *op)          _service_count--;
           // If we are last service to die, delete metadispatcher.
           if (_service_count.get() == 0)
 { {
    if (_incoming_queue_shutdown.value() > 0 )              _stop_polling++;
       return false;              _polling_sem.signal();
               if (_polling_thread)
               {
                   _polling_thread->join();
                   delete _polling_thread;
                   _polling_thread = 0;
               }
               delete _meta_dispatcher;
               _meta_dispatcher = 0;
  
    op->lock();              delete _thread_pool;
    Message *rq = op->_request.next(0);              _thread_pool = 0;
    op->unlock();  
  
    if( true == messageOK(rq) &&  _die.value() == 0  )              // Cleanup polling list
               PollingListEntry *entry;
               while ((entry = _polling_list->remove_front()))
    {    {
       _incoming.insert_last_wait(op);                  delete entry;
       return true;              }
    }    }
    return false;  
 } }
  
 Boolean MessageQueueService::messageOK(const Message *msg)      // Clean up any extra stuff on the queue.
       AsyncOpNode* op = 0;
       while ((op = _incoming.dequeue()))
 { {
    if (_incoming_queue_shutdown.value() > 0 )          delete op;
       return false;      }
   }
  
    if ( msg != 0 )  void MessageQueueService::enqueue(Message* msg)
    {    {
       Uint32 mask = msg->getMask();      PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
       if ( mask & message_mask::ha_async)  
          if ( mask & message_mask::ha_request)      Base::enqueue(msg);
             return true;  
    }      PEG_METHOD_EXIT();
    return false;  
 } }
  
  
 void MessageQueueService::handleEnqueue(void)  ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(
       void* parm)
 { {
    Message *msg = dequeue();      MessageQueueService* service =
    if( msg )          reinterpret_cast<MessageQueueService*>(parm);
       PEGASUS_ASSERT(service != 0);
       try
    {    {
       delete msg;          if (service->pollingListEntry->die)
    }          {
               service->_threads--;
               return 0;
 } }
           // pull messages off the incoming queue and dispatch them. then
           // check pending messages that are non-blocking
           AsyncOpNode *operation = 0;
  
 void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)          // many operations may have been queued.
           do
 { {
    // default action is to echo a heartbeat response              operation = service->_incoming.dequeue();
  
    AsyncReply *reply =              if (operation)
       new AsyncReply(async_messages::HEARTBEAT,              {
                      req->getKey(),                 operation->_service_ptr = service;
                      req->getRouting(),                 service->_handle_incoming_operation(operation);
                      0,              }
                      req->op,          } while (operation);
                      async_results::OK,      }
                      req->resp,      catch (const Exception& e)
                      false);      {
    _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );          PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
               "Caught exception: \"%s\".  Exiting _req_proc.",
               (const char*)e.getMessage().getCString()));
       }
       catch (...)
       {
           PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
               "Caught unrecognized exception.  Exiting _req_proc.");
       }
       service->_threads--;
       return 0;
 } }
  
  
 void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)  void MessageQueueService::_sendwait_callback(
       AsyncOpNode* op,
       MessageQueue* q,
       void *parm)
 { {
    ;      op->_client_sem.signal();
 } }
  
 void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)  
 {  
  
    switch( req->ctl )  // callback function is responsible for cleaning up all resources
    {  // including op, op->_callback_node, and op->_callback_ptr
       case AsyncIoctl::IO_CLOSE:  void MessageQueueService::_handle_async_callback(AsyncOpNode* op)
       {       {
          // save my bearings      PEGASUS_ASSERT(op->_flags == ASYNC_OPFLAGS_CALLBACK);
          Thread *myself = req->_myself;      // note that _callback_node may be different from op
          MessageQueueService *service = static_cast<MessageQueueService *>(req->_service);      // op->_callback_response_q is a "this" pointer we can use for
       // static callback methods
          // respond to this message.      op->_async_callback(
          _make_response(req, async_results::OK);          op->_callback_node, op->_callback_response_q, op->_callback_ptr);
          // ensure we do not accept any further messages  }
  
          // ensure we don't recurse on IO_CLOSE  
          if( _incoming_queue_shutdown.value() > 0 )  
             break;  
  
          // set the closing flag  void MessageQueueService::_handle_incoming_operation(AsyncOpNode* operation)
          service->_incoming_queue_shutdown = 1;  
          // empty out the queue  
          while( 1 )  
          {          {
             AsyncOpNode *operation;      if (operation != 0)
             try  
             {             {
                operation = service->_incoming.remove_first();          Message *rq = operation->_request.get();
   
   // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>>
   // move this to the bottom of the loop when the majority of
   // messages become async messages.
   
           // divert legacy messages to handleEnqueue
           if ((rq != 0) && (!(rq->getMask() & MessageMask::ha_async)))
           {
               operation->_request.release();
               // delete the op node
               return_op(operation);
               handleEnqueue(rq);
               return;
             }             }
             catch(IPCException & )  
           if ((operation->_flags & ASYNC_OPFLAGS_CALLBACK) &&
               (operation->_state & ASYNC_OPSTATE_COMPLETE))
             {             {
                break;              _handle_async_callback(operation);
             }             }
             if( operation )          else
             {             {
                service->_handle_incoming_operation(operation, myself, service);              PEGASUS_ASSERT(rq != 0);
               _handle_async_request(static_cast<AsyncRequest *>(rq));
           }
             }             }
             else  
                break;  
          } // message processing loop  
   
          // shutdown the AsyncDQueue  
          service->_incoming.shutdown_queue();  
          // exit the thread !  
          myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );  
          return;          return;
       }       }
  
       default:  void MessageQueueService::_handle_async_request(AsyncRequest *req)
          _make_response(req, async_results::CIM_NAK);  {
    }      MessageType type = req->getType();
       if (type == ASYNC_IOCLOSE)
       {
           handle_AsyncIoClose(static_cast<AsyncIoClose*>(req));
 } }
       else if (type == ASYNC_CIMSERVICE_START)
 void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)  
 { {
    // clear the stoped bit and update          handle_CimServiceStart(static_cast<CimServiceStart *>(req));
    _capabilities &= (~(module_capabilities::stopped));  
    _make_response(req, async_results::OK);  
    // now tell the meta dispatcher we are stopped  
    update_service(_capabilities, _mask);  
   
 } }
 void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)      else if (type == ASYNC_CIMSERVICE_STOP)
 { {
    // set the stopeed bit and update          handle_CimServiceStop(static_cast<CimServiceStop *>(req));
    _capabilities |= module_capabilities::stopped;  
    _make_response(req, async_results::CIM_STOPPED);  
    // now tell the meta dispatcher we are stopped  
    update_service(_capabilities, _mask);  
   
 }  
 void MessageQueueService::handle_CimServicePause(CimServicePause *req)  
 {  
    // set the paused bit and update  
    _capabilities |= module_capabilities::paused;  
    update_service(_capabilities, _mask);  
    _make_response(req, async_results::CIM_PAUSED);  
    // now tell the meta dispatcher we are stopped  
 }  
 void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)  
 {  
    // clear the paused  bit and update  
    _capabilities &= (~(module_capabilities::paused));  
    update_service(_capabilities, _mask);  
    _make_response(req, async_results::OK);  
    // now tell the meta dispatcher we are stopped  
 } }
       else
 void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)  
 { {
           // we don't handle this request message
    _make_response(req, async_results::CIM_NAK);    _make_response(req, async_results::CIM_NAK);
 } }
   
 void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)  
 {  
   
 } }
  
 AsyncOpNode *MessageQueueService::get_op(void)  Boolean MessageQueueService::_enqueueResponse(
       Message* request,
       Message* response)
 { {
    AsyncOpNode *op = new AsyncOpNode();      PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
           "MessageQueueService::_enqueueResponse");
  
    op->_state = ASYNC_OPSTATE_UNKNOWN;      if (request->getMask() & MessageMask::ha_async)
    op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;      {
           if (response->getMask() & MessageMask::ha_async)
           {
               _completeAsyncResponse(
                   static_cast<AsyncRequest *>(request),
                   static_cast<AsyncReply *>(response));
  
    return op;              PEG_METHOD_EXIT();
               return true;
 } }
   
 void MessageQueueService::return_op(AsyncOpNode *op)  
 {  
    PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED );  
    delete op;  
 } }
  
       AsyncRequest* asyncRequest =
           static_cast<AsyncRequest*>(request->get_async());
  
       if (asyncRequest != 0)
 AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)  
 { {
    if ( request == 0 )          PEGASUS_ASSERT(asyncRequest->getMask() &
       return 0 ;              (MessageMask::ha_async | MessageMask::ha_request));
  
    Boolean destroy_op = false;          AsyncOpNode* op = asyncRequest->op;
   
    if (request->op == false)  
    {  
       request->op = get_op();  
       request->op->_request.insert_first(request);  
       destroy_op = true;  
    }  
  
    request->block = true;          // the legacy request is going to be deleted by its handler
    request->op->_state &= ~ASYNC_OPSTATE_COMPLETE;          // remove it from the op node
    request->op->put_response(0);  
  
    // first link it on our pending list          static_cast<AsyncLegacyOperationStart *>(asyncRequest)->get_action();
    // _pending.insert_last_wait(request->op);  
  
    // now see if the meta dispatcher will take it          AsyncLegacyOperationResult *async_result =
               new AsyncLegacyOperationResult(
                   op,
                   response);
           _completeAsyncResponse(
               asyncRequest,
               async_result);
  
    if (true == _meta_dispatcher->route_async(request->op))          PEG_METHOD_EXIT();
    {          return true;
       request->op->_client_sem.wait();      }
       PEGASUS_ASSERT(request->op->_state & ASYNC_OPSTATE_COMPLETE);  
  
       // ensure that the destination queue is in response->dest
       PEG_METHOD_EXIT();
       return SendForget(response);
    }    }
  
    request->op->lock();  void MessageQueueService::_make_response(Message* req, Uint32 code)
    AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());  {
    rpl->op = 0;      cimom::_make_response(req, code);
    request->op->unlock();  }
  
    if( destroy_op == true)  void MessageQueueService::_completeAsyncResponse(
       AsyncRequest* request,
       AsyncReply* reply)
    {    {
       request->op->lock();      PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
       request->op->_request.remove(request);          "MessageQueueService::_completeAsyncResponse");
       request->op->_state |= ASYNC_OPSTATE_RELEASED;  
       request->op->unlock();  
  
       return_op(request->op);      cimom::_completeAsyncResponse(request, reply);
       request->op = 0;  
    }  
  
    return rpl;      PEG_METHOD_EXIT();
 } }
  
  
 Boolean MessageQueueService::register_service(String name,  void MessageQueueService::_complete_op_node(
                                               Uint32 capabilities,      AsyncOpNode* op)
                                               Uint32 mask)  
   
 { {
    RegisterCimService *msg = new RegisterCimService(get_next_xid(),      cimom::_complete_op_node(op);
                                                     0,  }
                                                     true,  
                                                     name,  
                                                     capabilities,  
                                                     mask,  
                                                     _queueId);  
    Boolean registered = false;  
    AsyncReply *reply = static_cast<AsyncReply *>(SendWait( msg ));  
  
    if ( reply != 0 )  Boolean MessageQueueService::accept_async(AsyncOpNode* op)
    {    {
       if(reply->getMask() & message_mask:: ha_async)      if (_incoming_queue_shutdown.get() > 0)
           return false;
       if (_polling_thread == NULL)
       {       {
          if(reply->getMask() & message_mask::ha_reply)          PEGASUS_ASSERT(_polling_list);
           _polling_thread = new Thread(
               polling_routine,
               reinterpret_cast<void *>(_polling_list),
               false);
           ThreadStatus tr = PEGASUS_THREAD_OK;
           while ( (tr =_polling_thread->run()) != PEGASUS_THREAD_OK)
          {          {
             if(reply->result == async_results::OK)              if (tr == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)
                registered = true;                  Threads::yield();
               else
                   throw Exception(MessageLoaderParms(
                       "Common.MessageQueueService.NOT_ENOUGH_THREAD",
                       "Could not allocate thread for the polling thread."));
          }          }
       }       }
       if (pollingListEntry->die == false)
       delete reply;      {
           if (_incoming.enqueue(op))
           {
               _polling_sem.signal();
               return true;
           }
    }    }
    delete msg;      return false;
    return registered;  
 } }
  
 Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)  void MessageQueueService::handle_AsyncIoClose(AsyncIoClose *req)
 { {
       MessageQueueService *service =
           static_cast<MessageQueueService*>(req->op->_op_dest);
  
   #ifdef MESSAGEQUEUESERVICE_DEBUG
       PEGASUS_STD(cout) << service->getQueueName() <<
           " Received AsyncIoClose " << PEGASUS_STD(endl);
   #endif
       // set the closing flag, don't accept any more messages
       service->_incoming_queue_shutdown = 1;
  
    UpdateCimService *msg = new UpdateCimService(get_next_xid(),      // respond to this message. this is fire and forget, so we
                                                 0,      // don't need to delete anything.
                                                 true,      // this takes care of two problems that were being found
                                                 _queueId,      // << Thu Oct  9 10:52:48 2003 mdd >>
                                                 _capabilities,      _make_response(req, async_results::OK);
                                                 _mask);  }
    Boolean registered = false;  
  
    AsyncMessage *reply = SendWait(msg);  void MessageQueueService::handle_CimServiceStart(CimServiceStart* req)
    if (reply)  
    {  
       if(reply->getMask() & message_mask:: ha_async)  
       {  
          if(reply->getMask() & message_mask::ha_reply)  
          {          {
             if(static_cast<AsyncReply *>(reply)->result == async_results::OK)  #ifdef MESSAGEQUEUESERVICE_DEBUG
                registered = true;      PEGASUS_STD(cout) << getQueueName() << "received START" <<
          }          PEGASUS_STD(endl);
       }  #endif
       delete reply;      PEGASUS_ASSERT(!_isRunning);
    }      _isRunning = true;
    delete msg;      _make_response(req, async_results::OK);
    return registered;  
 } }
  
   void MessageQueueService::handle_CimServiceStop(CimServiceStop* req)
   {
   #ifdef MESSAGEQUEUESERVICE_DEBUG
       PEGASUS_STD(cout) << getQueueName() << "received STOP" << PEGASUS_STD(endl);
   #endif
       PEGASUS_ASSERT(_isRunning);
       _isRunning = false;
       _make_response(req, async_results::CIM_SERVICE_STOPPED);
   }
  
 Boolean MessageQueueService::deregister_service(void)  AsyncOpNode* MessageQueueService::get_op()
 { {
      AsyncOpNode* op = new AsyncOpNode();
  
    _meta_dispatcher->deregister_module(_queueId);     op->_state = ASYNC_OPSTATE_UNKNOWN;
    return true;     op->_flags = ASYNC_OPFLAGS_UNKNOWN;
 }  
  
      return op;
   }
  
 void MessageQueueService::find_services(String name,  void MessageQueueService::return_op(AsyncOpNode* op)
                                         Uint32 capabilities,  
                                         Uint32 mask,  
                                         Array<Uint32> *results)  
 { {
       delete op;
   }
  
    if( results == 0 )  
       throw NullPointer();  
  
    results->clear();  Boolean MessageQueueService::SendAsync(
       AsyncOpNode* op,
       Uint32 destination,
       void (*callback)(AsyncOpNode*, MessageQueue*, void*),
       MessageQueue* callback_response_q,
       void* callback_ptr)
   {
       return _sendAsync(
           op,
           destination,
           callback,
           callback_response_q,
           callback_ptr,
           ASYNC_OPFLAGS_CALLBACK);
  
    FindServiceQueue *req =  }
       new FindServiceQueue(get_next_xid(),  
                            0,  
                            _queueId,  
                            true,  
                            name,  
                            capabilities,  
                            mask);  
  
    AsyncMessage *reply = SendWait(req);  Boolean MessageQueueService::_sendAsync(
    if(reply)      AsyncOpNode* op,
       Uint32 destination,
       void (*callback)(AsyncOpNode*, MessageQueue*, void*),
       MessageQueue* callback_response_q,
       void* callback_ptr,
       Uint32 flags)
   {
       PEGASUS_ASSERT(op != 0 && callback != 0);
   
       // destination of this message
       op->_op_dest = MessageQueue::lookup(destination);
       if (op->_op_dest == 0)
    {    {
       if( reply->getMask() & message_mask::ha_async)          return false;
       {  
          if(reply->getMask() & message_mask::ha_reply)  
          {  
             if(reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)  
             {  
                if( (static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK )  
                   *results = (static_cast<FindServiceQueueResult *>(reply))->qids;  
             }  
          }  
       }  
       delete reply;  
    }    }
    delete req;      op->_flags = flags;
    return ;      // initialize the callback data
       // callback function to be executed by recpt. of response
       op->_async_callback = callback;
       // the op node
       op->_callback_node = op;
       // the queue that will receive the response
       op->_callback_response_q = callback_response_q;
       // user data for callback
       op->_callback_ptr = callback_ptr;
       // I am the originator of this request
       op->_callback_request_q = this;
   
       return  _meta_dispatcher->route_async(op);
 } }
  
 void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)  Boolean MessageQueueService::SendForget(Message* msg)
 { {
    if(result == 0)      AsyncOpNode* op = 0;
       throw NullPointer();      Uint32 mask = msg->getMask();
   
    EnumerateService *req  
       = new EnumerateService(get_next_xid(),  
                              0,  
                              _queueId,  
                              true,  
                              queue);  
   
    AsyncMessage *reply = SendWait(req);  
  
    if (reply)      if (mask & MessageMask::ha_async)
    {    {
       Boolean found = false;          op = (static_cast<AsyncMessage *>(msg))->op;
       }
  
       if( reply->getMask() & message_mask::ha_async)      if (op == 0)
       {  
          if(reply->getMask() & message_mask::ha_reply)  
          {  
             if(reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)  
             {             {
                if( (static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK )          op = get_op();
           op->_request.reset(msg);
           if (mask & MessageMask::ha_async)
                {                {
                   if( found == false)              (static_cast<AsyncMessage *>(msg))->op = op;
                   {  
                      found = true;  
   
                      result->put_name( (static_cast<EnumerateServiceResponse *>(reply))->name);  
                      result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);  
                      result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);  
                      result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);  
                   }  
                }                }
             }             }
   
       PEGASUS_ASSERT(op->_flags == ASYNC_OPFLAGS_UNKNOWN);
       PEGASUS_ASSERT(op->_state == ASYNC_OPSTATE_UNKNOWN);
       op->_op_dest = MessageQueue::lookup(msg->dest);
       if (op->_op_dest == 0)
       {
           return_op(op);
           return false;
          }          }
   
       op->_flags = ASYNC_OPFLAGS_FIRE_AND_FORGET;
   
       // now see if the meta dispatcher will take it
       return  _meta_dispatcher->route_async(op);
       }       }
       delete reply;  
   
   AsyncReply *MessageQueueService::SendWait(AsyncRequest* request)
   {
       if (request == 0)
           return 0;
   
       Boolean destroy_op = false;
   
       if (request->op == 0)
       {
           request->op = get_op();
           request->op->_request.reset(request);
           destroy_op = true;
    }    }
    delete req;  
  
    return;      PEGASUS_ASSERT(request->op->_flags == ASYNC_OPFLAGS_UNKNOWN);
       PEGASUS_ASSERT(request->op->_state == ASYNC_OPSTATE_UNKNOWN);
   
       request->block = false;
       _sendAsync(
           request->op,
           request->dest,
           _sendwait_callback,
           this,
           (void *)0,
           ASYNC_OPFLAGS_PSEUDO_CALLBACK);
   
       request->op->_client_sem.wait();
   
       AsyncReply* rpl = static_cast<AsyncReply *>(request->op->removeResponse());
       rpl->op = 0;
   
       if (destroy_op == true)
       {
           request->op->_request.release();
           return_op(request->op);
           request->op = 0;
       }
       return rpl;
 } }
  
 Uint32 MessageQueueService::get_next_xid(void)  Uint32 MessageQueueService::find_service_qid(const String &name)
 { {
    _xid++;      MessageQueue *queue = MessageQueue::lookup((const char*)name.getCString());
    return _xid.value();      PEGASUS_ASSERT(queue);
       return queue->getQueueId();
 } }
  
 PEGASUS_NAMESPACE_END PEGASUS_NAMESPACE_END


Legend:
Removed from v.1.13  
changed lines
  Added in v.1.157

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2