1 mike 1.61 //%2006/////////////////////////////////////////////////////////////////////////
|
2 mday 1.1 //
|
3 karl 1.51 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.49 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.51 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.53 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 karl 1.60 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 mday 1.1 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
15 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
18 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
|
20 kumpf 1.41 //
|
21 mday 1.1 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
22 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
24 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
27 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 //%/////////////////////////////////////////////////////////////////////////////
33
34 #include "Cimom.h"
35
|
36 chip 1.6 #include <iostream>
|
37 kumpf 1.33 #include <Pegasus/Common/Constants.h>
|
38 kumpf 1.31 #include <Pegasus/Common/Tracer.h>
|
39 konrad.r 1.58 #include <Pegasus/Common/MessageLoader.h>
|
40 joyce.j 1.52 #include <Pegasus/Common/AutoPtr.h>
|
41 mday 1.1
|
42 kumpf 1.68 PEGASUS_USING_STD;
43
|
44 mday 1.1 PEGASUS_NAMESPACE_BEGIN
45
|
46 mday 1.15 const Uint32 CIMOM_Q_ID = 1;
47
|
48 mday 1.26 Uint32 module_capabilities::async = 0x00000001;
49 Uint32 module_capabilities::remote = 0x00000002;
50 Uint32 module_capabilities::trusted = 0x00000004;
51 Uint32 module_capabilities::paused = 0x00000008;
52 Uint32 module_capabilities::stopped = 0x00000010;
53 Uint32 module_capabilities::module_controller = 0x00000020;
|
54 mday 1.12
|
55 mday 1.1
56
|
57 david.dillard 1.54 const String & message_module::get_name() const { return _name ; }
58 Uint32 message_module::get_capabilities() const { return _capabilities ; }
59 Uint32 message_module::get_mask() const { return _mask ; }
60 Uint32 message_module::get_queue() const { return _q_id ; }
|
61 mday 1.1 void message_module::put_name(String & name) { _name.clear(); _name = name; }
|
62 kumpf 1.68 void message_module::put_capabilities(Uint32 capabilities)
63 {
64 _capabilities = capabilities;
65 }
|
66 mday 1.1 void message_module::put_mask(Uint32 mask) { _mask = mask; }
67 void message_module::put_queue(Uint32 queue) { _q_id = queue; }
68
69
|
70 kumpf 1.68 Boolean message_module::operator==(Uint32 q) const
|
71 mday 1.1 {
|
72 kumpf 1.68 return (this->_q_id == q);
|
73 mday 1.1 }
74
|
75 kumpf 1.68 Boolean message_module::operator==(const message_module *mm) const
|
76 mday 1.1 {
|
77 kumpf 1.68 return (this == mm);
|
78 mday 1.1 }
79
|
80 kumpf 1.68 Boolean message_module::operator==(const String& name) const
|
81 mday 1.1 {
|
82 kumpf 1.68 return (name == this->_name);
|
83 mday 1.1 }
84
|
85 mday 1.5 Boolean cimom::route_async(AsyncOpNode *op)
86 {
|
87 kumpf 1.68 if (_die.get() > 0)
88 return false;
|
89 mday 1.5
|
90 kumpf 1.68 if (_routed_queue_shutdown.get() > 0)
91 return false;
|
92 mday 1.11
|
93 kumpf 1.68 _routed_ops.enqueue_wait(op);
|
94 chip 1.6
|
95 kumpf 1.68 return true;
|
96 mday 1.5 }
97
|
98 david.dillard 1.54 void cimom::_shutdown_routed_queue()
|
99 mday 1.10 {
|
100 kumpf 1.68 if (_routed_queue_shutdown.get() > 0)
101 return;
|
102 mday 1.11
|
103 kumpf 1.68 AutoPtr<AsyncIoctl> msg(new AsyncIoctl(
104 0,
105 CIMOM_Q_ID,
106 CIMOM_Q_ID,
107 true,
108 AsyncIoctl::IO_CLOSE,
109 0,
110 0));
111 msg->op = get_cached_op();
112
113 msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
114 msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK |
115 ASYNC_OPFLAGS_SIMPLE_STATUS);
116 msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
117 msg->op->_op_dest = _global_this;
118 msg->op->_request.reset(msg.get());
119
120 _routed_ops.enqueue_wait(msg->op);
121 _routing_thread.join();
122 msg.release();
|
123 mday 1.10 }
124
|
125 mday 1.5
|
126 mike 1.66 ThreadReturnType PEGASUS_THREAD_CDECL cimom::_routing_proc(void *parm)
|
127 mday 1.5 {
|
128 kumpf 1.68 Thread* myself = reinterpret_cast<Thread *>(parm);
129 cimom* dispatcher = reinterpret_cast<cimom *>(myself->get_parm());
130 AsyncOpNode *op = 0;
131
132 while (dispatcher->_die.get() == 0)
133 {
134 try
135 {
136 op = dispatcher->_routed_ops.dequeue_wait();
137 }
138 catch (ListClosed &)
139 {
140 break;
141 }
142
143 if (op == 0)
144 {
145 break;
146 }
147 else
148 {
149 kumpf 1.68 Uint32 capabilities = 0;
150 Uint32 code = 0;
|
151 mday 1.5
|
152 mday 1.16 // ATTN: optimization
|
153 kumpf 1.68 // <<< Sun Feb 17 18:26:39 2002 mdd >>>
154 // once the opnode is enqueued on the cimom's list, the cimom owns it
155 // and no one is allowed to write to it but the cimom.
156 // services are only allowed to read status bits
157 // this can eliminate the need for the lock/unlock
158 // unless reading/writing status bits
159
160 op->lock();
161 MessageQueue *dest_q = op->_op_dest;
162 Uint32 dest_qid = dest_q->getQueueId();
163 op->unlock();
164
165 Boolean accepted = false;
166
167 if (dest_qid == CIMOM_Q_ID)
168 {
169 dispatcher->_handle_cimom_op(op, myself, dispatcher);
170 accepted = true;
171 }
172 else
173 {
174 kumpf 1.68 // ATTN: optimization
175 // <<< Sun Feb 17 18:29:26 2002 mdd >>>
176 // this lock/loop/unlock is really just a safety check to ensure
177 // the service is registered with the meta dispatcher.
178 // if speed is an issue we can remove this lookup
179 // because we have converted to MessageQueueService from
180 // MessageQueue, and because we register in the constructor,
181 // the safety check is unecessary
|
182 mday 1.18 //
|
183 kumpf 1.68 // << Tue Feb 19 11:40:37 2002 mdd >>
184 // moved the lookup to sendwait/nowait/forget/forward functions.
|
185 mday 1.16
|
186 kumpf 1.68 MessageQueueService *dest_svc = 0;
|
187 mday 1.42
|
188 kumpf 1.68 if (dest_q->get_capabilities() & module_capabilities::async)
189 {
190 dest_svc= static_cast<MessageQueueService *>(dest_q);
191 }
192
193 if (dest_svc != 0)
194 {
195 if (dest_svc->get_capabilities() &
196 module_capabilities::paused ||
197 dest_svc->get_capabilities() &
198 module_capabilities::stopped)
199 {
200 // the target is stopped or paused
201 // unless the message is a start or resume
202 // just handle it from here.
203 op->lock();
204 AsyncRequest *request =
205 static_cast<AsyncRequest *>(op->_request.get());
206 op->unlock();
207 code = request->getType();
208
209 kumpf 1.68 if (code != async_messages::CIMSERVICE_START &&
210 code != async_messages::CIMSERVICE_RESUME)
211 {
212 if (dest_svc->get_capabilities() &
213 module_capabilities::paused)
214 dispatcher->_make_response(
215 request, async_results::CIM_PAUSED);
216 else
217 dispatcher->_make_response(
218 request, async_results::CIM_STOPPED);
219 accepted = true;
220 }
221 else // deliver the start or resume message
222 if (dest_svc->_die.get() == 0)
223 accepted = dest_svc->accept_async(op);
224 }
225 else
226 if (dest_svc->_die.get() == 0)
227 accepted = dest_svc->accept_async(op);
228 }
229
230 kumpf 1.68 if (accepted == false)
231 {
232 // set completion code to NAK and flag completed
233 _complete_op_node(
234 op, ASYNC_OPSTATE_COMPLETE,
235 ASYNC_OPFLAGS_SIMPLE_STATUS,
236 async_results::CIM_NAK);
237 }
238 }
239 }
240 } // loop
|
241 kumpf 1.57
|
242 kumpf 1.69 return 0;
|
243 mday 1.5 }
244
245
|
246 david.dillard 1.54 cimom::cimom()
|
247 kumpf 1.68 : MessageQueue(PEGASUS_QUEUENAME_METADISPATCHER, true, CIMOM_Q_ID),
248 _modules(),
249 _routed_ops(),
250 _routing_thread(_routing_proc, this, false),
251 _die(0),
252 _routed_queue_shutdown(0)
253 {
254 _capabilities |= module_capabilities::async;
255
256 _global_this = static_cast<cimom *>(MessageQueue::lookup(CIMOM_Q_ID));
257
258 Time::gettimeofday(&_last_module_change);
259 _default_op_timeout.tv_sec = 30;
260 _default_op_timeout.tv_usec = 100;
261 ThreadStatus tr = PEGASUS_THREAD_OK;
262 while ((tr = _routing_thread.run()) != PEGASUS_THREAD_OK)
263 {
264 if (tr == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)
265 Threads::yield();
266 else
267 throw Exception(
268 kumpf 1.68 MessageLoaderParms("Common.Cimom.NOT_ENOUGH_THREADS",
269 "Cannot allocate thread for Cimom class"));
270 }
|
271 mday 1.1 }
272
273
|
274 david.dillard 1.54 cimom::~cimom()
|
275 mday 1.1 {
|
276 mday 1.2 // send STOP messages to all modules
277 // shutdown legacy queues; e.g., cim operation dispatcher etc.
|
278 kumpf 1.68 _die++;
279 if (_routed_queue_shutdown.get() == 0)
280 _routed_ops.close();
281 _routing_thread.join();
|
282 kumpf 1.45
|
283 kumpf 1.68 _modules.clear();
|
284 mday 1.1 }
|
285 mday 1.12
|
286 mday 1.16 void cimom::_make_response(Message *req, Uint32 code)
|
287 mday 1.12 {
|
288 kumpf 1.68 if (!(req->getMask() & MessageMask::ha_async))
289 {
290 // legacy message, just delete
291 delete req;
292 return;
293 }
294
295 if ((static_cast<AsyncRequest *>(req))->op->_flags &
296 ASYNC_OPFLAGS_FIRE_AND_FORGET)
297 {
298 // destructor empties request list
299 delete (static_cast<AsyncRequest *>(req))->op;
300 return;
301 }
302
303 AutoPtr<AsyncReply> reply;
304 if (!((static_cast<AsyncRequest *>(req))->op->_flags &
305 ASYNC_OPFLAGS_SIMPLE_STATUS))
306 {
307 reply.reset(new AsyncReply(
308 async_messages::REPLY,
309 kumpf 1.68 0,
310 (static_cast<AsyncRequest *>(req))->op,
311 code,
312 (static_cast<AsyncRequest *>(req))->resp,
313 false));
314 }
315 else
316 (static_cast<AsyncRequest *>(req))->op->_completion_code = code;
317 // sender does not want a reply message, just the
318 // _completion_code field in the AsyncOpNode.
319
320 _completeAsyncResponse(static_cast<AsyncRequest*>(req),
321 reply.get(), ASYNC_OPSTATE_COMPLETE, 0);
322 reply.release();
323 }
324
325 void cimom::_completeAsyncResponse(
326 AsyncRequest *request,
327 AsyncReply *reply,
328 Uint32 state,
329 Uint32 flag)
330 kumpf 1.68 {
331 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "cimom::_completeAsyncResponse");
332
333 PEGASUS_ASSERT(request != 0);
334
335 Boolean haveLock = false;
336
337 AsyncOpNode *op = request->op;
338 op->lock();
339 haveLock = true;
340
341 if ((op->_flags & ASYNC_OPFLAGS_CALLBACK ||
342 op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&
343 (!(op->_flags & ASYNC_OPFLAGS_PSEUDO_CALLBACK)))
344 {
345 op->unlock();
346 haveLock = false;
347 if (reply != 0)
348 {
349 op->_response.reset(reply);
350 }
351 kumpf 1.68 _complete_op_node(op, state, flag, (reply ? reply->result : 0 ));
352 return;
353 }
354
355 if (op->_flags & ASYNC_OPFLAGS_FIRE_AND_FORGET)
356 {
357 // destructor empties request list
358 op->unlock();
359 haveLock = false;
360
361 op->release();
362 _global_this->cache_op(op);
363
364 PEG_METHOD_EXIT();
365 return;
366 }
367
368 op->_state |= (state | ASYNC_OPSTATE_COMPLETE);
369 op->_flags |= flag;
370 if ( op->_flags & ASYNC_OPFLAGS_SIMPLE_STATUS )
371 {
372 kumpf 1.68 PEGASUS_ASSERT(reply != 0 );
373
374 op->_completion_code = reply->result;
375 PEG_METHOD_EXIT();
376 delete reply;
377 }
378 else
379 {
380 if (reply != 0)
381 {
382 op->_response.reset(reply);
383 }
384 }
385
386 if (haveLock)
387 {
388 op->unlock();
389 haveLock = false;
390 }
391 op->_client_sem.signal();
392 PEG_METHOD_EXIT();
|
393 mday 1.1 }
394
|
395 mday 1.18 cimom *cimom::_global_this;
396
397
398 void cimom::_default_callback(AsyncOpNode *op, MessageQueue *q, void *ptr)
399 {
|
400 kumpf 1.68 PEGASUS_ASSERT(op != 0 && q != 0);
401 return;
|
402 mday 1.18 }
403
404
|
405 kumpf 1.68 void cimom::_complete_op_node(
406 AsyncOpNode *op,
407 Uint32 state,
408 Uint32 flag,
409 Uint32 code)
410 {
411 Uint32 flags;
412
413 op->lock();
414
415 op->_completion_code = code;
416 op->_state |= (state | ASYNC_OPSTATE_COMPLETE);
417 flags = (op->_flags |= flag);
418 op->unlock();
419 if ( flags & ASYNC_OPFLAGS_FIRE_AND_FORGET )
420 {
421 delete op;
422 return;
423 }
424
425 if ((flags & ASYNC_OPFLAGS_CALLBACK) &&
426 kumpf 1.68 (!(flags & ASYNC_OPFLAGS_PSEUDO_CALLBACK)))
427 {
428 // << Wed Oct 8 12:29:32 2003 mdd >>
429 // check to see if the response queue is stopped or paused
430 if (op->_callback_response_q == 0 ||
431 op->_callback_response_q->get_capabilities() &
432 module_capabilities::paused ||
433 op->_callback_response_q->get_capabilities() &
434 module_capabilities::stopped)
435 {
436 // delete, respondent is paused or stopped
437 delete op;
438 return;
439 }
440
441 // send this node to the response queue
442 op->_op_dest = op->_callback_response_q;
443 _global_this->route_async(op);
444 return;
445 }
446
447 kumpf 1.68 if ((flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&
448 (!(flags & ASYNC_OPFLAGS_PSEUDO_CALLBACK)))
449 {
450 op->_op_dest = op->_callback_response_q;
451 _global_this->route_async(op);
452 return;
453 }
454 op->_client_sem.signal();
455 return;
|
456 mday 1.18 }
457
|
458 mday 1.1
|
459 david.dillard 1.54 void cimom::handleEnqueue()
|
460 mday 1.1 {
|
461 kumpf 1.68 Message* msg = dequeue();
|
462 mday 1.1
|
463 kumpf 1.68 if (!msg)
464 return;
|
465 chip 1.6
|
466 kumpf 1.68 return;
|
467 mday 1.1 }
468
|
469 chip 1.6
|
470 kumpf 1.68 void cimom::_handle_cimom_op(
471 AsyncOpNode *op,
472 Thread *thread,
473 MessageQueue *queue)
474 {
475 if (op == 0)
476 return;
477
478 Message* msg = op->getRequest();
479
480 if (msg == 0)
481 return;
482
483 Uint32 mask = msg->getMask();
484 if (!(mask & MessageMask::ha_async))
485 {
486 _make_response(msg, async_results::CIM_NAK);
|
487 kumpf 1.71 return;
|
488 kumpf 1.68 }
|
489 kumpf 1.71
|
490 kumpf 1.68 op->_thread_ptr = thread;
491 op->_service_ptr = queue;
492
493 if (mask & MessageMask::ha_request)
494 {
495 op->processing();
496
|
497 kumpf 1.71 Uint32 type = msg->getType();
|
498 kumpf 1.68 if (type == async_messages::REGISTER_CIM_SERVICE )
499 register_module(static_cast<RegisterCimService *>(msg));
500 else if (type == async_messages::UPDATE_CIM_SERVICE)
501 update_module(static_cast<UpdateCimService *>(msg ));
502 else if (type == async_messages::IOCTL)
503 ioctl(static_cast<AsyncIoctl *>(msg));
504 else if (type == async_messages::FIND_SERVICE_Q)
505 find_service_q(static_cast<FindServiceQueue *>(msg));
506 else if (type == async_messages::ENUMERATE_SERVICE)
507 enumerate_service(static_cast<EnumerateService *>(msg));
508 else if (type == async_messages::FIND_MODULE_IN_SERVICE)
509 _find_module_in_service(static_cast<FindModuleInService *>(msg));
510 else if (type == async_messages::REGISTERED_MODULE)
511 _registered_module_in_service(static_cast<RegisteredModule *>(msg));
512 else if (type == async_messages::DEREGISTERED_MODULE)
513 _deregistered_module_in_service(
514 static_cast<DeRegisteredModule *>(msg));
|
515 kumpf 1.71 else
516 _make_response(msg, async_results::CIM_NAK);
|
517 kumpf 1.68 }
|
518 kumpf 1.71 else
|
519 kumpf 1.68 {
520 _make_response(msg, async_results::CIM_NAK);
521 }
|
522 mday 1.1 }
523
524
525 void cimom::register_module(RegisterCimService *msg)
526 {
|
527 kumpf 1.68 // first see if the module is already registered
528 Uint32 result = async_results::OK;
|
529 mday 1.1
|
530 kumpf 1.68 if (0 != get_module_q(msg->name))
531 result = async_results::MODULE_ALREADY_REGISTERED;
532 else
533 {
534 AutoPtr<message_module> new_mod(new message_module(
535 msg->name,
536 msg->capabilities,
537 msg->mask,
538 msg->queue));
539
540 if (new_mod.get() == 0)
541 result = async_results::INTERNAL_ERROR;
542 else
543 {
544 try
545 {
546 _modules.insert_front(new_mod.get());
547 }
548 catch (IPCException&)
549 {
550 result = async_results::INTERNAL_ERROR;
551 kumpf 1.68 new_mod.reset();
552 }
553 }
554 new_mod.release();
555 }
556
557 AutoPtr<AsyncReply> reply(new AsyncReply(
558 async_messages::REPLY,
559 0,
560 msg->op,
561 result,
562 msg->resp,
563 msg->block));
564
565 _completeAsyncResponse(
566 static_cast<AsyncRequest *>(msg),
567 reply.get(),
568 ASYNC_OPSTATE_COMPLETE,
569 0);
570 reply.release();
571 return;
|
572 mday 1.1 }
573
574
|
575 mday 1.3 void cimom::deregister_module(Uint32 quid)
576 {
|
577 kumpf 1.68 _modules.lock();
|
578 mday 1.3
|
579 kumpf 1.68 message_module *temp = _modules.front();
580 while (temp != 0)
581 {
582 if (temp->_q_id == quid)
583 {
584 _modules.remove(temp);
585 break;
586 }
587 temp = _modules.next_of(temp);
588 }
589 _modules.unlock();
|
590 mday 1.1 }
591
592
|
593 kumpf 1.68 void cimom::update_module(UpdateCimService* msg)
594 {
595 Uint32 result = async_results::MODULE_NOT_FOUND;
596
597 _modules.lock();
598 message_module *temp = _modules.front();
599 while (temp != 0)
600 {
|
601 kumpf 1.69 if (temp->_q_id == msg->queue )
|
602 kumpf 1.68 {
603 temp->_capabilities = msg->capabilities;
604 temp->_mask = msg->mask;
605 Time::gettimeofday(&(temp->_heartbeat));
606 result = async_results::OK;
607 break;
608 }
609 temp = _modules.next_of(temp);
610 }
611 _modules.unlock();
612
613 AutoPtr<AsyncReply> reply(new AsyncReply(
614 async_messages::REPLY,
615 0,
616 msg->op,
617 result,
618 msg->resp,
619 msg->block));
620 _completeAsyncResponse(
621 static_cast<AsyncRequest *>(msg),
622 reply.get(),
623 kumpf 1.68 ASYNC_OPSTATE_COMPLETE,
624 0);
|
625 joyce.j 1.52 reply.release();
|
626 kumpf 1.68 return;
|
627 mday 1.1 }
628
629
|
630 kumpf 1.68 void cimom::ioctl(AsyncIoctl* msg)
|
631 mday 1.1 {
|
632 kumpf 1.68 switch(msg->ctl)
633 {
634 case AsyncIoctl::IO_CLOSE:
635 {
636 // save my bearings
637 Thread *myself = msg->op->_thread_ptr;
638 cimom *service = static_cast<cimom *>(msg->op->_service_ptr);
639
640 // respond to this message.
641 AutoPtr<AsyncReply> reply(new AsyncReply( async_messages::REPLY,
642 0,
643 msg->op,
644 async_results::OK,
645 msg->resp,
646 msg->block));
647 _completeAsyncResponse(static_cast<AsyncRequest *>(msg),
648 reply.get(),
649 ASYNC_OPSTATE_COMPLETE,
650 0);
651 reply.release();
652 // ensure we do not accept any further messages
653 kumpf 1.68
654 // ensure we don't recurse on IO_CLOSE
655 if (_routed_queue_shutdown.get() > 0)
656 break;
657
658 // set the closing flag
659 service->_routed_queue_shutdown = 1;
660
661 // empty out the queue
662 while (1)
663 {
664 AsyncOpNode *operation;
665 try
666 {
667 operation = service->_routed_ops.dequeue();
668 }
669 catch (IPCException&)
670 {
671 break;
672 }
673 if (operation)
674 kumpf 1.68 {
675 service->_handle_cimom_op(operation, myself, service);
676 }
677 else
678 break;
679 } // message processing loop
680
681 // shutdown the AsyncQueue
682 service->_routed_ops.close();
683 // exit the thread !
684 _die++;
685 return;
686 }
687
688 default:
689 {
690 Uint32 result = _ioctl(msg->ctl, msg->intp, msg->voidp);
691 AutoPtr<AsyncReply> reply(new AsyncReply(
692 async_messages::REPLY,
693 0,
694 msg->op,
695 kumpf 1.68 result,
696 msg->resp,
697 msg->block));
698 _completeAsyncResponse(static_cast<AsyncRequest *>(msg),
699 reply.get(),
700 ASYNC_OPSTATE_COMPLETE,
701 0);
702 reply.release();
703 }
704 }
|
705 mday 1.1 }
706
707
708 Uint32 cimom::_ioctl(Uint32 code, Uint32 int_param, void *pointer_param)
709 {
|
710 kumpf 1.68 return async_results::OK;
|
711 mday 1.1 }
712
713 // fill an array with queue IDs of as many registered services
714 // as match the request message parameters
|
715 kumpf 1.68 void cimom::find_service_q(FindServiceQueue* msg)
|
716 mday 1.1 {
|
717 kumpf 1.68 Array<Uint32> found;
|
718 mday 1.1
|
719 kumpf 1.68 _modules.lock();
720 message_module *ret = _modules.front();
721 while (ret != 0)
722 {
723 if (msg->name.size() > 0)
724 {
725 if (msg->name != ret->_name)
726 {
727 ret = _modules.next_of(ret);
728 continue;
729 }
730 }
731
732 if (msg->capabilities != 0)
733 {
734 if (! msg->capabilities & ret->_capabilities)
735 {
736 ret = _modules.next_of(ret);
737 continue;
738 }
739 }
740 kumpf 1.68 if (msg->mask != 0)
741 {
742 if (! msg->mask & ret->_mask)
743 {
744 ret = _modules.next_of(ret);
745 continue;
746 }
747 }
748
749 // if we get to here, we "found" this service
750
751 found.append(ret->_q_id);
752 ret = _modules.next_of(ret);
753 }
754 _modules.unlock();
755
756 AutoPtr<FindServiceQueueResult> reply(new FindServiceQueueResult(
757 msg->op,
758 async_results::OK,
759 msg->resp,
760 msg->block,
761 kumpf 1.68 found));
762
763 _completeAsyncResponse(
764 static_cast<AsyncRequest *>(msg),
765 reply.get(),
766 ASYNC_OPSTATE_COMPLETE,
767 0);
|
768 joyce.j 1.52 reply.release();
|
769 kumpf 1.68 return;
|
770 mday 1.1 }
771
772
|
773 chip 1.6 // given a service Queue ID, return all registation data for
774 // that service
|
775 kumpf 1.68 void cimom::enumerate_service(EnumerateService* msg)
|
776 mday 1.1 {
|
777 kumpf 1.68 AutoPtr<EnumerateServiceResponse> reply;
778 _modules.lock();
779 message_module *ret = _modules.front();
780
781 while (ret != 0)
782 {
783 if (ret->_q_id == msg->qid)
784 {
785 reply.reset(new EnumerateServiceResponse(
786 msg->op,
787 async_results::OK,
788 msg->resp,
789 msg->block,
790 ret->_name,
791 ret->_capabilities,
792 ret->_mask,
793 ret->_q_id));
794 break;
795 }
796 ret = _modules.next_of(ret);
797 }
798 kumpf 1.68 _modules.unlock();
799
800 if (reply.get() == 0)
801 {
802 reply.reset(new EnumerateServiceResponse(
803 msg->op,
804 async_results::MODULE_NOT_FOUND,
805 msg->resp,
806 msg->block,
807 String(),
808 0, 0, 0));
809 }
810
811 _completeAsyncResponse(
812 static_cast<AsyncRequest *>(msg),
813 reply.get(),
814 ASYNC_OPSTATE_COMPLETE,
815 0);
|
816 joyce.j 1.52 reply.release();
|
817 mday 1.5
|
818 kumpf 1.68 return;
|
819 mday 1.1 }
820
|
821 kumpf 1.68 Uint32 cimom::get_module_q(const String& name)
|
822 mday 1.1 {
|
823 kumpf 1.68 _modules.lock();
824 message_module *ret = _modules.front();
825 while (ret != 0)
826 {
827 if (ret->_name == name)
828 break;
829 ret = _modules.next_of(ret);
830 }
831
832 _modules.unlock();
833 if (ret != 0)
834 return ret->_q_id;
835 else
836 return 0;
|
837 mday 1.1 }
838
839
840
841 // returns true if the list of registered modules changes since the parameter
842 Boolean cimom::moduleChange(struct timeval last)
843 {
|
844 kumpf 1.68 if (last.tv_sec >= _last_module_change.tv_sec)
845 if (last.tv_usec >= _last_module_change.tv_usec)
846 return false;
847 return true;
|
848 mday 1.1 }
849
850
|
851 david.dillard 1.54 Uint32 cimom::getModuleCount()
|
852 mday 1.1 {
|
853 kumpf 1.72 return _modules.size();
|
854 mday 1.1 }
855
|
856 kumpf 1.68 Uint32 cimom::getModuleIDs(Uint32* ids, Uint32 count)
|
857 mday 1.1 {
|
858 kumpf 1.68 if (ids == 0)
859 return 0;
|
860 mday 1.1
|
861 kumpf 1.68 message_module *temp = 0;
862 _modules.lock();
863 temp = _modules.front();
864 while (temp != 0 && count > 0)
865 {
866 *ids = temp->_q_id;
867 ids++;
868 count--;
869 temp = _modules.next_of(temp);
870 }
871 _modules.unlock();
872
873 while (count > 0)
874 {
875 *ids = 0;
876 ids++;
877 count--;
878 }
879
|
880 kumpf 1.72 return _modules.size();
|
881 kumpf 1.68 }
882
883 AsyncOpNode* cimom::get_cached_op()
884 {
885 AutoPtr<AsyncOpNode> op(new AsyncOpNode());
886
887 op->_state = ASYNC_OPSTATE_UNKNOWN;
888 op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL |
889 ASYNC_OPFLAGS_META_DISPATCHER;
890
891 return op.release();
892 }
893
894 void cimom::cache_op(AsyncOpNode* op)
895 {
896 PEGASUS_ASSERT(op->_state & ASYNC_OPSTATE_RELEASED);
897 delete op;
898 }
899
900 void cimom::set_default_op_timeout(const struct timeval* buffer)
901 {
902 kumpf 1.68 if (buffer != 0)
903 {
904 _default_op_timeout.tv_sec = buffer->tv_sec;
905 _default_op_timeout.tv_usec = buffer->tv_usec;
906 }
907 }
908
909 void cimom::get_default_op_timeout(struct timeval* timeout) const
910 {
911 if (timeout != 0)
912 {
913 timeout->tv_sec = _default_op_timeout.tv_sec;
914 timeout->tv_usec = _default_op_timeout.tv_usec;
915 }
916 }
917
918 void cimom::_registered_module_in_service(RegisteredModule* msg)
919 {
920 Uint32 result = async_results::MODULE_NOT_FOUND;
921
922 _modules.lock();
923 kumpf 1.68 message_module *ret = _modules.front();
924 while (ret != 0)
925 {
926 if (ret->_q_id == msg->resp)
927 {
928 // see if the module is already registered
929 Uint32 i = 0;
930 for (; i < ret->_modules.size(); i++)
931 {
932 if (ret->_modules[i] == msg->_module)
933 {
934 result = async_results::MODULE_ALREADY_REGISTERED;
935 break;
936 }
937 }
938 if (result != async_results::MODULE_ALREADY_REGISTERED)
939 {
940 ret->_modules.append(msg->_module);
941 result = async_results::OK;
942 }
943 break;
944 kumpf 1.68 }
945 ret = _modules.next_of(ret);
946 }
947 _modules.unlock();
948 _make_response(msg, result);
949 }
950
951 void cimom::_deregistered_module_in_service(DeRegisteredModule* msg)
952 {
953 Uint32 result = async_results::MODULE_NOT_FOUND;
954
955 _modules.lock();
956 message_module *ret = _modules.front();
957 while (ret != 0)
958 {
959 if (ret->_q_id == msg->resp)
960 {
961 Uint32 i = 0;
962 for (; i < ret->_modules.size(); i++)
963 {
964 if (ret->_modules[i] == msg->_module)
965 kumpf 1.68 {
966 ret->_modules.remove(i);
967 result = async_results::OK;
968 break;
969 }
970 }
971 }
972 ret = _modules.next_of(ret);
973 }
974 _modules.unlock();
975 _make_response(msg, result);
976 }
977
978 void cimom::_find_module_in_service(FindModuleInService* msg)
979 {
980 Uint32 result = async_results::MODULE_NOT_FOUND;
981 Uint32 q_id = 0;
982
983 _modules.lock();
984 message_module *ret = _modules.front();
985 while (ret != 0)
986 kumpf 1.68 {
987 if (ret->get_capabilities() & module_capabilities::module_controller)
988 {
989 // see if the module is in this service
990 Uint32 i = 0;
991 for (; i < ret->_modules.size(); i++)
992 {
993 if (ret->_modules[i] == msg->_module)
994 {
995 result = async_results::OK;
996 q_id = ret->_q_id;
997 break;
998 }
999 }
1000 }
1001 ret = _modules.next_of(ret);
1002 }
1003 _modules.unlock();
1004
1005 FindModuleInServiceResponse *response = new FindModuleInServiceResponse(
1006 msg->op,
1007 kumpf 1.68 result,
1008 msg->resp,
1009 msg->block,
1010 q_id);
1011
1012 _complete_op_node(
1013 msg->op,
1014 ASYNC_OPSTATE_COMPLETE,
1015 0,
1016 result);
|
1017 mday 1.27 }
1018
|
1019 mday 1.1 PEGASUS_NAMESPACE_END
|