1 karl 1.119 //%2006////////////////////////////////////////////////////////////////////////
|
2 mday 1.1 //
|
3 karl 1.88 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.82 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.88 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.91 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 karl 1.119 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 mday 1.1 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
15 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
18 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
|
20 karl 1.82 //
|
21 mday 1.1 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
22 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
24 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
27 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 //%/////////////////////////////////////////////////////////////////////////////
33
34 #include "MessageQueueService.h"
|
35 mday 1.22 #include <Pegasus/Common/Tracer.h>
|
36 kumpf 1.126 #include <Pegasus/Common/MessageLoader.h>
|
37 mday 1.1
38 PEGASUS_NAMESPACE_BEGIN
39
|
// Singleton meta-dispatcher shared by every MessageQueueService instance;
// created by the first service constructed, destroyed by the last.
cimom *MessageQueueService::_meta_dispatcher = 0;

// Number of live MessageQueueService instances; drives creation and
// teardown of the shared dispatcher, thread pool, and polling thread.
AtomicInt MessageQueueService::_service_count(0);

// Serializes construction/destruction of the shared static state above.
Mutex MessageQueueService::_meta_dispatcher_mutex;

// Deallocate interval handed to the shared ThreadPool constructor
// (300 seconds) — presumably how long idle workers linger; see ThreadPool.
static struct timeval deallocateWait = {300, 0};

// Pool of worker threads that run _req_proc for all services.
ThreadPool *MessageQueueService::_thread_pool = 0;

// List of services watched by polling_routine, plus the mutex guarding
// its lazy creation (see _get_polling_list()).
MessageQueueService::PollingList* MessageQueueService::_polling_list;
Mutex MessageQueueService::_polling_list_mutex;

// The single thread running polling_routine; started lazily in accept_async.
Thread* MessageQueueService::_polling_thread = 0;
|
// Returns the thread pool shared by all MessageQueueService instances
// (null until the first service has been constructed).
ThreadPool *MessageQueueService::get_thread_pool()
{
    return _thread_pool;
}
|
//
// MAX_THREADS_PER_SVC_QUEUE
//
// Upper bound on the number of worker threads that may concurrently
// service a single queue; enforced in polling_routine.
//
// JR Wunderlich Jun 6, 2005
//

#define MAX_THREADS_PER_SVC_QUEUE_LIMIT 5000
#define MAX_THREADS_PER_SVC_QUEUE_DEFAULT 5

// Allow the build to override the per-queue thread cap.
#ifndef MAX_THREADS_PER_SVC_QUEUE
# define MAX_THREADS_PER_SVC_QUEUE MAX_THREADS_PER_SVC_QUEUE_DEFAULT
#endif

// Effective per-queue thread cap; range-checked and traced in the
// MessageQueueService constructor.
Uint32 max_threads_per_svc_queue;
|
// Entry point of the single polling thread.  Blocks on _polling_sem and,
// each time it is signalled, walks the polling list dispatching a worker
// thread (running _req_proc) for every service that has queued messages,
// is not dying, and is below max_threads_per_svc_queue.  Exits when
// _stop_polling is set by the last service destructor.
ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
{
    Thread *myself = reinterpret_cast<Thread *>(parm);

    // The thread parameter is the polling list itself (see accept_async).
    List<MessageQueueService, Mutex> *list =
        reinterpret_cast<List<MessageQueueService, Mutex>*>(myself->get_parm());

    while (_stop_polling.get() == 0)
    {
        _polling_sem.wait();

        // Re-check after waking: the last destructor signals the semaphore
        // precisely so this thread can observe the stop flag and exit.
        if (_stop_polling.get() != 0)
        {
            break;
        }

        // The polling_routine thread must hold the lock on the
        // _polling_list while processing incoming messages.
        // This lock is used to give this thread ownership of
        // services on the _polling_routine list.
        // This is necessary to avoid conflict with other threads
        // processing the _polling_list
        // (e.g., MessageQueueServer::~MessageQueueService).
        list->lock();
        MessageQueueService *service = list->front();
        ThreadStatus rtn = PEGASUS_THREAD_OK;
        while (service != NULL)
        {
            if ((service->_incoming.count() > 0) &&
                (service->_die.get() == 0) &&
                (service->_threads.get() < max_threads_per_svc_queue))
            {
                // The _threads count is used to track the
                // number of active threads that have been allocated
                // to process messages for this service.

                // The _threads count MUST be incremented while
                // the polling_routine owns the _polling_thread
                // lock and has ownership of the service object.
                service->_threads++;
                try
                {
                    rtn = _thread_pool->allocate_and_awaken(
                        service, _req_proc, &_polling_sem);
                }
                catch (...)
                {
                    service->_threads--;

                    // allocate_and_awaken should never generate an exception.
                    PEGASUS_ASSERT(0);
                }
                // if no more threads available, break from processing loop
                if (rtn != PEGASUS_THREAD_OK )
                {
                    service->_threads--;
                    Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
                        "Not enough threads to process this request. Skipping.");

                    Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
                        "Could not allocate thread for %s. " \
                        "Queue has %d messages waiting and %d threads servicing." \
                        "Skipping the service for right now. ",
                        service->getQueueName(),
                        service->_incoming.count(),
                        service->_threads.get());

                    Threads::yield();
                    // Setting service to NULL ends this walk; remaining
                    // services are retried on the next semaphore signal.
                    service = NULL;
                }
            }
            if (service != NULL)
            {
                service = list->next_of(service);
            }
        }
        list->unlock();
    }
    myself->exit_self( (ThreadReturnType) 1 );
    return(0);
}
156
157
// Signalled whenever work is queued (or at shutdown) to wake polling_routine.
Semaphore MessageQueueService::_polling_sem(0);
// Set non-zero by the last service destructor to stop polling_routine.
AtomicInt MessageQueueService::_stop_polling(0);
160
|
161 mday 1.15
|
// Constructs a service queue: lazily creates the shared cimom dispatcher
// and thread pool on first use, registers this service with the
// dispatcher, and adds it to the polling list.
//
// @param name          queue name, passed to the MessageQueue base class
// @param queueID       queue id, passed to the MessageQueue base class
// @param capabilities  module capability flags; async is always added
// @param mask          message mask for this service
// @throws BindFailedException if registration with the dispatcher fails
MessageQueueService::MessageQueueService(
    const char *name,
    Uint32 queueID,
    Uint32 capabilities,
    Uint32 mask)
    : Base(name, true, queueID),
    _mask(mask),
    _die(0),
    _threads(0),
    _incoming(),
    _incoming_queue_shutdown(0)
{
    _capabilities = (capabilities | module_capabilities::async);

    _default_op_timeout.tv_sec = 30;
    _default_op_timeout.tv_usec = 100;

    max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;

    // if requested thread max is out of range, then set to
    // MAX_THREADS_PER_SVC_QUEUE_LIMIT
    if ((max_threads_per_svc_queue < 1) ||
        (max_threads_per_svc_queue > MAX_THREADS_PER_SVC_QUEUE_LIMIT))
    {
        max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT;
    }

    Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
        "max_threads_per_svc_queue set to %u.", max_threads_per_svc_queue);

    AutoMutex autoMut(_meta_dispatcher_mutex);

    if (_meta_dispatcher == 0)
    {
        // First service constructed: create the shared dispatcher and
        // thread pool used by every service.
        _stop_polling = 0;
        PEGASUS_ASSERT(_service_count.get() == 0);
        _meta_dispatcher = new cimom();

        _thread_pool =
            new ThreadPool(0, "MessageQueueService", 0, 0, deallocateWait);
    }
    _service_count++;

    if (false == register_service(name, _capabilities, _mask))
    {
        MessageLoaderParms parms("Common.MessageQueueService.UNABLE_TO_REGISTER",
            "CIM base message queue service is unable to register with the CIMOM "
            "dispatcher.");
        throw BindFailedException(parms);
    }

    _get_polling_list()->insert_back(this);
}
220
|
221 mday 1.4
|
// Destroys the service: removes it from the polling list, shuts down its
// incoming queue, waits for in-flight worker threads to finish, and —
// when this is the last live service — stops the polling thread and
// tears down the shared dispatcher and thread pool.
MessageQueueService::~MessageQueueService()
{
    _die = 1;

    // The polling_routine locks the _polling_list while
    // processing the incoming messages for services on the
    // list. Deleting the service from the _polling_list
    // prior to processing, avoids synchronization issues
    // with the _polling_routine.

    // ATTN: added to prevent assertion in List in which the list does not
    // contain this element.
    if (_get_polling_list()->contains(this))
        _get_polling_list()->remove(this);

    // ATTN: The code for closing the _incoming queue
    // is not working correctly. In OpenPegasus 2.5,
    // execution of the following code is very timing
    // dependent. This needs to be fixed.
    // See Bug 4079 for details.
    if (_incoming_queue_shutdown.get() == 0)
    {
        _shutdown_incoming_queue();
    }

    // Wait until all threads processing the messages
    // for this service have completed.
    while (_threads.get() > 0)
    {
        Threads::yield();
    }

    {
        AutoMutex autoMut(_meta_dispatcher_mutex);
        _service_count--;
        if (_service_count.get() == 0)
        {
            // Last service: stop and reap the polling thread, then tear
            // down the shared dispatcher and thread pool.
            _stop_polling++;
            _polling_sem.signal();
            if (_polling_thread) {
                _polling_thread->join();
                delete _polling_thread;
                _polling_thread = 0;
            }
            _meta_dispatcher->_shutdown_routed_queue();
            delete _meta_dispatcher;
            _meta_dispatcher = 0;

            delete _thread_pool;
            _thread_pool = 0;
        }
    } // mutex unlocks here

    // Clean up in case there are extra stuff on the queue.
    while (_incoming.count())
    {
        try {
            delete _incoming.dequeue();
        } catch (const ListClosed&)
        {
            // If the list is closed, there is nothing we can do.
            break;
        }
    }
}
|
289 mday 1.1
|
// Begins shutdown of this service's incoming queue by posting a
// fire-and-forget AsyncIoctl::IO_CLOSE message to ourselves;
// handle_AsyncIoctl then drains and closes the queue.  No-op if
// shutdown has already started.
void MessageQueueService::_shutdown_incoming_queue()
{
    if (_incoming_queue_shutdown.get() > 0)
        return;

    AsyncIoctl *msg = new AsyncIoctl(
        0,
        _queueId,
        _queueId,
        true,
        AsyncIoctl::IO_CLOSE,
        0,
        0);

    msg->op = get_op();
    // Fire-and-forget: no callback flags, not yet complete, routed to self.
    msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
    msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
        | ASYNC_OPFLAGS_SIMPLE_STATUS);
    msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
    msg->op->_op_dest = this;
    msg->op->_request.reset(msg);

    try {
        _incoming.enqueue_wait(msg->op);
        _polling_sem.signal();
    } catch (const ListClosed &)
    {
        // This means the queue has already been shut-down (happens when there
        // are two AsyncIoctrl::IO_CLOSE messages generated and one got first
        // processed.
        delete msg;
    }
    catch (const Permission &)
    {
        delete msg;
    }
}
327
|
328 mday 1.22
329
|
// Enqueues a message on this queue (base-class behavior) with method
// entry/exit tracing added.
void MessageQueueService::enqueue(Message *msg)
{
    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");

    Base::enqueue(msg);

    PEG_METHOD_EXIT();
}
338
339
|
// Worker-thread entry point, dispatched by polling_routine through the
// thread pool with the target service as parameter.  Drains the service's
// incoming queue, routing each op node to _handle_incoming_operation.
// Always decrements the service's _threads count before returning.
ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(
    void * parm)
{
    MessageQueueService* service =
        reinterpret_cast<MessageQueueService*>(parm);
    PEGASUS_ASSERT(service != 0);
    try
    {

        if (service->_die.get() != 0)
        {
            service->_threads--;
            return (0);
        }
        // pull messages off the incoming queue and dispatch them. then
        // check pending messages that are non-blocking
        AsyncOpNode *operation = 0;

        // many operations may have been queued.
        do
        {
            try
            {
                operation = service->_incoming.dequeue();
            }
            catch (ListClosed &)
            {
                // ATTN: This appears to be a common loop exit path.
                //PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
                //    "Caught ListClosed exception. Exiting _req_proc.");
                break;
            }

            if (operation)
            {
                operation->_service_ptr = service;
                service->_handle_incoming_operation(operation);
            }
        } while (operation);
    }
    catch (const Exception& e)
    {
        PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
            String("Caught exception: \"") + e.getMessage() +
                "\". Exiting _req_proc.");
    }
    catch (...)
    {
        PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
            "Caught unrecognized exception. Exiting _req_proc.");
    }
    service->_threads--;
    return(0);
}
394
|
395 mday 1.43
|
// Completion callback used for send-and-wait operations: wakes the thread
// blocked on the op's client semaphore.  (q and parm are unused.)
void MessageQueueService::_sendwait_callback(
    AsyncOpNode *op,
    MessageQueue *q,
    void *parm)
{
    op->_client_sem.signal();
}
403
|
404 mday 1.30
// callback function is responsible for cleaning up all resources
// including op, op->_callback_node, and op->_callback_ptr
//
// For "safe" callbacks this unwraps and deletes the request envelope,
// unwraps the response, releases the op node, and hands the inner
// response message to the user callback.  For plain callbacks it simply
// invokes the stored _async_callback (which owns all cleanup).
void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
{
    if (op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK)
    {
        // Detach and discard the original request, unwrapping any async
        // envelope so the inner action message is also freed.
        Message *msg = op->removeRequest();
        if (msg && (msg->getMask() & MessageMask::ha_async))
        {
            if (msg->getType() == async_messages::ASYNC_LEGACY_OP_START)
            {
                AsyncLegacyOperationStart *wrapper =
                    static_cast<AsyncLegacyOperationStart *>(msg);
                msg = wrapper->get_action();
                delete wrapper;
            }
            else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
            {
                AsyncModuleOperationStart *wrapper =
                    static_cast<AsyncModuleOperationStart *>(msg);
                msg = wrapper->get_action();
                delete wrapper;
            }
            else if (msg->getType() == async_messages::ASYNC_OP_START)
            {
                AsyncOperationStart *wrapper =
                    static_cast<AsyncOperationStart *>(msg);
                msg = wrapper->get_action();
                delete wrapper;
            }
            delete msg;
        }

        // Detach the response; the unwrapped inner message (if any) is
        // what the user callback receives.
        msg = op->removeResponse();
        if (msg && (msg->getMask() & MessageMask::ha_async))
        {
            if (msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT)
            {
                AsyncLegacyOperationResult *wrapper =
                    static_cast<AsyncLegacyOperationResult *>(msg);
                msg = wrapper->get_result();
                delete wrapper;
            }
            else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
            {
                AsyncModuleOperationResult *wrapper =
                    static_cast<AsyncModuleOperationResult *>(msg);
                msg = wrapper->get_result();
                delete wrapper;
            }
        }
        // Copy the callback fields out before releasing the op node,
        // since the op must not be touched after return_op.
        void (*callback)(Message *, void *, void *) = op->__async_callback;
        void *handle = op->_callback_handle;
        void *parm = op->_callback_parameter;
        op->release();
        return_op(op);
        callback(msg, handle, parm);
    }
    else if (op->_flags & ASYNC_OPFLAGS_CALLBACK)
    {
        // note that _callback_node may be different from op
        // op->_callback_response_q is a "this" pointer we can use for
        // static callback methods
        op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);
    }
}
472
|
473 mday 1.1
|
// Routes one dequeued op node: legacy (non-async) requests are diverted
// to handleEnqueue; completed callback ops go to _handle_async_callback;
// everything else is dispatched as an async request.
void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)
{
    if (operation != 0)
    {
        // ATTN: optimization
        // << Tue Feb 19 14:10:38 2002 mdd >>
        operation->lock();

        Message *rq = operation->_request.get();

        // optimization <<< Thu Mar 7 21:04:05 2002 mdd >>>
        // move this to the bottom of the loop when the majority of
        // messages become async messages.

        // divert legacy messages to handleEnqueue
        if ((rq != 0) && (!(rq->getMask() & MessageMask::ha_async)))
        {
            // Ownership of rq passes to handleEnqueue; the op node is
            // released and returned here.
            operation->_request.release();
            operation->unlock();
            // delete the op node
            operation->release();
            return_op(operation);

            handleEnqueue(rq);
            return;
        }

        if ((operation->_flags & ASYNC_OPFLAGS_CALLBACK ||
             operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&
            (operation->_state & ASYNC_OPSTATE_COMPLETE))
        {
            operation->unlock();
            _handle_async_callback(operation);
        }
        else
        {
            PEGASUS_ASSERT(rq != 0);
            operation->unlock();
            _handle_async_request(static_cast<AsyncRequest *>(rq));
        }
    }
    return;
}
518
519 void MessageQueueService::_handle_async_request(AsyncRequest *req)
520 {
|
521 kumpf 1.104 if (req != 0)
|
522 mday 1.4 {
523 req->op->processing();
|
524 kumpf 1.104
|
525 mday 1.4 Uint32 type = req->getType();
|
526 kumpf 1.104 if (type == async_messages::HEARTBEAT)
527 handle_heartbeat_request(req);
|
528 mday 1.4 else if (type == async_messages::IOCTL)
|
529 kumpf 1.104 handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
|
530 mday 1.4 else if (type == async_messages::CIMSERVICE_START)
|
531 kumpf 1.104 handle_CimServiceStart(static_cast<CimServiceStart *>(req));
|
532 mday 1.4 else if (type == async_messages::CIMSERVICE_STOP)
|
533 kumpf 1.104 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
|
534 mday 1.4 else if (type == async_messages::CIMSERVICE_PAUSE)
|
535 kumpf 1.104 handle_CimServicePause(static_cast<CimServicePause *>(req));
|
536 mday 1.4 else if (type == async_messages::CIMSERVICE_RESUME)
|
537 kumpf 1.104 handle_CimServiceResume(static_cast<CimServiceResume *>(req));
538 else if (type == async_messages::ASYNC_OP_START)
539 handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
540 else
|
541 mday 1.4 {
|
542 kumpf 1.104 // we don't handle this request message
543 _make_response(req, async_results::CIM_NAK);
|
544 mday 1.4 }
|
545 mday 1.1 }
546 }
547
|
548 mday 1.17
// Routes a response back to the originator of 'request'.
//
// Three cases, in order:
//  1. request and response are both async: complete the async op directly.
//  2. request is a legacy message carried inside an async wrapper
//     (request->_async set): wrap the response in an
//     AsyncLegacyOperationResult and complete the wrapper's op.
//  3. plain legacy exchange: fire-and-forget the response to its
//     destination queue (response->dest must already be set).
//
// @return true for cases 1 and 2; the result of SendForget for case 3.
Boolean MessageQueueService::_enqueueResponse(
    Message* request,
    Message* response)
{
    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
        "MessageQueueService::_enqueueResponse");

    if (request->getMask() & MessageMask::ha_async)
    {
        if (response->getMask() & MessageMask::ha_async)
        {
            _completeAsyncResponse(static_cast<AsyncRequest *>(request),
                static_cast<AsyncReply *>(response),
                ASYNC_OPSTATE_COMPLETE, 0);
            PEG_METHOD_EXIT();
            return true;
        }
    }

    if (request->_async != 0)
    {
        Uint32 mask = request->_async->getMask();
        PEGASUS_ASSERT(mask & (MessageMask::ha_async | MessageMask::ha_request));

        AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
        AsyncOpNode *op = async->op;
        request->_async = 0;
        // the legacy request is going to be deleted by its handler
        // remove it from the op node
        static_cast<AsyncLegacyOperationStart *>(async)->get_action();

        AsyncLegacyOperationResult *async_result =
            new AsyncLegacyOperationResult(
                op,
                response);
        _completeAsyncResponse(
            async,
            async_result,
            ASYNC_OPSTATE_COMPLETE,
            0);
        PEG_METHOD_EXIT();
        return true;
    }

    // ensure that the destination queue is in response->dest
    PEG_METHOD_EXIT();
    return SendForget(response);
}
599
|
// Sends a generic response carrying the given result code back to the
// originator of req (delegates to the cimom base implementation).
void MessageQueueService::_make_response(Message *req, Uint32 code)
{
    cimom::_make_response(req, code);
}
604
605
|
// Completes an async request with the supplied reply, applying the given
// op state/flag bits (delegates to the cimom base implementation), with
// method entry/exit tracing.
void MessageQueueService::_completeAsyncResponse(
    AsyncRequest *request,
    AsyncReply *reply,
    Uint32 state,
    Uint32 flag)
{
    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
        "MessageQueueService::_completeAsyncResponse");

    cimom::_completeAsyncResponse(request, reply, state, flag);

    PEG_METHOD_EXIT();
}
619
620
|
// Marks an op node complete with the given state/flag/result code
// (delegates to the cimom base implementation).
void MessageQueueService::_complete_op_node(
    AsyncOpNode *op,
    Uint32 state,
    Uint32 flag,
    Uint32 code)
{
    cimom::_complete_op_node(op, state, flag, code);
}
629
|
630 mday 1.5
631 Boolean MessageQueueService::accept_async(AsyncOpNode *op)
632 {
|
633 mike 1.118 if (_incoming_queue_shutdown.get() > 0)
|
634 mday 1.8 return false;
|
635 kumpf 1.104 if (_polling_thread == NULL)
636 {
637 _polling_thread = new Thread(
638 polling_routine,
|
639 mike 1.120 reinterpret_cast<void *>(_get_polling_list()),
|
640 kumpf 1.104 false);
|
641 konrad.r 1.113 ThreadStatus tr = PEGASUS_THREAD_OK;
642 while ( (tr =_polling_thread->run()) != PEGASUS_THREAD_OK)
|
643 a.arora 1.93 {
|
644 denise.eckstein 1.116 if (tr == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)
|
645 mike 1.124 Threads::yield();
|
646 denise.eckstein 1.116 else
647 throw Exception(MessageLoaderParms("Common.MessageQueueService.NOT_ENOUGH_THREAD",
648 "Could not allocate thread for the polling thread."));
|
649 a.arora 1.93 }
|
650 kumpf 1.104 }
651 // ATTN optimization remove the message checking altogether in the base
|
652 mday 1.20 // << Mon Feb 18 14:02:20 2002 mdd >>
|
653 mday 1.6 op->lock();
|
654 kumpf 1.123 Message *rq = op->_request.get();
655 Message *rp = op->_response.get();
|
656 mday 1.6 op->unlock();
|
657 kumpf 1.104
658 if ((rq != 0 && (true == messageOK(rq))) ||
|
659 mike 1.118 (rp != 0 && (true == messageOK(rp))) && _die.get() == 0)
|
660 mday 1.5 {
|
661 mike 1.120 _incoming.enqueue_wait(op);
|
662 mday 1.53 _polling_sem.signal();
|
663 mday 1.5 return true;
664 }
665 return false;
666 }
667
668 Boolean MessageQueueService::messageOK(const Message *msg)
669 {
|
670 mike 1.118 if (_incoming_queue_shutdown.get() > 0)
|
671 mday 1.8 return false;
|
672 mday 1.18 return true;
673 }
674
|
675 mday 1.1 void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
676 {
|
677 kumpf 1.104 // default action is to echo a heartbeat response
678
679 AsyncReply *reply = new AsyncReply(
680 async_messages::HEARTBEAT,
681 0,
682 req->op,
683 async_results::OK,
684 req->resp,
685 false);
686 _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0);
|
687 mday 1.1 }
688
689
// Heartbeat replies require no action by default; subclasses may override.
void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
{
}
|
693 kumpf 1.104
|
// Handles service-control ioctls.  IO_CLOSE acknowledges the request,
// marks the target service's incoming queue as shutting down, drains and
// processes any remaining operations, then closes the queue.  Any other
// ioctl code is NAKed.
void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
{
    switch (req->ctl)
    {
        case AsyncIoctl::IO_CLOSE:
        {
            // The service being closed rides in the op node; it may not
            // be 'this'.
            MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);

#ifdef MESSAGEQUEUESERVICE_DEBUG
            PEGASUS_STD(cout) << service->getQueueName() << " Received AsyncIoctl::IO_CLOSE " << PEGASUS_STD(endl);
#endif

            // respond to this message. this is fire and forget, so we don't need to delete anything.
            // this takes care of two problems that were being found
            // << Thu Oct 9 10:52:48 2003 mdd >>
            _make_response(req, async_results::OK);
            // ensure we do not accept any further messages

            // ensure we don't recurse on IO_CLOSE
            // NOTE(review): this tests this->_incoming_queue_shutdown while
            // the assignment below sets service->_incoming_queue_shutdown;
            // presumably service == this on the IO_CLOSE path — confirm.
            if (_incoming_queue_shutdown.get() > 0)
                break;

            // set the closing flag
            service->_incoming_queue_shutdown = 1;
            // empty out the queue
            while (1)
            {
                AsyncOpNode *operation;
                try
                {
                    operation = service->_incoming.dequeue();
                }
                catch(IPCException &)
                {
                    break;
                }
                if (operation)
                {
                    operation->_service_ptr = service;
                    service->_handle_incoming_operation(operation);
                }
                else
                    break;
            } // message processing loop

            // shutdown the AsyncQueue
            service->_incoming.close();
            return;
        }

        default:
            _make_response(req, async_results::CIM_NAK);
    }
}
|
748 mday 1.8
|
// Handles a START request: clears the stopped capability bit, replies
// OK, then publishes the updated capabilities to the meta dispatcher.
void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
{
#ifdef MESSAGEQUEUESERVICE_DEBUG
    PEGASUS_STD(cout) << getQueueName() << "received START" << PEGASUS_STD(endl);
#endif

    // clear the stopped bit and update
    _capabilities &= (~(module_capabilities::stopped));
    _make_response(req, async_results::OK);
    // now tell the meta dispatcher we are started
    update_service(_capabilities, _mask);
}
// Handles a STOP request: sets the stopped capability bit, replies
// CIM_STOPPED, then publishes the updated capabilities.
void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
{
#ifdef MESSAGEQUEUESERVICE_DEBUG
    PEGASUS_STD(cout) << getQueueName() << "received STOP" << PEGASUS_STD(endl);
#endif
    // set the stopped bit and update
    _capabilities |= module_capabilities::stopped;
    _make_response(req, async_results::CIM_STOPPED);
    // now tell the meta dispatcher we are stopped
    update_service(_capabilities, _mask);
}
|
774 kumpf 1.104
|
// Handles a PAUSE request: sets the paused capability bit, publishes the
// update to the meta dispatcher, then replies CIM_PAUSED.
void MessageQueueService::handle_CimServicePause(CimServicePause *req)
{
    // set the paused bit and update
    _capabilities |= module_capabilities::paused;
    update_service(_capabilities, _mask);
    _make_response(req, async_results::CIM_PAUSED);
}
|
783 kumpf 1.104
|
// Handles a RESUME request: clears the paused capability bit, publishes
// the update to the meta dispatcher, then replies OK.
void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
{
    // clear the paused bit and update
    _capabilities &= (~(module_capabilities::paused));
    update_service(_capabilities, _mask);
    _make_response(req, async_results::OK);
}
|
792 kumpf 1.104
|
// The base class accepts no generic async operations: NAK the sender.
// Subclasses override to implement real operation handling.
void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
{
    _make_response(req, async_results::CIM_NAK);
}
797
// Async operation results require no action by default.
void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
{
    ;
}
802
|
803 mday 1.10
|
804 mday 1.14 void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)
805 {
806 // remove the legacy message from the request and enqueue it to its destination
807 Uint32 result = async_results::CIM_NAK;
|
808 kumpf 1.104
|
809 mday 1.25 Message *legacy = req->_act;
|
810 kumpf 1.104 if (legacy != 0)
|
811 mday 1.14 {
|
812 mday 1.25 MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
|
813 kumpf 1.104 if (queue != 0)
|
814 mday 1.14 {
|
815 kumpf 1.104 if (queue->isAsync() == true)
816 {
817 (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
818 }
819 else
820 {
821 // Enqueue the response:
822 queue->enqueue(req->get_action());
823 }
824
825 result = async_results::OK;
|
826 mday 1.14 }
827 }
828 _make_response(req, result);
829 }
830
// Legacy operation results require no action by default.
void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)
{
    ;
}
835
|
836 kumpf 1.104 AsyncOpNode *MessageQueueService::get_op()
|
837 mday 1.1 {
|
838 mday 1.4 AsyncOpNode *op = new AsyncOpNode();
|
839 kumpf 1.104
|
840 mday 1.9 op->_state = ASYNC_OPSTATE_UNKNOWN;
841 op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
|
842 kumpf 1.104
|
843 mday 1.1 return op;
844 }
845
// Returns an op node obtained from get_op; the caller must have released
// it first (ASYNC_OPSTATE_RELEASED must be set).
void MessageQueueService::return_op(AsyncOpNode *op)
{
    PEGASUS_ASSERT(op->_state & ASYNC_OPSTATE_RELEASED);
    delete op;
}
851
|
852 mday 1.18
|
// Sends an async operation to 'destination', registering a static
// callback to be invoked when the response arrives.
//
// @param op                   caller-allocated op node carrying the request
// @param destination          queue id of the target service
// @param callback             invoked with (op, callback_response_q,
//                             callback_ptr) on completion
// @param callback_response_q  queue passed through to the callback ("this"
//                             for static callback methods)
// @param callback_ptr         opaque user data for the callback
// @return false if the destination queue does not exist; otherwise the
//         result of routing the op through the meta dispatcher
Boolean MessageQueueService::SendAsync(
    AsyncOpNode *op,
    Uint32 destination,
    void (*callback)(AsyncOpNode *, MessageQueue *, void *),
    MessageQueue *callback_response_q,
    void *callback_ptr)
{
    PEGASUS_ASSERT(op != 0 && callback != 0);

    // get the queue handle for the destination
    op->lock();
    op->_op_dest = MessageQueue::lookup(destination); // destination of this message
    op->_flags |= ASYNC_OPFLAGS_CALLBACK;
    op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
    // initialize the callback data
    op->_async_callback = callback; // callback function to be executed by recpt. of response
    op->_callback_node = op; // the op node
    op->_callback_response_q = callback_response_q; // the queue that will receive the response
    op->_callback_ptr = callback_ptr; // user data for callback
    op->_callback_request_q = this; // I am the originator of this request

    op->unlock();
    if (op->_op_dest == 0)
        return false;

    return _meta_dispatcher->route_async(op);
}
881
882
|
// Sends a message asynchronously in the "safe callback" style: on
// completion the callback receives the unwrapped response message plus
// the caller's handle/parameter (see _handle_async_callback).
//
// @param msg          message to send; legacy (non-async) messages are
//                     wrapped in an AsyncLegacyOperationStart
// @param destination  queue id of the target service
// @param callback     invoked with (response, handle, parameter); if
//                     NULL this degrades to SendForget(msg)
// @param handle       opaque pointer passed through to the callback
// @param parameter    opaque pointer passed through to the callback
// @return false if msg is NULL or the destination does not exist;
//         otherwise the result of routing through the meta dispatcher
Boolean MessageQueueService::SendAsync(
    Message *msg,
    Uint32 destination,
    void (*callback)(Message *response, void *handle, void *parameter),
    void *handle,
    void *parameter)
{
    if (msg == NULL)
        return false;
    if (callback == NULL)
        return SendForget(msg);
    AsyncOpNode *op = get_op();
    msg->dest = destination;
    if (NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
    {
        op->release();
        return_op(op);
        return false;
    }
    op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
    op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
    op->_state &= ~ASYNC_OPSTATE_COMPLETE;
    op->__async_callback = callback;
    op->_callback_node = op;
    op->_callback_handle = handle;
    op->_callback_parameter = parameter;
    op->_callback_response_q = this;

    if (!(msg->getMask() & MessageMask::ha_async))
    {
        // NOTE(review): the wrapper pointer is never used afterwards;
        // presumably the AsyncLegacyOperationStart constructor attaches
        // itself to 'op' — confirm against its implementation.
        AsyncLegacyOperationStart *wrapper = new AsyncLegacyOperationStart(
            op,
            destination,
            msg,
            destination);
    }
    else
    {
        op->_request.reset(msg);
        (static_cast<AsyncMessage *>(msg))->op = op;
    }
    return _meta_dispatcher->route_async(op);
}
926
927
|
928 mday 1.18 Boolean MessageQueueService::SendForget(Message *msg)
929 {
930 AsyncOpNode *op = 0;
|
931 mday 1.22 Uint32 mask = msg->getMask();
|
932 kumpf 1.104
|
933 kumpf 1.128 if (mask & MessageMask::ha_async)
|
934 mday 1.18 {
935 op = (static_cast<AsyncMessage *>(msg))->op ;
936 }
|
937 mday 1.22
|
938 kumpf 1.104 if (op == 0)
|
939 mday 1.20 {
|
940 mday 1.18 op = get_op();
|
941 kumpf 1.123 op->_request.reset(msg);
|
942 kumpf 1.128 if (mask & MessageMask::ha_async)
|
943 kumpf 1.104 {
944 (static_cast<AsyncMessage *>(msg))->op = op;
945 }
|
946 mday 1.20 }
|
947 mday 1.30 op->_op_dest = MessageQueue::lookup(msg->dest);
|
948 mday 1.22 op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
|
949 kumpf 1.104 op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
950 | ASYNC_OPFLAGS_SIMPLE_STATUS);
|
951 mday 1.22 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
952 kumpf 1.104 if (op->_op_dest == 0)
|
953 mday 1.39 {
954 op->release();
955 return_op(op);
|
956 mday 1.21 return false;
|
957 mday 1.39 }
|
958 kumpf 1.104
|
959 mday 1.18 // now see if the meta dispatcher will take it
|
960 mday 1.30 return _meta_dispatcher->route_async(op);
|
961 mday 1.18 }
|
962 mday 1.2
|
963 mday 1.1
|
// Synchronous request/response: send 'request' and block the calling thread
// until the reply arrives.  Returns the reply (caller owns and must delete
// it), or 0 if 'request' is null.  If the request carried no op node, a
// temporary one is created here and recycled before returning.
AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
{
    if (request == 0)
        return 0 ;

    Boolean destroy_op = false;

    if (request->op == 0)
    {
        // Caller supplied no op node; make one and remember to tear it down.
        request->op = get_op();
        request->op->_request.reset(request);
        destroy_op = true;
    }

    request->block = false;
    request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
    // _sendwait_callback signals _client_sem once the response is posted.
    SendAsync(
        request->op,
        request->dest,
        _sendwait_callback,
        this,
        (void *)0);

    // Block this thread until the callback reports completion.
    request->op->_client_sem.wait();

    // NOTE(review): the SendAsync return value is ignored and the response
    // is assumed non-null after the semaphore fires — a failed send would
    // leave this waiting / dereferencing null; confirm against
    // _sendwait_callback's contract.
    AsyncReply* rpl = static_cast<AsyncReply *>(request->op->removeResponse());
    rpl->op = 0;

    if (destroy_op == true)
    {
        // Detach the request from the temporary op and recycle the node.
        request->op->lock();
        request->op->_request.release();
        request->op->_state |= ASYNC_OPSTATE_RELEASED;
        request->op->unlock();
        return_op(request->op);
        request->op = 0;
    }
    return rpl;
}
1003
1004
|
1005 kumpf 1.104 Boolean MessageQueueService::register_service(
1006 String name,
1007 Uint32 capabilities,
1008 Uint32 mask)
1009 {
1010 RegisterCimService *msg = new RegisterCimService(
1011 0,
1012 true,
1013 name,
1014 capabilities,
1015 mask,
1016 _queueId);
1017 msg->dest = CIMOM_Q_ID;
|
1018 mday 1.1
1019 Boolean registered = false;
|
1020 kumpf 1.104 AsyncReply *reply = static_cast<AsyncReply *>(SendWait(msg));
1021
1022 if (reply != 0)
1023 {
|
1024 kumpf 1.128 if (reply->getMask() & MessageMask::ha_async)
|
1025 kumpf 1.104 {
|
1026 kumpf 1.128 if (reply->getMask() & MessageMask::ha_reply)
|
1027 kumpf 1.104 {
1028 if (reply->result == async_results::OK ||
1029 reply->result == async_results::MODULE_ALREADY_REGISTERED)
1030 {
1031 registered = true;
1032 }
1033 }
|
1034 mday 1.1 }
|
1035 kumpf 1.104
1036 delete reply;
|
1037 mday 1.1 }
|
1038 mday 1.5 delete msg;
|
1039 mday 1.1 return registered;
1040 }
1041
1042 Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
1043 {
|
1044 kumpf 1.104 UpdateCimService *msg = new UpdateCimService(
1045 0,
1046 true,
1047 _queueId,
1048 _capabilities,
1049 _mask);
|
1050 mday 1.1 Boolean registered = false;
|
1051 mday 1.2
1052 AsyncMessage *reply = SendWait(msg);
1053 if (reply)
|
1054 mday 1.1 {
|
1055 kumpf 1.128 if (reply->getMask() & MessageMask::ha_async)
|
1056 mday 1.1 {
|
1057 kumpf 1.128 if (reply->getMask() & MessageMask::ha_reply)
|
1058 kumpf 1.104 {
1059 if (static_cast<AsyncReply *>(reply)->result == async_results::OK)
1060 {
1061 registered = true;
1062 }
1063 }
|
1064 mday 1.1 }
1065 delete reply;
1066 }
|
1067 mday 1.5 delete msg;
|
1068 mday 1.1 return registered;
1069 }
1070
1071
|
// Remove this service's registration from the meta dispatcher.
// Always reports success; deregister_module() returns no status to check.
Boolean MessageQueueService::deregister_service()
{
    _meta_dispatcher->deregister_module(_queueId);
    return true;
}
1078
1079
|
1080 kumpf 1.104 void MessageQueueService::find_services(
1081 String name,
1082 Uint32 capabilities,
1083 Uint32 mask,
1084 Array<Uint32> *results)
|
1085 mday 1.1 {
|
1086 kumpf 1.104 if (results == 0)
1087 {
|
1088 mday 1.1 throw NullPointer();
|
1089 kumpf 1.104 }
1090
|
1091 mday 1.1 results->clear();
|
1092 kumpf 1.104
1093 FindServiceQueue *req = new FindServiceQueue(
1094 0,
1095 _queueId,
1096 true,
1097 name,
1098 capabilities,
1099 mask);
1100
|
1101 mday 1.44 req->dest = CIMOM_Q_ID;
|
1102 kumpf 1.104
1103 AsyncMessage *reply = SendWait(req);
1104 if (reply)
1105 {
|
1106 kumpf 1.128 if (reply->getMask() & MessageMask::ha_async)
|
1107 kumpf 1.104 {
|
1108 kumpf 1.128 if (reply->getMask() & MessageMask::ha_reply)
|
1109 kumpf 1.104 {
1110 if (reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
1111 {
1112 if ((static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK)
1113 *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
1114 }
1115 }
|
1116 mday 1.1 }
1117 delete reply;
1118 }
|
1119 mday 1.5 delete req;
|
1120 mday 1.1 return ;
1121 }
1122
1123 void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
1124 {
|
1125 kumpf 1.104 if (result == 0)
1126 {
|
1127 mday 1.1 throw NullPointer();
|
1128 kumpf 1.104 }
1129
1130 EnumerateService *req = new EnumerateService(
1131 0,
1132 _queueId,
1133 true,
1134 queue);
1135
|
1136 mday 1.2 AsyncMessage *reply = SendWait(req);
|
1137 kumpf 1.104
|
1138 mday 1.2 if (reply)
|
1139 mday 1.1 {
1140 Boolean found = false;
|
1141 kumpf 1.104
|
1142 kumpf 1.128 if (reply->getMask() & MessageMask::ha_async)
|
1143 mday 1.1 {
|
1144 kumpf 1.128 if (reply->getMask() & MessageMask::ha_reply)
|
1145 kumpf 1.104 {
1146 if (reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
1147 {
1148 if ((static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK)
1149 {
1150 if (found == false)
1151 {
1152 found = true;
1153
1154 result->put_name((static_cast<EnumerateServiceResponse *>(reply))->name);
1155 result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
1156 result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
1157 result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
1158 }
1159 }
1160 }
1161 }
|
1162 mday 1.1 }
1163 delete reply;
1164 }
|
1165 mday 1.5 delete req;
|
1166 kumpf 1.104
|
1167 mday 1.1 return;
1168 }
1169
|
1170 mike 1.120 MessageQueueService::PollingList* MessageQueueService::_get_polling_list()
1171 {
1172 _polling_list_mutex.lock();
1173
1174 if (!_polling_list)
1175 _polling_list = new PollingList;
1176
1177 _polling_list_mutex.unlock();
|
1178 kumpf 1.104
|
1179 mike 1.120 return _polling_list;
|
1180 mday 1.1 }
1181
1182 PEGASUS_NAMESPACE_END
|