gpt4 book ai didi

java - 如何使用RMQ和Spring Cloud Stream创建基于分区的消费者

转载 作者:行者123 更新时间:2023-12-02 13:19:02 25 4
gpt4 key购买 nike

如果我有生产者创建的 3 个分区,并且如果我在 CF 中部署 3 个实例,则每个实例都会选择一个队列并使用文档中的索引处理消息,我就能够使用云流和 Rabbit mq 开发示例消费者。

现在的问题是,如果我有10个分区,似乎我需要10个实例,那就浪费资源了,我们可以让一个消费者监听多个分区吗?我之所以拥有基于分区的生产者,是因为对我来说,处理消息的顺序很重要。

最佳答案

这是一种方法...

@SpringBootApplication
@EnableBinding(TwoInputs.class)
public class So43661064Application {

public static void main(String[] args) {
SpringApplication.run(So43661064Application.class, args);
}

@StreamListener("input1")
public void foo1(String in) {
doFoo(in);
}

@StreamListener("input2")
public void foo2(String in) {
doFoo(in);
}

protected void doFoo(String in) {
System.out.println(in);
}

public interface TwoInputs {

@Input("input1")
SubscribableChannel input1();

@Input("input2")
SubscribableChannel input2();

}

}

spring.cloud.stream.bindings.input1.group=bar-0
spring.cloud.stream.bindings.input1.destination=foo
spring.cloud.stream.rabbit.bindings.input1.consumer.bindingRoutingKey=foo-0

spring.cloud.stream.bindings.input2.group=bar-1
spring.cloud.stream.bindings.input2.destination=foo
spring.cloud.stream.rabbit.bindings.input2.consumer.bindingRoutingKey=foo-1

这将从 answer to your other question 中生产者创建的 2 个分区中进行消耗.

目前还没有办法让 @StreamListener 直接监听 2 个分区。

编辑

这是另一种方法,使用 exchange->exchange 绑定(bind)...

制作人

@SpringBootApplication
@EnableBinding(Source.class)
public class So43614477Application implements CommandLineRunner {

public static void main(String[] args) {
SpringApplication.run(So43614477Application.class, args).close();
}

@Autowired
private MessageChannel output;

@Autowired
private AmqpAdmin admin;

@Value("${spring.cloud.stream.bindings.output.producer.partition-count}")
private int partitionCount;

@Value("${spring.cloud.stream.bindings.output.destination}")
private String destination;

@Override
public void run(String... args) throws Exception {
for (int i = 0; i < this.partitionCount; i++) {
String partition = this.destination + "-" + i;
TopicExchange exchange = new TopicExchange(partition);
this.admin.declareExchange(exchange);
Binding binding = BindingBuilder.bind(exchange).to(new TopicExchange(this.destination))
.with(partition);
this.admin.declareBinding(binding);
}

output.send(MessageBuilder.withPayload("fiz").setHeader("whichPart", 0).build());
output.send(MessageBuilder.withPayload("buz").setHeader("whichPart", 1).build());
}

}

spring.cloud.stream.bindings.output.destination=foo
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['whichPart']
spring.cloud.stream.bindings.output.producer.partition-count=2

消费者

@SpringBootApplication
@EnableBinding(Sink.class)
public class So43661064Application {

public static void main(String[] args) {
SpringApplication.run(So43661064Application.class, args);
}

@StreamListener(Sink.INPUT)
public void foo1(String in) {
System.out.println(in);
}

}

spring.cloud.stream.bindings.input.group=bar
spring.cloud.stream.bindings.input.destination=foo-0,foo-1

来自主交换器的分区被路由到分区交换器,并且消费者获得交换器列表以将其队列绑定(bind)到。

您可以在命令行上传递该列表。

关于java - 如何使用RMQ和Spring Cloud Stream创建基于分区的消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43661064/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com