gpt4 book ai didi

apache-kafka - 如何在两个 Kafka 流之间使用持久化的 StateStore

转载 作者:行者123 更新时间:2023-12-04 01:45:02 27 4
gpt4 key购买 nike

我在尝试通过 Kafka Streams 实现以下目标时遇到了一些麻烦:

  • 在应用程序启动时,(压缩的)主题 alpha 被加载到键值 StateStore 映射中
  • Kafka Stream 从另一个主题消费,使用 (.get) 上面的映射并最终在主题 alpha
  • 中生成一条新记录
  • 结果是内存映射应该与底层主题保持一致,即使流媒体重新启动也是如此。

我的方法如下:

val builder = new StreamsBuilderS()

val store = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("store"), kSerde, vSerde)
)

builder.addStateStore(store)

val loaderStreamer = new LoaderStreamer(store).startStream()

[...] // I wait a few seconds until the loading is complete and the stream os running

val map = instance.store("store", QueryableStoreTypes.keyValueStore[K, V]()) // !!!!!!!! ERROR HERE !!!!!!!!

builder
.stream("another-topic")(Consumed.`with`(kSerde, vSerde))
.doMyAggregationsAndgetFromTheMapAbove
.transform(() => new StoreTransformer[K, V]("store"), "store")
.to("alpha")(Produced.`with`(kSerde, vSerde))

LoaderStreamer(商店):

[...]
val builders = new StreamsBuilderS()

builder.addStateStore(store)

builder
.table("alpha")(Consumed.`with`(kSerde, vSerde))

builder.build
[...]

StoreTransformer:

[...]
override def init(context: ProcessorContext): Unit = {
this.context = context
this.store =
context.getStateStore(store).asInstanceOf[KeyValueStore[K, V]]
}

override def transform(key: K, value: V): (K, V) = {
store.put(key, value)
(key, value)
}
[...]

...但我得到的是:

Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException:
The state store, store, may have migrated to another instance.

在尝试获取商店处理程序时。

关于如何实现这一点有什么想法吗?

谢谢!

最佳答案

您不能在两个 Kafka Streams 应用程序之间共享状态存储。

根据文档:https://docs.confluent.io/current/streams/faq.html#interactive-queries上述异常可能有两个原因:

  • 本地 KafkaStreams 实例尚未就绪,因此无法查询其本地状态存储。

  • 本地 KafkaStreams 实例已准备就绪,但特定状态存储刚刚在幕后迁移到另一个实例。

处理它的最简单方法是等到状态存储可查询:

public static <T> T waitUntilStoreIsQueryable(final String storeName,
final QueryableStoreType<T> queryableStoreType,
final KafkaStreams streams) throws InterruptedException {
while (true) {
try {
return streams.store(storeName, queryableStoreType);
} catch (InvalidStateStoreException ignored) {
// store not yet ready for querying
Thread.sleep(100);
}
}
}

整个示例可以在 confluent github 找到.

关于apache-kafka - 如何在两个 Kafka 流之间使用持久化的 StateStore,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55656508/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com