gpt4 book ai didi

java - 处理批处理记录后如何提交kafka offset

转载 作者:行者123 更新时间:2023-11-30 02:01:48 24 4
gpt4 key购买 nike

我正在使用 spring-kafka 并使用来自 kafka 主题的批处理记录,并通过 AbstractMessageListenerContainer.AckMode.BATCH 提交偏移量。

在我的例子中,处理批处理记录需要时间(大约 20 秒),并且消费者线程会等待批处理完成,然后再次进行轮询(在此轮询中提交偏移量)。在这种情况下,我将把记录的 List 分配给一个线程(名称:ProcessThread),该线程将处理所有记录并将结果返回给消费者线程,然后消费者线程将记录结果。 (在所有这个过程中,消费者线程将等待,直到从ProcessThread获取结果,这导致性能低下。

有什么方法可以让ProcessThread负责向kafka提交偏移量吗?这样消费者线程就不需要等待,并且对于每个轮询它将创建一个新的processThread

就我而言,我的主题有 20 个分区和 10 个 Pod,每个 Pod 有 2 个消费者线程(Spring Kafka 并发消费者),每个轮询 100 条记录(使用 Spring Boot 线程 @Async 处理所有这些记录)

通过上述配置,我可以在 2 小时内处理 100 万条记录,但我需要将其缩短到至少 40 分钟。

感谢任何帮助😊

最佳答案

您应该至少升级到 1.3.7。当前版本是2.1.10。其中没有单独的线程。只有消费者线程可以发送偏移量。消费者不是线程安全的。

您不应该切换到单独的线程。使用更高的并发和更多的分区。

关于java - 处理批处理记录后如何提交kafka offset,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52617111/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com