//%2006////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
// Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
// Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
// IBM Corp.; EMC Corporation, The Open Group.
// Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
// IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
// Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
// EMC Corporation; VERITAS Software Corporation; The Open Group.
// Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
// EMC Corporation; Symantec Corporation; The Open Group.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
// ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
//
//==============================================================================
//
//%/////////////////////////////////////////////////////////////////////////////

#include "MessageQueueService.h"
#include <Pegasus/Common/Tracer.h>
#include <Pegasus/Common/MessageLoader.h>

PEGASUS_NAMESPACE_BEGIN

cimom *MessageQueueService::_meta_dispatcher = 0;
AtomicInt MessageQueueService::_service_count(0);
Mutex MessageQueueService::_meta_dispatcher_mutex;

static struct timeval deallocateWait = {300, 0};

ThreadPool *MessageQueueService::_thread_pool = 0;

MessageQueueService::PollingList* MessageQueueService::_polling_list;
Mutex MessageQueueService::_polling_list_mutex;

Thread* MessageQueueService::_polling_thread = 0;

ThreadPool *MessageQueueService::get_thread_pool()
{
    return _thread_pool;
}

//
// MAX_THREADS_PER_SVC_QUEUE
//
// JR Wunderlich Jun 6, 2005
//

#define MAX_THREADS_PER_SVC_QUEUE_LIMIT 5000
#define MAX_THREADS_PER_SVC_QUEUE_DEFAULT 5

#ifndef MAX_THREADS_PER_SVC_QUEUE
# define MAX_THREADS_PER_SVC_QUEUE MAX_THREADS_PER_SVC_QUEUE_DEFAULT
#endif

Uint32 max_threads_per_svc_queue;
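
// Illustrative note (not part of the original source): the per-service
// thread cap can be overridden at build time. Values outside the range
// [1, MAX_THREADS_PER_SVC_QUEUE_LIMIT] are replaced with the limit by the
// MessageQueueService constructor below. A hypothetical compile flag:
//
//     CXXFLAGS += -DMAX_THREADS_PER_SVC_QUEUE=10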

ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(
    void* parm)
{
    Thread *myself = reinterpret_cast<Thread *>(parm);
    List<MessageQueueService, Mutex> *list =
        reinterpret_cast<List<MessageQueueService, Mutex>*>(myself->get_parm());

    while (_stop_polling.get() == 0)
    {
        _polling_sem.wait();

        if (_stop_polling.get() != 0)
        {
            break;
        }

        // The polling_routine thread must hold the lock on the
        // _polling_list while processing incoming messages.
        // This lock is used to give this thread ownership of
        // services on the _polling_list.

        // This is necessary to avoid conflict with other threads
        // processing the _polling_list
        // (e.g., MessageQueueService::~MessageQueueService).

        list->lock();
        MessageQueueService *service = list->front();
        ThreadStatus rtn = PEGASUS_THREAD_OK;
        while (service != NULL)
        {
            if ((service->_incoming.count() > 0) &&
                (service->_die.get() == 0) &&
                (service->_threads.get() < max_threads_per_svc_queue))
            {
                // The _threads count is used to track the
                // number of active threads that have been allocated
                // to process messages for this service.

                // The _threads count MUST be incremented while
                // the polling_routine owns the _polling_thread
                // lock and has ownership of the service object.

                service->_threads++;
                try
                {
                    rtn = _thread_pool->allocate_and_awaken(
                        service, _req_proc, &_polling_sem);
                }
                catch (...)
                {
                    service->_threads--;

                    // allocate_and_awaken should never generate an exception.
                    PEGASUS_ASSERT(0);
                }
                // if no more threads are available, break from the
                // processing loop
                if (rtn != PEGASUS_THREAD_OK)
                {
                    service->_threads--;
                    PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL1,
                        "Could not allocate thread for %s. Queue has %d "
                        "messages waiting and %d threads servicing. "
                        "Skipping the service for right now.",
                        service->getQueueName(),
                        service->_incoming.count(),
                        service->_threads.get()));

                    Threads::yield();
                    service = NULL;
                }
            }
            if (service != NULL)
            {
                service = list->next_of(service);
            }
        }
        list->unlock();
    }

    return ThreadReturnType(0);
}


Semaphore MessageQueueService::_polling_sem(0);
AtomicInt MessageQueueService::_stop_polling(0);

MessageQueueService::MessageQueueService(
    const char* name,
    Uint32 queueID,
    Uint32 capabilities,
    Uint32 mask)
    : Base(name, true, queueID),
      _mask(mask),
      _die(0),
      _threads(0),
      _incoming(),
      _incoming_queue_shutdown(0)
{
    _capabilities = (capabilities | module_capabilities::async);

    _default_op_timeout.tv_sec = 30;
    _default_op_timeout.tv_usec = 100;

    max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;

    // If the requested thread maximum is out of range, set it to
    // MAX_THREADS_PER_SVC_QUEUE_LIMIT.

    if ((max_threads_per_svc_queue < 1) ||
        (max_threads_per_svc_queue > MAX_THREADS_PER_SVC_QUEUE_LIMIT))
    {
        max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT;
    }

    PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
        "max_threads_per_svc_queue set to %u.", max_threads_per_svc_queue));

    AutoMutex autoMut(_meta_dispatcher_mutex);

    if (_meta_dispatcher == 0)
    {
        _stop_polling = 0;
        PEGASUS_ASSERT(_service_count.get() == 0);
        _meta_dispatcher = new cimom();

        // _thread_pool = new ThreadPool(initial_cnt, "MessageQueueService",
        //     minimum_cnt, maximum_cnt, deallocateWait);
        _thread_pool =
            new ThreadPool(0, "MessageQueueService", 0, 0, deallocateWait);
    }
    _service_count++;

    if (false == register_service(name, _capabilities, _mask))
    {
        MessageLoaderParms parms(
            "Common.MessageQueueService.UNABLE_TO_REGISTER",
            "CIM base message queue service is unable to register with the "
                "CIMOM dispatcher.");
        throw BindFailedException(parms);
    }

    _get_polling_list()->insert_back(this);
}


MessageQueueService::~MessageQueueService()
{
    _die = 1;

    // The polling_routine locks the _polling_list while
    // processing the incoming messages for services on the
    // list. Deleting the service from the _polling_list
    // prior to processing avoids synchronization issues
    // with the _polling_routine.

    // ATTN: added to prevent an assertion in List when the list does not
    // contain this element.

    if (_get_polling_list()->contains(this))
        _get_polling_list()->remove(this);

    // ATTN: The code for closing the _incoming queue
    // is not working correctly. In OpenPegasus 2.5,
    // execution of the following code is very timing
    // dependent. This needs to be fixed.
    // See Bug 4079 for details.
    if (_incoming_queue_shutdown.get() == 0)
    {
        _shutdown_incoming_queue();
    }

    // Wait until all threads processing the messages
    // for this service have completed.

    while (_threads.get() > 0)
    {
        Threads::yield();
    }

    {
        AutoMutex autoMut(_meta_dispatcher_mutex);
        _service_count--;
        if (_service_count.get() == 0)
        {
            _stop_polling++;
            _polling_sem.signal();
            if (_polling_thread)
            {
                _polling_thread->join();
                delete _polling_thread;
                _polling_thread = 0;
            }
            _meta_dispatcher->_shutdown_routed_queue();
            delete _meta_dispatcher;
            _meta_dispatcher = 0;

            delete _thread_pool;
            _thread_pool = 0;
        }
    } // mutex unlocks here

    // Clean up in case extra items remain on the queue.
    while (_incoming.count())
    {
        try
        {
            delete _incoming.dequeue();
        }
        catch (const ListClosed&)
        {
            // If the list is closed, there is nothing we can do.
            break;
        }
    }
}

void MessageQueueService::_shutdown_incoming_queue()
{
    if (_incoming_queue_shutdown.get() > 0)
        return;

    AsyncIoctl *msg = new AsyncIoctl(
        0,
        _queueId,
        _queueId,
        true,
        AsyncIoctl::IO_CLOSE,
        0,
        0);

    msg->op = get_op();
    msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
    msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
        | ASYNC_OPFLAGS_SIMPLE_STATUS);
    msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;

    msg->op->_op_dest = this;
    msg->op->_request.reset(msg);
    try
    {
        _incoming.enqueue_wait(msg->op);
        _polling_sem.signal();
    }
    catch (const ListClosed&)
    {
        // This means the queue has already been shut down (it happens
        // when two AsyncIoctl::IO_CLOSE messages are generated and one
        // is processed first).
        delete msg;
    }
    catch (const Permission&)
    {
        delete msg;
    }
}


void MessageQueueService::enqueue(Message* msg)
{
    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");

    Base::enqueue(msg);

    PEG_METHOD_EXIT();
}


ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(
    void* parm)
{
    MessageQueueService* service =
        reinterpret_cast<MessageQueueService*>(parm);
    PEGASUS_ASSERT(service != 0);
    try
    {
        if (service->_die.get() != 0)
        {
            service->_threads--;
            return 0;
        }
        // Pull messages off the incoming queue and dispatch them. Then
        // check pending messages that are non-blocking.
        AsyncOpNode *operation = 0;

        // many operations may have been queued
        do
        {
            try
            {
                operation = service->_incoming.dequeue();
            }
            catch (ListClosed&)
            {
                // ATTN: This appears to be a common loop exit path.
                //PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
                //    "Caught ListClosed exception. Exiting _req_proc.");
                break;
            }

            if (operation)
            {
                operation->_service_ptr = service;
                service->_handle_incoming_operation(operation);
            }
        } while (operation);
    }
    catch (const Exception& e)
    {
        PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
            String("Caught exception: \"") + e.getMessage() +
                "\". Exiting _req_proc.");
    }
    catch (...)
    {
        PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
            "Caught unrecognized exception. Exiting _req_proc.");
    }
    service->_threads--;
    return 0;
}


void MessageQueueService::_sendwait_callback(
    AsyncOpNode* op,
    MessageQueue* q,
    void* parm)
{
    op->_client_sem.signal();
}


// The callback function is responsible for cleaning up all resources,
// including op, op->_callback_node, and op->_callback_ptr.
void MessageQueueService::_handle_async_callback(AsyncOpNode* op)
{
    if (op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK)
    {
        Message *msg = op->removeRequest();
        if (msg && (msg->getMask() & MessageMask::ha_async))
        {
            if (msg->getType() == ASYNC_ASYNC_LEGACY_OP_START)
            {
                AsyncLegacyOperationStart *wrapper =
                    static_cast<AsyncLegacyOperationStart *>(msg);
                msg = wrapper->get_action();
                delete wrapper;
            }
            else if (msg->getType() == ASYNC_ASYNC_MODULE_OP_START)
            {
                AsyncModuleOperationStart *wrapper =
                    static_cast<AsyncModuleOperationStart *>(msg);
                msg = wrapper->get_action();
                delete wrapper;
            }
            else if (msg->getType() == ASYNC_ASYNC_OP_START)
            {
                AsyncOperationStart *wrapper =
                    static_cast<AsyncOperationStart *>(msg);
                msg = wrapper->get_action();
                delete wrapper;
            }
            delete msg;
        }

        msg = op->removeResponse();
        if (msg && (msg->getMask() & MessageMask::ha_async))
        {
            if (msg->getType() == ASYNC_ASYNC_LEGACY_OP_RESULT)
            {
                AsyncLegacyOperationResult *wrapper =
                    static_cast<AsyncLegacyOperationResult *>(msg);
                msg = wrapper->get_result();
                delete wrapper;
            }
            else if (msg->getType() == ASYNC_ASYNC_MODULE_OP_RESULT)
            {
                AsyncModuleOperationResult *wrapper =
                    static_cast<AsyncModuleOperationResult *>(msg);
                msg = wrapper->get_result();
                delete wrapper;
            }
        }
        void (*callback)(Message *, void *, void *) = op->__async_callback;
        void *handle = op->_callback_handle;
        void *parm = op->_callback_parameter;
        op->release();
        return_op(op);
        callback(msg, handle, parm);
    }
    else if (op->_flags & ASYNC_OPFLAGS_CALLBACK)
    {
        // Note that _callback_node may be different from op.
        // op->_callback_response_q is a "this" pointer we can use for
        // static callback methods.
        op->_async_callback(
            op->_callback_node, op->_callback_response_q, op->_callback_ptr);
    }
}


void MessageQueueService::_handle_incoming_operation(AsyncOpNode* operation)
{
    if (operation != 0)
    {
        // ATTN: optimization
        // << Tue Feb 19 14:10:38 2002 mdd >>
        operation->lock();

        Message *rq = operation->_request.get();

        // optimization <<< Thu Mar 7 21:04:05 2002 mdd >>>
        // move this to the bottom of the loop when the majority of
        // messages become async messages

        // divert legacy messages to handleEnqueue
        if ((rq != 0) && (!(rq->getMask() & MessageMask::ha_async)))
        {
            operation->_request.release();
            operation->unlock();
            // delete the op node
            operation->release();
            return_op(operation);

            handleEnqueue(rq);
            return;
        }

        if ((operation->_flags & ASYNC_OPFLAGS_CALLBACK ||
             operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&
            (operation->_state & ASYNC_OPSTATE_COMPLETE))
        {
            operation->unlock();
            _handle_async_callback(operation);
        }
        else
        {
            PEGASUS_ASSERT(rq != 0);
            operation->unlock();
            _handle_async_request(static_cast<AsyncRequest *>(rq));
        }
    }
    return;
}

void MessageQueueService::_handle_async_request(AsyncRequest *req)
{
    if (req != 0)
    {
        req->op->processing();

        MessageType type = req->getType();
        if (type == ASYNC_HEARTBEAT)
            handle_heartbeat_request(req);
        else if (type == ASYNC_IOCTL)
            handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
        else if (type == ASYNC_CIMSERVICE_START)
            handle_CimServiceStart(static_cast<CimServiceStart *>(req));
        else if (type == ASYNC_CIMSERVICE_STOP)
            handle_CimServiceStop(static_cast<CimServiceStop *>(req));
        else if (type == ASYNC_CIMSERVICE_PAUSE)
            handle_CimServicePause(static_cast<CimServicePause *>(req));
        else if (type == ASYNC_CIMSERVICE_RESUME)
            handle_CimServiceResume(static_cast<CimServiceResume *>(req));
        else if (type == ASYNC_ASYNC_OP_START)
            handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
        else
        {
            // we don't handle this request message
            _make_response(req, async_results::CIM_NAK);
        }
    }
}


Boolean MessageQueueService::_enqueueResponse(
    Message* request,
    Message* response)
{
    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
        "MessageQueueService::_enqueueResponse");

    if (request->getMask() & MessageMask::ha_async)
    {
        if (response->getMask() & MessageMask::ha_async)
        {
            _completeAsyncResponse(
                static_cast<AsyncRequest *>(request),
                static_cast<AsyncReply *>(response),
                ASYNC_OPSTATE_COMPLETE, 0);
            PEG_METHOD_EXIT();
            return true;
        }
    }

    AsyncRequest* asyncRequest =
        static_cast<AsyncRequest*>(request->get_async());

    if (asyncRequest != 0)
    {
        PEGASUS_ASSERT(asyncRequest->getMask() &
            (MessageMask::ha_async | MessageMask::ha_request));

        AsyncOpNode* op = asyncRequest->op;

        // The legacy request is going to be deleted by its handler;
        // remove it from the op node.
        static_cast<AsyncLegacyOperationStart *>(asyncRequest)->get_action();

        AsyncLegacyOperationResult *async_result =
            new AsyncLegacyOperationResult(
                op,
                response);
        _completeAsyncResponse(
            asyncRequest,
            async_result,
            ASYNC_OPSTATE_COMPLETE,
            0);
        PEG_METHOD_EXIT();
        return true;
    }

    // ensure that the destination queue is in response->dest
    PEG_METHOD_EXIT();
    return SendForget(response);
}


void MessageQueueService::_make_response(Message* req, Uint32 code)
{
    cimom::_make_response(req, code);
}


void MessageQueueService::_completeAsyncResponse(
    AsyncRequest* request,
    AsyncReply* reply,
    Uint32 state,
    Uint32 flag)
{
    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
        "MessageQueueService::_completeAsyncResponse");

    cimom::_completeAsyncResponse(request, reply, state, flag);

    PEG_METHOD_EXIT();
}


void MessageQueueService::_complete_op_node(
    AsyncOpNode* op,
    Uint32 state,
    Uint32 flag,
    Uint32 code)
{
    cimom::_complete_op_node(op, state, flag, code);
}


Boolean MessageQueueService::accept_async(AsyncOpNode* op)
{
    if (_incoming_queue_shutdown.get() > 0)
        return false;
    if (_polling_thread == NULL)
    {
        _polling_thread = new Thread(
            polling_routine,
            reinterpret_cast<void *>(_get_polling_list()),
            false);
        ThreadStatus tr = PEGASUS_THREAD_OK;
        while ((tr = _polling_thread->run()) != PEGASUS_THREAD_OK)
        {
            if (tr == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)
                Threads::yield();
            else
                throw Exception(MessageLoaderParms(
                    "Common.MessageQueueService.NOT_ENOUGH_THREAD",
                    "Could not allocate thread for the polling thread."));
        }
    }
    // ATTN: optimization: remove the message checking altogether in the base
    // << Mon Feb 18 14:02:20 2002 mdd >>
    op->lock();
    Message *rq = op->_request.get();
    Message *rp = op->_response.get();
    op->unlock();

    // The _die check applies to both the request and the response case.
    if (((rq != 0 && messageOK(rq)) || (rp != 0 && messageOK(rp))) &&
        (_die.get() == 0))
    {
        _incoming.enqueue_wait(op);
        _polling_sem.signal();
        return true;
    }
    return false;
}

Boolean MessageQueueService::messageOK(const Message* msg)
{
    if (_incoming_queue_shutdown.get() > 0)
        return false;
    return true;
}

void MessageQueueService::handle_heartbeat_request(AsyncRequest* req)
{
    // default action is to echo a heartbeat response

    AsyncReply *reply = new AsyncReply(
        ASYNC_HEARTBEAT,
        0,
        req->op,
        async_results::OK,
        req->resp,
        false);
    _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0);
}


void MessageQueueService::handle_heartbeat_reply(AsyncReply* rep)
{
}

void MessageQueueService::handle_AsyncIoctl(AsyncIoctl* req)
{
    switch (req->ctl)
    {
    case AsyncIoctl::IO_CLOSE:
    {
        MessageQueueService *service =
            static_cast<MessageQueueService *>(req->op->_service_ptr);

#ifdef MESSAGEQUEUESERVICE_DEBUG
        PEGASUS_STD(cout) << service->getQueueName() <<
            " Received AsyncIoctl::IO_CLOSE " << PEGASUS_STD(endl);
#endif

        // Respond to this message. This is fire and forget, so we
        // don't need to delete anything.
        // This takes care of two problems that were being found
        // << Thu Oct 9 10:52:48 2003 mdd >>
        _make_response(req, async_results::OK);

        // ensure we don't recurse on IO_CLOSE
        if (_incoming_queue_shutdown.get() > 0)
            break;

        // set the closing flag so we do not accept any further messages
        service->_incoming_queue_shutdown = 1;

        // empty out the queue
        while (1)
        {
            AsyncOpNode *operation;
            try
            {
                operation = service->_incoming.dequeue();
            }
            catch (IPCException&)
            {
                break;
            }
            if (operation)
            {
                operation->_service_ptr = service;
                service->_handle_incoming_operation(operation);
            }
            else
                break;
        } // message processing loop

        // shut down the AsyncQueue
        service->_incoming.close();
        return;
    }

    default:
        _make_response(req, async_results::CIM_NAK);
    }
}

void MessageQueueService::handle_CimServiceStart(CimServiceStart* req)
{
#ifdef MESSAGEQUEUESERVICE_DEBUG
    PEGASUS_STD(cout) << getQueueName() << " received START" <<
        PEGASUS_STD(endl);
#endif

    // clear the stopped bit and update
    _capabilities &= (~(module_capabilities::stopped));
    _make_response(req, async_results::OK);
    // now tell the meta dispatcher we are started
    update_service(_capabilities, _mask);
}

void MessageQueueService::handle_CimServiceStop(CimServiceStop* req)
{
#ifdef MESSAGEQUEUESERVICE_DEBUG
    PEGASUS_STD(cout) << getQueueName() << " received STOP" <<
        PEGASUS_STD(endl);
#endif

    // set the stopped bit and update
    _capabilities |= module_capabilities::stopped;
    _make_response(req, async_results::CIM_STOPPED);
    // now tell the meta dispatcher we are stopped
    update_service(_capabilities, _mask);
}

void MessageQueueService::handle_CimServicePause(CimServicePause* req)
{
    // Set the paused bit and update; this tells the meta dispatcher
    // we are paused.
    _capabilities |= module_capabilities::paused;
    update_service(_capabilities, _mask);
    _make_response(req, async_results::CIM_PAUSED);
}

void MessageQueueService::handle_CimServiceResume(CimServiceResume* req)
{
    // Clear the paused bit and update; this tells the meta dispatcher
    // we have resumed.
    _capabilities &= (~(module_capabilities::paused));
    update_service(_capabilities, _mask);
    _make_response(req, async_results::OK);
}

void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart* req)
{
    _make_response(req, async_results::CIM_NAK);
}

void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult* req)
{
}


void MessageQueueService::handle_AsyncLegacyOperationStart(
    AsyncLegacyOperationStart* req)
{
    // remove the legacy message from the request and enqueue it to its
    // destination
    Uint32 result = async_results::CIM_NAK;

    Message* legacy = req->_act;
    if (legacy != 0)
    {
        MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
        if (queue != 0)
        {
            if (queue->isAsync() == true)
            {
                (static_cast<MessageQueueService *>(queue))->handleEnqueue(
                    legacy);
            }
            else
            {
                // Enqueue the response:
                queue->enqueue(req->get_action());
            }

            result = async_results::OK;
        }
    }
    _make_response(req, result);
}

void MessageQueueService::handle_AsyncLegacyOperationResult(
    AsyncLegacyOperationResult* rep)
{
}

AsyncOpNode* MessageQueueService::get_op()
{
    AsyncOpNode* op = new AsyncOpNode();

    op->_state = ASYNC_OPSTATE_UNKNOWN;
    op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;

    return op;
}

void MessageQueueService::return_op(AsyncOpNode* op)
{
    PEGASUS_ASSERT(op->_state & ASYNC_OPSTATE_RELEASED);
    delete op;
}
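
// Illustrative note (not part of the original source): get_op() and
// return_op() bracket the AsyncOpNode lifecycle. As the call sites in
// this file suggest, an op must carry ASYNC_OPSTATE_RELEASED before it
// is returned:
//
//     AsyncOpNode* op = get_op();
//     // ... use op ...
//     op->release();    // marks the op released
//     return_op(op);    // asserts the released state, then deletes op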


Boolean MessageQueueService::SendAsync(
    AsyncOpNode* op,
    Uint32 destination,
    void (*callback)(AsyncOpNode*, MessageQueue*, void*),
    MessageQueue* callback_response_q,
    void* callback_ptr)
{
    PEGASUS_ASSERT(op != 0 && callback != 0);

    // get the queue handle for the destination

    op->lock();
    // destination of this message
    op->_op_dest = MessageQueue::lookup(destination);
    op->_flags |= ASYNC_OPFLAGS_CALLBACK;
    op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
    // initialize the callback data
    // callback function to be executed by recipient of the response
    op->_async_callback = callback;
    // the op node
    op->_callback_node = op;
    // the queue that will receive the response
    op->_callback_response_q = callback_response_q;
    // user data for callback
    op->_callback_ptr = callback_ptr;
    // I am the originator of this request
    op->_callback_request_q = this;

    op->unlock();
    if (op->_op_dest == 0)
        return false;

    return _meta_dispatcher->route_async(op);
}


Boolean MessageQueueService::SendAsync(
    Message* msg,
    Uint32 destination,
    void (*callback)(Message* response, void* handle, void* parameter),
    void* handle,
    void* parameter)
{
    if (msg == NULL)
        return false;
    if (callback == NULL)
        return SendForget(msg);
    AsyncOpNode *op = get_op();
    msg->dest = destination;
    if (NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
    {
        op->release();
        return_op(op);
        return false;
    }
    op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
    op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
    op->_state &= ~ASYNC_OPSTATE_COMPLETE;
    op->__async_callback = callback;
    op->_callback_node = op;
    op->_callback_handle = handle;
    op->_callback_parameter = parameter;
    op->_callback_response_q = this;

    if (!(msg->getMask() & MessageMask::ha_async))
    {
        // Wrap the legacy message; the wrapper constructor attaches
        // itself to the op node, so the pointer is not used further here.
        AsyncLegacyOperationStart *wrapper = new AsyncLegacyOperationStart(
            op,
            destination,
            msg,
            destination);
    }
    else
    {
        op->_request.reset(msg);
        (static_cast<AsyncMessage *>(msg))->op = op;
    }
    return _meta_dispatcher->route_async(op);
}
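
// Illustrative sketch (not part of the original source): issuing a
// request through the message-based SendAsync overload. MyService,
// _dispatcherCallback, request, and targetQid are hypothetical names;
// the ownership rule follows _handle_async_callback above, which hands
// the unwrapped response to the callback after the op node is freed.
//
//     void MyService::_dispatcherCallback(
//         Message* response, void* handle, void* parameter)
//     {
//         MyService* service = reinterpret_cast<MyService*>(handle);
//         // ... examine the response ...
//         delete response;    // the callback owns the response
//     }
//
//     // somewhere in MyService:
//     SendAsync(request, targetQid, _dispatcherCallback, this, 0);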


Boolean MessageQueueService::SendForget(Message* msg)
{
    AsyncOpNode* op = 0;
    Uint32 mask = msg->getMask();

    if (mask & MessageMask::ha_async)
    {
        op = (static_cast<AsyncMessage *>(msg))->op;
    }

    if (op == 0)
    {
        op = get_op();
        op->_request.reset(msg);
        if (mask & MessageMask::ha_async)
        {
            (static_cast<AsyncMessage *>(msg))->op = op;
        }
    }
    op->_op_dest = MessageQueue::lookup(msg->dest);
    op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
    op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
        | ASYNC_OPFLAGS_SIMPLE_STATUS);
    op->_state &= ~ASYNC_OPSTATE_COMPLETE;
    if (op->_op_dest == 0)
    {
        op->release();
        return_op(op);
        return false;
    }

    // now see if the meta dispatcher will take it
    return _meta_dispatcher->route_async(op);
}
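
// Illustrative sketch (not part of the original source): fire-and-forget
// delivery. The caller sets msg->dest and hands the message off; since
// the op node takes ownership of the message, the caller must not reuse
// it after either outcome. 'response' and 'targetQid' are hypothetical
// names.
//
//     response->dest = targetQid;
//     if (!SendForget(response))
//     {
//         // destination queue no longer exists
//     }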


AsyncReply *MessageQueueService::SendWait(AsyncRequest* request)
{
    if (request == 0)
        return 0;

    Boolean destroy_op = false;

    if (request->op == 0)
    {
        request->op = get_op();
        request->op->_request.reset(request);
        destroy_op = true;
    }

    request->block = false;
    request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
    SendAsync(
        request->op,
        request->dest,
        _sendwait_callback,
        this,
        (void *)0);

    request->op->_client_sem.wait();

    AsyncReply* rpl = static_cast<AsyncReply *>(request->op->removeResponse());
    rpl->op = 0;

    if (destroy_op == true)
    {
        request->op->lock();
        request->op->_request.release();
        request->op->_state |= ASYNC_OPSTATE_RELEASED;
        request->op->unlock();
        return_op(request->op);
        request->op = 0;
    }
    return rpl;
}
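
// Illustrative sketch (not part of the original source): the synchronous
// request pattern used by register_service() and update_service() below.
// The caller owns both the request and the returned reply.
//
//     AsyncRequest* req = ...;    // some concrete AsyncRequest subclass
//     req->dest = CIMOM_Q_ID;
//     AsyncReply* reply = SendWait(req);
//     if (reply != 0)
//     {
//         // ... inspect reply->result ...
//         delete reply;
//     }
//     delete req;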


Boolean MessageQueueService::register_service(
    String name,
    Uint32 capabilities,
    Uint32 mask)
{
    RegisterCimService *msg = new RegisterCimService(
        0,
        true,
        name,
        capabilities,
        mask,
        _queueId);
    msg->dest = CIMOM_Q_ID;

    Boolean registered = false;
    AsyncReply *reply = static_cast<AsyncReply *>(SendWait(msg));

    if (reply != 0)
    {
        if (reply->getMask() & MessageMask::ha_async)
        {
            if (reply->getMask() & MessageMask::ha_reply)
            {
                if (reply->result == async_results::OK ||
                    reply->result == async_results::MODULE_ALREADY_REGISTERED)
                {
                    registered = true;
                }
            }
        }

        delete reply;
    }
    delete msg;
    return registered;
}

Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
{
    // Note: the parameters are currently unused; callers update
    // _capabilities and _mask before calling, and those members are
    // what gets sent.
    UpdateCimService *msg = new UpdateCimService(
        0,
        true,
        _queueId,
        _capabilities,
        _mask);
    Boolean registered = false;

    AsyncMessage* reply = SendWait(msg);
    if (reply)
    {
        if (reply->getMask() & MessageMask::ha_async)
        {
            if (reply->getMask() & MessageMask::ha_reply)
            {
                if (static_cast<AsyncReply *>(reply)->result ==
                    async_results::OK)
                {
                    registered = true;
                }
            }
        }
        delete reply;
    }
    delete msg;
    return registered;
}


Boolean MessageQueueService::deregister_service()
{
    _meta_dispatcher->deregister_module(_queueId);
    return true;
}


void MessageQueueService::find_services(
    String name,
    Uint32 capabilities,
    Uint32 mask,
    Array<Uint32>* results)
{
    if (results == 0)
    {
        throw NullPointer();
    }

    results->clear();

    FindServiceQueue *req = new FindServiceQueue(
        0,
        _queueId,
        true,
        name,
        capabilities,
        mask);

    req->dest = CIMOM_Q_ID;

    AsyncMessage *reply = SendWait(req);
    if (reply)
    {
        if (reply->getMask() & MessageMask::ha_async)
        {
            if (reply->getMask() & MessageMask::ha_reply)
            {
                if (reply->getType() == ASYNC_FIND_SERVICE_Q_RESULT)
                {
                    if ((static_cast<FindServiceQueueResult*>(reply))->result ==
                        async_results::OK)
                        *results =
                            (static_cast<FindServiceQueueResult*>(reply))->qids;
                }
            }
        }
        delete reply;
    }
    delete req;
}

void MessageQueueService::enumerate_service(
    Uint32 queue,
    message_module* result)
{
    if (result == 0)
    {
        throw NullPointer();
    }

    EnumerateService *req = new EnumerateService(
        0,
        _queueId,
        true,
        queue);

    AsyncMessage* reply = SendWait(req);

    if (reply)
    {
        Boolean found = false;

        if (reply->getMask() & MessageMask::ha_async)
        {
            if (reply->getMask() & MessageMask::ha_reply)
            {
                if (reply->getType() == ASYNC_ENUMERATE_SERVICE_RESULT)
                {
                    if ((static_cast<EnumerateServiceResponse*>(reply))->
                            result == async_results::OK)
                    {
                        if (found == false)
                        {
                            found = true;

                            result->put_name((static_cast<
                                EnumerateServiceResponse*>(reply))->name);
                            result->put_capabilities((static_cast<
                                EnumerateServiceResponse*>(reply))->
                                    capabilities);
                            result->put_mask((static_cast<
                                EnumerateServiceResponse*>(reply))->mask);
                            result->put_queue((static_cast<
                                EnumerateServiceResponse*>(reply))->qid);
                        }
                    }
                }
            }
        }
        delete reply;
    }
    delete req;
}
1190 mday 1.1 }
1191
|
1192 mike 1.120 MessageQueueService::PollingList* MessageQueueService::_get_polling_list()
1193 {
1194 _polling_list_mutex.lock();
1195
1196 if (!_polling_list)
|
1197 kumpf 1.131 _polling_list = new PollingList;
|
1198 mike 1.120
1199 _polling_list_mutex.unlock();
|
1200 kumpf 1.104
|
1201 mike 1.120 return _polling_list;
|
1202 mday 1.1 }
1203
1204 PEGASUS_NAMESPACE_END
|