gpt4 book ai didi

java - 动态消费来自kafka主题的消息

转载 作者:太空宇宙 更新时间:2023-11-04 09:14:25 25 4
gpt4 key购买 nike

我需要使用生产者动态创建的主题中的消息。我已经在消费者 @KafkaListener(topicPattern = "topicname_.*"中使用了主题模式方法,并且还设置了metadata.max.age.ms=3000。但显然除非我将 offset.auto.reset 设置为最早,否则我无法实现这一点。在我们的要求中,offset.auto.reset 必须设置为最新以避免重复出现问题。

关于如何实现同样的目标有什么想法吗?

最佳答案

从顶层来说,Kafka 的设计理念不允许发生这样的事情,因为在运行时添加主题会导致代理硬性重新平衡,因此必须避免这种情况,但如果我们能够在每次将新主题添加到列表时重新启动消费者组,我们就可以优雅地避免这种危险。

最新的 Spring-Kafka 集成及其用法包括注释 @KafkaListener,它通过使用传递给它的容器工厂创建 KafkaListenerContainer,将 POJO 监听器转换为 Kafka 消费者。该消费者收听硬编码为主题、主题表达式、主题模式等字符串数组的主题。这限制了我们的设计,只能通过 Java DSL 将这些主题作为键获取,而不是直接硬编码。但是,键仍然硬编码在作为参数传递给 @KafkaListener 的数组中。

示例:

@KafkaListener(topics = {“${kafka.topics.receipt.cancel.name}”}, containerFactory = “kafkaContainerFactory”)

注意:注释属性 KafkaListener.topics 的值必须是数组初始值设定项,因此硬编码是强制性的。

关于java - 动态消费来自kafka主题的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59245850/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com