(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / _cimom

  1 mday  1.1 //%////-*-c++-*-////////////////////////////////////////////////////////////////
  2           //
  3           // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM
  4           //
  5           // Permission is hereby granted, free of charge, to any person obtaining a copy
  6           // of this software and associated documentation files (the "Software"), to
  7           // deal in the Software without restriction, including without limitation the
  8           // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  9           // sell copies of the Software, and to permit persons to whom the Software is
 10           // furnished to do so, subject to the following conditions:
 11           //
 12           // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 13           // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 14           // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 15           // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 16           // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 17           // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 18           // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 19           // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 20           //
 21           //==============================================================================
 22 mday  1.1 //
 23           // Author: Mike Day (mdday@us.ibm.com)
 24           //
 25           // Modified By:
 26           //
 27           //%/////////////////////////////////////////////////////////////////////////////
 28           
 29           #include "MessageQueueService.h"
 30           
 31           PEGASUS_NAMESPACE_BEGIN
 32           
 33           AtomicInt MessageQueueService::_xid(1);
 34           
 35           // mutex is UNLOCKED
 36           void MessageQueueService::handleEnqueue(void)
 37           {
 38 mday  1.3    Message *msg = dequeue();
 39              if( msg )
 40              {
 41                 if(msg->getMask() & message_mask::ha_async)
 42                 {
 43           	 (static_cast<AsyncMessage *>(msg))->op->release();
 44                 }
 45                 else
 46           	 delete msg;
 47              }
 48           }
 49           
 50 mday  1.1 
 51 mday  1.3 void MessageQueueService::_enqueueAsyncResponse(AsyncRequest *request, 
 52           						AsyncReply *reply, 
 53           						Uint32 state, 
 54           						Uint32 flag)
 55           {
 56              AsyncOpNode *op = request->op;
 57              op->lock();
 58              if (false == op->_response.exists(reply))
 59                 op->_response.insert_last(reply);
 60              
 61              op->_state |= state;
 62              op->_flags |= flag;
 63              gettimeofday(&(op->_updated), NULL);
 64              op->unlock();
 65 mday  1.1 }
 66           
 67 mday  1.3 
 68           Message *MessageQueueService::openEnvelope(Message *msg)
 69 mday  1.2 {
 70              Uint32 mask = msg->getMask();
 71              if( mask & message_mask::ha_async )
 72              {
 73 mday  1.3       AsyncOpNode *op = (static_cast<AsyncMessage *>(msg))->op;
 74                 if(op == 0 )
 75           	 throw NullPointer();
 76                 // start pulling the last message
 77                 // when we reach the envelope return null
 78           
 79 mday  1.2    }
 80              return 0;
 81           }
 82           
 83 mday  1.3 AsyncOpNode *MessageQueueService::_get_op(void)
 84 mday  1.1 {
 85 mday  1.3    AsyncOpNode *op = _meta_dispatcher->get_cached_op();
 86              if(op == 0 )
 87                 throw NullPointer();
 88 mday  1.1    
 89 mday  1.3    op->write_state(ASYNC_OPSTATE_UNKNOWN);
 90              op->write_flags(ASYNC_OPFLAGS_SINGLE | 
 91           		   ASYNC_OPFLAGS_NORMAL | 
 92           		   ASYNC_OPFLAGS_META_DISPATCHER);
 93              return op;
 94 mday  1.1 }
 95           
 96 mday  1.3 void MessageQueueService::_return_op(AsyncOpNode *op)
 97 mday  1.1 {
 98 mday  1.3    PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED );
 99 mday  1.1    
100 mday  1.3    if(op->read_state() & ASYNC_OPFLAGS_META_DISPATCHER )
101              {
102                 _meta_dispatcher->cache_op(op);
103              }
104              else
105                 delete op;
106 mday  1.1 }
107           
108 mday  1.3 
109           void MessageQueueService::SendWait(AsyncRequest *request, unlocked_dq<AsyncMessage>& reply_list)
110 mday  1.1 {
111 mday  1.3    AsyncOpNode *op = request->op;
112 mday  1.2    if(op == 0 )
113 mday  1.3       return;
114 mday  1.2    
115 mday  1.3    if(true == _meta_dispatcher->accept_async(static_cast<Message *>(request)))
116              {
117                 op->_client_sem.wait();
118                 op->lock();
119                 while( op->_response.count() )
120                 {
121           	 AsyncMessage *rply = static_cast<AsyncMessage *>(op->_response.remove_last());
122           	 if (rply != 0 )
123           	 {
124           	    rply->op = 0;
125           	    reply_list.insert_first( rply );
126           	 }
127                 }
128                 // release the opnode, the meta-dispatcher will recycle it for us
129                 op->_state |= ASYNC_OPSTATE_RELEASED ;
130                 op->unlock();
131              }
132              else
133 mday  1.2    {
134 mday  1.3       // manually free the opnode and message
135                 op->release();
136                 _return_op(op);
137 mday  1.2    }
138 mday  1.3 }
139           
140           Boolean MessageQueueService::SendAsync(AsyncMessage *msg)
141           {
142              return _meta_dispatcher->accept_async(static_cast<Message *>(msg));
143 mday  1.1 }
144           
145           
146 mday  1.3 Boolean MessageQueueService::register_service(String name, 
147           					      Uint32 capabilities, 
148           					      Uint32 mask)
149 mday  1.1 
150 mday  1.3 {
151              AsyncOpNode *op = _meta_dispatcher->get_cached_op();
152              
153              op->_state |= ASYNC_OPSTATE_UNKNOWN;
154              op->_flags |= ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
155              
156              RegisterCimService *msg = new RegisterCimService(get_next_xid(),
157           						    op, 
158           						    true, 
159           						    name, 
160           						    capabilities, 
161           						    mask,
162           						    _queueId);
163              unlocked_dq<AsyncMessage> reply_list;
164              SendWait(msg, reply_list);
165              Boolean registered = false;
166              
167              AsyncReply *reply = static_cast<AsyncReply *>(reply_list.remove_first());
168              while(reply)
169              {
170                 if(reply->getMask() & message_mask:: ha_async)
171 mday  1.3       {
172           	 if(reply->getMask() & message_mask::ha_reply)
173           	 {
174           	    if(reply->result == async_results::OK)
175           	       registered = true;
176           	 }
177                 }
178                 
179                 delete reply;
180                 reply = static_cast<AsyncReply *>(reply_list.remove_first());
181              }
182              return registered;
183           }
184 mday  1.1 
185 mday  1.3 Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
186           {
187              
188              AsyncOpNode *op = _meta_dispatcher->get_cached_op();
189              op->_state |= ASYNC_OPSTATE_UNKNOWN;
190              op->_flags |= ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
191              
192              UpdateCimService *msg = new UpdateCimService(get_next_xid(), 
193           						op, 
194           						true, 
195           						_queueId,
196           						_capabilities, 
197           						_mask);
198              unlocked_dq<AsyncMessage> reply_list;
199              SendWait(msg, reply_list);
200              Boolean registered = false;
201              AsyncReply *reply = static_cast<AsyncReply *>(reply_list.remove_first());
202              while(reply)
203              {
204                 if(reply->getMask() & message_mask:: ha_async)
205                 {
206 mday  1.3 	 if(reply->getMask() & message_mask::ha_reply)
207           	 {
208           	    if(reply->result == async_results::OK)
209           	       registered = true;
210           	 }
211                 }
212                 
213                 delete reply;
214                 reply = static_cast<AsyncReply *>(reply_list.remove_first());
215              }
216              return registered;
217           }
218 mday  1.1 
219           PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2