//%2005////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
// Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
// Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
// IBM Corp.; EMC Corporation, The Open Group.
// Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
// IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
// Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
// EMC Corporation; VERITAS Software Corporation; The Open Group.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
// ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
//
//==============================================================================
//
// Author: Mike Day (mdday@us.ibm.com)
//
// Modified By:
//     Amit K Arora, IBM (amita@in.ibm.com) for Bug#1090, #2657
//     Josephine Eskaline Joyce, IBM (jojustin@in.ibm.com) for Bug#3259
//     Jim Wunderlich (Jim_Wunderlich@prodigy.net)
//
//%/////////////////////////////////////////////////////////////////////////////

// #include <iostream.h>
#include "MessageQueueService.h"
#include <Pegasus/Common/Tracer.h>
#include <Pegasus/Common/MessageLoader.h> //l10n

PEGASUS_NAMESPACE_BEGIN

cimom *MessageQueueService::_meta_dispatcher = 0;
AtomicInt MessageQueueService::_service_count = 0;
AtomicInt MessageQueueService::_xid(1);
Mutex MessageQueueService::_meta_dispatcher_mutex;

static struct timeval deallocateWait = {300, 0};

ThreadPool *MessageQueueService::_thread_pool = 0;

DQueue<MessageQueueService> MessageQueueService::_polling_list(true);

Thread* MessageQueueService::_polling_thread = 0;

ThreadPool *MessageQueueService::get_thread_pool()
{
    return _thread_pool;
}

//
// MAX_THREADS_PER_SVC_QUEUE_LIMIT
//
// JR Wunderlich Jun 6, 2005
//

#define MAX_THREADS_PER_SVC_QUEUE_LIMIT 5000
#define MAX_THREADS_PER_SVC_QUEUE_DEFAULT 5

Uint32 max_threads_per_svc_queue;
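
// max_threads_per_svc_queue limits how many worker threads the polling
// routine will allocate concurrently for a single service queue. It is
// initialized in the MessageQueueService constructor from
// MAX_THREADS_PER_SVC_QUEUE, clamped to MAX_THREADS_PER_SVC_QUEUE_LIMIT,
// and falls back to MAX_THREADS_PER_SVC_QUEUE_DEFAULT when the requested
// value is 0.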

PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL
MessageQueueService::kill_idle_threads(void *parm)
{
    static struct timeval now, last = {0, 0};
    gettimeofday(&now, NULL);
    int dead_threads = 0;

    if (now.tv_sec - last.tv_sec > 120)
    {
        gettimeofday(&last, NULL);
        try
        {
            dead_threads = MessageQueueService::_thread_pool->cleanupIdleThreads();
        }
        catch (...)
        {
        }
    }

#ifdef PEGASUS_POINTER_64BIT
    return (PEGASUS_THREAD_RETURN)(Uint64)dead_threads;
#elif PEGASUS_PLATFORM_AIX_RS_IBMCXX
    return (PEGASUS_THREAD_RETURN)(unsigned long)dead_threads;
#else
    return (PEGASUS_THREAD_RETURN)(Uint32)dead_threads;
#endif
}
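
// polling_routine is the body of the single polling thread. It blocks on
// _polling_sem; each time it is signalled it walks the _polling_list and
// allocates a worker thread (running _req_proc) for every service that has
// messages in its _incoming queue, is not dying, and is still below
// max_threads_per_svc_queue. It also schedules kill_idle_threads when
// _check_idle_flag is set, and exits once _stop_polling is set.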

PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
{
    Thread *myself = reinterpret_cast<Thread *>(parm);
    DQueue<MessageQueueService> *list =
        reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm());

    while (_stop_polling.value() == 0)
    {
        _polling_sem.wait();

        if (_stop_polling.value() != 0)
        {
            break;
        }

        // The polling_routine thread must hold the lock on the
        // _polling_list while processing incoming messages.
        // This lock is used to give this thread ownership of
        // services on the _polling_list.

        // This is necessary to avoid conflict with other threads
        // processing the _polling_list
        // (e.g., MessageQueueService::~MessageQueueService).

        list->lock();
        MessageQueueService *service = list->next(0);
        ThreadStatus rtn = PEGASUS_THREAD_OK;
        while (service != NULL)
        {
            if ((service->_incoming.count() > 0) &&
                (service->_die.value() == 0) &&
                (service->_threads < max_threads_per_svc_queue))
            {
                // The _threads count is used to track the
                // number of active threads that have been allocated
                // to process messages for this service.

                // The _threads count MUST be incremented while
                // the polling_routine owns the _polling_list
                // lock and has ownership of the service object.

                service->_threads++;
                try
                {
                    rtn = _thread_pool->allocate_and_awaken(
                        service, _req_proc, &_polling_sem);
                }
                catch (...)
                {
                    service->_threads--;

                    // allocate_and_awaken should never generate an exception.
                    PEGASUS_ASSERT(0);
                }
                // if no more threads are available, break from the processing loop
                if (rtn != PEGASUS_THREAD_OK)
                {
                    service->_threads--;
                    Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
                        "Not enough threads to process this request. Skipping.");

                    Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
                        "Could not allocate thread for %s. "
                        "Queue has %d messages waiting and %d threads servicing. "
                        "Skipping the service for right now. ",
                        service->getQueueName(),
                        service->_incoming.count(),
                        service->_threads.value());

                    pegasus_yield();
                    service = NULL;
                }
            }
            if (service != NULL)
            {
                service = list->next(service);
            }
        }
        list->unlock();

        if (_check_idle_flag.value() != 0)
        {
            _check_idle_flag = 0;

            // Try to do idle thread cleanup when the system is not busy.
            // If the system is busy there may not be a thread available to
            // allocate, so nothing will be done and that is OK.

            if (_thread_pool->allocate_and_awaken(
                    service, kill_idle_threads, &_polling_sem) != PEGASUS_THREAD_OK)
            {
                Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
                    "Not enough threads to kill idle threads. What an irony.");

                Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
                    "Could not allocate thread to kill idle threads. Skipping.");
            }
        }
    }
    myself->exit_self((PEGASUS_THREAD_RETURN) 1);
    return 0;
}

Semaphore MessageQueueService::_polling_sem(0);
AtomicInt MessageQueueService::_stop_polling(0);
AtomicInt MessageQueueService::_check_idle_flag(0);
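
// The constructor registers the service with the meta dispatcher (creating
// the cimom and the shared ThreadPool on first use) and adds the service to
// the _polling_list so the polling thread will dispatch its messages.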

MessageQueueService::MessageQueueService(
    const char *name,
    Uint32 queueID,
    Uint32 capabilities,
    Uint32 mask)
    : Base(name, true, queueID),
      _mask(mask),
      _die(0),
      _threads(0),
      _incoming(true, 0),
      _incoming_queue_shutdown(0)
{
    _capabilities = (capabilities | module_capabilities::async);

    _default_op_timeout.tv_sec = 30;
    _default_op_timeout.tv_usec = 100;

    max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;

    // if requested threads gt MAX_THREADS_PER_SVC_QUEUE_LIMIT
    // then set to MAX_THREADS_PER_SVC_QUEUE_LIMIT

    if (max_threads_per_svc_queue > MAX_THREADS_PER_SVC_QUEUE_LIMIT)
    {
        max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT;
    }

    // if requested threads eq 0 (unlimited)
    // then set to MAX_THREADS_PER_SVC_QUEUE_DEFAULT

    if (max_threads_per_svc_queue == 0)
    {
        max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_DEFAULT;
    }

    // cout << "MAX_THREADS_PER_SVC_QUEUE = " << MAX_THREADS_PER_SVC_QUEUE << endl;
    // cout << "max_threads_per_svc_queue set to = " << max_threads_per_svc_queue << endl;

    AutoMutex autoMut(_meta_dispatcher_mutex);

    if (_meta_dispatcher == 0)
    {
        _stop_polling = 0;
        PEGASUS_ASSERT(_service_count.value() == 0);
        _meta_dispatcher = new cimom();
        if (_meta_dispatcher == NULL)
        {
            throw NullPointer();
        }
        // _thread_pool = new ThreadPool(initial_cnt, "MessageQueueService",
        //     minimum_cnt, maximum_cnt, deallocateWait);
        _thread_pool =
            new ThreadPool(0, "MessageQueueService", 0, 0, deallocateWait);
    }
    _service_count++;

    if (false == register_service(name, _capabilities, _mask))
    {
        //l10n
        //throw BindFailedException("MessageQueueService Base Unable to register with Meta Dispatcher");
        MessageLoaderParms parms(
            "Common.MessageQueueService.UNABLE_TO_REGISTER",
            "MessageQueueService Base Unable to register with Meta Dispatcher");

        throw BindFailedException(parms);
    }

    _polling_list.insert_last(this);
}

MessageQueueService::~MessageQueueService()
{
    _die = 1;

    // The polling_routine locks the _polling_list while
    // processing the incoming messages for services on the
    // list. Deleting the service from the _polling_list
    // prior to processing avoids synchronization issues
    // with the polling_routine.

    _polling_list.remove(this);

    // ATTN: The code for closing the _incoming queue
    // is not working correctly. In OpenPegasus 2.5,
    // execution of the following code is very timing
    // dependent. This needs to be fixed.
    // See Bug 4079 for details.
    if (_incoming_queue_shutdown.value() == 0)
    {
        _shutdown_incoming_queue();
    }

    // Wait until all threads processing the messages
    // for this service have completed.
    while (_threads.value() > 0)
    {
        pegasus_yield();
    }

    {
        AutoMutex autoMut(_meta_dispatcher_mutex);
        _service_count--;
        if (_service_count.value() == 0)
        {
            _stop_polling++;
            _polling_sem.signal();
            if (_polling_thread)
            {
                _polling_thread->join();
                delete _polling_thread;
                _polling_thread = 0;
            }
            _meta_dispatcher->_shutdown_routed_queue();
            delete _meta_dispatcher;
            _meta_dispatcher = 0;

            delete _thread_pool;
            _thread_pool = 0;
        }
    } // mutex unlocks here

    // Clean up any messages that are still on the queue.
    while (_incoming.count())
    {
        try
        {
            delete _incoming.remove_first();
        }
        catch (const ListClosed &)
        {
            // If the list is closed, there is nothing we can do.
            break;
        }
    }
}

void MessageQueueService::_shutdown_incoming_queue()
{
    if (_incoming_queue_shutdown.value() > 0)
        return;

    AsyncIoctl *msg = new AsyncIoctl(
        get_next_xid(),
        0,
        _queueId,
        _queueId,
        true,
        AsyncIoctl::IO_CLOSE,
        0,
        0);

    msg->op = get_op();
    msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
    msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
        | ASYNC_OPFLAGS_SIMPLE_STATUS);
    msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;

    msg->op->_op_dest = this;
    msg->op->_request.insert_first(msg);
    try
    {
        _incoming.insert_last_wait(msg->op);
        _polling_sem.signal();
    }
    catch (const ListClosed &)
    {
        // This means the queue has already been shut down; it happens when
        // two AsyncIoctl::IO_CLOSE messages are generated and one gets
        // processed first.
        delete msg;
    }
    catch (const Permission &)
    {
        delete msg;
    }
}

void MessageQueueService::enqueue(Message *msg)
{
    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");

    Base::enqueue(msg);

    PEG_METHOD_EXIT();
}
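
// _req_proc is the routine run by the worker threads that polling_routine
// allocates from the thread pool. It drains this service's _incoming queue,
// handing each AsyncOpNode to _handle_incoming_operation, and decrements the
// service's _threads count before returning.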

PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(
    void *parm)
{
    MessageQueueService *service =
        reinterpret_cast<MessageQueueService *>(parm);
    PEGASUS_ASSERT(service != 0);
    try
    {
        if (service->_die.value() != 0)
        {
            service->_threads--;
            return (0);
        }
        // pull messages off the incoming queue and dispatch them. then
        // check pending messages that are non-blocking
        AsyncOpNode *operation = 0;

        // many operations may have been queued.
        do
        {
            try
            {
                operation = service->_incoming.remove_first();
            }
            catch (ListClosed &)
            {
                // ATTN: This appears to be a common loop exit path.
                //PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
                //    "Caught ListClosed exception. Exiting _req_proc.");
                break;
            }

            if (operation)
            {
                operation->_service_ptr = service;
                service->_handle_incoming_operation(operation);
            }
        } while (operation);
    }
    catch (const Exception &e)
    {
        PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
            String("Caught exception: \"") + e.getMessage() +
                "\". Exiting _req_proc.");
    }
    catch (...)
    {
        PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
            "Caught unrecognized exception. Exiting _req_proc.");
    }
    service->_threads--;
    return (0);
}
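
// _sendwait_callback is the completion callback installed by SendWait(). It
// signals the op's _client_sem so the thread blocked in SendWait() can wake
// up and collect the response.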

void MessageQueueService::_sendwait_callback(
    AsyncOpNode *op,
    MessageQueue *q,
    void *parm)
{
    op->_client_sem.signal();
}

// The callback function is responsible for cleaning up all resources
// including op, op->_callback_node, and op->_callback_ptr.
void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
{
    if (op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK)
    {
        Message *msg = op->get_request();
        if (msg && (msg->getMask() & message_mask::ha_async))
        {
            if (msg->getType() == async_messages::ASYNC_LEGACY_OP_START)
            {
                AsyncLegacyOperationStart *wrapper =
                    static_cast<AsyncLegacyOperationStart *>(msg);
                msg = wrapper->get_action();
                delete wrapper;
            }
            else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
            {
                AsyncModuleOperationStart *wrapper =
                    static_cast<AsyncModuleOperationStart *>(msg);
                msg = wrapper->get_action();
                delete wrapper;
            }
            else if (msg->getType() == async_messages::ASYNC_OP_START)
            {
                AsyncOperationStart *wrapper =
                    static_cast<AsyncOperationStart *>(msg);
                msg = wrapper->get_action();
                delete wrapper;
            }
            delete msg;
        }

        msg = op->get_response();
        if (msg && (msg->getMask() & message_mask::ha_async))
        {
            if (msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT)
            {
                AsyncLegacyOperationResult *wrapper =
                    static_cast<AsyncLegacyOperationResult *>(msg);
                msg = wrapper->get_result();
                delete wrapper;
            }
            else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
            {
                AsyncModuleOperationResult *wrapper =
                    static_cast<AsyncModuleOperationResult *>(msg);
                msg = wrapper->get_result();
                delete wrapper;
            }
        }
        void (*callback)(Message *, void *, void *) = op->__async_callback;
        void *handle = op->_callback_handle;
        void *parm = op->_callback_parameter;
        op->release();
        return_op(op);
        callback(msg, handle, parm);
    }
    else if (op->_flags & ASYNC_OPFLAGS_CALLBACK)
    {
        // note that _callback_node may be different from op
        // op->_callback_response_q is a "this" pointer we can use for
        // static callback methods
        op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);
    }
}

void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)
{
    if (operation != 0)
    {
        // ATTN: optimization
        // << Tue Feb 19 14:10:38 2002 mdd >>
        operation->lock();

        Message *rq = operation->_request.next(0);

        // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>>
        // move this to the bottom of the loop when the majority of
        // messages become async messages.

        // divert legacy messages to handleEnqueue
        if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))
        {
            rq = operation->_request.remove_first();
            operation->unlock();
            // delete the op node
            operation->release();
            return_op(operation);

            handleEnqueue(rq);
            return;
        }

        if ((operation->_flags & ASYNC_OPFLAGS_CALLBACK ||
             operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&
            (operation->_state & ASYNC_OPSTATE_COMPLETE))
        {
            operation->unlock();
            _handle_async_callback(operation);
        }
        else
        {
            PEGASUS_ASSERT(rq != 0);
            operation->unlock();
            _handle_async_request(static_cast<AsyncRequest *>(rq));
        }
    }
    return;
}

void MessageQueueService::_handle_async_request(AsyncRequest *req)
{
    if (req != 0)
    {
        req->op->processing();

        Uint32 type = req->getType();
        if (type == async_messages::HEARTBEAT)
            handle_heartbeat_request(req);
        else if (type == async_messages::IOCTL)
            handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
        else if (type == async_messages::CIMSERVICE_START)
            handle_CimServiceStart(static_cast<CimServiceStart *>(req));
        else if (type == async_messages::CIMSERVICE_STOP)
            handle_CimServiceStop(static_cast<CimServiceStop *>(req));
        else if (type == async_messages::CIMSERVICE_PAUSE)
            handle_CimServicePause(static_cast<CimServicePause *>(req));
        else if (type == async_messages::CIMSERVICE_RESUME)
            handle_CimServiceResume(static_cast<CimServiceResume *>(req));
        else if (type == async_messages::ASYNC_OP_START)
            handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
        else
        {
            // we don't handle this request message
            _make_response(req, async_results::CIM_NAK);
        }
    }
}

Boolean MessageQueueService::_enqueueResponse(
    Message *request,
    Message *response)
{
    STAT_COPYDISPATCHER

    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
        "MessageQueueService::_enqueueResponse");

    if (request->getMask() & message_mask::ha_async)
    {
        if (response->getMask() & message_mask::ha_async)
        {
            _completeAsyncResponse(static_cast<AsyncRequest *>(request),
                static_cast<AsyncReply *>(response),
                ASYNC_OPSTATE_COMPLETE, 0);
            PEG_METHOD_EXIT();
            return true;
        }
    }

    if (request->_async != 0)
    {
        Uint32 mask = request->_async->getMask();
        PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request));

        AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
        AsyncOpNode *op = async->op;
        request->_async = 0;
        // the legacy request is going to be deleted by its handler
        // remove it from the op node
        static_cast<AsyncLegacyOperationStart *>(async)->get_action();

        AsyncLegacyOperationResult *async_result =
            new AsyncLegacyOperationResult(
                async->getKey(),
                async->getRouting(),
                op,
                response);
        _completeAsyncResponse(
            async,
            async_result,
            ASYNC_OPSTATE_COMPLETE,
            0);
        PEG_METHOD_EXIT();
        return true;
    }

    // ensure that the destination queue is in response->dest
    PEG_METHOD_EXIT();
    return SendForget(response);
}

void MessageQueueService::_make_response(Message *req, Uint32 code)
{
    cimom::_make_response(req, code);
}

void MessageQueueService::_completeAsyncResponse(
    AsyncRequest *request,
    AsyncReply *reply,
    Uint32 state,
    Uint32 flag)
{
    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
        "MessageQueueService::_completeAsyncResponse");

    cimom::_completeAsyncResponse(request, reply, state, flag);

    PEG_METHOD_EXIT();
}

void MessageQueueService::_complete_op_node(
    AsyncOpNode *op,
    Uint32 state,
    Uint32 flag,
    Uint32 code)
{
    cimom::_complete_op_node(op, state, flag, code);
}

Boolean MessageQueueService::accept_async(AsyncOpNode *op)
{
    if (_incoming_queue_shutdown.value() > 0)
        return false;
    if (_polling_thread == NULL)
    {
        _polling_thread = new Thread(
            polling_routine,
            reinterpret_cast<void *>(&_polling_list),
            false);
        ThreadStatus tr = PEGASUS_THREAD_OK;
        while ((tr = _polling_thread->run()) != PEGASUS_THREAD_OK)
        {
            if (tr == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)
                pegasus_yield();
            else
                throw Exception(MessageLoaderParms(
                    "Common.MessageQueueService.NOT_ENOUGH_THREAD",
                    "Could not allocate thread for the polling thread."));
        }
    }
    // ATTN optimization remove the message checking altogether in the base
    // << Mon Feb 18 14:02:20 2002 mdd >>
    op->lock();
    Message *rq = op->_request.next(0);
    Message *rp = op->_response.next(0);
    op->unlock();

    if (((rq != 0 && (true == messageOK(rq))) ||
         (rp != 0 && (true == messageOK(rp)))) &&
        (_die.value() == 0))
    {
        _incoming.insert_last_wait(op);
        _polling_sem.signal();
        return true;
    }
    return false;
}

Boolean MessageQueueService::messageOK(const Message *msg)
{
    if (_incoming_queue_shutdown.value() > 0)
        return false;
    return true;
}

void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
{
    // default action is to echo a heartbeat response

    AsyncReply *reply = new AsyncReply(
        async_messages::HEARTBEAT,
        req->getKey(),
        req->getRouting(),
        0,
        req->op,
        async_results::OK,
        req->resp,
        false);
    _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0);
}

void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
{
}

void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
{
    switch (req->ctl)
    {
        case AsyncIoctl::IO_CLOSE:
        {
            MessageQueueService *service =
                static_cast<MessageQueueService *>(req->op->_service_ptr);

#ifdef MESSAGEQUEUESERVICE_DEBUG
            PEGASUS_STD(cout) << service->getQueueName()
                << " Received AsyncIoctl::IO_CLOSE " << PEGASUS_STD(endl);
#endif

            // respond to this message. this is fire and forget, so we
            // don't need to delete anything.
            // this takes care of two problems that were being found
            // << Thu Oct  9 10:52:48 2003 mdd >>
            _make_response(req, async_results::OK);
            // ensure we do not accept any further messages

            // ensure we don't recurse on IO_CLOSE
            if (_incoming_queue_shutdown.value() > 0)
                break;

            // set the closing flag
            service->_incoming_queue_shutdown = 1;
            // empty out the queue
            while (1)
            {
                AsyncOpNode *operation;
                try
                {
                    operation = service->_incoming.remove_first();
                }
                catch (IPCException &)
                {
                    break;
                }
                if (operation)
                {
                    operation->_service_ptr = service;
                    service->_handle_incoming_operation(operation);
                }
                else
                    break;
            } // message processing loop

            // shutdown the AsyncDQueue
            service->_incoming.shutdown_queue();
            return;
        }

        default:
            _make_response(req, async_results::CIM_NAK);
    }
}
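
// The CimServiceStart/Stop/Pause/Resume handlers below adjust this service's
// capability bits (module_capabilities::stopped / paused), acknowledge the
// request, and call update_service() so the meta dispatcher sees the new
// state.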

void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
{
#ifdef MESSAGEQUEUESERVICE_DEBUG
    PEGASUS_STD(cout) << getQueueName() << " received START" << PEGASUS_STD(endl);
#endif

    // clear the stopped bit and update
    _capabilities &= (~(module_capabilities::stopped));
    _make_response(req, async_results::OK);
    // now tell the meta dispatcher we are started
    update_service(_capabilities, _mask);
}

void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
{
#ifdef MESSAGEQUEUESERVICE_DEBUG
    PEGASUS_STD(cout) << getQueueName() << " received STOP" << PEGASUS_STD(endl);
#endif
    // set the stopped bit and update
    _capabilities |= module_capabilities::stopped;
    _make_response(req, async_results::CIM_STOPPED);
    // now tell the meta dispatcher we are stopped
    update_service(_capabilities, _mask);
}

void MessageQueueService::handle_CimServicePause(CimServicePause *req)
{
    // set the paused bit and update
    _capabilities |= module_capabilities::paused;
    update_service(_capabilities, _mask);
    _make_response(req, async_results::CIM_PAUSED);
}

void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
{
    // clear the paused bit and update
    _capabilities &= (~(module_capabilities::paused));
    update_service(_capabilities, _mask);
    _make_response(req, async_results::OK);
}

void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
{
    _make_response(req, async_results::CIM_NAK);
}

void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
{
    ;
}

void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)
{
    // remove the legacy message from the request and enqueue it to its destination
    Uint32 result = async_results::CIM_NAK;

    Message *legacy = req->_act;
    if (legacy != 0)
    {
        MessageQueue *queue = MessageQueue::lookup(req->_legacy_destination);
        if (queue != 0)
        {
            if (queue->isAsync() == true)
            {
                (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
            }
            else
            {
                // Enqueue the response:
                queue->enqueue(req->get_action());
            }

            result = async_results::OK;
        }
    }
    _make_response(req, result);
}

void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)
{
    ;
}

AsyncOpNode *MessageQueueService::get_op()
{
    AsyncOpNode *op = new AsyncOpNode();

    op->_state = ASYNC_OPSTATE_UNKNOWN;
    op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;

    return op;
}

void MessageQueueService::return_op(AsyncOpNode *op)
{
    PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED);
    delete op;
}

Boolean MessageQueueService::ForwardOp(
    AsyncOpNode *op,
    Uint32 destination)
{
    PEGASUS_ASSERT(op != 0);
    op->lock();
    op->_op_dest = MessageQueue::lookup(destination);
    op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD);
    op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK);
    op->unlock();

    if (op->_op_dest == 0)
        return false;

    return _meta_dispatcher->route_async(op);
}

Boolean MessageQueueService::SendAsync(
    AsyncOpNode *op,
    Uint32 destination,
    void (*callback)(AsyncOpNode *, MessageQueue *, void *),
    MessageQueue *callback_response_q,
    void *callback_ptr)
{
    PEGASUS_ASSERT(op != 0 && callback != 0);

    // get the queue handle for the destination

    op->lock();
    op->_op_dest = MessageQueue::lookup(destination); // destination of this message
    op->_flags |= ASYNC_OPFLAGS_CALLBACK;
    op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
    // initialize the callback data
    op->_async_callback = callback;       // callback executed by the recipient of the response
    op->_callback_node = op;              // the op node
    op->_callback_response_q = callback_response_q; // the queue that will receive the response
    op->_callback_ptr = callback_ptr;     // user data for the callback
    op->_callback_request_q = this;       // I am the originator of this request

    op->unlock();
    if (op->_op_dest == 0)
        return false;

    return _meta_dispatcher->route_async(op);
}

Boolean MessageQueueService::SendAsync(
    Message *msg,
    Uint32 destination,
    void (*callback)(Message *response, void *handle, void *parameter),
    void *handle,
    void *parameter)
{
    if (msg == NULL)
        return false;
    if (callback == NULL)
        return SendForget(msg);
    AsyncOpNode *op = get_op();
    msg->dest = destination;
    if (NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
    {
        op->release();
        return_op(op);
        return false;
    }
    op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
    op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
    op->_state &= ~ASYNC_OPSTATE_COMPLETE;
    op->__async_callback = callback;
    op->_callback_node = op;
    op->_callback_handle = handle;
    op->_callback_parameter = parameter;
    op->_callback_response_q = this;

    if (!(msg->getMask() & message_mask::ha_async))
    {
        AsyncLegacyOperationStart *wrapper = new AsyncLegacyOperationStart(
            get_next_xid(),
            op,
            destination,
            msg,
            destination);
    }
    else
    {
        op->_request.insert_first(msg);
        (static_cast<AsyncMessage *>(msg))->op = op;
    }
    return _meta_dispatcher->route_async(op);
}
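
// Illustrative usage sketch (not part of the original source): a subclass of
// MessageQueueService could use the callback form of SendAsync() roughly as
// below. "MyService" and "_completion" are hypothetical names; the callback
// must match the signature above and, per _handle_async_callback, the
// framework does not delete the unwrapped response after invoking it, so the
// callback is expected to dispose of the message it receives.
//
//     // static member of the hypothetical MyService class
//     void MyService::_completion(Message *response, void *handle, void *parameter)
//     {
//         MyService *self = reinterpret_cast<MyService *>(handle);
//         // ... examine the response ...
//         delete response;
//     }
//
//     // somewhere in MyService, sending "request" to queue "destQid":
//     // SendAsync(request, destQid, _completion, this, 0);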

Boolean MessageQueueService::SendForget(Message *msg)
{
    AsyncOpNode *op = 0;
    Uint32 mask = msg->getMask();

    if (mask & message_mask::ha_async)
    {
        op = (static_cast<AsyncMessage *>(msg))->op;
    }

    if (op == 0)
    {
        op = get_op();
        op->_request.insert_first(msg);
        if (mask & message_mask::ha_async)
        {
            (static_cast<AsyncMessage *>(msg))->op = op;
        }
    }
    op->_op_dest = MessageQueue::lookup(msg->dest);
    op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
    op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
        | ASYNC_OPFLAGS_SIMPLE_STATUS);
    op->_state &= ~ASYNC_OPSTATE_COMPLETE;
    if (op->_op_dest == 0)
    {
        op->release();
        return_op(op);
        return false;
    }

    // now see if the meta dispatcher will take it
    return _meta_dispatcher->route_async(op);
}

AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
{
    if (request == 0)
        return 0;

    Boolean destroy_op = false;

    if (request->op == 0)
    {
        request->op = get_op();
        request->op->_request.insert_first(request);
        destroy_op = true;
    }

    request->block = false;
    request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
    SendAsync(
        request->op,
        request->dest,
        _sendwait_callback,
        this,
        (void *)0);

    request->op->_client_sem.wait();

    request->op->lock();
    AsyncReply *rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
    rpl->op = 0;
    request->op->unlock();

    if (destroy_op == true)
    {
        request->op->lock();
        request->op->_request.remove(request);
        request->op->_state |= ASYNC_OPSTATE_RELEASED;
        request->op->unlock();
        return_op(request->op);
        request->op = 0;
    }
    return rpl;
}
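
// Illustrative usage sketch (not part of the original source): SendWait() is
// the synchronous request/response path used by register_service(),
// update_service(), find_services(), and enumerate_service() below. The
// caller keeps ownership of both the request and the returned reply; "req"
// stands for any concrete AsyncRequest built by the caller.
//
//     AsyncReply *reply = SendWait(req);
//     if (reply != 0)
//     {
//         // ... examine reply->result ...
//         delete reply;
//     }
//     delete req;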

Boolean MessageQueueService::register_service(
    String name,
    Uint32 capabilities,
    Uint32 mask)
{
    RegisterCimService *msg = new RegisterCimService(
        get_next_xid(),
        0,
        true,
        name,
        capabilities,
        mask,
        _queueId);
    msg->dest = CIMOM_Q_ID;

    Boolean registered = false;
    AsyncReply *reply = static_cast<AsyncReply *>(SendWait(msg));

    if (reply != 0)
    {
        if (reply->getMask() & message_mask::ha_async)
        {
            if (reply->getMask() & message_mask::ha_reply)
            {
                if (reply->result == async_results::OK ||
                    reply->result == async_results::MODULE_ALREADY_REGISTERED)
                {
                    registered = true;
                }
            }
        }

        delete reply;
    }
    delete msg;
    return registered;
}

Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
{
    UpdateCimService *msg = new UpdateCimService(
        get_next_xid(),
        0,
        true,
        _queueId,
        _capabilities,
        _mask);
    Boolean registered = false;

    AsyncMessage *reply = SendWait(msg);
    if (reply)
    {
        if (reply->getMask() & message_mask::ha_async)
        {
            if (reply->getMask() & message_mask::ha_reply)
            {
                if (static_cast<AsyncReply *>(reply)->result == async_results::OK)
                {
                    registered = true;
                }
            }
        }
        delete reply;
    }
    delete msg;
    return registered;
}

Boolean MessageQueueService::deregister_service()
{
    _meta_dispatcher->deregister_module(_queueId);
    return true;
}

void MessageQueueService::find_services(
    String name,
    Uint32 capabilities,
    Uint32 mask,
    Array<Uint32> *results)
{
    if (results == 0)
    {
        throw NullPointer();
    }

    results->clear();

    FindServiceQueue *req = new FindServiceQueue(
        get_next_xid(),
        0,
        _queueId,
        true,
        name,
        capabilities,
        mask);

    req->dest = CIMOM_Q_ID;

    AsyncMessage *reply = SendWait(req);
    if (reply)
    {
        if (reply->getMask() & message_mask::ha_async)
        {
            if (reply->getMask() & message_mask::ha_reply)
            {
                if (reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
                {
                    if ((static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK)
                        *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
                }
            }
        }
        delete reply;
    }
    delete req;
    return;
}

void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
{
    if (result == 0)
    {
        throw NullPointer();
    }

    EnumerateService *req = new EnumerateService(
        get_next_xid(),
        0,
        _queueId,
        true,
        queue);

    AsyncMessage *reply = SendWait(req);

    if (reply)
    {
        Boolean found = false;

        if (reply->getMask() & message_mask::ha_async)
        {
            if (reply->getMask() & message_mask::ha_reply)
            {
                if (reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
                {
                    if ((static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK)
                    {
                        if (found == false)
                        {
                            found = true;

                            result->put_name((static_cast<EnumerateServiceResponse *>(reply))->name);
                            result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
                            result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
                            result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
                        }
                    }
                }
            }
        }
        delete reply;
    }
    delete req;

    return;
}

Uint32 MessageQueueService::get_next_xid()
{
    static Mutex _monitor;
    Uint32 value;
    AutoMutex autoMut(_monitor);
    _xid++;
    value = _xid.value();
    return value;
}

PEGASUS_NAMESPACE_END