gpt4 book ai didi

apache-kafka - 滑动窗口中Kafka KStream相关消息事件

转载 作者:行者123 更新时间:2023-12-04 12:03:42 24 4
gpt4 key购买 nike

我们有一种情况,我认为 Kafka Streams 可以提供帮助,但我找不到任何说明如何操作的文档或示例。

我发现了一个类似的问题,但它没有任何实现建议:Kafka Streams wait function with depending objects

我想做什么:

我想将来自 Kafka 主题的相关记录关联到单个对象中,并将该新对象发布到单独的输出主题。例如,可能有五个通过唯一键相互关联的消息记录 - 我想从这些相关对象构建一个新对象,并将其生成到一个新主题。

我希望聚合一小时滑动窗口内的所有相关事件。换句话说,一旦 ID 为“123”的消息 A 到达消费者,应用程序必须至少等待一个小时,以便 ID 为“123”的剩余记录到达。在所有记录到达或一小时过去后,这些记录将过期。

最后,一小时内收集到的所有相关消息都用于创建一个新对象,然后将其发送到另一个 Kafka 主题。

我遇到的问题。

Kafka 中的滑动窗口似乎仅在将两个流连接在一起时才起作用。我们将只有一个流连接到该主题 - 我不知道为什么需要两个流或我们将如何实现这一点。我在网上找不到任何例子。
我在 Kafka 中看到的所有流函数在收集相同键的事件时都只是聚合/减少到一个简单的值。例如,某个键出现的次数或将某个值相加

这是一些伪代码来描述我在说什么。如果功能存在,函数名称/语义将有所不同。

    KStream<Key, Object> kstream = kStreamBuilder.stream(TOPIC);
kstream.windowedBy(
// One hour sliding Window
)
.collectAllRelatedKeys(
// Collect all Records related to each key
// map == HashMap<Key, ArrayList<Value>>
map.get(key).add(value);
)
.transformAndProcess(
if(ALL_EVENTS_COLLECTED) {
// Create new Object from all related records
newObject =
createNewObjectFromRelatedRecordsFunction(map.get(key));
producer.send(newObject);
}
)

问题(并感谢您的帮助):
  • 我怎么能在一个流中使用滑动窗口?
  • 如何自定义 KStream/KTable 函数来收集时间窗口内的所有相关事件并将新对象生成到另一个主题?
  • 确认/偏移管理如何与滑动窗口流一起工作?
  • 这能保证 Exactly Once 交货吗?供引用:https://www.confluent.io/blog/enabling-exactly-kafka-streams/
  • 最佳答案

    Apache Kafka 2.7 中添加了对聚合的滑动窗口支持。
    参见 https://issues.apache.org/jira/browse/KAFKA-5636

    关于apache-kafka - 滑动窗口中Kafka KStream相关消息事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49119379/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com