(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.145 and 1.146

version 1.145, 2008/09/17 18:47:22 version 1.146, 2008/10/14 17:25:58
Line 170 
Line 170 
 { {
     _capabilities = (capabilities | module_capabilities::async);     _capabilities = (capabilities | module_capabilities::async);
  
     _default_op_timeout.tv_sec = 30;  
     _default_op_timeout.tv_usec = 100;  
   
     max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;     max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;
  
     // if requested thread max is out of range, then set to     // if requested thread max is out of range, then set to
Line 203 
Line 200 
     }     }
     _service_count++;     _service_count++;
  
     if (false == register_service(name, _capabilities, _mask))  
     {  
         MessageLoaderParms parms(  
             "Common.MessageQueueService.UNABLE_TO_REGISTER",  
             "CIM base message queue service is unable to register with the "  
                 "CIMOM dispatcher.");  
         throw Exception(parms);  
     }  
   
     _get_polling_list()->insert_back(this);     _get_polling_list()->insert_back(this);
 } }
  
Line 502 
Line 490 
         req->op->processing();         req->op->processing();
  
         MessageType type = req->getType();         MessageType type = req->getType();
         if (type == ASYNC_HEARTBEAT)          if (type == ASYNC_IOCTL)
             handle_heartbeat_request(req);  
         else if (type == ASYNC_IOCTL)  
             handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));             handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
         else if (type == ASYNC_CIMSERVICE_START)         else if (type == ASYNC_CIMSERVICE_START)
             handle_CimServiceStart(static_cast<CimServiceStart *>(req));             handle_CimServiceStart(static_cast<CimServiceStart *>(req));
         else if (type == ASYNC_CIMSERVICE_STOP)         else if (type == ASYNC_CIMSERVICE_STOP)
             handle_CimServiceStop(static_cast<CimServiceStop *>(req));             handle_CimServiceStop(static_cast<CimServiceStop *>(req));
         else if (type == ASYNC_CIMSERVICE_PAUSE)  
             handle_CimServicePause(static_cast<CimServicePause *>(req));  
         else if (type == ASYNC_CIMSERVICE_RESUME)  
             handle_CimServiceResume(static_cast<CimServiceResume *>(req));  
         else if (type == ASYNC_ASYNC_OP_START)  
             handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));  
         else         else
         {         {
             // we don't handle this request message             // we don't handle this request message
Line 656 
Line 636 
     return true;     return true;
 } }
  
 void MessageQueueService::handle_heartbeat_request(AsyncRequest* req)  
 {  
     // default action is to echo a heartbeat response  
   
     AsyncReply *reply = new AsyncReply(  
         ASYNC_HEARTBEAT,  
         0,  
         req->op,  
         async_results::OK,  
         req->resp,  
         false);  
     _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0);  
 }  
   
   
 void MessageQueueService::handle_heartbeat_reply(AsyncReply* rep)  
 {  
 }  
   
 void MessageQueueService::handle_AsyncIoctl(AsyncIoctl* req) void MessageQueueService::handle_AsyncIoctl(AsyncIoctl* req)
 { {
     switch (req->ctl)     switch (req->ctl)
Line 743 
Line 704 
     // clear the stoped bit and update     // clear the stoped bit and update
     _capabilities &= (~(module_capabilities::stopped));     _capabilities &= (~(module_capabilities::stopped));
     _make_response(req, async_results::OK);     _make_response(req, async_results::OK);
     // now tell the meta dispatcher we are stopped  
     update_service(_capabilities, _mask);  
 } }
  
 void MessageQueueService::handle_CimServiceStop(CimServiceStop* req) void MessageQueueService::handle_CimServiceStop(CimServiceStop* req)
Line 755 
Line 714 
     // set the stopeed bit and update     // set the stopeed bit and update
     _capabilities |= module_capabilities::stopped;     _capabilities |= module_capabilities::stopped;
     _make_response(req, async_results::CIM_STOPPED);     _make_response(req, async_results::CIM_STOPPED);
     // now tell the meta dispatcher we are stopped  
     update_service(_capabilities, _mask);  
 }  
   
 void MessageQueueService::handle_CimServicePause(CimServicePause* req)  
 {  
     // set the paused bit and update  
     _capabilities |= module_capabilities::paused;  
     update_service(_capabilities, _mask);  
     _make_response(req, async_results::CIM_PAUSED);  
     // now tell the meta dispatcher we are stopped  
 }  
   
 void MessageQueueService::handle_CimServiceResume(CimServiceResume* req)  
 {  
     // clear the paused  bit and update  
     _capabilities &= (~(module_capabilities::paused));  
     update_service(_capabilities, _mask);  
     _make_response(req, async_results::OK);  
     // now tell the meta dispatcher we are stopped  
 }  
   
 void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart* req)  
 {  
     _make_response(req, async_results::CIM_NAK);  
 }  
   
 void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult* req)  
 {  
 } }
  
 AsyncOpNode* MessageQueueService::get_op() AsyncOpNode* MessageQueueService::get_op()
Line 838 
Line 768 
     return  _meta_dispatcher->route_async(op);     return  _meta_dispatcher->route_async(op);
 } }
  
   
 Boolean MessageQueueService::SendAsync(  
     Message* msg,  
     Uint32 destination,  
     void (*callback)(Message* response, void* handle, void* parameter),  
     void* handle,  
     void* parameter)  
 {  
     if (msg == NULL)  
         return false;  
     if (callback == NULL)  
         return SendForget(msg);  
     AsyncOpNode *op = get_op();  
     msg->dest = destination;  
     if (NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))  
     {  
         op->release();  
         return_op(op);  
         return false;  
     }  
     op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;  
     op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);  
     op->_state &= ~ASYNC_OPSTATE_COMPLETE;  
     op->__async_callback = callback;  
     op->_callback_node = op;  
     op->_callback_handle = handle;  
     op->_callback_parameter = parameter;  
     op->_callback_response_q = this;  
   
     if (!(msg->getMask() & MessageMask::ha_async))  
     {  
         AsyncLegacyOperationStart *wrapper = new AsyncLegacyOperationStart(  
             op,  
             destination,  
             msg);  
     }  
     else  
     {  
         op->_request.reset(msg);  
         (static_cast<AsyncMessage *>(msg))->op = op;  
     }  
     return _meta_dispatcher->route_async(op);  
 }  
   
   
 Boolean MessageQueueService::SendForget(Message* msg) Boolean MessageQueueService::SendForget(Message* msg)
 { {
     AsyncOpNode* op = 0;     AsyncOpNode* op = 0;
Line 959 
Line 844 
     return rpl;     return rpl;
 } }
  
   Uint32 MessageQueueService::find_service_qid(const String &name)
 Boolean MessageQueueService::register_service(  
     String name,  
     Uint32 capabilities,  
     Uint32 mask)  
 {  
     RegisterCimService *msg = new RegisterCimService(  
         0,  
         true,  
         name,  
         capabilities,  
         mask,  
         _queueId);  
     msg->dest = CIMOM_Q_ID;  
   
     Boolean registered = false;  
     AsyncReply *reply = static_cast<AsyncReply *>(SendWait(msg));  
   
     if (reply != 0)  
     {  
         if (reply->getMask() & MessageMask::ha_async)  
         {  
             if (reply->getMask() & MessageMask::ha_reply)  
             {  
                 if (reply->result == async_results::OK ||  
                     reply->result == async_results::MODULE_ALREADY_REGISTERED)  
                 {  
                     registered = true;  
                 }  
             }  
         }  
   
         delete reply;  
     }  
     delete msg;  
     return registered;  
 }  
   
 Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)  
 {  
     UpdateCimService *msg = new UpdateCimService(  
         0,  
         true,  
         _queueId,  
         _capabilities,  
         _mask);  
     Boolean registered = false;  
   
     AsyncMessage* reply = SendWait(msg);  
     if (reply)  
     {  
         if (reply->getMask() & MessageMask::ha_async)  
         {  
             if (reply->getMask() & MessageMask::ha_reply)  
             {  
                 if (static_cast<AsyncReply *>(reply)->result ==  
                         async_results::OK)  
                 {  
                     registered = true;  
                 }  
             }  
         }  
         delete reply;  
     }  
     delete msg;  
     return registered;  
 }  
   
   
 Boolean MessageQueueService::deregister_service()  
 {  
     _meta_dispatcher->deregister_module(_queueId);  
     return true;  
 }  
   
   
 void MessageQueueService::find_services(  
     String name,  
     Uint32 capabilities,  
     Uint32 mask,  
     Array<Uint32>* results)  
 {  
     if (results == 0)  
     {  
         throw NullPointer();  
     }  
   
     results->clear();  
   
     FindServiceQueue *req = new FindServiceQueue(  
         0,  
         _queueId,  
         true,  
         name,  
         capabilities,  
         mask);  
   
     req->dest = CIMOM_Q_ID;  
   
     AsyncMessage *reply = SendWait(req);  
     if (reply)  
     {  
         if (reply->getMask() & MessageMask::ha_async)  
         {  
             if (reply->getMask() & MessageMask::ha_reply)  
             {  
                 if (reply->getType() == ASYNC_FIND_SERVICE_Q_RESULT)  
                 {  
                     if ((static_cast<FindServiceQueueResult*>(reply))->result ==  
                             async_results::OK)  
                         *results =  
                             (static_cast<FindServiceQueueResult*>(reply))->qids;  
                 }  
             }  
         }  
         delete reply;  
     }  
     delete req;  
 }  
   
 void MessageQueueService::enumerate_service(  
     Uint32 queue,  
     message_module* result)  
 {  
     if (result == 0)  
     {  
         throw NullPointer();  
     }  
   
     EnumerateService *req = new EnumerateService(  
         0,  
         _queueId,  
         true,  
         queue);  
   
     AsyncMessage* reply = SendWait(req);  
   
     if (reply)  
     {  
         Boolean found = false;  
   
         if (reply->getMask() & MessageMask::ha_async)  
         {  
             if (reply->getMask() & MessageMask::ha_reply)  
             {  
                 if (reply->getType() == ASYNC_ENUMERATE_SERVICE_RESULT)  
                 {  
                     if ((static_cast<EnumerateServiceResponse*>(reply))->  
                             result == async_results::OK)  
                     {                     {
                         if (found == false)      MessageQueue *queue = MessageQueue::lookup((const char*)name.getCString());
                         {      PEGASUS_ASSERT(queue);
                             found = true;      return queue->getQueueId();
   
                             result->put_name((static_cast<  
                                 EnumerateServiceResponse*>(reply))->name);  
                             result->put_capabilities((static_cast<  
                                 EnumerateServiceResponse*>(reply))->  
                                     capabilities);  
                             result->put_mask((static_cast<  
                                 EnumerateServiceResponse*>(reply))->mask);  
                             result->put_queue((static_cast<  
                                 EnumerateServiceResponse*>(reply))->qid);  
                         }  
                     }  
                 }  
             }  
         }  
         delete reply;  
     }  
     delete req;  
 } }
  
 MessageQueueService::PollingList* MessageQueueService::_get_polling_list() MessageQueueService::PollingList* MessageQueueService::_get_polling_list()


Legend:
Removed from v.1.145  
changed lines
  Added in v.1.146

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2