1 karl 1.91 //%2005////////////////////////////////////////////////////////////////////////
|
2 mday 1.1 //
|
3 karl 1.88 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.82 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.88 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.91 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 mday 1.1 //
12 // Permission is hereby granted, free of charge, to any person obtaining a copy
13 // of this software and associated documentation files (the "Software"), to
14 // deal in the Software without restriction, including without limitation the
15 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
16 // sell copies of the Software, and to permit persons to whom the Software is
17 // furnished to do so, subject to the following conditions:
|
18 karl 1.82 //
|
19 mday 1.1 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
20 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
21 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
22 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
23 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
24 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
25 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 //
28 //==============================================================================
29 //
30 // Author: Mike Day (mdday@us.ibm.com)
31 //
32 // Modified By:
|
33 a.arora 1.93 // Amit K Arora, IBM (amita@in.ibm.com) for Bug#1090,#2657
|
34 joyce.j 1.101 // Josephine Eskaline Joyce, IBM (jojustin@in.ibm.com) for Bug#3259
|
35 jim.wunderlich 1.114 // Jim Wunderlich (Jim_Wunderlich@prodigy.net)
|
36 mday 1.1 //
37 //%/////////////////////////////////////////////////////////////////////////////
38
|
39 jim.wunderlich 1.111 // #include <iostream.h>
|
40 mday 1.1 #include "MessageQueueService.h"
|
41 mday 1.22 #include <Pegasus/Common/Tracer.h>
|
42 humberto 1.71 #include <Pegasus/Common/MessageLoader.h> //l10n
|
43 mday 1.1
44 PEGASUS_NAMESPACE_BEGIN
45
|
46 mday 1.15 cimom *MessageQueueService::_meta_dispatcher = 0;
|
47 mike 1.118 AtomicInt MessageQueueService::_service_count(0);
|
48 mday 1.15 AtomicInt MessageQueueService::_xid(1);
|
49 mday 1.38 Mutex MessageQueueService::_meta_dispatcher_mutex;
|
50 mday 1.15
|
51 kumpf 1.104 static struct timeval deallocateWait = {300, 0};
|
52 mday 1.53
|
53 mday 1.55 ThreadPool *MessageQueueService::_thread_pool = 0;
|
54 mday 1.53
55 DQueue<MessageQueueService> MessageQueueService::_polling_list(true);
56
|
57 kumpf 1.62 Thread* MessageQueueService::_polling_thread = 0;
|
58 mday 1.61
|
59 kumpf 1.104 ThreadPool *MessageQueueService::get_thread_pool()
|
60 mday 1.69 {
61 return _thread_pool;
62 }
|
63 kumpf 1.117
|
64 jim.wunderlich 1.110 //
|
65 kumpf 1.117 // MAX_THREADS_PER_SVC_QUEUE
|
66 jim.wunderlich 1.110 //
67 // JR Wunderlich Jun 6, 2005
68 //
69
70 #define MAX_THREADS_PER_SVC_QUEUE_LIMIT 5000
|
71 jim.wunderlich 1.114 #define MAX_THREADS_PER_SVC_QUEUE_DEFAULT 5
|
72 jim.wunderlich 1.110
|
73 kumpf 1.117 #ifndef MAX_THREADS_PER_SVC_QUEUE
74 # define MAX_THREADS_PER_SVC_QUEUE MAX_THREADS_PER_SVC_QUEUE_DEFAULT
75 #endif
76
|
77 jim.wunderlich 1.110 Uint32 max_threads_per_svc_queue;
|
78 mday 1.69
|
79 kumpf 1.104 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL
80 MessageQueueService::kill_idle_threads(void *parm)
|
81 mday 1.53 {
|
82 mday 1.69
83 static struct timeval now, last = {0,0};
|
84 mday 1.53 gettimeofday(&now, NULL);
|
85 mday 1.56 int dead_threads = 0;
|
86 kumpf 1.104
87 if (now.tv_sec - last.tv_sec > 120)
|
88 mday 1.53 {
89 gettimeofday(&last, NULL);
|
90 kumpf 1.104 try
|
91 mday 1.56 {
|
92 kumpf 1.104 dead_threads = MessageQueueService::_thread_pool->cleanupIdleThreads();
|
93 mday 1.56 }
|
94 mday 1.69 catch(...)
|
95 mday 1.56 {
96
97 }
|
98 mday 1.53 }
|
99 jenny.yu 1.92
|
100 w.otsuka 1.94 #ifdef PEGASUS_POINTER_64BIT
|
101 jenny.yu 1.92 return (PEGASUS_THREAD_RETURN)(Uint64)dead_threads;
102 #elif PEGASUS_PLATFORM_AIX_RS_IBMCXX
103 return (PEGASUS_THREAD_RETURN)(unsigned long)dead_threads;
104 #else
105 return (PEGASUS_THREAD_RETURN)(Uint32)dead_threads;
106 #endif
|
107 mday 1.53 }
108
109 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
110 {
111 Thread *myself = reinterpret_cast<Thread *>(parm);
112 DQueue<MessageQueueService> *list = reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm());
|
113 mike 1.118 while (_stop_polling.get() == 0)
|
114 mday 1.53 {
|
115 baggio.br 1.100 _polling_sem.wait();
116
|
117 mike 1.118 if (_stop_polling.get() != 0)
|
118 mday 1.61 {
|
119 kumpf 1.62 break;
|
120 mday 1.61 }
|
121 kumpf 1.104
|
122 denise.eckstein 1.116 // The polling_routine thread must hold the lock on the
123 // _polling_thread list while processing incoming messages.
124 // This lock is used to give this thread ownership of
125 // services on the _polling_routine list.
126
127 // This is necessary to avoid confict with other threads
128 // processing the _polling_list
129 // (e.g., MessageQueueServer::~MessageQueueService).
130
|
131 mday 1.53 list->lock();
132 MessageQueueService *service = list->next(0);
|
133 denise.eckstein 1.116 ThreadStatus rtn = PEGASUS_THREAD_OK;
134 while (service != NULL)
135 {
136 if ((service->_incoming.count() > 0) &&
|
137 mike 1.118 (service->_die.get() == 0) &&
138 (service->_threads.get() < max_threads_per_svc_queue))
|
139 denise.eckstein 1.116 {
140 // The _threads count is used to track the
141 // number of active threads that have been allocated
142 // to process messages for this service.
143
144 // The _threads count MUST be incremented while
145 // the polling_routine owns the _polling_thread
146 // lock and has ownership of the service object.
147
148 service->_threads++;
149 try
150 {
151 rtn = _thread_pool->allocate_and_awaken(
152 service, _req_proc, &_polling_sem);
153 }
154 catch (...)
155 {
156 service->_threads--;
157
158 // allocate_and_awaken should never generate an exception.
159 PEGASUS_ASSERT(0);
160 denise.eckstein 1.116 }
161 // if no more threads available, break from processing loop
162 if (rtn != PEGASUS_THREAD_OK )
163 {
164 service->_threads--;
165 Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
166 "Not enough threads to process this request. Skipping.");
167
168 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
169 "Could not allocate thread for %s. " \
170 "Queue has %d messages waiting and %d threads servicing." \
171 "Skipping the service for right now. ",
172 service->getQueueName(),
173 service->_incoming.count(),
|
174 mike 1.118 service->_threads.get());
|
175 denise.eckstein 1.116
176 pegasus_yield();
177 service = NULL;
178 }
179 }
180 if (service != NULL)
181 {
182 service = list->next(service);
183 }
184 }
|
185 mday 1.53 list->unlock();
|
186 jim.wunderlich 1.110
|
187 mike 1.118 if (_check_idle_flag.get() != 0)
|
188 mday 1.69 {
|
189 kumpf 1.104 _check_idle_flag = 0;
|
190 denise.eckstein 1.116 // try to do idle thread clean up processing when system is not busy
191 // if system is busy there may not be a thread available to allocate
192 // so nothing will be done and that is OK.
|
193 kumpf 1.84
|
194 konrad.r 1.113 if ( _thread_pool->allocate_and_awaken(service, kill_idle_threads, &_polling_sem) != PEGASUS_THREAD_OK)
|
195 denise.eckstein 1.116 {
196 Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
197 "Not enough threads to kill idle threads. What an irony.");
|
198 konrad.r 1.113
|
199 denise.eckstein 1.116 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
200 "Could not allocate thread to kill idle threads." \
201 "Skipping. ");
202 }
203
204
|
205 mday 1.69 }
|
206 mday 1.53 }
207 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
208 return(0);
209 }
210
211
212 Semaphore MessageQueueService::_polling_sem(0);
213 AtomicInt MessageQueueService::_stop_polling(0);
|
214 mday 1.69 AtomicInt MessageQueueService::_check_idle_flag(0);
|
215 mday 1.53
|
216 mday 1.15
|
217 kumpf 1.104 MessageQueueService::MessageQueueService(
218 const char *name,
219 Uint32 queueID,
220 Uint32 capabilities,
221 Uint32 mask)
|
222 mday 1.15 : Base(name, true, queueID),
|
223 mday 1.1 _mask(mask),
|
224 mday 1.4 _die(0),
|
225 denise.eckstein 1.116 _threads(0),
|
226 mday 1.41 _incoming(true, 0),
|
227 konrad.r 1.106 _incoming_queue_shutdown(0)
|
228 kumpf 1.104 {
|
229 jim.wunderlich 1.110
|
230 kumpf 1.104 _capabilities = (capabilities | module_capabilities::async);
|
231 mday 1.53
|
232 mday 1.1 _default_op_timeout.tv_sec = 30;
233 _default_op_timeout.tv_usec = 100;
|
234 mday 1.15
|
235 jim.wunderlich 1.110 max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;
236
|
237 kumpf 1.117 // if requested thread max is out of range, then set to
238 // MAX_THREADS_PER_SVC_QUEUE_LIMIT
|
239 jim.wunderlich 1.110
|
240 kumpf 1.117 if ((max_threads_per_svc_queue < 1) ||
241 (max_threads_per_svc_queue > MAX_THREADS_PER_SVC_QUEUE_LIMIT))
242 {
|
243 jim.wunderlich 1.110 max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT;
|
244 kumpf 1.117 }
|
245 jim.wunderlich 1.110
|
246 kumpf 1.117 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
247 "max_threads_per_svc_queue set to %u.", max_threads_per_svc_queue);
|
248 jim.wunderlich 1.110
|
249 a.arora 1.87 AutoMutex autoMut(_meta_dispatcher_mutex);
|
250 kumpf 1.104
251 if (_meta_dispatcher == 0)
|
252 mday 1.15 {
|
253 joyce.j 1.101 _stop_polling = 0;
|
254 mike 1.118 PEGASUS_ASSERT(_service_count.get() == 0);
|
255 mday 1.15 _meta_dispatcher = new cimom();
|
256 kumpf 1.104 if (_meta_dispatcher == NULL)
|
257 mday 1.15 {
|
258 a.arora 1.87 throw NullPointer();
|
259 mday 1.15 }
|
260 jim.wunderlich 1.110 // _thread_pool = new ThreadPool(initial_cnt, "MessageQueueService",
261 // minimum_cnt, maximum_cnt, deallocateWait);
262 //
|
263 kumpf 1.99 _thread_pool =
|
264 kumpf 1.104 new ThreadPool(0, "MessageQueueService", 0, 0, deallocateWait);
|
265 mday 1.15 }
266 _service_count++;
267
|
268 kumpf 1.104 if (false == register_service(name, _capabilities, _mask))
|
269 mday 1.15 {
|
270 humberto 1.71 //l10n
271 //throw BindFailedException("MessageQueueService Base Unable to register with Meta Dispatcher");
272 MessageLoaderParms parms("Common.MessageQueueService.UNABLE_TO_REGISTER",
|
273 kumpf 1.104 "MessageQueueService Base Unable to register with Meta Dispatcher");
274
|
275 humberto 1.71 throw BindFailedException(parms);
|
276 mday 1.15 }
|
277 kumpf 1.104
|
278 mday 1.53 _polling_list.insert_last(this);
|
279 kumpf 1.104
|
280 mday 1.1 }
281
|
282 mday 1.4
|
283 kumpf 1.104 MessageQueueService::~MessageQueueService()
|
284 mday 1.1 {
285 _die = 1;
|
286 mday 1.76
|
287 denise.eckstein 1.116 // The polling_routine locks the _polling_list while
288 // processing the incoming messages for services on the
289 // list. Deleting the service from the _polling_list
290 // prior to processing, avoids synchronization issues
291 // with the _polling_routine.
292
293 _polling_list.remove(this);
294
295 // ATTN: The code for closing the _incoming queue
296 // is not working correctly. In OpenPegasus 2.5,
297 // execution of the following code is very timing
298 // dependent. This needs to be fix.
299 // See Bug 4079 for details.
|
300 mike 1.118 if (_incoming_queue_shutdown.get() == 0)
|
301 mday 1.76 {
|
302 denise.eckstein 1.116 _shutdown_incoming_queue();
303 }
|
304 konrad.r 1.107
|
305 denise.eckstein 1.116 // Wait until all threads processing the messages
306 // for this service have completed.
307
|
308 mike 1.118 while (_threads.get() > 0)
|
309 denise.eckstein 1.116 {
310 pegasus_yield();
|
311 mday 1.76 }
|
312 kumpf 1.104
|
313 mday 1.15 {
|
314 a.arora 1.87 AutoMutex autoMut(_meta_dispatcher_mutex);
315 _service_count--;
|
316 mike 1.118 if (_service_count.get() == 0)
|
317 a.arora 1.87 {
|
318 mday 1.76
|
319 mday 1.53 _stop_polling++;
320 _polling_sem.signal();
|
321 kumpf 1.104 if (_polling_thread) {
322 _polling_thread->join();
323 delete _polling_thread;
324 _polling_thread = 0;
325 }
|
326 mday 1.15 _meta_dispatcher->_shutdown_routed_queue();
327 delete _meta_dispatcher;
|
328 kumpf 1.45 _meta_dispatcher = 0;
|
329 mday 1.76
|
330 kumpf 1.65 delete _thread_pool;
331 _thread_pool = 0;
|
332 a.arora 1.87 }
333 } // mutex unlocks here
|
334 kumpf 1.104 // Clean up in case there are extra stuff on the queue.
|
335 konrad.r 1.72 while (_incoming.count())
336 {
|
337 konrad.r 1.109 try {
338 delete _incoming.remove_first();
339 } catch (const ListClosed &e)
340 {
341 // If the list is closed, there is nothing we can do.
342 break;
343 }
|
344 konrad.r 1.72 }
|
345 kumpf 1.104 }
|
346 mday 1.1
|
347 kumpf 1.104 void MessageQueueService::_shutdown_incoming_queue()
|
348 mday 1.7 {
|
349 mike 1.118 if (_incoming_queue_shutdown.get() > 0)
|
350 kumpf 1.104 return;
351
352 AsyncIoctl *msg = new AsyncIoctl(
353 get_next_xid(),
354 0,
355 _queueId,
356 _queueId,
357 true,
358 AsyncIoctl::IO_CLOSE,
359 0,
360 0);
|
361 mday 1.9
|
362 mday 1.8 msg->op = get_op();
|
363 mday 1.41 msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
|
364 kumpf 1.104 msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
365 | ASYNC_OPFLAGS_SIMPLE_STATUS);
|
366 mday 1.32 msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
367 mday 1.41
|
368 mday 1.32 msg->op->_op_dest = this;
|
369 mday 1.8 msg->op->_request.insert_first(msg);
|
370 konrad.r 1.108 try {
371 _incoming.insert_last_wait(msg->op);
372 _polling_sem.signal();
373 } catch (const ListClosed &)
374 {
|
375 denise.eckstein 1.116 // This means the queue has already been shut-down (happens when there
|
376 konrad.r 1.108 // are two AsyncIoctrl::IO_CLOSE messages generated and one got first
377 // processed.
378 delete msg;
379 }
|
380 konrad.r 1.112 catch (const Permission &)
381 {
382 delete msg;
383 }
|
384 mday 1.7 }
385
|
386 mday 1.22
387
|
388 kumpf 1.85 void MessageQueueService::enqueue(Message *msg)
|
389 mday 1.22 {
|
390 kumpf 1.28 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
391
|
392 mday 1.22 Base::enqueue(msg);
|
393 kumpf 1.28
394 PEG_METHOD_EXIT();
|
395 mday 1.22 }
396
397
|
398 kumpf 1.103 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(
399 void * parm)
400 {
|
401 konrad.r 1.107 MessageQueueService* service =
402 reinterpret_cast<MessageQueueService*>(parm);
403 PEGASUS_ASSERT(service != 0);
|
404 kumpf 1.103 try
405 {
406
|
407 mike 1.118 if (service->_die.get() != 0)
|
408 kumpf 1.103 {
|
409 denise.eckstein 1.116 service->_threads--;
|
410 kumpf 1.103 return (0);
411 }
412 // pull messages off the incoming queue and dispatch them. then
413 // check pending messages that are non-blocking
414 AsyncOpNode *operation = 0;
415
416 // many operations may have been queued.
417 do
418 {
419 try
420 {
421 operation = service->_incoming.remove_first();
422 }
423 catch (ListClosed &)
424 {
|
425 kumpf 1.104 // ATTN: This appears to be a common loop exit path.
426 //PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
427 // "Caught ListClosed exception. Exiting _req_proc.");
|
428 kumpf 1.103 break;
429 }
430
431 if (operation)
432 {
433 operation->_service_ptr = service;
434 service->_handle_incoming_operation(operation);
435 }
436 } while (operation);
437 }
438 catch (const Exception& e)
439 {
440 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
441 String("Caught exception: \"") + e.getMessage() +
442 "\". Exiting _req_proc.");
443 }
444 catch (...)
445 {
446 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
447 "Caught unrecognized exception. Exiting _req_proc.");
448 }
|
449 konrad.r 1.107 service->_threads--;
|
450 kumpf 1.103 return(0);
|
451 mday 1.1 }
452
|
453 mday 1.43
|
454 kumpf 1.104 void MessageQueueService::_sendwait_callback(
455 AsyncOpNode *op,
456 MessageQueue *q,
457 void *parm)
|
458 mday 1.33 {
459 op->_client_sem.signal();
460 }
461
|
462 mday 1.30
463 // callback function is responsible for cleaning up all resources
464 // including op, op->_callback_node, and op->_callback_ptr
|
465 mday 1.22 void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
466 {
|
467 kumpf 1.104 if (op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK)
|
468 mday 1.39 {
469
470 Message *msg = op->get_request();
|
471 kumpf 1.104 if (msg && (msg->getMask() & message_mask::ha_async))
|
472 mday 1.39 {
|
473 kumpf 1.104 if (msg->getType() == async_messages::ASYNC_LEGACY_OP_START)
474 {
475 AsyncLegacyOperationStart *wrapper =
476 static_cast<AsyncLegacyOperationStart *>(msg);
477 msg = wrapper->get_action();
478 delete wrapper;
479 }
480 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
481 {
482 AsyncModuleOperationStart *wrapper =
483 static_cast<AsyncModuleOperationStart *>(msg);
484 msg = wrapper->get_action();
485 delete wrapper;
486 }
487 else if (msg->getType() == async_messages::ASYNC_OP_START)
488 {
489 AsyncOperationStart *wrapper =
490 static_cast<AsyncOperationStart *>(msg);
491 msg = wrapper->get_action();
492 delete wrapper;
493 }
494 kumpf 1.104 delete msg;
|
495 mday 1.39 }
496
497 msg = op->get_response();
|
498 kumpf 1.104 if (msg && (msg->getMask() & message_mask::ha_async))
|
499 mday 1.39 {
|
500 kumpf 1.104 if (msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT)
501 {
502 AsyncLegacyOperationResult *wrapper =
503 static_cast<AsyncLegacyOperationResult *>(msg);
504 msg = wrapper->get_result();
505 delete wrapper;
506 }
507 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
508 {
509 AsyncModuleOperationResult *wrapper =
510 static_cast<AsyncModuleOperationResult *>(msg);
511 msg = wrapper->get_result();
512 delete wrapper;
513 }
|
514 mday 1.39 }
515 void (*callback)(Message *, void *, void *) = op->__async_callback;
516 void *handle = op->_callback_handle;
517 void *parm = op->_callback_parameter;
518 op->release();
519 return_op(op);
520 callback(msg, handle, parm);
521 }
|
522 kumpf 1.104 else if (op->_flags & ASYNC_OPFLAGS_CALLBACK)
|
523 mday 1.39 {
|
524 kumpf 1.104 // note that _callback_node may be different from op
525 // op->_callback_response_q is a "this" pointer we can use for
|
526 mday 1.39 // static callback methods
527 op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);
528 }
|
529 mday 1.22 }
530
|
531 mday 1.1
|
532 kumpf 1.104 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)
|
533 mday 1.1 {
|
534 kumpf 1.104 if (operation != 0)
|
535 mday 1.5 {
|
536 kumpf 1.104
537 // ATTN: optimization
|
538 mday 1.22 // << Tue Feb 19 14:10:38 2002 mdd >>
|
539 mday 1.6 operation->lock();
|
540 kumpf 1.104
|
541 mday 1.6 Message *rq = operation->_request.next(0);
|
542 kumpf 1.104
|
543 mday 1.31 // optimization <<< Thu Mar 7 21:04:05 2002 mdd >>>
|
544 kumpf 1.104 // move this to the bottom of the loop when the majority of
545 // messages become async messages.
|
546 mday 1.31
|
547 mday 1.18 // divert legacy messages to handleEnqueue
|
548 mday 1.29 if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))
|
549 mday 1.18 {
|
550 kumpf 1.104 rq = operation->_request.remove_first() ;
551 operation->unlock();
552 // delete the op node
553 operation->release();
554 return_op(operation);
|
555 mday 1.24
|
556 kumpf 1.104 handleEnqueue(rq);
557 return;
|
558 mday 1.18 }
559
|
560 kumpf 1.104 if ((operation->_flags & ASYNC_OPFLAGS_CALLBACK ||
561 operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&
562 (operation->_state & ASYNC_OPSTATE_COMPLETE))
|
563 mday 1.29 {
|
564 kumpf 1.104 operation->unlock();
565 _handle_async_callback(operation);
|
566 mday 1.29 }
|
567 kumpf 1.104 else
|
568 mday 1.29 {
|
569 kumpf 1.104 PEGASUS_ASSERT(rq != 0);
570 operation->unlock();
571 _handle_async_request(static_cast<AsyncRequest *>(rq));
|
572 mday 1.29 }
|
573 mday 1.5 }
574 return;
|
575 mday 1.1 }
576
577 void MessageQueueService::_handle_async_request(AsyncRequest *req)
578 {
|
579 kumpf 1.104 if (req != 0)
|
580 mday 1.4 {
581 req->op->processing();
|
582 kumpf 1.104
|
583 mday 1.4 Uint32 type = req->getType();
|
584 kumpf 1.104 if (type == async_messages::HEARTBEAT)
585 handle_heartbeat_request(req);
|
586 mday 1.4 else if (type == async_messages::IOCTL)
|
587 kumpf 1.104 handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
|
588 mday 1.4 else if (type == async_messages::CIMSERVICE_START)
|
589 kumpf 1.104 handle_CimServiceStart(static_cast<CimServiceStart *>(req));
|
590 mday 1.4 else if (type == async_messages::CIMSERVICE_STOP)
|
591 kumpf 1.104 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
|
592 mday 1.4 else if (type == async_messages::CIMSERVICE_PAUSE)
|
593 kumpf 1.104 handle_CimServicePause(static_cast<CimServicePause *>(req));
|
594 mday 1.4 else if (type == async_messages::CIMSERVICE_RESUME)
|
595 kumpf 1.104 handle_CimServiceResume(static_cast<CimServiceResume *>(req));
596 else if (type == async_messages::ASYNC_OP_START)
597 handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
598 else
|
599 mday 1.4 {
|
600 kumpf 1.104 // we don't handle this request message
601 _make_response(req, async_results::CIM_NAK);
|
602 mday 1.4 }
|
603 mday 1.1 }
604 }
605
|
606 mday 1.17
607 Boolean MessageQueueService::_enqueueResponse(
|
608 kumpf 1.104 Message* request,
|
609 mday 1.17 Message* response)
610 {
|
611 w.white 1.102
612 STAT_COPYDISPATCHER
613
|
614 kumpf 1.37 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
615 "MessageQueueService::_enqueueResponse");
|
616 mday 1.25
|
617 kumpf 1.104 if (request->getMask() & message_mask::ha_async)
|
618 mday 1.25 {
|
619 kumpf 1.104 if (response->getMask() & message_mask::ha_async)
|
620 mday 1.25 {
|
621 kumpf 1.104 _completeAsyncResponse(static_cast<AsyncRequest *>(request),
622 static_cast<AsyncReply *>(response),
623 ASYNC_OPSTATE_COMPLETE, 0);
|
624 kumpf 1.37 PEG_METHOD_EXIT();
|
625 kumpf 1.104 return true;
|
626 mday 1.25 }
627 }
|
628 kumpf 1.104
629 if (request->_async != 0)
|
630 mday 1.17 {
631 Uint32 mask = request->_async->getMask();
|
632 kumpf 1.104 PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request));
633
|
634 mday 1.18 AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
635 AsyncOpNode *op = async->op;
636 request->_async = 0;
|
637 mday 1.63 // the legacy request is going to be deleted by its handler
|
638 kumpf 1.104 // remove it from the op node
|
639 mday 1.63
640 static_cast<AsyncLegacyOperationStart *>(async)->get_action();
|
641 kumpf 1.104
642 AsyncLegacyOperationResult *async_result =
643 new AsyncLegacyOperationResult(
644 async->getKey(),
645 async->getRouting(),
646 op,
647 response);
648 _completeAsyncResponse(
649 async,
650 async_result,
651 ASYNC_OPSTATE_COMPLETE,
652 0);
|
653 kumpf 1.37 PEG_METHOD_EXIT();
|
654 mday 1.18 return true;
|
655 mday 1.17 }
|
656 kumpf 1.104
|
657 mday 1.18 // ensure that the destination queue is in response->dest
|
658 kumpf 1.37 PEG_METHOD_EXIT();
|
659 mday 1.24 return SendForget(response);
|
660 kumpf 1.104
|
661 mday 1.17 }
662
|
663 mday 1.18 void MessageQueueService::_make_response(Message *req, Uint32 code)
|
664 mday 1.1 {
|
665 mday 1.19 cimom::_make_response(req, code);
|
666 mday 1.1 }
667
668
|
669 kumpf 1.104 void MessageQueueService::_completeAsyncResponse(
670 AsyncRequest *request,
671 AsyncReply *reply,
672 Uint32 state,
673 Uint32 flag)
|
674 mday 1.5 {
|
675 kumpf 1.37 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
676 "MessageQueueService::_completeAsyncResponse");
677
|
678 mday 1.19 cimom::_completeAsyncResponse(request, reply, state, flag);
|
679 kumpf 1.37
680 PEG_METHOD_EXIT();
|
681 mday 1.5 }
682
683
|
684 kumpf 1.104 void MessageQueueService::_complete_op_node(
685 AsyncOpNode *op,
686 Uint32 state,
687 Uint32 flag,
688 Uint32 code)
|
689 mday 1.32 {
690 cimom::_complete_op_node(op, state, flag, code);
691 }
692
|
693 mday 1.5
694 Boolean MessageQueueService::accept_async(AsyncOpNode *op)
695 {
|
696 mike 1.118 if (_incoming_queue_shutdown.get() > 0)
|
697 mday 1.8 return false;
|
698 kumpf 1.104 if (_polling_thread == NULL)
699 {
700 _polling_thread = new Thread(
701 polling_routine,
702 reinterpret_cast<void *>(&_polling_list),
703 false);
|
704 konrad.r 1.113 ThreadStatus tr = PEGASUS_THREAD_OK;
705 while ( (tr =_polling_thread->run()) != PEGASUS_THREAD_OK)
|
706 a.arora 1.93 {
|
707 denise.eckstein 1.116 if (tr == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)
|
708 konrad.r 1.113 pegasus_yield();
|
709 denise.eckstein 1.116 else
710 throw Exception(MessageLoaderParms("Common.MessageQueueService.NOT_ENOUGH_THREAD",
711 "Could not allocate thread for the polling thread."));
|
712 a.arora 1.93 }
|
713 kumpf 1.104 }
714 // ATTN optimization remove the message checking altogether in the base
|
715 mday 1.20 // << Mon Feb 18 14:02:20 2002 mdd >>
|
716 mday 1.6 op->lock();
717 Message *rq = op->_request.next(0);
|
718 mday 1.20 Message *rp = op->_response.next(0);
|
719 mday 1.6 op->unlock();
|
720 kumpf 1.104
721 if ((rq != 0 && (true == messageOK(rq))) ||
|
722 mike 1.118 (rp != 0 && (true == messageOK(rp))) && _die.get() == 0)
|
723 mday 1.5 {
|
724 mday 1.6 _incoming.insert_last_wait(op);
|
725 mday 1.53 _polling_sem.signal();
|
726 mday 1.5 return true;
727 }
728 return false;
729 }
730
731 Boolean MessageQueueService::messageOK(const Message *msg)
732 {
|
733 mike 1.118 if (_incoming_queue_shutdown.get() > 0)
|
734 mday 1.8 return false;
|
735 mday 1.18 return true;
736 }
737
|
738 mday 1.1 void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
739 {
|
740 kumpf 1.104 // default action is to echo a heartbeat response
741
742 AsyncReply *reply = new AsyncReply(
743 async_messages::HEARTBEAT,
744 req->getKey(),
745 req->getRouting(),
746 0,
747 req->op,
748 async_results::OK,
749 req->resp,
750 false);
751 _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0);
|
752 mday 1.1 }
753
754
755 void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
|
756 kumpf 1.104 {
|
757 mday 1.1 }
|
758 kumpf 1.104
|
759 mday 1.1 void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
760 {
|
761 kumpf 1.104 switch (req->ctl)
|
762 mday 1.8 {
763 case AsyncIoctl::IO_CLOSE:
764 {
|
765 kumpf 1.104 MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);
|
766 kumpf 1.81
767 #ifdef MESSAGEQUEUESERVICE_DEBUG
|
768 kumpf 1.104 PEGASUS_STD(cout) << service->getQueueName() << " Received AsyncIoctl::IO_CLOSE " << PEGASUS_STD(endl);
|
769 kumpf 1.81 #endif
|
770 kumpf 1.104
771 // respond to this message. this is fire and forget, so we don't need to delete anything.
772 // this takes care of two problems that were being found
773 // << Thu Oct 9 10:52:48 2003 mdd >>
774 _make_response(req, async_results::OK);
775 // ensure we do not accept any further messages
776
777 // ensure we don't recurse on IO_CLOSE
|
778 mike 1.118 if (_incoming_queue_shutdown.get() > 0)
|
779 kumpf 1.104 break;
780
781 // set the closing flag
782 service->_incoming_queue_shutdown = 1;
783 // empty out the queue
784 while (1)
785 {
786 AsyncOpNode *operation;
787 try
788 {
789 operation = service->_incoming.remove_first();
790 }
791 catch(IPCException &)
792 {
793 break;
794 }
795 if (operation)
796 {
797 operation->_service_ptr = service;
798 service->_handle_incoming_operation(operation);
799 }
800 kumpf 1.104 else
801 break;
802 } // message processing loop
803
804 // shutdown the AsyncDQueue
805 service->_incoming.shutdown_queue();
806 return;
|
807 mday 1.8 }
808
809 default:
|
810 kumpf 1.104 _make_response(req, async_results::CIM_NAK);
|
811 mday 1.8 }
|
812 mday 1.1 }
|
813 mday 1.8
|
814 mday 1.1 void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
815 {
|
816 mday 1.79
|
817 kumpf 1.81 #ifdef MESSAGEQUEUESERVICE_DEBUG
|
818 mday 1.79 PEGASUS_STD(cout) << getQueueName() << "received START" << PEGASUS_STD(endl);
|
819 kumpf 1.81 #endif
|
820 kumpf 1.104
|
821 mday 1.10 // clear the stoped bit and update
|
822 mday 1.13 _capabilities &= (~(module_capabilities::stopped));
|
823 mday 1.10 _make_response(req, async_results::OK);
|
824 kumpf 1.104 // now tell the meta dispatcher we are stopped
|
825 mday 1.10 update_service(_capabilities, _mask);
826
|
827 mday 1.1 }
828 void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
829 {
|
830 kumpf 1.81 #ifdef MESSAGEQUEUESERVICE_DEBUG
|
831 mday 1.79 PEGASUS_STD(cout) << getQueueName() << "received STOP" << PEGASUS_STD(endl);
|
832 kumpf 1.81 #endif
|
833 mday 1.10 // set the stopeed bit and update
834 _capabilities |= module_capabilities::stopped;
835 _make_response(req, async_results::CIM_STOPPED);
|
836 kumpf 1.104 // now tell the meta dispatcher we are stopped
|
837 mday 1.10 update_service(_capabilities, _mask);
|
838 mday 1.1 }
|
839 kumpf 1.104
|
840 mday 1.1 void MessageQueueService::handle_CimServicePause(CimServicePause *req)
841 {
|
842 mday 1.10 // set the paused bit and update
|
843 mday 1.13 _capabilities |= module_capabilities::paused;
|
844 mday 1.11 update_service(_capabilities, _mask);
|
845 mday 1.10 _make_response(req, async_results::CIM_PAUSED);
|
846 kumpf 1.104 // now tell the meta dispatcher we are stopped
|
847 mday 1.1 }
|
848 kumpf 1.104
|
849 mday 1.1 void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
850 {
|
851 mday 1.10 // clear the paused bit and update
|
852 mday 1.13 _capabilities &= (~(module_capabilities::paused));
|
853 mday 1.11 update_service(_capabilities, _mask);
|
854 mday 1.10 _make_response(req, async_results::OK);
|
855 kumpf 1.104 // now tell the meta dispatcher we are stopped
|
856 mday 1.1 }
|
857 kumpf 1.104
|
858 mday 1.1 void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
859 {
860 _make_response(req, async_results::CIM_NAK);
861 }
862
863 void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
864 {
|
865 mday 1.14 ;
866 }
867
|
868 mday 1.10
|
869 mday 1.14 void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)
870 {
871 // remove the legacy message from the request and enqueue it to its destination
872 Uint32 result = async_results::CIM_NAK;
|
873 kumpf 1.104
|
874 mday 1.25 Message *legacy = req->_act;
|
875 kumpf 1.104 if (legacy != 0)
|
876 mday 1.14 {
|
877 mday 1.25 MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
|
878 kumpf 1.104 if (queue != 0)
|
879 mday 1.14 {
|
880 kumpf 1.104 if (queue->isAsync() == true)
881 {
882 (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
883 }
884 else
885 {
886 // Enqueue the response:
887 queue->enqueue(req->get_action());
888 }
889
890 result = async_results::OK;
|
891 mday 1.14 }
892 }
893 _make_response(req, result);
894 }
895
896 void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)
897 {
898 ;
|
899 mday 1.1 }
900
|
901 kumpf 1.104 AsyncOpNode *MessageQueueService::get_op()
|
902 mday 1.1 {
|
903 mday 1.4 AsyncOpNode *op = new AsyncOpNode();
|
904 kumpf 1.104
|
905 mday 1.9 op->_state = ASYNC_OPSTATE_UNKNOWN;
906 op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
|
907 kumpf 1.104
|
908 mday 1.1 return op;
909 }
910
911 void MessageQueueService::return_op(AsyncOpNode *op)
912 {
|
913 kumpf 1.104 PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED);
|
914 mday 1.4 delete op;
|
915 mday 1.1 }
916
|
917 mday 1.18
|
918 kumpf 1.104 Boolean MessageQueueService::ForwardOp(
919 AsyncOpNode *op,
920 Uint32 destination)
|
921 mday 1.29 {
|
922 kumpf 1.104 PEGASUS_ASSERT(op != 0);
|
923 mday 1.30 op->lock();
924 op->_op_dest = MessageQueue::lookup(destination);
|
925 mday 1.29 op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD);
926 op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK);
|
927 mday 1.30 op->unlock();
|
928 kumpf 1.104 if (op->_op_dest == 0)
|
929 mday 1.30 return false;
|
930 kumpf 1.104
|
931 mday 1.29 return _meta_dispatcher->route_async(op);
932 }
933
|
934 kumpf 1.104
935 Boolean MessageQueueService::SendAsync(
936 AsyncOpNode *op,
937 Uint32 destination,
938 void (*callback)(AsyncOpNode *, MessageQueue *, void *),
939 MessageQueue *callback_response_q,
940 void *callback_ptr)
941 {
942 PEGASUS_ASSERT(op != 0 && callback != 0);
943
|
944 mday 1.21 // get the queue handle for the destination
945
|
946 mday 1.30 op->lock();
|
947 mday 1.32 op->_op_dest = MessageQueue::lookup(destination); // destination of this message
|
948 mday 1.22 op->_flags |= ASYNC_OPFLAGS_CALLBACK;
949 op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
|
950 mday 1.30 // initialize the callback data
|
951 mday 1.32 op->_async_callback = callback; // callback function to be executed by recpt. of response
952 op->_callback_node = op; // the op node
953 op->_callback_response_q = callback_response_q; // the queue that will receive the response
954 op->_callback_ptr = callback_ptr; // user data for callback
955 op->_callback_request_q = this; // I am the originator of this request
|
956 kumpf 1.104
|
957 mday 1.30 op->unlock();
|
958 kumpf 1.104 if (op->_op_dest == 0)
|
959 mday 1.30 return false;
|
960 kumpf 1.104
|
961 mday 1.21 return _meta_dispatcher->route_async(op);
|
962 mday 1.18 }
963
964
|
965 kumpf 1.104 Boolean MessageQueueService::SendAsync(
966 Message *msg,
967 Uint32 destination,
968 void (*callback)(Message *response, void *handle, void *parameter),
969 void *handle,
970 void *parameter)
|
971 mday 1.39 {
|
972 kumpf 1.104 if (msg == NULL)
|
973 mday 1.39 return false;
|
974 kumpf 1.104 if (callback == NULL)
|
975 mday 1.39 return SendForget(msg);
976 AsyncOpNode *op = get_op();
|
977 mday 1.43 msg->dest = destination;
|
978 kumpf 1.104 if (NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
|
979 mday 1.39 {
980 op->release();
981 return_op(op);
982 return false;
983 }
984 op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
985 op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
986 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
987 op->__async_callback = callback;
988 op->_callback_node = op;
989 op->_callback_handle = handle;
990 op->_callback_parameter = parameter;
991 op->_callback_response_q = this;
992
|
993 kumpf 1.104 if (!(msg->getMask() & message_mask::ha_async))
|
994 mday 1.39 {
|
995 kumpf 1.104 AsyncLegacyOperationStart *wrapper = new AsyncLegacyOperationStart(
996 get_next_xid(),
997 op,
998 destination,
999 msg,
1000 destination);
|
1001 mday 1.39 }
|
1002 kumpf 1.104 else
|
1003 mday 1.39 {
1004 op->_request.insert_first(msg);
1005 (static_cast<AsyncMessage *>(msg))->op = op;
1006 }
1007 return _meta_dispatcher->route_async(op);
1008 }
1009
1010
|
1011 mday 1.18 Boolean MessageQueueService::SendForget(Message *msg)
1012 {
1013 AsyncOpNode *op = 0;
|
1014 mday 1.22 Uint32 mask = msg->getMask();
|
1015 kumpf 1.104
|
1016 mday 1.22 if (mask & message_mask::ha_async)
|
1017 mday 1.18 {
1018 op = (static_cast<AsyncMessage *>(msg))->op ;
1019 }
|
1020 mday 1.22
|
1021 kumpf 1.104 if (op == 0)
|
1022 mday 1.20 {
|
1023 mday 1.18 op = get_op();
|
1024 mday 1.20 op->_request.insert_first(msg);
|
1025 mday 1.22 if (mask & message_mask::ha_async)
|
1026 kumpf 1.104 {
1027 (static_cast<AsyncMessage *>(msg))->op = op;
1028 }
|
1029 mday 1.20 }
|
1030 mday 1.30 op->_op_dest = MessageQueue::lookup(msg->dest);
|
1031 mday 1.22 op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
|
1032 kumpf 1.104 op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
1033 | ASYNC_OPFLAGS_SIMPLE_STATUS);
|
1034 mday 1.22 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
1035 kumpf 1.104 if (op->_op_dest == 0)
|
1036 mday 1.39 {
1037 op->release();
1038 return_op(op);
|
1039 mday 1.21 return false;
|
1040 mday 1.39 }
|
1041 kumpf 1.104
|
1042 mday 1.18 // now see if the meta dispatcher will take it
|
1043 mday 1.30 return _meta_dispatcher->route_async(op);
|
1044 mday 1.18 }
|
1045 mday 1.2
|
1046 mday 1.1
|
// Send 'request' and block the calling thread until the reply arrives.
// Returns the reply (caller takes ownership), or 0 for a null request.
// If the request arrives without an op node, one is created here and
// destroyed before returning; a caller-supplied node is left attached.
AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
{
    if (request == 0)
        return 0 ;

    Boolean destroy_op = false;

    if (request->op == 0)
    {
        // No node supplied: make one, link the request to it, and
        // remember to tear it down before returning.
        request->op = get_op();
        request->op->_request.insert_first(request);
        destroy_op = true;
    }

    // Non-blocking at the message level; the pseudo-callback flag makes
    // the dispatcher signal _client_sem when the response is queued.
    request->block = false;
    request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
    SendAsync(
        request->op,
        request->dest,
        _sendwait_callback,
        this,
        (void *)0);

    // Block here until the response side posts the semaphore.
    request->op->_client_sem.wait();

    // Detach the reply from the op node under its lock.
    // NOTE(review): rpl is dereferenced unchecked -- this assumes a
    // response is always present once the semaphore is signalled;
    // confirm against _sendwait_callback.
    request->op->lock();
    AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
    rpl->op = 0;
    request->op->unlock();

    if (destroy_op == true)
    {
        // We created the node above: unlink the request, mark the node
        // released, and hand it back to return_op().
        request->op->lock();
        request->op->_request.remove(request);
        request->op->_state |= ASYNC_OPSTATE_RELEASED;
        request->op->unlock();
        return_op(request->op);
        request->op = 0;
    }
    return rpl;
}
1089
1090
|
1091 kumpf 1.104 Boolean MessageQueueService::register_service(
1092 String name,
1093 Uint32 capabilities,
1094 Uint32 mask)
1095 {
1096 RegisterCimService *msg = new RegisterCimService(
1097 get_next_xid(),
1098 0,
1099 true,
1100 name,
1101 capabilities,
1102 mask,
1103 _queueId);
1104 msg->dest = CIMOM_Q_ID;
|
1105 mday 1.1
1106 Boolean registered = false;
|
1107 kumpf 1.104 AsyncReply *reply = static_cast<AsyncReply *>(SendWait(msg));
1108
1109 if (reply != 0)
1110 {
1111 if (reply->getMask() & message_mask::ha_async)
1112 {
1113 if (reply->getMask() & message_mask::ha_reply)
1114 {
1115 if (reply->result == async_results::OK ||
1116 reply->result == async_results::MODULE_ALREADY_REGISTERED)
1117 {
1118 registered = true;
1119 }
1120 }
|
1121 mday 1.1 }
|
1122 kumpf 1.104
1123 delete reply;
|
1124 mday 1.1 }
|
1125 mday 1.5 delete msg;
|
1126 mday 1.1 return registered;
1127 }
1128
1129 Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
1130 {
|
1131 kumpf 1.104 UpdateCimService *msg = new UpdateCimService(
1132 get_next_xid(),
1133 0,
1134 true,
1135 _queueId,
1136 _capabilities,
1137 _mask);
|
1138 mday 1.1 Boolean registered = false;
|
1139 mday 1.2
1140 AsyncMessage *reply = SendWait(msg);
1141 if (reply)
|
1142 mday 1.1 {
|
1143 kumpf 1.104 if (reply->getMask() & message_mask::ha_async)
|
1144 mday 1.1 {
|
1145 kumpf 1.104 if (reply->getMask() & message_mask::ha_reply)
1146 {
1147 if (static_cast<AsyncReply *>(reply)->result == async_results::OK)
1148 {
1149 registered = true;
1150 }
1151 }
|
1152 mday 1.1 }
1153 delete reply;
1154 }
|
1155 mday 1.5 delete msg;
|
1156 mday 1.1 return registered;
1157 }
1158
1159
|
// Remove this service's module registration from the meta dispatcher.
// Always reports success: deregister_module() exposes no failure result
// here.
Boolean MessageQueueService::deregister_service()
{
    _meta_dispatcher->deregister_module(_queueId);
    return true;
}
1166
1167
|
1168 kumpf 1.104 void MessageQueueService::find_services(
1169 String name,
1170 Uint32 capabilities,
1171 Uint32 mask,
1172 Array<Uint32> *results)
|
1173 mday 1.1 {
|
1174 kumpf 1.104 if (results == 0)
1175 {
|
1176 mday 1.1 throw NullPointer();
|
1177 kumpf 1.104 }
1178
|
1179 mday 1.1 results->clear();
|
1180 kumpf 1.104
1181 FindServiceQueue *req = new FindServiceQueue(
1182 get_next_xid(),
1183 0,
1184 _queueId,
1185 true,
1186 name,
1187 capabilities,
1188 mask);
1189
|
1190 mday 1.44 req->dest = CIMOM_Q_ID;
|
1191 kumpf 1.104
1192 AsyncMessage *reply = SendWait(req);
1193 if (reply)
1194 {
1195 if (reply->getMask() & message_mask::ha_async)
1196 {
1197 if (reply->getMask() & message_mask::ha_reply)
1198 {
1199 if (reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
1200 {
1201 if ((static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK)
1202 *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
1203 }
1204 }
|
1205 mday 1.1 }
1206 delete reply;
1207 }
|
1208 mday 1.5 delete req;
|
1209 mday 1.1 return ;
1210 }
1211
1212 void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
1213 {
|
1214 kumpf 1.104 if (result == 0)
1215 {
|
1216 mday 1.1 throw NullPointer();
|
1217 kumpf 1.104 }
1218
1219 EnumerateService *req = new EnumerateService(
1220 get_next_xid(),
1221 0,
1222 _queueId,
1223 true,
1224 queue);
1225
|
1226 mday 1.2 AsyncMessage *reply = SendWait(req);
|
1227 kumpf 1.104
|
1228 mday 1.2 if (reply)
|
1229 mday 1.1 {
1230 Boolean found = false;
|
1231 kumpf 1.104
1232 if (reply->getMask() & message_mask::ha_async)
|
1233 mday 1.1 {
|
1234 kumpf 1.104 if (reply->getMask() & message_mask::ha_reply)
1235 {
1236 if (reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
1237 {
1238 if ((static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK)
1239 {
1240 if (found == false)
1241 {
1242 found = true;
1243
1244 result->put_name((static_cast<EnumerateServiceResponse *>(reply))->name);
1245 result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
1246 result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
1247 result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
1248 }
1249 }
1250 }
1251 }
|
1252 mday 1.1 }
1253 delete reply;
1254 }
|
1255 mday 1.5 delete req;
|
1256 kumpf 1.104
|
1257 mday 1.1 return;
1258 }
1259
|
1260 kumpf 1.104 Uint32 MessageQueueService::get_next_xid()
|
1261 mday 1.1 {
|
1262 mday 1.78 static Mutex _monitor;
1263 Uint32 value;
|
1264 kumpf 1.104 AutoMutex autoMut(_monitor);
|
1265 mday 1.1 _xid++;
|
1266 mike 1.118 value = _xid.get();
|
1267 mday 1.78 return value;
|
1268 kumpf 1.104
|
1269 mday 1.1 }
1270
1271 PEGASUS_NAMESPACE_END
|