gpt4 book ai didi

java - 如何使用标点符号从状态存储中删除旧记录? (卡夫卡)

转载 作者:行者123 更新时间:2023-12-03 08:39:40 29 4
gpt4 key购买 nike

我使用 streamsBuilder.table("myTopic") 为某个主题创建了一个 Ktable,并将其具体化为状态存储,以便我可以使用交互式查询.

每小时,我都想从该状态存储(以及关联的变更日志主题)中删除其值在过去一小时内未更新的记录。

我相信使用 punctuator 可以实现这一点,但到目前为止我只使用过 DSL,因此不确定如何继续。如果有人能为我提供一个例子,我将非常感激。

谢谢

jack

最佳答案

可以将处理器 API 与 DSL 混合搭配,但无法处理 KTable。您需要转换为 KStream。或者,您可以使用与状态存储交互的处理器创建新的拓扑。

您需要将该状态存储在某处 - 如何确定记录是否早于一小时。一种选择是为状态存储中的每条记录添加时间戳。

在处理器的 init 方法中,您可以调用调度(标点符号)来迭代状态存储中的记录并删除旧记录:

context.schedule(Duration.ofMillis(everyHourInMillis), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
myStateStore.all().forEachRemaining(keyValue -> {
if (Instant.ofEpochMilli(valueInStateStore).compareTo(olderThanAnHour) < 0) {
myStateStore.delete(keyValue.key);
}
});
});

关于java - 如何使用标点符号从状态存储中删除旧记录? (卡夫卡),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62997090/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com