gpt4 book ai didi

java - 由于 kafka 流存储正在等待运行,应用程序无法启动

转载 作者:行者123 更新时间:2023-12-01 16:35:31 26 4
gpt4 key购买 nike

我有一个使用 kafka 流的 Spring Boot 应用程序(kafka docker 镜像:wurstmeister/kafka:2.12-2.1.1,kafka 依赖项:org.apache.kafka:kafka-streams :2.4.1 )。在应用程序启动期间,我检查主题 my-topic 是否已创建,如果没有 - 应用程序将创建它。在该应用程序创建 KTable 后,如下所示:

streamsBuilder.table("my-topic", Consumed.with(Serdes.String(), Serdes.String()), Materialized.as("my-topic-store"))

此外,我创建商店以便查询它:

while(true)
try{
return kafkaStreams.store("my-topic-store", QueryableStoreTypes.keyValueStore())
} catch (InvalidStateStoreException e) {
log.info("Waiting for store {} is RUNNING", "my-topic-store");
Thread.sleep(1000);
}
}

我的应用程序部署在k8s中。当新版本的应用程序准备就绪时,k8s 启动新应用程序并缩小旧应用程序的规模。问题是当新应用程序启动时,我在日志中只看到多行,例如:“等待商店 my-topic-store 正在运行。

我试图深入研究这个问题。从kafka文档来看,1个分区只能由1个消费者读取,1个消费者可以从多个分区读取。如果新的消费者到来并且所有分区都已“被占用”,则该消费者将变得空闲。在我们的例子中,当新应用程序启动时,意味着新消费者即将到来,它会变得空闲,因为旧消费者的旧应用程序仍在工作,因此新消费者无法监听kafka分区。我应该注意到,该应用程序为 kafka 流配置了 5 个线程,每个主题有 23 个主题,每个主题有 1 个分区(我尝试将分区号从 1 更改为 5,但没有帮助)。应用程序重新部署是在完全没有负载的情况下进行的。

最佳答案

您(在评论中)描述的是预期行为。

当您启动新应用程序时,它将加入消费者组。因为只有一个分区,所以新应用程序不会分配任何工作(没有理由重新分配工作,因为这只是一种昂贵的状态迁移;请注意,从重新平衡的角度来看,您的应用程序进行了扩展;未知您是否计划停止现有的应用程序)。

当您最终停止旧应用程序时,工作(和状态)将重新分配。

另请注意,启动新实例永远不会停止任何现有实例。相反,正如已经提到的,它被认为是应用程序的扩展。

升级应用程序的推荐方法是首先停止旧实例,然后在同一服务器上重新启动新实例,以便它可以从磁盘获取旧实例的状态。这避免了昂贵的状态迁移。

关于java - 由于 kafka 流存储正在等待运行,应用程序无法启动,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61956845/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com