(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.138 and 1.159

version 1.138, 2008/06/19 17:57:01 version 1.159, 2008/12/16 18:56:00
Line 1 
Line 1 
 //%2006////////////////////////////////////////////////////////////////////////  //%LICENSE////////////////////////////////////////////////////////////////
 // //
 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development  // Licensed to The Open Group (TOG) under one or more contributor license
 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.  // agreements.  Refer to the OpenPegasusNOTICE.txt file distributed with
 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;  // this work for additional information regarding copyright ownership.
 // IBM Corp.; EMC Corporation, The Open Group.  // Each contributor licenses this file to you under the OpenPegasus Open
 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;  // Source License; you may not use this file except in compliance with the
 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.  // License.
 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;  
 // EMC Corporation; VERITAS Software Corporation; The Open Group.  
 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;  
 // EMC Corporation; Symantec Corporation; The Open Group.  
 // //
 // Permission is hereby granted, free of charge, to any person obtaining a copy  // Permission is hereby granted, free of charge, to any person obtaining a
 // of this software and associated documentation files (the "Software"), to  // copy of this software and associated documentation files (the "Software"),
 // deal in the Software without restriction, including without limitation the  // to deal in the Software without restriction, including without limitation
 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or  // the rights to use, copy, modify, merge, publish, distribute, sublicense,
 // sell copies of the Software, and to permit persons to whom the Software is  // and/or sell copies of the Software, and to permit persons to whom the
 // furnished to do so, subject to the following conditions:  // Software is furnished to do so, subject to the following conditions:
 // //
 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN  // The above copyright notice and this permission notice shall be included
 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED  // in all copies or substantial portions of the Software.
 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT  
 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR  
 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT  
 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN  
 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION  
 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.  
 // //
 //==============================================================================  // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
   // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
   // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
   // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
   // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
   // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
   // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   //
   //////////////////////////////////////////////////////////////////////////
 // //
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
  
Line 35 
Line 33 
 #include <Pegasus/Common/Tracer.h> #include <Pegasus/Common/Tracer.h>
 #include <Pegasus/Common/MessageLoader.h> #include <Pegasus/Common/MessageLoader.h>
  
   PEGASUS_USING_STD;
   
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
 cimom *MessageQueueService::_meta_dispatcher = 0; cimom *MessageQueueService::_meta_dispatcher = 0;
Line 74 
Line 74 
     void* parm)     void* parm)
 { {
     Thread *myself = reinterpret_cast<Thread *>(parm);     Thread *myself = reinterpret_cast<Thread *>(parm);
     List<MessageQueueService, Mutex> *list =      MessageQueueService::PollingList *list =
         reinterpret_cast<List<MessageQueueService, Mutex>*>(myself->get_parm());          reinterpret_cast<MessageQueueService::PollingList*>(myself->get_parm());
  
       try
       {
     while (_stop_polling.get()  == 0)     while (_stop_polling.get()  == 0)
     {     {
         _polling_sem.wait();         _polling_sem.wait();
Line 95 
Line 97 
         // processing the _polling_list         // processing the _polling_list
         // (e.g., MessageQueueServer::~MessageQueueService).         // (e.g., MessageQueueServer::~MessageQueueService).
  
         list->lock();              _polling_list_mutex.lock();
         MessageQueueService *service = list->front();         MessageQueueService *service = list->front();
         ThreadStatus rtn = PEGASUS_THREAD_OK;         ThreadStatus rtn = PEGASUS_THREAD_OK;
         while (service != NULL)         while (service != NULL)
Line 113 
Line 115 
                 // lock and has ownership of the service object.                 // lock and has ownership of the service object.
  
                 service->_threads++;                 service->_threads++;
                 try  
                 {  
                     rtn = _thread_pool->allocate_and_awaken(                     rtn = _thread_pool->allocate_and_awaken(
                         service, _req_proc, &_polling_sem);                         service, _req_proc, &_polling_sem);
                 }  
                 catch (...)  
                 {  
                     service->_threads--;  
   
                     // allocate_and_awaken should never generate an exception.  
                     PEGASUS_ASSERT(0);  
                 }  
                 // if no more threads available, break from processing loop                 // if no more threads available, break from processing loop
                 if (rtn != PEGASUS_THREAD_OK )                 if (rtn != PEGASUS_THREAD_OK )
                 {                 {
Line 138 
Line 130 
                         service->_threads.get()));                         service->_threads.get()));
  
                     Threads::yield();                     Threads::yield();
                     service = NULL;                          break;
                 }                 }
             }             }
             if (service != NULL)  
             {  
                 service = list->next_of(service);                 service = list->next_of(service);
             }             }
               _polling_list_mutex.unlock();
         }         }
         list->unlock();  
     }     }
       catch(const Exception &e)
       {
           PEG_TRACE((TRC_MESSAGEQUEUESERVICE,Tracer::LEVEL1,
               "Exception caught in MessageQueueService::polling_routine : %s",
                   (const char*)e.getMessage().getCString()));
       }
       catch(const exception &e)
       {
           PEG_TRACE((TRC_MESSAGEQUEUESERVICE,Tracer::LEVEL1,
               "Exception caught in MessageQueueService::polling_routine : %s",
                   e.what()));
       }
       catch(...)
       {
           PEG_TRACE_CSTRING(TRC_MESSAGEQUEUESERVICE,Tracer::LEVEL1,
               "Unknown Exception caught in MessageQueueService::polling_routine");
       }
   
       PEGASUS_ASSERT(_stop_polling.get());
   
     return ThreadReturnType(0);     return ThreadReturnType(0);
 } }
  
Line 158 
Line 168 
  
 MessageQueueService::MessageQueueService( MessageQueueService::MessageQueueService(
     const char* name,     const char* name,
     Uint32 queueID,      Uint32 queueID)
     Uint32 capabilities,  
     Uint32 mask)  
     : Base(name, true,  queueID),     : Base(name, true,  queueID),
       _mask(mask),  
       _die(0),       _die(0),
       _threads(0),       _threads(0),
       _incoming(),       _incoming(),
       _incoming_queue_shutdown(0)       _incoming_queue_shutdown(0)
 { {
     _capabilities = (capabilities | module_capabilities::async);      _isRunning = true;
   
     _default_op_timeout.tv_sec = 30;  
     _default_op_timeout.tv_usec = 100;  
  
     max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;     max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;
  
Line 203 
Line 207 
     }     }
     _service_count++;     _service_count++;
  
     if (false == register_service(name, _capabilities, _mask))      // Add to the polling list
       if (!_polling_list)
     {     {
         MessageLoaderParms parms(          _polling_list = new PollingList;
             "Common.MessageQueueService.UNABLE_TO_REGISTER",  
             "CIM base message queue service is unable to register with the "  
                 "CIMOM dispatcher.");  
         throw BindFailedException(parms);  
     }     }
       _polling_list->insert_back(this);
     _get_polling_list()->insert_back(this);     _meta_dispatcher->registerCIMService(this);
 } }
  
  
 MessageQueueService::~MessageQueueService() MessageQueueService::~MessageQueueService()
 { {
     _die = 1;  
  
     // The polling_routine locks the _polling_list while      // Close incoming queue.
     // processing the incoming messages for services on the  
     // list.  Deleting the service from the _polling_list  
     // prior to processing, avoids synchronization issues  
     // with the _polling_routine.  
   
     // ATTN: added to prevent assertion in List in which the list does not  
     // contain this element.  
   
     if (_get_polling_list()->contains(this))  
         _get_polling_list()->remove(this);  
   
     // ATTN: The code for closing the _incoming queue  
     // is not working correctly. In OpenPegasus 2.5,  
     // execution of the following code is very timing  
     // dependent. This needs to be fix.  
     // See Bug 4079 for details.  
     if (_incoming_queue_shutdown.get() == 0)     if (_incoming_queue_shutdown.get() == 0)
     {     {
         _shutdown_incoming_queue();          AsyncIoClose *msg = new AsyncIoClose(
               0,
               _queueId,
               _queueId,
               true);
           SendForget(msg);
           // Wait until our queue has been shutdown.
           while (_incoming_queue_shutdown.get() == 0)
           {
               Threads::yield();
     }     }
       }
   
       // die now.
       _die = 1;
   
       _meta_dispatcher->deregisterCIMService(this);
  
     // Wait until all threads processing the messages     // Wait until all threads processing the messages
     // for this service have completed.     // for this service have completed.
   
     while (_threads.get() > 0)     while (_threads.get() > 0)
     {     {
         Threads::yield();         Threads::yield();
     }     }
  
   
       // The polling_routine locks the _polling_list while
       // processing the incoming messages for services on the
       // list.  Deleting the service from the _polling_list
       // prior to processing, avoids synchronization issues
       // with the _polling_routine.
       _removeFromPollingList(this);
   
     {     {
         AutoMutex autoMut(_meta_dispatcher_mutex);         AutoMutex autoMut(_meta_dispatcher_mutex);
   
         _service_count--;         _service_count--;
           // If we are last service to die, delete metadispatcher.
         if (_service_count.get() == 0)         if (_service_count.get() == 0)
         {         {
   
             _stop_polling++;             _stop_polling++;
             _polling_sem.signal();             _polling_sem.signal();
             if (_polling_thread)             if (_polling_thread)
Line 264 
Line 271 
                 delete _polling_thread;                 delete _polling_thread;
                 _polling_thread = 0;                 _polling_thread = 0;
             }             }
             _meta_dispatcher->_shutdown_routed_queue();  
             delete _meta_dispatcher;             delete _meta_dispatcher;
             _meta_dispatcher = 0;             _meta_dispatcher = 0;
  
             delete _thread_pool;             delete _thread_pool;
             _thread_pool = 0;             _thread_pool = 0;
         }         }
     } // mutex unlocks here  
     // Clean up in case there are extra stuff on the queue.  
     while (_incoming.count())  
     {  
         try  
         {  
             delete _incoming.dequeue();  
         }  
         catch (const ListClosed&)  
         {  
             // If the list is closed, there is nothing we can do.  
             break;  
         }  
     }  
 } }
  
 void MessageQueueService::_shutdown_incoming_queue()      // Clean up any extra stuff on the queue.
 {      AsyncOpNode* op = 0;
     if (_incoming_queue_shutdown.get() > 0)      while ((op = _incoming.dequeue()))
         return;  
   
     AsyncIoctl *msg = new AsyncIoctl(  
         0,  
         _queueId,  
         _queueId,  
         true,  
         AsyncIoctl::IO_CLOSE,  
         0,  
         0);  
   
     msg->op = get_op();  
     msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;  
     msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK  
         | ASYNC_OPFLAGS_SIMPLE_STATUS);  
     msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;  
   
     msg->op->_op_dest = this;  
     msg->op->_request.reset(msg);  
     try  
     {  
         _incoming.enqueue_wait(msg->op);  
         _polling_sem.signal();  
     }  
     catch (const ListClosed&)  
     {  
         // This means the queue has already been shut-down (happens  when there  
         // are two AsyncIoctrl::IO_CLOSE messages generated and one got first  
         // processed.  
         delete msg;  
     }  
     catch (const Permission&)  
     {     {
         delete msg;          delete op;
     }     }
 } }
  
   
   
 void MessageQueueService::enqueue(Message* msg) void MessageQueueService::enqueue(Message* msg)
 { {
     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
Line 359 
Line 317 
         // many operations may have been queued.         // many operations may have been queued.
         do         do
         {         {
             try  
             {  
                 operation = service->_incoming.dequeue();                 operation = service->_incoming.dequeue();
             }  
             catch (ListClosed&)  
             {  
                 // ATTN: This appears to be a common loop exit path.  
                 //PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,  
                 //    "Caught ListClosed exception.  Exiting _req_proc.");  
                 break;  
             }  
  
             if (operation)             if (operation)
             {             {
Line 380 
Line 328 
     }     }
     catch (const Exception& e)     catch (const Exception& e)
     {     {
         PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,          PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
             String("Caught exception: \"") + e.getMessage() +              "Caught exception: \"%s\".  Exiting _req_proc.",
                 "\".  Exiting _req_proc.");              (const char*)e.getMessage().getCString()));
     }     }
     catch (...)     catch (...)
     {     {
Line 407 
Line 355 
 // including op, op->_callback_node, and op->_callback_ptr // including op, op->_callback_node, and op->_callback_ptr
 void MessageQueueService::_handle_async_callback(AsyncOpNode* op) void MessageQueueService::_handle_async_callback(AsyncOpNode* op)
 { {
     if (op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK)      PEGASUS_ASSERT(op->_flags == ASYNC_OPFLAGS_CALLBACK);
     {  
         Message *msg = op->removeRequest();  
         if (msg && (msg->getMask() & MessageMask::ha_async))  
         {  
             if (msg->getType() == ASYNC_ASYNC_LEGACY_OP_START)  
             {  
                 AsyncLegacyOperationStart *wrapper =  
                     static_cast<AsyncLegacyOperationStart *>(msg);  
                 msg = wrapper->get_action();  
                 delete wrapper;  
             }  
             else if (msg->getType() == ASYNC_ASYNC_MODULE_OP_START)  
             {  
                 AsyncModuleOperationStart *wrapper =  
                     static_cast<AsyncModuleOperationStart *>(msg);  
                 msg = wrapper->get_action();  
                 delete wrapper;  
             }  
             else if (msg->getType() == ASYNC_ASYNC_OP_START)  
             {  
                 AsyncOperationStart *wrapper =  
                     static_cast<AsyncOperationStart *>(msg);  
                 msg = wrapper->get_action();  
                 delete wrapper;  
             }  
             delete msg;  
         }  
   
         msg = op->removeResponse();  
         if (msg && (msg->getMask() & MessageMask::ha_async))  
         {  
             if (msg->getType() == ASYNC_ASYNC_LEGACY_OP_RESULT)  
             {  
                 AsyncLegacyOperationResult *wrapper =  
                     static_cast<AsyncLegacyOperationResult *>(msg);  
                 msg = wrapper->get_result();  
                 delete wrapper;  
             }  
             else if (msg->getType() == ASYNC_ASYNC_MODULE_OP_RESULT)  
             {  
                 AsyncModuleOperationResult *wrapper =  
                     static_cast<AsyncModuleOperationResult *>(msg);  
                 msg = wrapper->get_result();  
                 delete wrapper;  
             }  
         }  
         void (*callback)(Message *, void *, void *) = op->__async_callback;  
         void *handle = op->_callback_handle;  
         void *parm = op->_callback_parameter;  
         op->release();  
         return_op(op);  
         callback(msg, handle, parm);  
     }  
     else if (op->_flags & ASYNC_OPFLAGS_CALLBACK)  
     {  
         // note that _callback_node may be different from op         // note that _callback_node may be different from op
         // op->_callback_response_q is a "this" pointer we can use for         // op->_callback_response_q is a "this" pointer we can use for
         // static callback methods         // static callback methods
         op->_async_callback(         op->_async_callback(
             op->_callback_node, op->_callback_response_q, op->_callback_ptr);             op->_callback_node, op->_callback_response_q, op->_callback_ptr);
     }     }
 }  
  
  
 void MessageQueueService::_handle_incoming_operation(AsyncOpNode* operation) void MessageQueueService::_handle_incoming_operation(AsyncOpNode* operation)
 { {
     if (operation != 0)     if (operation != 0)
     {     {
   
 // ATTN: optimization  
 // << Tue Feb 19 14:10:38 2002 mdd >>  
         operation->lock();  
   
         Message *rq = operation->_request.get();         Message *rq = operation->_request.get();
  
 // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>> // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>>
Line 491 
Line 378 
         if ((rq != 0) && (!(rq->getMask() & MessageMask::ha_async)))         if ((rq != 0) && (!(rq->getMask() & MessageMask::ha_async)))
         {         {
             operation->_request.release();             operation->_request.release();
             operation->unlock();  
             // delete the op node             // delete the op node
             operation->release();  
             return_op(operation);             return_op(operation);
   
             handleEnqueue(rq);             handleEnqueue(rq);
             return;             return;
         }         }
  
         if ((operation->_flags & ASYNC_OPFLAGS_CALLBACK ||          if ((operation->_flags & ASYNC_OPFLAGS_CALLBACK) &&
              operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&  
             (operation->_state & ASYNC_OPSTATE_COMPLETE))             (operation->_state & ASYNC_OPSTATE_COMPLETE))
         {         {
             operation->unlock();  
             _handle_async_callback(operation);             _handle_async_callback(operation);
         }         }
         else         else
         {         {
             PEGASUS_ASSERT(rq != 0);             PEGASUS_ASSERT(rq != 0);
             operation->unlock();  
             _handle_async_request(static_cast<AsyncRequest *>(rq));             _handle_async_request(static_cast<AsyncRequest *>(rq));
         }         }
     }     }
Line 519 
Line 400 
  
 void MessageQueueService::_handle_async_request(AsyncRequest *req) void MessageQueueService::_handle_async_request(AsyncRequest *req)
 { {
     if (req != 0)  
     {  
         req->op->processing();  
   
         MessageType type = req->getType();         MessageType type = req->getType();
         if (type == ASYNC_HEARTBEAT)      if (type == ASYNC_IOCLOSE)
             handle_heartbeat_request(req);      {
         else if (type == ASYNC_IOCTL)          handle_AsyncIoClose(static_cast<AsyncIoClose*>(req));
             handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));      }
         else if (type == ASYNC_CIMSERVICE_START)         else if (type == ASYNC_CIMSERVICE_START)
       {
             handle_CimServiceStart(static_cast<CimServiceStart *>(req));             handle_CimServiceStart(static_cast<CimServiceStart *>(req));
       }
         else if (type == ASYNC_CIMSERVICE_STOP)         else if (type == ASYNC_CIMSERVICE_STOP)
       {
             handle_CimServiceStop(static_cast<CimServiceStop *>(req));             handle_CimServiceStop(static_cast<CimServiceStop *>(req));
         else if (type == ASYNC_CIMSERVICE_PAUSE)      }
             handle_CimServicePause(static_cast<CimServicePause *>(req));  
         else if (type == ASYNC_CIMSERVICE_RESUME)  
             handle_CimServiceResume(static_cast<CimServiceResume *>(req));  
         else if (type == ASYNC_ASYNC_OP_START)  
             handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));  
         else         else
         {         {
             // we don't handle this request message             // we don't handle this request message
             _make_response(req, async_results::CIM_NAK);             _make_response(req, async_results::CIM_NAK);
         }         }
     }     }
 }  
   
  
 Boolean MessageQueueService::_enqueueResponse( Boolean MessageQueueService::_enqueueResponse(
     Message* request,     Message* request,
Line 560 
Line 433 
         {         {
             _completeAsyncResponse(             _completeAsyncResponse(
                 static_cast<AsyncRequest *>(request),                 static_cast<AsyncRequest *>(request),
                 static_cast<AsyncReply *>(response),                  static_cast<AsyncReply *>(response));
                 ASYNC_OPSTATE_COMPLETE, 0);  
             PEG_METHOD_EXIT();             PEG_METHOD_EXIT();
             return true;             return true;
         }         }
Line 588 
Line 461 
                 response);                 response);
         _completeAsyncResponse(         _completeAsyncResponse(
             asyncRequest,             asyncRequest,
             async_result,              async_result);
             ASYNC_OPSTATE_COMPLETE,  
             0);  
         PEG_METHOD_EXIT();         PEG_METHOD_EXIT();
         return true;         return true;
     }     }
Line 605 
Line 477 
     cimom::_make_response(req, code);     cimom::_make_response(req, code);
 } }
  
   
 void MessageQueueService::_completeAsyncResponse( void MessageQueueService::_completeAsyncResponse(
     AsyncRequest* request,     AsyncRequest* request,
     AsyncReply* reply,      AsyncReply* reply)
     Uint32 state,  
     Uint32 flag)  
 { {
     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
         "MessageQueueService::_completeAsyncResponse");         "MessageQueueService::_completeAsyncResponse");
  
     cimom::_completeAsyncResponse(request, reply, state, flag);      cimom::_completeAsyncResponse(request, reply);
  
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
 } }
  
  
 void MessageQueueService::_complete_op_node( void MessageQueueService::_complete_op_node(
     AsyncOpNode* op,      AsyncOpNode* op)
     Uint32 state,  
     Uint32 flag,  
     Uint32 code)  
 { {
     cimom::_complete_op_node(op, state, flag, code);      cimom::_complete_op_node(op);
 } }
  
  
 Boolean MessageQueueService::accept_async(AsyncOpNode* op) Boolean MessageQueueService::accept_async(AsyncOpNode* op)
 { {
       if (!_isRunning)
       {
           // Don't accept any messages other than start.
           if (op->_request.get()->getType() != ASYNC_CIMSERVICE_START)
           {
               return false;
           }
       }
   
     if (_incoming_queue_shutdown.get() > 0)     if (_incoming_queue_shutdown.get() > 0)
         return false;         return false;
   
     if (_polling_thread == NULL)     if (_polling_thread == NULL)
     {     {
           PEGASUS_ASSERT(_polling_list);
         _polling_thread = new Thread(         _polling_thread = new Thread(
             polling_routine,             polling_routine,
             reinterpret_cast<void *>(_get_polling_list()),              reinterpret_cast<void *>(_polling_list),
             false);             false);
         ThreadStatus tr = PEGASUS_THREAD_OK;         ThreadStatus tr = PEGASUS_THREAD_OK;
         while ( (tr =_polling_thread->run()) != PEGASUS_THREAD_OK)         while ( (tr =_polling_thread->run()) != PEGASUS_THREAD_OK)
Line 652 
Line 529 
                     "Could not allocate thread for the polling thread."));                     "Could not allocate thread for the polling thread."));
         }         }
     }     }
 // ATTN optimization remove the message checking altogether in the base      if (_die.get() == 0)
 // << Mon Feb 18 14:02:20 2002 mdd >>      {
     op->lock();          if (_incoming.enqueue(op))
     Message *rq = op->_request.get();  
     Message *rp = op->_response.get();  
     op->unlock();  
   
     if ((rq != 0 && (true == messageOK(rq))) ||  
         (rp != 0 && (true == messageOK(rp))) && _die.get() == 0)  
     {     {
         _incoming.enqueue_wait(op);  
         _polling_sem.signal();         _polling_sem.signal();
         return true;         return true;
     }     }
     return false;  
 } }
   
 Boolean MessageQueueService::messageOK(const Message* msg)  
 {  
     if (_incoming_queue_shutdown.get() > 0)  
         return false;         return false;
     return true;  
 } }
  
 void MessageQueueService::handle_heartbeat_request(AsyncRequest* req)  void MessageQueueService::handle_AsyncIoClose(AsyncIoClose *req)
 {  
     // default action is to echo a heartbeat response  
   
     AsyncReply *reply = new AsyncReply(  
         ASYNC_HEARTBEAT,  
         0,  
         req->op,  
         async_results::OK,  
         req->resp,  
         false);  
     _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0);  
 }  
   
   
 void MessageQueueService::handle_heartbeat_reply(AsyncReply* rep)  
 {  
 }  
   
 void MessageQueueService::handle_AsyncIoctl(AsyncIoctl* req)  
 {  
     switch (req->ctl)  
     {  
         case AsyncIoctl::IO_CLOSE:  
         {         {
             MessageQueueService *service =             MessageQueueService *service =
                 static_cast<MessageQueueService *>(req->op->_service_ptr);          static_cast<MessageQueueService*>(req->op->_op_dest);
  
 #ifdef MESSAGEQUEUESERVICE_DEBUG #ifdef MESSAGEQUEUESERVICE_DEBUG
             PEGASUS_STD(cout) << service->getQueueName() <<             PEGASUS_STD(cout) << service->getQueueName() <<
                 " Received AsyncIoctl::IO_CLOSE " << PEGASUS_STD(endl);          " Received AsyncIoClose " << PEGASUS_STD(endl);
 #endif #endif
       // set the closing flag, don't accept any more messages
       service->_incoming_queue_shutdown = 1;
  
             // respond to this message. this is fire and forget, so we             // respond to this message. this is fire and forget, so we
             // don't need to delete anything.             // don't need to delete anything.
             // this takes care of two problems that were being found             // this takes care of two problems that were being found
             // << Thu Oct  9 10:52:48 2003 mdd >>             // << Thu Oct  9 10:52:48 2003 mdd >>
             _make_response(req, async_results::OK);             _make_response(req, async_results::OK);
             // ensure we do not accept any further messages  
   
             // ensure we don't recurse on IO_CLOSE  
             if (_incoming_queue_shutdown.get() > 0)  
                 break;  
   
             // set the closing flag  
             service->_incoming_queue_shutdown = 1;  
             // empty out the queue  
             while (1)  
             {  
                 AsyncOpNode *operation;  
                 try  
                 {  
                     operation = service->_incoming.dequeue();  
                 }  
                 catch (IPCException&)  
                 {  
                     break;  
                 }  
                 if (operation)  
                 {  
                     operation->_service_ptr = service;  
                     service->_handle_incoming_operation(operation);  
                 }  
                 else  
                     break;  
             } // message processing loop  
   
             // shutdown the AsyncQueue  
             service->_incoming.close();  
             return;  
         }  
   
         default:  
             _make_response(req, async_results::CIM_NAK);  
     }  
 } }
  
 void MessageQueueService::handle_CimServiceStart(CimServiceStart* req) void MessageQueueService::handle_CimServiceStart(CimServiceStart* req)
Line 759 
Line 565 
     PEGASUS_STD(cout) << getQueueName() << "received START" <<     PEGASUS_STD(cout) << getQueueName() << "received START" <<
         PEGASUS_STD(endl);         PEGASUS_STD(endl);
 #endif #endif
       PEGASUS_ASSERT(!_isRunning);
     // clear the stoped bit and update      _isRunning = true;
     _capabilities &= (~(module_capabilities::stopped));  
     _make_response(req, async_results::OK);     _make_response(req, async_results::OK);
     // now tell the meta dispatcher we are stopped  
     update_service(_capabilities, _mask);  
 } }
  
 void MessageQueueService::handle_CimServiceStop(CimServiceStop* req) void MessageQueueService::handle_CimServiceStop(CimServiceStop* req)
Line 772 
Line 575 
 #ifdef MESSAGEQUEUESERVICE_DEBUG #ifdef MESSAGEQUEUESERVICE_DEBUG
     PEGASUS_STD(cout) << getQueueName() << "received STOP" << PEGASUS_STD(endl);     PEGASUS_STD(cout) << getQueueName() << "received STOP" << PEGASUS_STD(endl);
 #endif #endif
     // set the stopeed bit and update      PEGASUS_ASSERT(_isRunning);
     _capabilities |= module_capabilities::stopped;      _isRunning = false;
     _make_response(req, async_results::CIM_STOPPED);      _make_response(req, async_results::CIM_SERVICE_STOPPED);
     // now tell the meta dispatcher we are stopped  
     update_service(_capabilities, _mask);  
 }  
   
 void MessageQueueService::handle_CimServicePause(CimServicePause* req)  
 {  
     // set the paused bit and update  
     _capabilities |= module_capabilities::paused;  
     update_service(_capabilities, _mask);  
     _make_response(req, async_results::CIM_PAUSED);  
     // now tell the meta dispatcher we are stopped  
 }  
   
 void MessageQueueService::handle_CimServiceResume(CimServiceResume* req)  
 {  
     // clear the paused  bit and update  
     _capabilities &= (~(module_capabilities::paused));  
     update_service(_capabilities, _mask);  
     _make_response(req, async_results::OK);  
     // now tell the meta dispatcher we are stopped  
 }  
   
 void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart* req)  
 {  
     _make_response(req, async_results::CIM_NAK);  
 }  
   
 void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult* req)  
 {  
 }  
   
   
 void MessageQueueService::handle_AsyncLegacyOperationStart(  
     AsyncLegacyOperationStart* req)  
 {  
     // remove the legacy message from the request and enqueue it to its  
     // destination  
     Uint32 result = async_results::CIM_NAK;  
   
     Message* legacy = req->_act;  
     if (legacy != 0)  
     {  
         MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);  
         if (queue != 0)  
         {  
             if (queue->isAsync() == true)  
             {  
                 (static_cast<MessageQueueService *>(queue))->handleEnqueue(  
                     legacy);  
             }  
             else  
             {  
                 // Enqueue the response:  
                 queue->enqueue(req->get_action());  
             }  
   
             result = async_results::OK;  
         }  
     }  
     _make_response(req, result);  
 }  
   
 void MessageQueueService::handle_AsyncLegacyOperationResult(  
     AsyncLegacyOperationResult* rep)  
 {  
 } }
  
 AsyncOpNode* MessageQueueService::get_op() AsyncOpNode* MessageQueueService::get_op()
Line 847 
Line 585 
    AsyncOpNode* op = new AsyncOpNode();    AsyncOpNode* op = new AsyncOpNode();
  
    op->_state = ASYNC_OPSTATE_UNKNOWN;    op->_state = ASYNC_OPSTATE_UNKNOWN;
    op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;     op->_flags = ASYNC_OPFLAGS_UNKNOWN;
  
    return op;    return op;
 } }
  
 void MessageQueueService::return_op(AsyncOpNode* op) void MessageQueueService::return_op(AsyncOpNode* op)
 { {
     PEGASUS_ASSERT(op->_state & ASYNC_OPSTATE_RELEASED);  
     delete op;     delete op;
 } }
  
Line 866 
Line 603 
     MessageQueue* callback_response_q,     MessageQueue* callback_response_q,
     void* callback_ptr)     void* callback_ptr)
 { {
     PEGASUS_ASSERT(op != 0 && callback != 0);      return _sendAsync(
           op,
           destination,
           callback,
           callback_response_q,
           callback_ptr,
           ASYNC_OPFLAGS_CALLBACK);
  
     // get the queue handle for the destination  }
   
   Boolean MessageQueueService::_sendAsync(
       AsyncOpNode* op,
       Uint32 destination,
       void (*callback)(AsyncOpNode*, MessageQueue*, void*),
       MessageQueue* callback_response_q,
       void* callback_ptr,
       Uint32 flags)
   {
       PEGASUS_ASSERT(op != 0 && callback != 0);
  
     op->lock();  
     // destination of this message     // destination of this message
     op->_op_dest = MessageQueue::lookup(destination);     op->_op_dest = MessageQueue::lookup(destination);
     op->_flags |= ASYNC_OPFLAGS_CALLBACK;      if (op->_op_dest == 0)
     op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);      {
           return false;
       }
       op->_flags = flags;
     // initialize the callback data     // initialize the callback data
     // callback function to be executed by recpt. of response     // callback function to be executed by recpt. of response
     op->_async_callback = callback;     op->_async_callback = callback;
Line 887 
Line 642 
     // I am the originator of this request     // I am the originator of this request
     op->_callback_request_q = this;     op->_callback_request_q = this;
  
     op->unlock();  
     if (op->_op_dest == 0)  
         return false;  
   
     return  _meta_dispatcher->route_async(op);  
 }  
   
   
 Boolean MessageQueueService::SendAsync(  
     Message* msg,  
     Uint32 destination,  
     void (*callback)(Message* response, void* handle, void* parameter),  
     void* handle,  
     void* parameter)  
 {  
     if (msg == NULL)  
         return false;  
     if (callback == NULL)  
         return SendForget(msg);  
     AsyncOpNode *op = get_op();  
     msg->dest = destination;  
     if (NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))  
     {  
         op->release();  
         return_op(op);  
         return false;  
     }  
     op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;  
     op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);  
     op->_state &= ~ASYNC_OPSTATE_COMPLETE;  
     op->__async_callback = callback;  
     op->_callback_node = op;  
     op->_callback_handle = handle;  
     op->_callback_parameter = parameter;  
     op->_callback_response_q = this;  
   
     if (!(msg->getMask() & MessageMask::ha_async))  
     {  
         AsyncLegacyOperationStart *wrapper = new AsyncLegacyOperationStart(  
             op,  
             destination,  
             msg,  
             destination);  
     }  
     else  
     {  
         op->_request.reset(msg);  
         (static_cast<AsyncMessage *>(msg))->op = op;  
     }  
     return _meta_dispatcher->route_async(op);     return _meta_dispatcher->route_async(op);
 } }
  
   
 Boolean MessageQueueService::SendForget(Message* msg) Boolean MessageQueueService::SendForget(Message* msg)
 { {
     AsyncOpNode* op = 0;     AsyncOpNode* op = 0;
Line 959 
Line 664 
             (static_cast<AsyncMessage *>(msg))->op = op;             (static_cast<AsyncMessage *>(msg))->op = op;
         }         }
     }     }
   
       PEGASUS_ASSERT(op->_flags == ASYNC_OPFLAGS_UNKNOWN);
       PEGASUS_ASSERT(op->_state == ASYNC_OPSTATE_UNKNOWN);
     op->_op_dest = MessageQueue::lookup(msg->dest);     op->_op_dest = MessageQueue::lookup(msg->dest);
     op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;  
     op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK  
         | ASYNC_OPFLAGS_SIMPLE_STATUS);  
     op->_state &= ~ASYNC_OPSTATE_COMPLETE;  
     if (op->_op_dest == 0)     if (op->_op_dest == 0)
     {     {
         op->release();  
         return_op(op);         return_op(op);
         return false;         return false;
     }     }
  
       op->_flags = ASYNC_OPFLAGS_FIRE_AND_FORGET;
   
     // now see if the meta dispatcher will take it     // now see if the meta dispatcher will take it
     return  _meta_dispatcher->route_async(op);     return  _meta_dispatcher->route_async(op);
 } }
Line 990 
Line 695 
         destroy_op = true;         destroy_op = true;
     }     }
  
       PEGASUS_ASSERT(request->op->_flags == ASYNC_OPFLAGS_UNKNOWN);
       PEGASUS_ASSERT(request->op->_state == ASYNC_OPSTATE_UNKNOWN);
   
     request->block = false;     request->block = false;
     request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;      _sendAsync(
     SendAsync(  
         request->op,         request->op,
         request->dest,         request->dest,
         _sendwait_callback,         _sendwait_callback,
         this,         this,
         (void *)0);          (void *)0,
           ASYNC_OPFLAGS_PSEUDO_CALLBACK);
  
     request->op->_client_sem.wait();     request->op->_client_sem.wait();
  
Line 1006 
Line 714 
  
     if (destroy_op == true)     if (destroy_op == true)
     {     {
         request->op->lock();  
         request->op->_request.release();         request->op->_request.release();
         request->op->_state |= ASYNC_OPSTATE_RELEASED;  
         request->op->unlock();  
         return_op(request->op);         return_op(request->op);
         request->op = 0;         request->op = 0;
     }     }
     return rpl;     return rpl;
 } }
  
   Uint32 MessageQueueService::find_service_qid(const String &name)
 Boolean MessageQueueService::register_service(  
     String name,  
     Uint32 capabilities,  
     Uint32 mask)  
 {  
     RegisterCimService *msg = new RegisterCimService(  
         0,  
         true,  
         name,  
         capabilities,  
         mask,  
         _queueId);  
     msg->dest = CIMOM_Q_ID;  
   
     Boolean registered = false;  
     AsyncReply *reply = static_cast<AsyncReply *>(SendWait(msg));  
   
     if (reply != 0)  
     {  
         if (reply->getMask() & MessageMask::ha_async)  
         {  
             if (reply->getMask() & MessageMask::ha_reply)  
             {             {
                 if (reply->result == async_results::OK ||      MessageQueue* queue = MessageQueue::lookup((const char*)name.getCString());
                     reply->result == async_results::MODULE_ALREADY_REGISTERED)      PEGASUS_ASSERT(queue);
                 {      return queue->getQueueId();
                     registered = true;  
                 }  
             }  
         }  
   
         delete reply;  
     }  
     delete msg;  
     return registered;  
 } }
  
 Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)  void MessageQueueService::_removeFromPollingList(MessageQueueService *service)
 {  
     UpdateCimService *msg = new UpdateCimService(  
         0,  
         true,  
         _queueId,  
         _capabilities,  
         _mask);  
     Boolean registered = false;  
   
     AsyncMessage* reply = SendWait(msg);  
     if (reply)  
     {  
         if (reply->getMask() & MessageMask::ha_async)  
         {  
             if (reply->getMask() & MessageMask::ha_reply)  
             {  
                 if (static_cast<AsyncReply *>(reply)->result ==  
                         async_results::OK)  
                 {  
                     registered = true;  
                 }  
             }  
         }  
         delete reply;  
     }  
     delete msg;  
     return registered;  
 }  
   
   
 Boolean MessageQueueService::deregister_service()  
 {  
     _meta_dispatcher->deregister_module(_queueId);  
     return true;  
 }  
   
   
 void MessageQueueService::find_services(  
     String name,  
     Uint32 capabilities,  
     Uint32 mask,  
     Array<Uint32>* results)  
 {  
     if (results == 0)  
     {  
         throw NullPointer();  
     }  
   
     results->clear();  
   
     FindServiceQueue *req = new FindServiceQueue(  
         0,  
         _queueId,  
         true,  
         name,  
         capabilities,  
         mask);  
   
     req->dest = CIMOM_Q_ID;  
   
     AsyncMessage *reply = SendWait(req);  
     if (reply)  
     {  
         if (reply->getMask() & MessageMask::ha_async)  
         {  
             if (reply->getMask() & MessageMask::ha_reply)  
             {  
                 if (reply->getType() == ASYNC_FIND_SERVICE_Q_RESULT)  
                 {  
                     if ((static_cast<FindServiceQueueResult*>(reply))->result ==  
                             async_results::OK)  
                         *results =  
                             (static_cast<FindServiceQueueResult*>(reply))->qids;  
                 }  
             }  
         }  
         delete reply;  
     }  
     delete req;  
 }  
   
 void MessageQueueService::enumerate_service(  
     Uint32 queue,  
     message_module* result)  
 {  
     if (result == 0)  
     {  
         throw NullPointer();  
     }  
   
     EnumerateService *req = new EnumerateService(  
         0,  
         _queueId,  
         true,  
         queue);  
   
     AsyncMessage* reply = SendWait(req);  
   
     if (reply)  
     {  
         Boolean found = false;  
   
         if (reply->getMask() & MessageMask::ha_async)  
         {  
             if (reply->getMask() & MessageMask::ha_reply)  
             {  
                 if (reply->getType() == ASYNC_ENUMERATE_SERVICE_RESULT)  
                 {  
                     if ((static_cast<EnumerateServiceResponse*>(reply))->  
                             result == async_results::OK)  
                     {  
                         if (found == false)  
                         {  
                             found = true;  
   
                             result->put_name((static_cast<  
                                 EnumerateServiceResponse*>(reply))->name);  
                             result->put_capabilities((static_cast<  
                                 EnumerateServiceResponse*>(reply))->  
                                     capabilities);  
                             result->put_mask((static_cast<  
                                 EnumerateServiceResponse*>(reply))->mask);  
                             result->put_queue((static_cast<  
                                 EnumerateServiceResponse*>(reply))->qid);  
                         }  
                     }  
                 }  
             }  
         }  
         delete reply;  
     }  
     delete req;  
 }  
   
 MessageQueueService::PollingList* MessageQueueService::_get_polling_list()  
 { {
     _polling_list_mutex.lock();     _polling_list_mutex.lock();
       _polling_list->remove(service);
     if (!_polling_list)  
         _polling_list = new PollingList;  
   
     _polling_list_mutex.unlock();     _polling_list_mutex.unlock();
   
     return _polling_list;  
 } }
  
 PEGASUS_NAMESPACE_END PEGASUS_NAMESPACE_END


Legend:
Removed from v.1.138  
changed lines
  Added in v.1.159

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2