gpt4 book ai didi

rabbitmq - Spring & RabbitMQ - 在运行时注册队列

转载 作者:行者123 更新时间:2023-12-02 03:24:12 26 4
gpt4 key购买 nike

如何创建绑定(bind)到 Fanout 交换器的新队列并在运行时运行它?到目前为止我有这个:

Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 600000L);

GenericBeanDefinition runtimeQueueBean = new GenericBeanDefinition();
runtimeQueueBean.setBeanClass(Queue.class);
runtimeQueueBean.setLazyInit(false);
runtimeQueueBean.setAbstract(false);
runtimeQueueBean.setAutowireCandidate(true);
ConstructorArgumentValues queueConstrArgs = new ConstructorArgumentValues();
queueConstrArgs.addIndexedArgumentValue(0, queueName);
queueConstrArgs.addIndexedArgumentValue(1, true);
queueConstrArgs.addIndexedArgumentValue(2, false);
queueConstrArgs.addIndexedArgumentValue(3, false);
queueConstrArgs.addIndexedArgumentValue(4, arguments);
runtimeQueueBean.setConstructorArgumentValues(queueConstrArgs);
this.context.registerBeanDefinition("nejm", runtimeQueueBean);


GenericBeanDefinition runtimeFanoutExchange = new GenericBeanDefinition();
runtimeFanoutExchange.setBeanClass(FanoutExchange.class);
runtimeFanoutExchange.setLazyInit(false);
runtimeFanoutExchange.setAbstract(false);
runtimeFanoutExchange.setAutowireCandidate(true);
ConstructorArgumentValues constructorArgumentValues = new ConstructorArgumentValues();
constructorArgumentValues.addIndexedArgumentValue(0, "staticCache");
runtimeFanoutExchange.setConstructorArgumentValues(constructorArgumentValues);
this.context.registerBeanDefinition("staticCache", runtimeFanoutExchange);


GenericBeanDefinition runtimeBinding = new GenericBeanDefinition();
runtimeBinding.setBeanClass(Binding.class);
runtimeBinding.setLazyInit(false);
runtimeBinding.setAbstract(false);
runtimeBinding.setAutowireCandidate(true);
constructorArgumentValues = new ConstructorArgumentValues();
constructorArgumentValues.addIndexedArgumentValue(0, queueName);
constructorArgumentValues.addIndexedArgumentValue(1, Binding.DestinationType.QUEUE);
constructorArgumentValues.addIndexedArgumentValue(2, "staticCache");
constructorArgumentValues.addIndexedArgumentValue(3, "");
runtimeBinding.setConstructorArgumentValues(constructorArgumentValues);
this.context.registerBeanDefinition("bajnding", runtimeBinding);


GenericBeanDefinition runtimeMessageListenerAdapter = new GenericBeanDefinition();
runtimeMessageListenerAdapter.setBeanClass(MessageListenerAdapter.class);
runtimeMessageListenerAdapter.setLazyInit(false);
runtimeMessageListenerAdapter.setAbstract(false);
runtimeMessageListenerAdapter.setAutowireCandidate(true);
constructorArgumentValues = new ConstructorArgumentValues();
constructorArgumentValues.addIndexedArgumentValue(0, this);
constructorArgumentValues.addIndexedArgumentValue(1, new RuntimeBeanReference("jackson2JsonMessageConverter"));
runtimeMessageListenerAdapter.setConstructorArgumentValues(constructorArgumentValues);
this.context.registerBeanDefinition("mla2", runtimeMessageListenerAdapter);



GenericBeanDefinition runtimeContainerExchange = new GenericBeanDefinition();
runtimeContainerExchange.setBeanClass(SimpleMessageListenerContainer.class);
runtimeContainerExchange.setLazyInit(false);
runtimeContainerExchange.setAbstract(false);
runtimeContainerExchange.setAutowireCandidate(true);
MutablePropertyValues propertyValues = new MutablePropertyValues();
propertyValues.addPropertyValue("connectionFactory", new RuntimeBeanReference("connectionFactory"));
propertyValues.addPropertyValue("queues", new RuntimeBeanReference("nejm"));
propertyValues.addPropertyValue("messageListener", new RuntimeBeanReference("mla2"));
runtimeContainerExchange.setPropertyValues(propertyValues);
this.context.registerBeanDefinition("defqueue", runtimeContainerExchange);

问题是队列/交换不是在运行时创建的,我必须手动启动监听器(除非我调用 this.context.start() - 但我不知道这是否是正确的方法)。

我的问题 - 是否有某种方法可以在运行时神奇地启动所有生成的 bean(类似于 this.context.refresh() - 这存在但不起作用或类似)?

更新:

这就是我目前的做法(这种方法有效,但不知道是否正确)

    Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 600000L);
Queue queue = new Queue(queueName, true, false, false, arguments);

FanoutExchange exchange = new FanoutExchange("staticCache");

Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, "staticCache", "", null);

rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareExchange(exchange);
rabbitAdmin.declareBinding(binding);

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(this.connectionFactory);
container.setQueues(queue);
container.setMessageListener(new MessageListenerAdapter(this, this.converter));

container.start();

最佳答案

你不能那样做。 BeanDefinitionthis.context.registerBeanDefinition 用于应用程序上下文生命周期的解析阶段。

如果您的应用程序已经存在,应用程序上下文将不会接受任何 BeanDefinition

是的,您可以在运行时手动声明 Queue 及其 Binding 到交换器。您甚至可以手动创建 SimpleMessageListenerContainer 并使其正常工作。

什么对你有好处,你只需要手动使用他们的类来实例化。只需要提供容器环境(例如将 this.applicationContext 注入(inject)到 listenerContainer 对象)。

对于 Broker 的声明,您必须使用 applicationContext 中的 RabbitAdmin bean。

从另一方面看,没有理由手动启动一个新的listenerContainer。现有的可以在运行时随新的 Queue 一起提供。

关于rabbitmq - Spring & RabbitMQ - 在运行时注册队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31312768/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com