(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.138 and 1.157

version 1.138, 2008/06/19 17:57:01 version 1.157, 2008/12/09 18:14:20
Line 1 
Line 1 
 //%2006////////////////////////////////////////////////////////////////////////  //%LICENSE////////////////////////////////////////////////////////////////
 // //
 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development  // Licensed to The Open Group (TOG) under one or more contributor license
 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.  // agreements.  Refer to the OpenPegasusNOTICE.txt file distributed with
 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;  // this work for additional information regarding copyright ownership.
 // IBM Corp.; EMC Corporation, The Open Group.  // Each contributor licenses this file to you under the OpenPegasus Open
 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;  // Source License; you may not use this file except in compliance with the
 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.  // License.
 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;  
 // EMC Corporation; VERITAS Software Corporation; The Open Group.  
 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;  
 // EMC Corporation; Symantec Corporation; The Open Group.  
 // //
 // Permission is hereby granted, free of charge, to any person obtaining a copy  // Permission is hereby granted, free of charge, to any person obtaining a
 // of this software and associated documentation files (the "Software"), to  // copy of this software and associated documentation files (the "Software"),
 // deal in the Software without restriction, including without limitation the  // to deal in the Software without restriction, including without limitation
 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or  // the rights to use, copy, modify, merge, publish, distribute, sublicense,
 // sell copies of the Software, and to permit persons to whom the Software is  // and/or sell copies of the Software, and to permit persons to whom the
 // furnished to do so, subject to the following conditions:  // Software is furnished to do so, subject to the following conditions:
 // //
 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN  // The above copyright notice and this permission notice shall be included
 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED  // in all copies or substantial portions of the Software.
 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT  
 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR  
 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT  
 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN  
 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION  
 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.  
 // //
 //==============================================================================  // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
   // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
   // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
   // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
   // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
   // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
   // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   //
   //////////////////////////////////////////////////////////////////////////
 // //
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
  
Line 46 
Line 44 
 ThreadPool *MessageQueueService::_thread_pool = 0; ThreadPool *MessageQueueService::_thread_pool = 0;
  
 MessageQueueService::PollingList* MessageQueueService::_polling_list; MessageQueueService::PollingList* MessageQueueService::_polling_list;
 Mutex MessageQueueService::_polling_list_mutex;  Boolean MessageQueueService::_monitoring_polling_list = false;
  
 Thread* MessageQueueService::_polling_thread = 0; Thread* MessageQueueService::_polling_thread = 0;
  
   /*
       PollingListEntry holds the service and it's status whether the service
       is dead or not. Each service creates its own PollingListEntry and added
       to the PollingList which is monitored by the polling thread. Polling thread
       monitors the service only if it's die flag is not set.
   */
   
   struct PollingListEntry : public Linkable
   {
       MessageQueueService *service;
       Boolean die;
   
       PollingListEntry(MessageQueueService *service)
           :service(service),
            die(false)
       {
       }
       ~PollingListEntry()
       {
       }
   private:
       PollingListEntry(const PollingListEntry&);
       PollingListEntry& operator = (const PollingListEntry&);
   };
   
 ThreadPool *MessageQueueService::get_thread_pool() ThreadPool *MessageQueueService::get_thread_pool()
 { {
    return _thread_pool;    return _thread_pool;
Line 74 
Line 97 
     void* parm)     void* parm)
 { {
     Thread *myself = reinterpret_cast<Thread *>(parm);     Thread *myself = reinterpret_cast<Thread *>(parm);
     List<MessageQueueService, Mutex> *list =      MessageQueueService::PollingList *list =
         reinterpret_cast<List<MessageQueueService, Mutex>*>(myself->get_parm());          reinterpret_cast<MessageQueueService::PollingList*>(myself->get_parm());
  
     while (_stop_polling.get()  == 0)     while (_stop_polling.get()  == 0)
     {     {
Line 86 
Line 109 
             break;             break;
         }         }
  
         // The polling_routine thread must hold the lock on the          PollingListEntry *entry = list->front();
         // _polling_list while processing incoming messages.  
         // This lock is used to give this thread ownership of  
         // services on the _polling_routine list.  
   
         // This is necessary to avoid confict with other threads  
         // processing the _polling_list  
         // (e.g., MessageQueueServer::~MessageQueueService).  
   
         list->lock();  
         MessageQueueService *service = list->front();  
         ThreadStatus rtn = PEGASUS_THREAD_OK;         ThreadStatus rtn = PEGASUS_THREAD_OK;
         while (service != NULL)  
           // Setting this flag to 'true' will prevent race condition between
           // the polling thread and thread executing the MessageQueueService
           // destructor.
           PEGASUS_ASSERT(_monitoring_polling_list == false);
           _monitoring_polling_list = true;
   
           do
         {         {
             if ((service->_incoming.count() > 0) &&              MessageQueueService *service = entry->service;
                 (service->_die.get() == 0) &&              // Note: MessageQueueService destructor sets die flag when service
               // gets destroyed during CIMOM shutdown. Don't monitor the service
               // if die flag set.
               if ((entry->die == false) &&
                   (service->_incoming.count() > 0) &&
                 (service->_threads.get() < max_threads_per_svc_queue))                 (service->_threads.get() < max_threads_per_svc_queue))
             {             {
                 // The _threads count is used to track the                 // The _threads count is used to track the
                 // number of active threads that have been allocated                 // number of active threads that have been allocated
                 // to process messages for this service.                 // to process messages for this service.
  
                 // The _threads count MUST be incremented while  
                 // the polling_routine owns the _polling_thread  
                 // lock and has ownership of the service object.  
   
                 service->_threads++;                 service->_threads++;
                 try                 try
                 {                 {
Line 138 
Line 158 
                         service->_threads.get()));                         service->_threads.get()));
  
                     Threads::yield();                     Threads::yield();
                     service = NULL;                      break;
                 }  
             }  
             if (service != NULL)  
             {  
                 service = list->next_of(service);  
             }             }
         }         }
         list->unlock();              entry = list->next_of(entry);
           } while (entry != NULL);
           _monitoring_polling_list = false;
     }     }
     return ThreadReturnType(0);     return ThreadReturnType(0);
 } }
Line 158 
Line 175 
  
 MessageQueueService::MessageQueueService( MessageQueueService::MessageQueueService(
     const char* name,     const char* name,
     Uint32 queueID,      Uint32 queueID)
     Uint32 capabilities,  
     Uint32 mask)  
     : Base(name, true,  queueID),     : Base(name, true,  queueID),
       _mask(mask),  
       _die(0),  
       _threads(0),       _threads(0),
       _incoming(),       _incoming(),
       _incoming_queue_shutdown(0)       _incoming_queue_shutdown(0)
 { {
     _capabilities = (capabilities | module_capabilities::async);      _isRunning = true;
   
     _default_op_timeout.tv_sec = 30;  
     _default_op_timeout.tv_usec = 100;  
  
     max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;     max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;
  
Line 187 
Line 197 
     PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,     PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
        "max_threads_per_svc_queue set to %u.", max_threads_per_svc_queue));        "max_threads_per_svc_queue set to %u.", max_threads_per_svc_queue));
  
     AutoMutex autoMut(_meta_dispatcher_mutex);      AutoMutex mtx(_meta_dispatcher_mutex);
  
     if (_meta_dispatcher == 0)     if (_meta_dispatcher == 0)
     {     {
Line 203 
Line 213 
     }     }
     _service_count++;     _service_count++;
  
     if (false == register_service(name, _capabilities, _mask))      // Add to the polling list
       if (!_polling_list)
     {     {
         MessageLoaderParms parms(          _polling_list = new PollingList;
             "Common.MessageQueueService.UNABLE_TO_REGISTER",  
             "CIM base message queue service is unable to register with the "  
                 "CIMOM dispatcher.");  
         throw BindFailedException(parms);  
     }     }
       pollingListEntry = new PollingListEntry(this);
     _get_polling_list()->insert_back(this);      _polling_list->insert_back(pollingListEntry);
 } }
  
  
 MessageQueueService::~MessageQueueService() MessageQueueService::~MessageQueueService()
 { {
     _die = 1;      // Close incoming queue.
   
     // The polling_routine locks the _polling_list while  
     // processing the incoming messages for services on the  
     // list.  Deleting the service from the _polling_list  
     // prior to processing, avoids synchronization issues  
     // with the _polling_routine.  
   
     // ATTN: added to prevent assertion in List in which the list does not  
     // contain this element.  
   
     if (_get_polling_list()->contains(this))  
         _get_polling_list()->remove(this);  
   
     // ATTN: The code for closing the _incoming queue  
     // is not working correctly. In OpenPegasus 2.5,  
     // execution of the following code is very timing  
     // dependent. This needs to be fix.  
     // See Bug 4079 for details.  
     if (_incoming_queue_shutdown.get() == 0)     if (_incoming_queue_shutdown.get() == 0)
     {     {
         _shutdown_incoming_queue();          AsyncIoClose *msg = new AsyncIoClose(
               0,
               _queueId,
               _queueId,
               true);
           SendForget(msg);
           // Wait until our queue has been shutdown.
           while (_incoming_queue_shutdown.get() == 0)
           {
               Threads::yield();
     }     }
       }
   
       // Die now. Setting this flag to true instructs the polling thread not to
       // monitor this service.
       pollingListEntry->die = true;
  
     // Wait until all threads processing the messages     // Wait until all threads processing the messages
     // for this service have completed.     // for this service have completed.
   
     while (_threads.get() > 0)     while (_threads.get() > 0)
     {     {
         Threads::yield();         Threads::yield();
     }     }
  
       // Wait until monitoring the polling list is done.
       while (_monitoring_polling_list)
       {
           Threads::yield();
       }
   
     {     {
         AutoMutex autoMut(_meta_dispatcher_mutex);          AutoMutex mtx(_meta_dispatcher_mutex);
   
         _service_count--;         _service_count--;
           // If we are last service to die, delete metadispatcher.
         if (_service_count.get() == 0)         if (_service_count.get() == 0)
         {         {
   
             _stop_polling++;             _stop_polling++;
             _polling_sem.signal();             _polling_sem.signal();
             if (_polling_thread)             if (_polling_thread)
Line 264 
Line 273 
                 delete _polling_thread;                 delete _polling_thread;
                 _polling_thread = 0;                 _polling_thread = 0;
             }             }
             _meta_dispatcher->_shutdown_routed_queue();  
             delete _meta_dispatcher;             delete _meta_dispatcher;
             _meta_dispatcher = 0;             _meta_dispatcher = 0;
  
             delete _thread_pool;             delete _thread_pool;
             _thread_pool = 0;             _thread_pool = 0;
         }  
     } // mutex unlocks here              // Cleanup polling list
     // Clean up in case there are extra stuff on the queue.              PollingListEntry *entry;
     while (_incoming.count())              while ((entry = _polling_list->remove_front()))
     {  
         try  
         {         {
             delete _incoming.dequeue();                  delete entry;
         }         }
         catch (const ListClosed&)  
         {  
             // If the list is closed, there is nothing we can do.  
             break;  
         }         }
     }     }
 }  
   
 void MessageQueueService::_shutdown_incoming_queue()  
 {  
     if (_incoming_queue_shutdown.get() > 0)  
         return;  
   
     AsyncIoctl *msg = new AsyncIoctl(  
         0,  
         _queueId,  
         _queueId,  
         true,  
         AsyncIoctl::IO_CLOSE,  
         0,  
         0);  
  
     msg->op = get_op();      // Clean up any extra stuff on the queue.
     msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;      AsyncOpNode* op = 0;
     msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK      while ((op = _incoming.dequeue()))
         | ASYNC_OPFLAGS_SIMPLE_STATUS);  
     msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;  
   
     msg->op->_op_dest = this;  
     msg->op->_request.reset(msg);  
     try  
     {  
         _incoming.enqueue_wait(msg->op);  
         _polling_sem.signal();  
     }  
     catch (const ListClosed&)  
     {  
         // This means the queue has already been shut-down (happens  when there  
         // are two AsyncIoctrl::IO_CLOSE messages generated and one got first  
         // processed.  
         delete msg;  
     }  
     catch (const Permission&)  
     {     {
         delete msg;          delete op;
     }     }
 } }
  
   
   
 void MessageQueueService::enqueue(Message* msg) void MessageQueueService::enqueue(Message* msg)
 { {
     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
Line 347 
Line 314 
     PEGASUS_ASSERT(service != 0);     PEGASUS_ASSERT(service != 0);
     try     try
     {     {
         if (service->_die.get() != 0)          if (service->pollingListEntry->die)
         {         {
             service->_threads--;             service->_threads--;
             return 0;             return 0;
Line 359 
Line 326 
         // many operations may have been queued.         // many operations may have been queued.
         do         do
         {         {
             try  
             {  
                 operation = service->_incoming.dequeue();                 operation = service->_incoming.dequeue();
             }  
             catch (ListClosed&)  
             {  
                 // ATTN: This appears to be a common loop exit path.  
                 //PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,  
                 //    "Caught ListClosed exception.  Exiting _req_proc.");  
                 break;  
             }  
  
             if (operation)             if (operation)
             {             {
Line 380 
Line 337 
     }     }
     catch (const Exception& e)     catch (const Exception& e)
     {     {
         PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,          PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
             String("Caught exception: \"") + e.getMessage() +              "Caught exception: \"%s\".  Exiting _req_proc.",
                 "\".  Exiting _req_proc.");              (const char*)e.getMessage().getCString()));
     }     }
     catch (...)     catch (...)
     {     {
Line 407 
Line 364 
 // including op, op->_callback_node, and op->_callback_ptr // including op, op->_callback_node, and op->_callback_ptr
 void MessageQueueService::_handle_async_callback(AsyncOpNode* op) void MessageQueueService::_handle_async_callback(AsyncOpNode* op)
 { {
     if (op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK)      PEGASUS_ASSERT(op->_flags == ASYNC_OPFLAGS_CALLBACK);
     {  
         Message *msg = op->removeRequest();  
         if (msg && (msg->getMask() & MessageMask::ha_async))  
         {  
             if (msg->getType() == ASYNC_ASYNC_LEGACY_OP_START)  
             {  
                 AsyncLegacyOperationStart *wrapper =  
                     static_cast<AsyncLegacyOperationStart *>(msg);  
                 msg = wrapper->get_action();  
                 delete wrapper;  
             }  
             else if (msg->getType() == ASYNC_ASYNC_MODULE_OP_START)  
             {  
                 AsyncModuleOperationStart *wrapper =  
                     static_cast<AsyncModuleOperationStart *>(msg);  
                 msg = wrapper->get_action();  
                 delete wrapper;  
             }  
             else if (msg->getType() == ASYNC_ASYNC_OP_START)  
             {  
                 AsyncOperationStart *wrapper =  
                     static_cast<AsyncOperationStart *>(msg);  
                 msg = wrapper->get_action();  
                 delete wrapper;  
             }  
             delete msg;  
         }  
   
         msg = op->removeResponse();  
         if (msg && (msg->getMask() & MessageMask::ha_async))  
         {  
             if (msg->getType() == ASYNC_ASYNC_LEGACY_OP_RESULT)  
             {  
                 AsyncLegacyOperationResult *wrapper =  
                     static_cast<AsyncLegacyOperationResult *>(msg);  
                 msg = wrapper->get_result();  
                 delete wrapper;  
             }  
             else if (msg->getType() == ASYNC_ASYNC_MODULE_OP_RESULT)  
             {  
                 AsyncModuleOperationResult *wrapper =  
                     static_cast<AsyncModuleOperationResult *>(msg);  
                 msg = wrapper->get_result();  
                 delete wrapper;  
             }  
         }  
         void (*callback)(Message *, void *, void *) = op->__async_callback;  
         void *handle = op->_callback_handle;  
         void *parm = op->_callback_parameter;  
         op->release();  
         return_op(op);  
         callback(msg, handle, parm);  
     }  
     else if (op->_flags & ASYNC_OPFLAGS_CALLBACK)  
     {  
         // note that _callback_node may be different from op         // note that _callback_node may be different from op
         // op->_callback_response_q is a "this" pointer we can use for         // op->_callback_response_q is a "this" pointer we can use for
         // static callback methods         // static callback methods
         op->_async_callback(         op->_async_callback(
             op->_callback_node, op->_callback_response_q, op->_callback_ptr);             op->_callback_node, op->_callback_response_q, op->_callback_ptr);
     }     }
 }  
  
  
 void MessageQueueService::_handle_incoming_operation(AsyncOpNode* operation) void MessageQueueService::_handle_incoming_operation(AsyncOpNode* operation)
 { {
     if (operation != 0)     if (operation != 0)
     {     {
   
 // ATTN: optimization  
 // << Tue Feb 19 14:10:38 2002 mdd >>  
         operation->lock();  
   
         Message *rq = operation->_request.get();         Message *rq = operation->_request.get();
  
 // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>> // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>>
Line 491 
Line 387 
         if ((rq != 0) && (!(rq->getMask() & MessageMask::ha_async)))         if ((rq != 0) && (!(rq->getMask() & MessageMask::ha_async)))
         {         {
             operation->_request.release();             operation->_request.release();
             operation->unlock();  
             // delete the op node             // delete the op node
             operation->release();  
             return_op(operation);             return_op(operation);
   
             handleEnqueue(rq);             handleEnqueue(rq);
             return;             return;
         }         }
  
         if ((operation->_flags & ASYNC_OPFLAGS_CALLBACK ||          if ((operation->_flags & ASYNC_OPFLAGS_CALLBACK) &&
              operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&  
             (operation->_state & ASYNC_OPSTATE_COMPLETE))             (operation->_state & ASYNC_OPSTATE_COMPLETE))
         {         {
             operation->unlock();  
             _handle_async_callback(operation);             _handle_async_callback(operation);
         }         }
         else         else
         {         {
             PEGASUS_ASSERT(rq != 0);             PEGASUS_ASSERT(rq != 0);
             operation->unlock();  
             _handle_async_request(static_cast<AsyncRequest *>(rq));             _handle_async_request(static_cast<AsyncRequest *>(rq));
         }         }
     }     }
Line 519 
Line 409 
  
 void MessageQueueService::_handle_async_request(AsyncRequest *req) void MessageQueueService::_handle_async_request(AsyncRequest *req)
 { {
     if (req != 0)  
     {  
         req->op->processing();  
   
         MessageType type = req->getType();         MessageType type = req->getType();
         if (type == ASYNC_HEARTBEAT)      if (type == ASYNC_IOCLOSE)
             handle_heartbeat_request(req);      {
         else if (type == ASYNC_IOCTL)          handle_AsyncIoClose(static_cast<AsyncIoClose*>(req));
             handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));      }
         else if (type == ASYNC_CIMSERVICE_START)         else if (type == ASYNC_CIMSERVICE_START)
       {
             handle_CimServiceStart(static_cast<CimServiceStart *>(req));             handle_CimServiceStart(static_cast<CimServiceStart *>(req));
       }
         else if (type == ASYNC_CIMSERVICE_STOP)         else if (type == ASYNC_CIMSERVICE_STOP)
       {
             handle_CimServiceStop(static_cast<CimServiceStop *>(req));             handle_CimServiceStop(static_cast<CimServiceStop *>(req));
         else if (type == ASYNC_CIMSERVICE_PAUSE)      }
             handle_CimServicePause(static_cast<CimServicePause *>(req));  
         else if (type == ASYNC_CIMSERVICE_RESUME)  
             handle_CimServiceResume(static_cast<CimServiceResume *>(req));  
         else if (type == ASYNC_ASYNC_OP_START)  
             handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));  
         else         else
         {         {
             // we don't handle this request message             // we don't handle this request message
             _make_response(req, async_results::CIM_NAK);             _make_response(req, async_results::CIM_NAK);
         }         }
     }     }
 }  
   
  
 Boolean MessageQueueService::_enqueueResponse( Boolean MessageQueueService::_enqueueResponse(
     Message* request,     Message* request,
Line 560 
Line 442 
         {         {
             _completeAsyncResponse(             _completeAsyncResponse(
                 static_cast<AsyncRequest *>(request),                 static_cast<AsyncRequest *>(request),
                 static_cast<AsyncReply *>(response),                  static_cast<AsyncReply *>(response));
                 ASYNC_OPSTATE_COMPLETE, 0);  
             PEG_METHOD_EXIT();             PEG_METHOD_EXIT();
             return true;             return true;
         }         }
Line 588 
Line 470 
                 response);                 response);
         _completeAsyncResponse(         _completeAsyncResponse(
             asyncRequest,             asyncRequest,
             async_result,              async_result);
             ASYNC_OPSTATE_COMPLETE,  
             0);  
         PEG_METHOD_EXIT();         PEG_METHOD_EXIT();
         return true;         return true;
     }     }
Line 605 
Line 486 
     cimom::_make_response(req, code);     cimom::_make_response(req, code);
 } }
  
   
 void MessageQueueService::_completeAsyncResponse( void MessageQueueService::_completeAsyncResponse(
     AsyncRequest* request,     AsyncRequest* request,
     AsyncReply* reply,      AsyncReply* reply)
     Uint32 state,  
     Uint32 flag)  
 { {
     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
         "MessageQueueService::_completeAsyncResponse");         "MessageQueueService::_completeAsyncResponse");
  
     cimom::_completeAsyncResponse(request, reply, state, flag);      cimom::_completeAsyncResponse(request, reply);
  
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
 } }
  
  
 void MessageQueueService::_complete_op_node( void MessageQueueService::_complete_op_node(
     AsyncOpNode* op,      AsyncOpNode* op)
     Uint32 state,  
     Uint32 flag,  
     Uint32 code)  
 { {
     cimom::_complete_op_node(op, state, flag, code);      cimom::_complete_op_node(op);
 } }
  
  
Line 637 
Line 512 
         return false;         return false;
     if (_polling_thread == NULL)     if (_polling_thread == NULL)
     {     {
           PEGASUS_ASSERT(_polling_list);
         _polling_thread = new Thread(         _polling_thread = new Thread(
             polling_routine,             polling_routine,
             reinterpret_cast<void *>(_get_polling_list()),              reinterpret_cast<void *>(_polling_list),
             false);             false);
         ThreadStatus tr = PEGASUS_THREAD_OK;         ThreadStatus tr = PEGASUS_THREAD_OK;
         while ( (tr =_polling_thread->run()) != PEGASUS_THREAD_OK)         while ( (tr =_polling_thread->run()) != PEGASUS_THREAD_OK)
Line 652 
Line 528 
                     "Could not allocate thread for the polling thread."));                     "Could not allocate thread for the polling thread."));
         }         }
     }     }
 // ATTN optimization remove the message checking altogether in the base      if (pollingListEntry->die == false)
 // << Mon Feb 18 14:02:20 2002 mdd >>      {
     op->lock();          if (_incoming.enqueue(op))
     Message *rq = op->_request.get();  
     Message *rp = op->_response.get();  
     op->unlock();  
   
     if ((rq != 0 && (true == messageOK(rq))) ||  
         (rp != 0 && (true == messageOK(rp))) && _die.get() == 0)  
     {     {
         _incoming.enqueue_wait(op);  
         _polling_sem.signal();         _polling_sem.signal();
         return true;         return true;
     }     }
     return false;  
 } }
   
 Boolean MessageQueueService::messageOK(const Message* msg)  
 {  
     if (_incoming_queue_shutdown.get() > 0)  
         return false;         return false;
     return true;  
 } }
  
 void MessageQueueService::handle_heartbeat_request(AsyncRequest* req)  void MessageQueueService::handle_AsyncIoClose(AsyncIoClose *req)
 {  
     // default action is to echo a heartbeat response  
   
     AsyncReply *reply = new AsyncReply(  
         ASYNC_HEARTBEAT,  
         0,  
         req->op,  
         async_results::OK,  
         req->resp,  
         false);  
     _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0);  
 }  
   
   
 void MessageQueueService::handle_heartbeat_reply(AsyncReply* rep)  
 {  
 }  
   
 void MessageQueueService::handle_AsyncIoctl(AsyncIoctl* req)  
 {  
     switch (req->ctl)  
     {  
         case AsyncIoctl::IO_CLOSE:  
         {         {
             MessageQueueService *service =             MessageQueueService *service =
                 static_cast<MessageQueueService *>(req->op->_service_ptr);          static_cast<MessageQueueService*>(req->op->_op_dest);
  
 #ifdef MESSAGEQUEUESERVICE_DEBUG #ifdef MESSAGEQUEUESERVICE_DEBUG
             PEGASUS_STD(cout) << service->getQueueName() <<             PEGASUS_STD(cout) << service->getQueueName() <<
                 " Received AsyncIoctl::IO_CLOSE " << PEGASUS_STD(endl);          " Received AsyncIoClose " << PEGASUS_STD(endl);
 #endif #endif
       // set the closing flag, don't accept any more messages
       service->_incoming_queue_shutdown = 1;
  
             // respond to this message. this is fire and forget, so we             // respond to this message. this is fire and forget, so we
             // don't need to delete anything.             // don't need to delete anything.
             // this takes care of two problems that were being found             // this takes care of two problems that were being found
             // << Thu Oct  9 10:52:48 2003 mdd >>             // << Thu Oct  9 10:52:48 2003 mdd >>
             _make_response(req, async_results::OK);             _make_response(req, async_results::OK);
             // ensure we do not accept any further messages  
   
             // ensure we don't recurse on IO_CLOSE  
             if (_incoming_queue_shutdown.get() > 0)  
                 break;  
   
             // set the closing flag  
             service->_incoming_queue_shutdown = 1;  
             // empty out the queue  
             while (1)  
             {  
                 AsyncOpNode *operation;  
                 try  
                 {  
                     operation = service->_incoming.dequeue();  
                 }  
                 catch (IPCException&)  
                 {  
                     break;  
                 }  
                 if (operation)  
                 {  
                     operation->_service_ptr = service;  
                     service->_handle_incoming_operation(operation);  
                 }  
                 else  
                     break;  
             } // message processing loop  
   
             // shutdown the AsyncQueue  
             service->_incoming.close();  
             return;  
         }  
   
         default:  
             _make_response(req, async_results::CIM_NAK);  
     }  
 } }
  
 void MessageQueueService::handle_CimServiceStart(CimServiceStart* req) void MessageQueueService::handle_CimServiceStart(CimServiceStart* req)
Line 759 
Line 564 
     PEGASUS_STD(cout) << getQueueName() << "received START" <<     PEGASUS_STD(cout) << getQueueName() << "received START" <<
         PEGASUS_STD(endl);         PEGASUS_STD(endl);
 #endif #endif
       PEGASUS_ASSERT(!_isRunning);
     // clear the stoped bit and update      _isRunning = true;
     _capabilities &= (~(module_capabilities::stopped));  
     _make_response(req, async_results::OK);     _make_response(req, async_results::OK);
     // now tell the meta dispatcher we are stopped  
     update_service(_capabilities, _mask);  
 } }
  
 void MessageQueueService::handle_CimServiceStop(CimServiceStop* req) void MessageQueueService::handle_CimServiceStop(CimServiceStop* req)
Line 772 
Line 574 
 #ifdef MESSAGEQUEUESERVICE_DEBUG #ifdef MESSAGEQUEUESERVICE_DEBUG
     PEGASUS_STD(cout) << getQueueName() << "received STOP" << PEGASUS_STD(endl);     PEGASUS_STD(cout) << getQueueName() << "received STOP" << PEGASUS_STD(endl);
 #endif #endif
     // set the stopeed bit and update      PEGASUS_ASSERT(_isRunning);
     _capabilities |= module_capabilities::stopped;      _isRunning = false;
     _make_response(req, async_results::CIM_STOPPED);      _make_response(req, async_results::CIM_SERVICE_STOPPED);
     // now tell the meta dispatcher we are stopped  
     update_service(_capabilities, _mask);  
 }  
   
 void MessageQueueService::handle_CimServicePause(CimServicePause* req)  
 {  
     // set the paused bit and update  
     _capabilities |= module_capabilities::paused;  
     update_service(_capabilities, _mask);  
     _make_response(req, async_results::CIM_PAUSED);  
     // now tell the meta dispatcher we are stopped  
 }  
   
 void MessageQueueService::handle_CimServiceResume(CimServiceResume* req)  
 {  
     // clear the paused  bit and update  
     _capabilities &= (~(module_capabilities::paused));  
     update_service(_capabilities, _mask);  
     _make_response(req, async_results::OK);  
     // now tell the meta dispatcher we are stopped  
 }  
   
 void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart* req)  
 {  
     _make_response(req, async_results::CIM_NAK);  
 }  
   
 void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult* req)  
 {  
 }  
   
   
 void MessageQueueService::handle_AsyncLegacyOperationStart(  
     AsyncLegacyOperationStart* req)  
 {  
     // remove the legacy message from the request and enqueue it to its  
     // destination  
     Uint32 result = async_results::CIM_NAK;  
   
     Message* legacy = req->_act;  
     if (legacy != 0)  
     {  
         MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);  
         if (queue != 0)  
         {  
             if (queue->isAsync() == true)  
             {  
                 (static_cast<MessageQueueService *>(queue))->handleEnqueue(  
                     legacy);  
             }  
             else  
             {  
                 // Enqueue the response:  
                 queue->enqueue(req->get_action());  
             }  
   
             result = async_results::OK;  
         }  
     }  
     _make_response(req, result);  
 }  
   
 void MessageQueueService::handle_AsyncLegacyOperationResult(  
     AsyncLegacyOperationResult* rep)  
 {  
 } }
  
 AsyncOpNode* MessageQueueService::get_op() AsyncOpNode* MessageQueueService::get_op()
Line 847 
Line 584 
    AsyncOpNode* op = new AsyncOpNode();    AsyncOpNode* op = new AsyncOpNode();
  
    op->_state = ASYNC_OPSTATE_UNKNOWN;    op->_state = ASYNC_OPSTATE_UNKNOWN;
    op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;     op->_flags = ASYNC_OPFLAGS_UNKNOWN;
  
    return op;    return op;
 } }
  
 void MessageQueueService::return_op(AsyncOpNode* op) void MessageQueueService::return_op(AsyncOpNode* op)
 { {
     PEGASUS_ASSERT(op->_state & ASYNC_OPSTATE_RELEASED);  
     delete op;     delete op;
 } }
  
Line 866 
Line 602 
     MessageQueue* callback_response_q,     MessageQueue* callback_response_q,
     void* callback_ptr)     void* callback_ptr)
 { {
     PEGASUS_ASSERT(op != 0 && callback != 0);      return _sendAsync(
           op,
           destination,
           callback,
           callback_response_q,
           callback_ptr,
           ASYNC_OPFLAGS_CALLBACK);
   
   }
  
     // get the queue handle for the destination  Boolean MessageQueueService::_sendAsync(
       AsyncOpNode* op,
       Uint32 destination,
       void (*callback)(AsyncOpNode*, MessageQueue*, void*),
       MessageQueue* callback_response_q,
       void* callback_ptr,
       Uint32 flags)
   {
       PEGASUS_ASSERT(op != 0 && callback != 0);
  
     op->lock();  
     // destination of this message     // destination of this message
     op->_op_dest = MessageQueue::lookup(destination);     op->_op_dest = MessageQueue::lookup(destination);
     op->_flags |= ASYNC_OPFLAGS_CALLBACK;      if (op->_op_dest == 0)
     op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);      {
           return false;
       }
       op->_flags = flags;
     // initialize the callback data     // initialize the callback data
     // callback function to be executed by recpt. of response     // callback function to be executed by recpt. of response
     op->_async_callback = callback;     op->_async_callback = callback;
Line 887 
Line 641 
     // I am the originator of this request     // I am the originator of this request
     op->_callback_request_q = this;     op->_callback_request_q = this;
  
     op->unlock();  
     if (op->_op_dest == 0)  
         return false;  
   
     return  _meta_dispatcher->route_async(op);     return  _meta_dispatcher->route_async(op);
 } }
  
   
 Boolean MessageQueueService::SendAsync(  
     Message* msg,  
     Uint32 destination,  
     void (*callback)(Message* response, void* handle, void* parameter),  
     void* handle,  
     void* parameter)  
 {  
     if (msg == NULL)  
         return false;  
     if (callback == NULL)  
         return SendForget(msg);  
     AsyncOpNode *op = get_op();  
     msg->dest = destination;  
     if (NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))  
     {  
         op->release();  
         return_op(op);  
         return false;  
     }  
     op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;  
     op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);  
     op->_state &= ~ASYNC_OPSTATE_COMPLETE;  
     op->__async_callback = callback;  
     op->_callback_node = op;  
     op->_callback_handle = handle;  
     op->_callback_parameter = parameter;  
     op->_callback_response_q = this;  
   
     if (!(msg->getMask() & MessageMask::ha_async))  
     {  
         AsyncLegacyOperationStart *wrapper = new AsyncLegacyOperationStart(  
             op,  
             destination,  
             msg,  
             destination);  
     }  
     else  
     {  
         op->_request.reset(msg);  
         (static_cast<AsyncMessage *>(msg))->op = op;  
     }  
     return _meta_dispatcher->route_async(op);  
 }  
   
   
 Boolean MessageQueueService::SendForget(Message* msg) Boolean MessageQueueService::SendForget(Message* msg)
 { {
     AsyncOpNode* op = 0;     AsyncOpNode* op = 0;
Line 959 
Line 663 
             (static_cast<AsyncMessage *>(msg))->op = op;             (static_cast<AsyncMessage *>(msg))->op = op;
         }         }
     }     }
   
       PEGASUS_ASSERT(op->_flags == ASYNC_OPFLAGS_UNKNOWN);
       PEGASUS_ASSERT(op->_state == ASYNC_OPSTATE_UNKNOWN);
     op->_op_dest = MessageQueue::lookup(msg->dest);     op->_op_dest = MessageQueue::lookup(msg->dest);
     op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;  
     op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK  
         | ASYNC_OPFLAGS_SIMPLE_STATUS);  
     op->_state &= ~ASYNC_OPSTATE_COMPLETE;  
     if (op->_op_dest == 0)     if (op->_op_dest == 0)
     {     {
         op->release();  
         return_op(op);         return_op(op);
         return false;         return false;
     }     }
  
       op->_flags = ASYNC_OPFLAGS_FIRE_AND_FORGET;
   
     // now see if the meta dispatcher will take it     // now see if the meta dispatcher will take it
     return  _meta_dispatcher->route_async(op);     return  _meta_dispatcher->route_async(op);
 } }
Line 990 
Line 694 
         destroy_op = true;         destroy_op = true;
     }     }
  
       PEGASUS_ASSERT(request->op->_flags == ASYNC_OPFLAGS_UNKNOWN);
       PEGASUS_ASSERT(request->op->_state == ASYNC_OPSTATE_UNKNOWN);
   
     request->block = false;     request->block = false;
     request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;      _sendAsync(
     SendAsync(  
         request->op,         request->op,
         request->dest,         request->dest,
         _sendwait_callback,         _sendwait_callback,
         this,         this,
         (void *)0);          (void *)0,
           ASYNC_OPFLAGS_PSEUDO_CALLBACK);
  
     request->op->_client_sem.wait();     request->op->_client_sem.wait();
  
Line 1006 
Line 713 
  
     if (destroy_op == true)     if (destroy_op == true)
     {     {
         request->op->lock();  
         request->op->_request.release();         request->op->_request.release();
         request->op->_state |= ASYNC_OPSTATE_RELEASED;  
         request->op->unlock();  
         return_op(request->op);         return_op(request->op);
         request->op = 0;         request->op = 0;
     }     }
     return rpl;     return rpl;
 } }
  
   Uint32 MessageQueueService::find_service_qid(const String &name)
 Boolean MessageQueueService::register_service(  
     String name,  
     Uint32 capabilities,  
     Uint32 mask)  
 {  
     RegisterCimService *msg = new RegisterCimService(  
         0,  
         true,  
         name,  
         capabilities,  
         mask,  
         _queueId);  
     msg->dest = CIMOM_Q_ID;  
   
     Boolean registered = false;  
     AsyncReply *reply = static_cast<AsyncReply *>(SendWait(msg));  
   
     if (reply != 0)  
     {  
         if (reply->getMask() & MessageMask::ha_async)  
         {         {
             if (reply->getMask() & MessageMask::ha_reply)      MessageQueue *queue = MessageQueue::lookup((const char*)name.getCString());
             {      PEGASUS_ASSERT(queue);
                 if (reply->result == async_results::OK ||      return queue->getQueueId();
                     reply->result == async_results::MODULE_ALREADY_REGISTERED)  
                 {  
                     registered = true;  
                 }  
             }  
         }  
   
         delete reply;  
     }  
     delete msg;  
     return registered;  
 }  
   
 Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)  
 {  
     UpdateCimService *msg = new UpdateCimService(  
         0,  
         true,  
         _queueId,  
         _capabilities,  
         _mask);  
     Boolean registered = false;  
   
     AsyncMessage* reply = SendWait(msg);  
     if (reply)  
     {  
         if (reply->getMask() & MessageMask::ha_async)  
         {  
             if (reply->getMask() & MessageMask::ha_reply)  
             {  
                 if (static_cast<AsyncReply *>(reply)->result ==  
                         async_results::OK)  
                 {  
                     registered = true;  
                 }  
             }  
         }  
         delete reply;  
     }  
     delete msg;  
     return registered;  
 }  
   
   
 Boolean MessageQueueService::deregister_service()  
 {  
     _meta_dispatcher->deregister_module(_queueId);  
     return true;  
 }  
   
   
 void MessageQueueService::find_services(  
     String name,  
     Uint32 capabilities,  
     Uint32 mask,  
     Array<Uint32>* results)  
 {  
     if (results == 0)  
     {  
         throw NullPointer();  
     }  
   
     results->clear();  
   
     FindServiceQueue *req = new FindServiceQueue(  
         0,  
         _queueId,  
         true,  
         name,  
         capabilities,  
         mask);  
   
     req->dest = CIMOM_Q_ID;  
   
     AsyncMessage *reply = SendWait(req);  
     if (reply)  
     {  
         if (reply->getMask() & MessageMask::ha_async)  
         {  
             if (reply->getMask() & MessageMask::ha_reply)  
             {  
                 if (reply->getType() == ASYNC_FIND_SERVICE_Q_RESULT)  
                 {  
                     if ((static_cast<FindServiceQueueResult*>(reply))->result ==  
                             async_results::OK)  
                         *results =  
                             (static_cast<FindServiceQueueResult*>(reply))->qids;  
                 }  
             }  
         }  
         delete reply;  
     }  
     delete req;  
 }  
   
 void MessageQueueService::enumerate_service(  
     Uint32 queue,  
     message_module* result)  
 {  
     if (result == 0)  
     {  
         throw NullPointer();  
     }  
   
     EnumerateService *req = new EnumerateService(  
         0,  
         _queueId,  
         true,  
         queue);  
   
     AsyncMessage* reply = SendWait(req);  
   
     if (reply)  
     {  
         Boolean found = false;  
   
         if (reply->getMask() & MessageMask::ha_async)  
         {  
             if (reply->getMask() & MessageMask::ha_reply)  
             {  
                 if (reply->getType() == ASYNC_ENUMERATE_SERVICE_RESULT)  
                 {  
                     if ((static_cast<EnumerateServiceResponse*>(reply))->  
                             result == async_results::OK)  
                     {  
                         if (found == false)  
                         {  
                             found = true;  
   
                             result->put_name((static_cast<  
                                 EnumerateServiceResponse*>(reply))->name);  
                             result->put_capabilities((static_cast<  
                                 EnumerateServiceResponse*>(reply))->  
                                     capabilities);  
                             result->put_mask((static_cast<  
                                 EnumerateServiceResponse*>(reply))->mask);  
                             result->put_queue((static_cast<  
                                 EnumerateServiceResponse*>(reply))->qid);  
                         }  
                     }  
                 }  
             }  
         }  
         delete reply;  
     }  
     delete req;  
 }  
   
 MessageQueueService::PollingList* MessageQueueService::_get_polling_list()  
 {  
     _polling_list_mutex.lock();  
   
     if (!_polling_list)  
         _polling_list = new PollingList;  
   
     _polling_list_mutex.unlock();  
   
     return _polling_list;  
 } }
  
 PEGASUS_NAMESPACE_END PEGASUS_NAMESPACE_END


Legend:
Removed from v.1.138  
changed lines
  Added in v.1.157

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2