//%2006////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
// Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
// Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
// IBM Corp.; EMC Corporation, The Open Group.
// Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
// IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
// Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
// EMC Corporation; VERITAS Software Corporation; The Open Group.
// Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
// EMC Corporation; Symantec Corporation; The Open Group.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
// ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
//
//==============================================================================
//
//%/////////////////////////////////////////////////////////////////////////////

#include "MessageQueueService.h"
#include <Pegasus/Common/Tracer.h>
#include <Pegasus/Common/MessageLoader.h>

PEGASUS_NAMESPACE_BEGIN

cimom *MessageQueueService::_meta_dispatcher = 0;
AtomicInt MessageQueueService::_service_count(0);
Mutex MessageQueueService::_meta_dispatcher_mutex;

static struct timeval deallocateWait = {30, 0};

// Set the maximum number of threads in the single thread pool. Since
// there is only one thread pool, this caps the threads that can be
// used, minus permanent threads and provider-added threads.
const Uint32 maximumThreadsInPool = 20;

ThreadPool *MessageQueueService::_thread_pool = 0;

MessageQueueService::PollingList* MessageQueueService::_polling_list;
Mutex MessageQueueService::_polling_list_mutex;

Thread* MessageQueueService::_polling_thread = 0;

ThreadPool *MessageQueueService::get_thread_pool()
{
    return _thread_pool;
}

//
// MAX_THREADS_PER_SVC_QUEUE
//
// JR Wunderlich Jun 6, 2005
//

#define MAX_THREADS_PER_SVC_QUEUE_LIMIT 5000
#define MAX_THREADS_PER_SVC_QUEUE_DEFAULT 5

#ifndef MAX_THREADS_PER_SVC_QUEUE
# define MAX_THREADS_PER_SVC_QUEUE MAX_THREADS_PER_SVC_QUEUE_DEFAULT
#endif

Uint32 max_threads_per_svc_queue;

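// The polling routine is the single dispatcher thread for all services.
// It sleeps on _polling_sem until work arrives, then walks the polling
// list under its lock, handing each service that has queued messages to
// a thread from the shared pool, subject to the max_threads_per_svc_queue
// throttle.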
ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(
    void* parm)
{
    Thread *myself = reinterpret_cast<Thread *>(parm);
    List<MessageQueueService, Mutex> *list =
        reinterpret_cast<List<MessageQueueService, Mutex>*>(myself->get_parm());

    while (_stop_polling.get() == 0)
    {
        _polling_sem.wait();

        if (_stop_polling.get() != 0)
        {
            break;
        }

        // The polling_routine thread must hold the lock on the
        // _polling_list while processing incoming messages.
        // This lock is used to give this thread ownership of
        // the services on the _polling_list.

        // This is necessary to avoid conflict with other threads
        // processing the _polling_list
        // (e.g., MessageQueueService::~MessageQueueService).

        list->lock();
        MessageQueueService *service = list->front();
        ThreadStatus rtn = PEGASUS_THREAD_OK;
        while (service != NULL)
        {
            if ((service->_incoming.count() > 0) &&
                (service->_die.get() == 0) &&
                (service->_threads.get() < max_threads_per_svc_queue))
            {
                // The _threads count is used to track the
                // number of active threads that have been allocated
                // to process messages for this service.

                // The _threads count MUST be incremented while
                // the polling_routine holds the _polling_list
                // lock and has ownership of the service object.

                service->_threads++;
                try
                {
                    rtn = _thread_pool->allocate_and_awaken(
                        service, _req_proc, &_polling_sem);
                }
                catch (...)
                {
                    service->_threads--;

                    // allocate_and_awaken should never generate an exception.
                    PEGASUS_ASSERT(0);
                }
                // if no more threads are available, break from the
                // processing loop
                if (rtn != PEGASUS_THREAD_OK)
                {
                    service->_threads--;
                    Logger::put(
                        Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
                        "Not enough threads to process this request. "
                        "Skipping.");

                    PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
                        "Could not allocate thread for %s. Queue has %d "
                        "messages waiting and %d threads servicing. "
                        "Skipping the service for right now.",
                        service->getQueueName(),
                        service->_incoming.count(),
                        service->_threads.get()));

                    Threads::yield();
                    service = NULL;
                }
            }
            if (service != NULL)
            {
                service = list->next_of(service);
            }
        }
        list->unlock();
    }
    myself->exit_self((ThreadReturnType) 1);
    return 0;
}


Semaphore MessageQueueService::_polling_sem(0);
AtomicInt MessageQueueService::_stop_polling(0);

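// The first MessageQueueService constructed creates the shared cimom
// meta-dispatcher and the shared thread pool; construction is serialized
// by _meta_dispatcher_mutex. Every service then registers itself with
// the dispatcher and adds itself to the polling list.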
MessageQueueService::MessageQueueService(
    const char* name,
    Uint32 queueID,
    Uint32 capabilities,
    Uint32 mask)
    : Base(name, true, queueID),
      _mask(mask),
      _die(0),
      _threads(0),
      _incoming(),
      _incoming_queue_shutdown(0)
{
    _capabilities = (capabilities | module_capabilities::async);

    _default_op_timeout.tv_sec = 30;
    _default_op_timeout.tv_usec = 100;

    max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;

    // if the requested thread maximum is out of range, set it to
    // MAX_THREADS_PER_SVC_QUEUE_LIMIT

    if ((max_threads_per_svc_queue < 1) ||
        (max_threads_per_svc_queue > MAX_THREADS_PER_SVC_QUEUE_LIMIT))
    {
        max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT;
    }

    PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
        "max_threads_per_svc_queue set to %u.", max_threads_per_svc_queue));

    AutoMutex autoMut(_meta_dispatcher_mutex);

    if (_meta_dispatcher == 0)
    {
        _stop_polling = 0;
        PEGASUS_ASSERT(_service_count.get() == 0);
        _meta_dispatcher = new cimom();

        // _thread_pool = new ThreadPool(initial_cnt, "MessageQueueService",
        //     minimum_cnt, maximum_cnt, deallocateWait);
        _thread_pool = new ThreadPool(0, "MessageQueueService", 0,
            maximumThreadsInPool, deallocateWait);
    }
    _service_count++;

    if (false == register_service(name, _capabilities, _mask))
    {
        MessageLoaderParms parms(
            "Common.MessageQueueService.UNABLE_TO_REGISTER",
            "CIM base message queue service is unable to register with the "
            "CIMOM dispatcher.");
        throw BindFailedException(parms);
    }

    _get_polling_list()->insert_back(this);
}


MessageQueueService::~MessageQueueService()
{
    _die = 1;

    // The polling_routine locks the _polling_list while
    // processing the incoming messages for services on the
    // list. Deleting the service from the _polling_list
    // prior to processing avoids synchronization issues
    // with the polling_routine.

    // ATTN: added to prevent an assertion in List when the list does not
    // contain this element.

    if (_get_polling_list()->contains(this))
        _get_polling_list()->remove(this);

    // ATTN: The code for closing the _incoming queue
    // is not working correctly. In OpenPegasus 2.5,
    // execution of the following code is very timing
    // dependent. This needs to be fixed.
    // See Bug 4079 for details.
    if (_incoming_queue_shutdown.get() == 0)
    {
        _shutdown_incoming_queue();
    }

    // Wait until all threads processing the messages
    // for this service have completed.

    while (_threads.get() > 0)
    {
        Threads::yield();
    }

    {
        AutoMutex autoMut(_meta_dispatcher_mutex);
        _service_count--;
        if (_service_count.get() == 0)
        {
            _stop_polling++;
            _polling_sem.signal();
            if (_polling_thread)
            {
                _polling_thread->join();
                delete _polling_thread;
                _polling_thread = 0;
            }
            _meta_dispatcher->_shutdown_routed_queue();
            delete _meta_dispatcher;
            _meta_dispatcher = 0;

            delete _thread_pool;
            _thread_pool = 0;
        }
    } // mutex unlocks here

    // Clean up in case extra messages remain on the queue.
    while (_incoming.count())
    {
        try
        {
            delete _incoming.dequeue();
        }
        catch (const ListClosed&)
        {
            // If the list is closed, there is nothing we can do.
            break;
        }
    }
}

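// Shut down this service's incoming queue by sending ourselves a
// fire-and-forget AsyncIoctl::IO_CLOSE request; the actual draining and
// closing of the queue happens in handle_AsyncIoctl.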
void MessageQueueService::_shutdown_incoming_queue()
{
    if (_incoming_queue_shutdown.get() > 0)
        return;

    AsyncIoctl *msg = new AsyncIoctl(
        0,
        _queueId,
        _queueId,
        true,
        AsyncIoctl::IO_CLOSE,
        0,
        0);

    msg->op = get_op();
    msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
    msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
        | ASYNC_OPFLAGS_SIMPLE_STATUS);
    msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;

    msg->op->_op_dest = this;
    msg->op->_request.reset(msg);
    try
    {
        _incoming.enqueue_wait(msg->op);
        _polling_sem.signal();
    }
    catch (const ListClosed&)
    {
        // This means the queue has already been shut down (this happens
        // when two AsyncIoctl::IO_CLOSE messages are generated and one is
        // processed first).
        delete msg;
    }
    catch (const Permission&)
    {
        delete msg;
    }
}


void MessageQueueService::enqueue(Message* msg)
{
    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");

    Base::enqueue(msg);

    PEG_METHOD_EXIT();
}


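// _req_proc is the work routine handed to the thread pool by
// polling_routine. Each invocation drains the owning service's _incoming
// queue, dispatching every dequeued operation, and decrements the
// service's _threads count before returning.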
ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(
    void* parm)
{
    MessageQueueService* service =
        reinterpret_cast<MessageQueueService*>(parm);
    PEGASUS_ASSERT(service != 0);
    try
    {
        if (service->_die.get() != 0)
        {
            service->_threads--;
            return 0;
        }
        // pull messages off the incoming queue and dispatch them. then
        // check pending messages that are non-blocking
        AsyncOpNode *operation = 0;

        // many operations may have been queued.
        do
        {
            try
            {
                operation = service->_incoming.dequeue();
            }
            catch (ListClosed&)
            {
                // ATTN: This appears to be a common loop exit path.
                //PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
                //    "Caught ListClosed exception. Exiting _req_proc.");
                break;
            }

            if (operation)
            {
                operation->_service_ptr = service;
                service->_handle_incoming_operation(operation);
            }
        } while (operation);
    }
    catch (const Exception& e)
    {
        PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
            String("Caught exception: \"") + e.getMessage() +
                "\". Exiting _req_proc.");
    }
    catch (...)
    {
        PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
            "Caught unrecognized exception. Exiting _req_proc.");
    }
    service->_threads--;
    return 0;
}


void MessageQueueService::_sendwait_callback(
    AsyncOpNode* op,
    MessageQueue* q,
    void *parm)
{
    op->_client_sem.signal();
}


// The callback function is responsible for cleaning up all resources,
// including op, op->_callback_node, and op->_callback_ptr.
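//
// Two callback flavors are handled: ASYNC_OPFLAGS_SAFE_CALLBACK unwraps
// the request/response from their async wrappers, releases the op node,
// and then invokes the user callback with the raw response message;
// ASYNC_OPFLAGS_CALLBACK invokes the registered callback directly and
// leaves cleanup to the callee.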
void MessageQueueService::_handle_async_callback(AsyncOpNode* op)
{
    if (op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK)
    {
        Message *msg = op->removeRequest();
        if (msg && (msg->getMask() & MessageMask::ha_async))
        {
            if (msg->getType() == ASYNC_ASYNC_LEGACY_OP_START)
            {
                AsyncLegacyOperationStart *wrapper =
                    static_cast<AsyncLegacyOperationStart *>(msg);
                msg = wrapper->get_action();
                delete wrapper;
            }
            else if (msg->getType() == ASYNC_ASYNC_MODULE_OP_START)
            {
                AsyncModuleOperationStart *wrapper =
                    static_cast<AsyncModuleOperationStart *>(msg);
                msg = wrapper->get_action();
                delete wrapper;
            }
            else if (msg->getType() == ASYNC_ASYNC_OP_START)
            {
                AsyncOperationStart *wrapper =
                    static_cast<AsyncOperationStart *>(msg);
                msg = wrapper->get_action();
                delete wrapper;
            }
            delete msg;
        }

        msg = op->removeResponse();
        if (msg && (msg->getMask() & MessageMask::ha_async))
        {
            if (msg->getType() == ASYNC_ASYNC_LEGACY_OP_RESULT)
            {
                AsyncLegacyOperationResult *wrapper =
                    static_cast<AsyncLegacyOperationResult *>(msg);
                msg = wrapper->get_result();
                delete wrapper;
            }
            else if (msg->getType() == ASYNC_ASYNC_MODULE_OP_RESULT)
            {
                AsyncModuleOperationResult *wrapper =
                    static_cast<AsyncModuleOperationResult *>(msg);
                msg = wrapper->get_result();
                delete wrapper;
            }
        }
        void (*callback)(Message *, void *, void *) = op->__async_callback;
        void *handle = op->_callback_handle;
        void *parm = op->_callback_parameter;
        op->release();
        return_op(op);
        callback(msg, handle, parm);
    }
    else if (op->_flags & ASYNC_OPFLAGS_CALLBACK)
    {
        // note that _callback_node may be different from op
        // op->_callback_response_q is a "this" pointer we can use for
        // static callback methods
        op->_async_callback(
            op->_callback_node, op->_callback_response_q, op->_callback_ptr);
    }
}


void MessageQueueService::_handle_incoming_operation(AsyncOpNode* operation)
{
    if (operation != 0)
    {
        // ATTN: optimization
        // << Tue Feb 19 14:10:38 2002 mdd >>
        operation->lock();

        Message *rq = operation->_request.get();

        // optimization <<< Thu Mar 7 21:04:05 2002 mdd >>>
        // move this to the bottom of the loop when the majority of
        // messages become async messages.

        // divert legacy messages to handleEnqueue
        if ((rq != 0) && (!(rq->getMask() & MessageMask::ha_async)))
        {
            operation->_request.release();
            operation->unlock();
            // delete the op node
            operation->release();
            return_op(operation);

            handleEnqueue(rq);
            return;
        }

        if ((operation->_flags & ASYNC_OPFLAGS_CALLBACK ||
             operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&
            (operation->_state & ASYNC_OPSTATE_COMPLETE))
        {
            operation->unlock();
            _handle_async_callback(operation);
        }
        else
        {
            PEGASUS_ASSERT(rq != 0);
            operation->unlock();
            _handle_async_request(static_cast<AsyncRequest *>(rq));
        }
    }
    return;
}

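// Dispatch an async request to the matching handle_* method based on its
// message type; unrecognized request types are answered with a CIM_NAK.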
void MessageQueueService::_handle_async_request(AsyncRequest *req)
{
    if (req != 0)
    {
        req->op->processing();

        MessageType type = req->getType();
        if (type == ASYNC_HEARTBEAT)
            handle_heartbeat_request(req);
        else if (type == ASYNC_IOCTL)
            handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
        else if (type == ASYNC_CIMSERVICE_START)
            handle_CimServiceStart(static_cast<CimServiceStart *>(req));
        else if (type == ASYNC_CIMSERVICE_STOP)
            handle_CimServiceStop(static_cast<CimServiceStop *>(req));
        else if (type == ASYNC_CIMSERVICE_PAUSE)
            handle_CimServicePause(static_cast<CimServicePause *>(req));
        else if (type == ASYNC_CIMSERVICE_RESUME)
            handle_CimServiceResume(static_cast<CimServiceResume *>(req));
        else if (type == ASYNC_ASYNC_OP_START)
            handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
        else
        {
            // we don't handle this request message
            _make_response(req, async_results::CIM_NAK);
        }
    }
}


Boolean MessageQueueService::_enqueueResponse(
    Message* request,
    Message* response)
{
    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
        "MessageQueueService::_enqueueResponse");

    if (request->getMask() & MessageMask::ha_async)
    {
        if (response->getMask() & MessageMask::ha_async)
        {
            _completeAsyncResponse(
                static_cast<AsyncRequest *>(request),
                static_cast<AsyncReply *>(response),
                ASYNC_OPSTATE_COMPLETE, 0);
            PEG_METHOD_EXIT();
            return true;
        }
    }

    if (request->_async != 0)
    {
        Uint32 mask = request->_async->getMask();
        PEGASUS_ASSERT(mask &
            (MessageMask::ha_async | MessageMask::ha_request));

        AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
        AsyncOpNode *op = async->op;
        request->_async = 0;
        // the legacy request is going to be deleted by its handler;
        // remove it from the op node

        static_cast<AsyncLegacyOperationStart *>(async)->get_action();

        AsyncLegacyOperationResult *async_result =
            new AsyncLegacyOperationResult(
                op,
                response);
        _completeAsyncResponse(
            async,
            async_result,
            ASYNC_OPSTATE_COMPLETE,
            0);
        PEG_METHOD_EXIT();
        return true;
    }

    // ensure that the destination queue is in response->dest
    PEG_METHOD_EXIT();
    return SendForget(response);
}


void MessageQueueService::_make_response(Message* req, Uint32 code)
{
    cimom::_make_response(req, code);
}


void MessageQueueService::_completeAsyncResponse(
    AsyncRequest* request,
    AsyncReply* reply,
    Uint32 state,
    Uint32 flag)
{
    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
        "MessageQueueService::_completeAsyncResponse");

    cimom::_completeAsyncResponse(request, reply, state, flag);

    PEG_METHOD_EXIT();
}


void MessageQueueService::_complete_op_node(
    AsyncOpNode* op,
    Uint32 state,
    Uint32 flag,
    Uint32 code)
{
    cimom::_complete_op_node(op, state, flag, code);
}

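// Accept an async operation for this service. The first call lazily
// creates the shared polling thread; the op is then placed on this
// service's _incoming queue and the polling semaphore is signaled.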
Boolean MessageQueueService::accept_async(AsyncOpNode* op)
{
    if (_incoming_queue_shutdown.get() > 0)
        return false;
    if (_polling_thread == NULL)
    {
        _polling_thread = new Thread(
            polling_routine,
            reinterpret_cast<void *>(_get_polling_list()),
            false);
        ThreadStatus tr = PEGASUS_THREAD_OK;
        while ((tr = _polling_thread->run()) != PEGASUS_THREAD_OK)
        {
            if (tr == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)
                Threads::yield();
            else
                throw Exception(MessageLoaderParms(
                    "Common.MessageQueueService.NOT_ENOUGH_THREAD",
                    "Could not allocate thread for the polling thread."));
        }
    }
    // ATTN optimization remove the message checking altogether in the base
    // << Mon Feb 18 14:02:20 2002 mdd >>
    op->lock();
    Message *rq = op->_request.get();
    Message *rp = op->_response.get();
    op->unlock();

    // Note: the original condition lacked parentheses around the two
    // messageOK() checks, so the _die test bound only to the second one;
    // parenthesized here to match the evident intent.
    if (((rq != 0 && messageOK(rq)) ||
         (rp != 0 && messageOK(rp))) && _die.get() == 0)
    {
        _incoming.enqueue_wait(op);
        _polling_sem.signal();
        return true;
    }
    return false;
}

Boolean MessageQueueService::messageOK(const Message* msg)
{
    if (_incoming_queue_shutdown.get() > 0)
        return false;
    return true;
}

void MessageQueueService::handle_heartbeat_request(AsyncRequest* req)
{
    // default action is to echo a heartbeat response

    AsyncReply *reply = new AsyncReply(
        ASYNC_HEARTBEAT,
        0,
        req->op,
        async_results::OK,
        req->resp,
        false);
    _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0);
}


void MessageQueueService::handle_heartbeat_reply(AsyncReply* rep)
{
}

void MessageQueueService::handle_AsyncIoctl(AsyncIoctl* req)
{
    switch (req->ctl)
    {
    case AsyncIoctl::IO_CLOSE:
    {
        MessageQueueService *service =
            static_cast<MessageQueueService *>(req->op->_service_ptr);

#ifdef MESSAGEQUEUESERVICE_DEBUG
        PEGASUS_STD(cout) << service->getQueueName() <<
            " Received AsyncIoctl::IO_CLOSE " << PEGASUS_STD(endl);
#endif

        // respond to this message. this is fire and forget, so we
        // don't need to delete anything.
        // this takes care of two problems that were being found
        // << Thu Oct 9 10:52:48 2003 mdd >>
        _make_response(req, async_results::OK);
        // ensure we do not accept any further messages

        // ensure we don't recurse on IO_CLOSE
        if (_incoming_queue_shutdown.get() > 0)
            break;

        // set the closing flag
        service->_incoming_queue_shutdown = 1;
        // empty out the queue
        while (1)
        {
            AsyncOpNode *operation;
            try
            {
                operation = service->_incoming.dequeue();
            }
            catch (IPCException&)
            {
                break;
            }
            if (operation)
            {
                operation->_service_ptr = service;
                service->_handle_incoming_operation(operation);
            }
            else
                break;
        } // message processing loop

        // shutdown the AsyncQueue
        service->_incoming.close();
        return;
    }

    default:
        _make_response(req, async_results::CIM_NAK);
    }
}

void MessageQueueService::handle_CimServiceStart(CimServiceStart* req)
{
#ifdef MESSAGEQUEUESERVICE_DEBUG
    PEGASUS_STD(cout) << getQueueName() << " received START" <<
        PEGASUS_STD(endl);
#endif

    // clear the stopped bit and respond
    _capabilities &= (~(module_capabilities::stopped));
    _make_response(req, async_results::OK);
    // now tell the meta dispatcher we are started
    update_service(_capabilities, _mask);
}

void MessageQueueService::handle_CimServiceStop(CimServiceStop* req)
{
#ifdef MESSAGEQUEUESERVICE_DEBUG
    PEGASUS_STD(cout) << getQueueName() << " received STOP" <<
        PEGASUS_STD(endl);
#endif
    // set the stopped bit and respond
    _capabilities |= module_capabilities::stopped;
    _make_response(req, async_results::CIM_STOPPED);
    // now tell the meta dispatcher we are stopped
    update_service(_capabilities, _mask);
}

void MessageQueueService::handle_CimServicePause(CimServicePause* req)
{
    // set the paused bit and tell the meta dispatcher we are paused
    _capabilities |= module_capabilities::paused;
    update_service(_capabilities, _mask);
    _make_response(req, async_results::CIM_PAUSED);
}

void MessageQueueService::handle_CimServiceResume(CimServiceResume* req)
{
    // clear the paused bit and tell the meta dispatcher we have resumed
    _capabilities &= (~(module_capabilities::paused));
    update_service(_capabilities, _mask);
    _make_response(req, async_results::OK);
}

void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart* req)
{
    _make_response(req, async_results::CIM_NAK);
}

void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult* req)
{
}


void MessageQueueService::handle_AsyncLegacyOperationStart(
    AsyncLegacyOperationStart* req)
{
    // remove the legacy message from the request and enqueue it to its
    // destination
    Uint32 result = async_results::CIM_NAK;

    Message* legacy = req->_act;
    if (legacy != 0)
    {
        MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
        if (queue != 0)
        {
            if (queue->isAsync() == true)
            {
                (static_cast<MessageQueueService *>(queue))->handleEnqueue(
                    legacy);
            }
            else
            {
                // Enqueue the legacy message directly on the
                // non-async destination queue:
                queue->enqueue(req->get_action());
            }

            result = async_results::OK;
        }
    }
    _make_response(req, result);
}

void MessageQueueService::handle_AsyncLegacyOperationResult(
    AsyncLegacyOperationResult* rep)
{
}

AsyncOpNode* MessageQueueService::get_op()
{
    AsyncOpNode* op = new AsyncOpNode();

    op->_state = ASYNC_OPSTATE_UNKNOWN;
    op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;

    return op;
}

void MessageQueueService::return_op(AsyncOpNode* op)
{
    PEGASUS_ASSERT(op->_state & ASYNC_OPSTATE_RELEASED);
    delete op;
}

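// Route an async operation to the destination queue and arrange for the
// given callback to run when the response arrives. The op node carries
// the callback function, the response queue, and an opaque user pointer.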
Boolean MessageQueueService::SendAsync(
    AsyncOpNode* op,
    Uint32 destination,
    void (*callback)(AsyncOpNode*, MessageQueue*, void*),
    MessageQueue* callback_response_q,
    void* callback_ptr)
{
    PEGASUS_ASSERT(op != 0 && callback != 0);

    // get the queue handle for the destination

    op->lock();
    // destination of this message
    op->_op_dest = MessageQueue::lookup(destination);
    op->_flags |= ASYNC_OPFLAGS_CALLBACK;
    op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
    // initialize the callback data
    // callback function to be executed by the recipient of the response
    op->_async_callback = callback;
    // the op node
    op->_callback_node = op;
    // the queue that will receive the response
    op->_callback_response_q = callback_response_q;
    // user data for the callback
    op->_callback_ptr = callback_ptr;
    // I am the originator of this request
    op->_callback_request_q = this;

    op->unlock();
    if (op->_op_dest == 0)
        return false;

    return _meta_dispatcher->route_async(op);
}


Boolean MessageQueueService::SendAsync(
    Message* msg,
    Uint32 destination,
    void (*callback)(Message* response, void* handle, void* parameter),
    void* handle,
    void* parameter)
{
    if (msg == NULL)
        return false;
    if (callback == NULL)
        return SendForget(msg);
    AsyncOpNode *op = get_op();
    msg->dest = destination;
    if (NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
    {
        op->release();
        return_op(op);
        return false;
    }
    op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
    op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
    op->_state &= ~ASYNC_OPSTATE_COMPLETE;
    op->__async_callback = callback;
    op->_callback_node = op;
    op->_callback_handle = handle;
    op->_callback_parameter = parameter;
    op->_callback_response_q = this;

    if (!(msg->getMask() & MessageMask::ha_async))
    {
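        // A legacy (non-async) message is wrapped before routing. The
        // wrapper pointer looks unused, but the AsyncLegacyOperationStart
        // constructor is understood to attach the wrapper to the op node
        // (op->_request), so no explicit assignment is needed here.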
        AsyncLegacyOperationStart *wrapper = new AsyncLegacyOperationStart(
            op,
            destination,
            msg,
            destination);
    }
    else
    {
        op->_request.reset(msg);
        (static_cast<AsyncMessage *>(msg))->op = op;
    }
    return _meta_dispatcher->route_async(op);
}


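// Fire-and-forget send: route the message to its destination without
// registering any callback. Returns false if the destination queue
// cannot be found.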
Boolean MessageQueueService::SendForget(Message* msg)
{
    AsyncOpNode* op = 0;
    Uint32 mask = msg->getMask();

    if (mask & MessageMask::ha_async)
    {
        op = (static_cast<AsyncMessage *>(msg))->op;
    }

    if (op == 0)
    {
        op = get_op();
        op->_request.reset(msg);
        if (mask & MessageMask::ha_async)
        {
            (static_cast<AsyncMessage *>(msg))->op = op;
        }
    }
    op->_op_dest = MessageQueue::lookup(msg->dest);
    op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
    op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
        | ASYNC_OPFLAGS_SIMPLE_STATUS);
    op->_state &= ~ASYNC_OPSTATE_COMPLETE;
    if (op->_op_dest == 0)
    {
        op->release();
        return_op(op);
        return false;
    }

    // now see if the meta dispatcher will take it
    return _meta_dispatcher->route_async(op);
}


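// Synchronous send: routes the request via SendAsync with an internal
// callback that signals a semaphore, then blocks the calling thread on
// that semaphore until the reply arrives.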
AsyncReply *MessageQueueService::SendWait(AsyncRequest* request)
{
    if (request == 0)
        return 0;

    Boolean destroy_op = false;

    if (request->op == 0)
    {
        request->op = get_op();
        request->op->_request.reset(request);
        destroy_op = true;
    }

    request->block = false;
    request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
    SendAsync(
        request->op,
        request->dest,
        _sendwait_callback,
        this,
        (void *)0);

    request->op->_client_sem.wait();

    AsyncReply* rpl = static_cast<AsyncReply *>(request->op->removeResponse());
    rpl->op = 0;

    if (destroy_op == true)
    {
        request->op->lock();
        request->op->_request.release();
        request->op->_state |= ASYNC_OPSTATE_RELEASED;
        request->op->unlock();
        return_op(request->op);
        request->op = 0;
    }
    return rpl;
}


Boolean MessageQueueService::register_service(
    String name,
    Uint32 capabilities,
    Uint32 mask)
{
    RegisterCimService *msg = new RegisterCimService(
        0,
        true,
        name,
        capabilities,
        mask,
        _queueId);
    msg->dest = CIMOM_Q_ID;

    Boolean registered = false;
    AsyncReply *reply = static_cast<AsyncReply *>(SendWait(msg));

    if (reply != 0)
    {
        if (reply->getMask() & MessageMask::ha_async)
        {
            if (reply->getMask() & MessageMask::ha_reply)
            {
                if (reply->result == async_results::OK ||
                    reply->result == async_results::MODULE_ALREADY_REGISTERED)
                {
                    registered = true;
                }
            }
        }

        delete reply;
    }
    delete msg;
    return registered;
}

Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
{
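    // Note: the update message is built from the current _capabilities
    // and _mask members rather than from the parameters; callers set the
    // members before invoking update_service.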
    UpdateCimService *msg = new UpdateCimService(
        0,
        true,
        _queueId,
        _capabilities,
        _mask);
    Boolean registered = false;

    AsyncMessage* reply = SendWait(msg);
    if (reply)
    {
        if (reply->getMask() & MessageMask::ha_async)
        {
            if (reply->getMask() & MessageMask::ha_reply)
            {
                if (static_cast<AsyncReply *>(reply)->result ==
                    async_results::OK)
                {
                    registered = true;
                }
            }
        }
        delete reply;
    }
    delete msg;
    return registered;
}


Boolean MessageQueueService::deregister_service()
{
    _meta_dispatcher->deregister_module(_queueId);
    return true;
}


void MessageQueueService::find_services(
    String name,
    Uint32 capabilities,
    Uint32 mask,
    Array<Uint32>* results)
{
    if (results == 0)
    {
        throw NullPointer();
    }

    results->clear();

    FindServiceQueue *req = new FindServiceQueue(
        0,
        _queueId,
        true,
        name,
        capabilities,
        mask);

    req->dest = CIMOM_Q_ID;

    AsyncMessage *reply = SendWait(req);
    if (reply)
    {
        if (reply->getMask() & MessageMask::ha_async)
        {
            if (reply->getMask() & MessageMask::ha_reply)
            {
                if (reply->getType() == ASYNC_FIND_SERVICE_Q_RESULT)
                {
                    if ((static_cast<FindServiceQueueResult*>(reply))->result ==
                        async_results::OK)
                        *results =
                            (static_cast<FindServiceQueueResult*>(reply))->qids;
                }
            }
        }
        delete reply;
    }
    delete req;
}

void MessageQueueService::enumerate_service(
    Uint32 queue,
    message_module* result)
{
    if (result == 0)
    {
        throw NullPointer();
    }

    EnumerateService *req = new EnumerateService(
        0,
        _queueId,
        true,
        queue);

    AsyncMessage* reply = SendWait(req);

    if (reply)
    {
        Boolean found = false;

        if (reply->getMask() & MessageMask::ha_async)
        {
            if (reply->getMask() & MessageMask::ha_reply)
            {
                if (reply->getType() == ASYNC_ENUMERATE_SERVICE_RESULT)
                {
                    if ((static_cast<EnumerateServiceResponse*>(reply))->
                            result == async_results::OK)
                    {
                        if (found == false)
                        {
                            found = true;

                            result->put_name((static_cast<
                                EnumerateServiceResponse*>(reply))->name);
                            result->put_capabilities((static_cast<
                                EnumerateServiceResponse*>(reply))->
                                    capabilities);
                            result->put_mask((static_cast<
                                EnumerateServiceResponse*>(reply))->mask);
                            result->put_queue((static_cast<
                                EnumerateServiceResponse*>(reply))->qid);
                        }
                    }
                }
            }
        }
        delete reply;
    }
    delete req;
}

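// Lazily create the polling list under _polling_list_mutex so that
// concurrent first callers do not race on construction.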
MessageQueueService::PollingList* MessageQueueService::_get_polling_list()
{
    _polling_list_mutex.lock();

    if (!_polling_list)
        _polling_list = new PollingList;

    _polling_list_mutex.unlock();

    return _polling_list;
}

PEGASUS_NAMESPACE_END