1 w.otsuka 1.88.2.1 //%2005////////////////////////////////////////////////////////////////////////
|
2 mday 1.1 //
|
3 karl 1.88 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.82 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.88 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 w.otsuka 1.88.2.1 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 mday 1.1 //
12 // Permission is hereby granted, free of charge, to any person obtaining a copy
13 // of this software and associated documentation files (the "Software"), to
14 // deal in the Software without restriction, including without limitation the
15 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
16 // sell copies of the Software, and to permit persons to whom the Software is
17 // furnished to do so, subject to the following conditions:
|
18 karl 1.82 //
|
19 mday 1.1 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
20 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
21 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
22 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
23 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
24 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
25 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 //
28 //==============================================================================
29 //
30 // Author: Mike Day (mdday@us.ibm.com)
31 //
32 // Modified By:
|
33 a.arora 1.87 // Amit K Arora, IBM (amita@in.ibm.com) for Bug#1090
|
34 mday 1.1 //
35 //%/////////////////////////////////////////////////////////////////////////////
36
37 #include "MessageQueueService.h"
|
38 mday 1.22 #include <Pegasus/Common/Tracer.h>
|
39 humberto 1.71 #include <Pegasus/Common/MessageLoader.h> //l10n
|
40 mday 1.1
41 PEGASUS_NAMESPACE_BEGIN
42
|
43 mday 1.15
44 cimom *MessageQueueService::_meta_dispatcher = 0;
45 AtomicInt MessageQueueService::_service_count = 0;
46 AtomicInt MessageQueueService::_xid(1);
|
47 mday 1.38 Mutex MessageQueueService::_meta_dispatcher_mutex;
|
48 mday 1.15
|
49 mday 1.58 static struct timeval create_time = {0, 1};
|
50 mday 1.70 static struct timeval destroy_time = {300, 0};
|
51 mday 1.59 static struct timeval deadlock_time = {0, 0};
|
52 mday 1.53
|
53 mday 1.55 ThreadPool *MessageQueueService::_thread_pool = 0;
|
54 mday 1.53
|
55 kumpf 1.88.2.7 DQueue<MessageQueueService>* MessageQueueService::_polling_list = 0;
|
56 mday 1.53
|
57 kumpf 1.62 Thread* MessageQueueService::_polling_thread = 0;
|
58 mday 1.61
|
59 mday 1.69 ThreadPool *MessageQueueService::get_thread_pool(void)
60 {
61 return _thread_pool;
62 }
63
|
//
// MAX_THREADS_PER_SVC_QUEUE_LIMIT
//
// JR Wunderlich Jun 6, 2005
//

// Hard upper bound, and fallback default, for the number of threads
// allowed to concurrently service a single queue.  The effective value
// is clamped into max_threads_per_svc_queue by the constructor and
// consulted by polling_routine before awakening another worker.
#define MAX_THREADS_PER_SVC_QUEUE_LIMIT 5000
#define MAX_THREADS_PER_SVC_QUEUE_DEFAULT 5

// Effective per-queue thread cap (set/clamped in the constructor).
Uint32 max_threads_per_svc_queue;
74
|
75 mday 1.69 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::kill_idle_threads(void *parm)
|
76 mday 1.53 {
|
77 mday 1.69
78 static struct timeval now, last = {0,0};
|
79 mday 1.53 gettimeofday(&now, NULL);
|
80 mday 1.56 int dead_threads = 0;
|
81 mday 1.53
|
82 mday 1.70 if( now.tv_sec - last.tv_sec > 120 )
|
83 mday 1.53 {
84 gettimeofday(&last, NULL);
|
85 mday 1.56 try
86 {
|
87 mday 1.69 dead_threads = MessageQueueService::_thread_pool->kill_dead_threads();
|
88 mday 1.56 }
|
89 mday 1.69 catch(...)
|
90 mday 1.56 {
91
92 }
|
93 mday 1.53 }
|
94 w.otsuka 1.88.2.1
95 #ifdef PEGASUS_POINTER_64BIT
96 return (PEGASUS_THREAD_RETURN)(Uint64)dead_threads;
97 #elif PEGASUS_PLATFORM_AIX_RS_IBMCXX
98 return (PEGASUS_THREAD_RETURN)(unsigned long)dead_threads;
99 #else
100 return (PEGASUS_THREAD_RETURN)(Uint32)dead_threads;
101 #endif
|
102 mday 1.53 }
103
|
104 mday 1.68
|
// Force all registered services to shut their incoming queues, and
// optionally delete them.
//
// NOTE: this routine is deliberately DISABLED - the unconditional
// "return" below (introduced in rev 1.79) makes the remainder of the
// body unreachable.  The dead code is retained for reference.
void MessageQueueService::force_shutdown(Boolean destroy_flag)
{
    return;

#ifdef MESSAGEQUEUESERVICE_DEBUG
    //l10n
    MessageLoaderParms parms("Common.MessageQueueService.FORCING_SHUTDOWN",
        "Forcing shutdown of CIMOM Message Router");
    PEGASUS_STD(cout) << MessageLoader::getMessage(parms) << PEGASUS_STD(endl);
#endif

    MessageQueueService *svc;
    int counter = 0;

    // Walk the polling list under its lock, shutting each service's
    // incoming queue and poking the polling thread as we go.
    _polling_list->lock();
    svc = _polling_list->next(0);
    while (svc != 0)
    {
#ifdef MESSAGEQUEUESERVICE_DEBUG
        //l10n - reuse same MessageLoaderParms to avoid multiple creates
        parms.msg_id = "Common.MessageQueueService.STOPPING_SERVICE";
        parms.default_msg = "Stopping $0";
        parms.arg0 = svc->getQueueName();
        PEGASUS_STD(cout) << MessageLoader::getMessage(parms) << PEGASUS_STD(endl);
#endif

        _polling_sem->signal();
        svc->_shutdown_incoming_queue();
        counter++;
        _polling_sem->signal();
        svc = _polling_list->next(svc);
    }
    _polling_list->unlock();

    _polling_sem->signal();

    // Tell polling_routine to exit its loop.
    *MessageQueueService::_stop_polling = 1;

    if (destroy_flag == true)
    {
        // Drain the list from the tail, deleting each service.
        svc = _polling_list->remove_last();
        while (svc)
        {
            delete svc;
            svc = _polling_list->remove_last();
        }
    }
}
156
157
|
// Body of the single polling thread.  Waits on _polling_sem, then scans
// the service list and awakens a pool thread (_req_proc) for each
// service that has queued messages, is not dying, and is below its
// per-queue thread cap.  Exits when *_stop_polling becomes non-zero.
PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
{
    Thread *myself = reinterpret_cast<Thread *>(parm);
    // The thread's parm is the shared _polling_list.
    DQueue<MessageQueueService> *list = reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm());

    while ( _stop_polling->value() == 0 )
    {
        _polling_sem->wait();
        if(_stop_polling->value() != 0 )
        {
            break;
        }

        // The polling_routine thread must hold the lock on the
        // _polling_thread list while processing incoming messages.
        // This lock is used to give this thread ownership of
        // services on the _polling_routine list.

        // This is necessary to avoid conflict with other threads
        // processing the _polling_list
        // (e.g., MessageQueueServer::~MessageQueueService).

        list->lock();
        MessageQueueService *service = list->next(0);
        ThreadStatus rtn = PEGASUS_THREAD_OK;
        while (service != NULL)
        {
            if ((service->_incoming.count() > 0) &&
                (service->_die.value() == 0) &&
                (service->_threads < max_threads_per_svc_queue))
            {
                // The _threads count is used to track the
                // number of active threads that have been allocated
                // to process messages for this service.

                // The _threads count MUST be incremented while
                // the polling_routine owns the _polling_thread
                // lock and has ownership of the service object.

                service->_threads++;
                try
                {
                    rtn = _thread_pool->allocate_and_awaken(
                        service, _req_proc, _polling_sem);
                }
                catch (...)
                {
                    service->_threads--;

                    // allocate_and_awaken should never generate an exception.
                    PEGASUS_ASSERT(0);
                }
                // if no more threads available, break from processing loop
                if (rtn != PEGASUS_THREAD_OK )
                {
                    service->_threads--;
                    Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
                        "Not enough threads to process this request. Skipping.");

                    Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
                        "Could not allocate thread for %s. " \
                        "Queue has %d messages waiting and %d threads servicing." \
                        "Skipping the service for right now. ",
                        service->getQueueName(),
                        service->_incoming.count(),
                        service->_threads.value());

                    pegasus_yield();
                    service = NULL;   // abandon this scan pass
                }
            }
            if (service != NULL)
            {
                service = list->next(service);
            }
        }
        list->unlock();

        if(_check_idle_flag->value() != 0 )
        {
            *_check_idle_flag = 0;

            // try to do idle thread clean up processing when system is not busy
            // if system is busy there may not be a thread available to allocate
            // so nothing will be done and that is OK.
            // (service is NULL here; kill_idle_threads ignores its parm.)

            if ( _thread_pool->allocate_and_awaken(service, kill_idle_threads,
                     _polling_sem) != PEGASUS_THREAD_OK)
            {
                Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
                    "Not enough threads to kill idle threads. What an irony.");

                Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
                    "Could not allocate thread to kill idle threads." \
                    "Skipping. ");
            }
        }
    }
    myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
    return(0);
}
258
259
|
// Shared synchronization objects.  Allocated by the first service
// constructor (under _meta_dispatcher_mutex) and deleted by the last
// destructor; heap-allocated so their lifetime is controlled explicitly.
Semaphore* MessageQueueService::_polling_sem = 0;
AtomicInt* MessageQueueService::_stop_polling = 0;
AtomicInt* MessageQueueService::_check_idle_flag = 0;
|
263 mday 1.53
|
264 mday 1.15
|
// Construct a service queue.
//
// The first instance created in the process also builds the shared
// infrastructure (meta dispatcher, thread pool, polling list/thread and
// synchronization objects) under _meta_dispatcher_mutex; every instance
// then registers with the meta dispatcher and joins the polling list.
//
// Throws NullPointer if the dispatcher cannot be created and
// BindFailedException if registration with the meta dispatcher fails.
MessageQueueService::MessageQueueService(const char *name,
    Uint32 queueID,
    Uint32 capabilities,
    Uint32 mask)
    : Base(name, true, queueID),
      _mask(mask),
      _die(0),
      _threads(0),
      _incoming(true, 0),
      _callback(true),
      _incoming_queue_shutdown(0),
      _callback_ready(0),
      _req_thread(_req_proc, this, false),
      _callback_thread(_callback_proc, this, false)
{
    // Every service is implicitly async-capable.
    _capabilities = (capabilities | module_capabilities::async);

    _default_op_timeout.tv_sec = 30;
    _default_op_timeout.tv_usec = 100;

    max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;

    // if requested threads gt MAX_THREADS_PER_SVC_QUEUE_LIMIT
    // then set to MAX_THREADS_PER_SVC_QUEUE_LIMIT

    if (max_threads_per_svc_queue > MAX_THREADS_PER_SVC_QUEUE_LIMIT)
    {
        max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT;
    }

    // if requested threads eq 0 (unlimited)
    // then fall back to MAX_THREADS_PER_SVC_QUEUE_DEFAULT

    if (max_threads_per_svc_queue == 0)
    {
        max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_DEFAULT;
    }

    // cout << "MAX_THREADS_PER_SVC_QUEUE = " << MAX_THREADS_PER_SVC_QUEUE << endl;
    // cout << "max_threads_per_svc_queue set to = " << max_threads_per_svc_queue << endl;

    AutoMutex autoMut(_meta_dispatcher_mutex);

    if( _meta_dispatcher == 0 )
    {
        // First service in the process: instantiate the common objects.
        _polling_list = new DQueue<MessageQueueService>(true);
        _stop_polling = new AtomicInt(0);
        _polling_sem = new Semaphore(0);
        _check_idle_flag = new AtomicInt(0);

        *_stop_polling = 0;
        PEGASUS_ASSERT( _service_count.value() == 0 );
        _meta_dispatcher = new cimom();
        if (_meta_dispatcher == NULL )
        {
            throw NullPointer();
        }
        _thread_pool = new ThreadPool(0, "MessageQueueService", 0, 0,
            create_time, destroy_time, deadlock_time);

        // Start the polling thread; retry until the thread actually runs.
        _polling_thread = new Thread(polling_routine,
            reinterpret_cast<void *>(_polling_list),
            false);
        while (!_polling_thread->run())
        {
            pegasus_yield();
        }
    }
    _service_count++;

    if( false == register_service(name, _capabilities, _mask) )
    {
        //l10n
        //throw BindFailedException("MessageQueueService Base Unable to register with Meta Dispatcher");
        MessageLoaderParms parms("Common.MessageQueueService.UNABLE_TO_REGISTER",
            "MessageQueueService Base Unable to register with Meta Dispatcher");

        throw BindFailedException(parms);
    }

    // Become visible to the polling thread.
    _polling_list->insert_last(this);

    // _meta_dispatcher_mutex.unlock(); //Bug#1090
    // _callback_thread.run();

    // _req_thread.run();
}
356
|
357 mday 1.4
|
// Destroy the service.  Removes it from the polling list, shuts down
// its incoming queue, waits for in-flight worker threads, and - when
// this is the last remaining service - tears down the shared
// infrastructure created by the first constructor.
MessageQueueService::~MessageQueueService(void)
{
    _die = 1;

    // The polling_routine locks the _polling_list while
    // processing the incoming messages for services on the
    // list. Deleting the service from the _polling_list
    // prior to processing, avoids synchronization issues
    // with the _polling_routine.

    _polling_list->remove(this);

    _callback_ready.signal();

    // ATTN: The code for closing the _incoming queue
    // is not working correctly. In OpenPegasus 2.4,
    // execution of the following code is very timing
    // dependent. This needs to be fix.
    // See Bug 4079 for details.
    if (_incoming_queue_shutdown.value() == 0)
    {
        _shutdown_incoming_queue();
    }

    // Wait until all threads processing the messages
    // for this service have completed.

    while (_threads.value() > 0)
    {
        pegasus_yield();
    }

    {
        AutoMutex autoMut(_meta_dispatcher_mutex);
        _service_count--;
        if (_service_count.value() == 0 )
        {
            // Last service: stop the polling thread, then dismantle the
            // shared dispatcher, thread pool, and sync objects.
            (*_stop_polling)++;
            _polling_sem->signal();
            _polling_thread->join();
            delete _polling_thread;
            _polling_thread = 0;
            _meta_dispatcher->_shutdown_routed_queue();
            delete _meta_dispatcher;
            _meta_dispatcher = 0;

            delete _thread_pool;
            _thread_pool = 0;

            // Clean up the common objects
            delete _check_idle_flag;
            delete _polling_sem;
            delete _stop_polling;
            delete _polling_list;
            // NOTE(review): the pointers freed just above are not reset
            // to 0 here - presumably safe because a new first service
            // reassigns them all; confirm before reordering.
        }
    } // mutex unlocks here

    // Clean up in case there are extra stuff on the queue.
    while (_incoming.count())
    {
        try
        {
            delete _incoming.remove_first();
        }
        catch (const ListClosed &e)
        {
            // If the list is closed, there is nothing we can do.
            break;
        }
    }
}
|
429 mday 1.1
|
// Post a fire-and-forget AsyncIoctl::IO_CLOSE to our own incoming
// queue.  When dispatched (handle_AsyncIoctl), it drains and then
// closes the queue.  No-op if shutdown has already begun.
void MessageQueueService::_shutdown_incoming_queue(void)
{
    if (_incoming_queue_shutdown.value() > 0 )
        return ;

    AsyncIoctl *msg = new AsyncIoctl(get_next_xid(),
        0,
        _queueId,
        _queueId,
        true,
        AsyncIoctl::IO_CLOSE,
        0,
        0);

    // Mark the op fire-and-forget: no callback, no completion status.
    msg->op = get_op();
    msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
    msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
        | ASYNC_OPFLAGS_SIMPLE_STATUS);
    msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;

    msg->op->_op_dest = this;
    msg->op->_request.insert_first(msg);

    try
    {
        _incoming.insert_last_wait(msg->op);
        _polling_sem->signal();
    }
    catch (const ListClosed &)
    {
        // This means the queue has already been shut-down (happens when there
        // are two AsyncIoctrl::IO_CLOSE messages generated and one got first
        // processed.
        delete msg;
    }
    catch (const Permission &)
    {
        delete msg;
    }
}
471
|
472 mday 1.22
473
|
474 kumpf 1.85 void MessageQueueService::enqueue(Message *msg)
|
475 mday 1.22 {
|
476 kumpf 1.28 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
477
|
478 mday 1.22 Base::enqueue(msg);
|
479 kumpf 1.28
480 PEG_METHOD_EXIT();
|
481 mday 1.22 }
482
483
|
// Dedicated callback-dispatch thread body (currently not started - see
// the commented-out _callback_thread.run() in the constructor).  Waits
// for _callback_ready, then scans the _callback list for a completed op
// and hands it to _handle_async_callback; at most one op per wakeup.
PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_callback_proc(void *parm)
{
    Thread *myself = reinterpret_cast<Thread *>(parm);
    MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
    AsyncOpNode *operation = 0;

    while ( service->_die.value() == 0 )
    {
        service->_callback_ready.wait();

        service->_callback.lock();
        operation = service->_callback.next(0);
        while( operation != NULL)
        {
            if( ASYNC_OPSTATE_COMPLETE & operation->read_state())
            {
                operation = service->_callback.remove_no_lock(operation);
                PEGASUS_ASSERT(operation != NULL);
                operation->_thread_ptr = myself;
                operation->_service_ptr = service;
                service->_handle_async_callback(operation);
                break;   // one completed op per signal
            }
            operation = service->_callback.next(operation);
        }
        service->_callback.unlock();
    }
    myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
    return(0);
}
514
|
515 mday 1.22
|
// Worker-thread body awakened by polling_routine: dequeue at most one
// operation from the service's incoming queue and dispatch it.  The
// service's _threads counter (incremented by polling_routine before
// awakening us) is decremented on every exit path.
PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
{
    MessageQueueService *service = reinterpret_cast<MessageQueueService *>(parm);
    // pull messages off the incoming queue and dispatch them. then
    // check pending messages that are non-blocking
    AsyncOpNode *operation = 0;

    if ( service->_die.value() == 0 )
    {
        try
        {
            operation = service->_incoming.remove_first();
        }
        catch(ListClosed & )
        {
            // Queue already shut down - nothing to process.
            operation = 0;
            service->_threads--;
            return(0);
        }
        if( operation )
        {
            operation->_service_ptr = service;
            service->_handle_incoming_operation(operation);
        }
    }
    service->_threads--;
    return(0);
}
544
|
545 mday 1.43 Uint32 MessageQueueService::get_pending_callback_count(void)
546 {
547 return _callback.count();
548 }
549
550
551
|
552 mday 1.33 void MessageQueueService::_sendwait_callback(AsyncOpNode *op,
553 MessageQueue *q,
554 void *parm)
555 {
556 op->_client_sem.signal();
557 }
558
|
559 mday 1.30
560 // callback function is responsible for cleaning up all resources
561 // including op, op->_callback_node, and op->_callback_ptr
|
// Dispatch a completed async operation to its registered callback.
// For SAFE callbacks this routine owns cleanup: it unwraps and deletes
// the request wrapper/action, unwraps the response, releases and
// returns the op node, and finally invokes the C-style callback with
// the (unwrapped) response message.  For plain CALLBACK ops it simply
// invokes the stored member-style callback.
void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
{
    if( op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK )
    {
        // --- request side: unwrap and destroy ---
        Message *msg = op->get_request();
        if( msg && ( msg->getMask() & message_mask::ha_async))
        {
            if(msg->getType() == async_messages::ASYNC_LEGACY_OP_START )
            {
                AsyncLegacyOperationStart *wrapper =
                    static_cast<AsyncLegacyOperationStart *>(msg);
                msg = wrapper->get_action();
                delete wrapper;
            }
            else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
            {
                AsyncModuleOperationStart *wrapper =
                    static_cast<AsyncModuleOperationStart *>(msg);
                msg = wrapper->get_action();
                delete wrapper;
            }
            else if (msg->getType() == async_messages::ASYNC_OP_START)
            {
                AsyncOperationStart *wrapper =
                    static_cast<AsyncOperationStart *>(msg);
                msg = wrapper->get_action();
                delete wrapper;
            }
            delete msg;   // the unwrapped action (or the raw request)
        }

        // --- response side: unwrap, ownership passes to the callback ---
        msg = op->get_response();
        if( msg && ( msg->getMask() & message_mask::ha_async))
        {
            if(msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT )
            {
                AsyncLegacyOperationResult *wrapper =
                    static_cast<AsyncLegacyOperationResult *>(msg);
                msg = wrapper->get_result();
                delete wrapper;
            }
            else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
            {
                AsyncModuleOperationResult *wrapper =
                    static_cast<AsyncModuleOperationResult *>(msg);
                msg = wrapper->get_result();
                delete wrapper;
            }
        }
        // Capture callback info before the op node is recycled.
        void (*callback)(Message *, void *, void *) = op->__async_callback;
        void *handle = op->_callback_handle;
        void *parm = op->_callback_parameter;
        op->release();
        return_op(op);
        callback(msg, handle, parm);
    }
    else if( op->_flags & ASYNC_OPFLAGS_CALLBACK )
    {
        // note that _callback_node may be different from op
        // op->_callback_response_q is a "this" pointer we can use for
        // static callback methods
        op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);
    }
}
627
|
628 mday 1.1
|
// Route one dequeued operation: legacy (non-async) messages go to
// handleEnqueue, completed callback-flagged ops go to
// _handle_async_callback, everything else is dispatched as an async
// request.
void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)
{
    if ( operation != 0 )
    {
        // ATTN: optimization
        // << Tue Feb 19 14:10:38 2002 mdd >>
        operation->lock();

        Message *rq = operation->_request.next(0);

        // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>>
        // move this to the bottom of the loop when the majority of
        // messages become async messages.

        // divert legacy messages to handleEnqueue
        if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))
        {
            rq = operation->_request.remove_first() ;
            operation->unlock();
            // delete the op node - the legacy message travels on alone
            operation->release();
            return_op( operation);

            handleEnqueue(rq);
            return;
        }

        if ( (operation->_flags & ASYNC_OPFLAGS_CALLBACK ||
              operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&
             (operation->_state & ASYNC_OPSTATE_COMPLETE))
        {
            operation->unlock();
            _handle_async_callback(operation);
        }
        else
        {
            PEGASUS_ASSERT(rq != 0 );
            operation->unlock();
            _handle_async_request(static_cast<AsyncRequest *>(rq));
        }
    }
    return;
}
676
677 void MessageQueueService::_handle_async_request(AsyncRequest *req)
678 {
|
679 mday 1.4 if ( req != 0 )
680 {
681 req->op->processing();
|
682 mday 1.1
|
683 mday 1.4 Uint32 type = req->getType();
684 if( type == async_messages::HEARTBEAT )
685 handle_heartbeat_request(req);
686 else if (type == async_messages::IOCTL)
687 handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
688 else if (type == async_messages::CIMSERVICE_START)
689 handle_CimServiceStart(static_cast<CimServiceStart *>(req));
690 else if (type == async_messages::CIMSERVICE_STOP)
691 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
692 else if (type == async_messages::CIMSERVICE_PAUSE)
693 handle_CimServicePause(static_cast<CimServicePause *>(req));
694 else if (type == async_messages::CIMSERVICE_RESUME)
695 handle_CimServiceResume(static_cast<CimServiceResume *>(req));
696 else if ( type == async_messages::ASYNC_OP_START)
697 handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
698 else
699 {
700 // we don't handle this request message
701 _make_response(req, async_results::CIM_NAK );
702 }
|
703 mday 1.1 }
704 }
705
|
706 mday 1.17
// Pair a response with its originating request and deliver it.
// Three cases, in order:
//   1. both messages are async: complete the async response directly;
//   2. the (legacy) request carries an _async wrapper: wrap the
//      response in an AsyncLegacyOperationResult and complete that;
//   3. otherwise: fire-and-forget the response via SendForget.
// Returns the delivery result (always true for cases 1 and 2).
Boolean MessageQueueService::_enqueueResponse(
    Message* request,
    Message* response)
{
    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
        "MessageQueueService::_enqueueResponse");

    if( request->getMask() & message_mask::ha_async)
    {
        if (response->getMask() & message_mask::ha_async )
        {
            _completeAsyncResponse(static_cast<AsyncRequest *>(request),
                static_cast<AsyncReply *>(response),
                ASYNC_OPSTATE_COMPLETE, 0 );
            PEG_METHOD_EXIT();
            return true;
        }
    }

    if(request->_async != 0 )
    {
        Uint32 mask = request->_async->getMask();
        PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request ));

        AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
        AsyncOpNode *op = async->op;
        request->_async = 0;
        // the legacy request is going to be deleted by its handler
        // remove it from the op node

        static_cast<AsyncLegacyOperationStart *>(async)->get_action();

        AsyncLegacyOperationResult *async_result =
            new AsyncLegacyOperationResult(
                async->getKey(),
                async->getRouting(),
                op,
                response);
        _completeAsyncResponse(async,
            async_result,
            ASYNC_OPSTATE_COMPLETE,
            0);
        PEG_METHOD_EXIT();
        return true;
    }

    // ensure that the destination queue is in response->dest
    PEG_METHOD_EXIT();
    return SendForget(response);
}
760
|
761 mday 1.18 void MessageQueueService::_make_response(Message *req, Uint32 code)
|
762 mday 1.1 {
|
763 mday 1.19 cimom::_make_response(req, code);
|
764 mday 1.1 }
765
766
|
767 mday 1.5 void MessageQueueService::_completeAsyncResponse(AsyncRequest *request,
768 AsyncReply *reply,
769 Uint32 state,
770 Uint32 flag)
771 {
|
772 kumpf 1.37 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
773 "MessageQueueService::_completeAsyncResponse");
774
|
775 mday 1.19 cimom::_completeAsyncResponse(request, reply, state, flag);
|
776 kumpf 1.37
777 PEG_METHOD_EXIT();
|
778 mday 1.5 }
779
780
|
781 mday 1.32 void MessageQueueService::_complete_op_node(AsyncOpNode *op,
782 Uint32 state,
783 Uint32 flag,
784 Uint32 code)
785 {
786 cimom::_complete_op_node(op, state, flag, code);
787 }
788
|
789 mday 1.5
790 Boolean MessageQueueService::accept_async(AsyncOpNode *op)
791 {
|
792 mday 1.8 if (_incoming_queue_shutdown.value() > 0 )
|
793 denise.eckstein 1.88.2.6 return true;
|
794 mday 1.8
|
795 mday 1.20 // ATTN optimization remove the message checking altogether in the base
796 // << Mon Feb 18 14:02:20 2002 mdd >>
|
797 mday 1.6 op->lock();
798 Message *rq = op->_request.next(0);
|
799 mday 1.20 Message *rp = op->_response.next(0);
|
800 mday 1.6 op->unlock();
801
|
802 mday 1.22 if( (rq != 0 && (true == messageOK(rq))) || (rp != 0 && ( true == messageOK(rp) )) &&
803 _die.value() == 0 )
|
804 mday 1.5 {
|
805 mday 1.6 _incoming.insert_last_wait(op);
|
806 kumpf 1.88.2.7 _polling_sem->signal();
|
807 mday 1.5 return true;
808 }
809 return false;
810 }
811
812 Boolean MessageQueueService::messageOK(const Message *msg)
813 {
|
814 mday 1.8 if (_incoming_queue_shutdown.value() > 0 )
815 return false;
|
816 mday 1.18 return true;
817 }
818
|
819 mday 1.1 void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
820 {
821 // default action is to echo a heartbeat response
822
823 AsyncReply *reply =
824 new AsyncReply(async_messages::HEARTBEAT,
825 req->getKey(),
826 req->getRouting(),
827 0,
828 req->op,
829 async_results::OK,
830 req->resp,
831 false);
|
832 mday 1.4 _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );
|
833 mday 1.1 }
834
835
836 void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
837 {
838 ;
839 }
840
// Handle an AsyncIoctl control message.  The only supported control is
// IO_CLOSE, which acknowledges the request, marks the target service's
// incoming queue closed, drains any remaining operations through
// _handle_incoming_operation, and finally shuts the queue down.
// Anything else is NAKed.
void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
{
    switch( req->ctl )
    {
        case AsyncIoctl::IO_CLOSE:
        {
            MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);

#ifdef MESSAGEQUEUESERVICE_DEBUG
            PEGASUS_STD(cout) << service->getQueueName() << " Received AsyncIoctl::IO_CLOSE " << PEGASUS_STD(endl);
#endif

            // respond to this message. this is fire and forget, so we don't need to delete anything.
            // this takes care of two problems that were being found
            // << Thu Oct  9 10:52:48 2003 mdd >>
            _make_response(req, async_results::OK);
            // ensure we do not accept any further messages

            // ensure we don't recurse on IO_CLOSE
            // NOTE(review): this reads this->_incoming_queue_shutdown but
            // sets service->_incoming_queue_shutdown below - presumably
            // 'this' and 'service' are the same object when dispatched
            // via _handle_async_request; confirm before changing.
            if( _incoming_queue_shutdown.value() > 0 )
                break;

            // set the closing flag
            service->_incoming_queue_shutdown = 1;
            // empty out the queue
            while( 1 )
            {
                AsyncOpNode *operation;
                try
                {
                    operation = service->_incoming.remove_first();
                }
                catch(IPCException & )
                {
                    break;
                }
                if( operation )
                {
                    operation->_service_ptr = service;
                    service->_handle_incoming_operation(operation);
                }
                else
                    break;
            } // message processing loop

            // shutdown the AsyncDQueue
            service->_incoming.shutdown_queue();
            return;
        }

        default:
            _make_response(req, async_results::CIM_NAK);
    }
}
|
897 mday 1.8
|
898 mday 1.1 void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
899 {
|
900 mday 1.79
|
901 kumpf 1.81 #ifdef MESSAGEQUEUESERVICE_DEBUG
|
902 mday 1.79 PEGASUS_STD(cout) << getQueueName() << "received START" << PEGASUS_STD(endl);
|
903 kumpf 1.81 #endif
|
904 mday 1.79
|
905 mday 1.10 // clear the stoped bit and update
|
906 mday 1.13 _capabilities &= (~(module_capabilities::stopped));
|
907 mday 1.10 _make_response(req, async_results::OK);
908 // now tell the meta dispatcher we are stopped
909 update_service(_capabilities, _mask);
910
|
911 mday 1.1 }
912 void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
913 {
|
914 kumpf 1.81 #ifdef MESSAGEQUEUESERVICE_DEBUG
|
915 mday 1.79 PEGASUS_STD(cout) << getQueueName() << "received STOP" << PEGASUS_STD(endl);
|
916 kumpf 1.81 #endif
|
917 mday 1.10 // set the stopeed bit and update
918 _capabilities |= module_capabilities::stopped;
919 _make_response(req, async_results::CIM_STOPPED);
920 // now tell the meta dispatcher we are stopped
921 update_service(_capabilities, _mask);
922
|
923 mday 1.1 }
924 void MessageQueueService::handle_CimServicePause(CimServicePause *req)
925 {
|
926 mday 1.10 // set the paused bit and update
|
927 mday 1.13 _capabilities |= module_capabilities::paused;
|
928 mday 1.11 update_service(_capabilities, _mask);
|
929 mday 1.10 _make_response(req, async_results::CIM_PAUSED);
930 // now tell the meta dispatcher we are stopped
|
931 mday 1.1 }
932 void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
933 {
|
934 mday 1.10 // clear the paused bit and update
|
935 mday 1.13 _capabilities &= (~(module_capabilities::paused));
|
936 mday 1.11 update_service(_capabilities, _mask);
|
937 mday 1.10 _make_response(req, async_results::OK);
938 // now tell the meta dispatcher we are stopped
|
939 mday 1.1 }
940
941 void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
942 {
943 _make_response(req, async_results::CIM_NAK);
944 }
945
946 void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
947 {
|
948 mday 1.14 ;
949 }
950
|
951 mday 1.10
|
952 mday 1.14 void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)
953 {
954 // remove the legacy message from the request and enqueue it to its destination
955 Uint32 result = async_results::CIM_NAK;
956
|
957 mday 1.25 Message *legacy = req->_act;
|
958 mday 1.14 if ( legacy != 0 )
959 {
|
960 mday 1.25 MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
|
961 mday 1.14 if( queue != 0 )
962 {
|
963 mday 1.25 if(queue->isAsync() == true )
964 {
965 (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
966 }
967 else
968 {
969 // Enqueue the response:
970 queue->enqueue(req->get_action());
971 }
972
|
973 mday 1.14 result = async_results::OK;
974 }
975 }
976 _make_response(req, result);
977 }
978
// Default handler for AsyncLegacyOperationResult: intentionally does
// nothing in this base service.
void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)
{
   ;
}
983
984 AsyncOpNode *MessageQueueService::get_op(void)
985 {
|
986 mday 1.4 AsyncOpNode *op = new AsyncOpNode();
|
987 mday 1.1
|
988 mday 1.9 op->_state = ASYNC_OPSTATE_UNKNOWN;
989 op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
|
990 mday 1.4
|
991 mday 1.1 return op;
992 }
993
994 void MessageQueueService::return_op(AsyncOpNode *op)
995 {
996 PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED );
|
997 mday 1.4 delete op;
|
998 mday 1.1 }
999
|
1000 mday 1.18
|
1001 mday 1.29 Boolean MessageQueueService::ForwardOp(AsyncOpNode *op,
1002 Uint32 destination)
1003 {
1004 PEGASUS_ASSERT(op != 0 );
|
1005 mday 1.30 op->lock();
1006 op->_op_dest = MessageQueue::lookup(destination);
|
1007 mday 1.29 op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD);
1008 op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK);
|
1009 mday 1.30 op->unlock();
1010 if ( op->_op_dest == 0 )
1011 return false;
1012
|
1013 mday 1.29 return _meta_dispatcher->route_async(op);
1014 }
1015
|
1016 mday 1.39
|
// Send an asynchronous operation to another service and arrange for
// 'callback' to run when the response arrives.
//
// @param op                  operation node carrying the request; non-null
// @param destination         queue id of the receiving service
// @param callback            completion function; non-null
// @param callback_response_q queue that will receive the response
// @param callback_ptr        opaque user data handed back to the callback
// @return true if the meta dispatcher accepted the operation for routing
Boolean MessageQueueService::SendAsync(AsyncOpNode *op,
                                       Uint32 destination,
                                       void (*callback)(AsyncOpNode *,
                                                        MessageQueue *,
                                                        void *),
                                       MessageQueue *callback_response_q,
                                       void *callback_ptr)
{
   PEGASUS_ASSERT(op != 0 && callback != 0 );

   // get the queue handle for the destination

   op->lock();
   op->_op_dest = MessageQueue::lookup(destination); // destination of this message
   // switch the op to callback-style completion (not fire-and-forget)
   op->_flags |= ASYNC_OPFLAGS_CALLBACK;
   op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
   op->_state &= ~ASYNC_OPSTATE_COMPLETE;
   // initialize the callback data
   op->_async_callback = callback; // callback function to be executed by recpt. of response
   op->_callback_node = op; // the op node
   op->_callback_response_q = callback_response_q; // the queue that will receive the response
   op->_callback_ptr = callback_ptr; // user data for callback
   op->_callback_request_q = this; // I am the originator of this request

   op->unlock();
   // a failed lookup is only reported after the op has been fully set up
   if(op->_op_dest == 0)
      return false;

   return _meta_dispatcher->route_async(op);
}
1047
1048
|
// Send a (possibly legacy) Message asynchronously to 'destination',
// registering a "safe" callback to be invoked with the response.
// Degrades to SendForget() when no callback is supplied.
//
// @param msg         message to deliver; NULL is rejected
// @param destination queue id of the receiver
// @param callback    completion function; NULL means fire-and-forget
// @param handle      opaque handle passed to the callback
// @param parameter   opaque parameter passed to the callback
// @return true if the meta dispatcher accepted the operation
Boolean MessageQueueService::SendAsync(Message *msg,
                                       Uint32 destination,
                                       void (*callback)(Message *response,
                                                        void *handle,
                                                        void *parameter),
                                       void *handle,
                                       void *parameter)
{
   if(msg == NULL)
      return false;
   if(callback == NULL)
      return SendForget(msg);
   AsyncOpNode *op = get_op();
   msg->dest = destination;
   // fail early if the destination queue does not exist
   if( NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
   {
      op->release();
      return_op(op);
      return false;
   }
   op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
   op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
   op->_state &= ~ASYNC_OPSTATE_COMPLETE;
   op->__async_callback = callback;
   op->_callback_node = op;
   op->_callback_handle = handle;
   op->_callback_parameter = parameter;
   op->_callback_response_q = this;

   // legacy (non-async) messages must be wrapped before routing
   if( ! (msg->getMask() & message_mask::ha_async) )
   {
      // NOTE(review): 'wrapper' is never referenced again; presumably the
      // AsyncLegacyOperationStart constructor attaches itself to 'op' as a
      // side effect -- confirm against its definition before treating this
      // as a leak or removing the allocation.
      AsyncLegacyOperationStart *wrapper =
         new AsyncLegacyOperationStart(get_next_xid(),
                                       op,
                                       destination,
                                       msg,
                                       destination);
   }
   else
   {
      // already an async message: attach it to the op node directly
      op->_request.insert_first(msg);
      (static_cast<AsyncMessage *>(msg))->op = op;
   }

   // remember the op so the pending callback can be dispatched later
   _callback.insert_last(op);
   return _meta_dispatcher->route_async(op);
}
1097
1098
|
// Send a message with fire-and-forget semantics: no response is expected
// and no callback will run. An async message may already carry an op
// node; otherwise a fresh one is created and attached.
//
// @param msg message to deliver; its 'dest' field selects the receiver
// @return true if the meta dispatcher accepted the operation
Boolean MessageQueueService::SendForget(Message *msg)
{
   AsyncOpNode *op = 0;
   Uint32 mask = msg->getMask();

   // reuse the op node already attached to an async message, if any
   if (mask & message_mask::ha_async)
   {
      op = (static_cast<AsyncMessage *>(msg))->op ;
   }

   if( op == 0 )
   {
      op = get_op();
      op->_request.insert_first(msg);
      if (mask & message_mask::ha_async)
         (static_cast<AsyncMessage *>(msg))->op = op;
   }
   op->_op_dest = MessageQueue::lookup(msg->dest);
   // force fire-and-forget semantics, clearing any callback flags
   op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
   op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
                   | ASYNC_OPFLAGS_SIMPLE_STATUS);
   op->_state &= ~ASYNC_OPSTATE_COMPLETE;
   if ( op->_op_dest == 0 )
   {
      // unknown destination: release the op node and report failure
      op->release();
      return_op(op);
      return false;
   }

   // now see if the meta dispatcher will take it
   return _meta_dispatcher->route_async(op);
}
|
1133 mday 1.2
|
1134 mday 1.1
|
1135 mday 1.4 AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
|
1136 mday 1.1 {
|
1137 mday 1.4 if ( request == 0 )
1138 return 0 ;
|
1139 mday 1.5
1140 Boolean destroy_op = false;
1141
|
1142 s.hills 1.80 if (request->op == 0)
|
1143 mday 1.5 {
1144 request->op = get_op();
|
1145 mday 1.7 request->op->_request.insert_first(request);
|
1146 mday 1.5 destroy_op = true;
1147 }
|
1148 mday 1.4
|
1149 mday 1.33 request->block = false;
|
1150 mday 1.35 request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
|
1151 mday 1.33 SendAsync(request->op,
1152 request->dest,
|
1153 mday 1.36 _sendwait_callback,
|
1154 mday 1.33 this,
1155 (void *)0);
|
1156 mday 1.4
|
1157 mday 1.33 request->op->_client_sem.wait();
|
1158 mday 1.6 request->op->lock();
1159 AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
1160 rpl->op = 0;
1161 request->op->unlock();
1162
|
1163 mday 1.5 if( destroy_op == true)
1164 {
|
1165 mday 1.6 request->op->lock();
1166 request->op->_request.remove(request);
1167 request->op->_state |= ASYNC_OPSTATE_RELEASED;
1168 request->op->unlock();
1169 return_op(request->op);
|
1170 mday 1.7 request->op = 0;
|
1171 mday 1.5 }
1172 return rpl;
|
1173 mday 1.1 }
1174
1175
1176 Boolean MessageQueueService::register_service(String name,
1177 Uint32 capabilities,
1178 Uint32 mask)
1179
1180 {
1181 RegisterCimService *msg = new RegisterCimService(get_next_xid(),
|
1182 mday 1.5 0,
|
1183 mday 1.1 true,
1184 name,
1185 capabilities,
1186 mask,
1187 _queueId);
|
1188 mday 1.44 msg->dest = CIMOM_Q_ID;
1189
|
1190 mday 1.1 Boolean registered = false;
|
1191 mday 1.7 AsyncReply *reply = static_cast<AsyncReply *>(SendWait( msg ));
|
1192 mday 1.1
|
1193 mday 1.2 if ( reply != 0 )
|
1194 mday 1.1 {
1195 if(reply->getMask() & message_mask:: ha_async)
1196 {
1197 if(reply->getMask() & message_mask::ha_reply)
1198 {
|
1199 mday 1.15 if(reply->result == async_results::OK ||
1200 reply->result == async_results::MODULE_ALREADY_REGISTERED )
|
1201 mday 1.1 registered = true;
1202 }
1203 }
1204
|
1205 mday 1.7 delete reply;
|
1206 mday 1.1 }
|
1207 mday 1.5 delete msg;
|
1208 mday 1.1 return registered;
1209 }
1210
1211 Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
1212 {
1213
1214
1215 UpdateCimService *msg = new UpdateCimService(get_next_xid(),
|
1216 mday 1.5 0,
|
1217 mday 1.1 true,
1218 _queueId,
1219 _capabilities,
1220 _mask);
1221 Boolean registered = false;
|
1222 mday 1.2
1223 AsyncMessage *reply = SendWait(msg);
1224 if (reply)
|
1225 mday 1.1 {
1226 if(reply->getMask() & message_mask:: ha_async)
1227 {
1228 if(reply->getMask() & message_mask::ha_reply)
1229 {
|
1230 mday 1.2 if(static_cast<AsyncReply *>(reply)->result == async_results::OK)
|
1231 mday 1.1 registered = true;
1232 }
1233 }
1234 delete reply;
1235 }
|
1236 mday 1.5 delete msg;
|
1237 mday 1.1 return registered;
1238 }
1239
1240
1241 Boolean MessageQueueService::deregister_service(void)
1242 {
|
1243 mday 1.3
|
1244 mday 1.5 _meta_dispatcher->deregister_module(_queueId);
1245 return true;
|
1246 mday 1.1 }
1247
1248
1249 void MessageQueueService::find_services(String name,
1250 Uint32 capabilities,
1251 Uint32 mask,
1252 Array<Uint32> *results)
1253 {
1254
1255 if( results == 0 )
1256 throw NullPointer();
|
1257 mday 1.5
|
1258 mday 1.1 results->clear();
1259
1260 FindServiceQueue *req =
1261 new FindServiceQueue(get_next_xid(),
|
1262 mday 1.5 0,
|
1263 mday 1.1 _queueId,
1264 true,
1265 name,
1266 capabilities,
1267 mask);
|
1268 mday 1.44
1269 req->dest = CIMOM_Q_ID;
|
1270 mday 1.1
|
1271 mday 1.2 AsyncMessage *reply = SendWait(req);
1272 if(reply)
|
1273 mday 1.1 {
1274 if( reply->getMask() & message_mask::ha_async)
1275 {
1276 if(reply->getMask() & message_mask::ha_reply)
1277 {
1278 if(reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
1279 {
1280 if( (static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK )
1281 *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
1282 }
1283 }
1284 }
1285 delete reply;
1286 }
|
1287 mday 1.5 delete req;
|
1288 mday 1.1 return ;
1289 }
1290
1291 void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
1292 {
1293 if(result == 0)
1294 throw NullPointer();
1295
1296 EnumerateService *req
1297 = new EnumerateService(get_next_xid(),
|
1298 mday 1.5 0,
|
1299 mday 1.1 _queueId,
1300 true,
1301 queue);
1302
|
1303 mday 1.2 AsyncMessage *reply = SendWait(req);
|
1304 mday 1.1
|
1305 mday 1.2 if (reply)
|
1306 mday 1.1 {
1307 Boolean found = false;
1308
1309 if( reply->getMask() & message_mask::ha_async)
1310 {
1311 if(reply->getMask() & message_mask::ha_reply)
1312 {
1313 if(reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
1314 {
1315 if( (static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK )
1316 {
1317 if( found == false)
1318 {
1319 found = true;
1320
1321 result->put_name( (static_cast<EnumerateServiceResponse *>(reply))->name);
1322 result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
1323 result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
1324 result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
1325 }
1326 }
1327 mday 1.1 }
1328 }
1329 }
1330 delete reply;
1331 }
|
1332 mday 1.5 delete req;
1333
|
1334 mday 1.1 return;
1335 }
1336
1337 Uint32 MessageQueueService::get_next_xid(void)
1338 {
|
1339 mday 1.78 static Mutex _monitor;
1340 Uint32 value;
|
1341 a.arora 1.87 AutoMutex autoMut(_monitor);
|
1342 mday 1.1 _xid++;
|
1343 mday 1.78 value = _xid.value();
1344 return value;
1345
|
1346 mday 1.1 }
1347
1348 PEGASUS_NAMESPACE_END
|