1 karl 1.119 //%2006////////////////////////////////////////////////////////////////////////
|
2 mday 1.1 //
|
3 karl 1.88 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.82 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.88 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.91 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 karl 1.119 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 mday 1.1 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
15 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
18 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
|
20 karl 1.82 //
|
21 mday 1.1 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
22 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
24 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
27 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 // Author: Mike Day (mdday@us.ibm.com)
33 //
34 // Modified By:
|
35 a.arora 1.93 // Amit K Arora, IBM (amita@in.ibm.com) for Bug#1090,#2657
|
36 joyce.j 1.101 // Josephine Eskaline Joyce, IBM (jojustin@in.ibm.com) for Bug#3259
|
37 jim.wunderlich 1.114 // Jim Wunderlich (Jim_Wunderlich@prodigy.net)
|
38 mday 1.1 //
39 //%/////////////////////////////////////////////////////////////////////////////
40
|
41 jim.wunderlich 1.111 // #include <iostream.h>
|
42 mday 1.1 #include "MessageQueueService.h"
|
43 mday 1.22 #include <Pegasus/Common/Tracer.h>
|
44 humberto 1.71 #include <Pegasus/Common/MessageLoader.h> //l10n
|
45 mday 1.1
46 PEGASUS_NAMESPACE_BEGIN
47
|
48 mday 1.15 cimom *MessageQueueService::_meta_dispatcher = 0;
|
49 mike 1.118 AtomicInt MessageQueueService::_service_count(0);
|
50 kumpf 1.119.12.2 Mutex MessageQueueService::_xidMutex;
51 Uint32 MessageQueueService::_xid = 1;
|
52 mday 1.38 Mutex MessageQueueService::_meta_dispatcher_mutex;
|
53 mday 1.15
|
54 kumpf 1.104 static struct timeval deallocateWait = {300, 0};
|
55 mday 1.53
|
56 mday 1.55 ThreadPool *MessageQueueService::_thread_pool = 0;
|
57 mday 1.53
|
58 mike 1.119.12.3 List<MessageQueueService, RecursiveMutex> MessageQueueService::_polling_list;
|
59 mday 1.53
|
60 kumpf 1.62 Thread* MessageQueueService::_polling_thread = 0;
|
61 mday 1.61
|
62 kumpf 1.104 ThreadPool *MessageQueueService::get_thread_pool()
|
63 mday 1.69 {
64 return _thread_pool;
65 }
|
66 kumpf 1.117
|
67 jim.wunderlich 1.110 //
|
68 kumpf 1.117 // MAX_THREADS_PER_SVC_QUEUE
|
69 jim.wunderlich 1.110 //
70 // JR Wunderlich Jun 6, 2005
71 //
72
73 #define MAX_THREADS_PER_SVC_QUEUE_LIMIT 5000
|
74 jim.wunderlich 1.114 #define MAX_THREADS_PER_SVC_QUEUE_DEFAULT 5
|
75 jim.wunderlich 1.110
|
76 kumpf 1.117 #ifndef MAX_THREADS_PER_SVC_QUEUE
77 # define MAX_THREADS_PER_SVC_QUEUE MAX_THREADS_PER_SVC_QUEUE_DEFAULT
78 #endif
79
|
80 jim.wunderlich 1.110 Uint32 max_threads_per_svc_queue;
|
81 mday 1.69
|
82 kumpf 1.104 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL
83 MessageQueueService::kill_idle_threads(void *parm)
|
84 mday 1.53 {
|
85 mday 1.69
86 static struct timeval now, last = {0,0};
|
87 mday 1.53 gettimeofday(&now, NULL);
|
88 mday 1.56 int dead_threads = 0;
|
89 kumpf 1.104
90 if (now.tv_sec - last.tv_sec > 120)
|
91 mday 1.53 {
92 gettimeofday(&last, NULL);
|
93 kumpf 1.104 try
|
94 mday 1.56 {
|
95 kumpf 1.104 dead_threads = MessageQueueService::_thread_pool->cleanupIdleThreads();
|
96 mday 1.56 }
|
97 mday 1.69 catch(...)
|
98 mday 1.56 {
99
100 }
|
101 mday 1.53 }
|
102 jenny.yu 1.92
|
103 w.otsuka 1.94 #ifdef PEGASUS_POINTER_64BIT
|
104 jenny.yu 1.92 return (PEGASUS_THREAD_RETURN)(Uint64)dead_threads;
105 #elif PEGASUS_PLATFORM_AIX_RS_IBMCXX
106 return (PEGASUS_THREAD_RETURN)(unsigned long)dead_threads;
107 #else
108 return (PEGASUS_THREAD_RETURN)(Uint32)dead_threads;
109 #endif
|
110 mday 1.53 }
111
112 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
113 {
114 Thread *myself = reinterpret_cast<Thread *>(parm);
|
115 mike 1.119.12.3 List<MessageQueueService, RecursiveMutex> *list =
116 reinterpret_cast<List<MessageQueueService, RecursiveMutex>*>(myself->get_parm());
|
117 mike 1.119.12.1
|
118 mike 1.118 while (_stop_polling.get() == 0)
|
119 mday 1.53 {
|
120 baggio.br 1.100 _polling_sem.wait();
121
|
122 mike 1.118 if (_stop_polling.get() != 0)
|
123 mday 1.61 {
|
124 kumpf 1.62 break;
|
125 mday 1.61 }
|
126 kumpf 1.104
|
127 denise.eckstein 1.116 // The polling_routine thread must hold the lock on the
128 // _polling_thread list while processing incoming messages.
129 // This lock is used to give this thread ownership of
130 // services on the _polling_routine list.
131
132 // This is necessary to avoid confict with other threads
133 // processing the _polling_list
134 // (e.g., MessageQueueServer::~MessageQueueService).
135
|
136 mday 1.53 list->lock();
|
137 mike 1.119.12.1 MessageQueueService *service = list->front();
|
138 denise.eckstein 1.116 ThreadStatus rtn = PEGASUS_THREAD_OK;
139 while (service != NULL)
140 {
141 if ((service->_incoming.count() > 0) &&
|
142 mike 1.118 (service->_die.get() == 0) &&
143 (service->_threads.get() < max_threads_per_svc_queue))
|
144 denise.eckstein 1.116 {
145 // The _threads count is used to track the
146 // number of active threads that have been allocated
147 // to process messages for this service.
148
149 // The _threads count MUST be incremented while
150 // the polling_routine owns the _polling_thread
151 // lock and has ownership of the service object.
152
153 service->_threads++;
154 try
155 {
156 rtn = _thread_pool->allocate_and_awaken(
157 service, _req_proc, &_polling_sem);
158 }
159 catch (...)
160 {
161 service->_threads--;
162
163 // allocate_and_awaken should never generate an exception.
164 PEGASUS_ASSERT(0);
165 denise.eckstein 1.116 }
166 // if no more threads available, break from processing loop
167 if (rtn != PEGASUS_THREAD_OK )
168 {
169 service->_threads--;
170 Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
171 "Not enough threads to process this request. Skipping.");
172
173 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
174 "Could not allocate thread for %s. " \
175 "Queue has %d messages waiting and %d threads servicing." \
176 "Skipping the service for right now. ",
177 service->getQueueName(),
178 service->_incoming.count(),
|
179 mike 1.118 service->_threads.get());
|
180 denise.eckstein 1.116
181 pegasus_yield();
182 service = NULL;
183 }
184 }
185 if (service != NULL)
186 {
|
187 mike 1.119.12.1 service = list->next_of(service);
|
188 denise.eckstein 1.116 }
189 }
|
190 mday 1.53 list->unlock();
|
191 jim.wunderlich 1.110
|
192 mike 1.118 if (_check_idle_flag.get() != 0)
|
193 mday 1.69 {
|
194 kumpf 1.104 _check_idle_flag = 0;
|
195 denise.eckstein 1.116 // try to do idle thread clean up processing when system is not busy
196 // if system is busy there may not be a thread available to allocate
197 // so nothing will be done and that is OK.
|
198 kumpf 1.84
|
199 konrad.r 1.113 if ( _thread_pool->allocate_and_awaken(service, kill_idle_threads, &_polling_sem) != PEGASUS_THREAD_OK)
|
200 denise.eckstein 1.116 {
201 Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
202 "Not enough threads to kill idle threads. What an irony.");
|
203 konrad.r 1.113
|
204 denise.eckstein 1.116 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
205 "Could not allocate thread to kill idle threads." \
206 "Skipping. ");
207 }
208
209
|
210 mday 1.69 }
|
211 mday 1.53 }
212 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
213 return(0);
214 }
215
216
217 Semaphore MessageQueueService::_polling_sem(0);
218 AtomicInt MessageQueueService::_stop_polling(0);
|
219 mday 1.69 AtomicInt MessageQueueService::_check_idle_flag(0);
|
220 mday 1.53
|
221 mday 1.15
|
222 kumpf 1.104 MessageQueueService::MessageQueueService(
223 const char *name,
224 Uint32 queueID,
225 Uint32 capabilities,
226 Uint32 mask)
|
227 mday 1.15 : Base(name, true, queueID),
|
228 mday 1.1 _mask(mask),
|
229 mday 1.4 _die(0),
|
230 denise.eckstein 1.116 _threads(0),
|
231 mike 1.119.12.1 _incoming(0),
|
232 konrad.r 1.106 _incoming_queue_shutdown(0)
|
233 kumpf 1.104 {
|
234 jim.wunderlich 1.110
|
235 kumpf 1.104 _capabilities = (capabilities | module_capabilities::async);
|
236 mday 1.53
|
237 mday 1.1 _default_op_timeout.tv_sec = 30;
238 _default_op_timeout.tv_usec = 100;
|
239 mday 1.15
|
240 jim.wunderlich 1.110 max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;
241
|
242 kumpf 1.117 // if requested thread max is out of range, then set to
243 // MAX_THREADS_PER_SVC_QUEUE_LIMIT
|
244 jim.wunderlich 1.110
|
245 kumpf 1.117 if ((max_threads_per_svc_queue < 1) ||
246 (max_threads_per_svc_queue > MAX_THREADS_PER_SVC_QUEUE_LIMIT))
247 {
|
248 jim.wunderlich 1.110 max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT;
|
249 kumpf 1.117 }
|
250 jim.wunderlich 1.110
|
251 kumpf 1.117 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
252 "max_threads_per_svc_queue set to %u.", max_threads_per_svc_queue);
|
253 jim.wunderlich 1.110
|
254 a.arora 1.87 AutoMutex autoMut(_meta_dispatcher_mutex);
|
255 kumpf 1.104
256 if (_meta_dispatcher == 0)
|
257 mday 1.15 {
|
258 joyce.j 1.101 _stop_polling = 0;
|
259 mike 1.118 PEGASUS_ASSERT(_service_count.get() == 0);
|
260 mday 1.15 _meta_dispatcher = new cimom();
|
261 kumpf 1.104 if (_meta_dispatcher == NULL)
|
262 mday 1.15 {
|
263 a.arora 1.87 throw NullPointer();
|
264 mday 1.15 }
|
265 jim.wunderlich 1.110 // _thread_pool = new ThreadPool(initial_cnt, "MessageQueueService",
266 // minimum_cnt, maximum_cnt, deallocateWait);
267 //
|
268 kumpf 1.99 _thread_pool =
|
269 kumpf 1.104 new ThreadPool(0, "MessageQueueService", 0, 0, deallocateWait);
|
270 mday 1.15 }
271 _service_count++;
272
|
273 kumpf 1.104 if (false == register_service(name, _capabilities, _mask))
|
274 mday 1.15 {
|
275 humberto 1.71 //l10n
276 //throw BindFailedException("MessageQueueService Base Unable to register with Meta Dispatcher");
277 MessageLoaderParms parms("Common.MessageQueueService.UNABLE_TO_REGISTER",
|
278 kumpf 1.104 "MessageQueueService Base Unable to register with Meta Dispatcher");
279
|
280 humberto 1.71 throw BindFailedException(parms);
|
281 mday 1.15 }
|
282 kumpf 1.104
|
283 mike 1.119.12.1 _polling_list.insert_back(this);
|
284 kumpf 1.104
|
285 mday 1.1 }
286
|
287 mday 1.4
|
288 kumpf 1.104 MessageQueueService::~MessageQueueService()
|
289 mday 1.1 {
290 _die = 1;
|
291 mday 1.76
|
292 denise.eckstein 1.116 // The polling_routine locks the _polling_list while
293 // processing the incoming messages for services on the
294 // list. Deleting the service from the _polling_list
295 // prior to processing, avoids synchronization issues
296 // with the _polling_routine.
297
|
298 mike 1.119.12.1 // ATTN: added to prevent assertion in List in which the list does not
299 // contain this element.
|
300 mike 1.119.12.4
301 if (_polling_list.contains(this))
302 _polling_list.remove(this);
|
303 denise.eckstein 1.116
304 // ATTN: The code for closing the _incoming queue
305 // is not working correctly. In OpenPegasus 2.5,
306 // execution of the following code is very timing
307 // dependent. This needs to be fix.
308 // See Bug 4079 for details.
|
309 mike 1.118 if (_incoming_queue_shutdown.get() == 0)
|
310 mday 1.76 {
|
311 denise.eckstein 1.116 _shutdown_incoming_queue();
312 }
|
313 konrad.r 1.107
|
314 denise.eckstein 1.116 // Wait until all threads processing the messages
315 // for this service have completed.
316
|
317 mike 1.118 while (_threads.get() > 0)
|
318 denise.eckstein 1.116 {
319 pegasus_yield();
|
320 mday 1.76 }
|
321 kumpf 1.104
|
322 mday 1.15 {
|
323 a.arora 1.87 AutoMutex autoMut(_meta_dispatcher_mutex);
324 _service_count--;
|
325 mike 1.118 if (_service_count.get() == 0)
|
326 a.arora 1.87 {
|
327 mday 1.76
|
328 mday 1.53 _stop_polling++;
329 _polling_sem.signal();
|
330 kumpf 1.104 if (_polling_thread) {
331 _polling_thread->join();
332 delete _polling_thread;
333 _polling_thread = 0;
334 }
|
335 mday 1.15 _meta_dispatcher->_shutdown_routed_queue();
336 delete _meta_dispatcher;
|
337 kumpf 1.45 _meta_dispatcher = 0;
|
338 mday 1.76
|
339 kumpf 1.65 delete _thread_pool;
340 _thread_pool = 0;
|
341 a.arora 1.87 }
342 } // mutex unlocks here
|
343 kumpf 1.104 // Clean up in case there are extra stuff on the queue.
|
344 konrad.r 1.72 while (_incoming.count())
345 {
|
346 konrad.r 1.109 try {
|
347 mike 1.119.12.1 delete _incoming.dequeue();
|
348 konrad.r 1.109 } catch (const ListClosed &e)
349 {
350 // If the list is closed, there is nothing we can do.
351 break;
352 }
|
353 konrad.r 1.72 }
|
354 kumpf 1.104 }
|
355 mday 1.1
|
356 kumpf 1.104 void MessageQueueService::_shutdown_incoming_queue()
|
357 mday 1.7 {
|
358 mike 1.118 if (_incoming_queue_shutdown.get() > 0)
|
359 kumpf 1.104 return;
360
361 AsyncIoctl *msg = new AsyncIoctl(
362 get_next_xid(),
363 0,
364 _queueId,
365 _queueId,
366 true,
367 AsyncIoctl::IO_CLOSE,
368 0,
369 0);
|
370 mday 1.9
|
371 mday 1.8 msg->op = get_op();
|
372 mday 1.41 msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
|
373 kumpf 1.104 msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
374 | ASYNC_OPFLAGS_SIMPLE_STATUS);
|
375 mday 1.32 msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
376 mday 1.41
|
377 mday 1.32 msg->op->_op_dest = this;
|
378 mike 1.119.12.1 msg->op->_request.insert_front(msg);
|
379 konrad.r 1.108 try {
|
380 mike 1.119.12.1 _incoming.enqueue_wait(msg->op);
|
381 konrad.r 1.108 _polling_sem.signal();
382 } catch (const ListClosed &)
383 {
|
384 denise.eckstein 1.116 // This means the queue has already been shut-down (happens when there
|
385 konrad.r 1.108 // are two AsyncIoctrl::IO_CLOSE messages generated and one got first
386 // processed.
387 delete msg;
388 }
|
389 konrad.r 1.112 catch (const Permission &)
390 {
391 delete msg;
392 }
|
393 mday 1.7 }
394
|
395 mday 1.22
396
|
397 kumpf 1.85 void MessageQueueService::enqueue(Message *msg)
|
398 mday 1.22 {
|
399 kumpf 1.28 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
400
|
401 mday 1.22 Base::enqueue(msg);
|
402 kumpf 1.28
403 PEG_METHOD_EXIT();
|
404 mday 1.22 }
405
406
|
407 kumpf 1.103 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(
408 void * parm)
409 {
|
410 konrad.r 1.107 MessageQueueService* service =
411 reinterpret_cast<MessageQueueService*>(parm);
412 PEGASUS_ASSERT(service != 0);
|
413 kumpf 1.103 try
414 {
415
|
416 mike 1.118 if (service->_die.get() != 0)
|
417 kumpf 1.103 {
|
418 denise.eckstein 1.116 service->_threads--;
|
419 kumpf 1.103 return (0);
420 }
421 // pull messages off the incoming queue and dispatch them. then
422 // check pending messages that are non-blocking
423 AsyncOpNode *operation = 0;
424
425 // many operations may have been queued.
426 do
427 {
428 try
429 {
|
430 mike 1.119.12.1 operation = service->_incoming.dequeue();
|
431 kumpf 1.103 }
432 catch (ListClosed &)
433 {
|
434 kumpf 1.104 // ATTN: This appears to be a common loop exit path.
435 //PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
436 // "Caught ListClosed exception. Exiting _req_proc.");
|
437 kumpf 1.103 break;
438 }
439
440 if (operation)
441 {
442 operation->_service_ptr = service;
443 service->_handle_incoming_operation(operation);
444 }
445 } while (operation);
446 }
447 catch (const Exception& e)
448 {
449 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
450 String("Caught exception: \"") + e.getMessage() +
451 "\". Exiting _req_proc.");
452 }
453 catch (...)
454 {
455 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
456 "Caught unrecognized exception. Exiting _req_proc.");
457 }
|
458 konrad.r 1.107 service->_threads--;
|
459 kumpf 1.103 return(0);
|
460 mday 1.1 }
461
|
462 mday 1.43
|
463 kumpf 1.104 void MessageQueueService::_sendwait_callback(
464 AsyncOpNode *op,
465 MessageQueue *q,
466 void *parm)
|
467 mday 1.33 {
468 op->_client_sem.signal();
469 }
470
|
471 mday 1.30
472 // callback function is responsible for cleaning up all resources
473 // including op, op->_callback_node, and op->_callback_ptr
|
474 mday 1.22 void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
475 {
|
476 kumpf 1.104 if (op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK)
|
477 mday 1.39 {
478
479 Message *msg = op->get_request();
|
480 kumpf 1.104 if (msg && (msg->getMask() & message_mask::ha_async))
|
481 mday 1.39 {
|
482 kumpf 1.104 if (msg->getType() == async_messages::ASYNC_LEGACY_OP_START)
483 {
484 AsyncLegacyOperationStart *wrapper =
485 static_cast<AsyncLegacyOperationStart *>(msg);
486 msg = wrapper->get_action();
487 delete wrapper;
488 }
489 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
490 {
491 AsyncModuleOperationStart *wrapper =
492 static_cast<AsyncModuleOperationStart *>(msg);
493 msg = wrapper->get_action();
494 delete wrapper;
495 }
496 else if (msg->getType() == async_messages::ASYNC_OP_START)
497 {
498 AsyncOperationStart *wrapper =
499 static_cast<AsyncOperationStart *>(msg);
500 msg = wrapper->get_action();
501 delete wrapper;
502 }
503 kumpf 1.104 delete msg;
|
504 mday 1.39 }
505
506 msg = op->get_response();
|
507 kumpf 1.104 if (msg && (msg->getMask() & message_mask::ha_async))
|
508 mday 1.39 {
|
509 kumpf 1.104 if (msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT)
510 {
511 AsyncLegacyOperationResult *wrapper =
512 static_cast<AsyncLegacyOperationResult *>(msg);
513 msg = wrapper->get_result();
514 delete wrapper;
515 }
516 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
517 {
518 AsyncModuleOperationResult *wrapper =
519 static_cast<AsyncModuleOperationResult *>(msg);
520 msg = wrapper->get_result();
521 delete wrapper;
522 }
|
523 mday 1.39 }
524 void (*callback)(Message *, void *, void *) = op->__async_callback;
525 void *handle = op->_callback_handle;
526 void *parm = op->_callback_parameter;
527 op->release();
528 return_op(op);
529 callback(msg, handle, parm);
530 }
|
531 kumpf 1.104 else if (op->_flags & ASYNC_OPFLAGS_CALLBACK)
|
532 mday 1.39 {
|
533 kumpf 1.104 // note that _callback_node may be different from op
534 // op->_callback_response_q is a "this" pointer we can use for
|
535 mday 1.39 // static callback methods
536 op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);
537 }
|
538 mday 1.22 }
539
|
540 mday 1.1
|
541 kumpf 1.104 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)
|
542 mday 1.1 {
|
543 kumpf 1.104 if (operation != 0)
|
544 mday 1.5 {
|
545 kumpf 1.104
546 // ATTN: optimization
|
547 mday 1.22 // << Tue Feb 19 14:10:38 2002 mdd >>
|
548 mday 1.6 operation->lock();
|
549 kumpf 1.104
|
550 mike 1.119.12.1 Message *rq = operation->_request.front();
|
551 kumpf 1.104
|
552 mday 1.31 // optimization <<< Thu Mar 7 21:04:05 2002 mdd >>>
|
553 kumpf 1.104 // move this to the bottom of the loop when the majority of
554 // messages become async messages.
|
555 mday 1.31
|
556 mday 1.18 // divert legacy messages to handleEnqueue
|
557 mday 1.29 if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))
|
558 mday 1.18 {
|
559 mike 1.119.12.1 rq = operation->_request.remove_front() ;
|
560 kumpf 1.104 operation->unlock();
561 // delete the op node
562 operation->release();
563 return_op(operation);
|
564 mday 1.24
|
565 kumpf 1.104 handleEnqueue(rq);
566 return;
|
567 mday 1.18 }
568
|
569 kumpf 1.104 if ((operation->_flags & ASYNC_OPFLAGS_CALLBACK ||
570 operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&
571 (operation->_state & ASYNC_OPSTATE_COMPLETE))
|
572 mday 1.29 {
|
573 kumpf 1.104 operation->unlock();
574 _handle_async_callback(operation);
|
575 mday 1.29 }
|
576 kumpf 1.104 else
|
577 mday 1.29 {
|
578 kumpf 1.104 PEGASUS_ASSERT(rq != 0);
579 operation->unlock();
580 _handle_async_request(static_cast<AsyncRequest *>(rq));
|
581 mday 1.29 }
|
582 mday 1.5 }
583 return;
|
584 mday 1.1 }
585
586 void MessageQueueService::_handle_async_request(AsyncRequest *req)
587 {
|
588 kumpf 1.104 if (req != 0)
|
589 mday 1.4 {
590 req->op->processing();
|
591 kumpf 1.104
|
592 mday 1.4 Uint32 type = req->getType();
|
593 kumpf 1.104 if (type == async_messages::HEARTBEAT)
594 handle_heartbeat_request(req);
|
595 mday 1.4 else if (type == async_messages::IOCTL)
|
596 kumpf 1.104 handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
|
597 mday 1.4 else if (type == async_messages::CIMSERVICE_START)
|
598 kumpf 1.104 handle_CimServiceStart(static_cast<CimServiceStart *>(req));
|
599 mday 1.4 else if (type == async_messages::CIMSERVICE_STOP)
|
600 kumpf 1.104 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
|
601 mday 1.4 else if (type == async_messages::CIMSERVICE_PAUSE)
|
602 kumpf 1.104 handle_CimServicePause(static_cast<CimServicePause *>(req));
|
603 mday 1.4 else if (type == async_messages::CIMSERVICE_RESUME)
|
604 kumpf 1.104 handle_CimServiceResume(static_cast<CimServiceResume *>(req));
605 else if (type == async_messages::ASYNC_OP_START)
606 handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
607 else
|
608 mday 1.4 {
|
609 kumpf 1.104 // we don't handle this request message
610 _make_response(req, async_results::CIM_NAK);
|
611 mday 1.4 }
|
612 mday 1.1 }
613 }
614
|
615 mday 1.17
616 Boolean MessageQueueService::_enqueueResponse(
|
617 kumpf 1.104 Message* request,
|
618 mday 1.17 Message* response)
619 {
|
620 w.white 1.102
621 STAT_COPYDISPATCHER
622
|
623 kumpf 1.37 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
624 "MessageQueueService::_enqueueResponse");
|
625 mday 1.25
|
626 kumpf 1.104 if (request->getMask() & message_mask::ha_async)
|
627 mday 1.25 {
|
628 kumpf 1.104 if (response->getMask() & message_mask::ha_async)
|
629 mday 1.25 {
|
630 kumpf 1.104 _completeAsyncResponse(static_cast<AsyncRequest *>(request),
631 static_cast<AsyncReply *>(response),
632 ASYNC_OPSTATE_COMPLETE, 0);
|
633 kumpf 1.37 PEG_METHOD_EXIT();
|
634 kumpf 1.104 return true;
|
635 mday 1.25 }
636 }
|
637 kumpf 1.104
638 if (request->_async != 0)
|
639 mday 1.17 {
640 Uint32 mask = request->_async->getMask();
|
641 kumpf 1.104 PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request));
642
|
643 mday 1.18 AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
644 AsyncOpNode *op = async->op;
645 request->_async = 0;
|
646 mday 1.63 // the legacy request is going to be deleted by its handler
|
647 kumpf 1.104 // remove it from the op node
|
648 mday 1.63
649 static_cast<AsyncLegacyOperationStart *>(async)->get_action();
|
650 kumpf 1.104
651 AsyncLegacyOperationResult *async_result =
652 new AsyncLegacyOperationResult(
653 async->getKey(),
654 async->getRouting(),
655 op,
656 response);
657 _completeAsyncResponse(
658 async,
659 async_result,
660 ASYNC_OPSTATE_COMPLETE,
661 0);
|
662 kumpf 1.37 PEG_METHOD_EXIT();
|
663 mday 1.18 return true;
|
664 mday 1.17 }
|
665 kumpf 1.104
|
666 mday 1.18 // ensure that the destination queue is in response->dest
|
667 kumpf 1.37 PEG_METHOD_EXIT();
|
668 mday 1.24 return SendForget(response);
|
669 kumpf 1.104
|
670 mday 1.17 }
671
|
672 mday 1.18 void MessageQueueService::_make_response(Message *req, Uint32 code)
|
673 mday 1.1 {
|
674 mday 1.19 cimom::_make_response(req, code);
|
675 mday 1.1 }
676
677
|
678 kumpf 1.104 void MessageQueueService::_completeAsyncResponse(
679 AsyncRequest *request,
680 AsyncReply *reply,
681 Uint32 state,
682 Uint32 flag)
|
683 mday 1.5 {
|
684 kumpf 1.37 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
685 "MessageQueueService::_completeAsyncResponse");
686
|
687 mday 1.19 cimom::_completeAsyncResponse(request, reply, state, flag);
|
688 kumpf 1.37
689 PEG_METHOD_EXIT();
|
690 mday 1.5 }
691
692
|
693 kumpf 1.104 void MessageQueueService::_complete_op_node(
694 AsyncOpNode *op,
695 Uint32 state,
696 Uint32 flag,
697 Uint32 code)
|
698 mday 1.32 {
699 cimom::_complete_op_node(op, state, flag, code);
700 }
701
|
702 mday 1.5
703 Boolean MessageQueueService::accept_async(AsyncOpNode *op)
704 {
|
705 mike 1.118 if (_incoming_queue_shutdown.get() > 0)
|
706 mday 1.8 return false;
|
707 kumpf 1.104 if (_polling_thread == NULL)
708 {
709 _polling_thread = new Thread(
710 polling_routine,
711 reinterpret_cast<void *>(&_polling_list),
712 false);
|
713 konrad.r 1.113 ThreadStatus tr = PEGASUS_THREAD_OK;
714 while ( (tr =_polling_thread->run()) != PEGASUS_THREAD_OK)
|
715 a.arora 1.93 {
|
716 denise.eckstein 1.116 if (tr == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)
|
717 konrad.r 1.113 pegasus_yield();
|
718 denise.eckstein 1.116 else
719 throw Exception(MessageLoaderParms("Common.MessageQueueService.NOT_ENOUGH_THREAD",
720 "Could not allocate thread for the polling thread."));
|
721 a.arora 1.93 }
|
722 kumpf 1.104 }
723 // ATTN optimization remove the message checking altogether in the base
|
724 mday 1.20 // << Mon Feb 18 14:02:20 2002 mdd >>
|
725 mday 1.6 op->lock();
|
726 mike 1.119.12.1 Message *rq = op->_request.front();
727 Message *rp = op->_response.front();
|
728 mday 1.6 op->unlock();
|
729 kumpf 1.104
730 if ((rq != 0 && (true == messageOK(rq))) ||
|
731 mike 1.118 (rp != 0 && (true == messageOK(rp))) && _die.get() == 0)
|
732 mday 1.5 {
|
733 mike 1.119.12.1 _incoming.enqueue_wait(op);
|
734 mday 1.53 _polling_sem.signal();
|
735 mday 1.5 return true;
736 }
737 return false;
738 }
739
740 Boolean MessageQueueService::messageOK(const Message *msg)
741 {
|
742 mike 1.118 if (_incoming_queue_shutdown.get() > 0)
|
743 mday 1.8 return false;
|
744 mday 1.18 return true;
745 }
746
|
747 mday 1.1 void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
748 {
|
749 kumpf 1.104 // default action is to echo a heartbeat response
750
751 AsyncReply *reply = new AsyncReply(
752 async_messages::HEARTBEAT,
753 req->getKey(),
754 req->getRouting(),
755 0,
756 req->op,
757 async_results::OK,
758 req->resp,
759 false);
760 _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0);
|
761 mday 1.1 }
762
763
764 void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
|
765 kumpf 1.104 {
|
766 mday 1.1 }
|
767 kumpf 1.104
|
768 mday 1.1 void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
769 {
|
770 kumpf 1.104 switch (req->ctl)
|
771 mday 1.8 {
772 case AsyncIoctl::IO_CLOSE:
773 {
|
774 kumpf 1.104 MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);
|
775 kumpf 1.81
776 #ifdef MESSAGEQUEUESERVICE_DEBUG
|
777 kumpf 1.104 PEGASUS_STD(cout) << service->getQueueName() << " Received AsyncIoctl::IO_CLOSE " << PEGASUS_STD(endl);
|
778 kumpf 1.81 #endif
|
779 kumpf 1.104
780 // respond to this message. this is fire and forget, so we don't need to delete anything.
781 // this takes care of two problems that were being found
782 // << Thu Oct 9 10:52:48 2003 mdd >>
783 _make_response(req, async_results::OK);
784 // ensure we do not accept any further messages
785
786 // ensure we don't recurse on IO_CLOSE
|
787 mike 1.118 if (_incoming_queue_shutdown.get() > 0)
|
788 kumpf 1.104 break;
789
790 // set the closing flag
791 service->_incoming_queue_shutdown = 1;
792 // empty out the queue
793 while (1)
794 {
795 AsyncOpNode *operation;
796 try
797 {
|
798 mike 1.119.12.1 operation = service->_incoming.dequeue();
|
799 kumpf 1.104 }
800 catch(IPCException &)
801 {
802 break;
803 }
804 if (operation)
805 {
806 operation->_service_ptr = service;
807 service->_handle_incoming_operation(operation);
808 }
809 else
810 break;
811 } // message processing loop
812
|
813 mike 1.119.12.1 // shutdown the AsyncQueue
|
814 kumpf 1.104 service->_incoming.shutdown_queue();
815 return;
|
816 mday 1.8 }
817
818 default:
|
819 kumpf 1.104 _make_response(req, async_results::CIM_NAK);
|
820 mday 1.8 }
|
821 mday 1.1 }
|
822 mday 1.8
|
823 mday 1.1 void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
824 {
|
825 mday 1.79
|
826 kumpf 1.81 #ifdef MESSAGEQUEUESERVICE_DEBUG
|
827 mday 1.79 PEGASUS_STD(cout) << getQueueName() << "received START" << PEGASUS_STD(endl);
|
828 kumpf 1.81 #endif
|
829 kumpf 1.104
|
830 mday 1.10 // clear the stoped bit and update
|
831 mday 1.13 _capabilities &= (~(module_capabilities::stopped));
|
832 mday 1.10 _make_response(req, async_results::OK);
|
833 kumpf 1.104 // now tell the meta dispatcher we are stopped
|
834 mday 1.10 update_service(_capabilities, _mask);
835
|
836 mday 1.1 }
837 void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
838 {
|
839 kumpf 1.81 #ifdef MESSAGEQUEUESERVICE_DEBUG
|
840 mday 1.79 PEGASUS_STD(cout) << getQueueName() << "received STOP" << PEGASUS_STD(endl);
|
841 kumpf 1.81 #endif
|
842 mday 1.10 // set the stopeed bit and update
843 _capabilities |= module_capabilities::stopped;
844 _make_response(req, async_results::CIM_STOPPED);
|
845 kumpf 1.104 // now tell the meta dispatcher we are stopped
|
846 mday 1.10 update_service(_capabilities, _mask);
|
847 mday 1.1 }
|
848 kumpf 1.104
|
849 mday 1.1 void MessageQueueService::handle_CimServicePause(CimServicePause *req)
850 {
|
851 mday 1.10 // set the paused bit and update
|
852 mday 1.13 _capabilities |= module_capabilities::paused;
|
853 mday 1.11 update_service(_capabilities, _mask);
|
854 mday 1.10 _make_response(req, async_results::CIM_PAUSED);
|
855 kumpf 1.104 // now tell the meta dispatcher we are stopped
|
856 mday 1.1 }
|
857 kumpf 1.104
|
858 mday 1.1 void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
859 {
|
860 mday 1.10 // clear the paused bit and update
|
861 mday 1.13 _capabilities &= (~(module_capabilities::paused));
|
862 mday 1.11 update_service(_capabilities, _mask);
|
863 mday 1.10 _make_response(req, async_results::OK);
|
864 kumpf 1.104 // now tell the meta dispatcher we are stopped
|
865 mday 1.1 }
|
866 kumpf 1.104
|
867 mday 1.1 void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
868 {
869 _make_response(req, async_results::CIM_NAK);
870 }
871
872 void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
873 {
|
874 mday 1.14 ;
875 }
876
|
877 mday 1.10
|
878 mday 1.14 void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)
879 {
880 // remove the legacy message from the request and enqueue it to its destination
881 Uint32 result = async_results::CIM_NAK;
|
882 kumpf 1.104
|
883 mday 1.25 Message *legacy = req->_act;
|
884 kumpf 1.104 if (legacy != 0)
|
885 mday 1.14 {
|
886 mday 1.25 MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
|
887 kumpf 1.104 if (queue != 0)
|
888 mday 1.14 {
|
889 kumpf 1.104 if (queue->isAsync() == true)
890 {
891 (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
892 }
893 else
894 {
895 // Enqueue the response:
896 queue->enqueue(req->get_action());
897 }
898
899 result = async_results::OK;
|
900 mday 1.14 }
901 }
902 _make_response(req, result);
903 }
904
905 void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)
906 {
907 ;
|
908 mday 1.1 }
909
|
910 kumpf 1.104 AsyncOpNode *MessageQueueService::get_op()
|
911 mday 1.1 {
|
912 mday 1.4 AsyncOpNode *op = new AsyncOpNode();
|
913 kumpf 1.104
|
914 mday 1.9 op->_state = ASYNC_OPSTATE_UNKNOWN;
915 op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
|
916 kumpf 1.104
|
917 mday 1.1 return op;
918 }
919
920 void MessageQueueService::return_op(AsyncOpNode *op)
921 {
|
922 kumpf 1.104 PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED);
|
923 mday 1.4 delete op;
|
924 mday 1.1 }
925
|
926 mday 1.18
|
927 kumpf 1.104 Boolean MessageQueueService::ForwardOp(
928 AsyncOpNode *op,
929 Uint32 destination)
|
930 mday 1.29 {
|
931 kumpf 1.104 PEGASUS_ASSERT(op != 0);
|
932 mday 1.30 op->lock();
933 op->_op_dest = MessageQueue::lookup(destination);
|
934 mday 1.29 op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD);
935 op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK);
|
936 mday 1.30 op->unlock();
|
937 kumpf 1.104 if (op->_op_dest == 0)
|
938 mday 1.30 return false;
|
939 kumpf 1.104
|
940 mday 1.29 return _meta_dispatcher->route_async(op);
941 }
942
|
943 kumpf 1.104
944 Boolean MessageQueueService::SendAsync(
945 AsyncOpNode *op,
946 Uint32 destination,
947 void (*callback)(AsyncOpNode *, MessageQueue *, void *),
948 MessageQueue *callback_response_q,
949 void *callback_ptr)
950 {
951 PEGASUS_ASSERT(op != 0 && callback != 0);
952
|
953 mday 1.21 // get the queue handle for the destination
954
|
955 mday 1.30 op->lock();
|
956 mday 1.32 op->_op_dest = MessageQueue::lookup(destination); // destination of this message
|
957 mday 1.22 op->_flags |= ASYNC_OPFLAGS_CALLBACK;
958 op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
|
959 mday 1.30 // initialize the callback data
|
960 mday 1.32 op->_async_callback = callback; // callback function to be executed by recpt. of response
961 op->_callback_node = op; // the op node
962 op->_callback_response_q = callback_response_q; // the queue that will receive the response
963 op->_callback_ptr = callback_ptr; // user data for callback
964 op->_callback_request_q = this; // I am the originator of this request
|
965 kumpf 1.104
|
966 mday 1.30 op->unlock();
|
967 kumpf 1.104 if (op->_op_dest == 0)
|
968 mday 1.30 return false;
|
969 kumpf 1.104
|
970 mday 1.21 return _meta_dispatcher->route_async(op);
|
971 mday 1.18 }
972
973
|
// Send 'msg' asynchronously and invoke 'callback' with the response.
// Falls back to SendForget() when no callback is supplied.  Returns
// false when the message is null or the destination cannot be resolved.
Boolean MessageQueueService::SendAsync(
    Message *msg,
    Uint32 destination,
    void (*callback)(Message *response, void *handle, void *parameter),
    void *handle,
    void *parameter)
{
    if (msg == NULL)
        return false;
    if (callback == NULL)
        return SendForget(msg);
    AsyncOpNode *op = get_op();
    msg->dest = destination;
    if (NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
    {
        // Unknown destination: mark the op released and reclaim it.
        op->release();
        return_op(op);
        return false;
    }
    // "Safe" (Message-level) callback delivery, not fire-and-forget;
    // clear the completion bit before routing.
    op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
    op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
    op->_state &= ~ASYNC_OPSTATE_COMPLETE;
    op->__async_callback = callback;
    op->_callback_node = op;
    op->_callback_handle = handle;
    op->_callback_parameter = parameter;
    op->_callback_response_q = this;

    if (!(msg->getMask() & message_mask::ha_async))
    {
        // Legacy (non-async) message: wrap it for the async dispatcher.
        // NOTE(review): 'wrapper' is never referenced after construction --
        // presumably the AsyncLegacyOperationStart constructor attaches
        // itself (and 'msg') to 'op'; confirm against its definition,
        // otherwise this allocation leaks.
        AsyncLegacyOperationStart *wrapper = new AsyncLegacyOperationStart(
            get_next_xid(),
            op,
            destination,
            msg,
            destination);
    }
    else
    {
        // Already an async message: queue it on the op node directly and
        // link the message back to its op.
        op->_request.insert_front(msg);
        (static_cast<AsyncMessage *>(msg))->op = op;
    }
    return _meta_dispatcher->route_async(op);
}
1018
1019
|
1020 mday 1.18 Boolean MessageQueueService::SendForget(Message *msg)
1021 {
1022 AsyncOpNode *op = 0;
|
1023 mday 1.22 Uint32 mask = msg->getMask();
|
1024 kumpf 1.104
|
1025 mday 1.22 if (mask & message_mask::ha_async)
|
1026 mday 1.18 {
1027 op = (static_cast<AsyncMessage *>(msg))->op ;
1028 }
|
1029 mday 1.22
|
1030 kumpf 1.104 if (op == 0)
|
1031 mday 1.20 {
|
1032 mday 1.18 op = get_op();
|
1033 mike 1.119.12.1 op->_request.insert_front(msg);
|
1034 mday 1.22 if (mask & message_mask::ha_async)
|
1035 kumpf 1.104 {
1036 (static_cast<AsyncMessage *>(msg))->op = op;
1037 }
|
1038 mday 1.20 }
|
1039 mday 1.30 op->_op_dest = MessageQueue::lookup(msg->dest);
|
1040 mday 1.22 op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
|
1041 kumpf 1.104 op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
1042 | ASYNC_OPFLAGS_SIMPLE_STATUS);
|
1043 mday 1.22 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
1044 kumpf 1.104 if (op->_op_dest == 0)
|
1045 mday 1.39 {
1046 op->release();
1047 return_op(op);
|
1048 mday 1.21 return false;
|
1049 mday 1.39 }
|
1050 kumpf 1.104
|
1051 mday 1.18 // now see if the meta dispatcher will take it
|
1052 mday 1.30 return _meta_dispatcher->route_async(op);
|
1053 mday 1.18 }
|
1054 mday 1.2
|
1055 mday 1.1
|
1056 mday 1.4 AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
|
1057 mday 1.1 {
|
1058 kumpf 1.104 if (request == 0)
|
1059 mday 1.4 return 0 ;
|
1060 mday 1.5
1061 Boolean destroy_op = false;
|
1062 kumpf 1.104
|
1063 s.hills 1.80 if (request->op == 0)
|
1064 mday 1.5 {
1065 request->op = get_op();
|
1066 mike 1.119.12.1 request->op->_request.insert_front(request);
|
1067 mday 1.5 destroy_op = true;
1068 }
|
1069 kumpf 1.104
|
1070 mday 1.33 request->block = false;
|
1071 mday 1.35 request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
|
1072 kumpf 1.104 SendAsync(
1073 request->op,
1074 request->dest,
1075 _sendwait_callback,
1076 this,
1077 (void *)0);
1078
|
1079 baggio.br 1.100 request->op->_client_sem.wait();
1080
|
1081 mday 1.6 request->op->lock();
|
1082 mike 1.119.12.1 AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_front());
|
1083 mday 1.6 rpl->op = 0;
1084 request->op->unlock();
|
1085 kumpf 1.104
1086 if (destroy_op == true)
|
1087 mday 1.5 {
|
1088 mday 1.6 request->op->lock();
1089 request->op->_request.remove(request);
1090 request->op->_state |= ASYNC_OPSTATE_RELEASED;
1091 request->op->unlock();
1092 return_op(request->op);
|
1093 mday 1.7 request->op = 0;
|
1094 mday 1.5 }
1095 return rpl;
|
1096 mday 1.1 }
1097
1098
|
1099 kumpf 1.104 Boolean MessageQueueService::register_service(
1100 String name,
1101 Uint32 capabilities,
1102 Uint32 mask)
1103 {
1104 RegisterCimService *msg = new RegisterCimService(
1105 get_next_xid(),
1106 0,
1107 true,
1108 name,
1109 capabilities,
1110 mask,
1111 _queueId);
1112 msg->dest = CIMOM_Q_ID;
|
1113 mday 1.1
1114 Boolean registered = false;
|
1115 kumpf 1.104 AsyncReply *reply = static_cast<AsyncReply *>(SendWait(msg));
1116
1117 if (reply != 0)
1118 {
1119 if (reply->getMask() & message_mask::ha_async)
1120 {
1121 if (reply->getMask() & message_mask::ha_reply)
1122 {
1123 if (reply->result == async_results::OK ||
1124 reply->result == async_results::MODULE_ALREADY_REGISTERED)
1125 {
1126 registered = true;
1127 }
1128 }
|
1129 mday 1.1 }
|
1130 kumpf 1.104
1131 delete reply;
|
1132 mday 1.1 }
|
1133 mday 1.5 delete msg;
|
1134 mday 1.1 return registered;
1135 }
1136
1137 Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
1138 {
|
1139 kumpf 1.104 UpdateCimService *msg = new UpdateCimService(
1140 get_next_xid(),
1141 0,
1142 true,
1143 _queueId,
1144 _capabilities,
1145 _mask);
|
1146 mday 1.1 Boolean registered = false;
|
1147 mday 1.2
1148 AsyncMessage *reply = SendWait(msg);
1149 if (reply)
|
1150 mday 1.1 {
|
1151 kumpf 1.104 if (reply->getMask() & message_mask::ha_async)
|
1152 mday 1.1 {
|
1153 kumpf 1.104 if (reply->getMask() & message_mask::ha_reply)
1154 {
1155 if (static_cast<AsyncReply *>(reply)->result == async_results::OK)
1156 {
1157 registered = true;
1158 }
1159 }
|
1160 mday 1.1 }
1161 delete reply;
1162 }
|
1163 mday 1.5 delete msg;
|
1164 mday 1.1 return registered;
1165 }
1166
1167
|
1168 kumpf 1.104 Boolean MessageQueueService::deregister_service()
|
1169 mday 1.1 {
|
1170 mday 1.3
|
1171 mday 1.5 _meta_dispatcher->deregister_module(_queueId);
1172 return true;
|
1173 mday 1.1 }
1174
1175
|
1176 kumpf 1.104 void MessageQueueService::find_services(
1177 String name,
1178 Uint32 capabilities,
1179 Uint32 mask,
1180 Array<Uint32> *results)
|
1181 mday 1.1 {
|
1182 kumpf 1.104 if (results == 0)
1183 {
|
1184 mday 1.1 throw NullPointer();
|
1185 kumpf 1.104 }
1186
|
1187 mday 1.1 results->clear();
|
1188 kumpf 1.104
1189 FindServiceQueue *req = new FindServiceQueue(
1190 get_next_xid(),
1191 0,
1192 _queueId,
1193 true,
1194 name,
1195 capabilities,
1196 mask);
1197
|
1198 mday 1.44 req->dest = CIMOM_Q_ID;
|
1199 kumpf 1.104
1200 AsyncMessage *reply = SendWait(req);
1201 if (reply)
1202 {
1203 if (reply->getMask() & message_mask::ha_async)
1204 {
1205 if (reply->getMask() & message_mask::ha_reply)
1206 {
1207 if (reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
1208 {
1209 if ((static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK)
1210 *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
1211 }
1212 }
|
1213 mday 1.1 }
1214 delete reply;
1215 }
|
1216 mday 1.5 delete req;
|
1217 mday 1.1 return ;
1218 }
1219
1220 void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
1221 {
|
1222 kumpf 1.104 if (result == 0)
1223 {
|
1224 mday 1.1 throw NullPointer();
|
1225 kumpf 1.104 }
1226
1227 EnumerateService *req = new EnumerateService(
1228 get_next_xid(),
1229 0,
1230 _queueId,
1231 true,
1232 queue);
1233
|
1234 mday 1.2 AsyncMessage *reply = SendWait(req);
|
1235 kumpf 1.104
|
1236 mday 1.2 if (reply)
|
1237 mday 1.1 {
1238 Boolean found = false;
|
1239 kumpf 1.104
1240 if (reply->getMask() & message_mask::ha_async)
|
1241 mday 1.1 {
|
1242 kumpf 1.104 if (reply->getMask() & message_mask::ha_reply)
1243 {
1244 if (reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
1245 {
1246 if ((static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK)
1247 {
1248 if (found == false)
1249 {
1250 found = true;
1251
1252 result->put_name((static_cast<EnumerateServiceResponse *>(reply))->name);
1253 result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
1254 result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
1255 result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
1256 }
1257 }
1258 }
1259 }
|
1260 mday 1.1 }
1261 delete reply;
1262 }
|
1263 mday 1.5 delete req;
|
1264 kumpf 1.104
|
1265 mday 1.1 return;
1266 }
1267
|
1268 kumpf 1.104 Uint32 MessageQueueService::get_next_xid()
|
1269 mday 1.1 {
|
1270 kumpf 1.119.12.2 AutoMutex autoMut(_xidMutex);
1271 return ++_xid;
|
1272 mday 1.1 }
1273
1274 PEGASUS_NAMESPACE_END
|