gpt4 book ai didi

scala - Kafka Consumer 读取到它开始的时间,然后永远挂起

转载 作者:行者123 更新时间:2023-12-05 04:16:25 27 4
gpt4 key购买 nike

我已经设置了一个包含 1 个生产者和 1 个消费者的 Kafka 解决方案,并验证了所有连接是否正确(我可以生成消息并使用它们)。 ZK Server & Kakfa Server 已启动并稳定。

如前所述,我的问题是消费者会从中断的地方正常读取消息,但只会读取在它开始读取之前创建的消息。之后,直到我杀死消费者并重新启动他,新消息才会被读取。

相关消费者 Scala 代码

  val consumer = Consumer.create(new ConsumerConfig(readConsumerPropertiesFromConfig))
val filterSpec = new Whitelist("some-valid-topic")

val stream: KafkaStream[String, String] =
consumer.createMessageStreamsByFilter(filterSpec, 1, new StringDecoder, new StringDecoder).head

log.info(s"Consumer started. Listening to topics [$filterSpec].")

def read() = stream map digest

摘要采用 MessageAndMetadata 并从中获得乐趣

def digest(messageAndMeta: MessageAndMetadata[String, String]) = {
log.info(s"processing the message [$messageAndMeta]")

属性是

properties.put("group.id", "default_consumer_group")
properties.put("zookeeper.connect", "localhost:2181")
properties.put("auto.offset.reset", "smallest")
properties.put("consumer.timeout.ms", 2000)

我可以用它重现的时间轴

  • 产生 5 条消息
  • 启动消费者
  • 消费者阅读了 5 条消息
  • 再生成 15 条消息
  • 消费者忽略新消息并永远挂起
  • 杀死并重启消费者
  • 消费者阅读了 15 条消息,然后再次永远挂起

有什么想法吗?谢谢。

最佳答案

问题是我忽略了一个让我的 Consumer 崩溃的 ConsumerTimeoutException,我把它误认为是“Consumer 永远挂起”。

来自消费者配置文档:

By default, this value is -1 and a consumer blocks indefinitely if no new message is available for consumption.

我将此设置为几秒钟,之后它会抛出。通过将它设置为 -1,我得到了想要的行为,尽管理想的解决方案(对于我的用例)是按照这个项目的方式实现一些东西:https://github.com/kciesielski/reactive-kafka

This thread pointed me in the right direction

希望对其他人有帮助。

关于scala - Kafka Consumer 读取到它开始的时间,然后永远挂起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27894081/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com