gpt4 book ai didi

scala - 如何在 kafka 主题中仅存储最新的键值

转载 作者:行者123 更新时间:2023-12-03 16:51:20 25 4
gpt4 key购买 nike

我有一个有数据流的主题。我需要的是从该主题创建一个单独的主题,该主题仅具有给定键的最新值集。

我认为 KTable 的全部目的是存储给定键的最新值,而不是存储整个事件流。但是我似乎无法让它发挥作用。运行下面的代码会生成 keystore ,但该 keystore (maintopiclatest) 中有一个事件流(不仅仅是最新的值)。因此,如果我在主题中两次发送包含 1000 条记录的请求,而不是看到 1000 条记录,而是看到 2000 条记录。

var serializer = new KafkaSpecificRecordSerializer();
var deserializer = new KafkaSpecificRecordDeserializer();

var stream = kStreamBuilder.stream("maintopic",
Consumed.with(Serdes.String(), Serdes.serdeFrom(serializer, deserializer)));

var table = stream
.groupByKey()
.reduce((aggV, newV) -> newV, Materialized.as("maintopiclatest"));

另一个问题是,如果我想将 KTable 存储在一个新主题中,我不知道该怎么做。为了做到这一点,似乎我必须将它转回一个 Stream 以便我可以在它上面调用“.to”。但是,其中包含整个事件流,而不仅仅是最新的值。

最佳答案

这不是 KTable 的工作方式。

KTable 本身有一个内部状态存储,每个键只存储一条记录。但是,KTable 会不断更新并受到所谓的 的约束。流表对偶 . KTable 的每次更新都会作为变更日志记录向下游发送:https://docs.confluent.io/current/streams/concepts.html#duality-of-streams-and-tables .因此,每个输入记录都会产生一个输出记录。

因为它是流处理,所以没有“每个值的最后一个键”。

I have a topic that has a stream of data coming to it. What I need is to create a separate topic from this topic that only has the latest set of values given the keys.



您希望 KTable 在哪个时间点发出更新?这个问题没有答案,因为输入流在概念上是无限的。

关于scala - 如何在 kafka 主题中仅存储最新的键值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54640721/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com