gpt4 book ai didi

apache-kafka-streams - 卡夫卡流 DSL : Aggregating a KStream to a GlobalKTable

转载 作者:行者123 更新时间:2023-12-02 16:45:02 25 4
gpt4 key购买 nike

我有一个输入流:

KStream<String, X> inputStream = ...

我想以输出到 GlobalKTable<String, Y> 的方式进行操作(过滤然后聚合)然后我可以使用:

KeyValueIterator<String, Y> = streams.store("y-global-store", QueryableStoreTypes.keyValueStore()).all()

Streams DSL 可以支持吗?如果输出表是 KTable 似乎是可能的, 但鉴于我将在这家商店中拥有少量数据,我想使用 GlobalKTable


这是我的处理器,它转换 KStream<String, X>KTable<String, Y>

KTable<String, Y> outputTable = inputStream
.filter(...)
.groupByKey(Grouped.with(Serdes.String(), ySerde))
.aggregate(
initializeWithNull(),
aggregateXToAY(),
Materialized.`as`<String, Y, KeyValueStore<Bytes, ByteArray>>("y-global-store")
.withKeySerde(stringSerde)
.withValueSerde(tagRecordSerde)
)

但是这不会创建 GlobalKTable ,我错过了什么?

最佳答案

Stream DSL 不支持从 KStream 构建 GlobalKTable。似乎创建 GlobalKTable 的唯一方法是使用 StreamsBuilder。 globalTable("input_topic_for_globalktable")

我认为 DSL 不支持以这种方式创建 GlobalKtable 的原因是每个应用程序实例都包含整个 GlobalKTable 状态,因此默认情况下禁用日志记录(它不会将变更日志记录到变更日志主题),因此它使用输入主题直接用于恢复状态过程(容错),该主题必须启用日志压缩。

一个解决方案是您必须在作为 outputTable KTable 的输出之前为这个输入主题准备数据:

outputTable.toStream().to("input_topic_for_globalktable");

或者直接使用 outputTable KTable 的变更日志主题(我认为这个解决方案更好,因为你不需要为新主题额外的磁盘空间),它的名字是:

<your-application_id>-y-global-store-changelog

关于apache-kafka-streams - 卡夫卡流 DSL : Aggregating a KStream to a GlobalKTable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60725921/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com