gpt4 book ai didi

java - 卡夫卡流: Store is not ready

转载 作者:太空宇宙 更新时间:2023-11-04 10:24:26 25 4
gpt4 key购买 nike

我们最近将 Kafka 升级到了 v1.1,将 Confluence 升级到了 v4.0。但是在升级后,我们遇到了有关状态存储的持续问题。我们的应用程序启动流的集合,并在 100 次尝试后终止应用程序之前检查状态存储是否准备就绪。但升级后,至少有一个流将具有 Store is not ready : the state store, <your stream>, may have migrated to another instance流本身有 RUNNING状态和消息将流过,但商店的状态仍然显示为未就绪。所以我不知道会发生什么。

  • 我不应该检查商店状态吗?
  • 由于我们的应用程序有很多流(~15),因此将开始它们同时引起问题?
  • 我们是否应该进行硬重启——目前我们将其作为服务运行在Linux上

我们在具有 3 个代理的集群中运行 Kafka。下面是示例流(不是完整代码):

public BaseStream createStreamInstance() {
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

MessagePayLoadParser<Note> noteParser = new MessagePayLoadParser<Note>(Note.class);
GenericJsonSerde<Note> noteSerde = new GenericJsonSerde<Note>(Note.class);

StreamsBuilder builder = new StreamsBuilder();

//below reducer will use sets to combine
//value1 in the reducer is what is already present in the store.
//value2 is the incoming message and for notes should have max 1 item in it's list (since its 1 attachment 1 tag per row, but multiple rows per note)
Reducer<Note> reducer = new Reducer<Note>() {
@Override
public Note apply(Note value1, Note value2) {
value1.merge(value2);
return value1;
}
};

KTable<Long, Note> noteTable = builder
.stream(this.subTopic, Consumed.with(jsonSerde, jsonSerde))
.map(noteParser::parse)
.groupByKey(Serialized.with(Serdes.Long(), noteSerde))
.reduce(reducer);

noteTable.toStream().to(this.pubTopic, Produced.with(Serdes.Long(), noteSerde));

this.stream = new KafkaStreams(builder.build(), this.properties);
return this;
}

最佳答案

这里有一些悬而未决的问题,比如 Matthias 发表评论的问题,但会尝试回答/为您的实际问题提供帮助:

  • 我不应该检查商店状态吗?重新平衡通常就是这种情况。但在这种情况下,您不应该看到分区的线程继续消耗,但处理应该“转移”到接管的另一个线程完成。确保实际上那个线程是继续处理该分区的线程,而不是新线程。检查 kafka-consumer-groups 实用程序以跟踪那里的消费者(线程)。
  • 由于我们的应用程序有很多流(约 15 个),同时启动它们会导致问题吗?不会,重新平衡是自动的。
  • 我们是否不应该进行硬重启——目前我们将其作为 Linux 上的服务运行 您是否将状态存储保存在某个特定的非默认目录中?您应该正确配置状态存储目录,并确保它可访问,并且对应用程序重新启动不敏感。不确定如何执行硬重启,但一些异常处理代码应该覆盖它,关闭您的流应用程序。

关于java - 卡夫卡流: Store is not ready,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50700822/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com