1 mday 1.1 //%////-*-c++-*-////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to
7 // deal in the Software without restriction, including without limitation the
8 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
9 // sell copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
13 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
14 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
15 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
16 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
17 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
18 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
19 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20 //
21 //==============================================================================
22 mday 1.1 //
23 // Author: Mike Day (mdday@us.ibm.com)
24 //
25 // Modified By:
26 //
27 //%/////////////////////////////////////////////////////////////////////////////
28
29 #include "MessageQueueService.h"
30
31 PEGASUS_NAMESPACE_BEGIN
32
33 AtomicInt MessageQueueService::_xid(1);
34
35 // mutex is UNLOCKED
36 void MessageQueueService::handleEnqueue(void)
37 {
|
38 mday 1.3 Message *msg = dequeue();
39 if( msg )
40 {
41 if(msg->getMask() & message_mask::ha_async)
42 {
43 (static_cast<AsyncMessage *>(msg))->op->release();
44 }
45 else
46 delete msg;
47 }
48 }
49
|
50 mday 1.1
|
51 mday 1.3 void MessageQueueService::_enqueueAsyncResponse(AsyncRequest *request,
52 AsyncReply *reply,
53 Uint32 state,
54 Uint32 flag)
55 {
56 AsyncOpNode *op = request->op;
57 op->lock();
58 if (false == op->_response.exists(reply))
59 op->_response.insert_last(reply);
60
61 op->_state |= state;
62 op->_flags |= flag;
63 gettimeofday(&(op->_updated), NULL);
64 op->unlock();
|
65 mday 1.1 }
66
|
67 mday 1.3
68 Message *MessageQueueService::openEnvelope(Message *msg)
|
69 mday 1.2 {
70 Uint32 mask = msg->getMask();
71 if( mask & message_mask::ha_async )
72 {
|
73 mday 1.3 AsyncOpNode *op = (static_cast<AsyncMessage *>(msg))->op;
74 if(op == 0 )
75 throw NullPointer();
76 // start pulling the last message
77 // when we reach the envelope return null
78
|
79 mday 1.2 }
80 return 0;
81 }
82
|
83 mday 1.3 AsyncOpNode *MessageQueueService::_get_op(void)
|
84 mday 1.1 {
|
85 mday 1.3 AsyncOpNode *op = _meta_dispatcher->get_cached_op();
86 if(op == 0 )
87 throw NullPointer();
|
88 mday 1.1
|
89 mday 1.3 op->write_state(ASYNC_OPSTATE_UNKNOWN);
90 op->write_flags(ASYNC_OPFLAGS_SINGLE |
91 ASYNC_OPFLAGS_NORMAL |
92 ASYNC_OPFLAGS_META_DISPATCHER);
93 return op;
|
94 mday 1.1 }
95
|
96 mday 1.3 void MessageQueueService::_return_op(AsyncOpNode *op)
|
97 mday 1.1 {
|
98 mday 1.3 PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED );
|
99 mday 1.1
|
100 mday 1.3 if(op->read_state() & ASYNC_OPFLAGS_META_DISPATCHER )
101 {
102 _meta_dispatcher->cache_op(op);
103 }
104 else
105 delete op;
|
106 mday 1.1 }
107
|
108 mday 1.3
109 void MessageQueueService::SendWait(AsyncRequest *request, unlocked_dq<AsyncMessage>& reply_list)
|
110 mday 1.1 {
|
111 mday 1.3 AsyncOpNode *op = request->op;
|
112 mday 1.2 if(op == 0 )
|
113 mday 1.3 return;
|
114 mday 1.2
|
115 mday 1.3 if(true == _meta_dispatcher->accept_async(static_cast<Message *>(request)))
116 {
117 op->_client_sem.wait();
118 op->lock();
119 while( op->_response.count() )
120 {
121 AsyncMessage *rply = static_cast<AsyncMessage *>(op->_response.remove_last());
122 if (rply != 0 )
123 {
124 rply->op = 0;
125 reply_list.insert_first( rply );
126 }
127 }
128 // release the opnode, the meta-dispatcher will recycle it for us
129 op->_state |= ASYNC_OPSTATE_RELEASED ;
130 op->unlock();
131 }
132 else
|
133 mday 1.2 {
|
134 mday 1.3 // manually free the opnode and message
135 op->release();
136 _return_op(op);
|
137 mday 1.2 }
|
138 mday 1.3 }
139
140 Boolean MessageQueueService::SendAsync(AsyncMessage *msg)
141 {
142 return _meta_dispatcher->accept_async(static_cast<Message *>(msg));
|
143 mday 1.1 }
144
145
|
146 mday 1.3 Boolean MessageQueueService::register_service(String name,
147 Uint32 capabilities,
148 Uint32 mask)
|
149 mday 1.1
|
150 mday 1.3 {
151 AsyncOpNode *op = _meta_dispatcher->get_cached_op();
152
153 op->_state |= ASYNC_OPSTATE_UNKNOWN;
154 op->_flags |= ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
155
156 RegisterCimService *msg = new RegisterCimService(get_next_xid(),
157 op,
158 true,
159 name,
160 capabilities,
161 mask,
162 _queueId);
163 unlocked_dq<AsyncMessage> reply_list;
164 SendWait(msg, reply_list);
165 Boolean registered = false;
166
167 AsyncReply *reply = static_cast<AsyncReply *>(reply_list.remove_first());
168 while(reply)
169 {
170 if(reply->getMask() & message_mask:: ha_async)
171 mday 1.3 {
172 if(reply->getMask() & message_mask::ha_reply)
173 {
174 if(reply->result == async_results::OK)
175 registered = true;
176 }
177 }
178
179 delete reply;
180 reply = static_cast<AsyncReply *>(reply_list.remove_first());
181 }
182 return registered;
183 }
|
184 mday 1.1
|
185 mday 1.3 Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
186 {
187
188 AsyncOpNode *op = _meta_dispatcher->get_cached_op();
189 op->_state |= ASYNC_OPSTATE_UNKNOWN;
190 op->_flags |= ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
191
192 UpdateCimService *msg = new UpdateCimService(get_next_xid(),
193 op,
194 true,
195 _queueId,
196 _capabilities,
197 _mask);
198 unlocked_dq<AsyncMessage> reply_list;
199 SendWait(msg, reply_list);
200 Boolean registered = false;
201 AsyncReply *reply = static_cast<AsyncReply *>(reply_list.remove_first());
202 while(reply)
203 {
204 if(reply->getMask() & message_mask:: ha_async)
205 {
206 mday 1.3 if(reply->getMask() & message_mask::ha_reply)
207 {
208 if(reply->result == async_results::OK)
209 registered = true;
210 }
211 }
212
213 delete reply;
214 reply = static_cast<AsyncReply *>(reply_list.remove_first());
215 }
216 return registered;
217 }
|
218 mday 1.1
219 PEGASUS_NAMESPACE_END
|