gpt4 book ai didi

Spring Cloud Stream RabbitMQ

转载 作者:行者123 更新时间:2023-12-02 21:06:30 32 4
gpt4 key购买 nike

我试图理解为什么我想将 Spring 云流与 RabbitMQ 一起使用。我看过 RabbitMQ Spring 教程 4 (https://www.rabbitmq.com/tutorials/tutorial-four-spring-amqp.html),这基本上就是我想做的。它创建一个附加了 2 个队列的直接交换,并根据路由键将消息路由到 Q1 或 Q2。

如果您查看教程,则整个过程非常简单,创建所有部件,将它们绑定(bind)在一起,然后就可以开始了。

我想知道使用 Sing Cloud Stream 会带来什么好处,以及这是否是它的用例。创建简单的交换很容易,甚至可以通过流直接定义目的地和组。所以我想为什么不进一步尝试用流处理教程案例。

我已经看到 Stream 有一个 BinderAwareChannelResolver ,它似乎做了同样的事情。但我正在努力将它们放在一起以实现与 RabbitMQ Spring 教程中相同的效果。我不确定这是否是一个依赖性问题,但我似乎从根本上误解了一些东西,我想是这样的:

spring.cloud.stream.bindings.output.destination=myDestination
spring.cloud.stream.bindings.output.group=consumerGroup
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression='key'

应该有窍门。

有没有人有一个源和接收器的最小示例,它基本上创建一个直接交换,将 2 个队列绑定(bind)到它,并根据将关键路由路由到这两个队列之一,如https://www.rabbitmq.com/tutorials/tutorial-four-spring-amqp.html

编辑:

下面是一组最小的代码,演示了如何执行我所要求的操作。我没有附加 build.gradle 因为它很简单(但如果有人感兴趣,请告诉我)

application.properties:设置生产者

spring.cloud.stream.bindings.output.destination=tut.direct
spring.cloud.stream.rabbit.bindings.output.producer.exchangeType=direct
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers.type

Sources.class:设置生产者 channel

public interface Sources {

String OUTPUT = "output";

@Output(Sources.OUTPUT)
MessageChannel output();
}

StatusController.class:响应休息调用并使用特定路由键发送消息

/**
* Status endpoint for the health-check service.
*/
@RestController
@EnableBinding(Sources.class)
public class StatusController {

private int index;

private int count;

private final String[] keys = {"orange", "black", "green"};

private Sources sources;

private StatusService status;

@Autowired
public StatusController(Sources sources, StatusService status) {
this.sources = sources;
this.status = status;
}

/**
* Service available, service returns "OK"'.
* @return The Status of the service.
*/
@RequestMapping("/status")
public String status() {
String status = this.status.getStatus();

StringBuilder builder = new StringBuilder("Hello to ");
if (++this.index == 3) {
this.index = 0;
}
String key = keys[this.index];
builder.append(key).append(' ');
builder.append(Integer.toString(++this.count));
String payload = builder.toString();
log.info(payload);

// add kv pair - routingkeyexpression (which matches 'type') will then evaluate
// and add the value as routing key
Message<String> msg = new GenericMessage<>(payload, Collections.singletonMap("type", key));
sources.output().send(msg);

// return rest call
return status;
}
}

消费者方面的事物,属性:

spring.cloud.stream.bindings.input.destination=tut.direct
spring.cloud.stream.rabbit.bindings.input.consumer.exchangeType=direct
spring.cloud.stream.rabbit.bindings.input.consumer.bindingRoutingKey=orange
spring.cloud.stream.bindings.inputer.destination=tut.direct
spring.cloud.stream.rabbit.bindings.inputer.consumer.exchangeType=direct
spring.cloud.stream.rabbit.bindings.inputer.consumer.bindingRoutingKey=black

Sinks.class:

public interface Sinks {

String INPUT = "input";

@Input(Sinks.INPUT)
SubscribableChannel input();

String INPUTER = "inputer";

@Input(Sinks.INPUTER)
SubscribableChannel inputer();
}

ReceiveStatus.class:接收状态:

@EnableBinding(Sinks.class)
public class ReceiveStatus {
@StreamListener(Sinks.INPUT)
public void receiveStatusOrange(String msg) {
log.info("I received a message. It was orange number: {}", msg);
}

@StreamListener(Sinks.INPUTER)
public void receiveStatusBlack(String msg) {
log.info("I received a message. It was black number: {}", msg);
}
}

最佳答案

Spring Cloud Stream 允许您使用 Spring Cloud Stream Binder 实现(Kafka、RabbitMQ、JMS)连接(通过 @EnableBinding)到外部消息系统,从而开发事件驱动的微服务应用程序粘合剂等)。显然,Spring Cloud Stream 使用 Spring AMQP 来实现 RabbitMQ 绑定(bind)器。

BinderAwareChannelResolver 适用于对生产者的动态绑定(bind)支持,我认为在您的情况下,它是关于配置交换并将消费者绑定(bind)到该交换。

例如,您需要有 2 个具有根据您的条件设置的适当 bindingRoutingKey 的消费者,以及一个具有您上面提到的属性(路由键表达式、目的地)的生产者(除了团体)。我注意到您已经为出站 channel 配置了groupgroup 属性仅适用于消费者(因此是入站)。

您可能还想检查这个:https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit/issues/57正如我看到一些关于使用routing-key-expression的讨论。具体请查看this一是使用表达式值。

关于Spring Cloud Stream RabbitMQ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45707046/

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com