gpt4 book ai didi

apache-kafka - 与 KafkaStreams 的窗口结束外连接

转载 作者:行者123 更新时间:2023-12-05 00:13:47 27 4
gpt4 key购买 nike

我有一个 Kafka 主题,我希望其中的消息具有两种不同的键类型:旧的和新的。
"1-new" , "1-old" , "2-new" , "2-old" .键是唯一的,但有些可能会丢失。

现在使用 Kotlin 和 KafkaStreams API,我可以记录那些具有相同 key ID 的新旧消息。

    val windows = JoinWindows.of(Duration.of(2, MINUTES).toMillis())

val newStream = stream.filter({ key, _ -> isNew(key) })
.map({key, value -> KeyValue(key.replace(NEW_PREFIX, ""), value) })

val oldStream = stream.filter({ key, _ -> isOld(key) })
.map({key, value -> KeyValue(key.replace(OLD_PREFIX, ""), value) })

val joined = newStream.join(oldStream,
{ value1, value2 -> "$value1&$value2" }, windows)

joined.foreach({ key, value ->
log.info { "JOINED $key : $value" }
})

现在我想知道 中缺少的新/旧 key 时间窗口因为某些原因。是否可以使用 KafkaStreams API 实现?

在我的情况下,当 key "1-old"收到和 "1-new"不是只有在这种情况下 2 分钟内我想报告 ID 1因为可疑。

最佳答案

DSL 可能不会给你你想要的。但是,您可以使用处理器 API。话虽如此,leftJoin实际上可以用来做“举重”。因此,在 leftJoin 之后您可以使用 .transform(...)带有附加状态以进一步“清理”数据。

每个old&null记录您收到,放入商店。如果您稍后收到 old&new您可以将其从商店中删除。此外,您注册一个标点符号,并且在每次标点符号调用时,您都会扫描商店中“足够旧”的条目,以便您确定以后不会old&new将产生连接结果。对于这些条目,您发出 old&null并从商店中取出。

作为替代方案,您也可以省略连接,并在单个 transform() 中完成所有操作。与状态。为此,您需要KStream#merge()新旧流与呼transform()在合并的流上。

注意:除了注册标点符号之外,您还可以将“扫描逻辑”放入转换并在每次处理记录时执行它。

关于apache-kafka - 与 KafkaStreams 的窗口结束外连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48196450/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com