gpt4 book ai didi

apache-kafka - 如何通过自动确认按主题和分区同时处理 Reactor Kafka 流?

转载 作者:行者123 更新时间:2023-12-05 05:20:29 26 4
gpt4 key购买 nike

我正在尝试使用具有自动确认功能的 Reactor Kafka 实现 Kafka 主题分区的并发处理。此处的文档使这看起来是可能的:

http://projectreactor.io/docs/kafka/milestone/reference/#concurrent-ordered

这与我正在尝试的唯一区别是我使用的是自动确认。

我有以下代码(相关方法是receiveAuto):

public class KafkaFluxFactory<K, V> {

private final Map<String, Object> properties;

public KafkaFluxFactory(Map<String, Object> properties) {
this.properties = properties;
}

public Flux<ConsumerRecord<K, V>> receiveAuto(Collection<String> topics, Scheduler scheduler) {
return KafkaReceiver.create(ReceiverOptions.create(properties).subscription(topics))
.receiveAutoAck()
.flatMap(flux -> flux.groupBy(this::extractTopicPartition))
.flatMap(topicPartitionFlux -> topicPartitionFlux.publishOn(scheduler));
}

private TopicPartition extractTopicPartition(ConsumerRecord<K, V> record) {
return new TopicPartition(record.topic(), record.partition());
}
}

当我使用它通过并行调度程序 (Schedulers.newParallel("debug", 10)) 从 Kafka 创建消费者记录通量时,我看到它们最终都在同一个线程。

关于我可能做错了什么的想法?

最佳答案

经过相当多的反复试验以及对我想要完成的事情的一些重新思考,我意识到我试图在一段代码中解决两个问题。

我需要的两件事是:

  1. Kafka 分区的顺序处理
  2. 能够并行处理每个分区

在尝试用这段代码解决这两个问题时,我限制了下游用户配置并行化级别的能力。因此,我更改了返回 GroupedFluxes 的 Flux 的方法,它为下游用户提供了确定什么是可并行化的正确粒度:

public Flux<GroupedFlux<TopicPartition, ConsumerRecord<K, V>>> receiveAuto(Collection<String> topics) {
return KafkaReceiver.create(createReceiverOptions(topics))
.receiveAutoAck()
.flatMap(flux -> flux.groupBy(this::extractTopicPartition));
}

在下游,用户可以使用他们希望的任何调度程序并行化每个发出的 GroupedFlux:

public <V> void work(Flux<GroupedFlux<TopicPartition, V>> flux) {
flux.doOnNext(groupPublisher -> groupPublisher
.publishOn(Schedulers.elastic())
.subscribe(this::doWork))
.subscribe();
}

这具有按顺序并与其他 GroupedFlux 并行处理每个 TopicPartition-GroupedFlux 的所需行为。

关于apache-kafka - 如何通过自动确认按主题和分区同时处理 Reactor Kafka 流?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44573611/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com