gpt4 book ai didi

java - Kafka - TimestampExtractor 的问题

转载 作者:塔克拉玛干 更新时间:2023-11-03 04:13:13 24 4
gpt4 key购买 nike

我使用 org.apache.kafka:kafka-streams:0.10.0.1

我正在尝试使用一个基于时间序列的流,它似乎不会触发 KStream.Process() 来触发(“标点符号”)。 (引用here)

KafkaStreams 配置中,我传递了这个参数(以及其他参数):

config.put(
StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
EventTimeExtractor.class.getName());

这里,EventTimeExtractor 是一个自定义时间戳提取器(实现了 org.apache.kafka.streams.processor.TimestampExtractor),用于从 JSON 数据中提取时间戳信息。

当每条新记录被拉入时,我希望这会调用我的对象(派生自 TimestampExtractor)。所讨论的流是 2 * 10^6 条记录/分钟。我将 punctuate() 设置为 60 秒,它从不触发。我知道数据非常频繁地通过这个跨度,因为它拉动旧值来 catch 。

事实上,它根本不会被调用。

  • 这是在 KStream 记录上设置时间戳的错误方法吗?
  • 这是声明此配置的错误方法吗?

最佳答案

2017 年 11 月更新:Kafka 1.0 中的 Kafka Streams 现在支持具有流时间和处理时间(挂钟时间)行为的 punctuate()。因此,您可以选择自己喜欢的行为。

你的设置对我来说似乎是正确的。

您需要注意的是:从 Kafka 0.10.0 开始,punctuate() 方法在 stream-time 上运行(默认情况下,即基于默认时间戳提取器,流时间将表示事件时间)。并且只有当有新的数据记录进来时流时间才会提前,流时间提前多少取决于这些新记录的相关时间戳。

例如:

  • 假设您已将 punctuate() 设置为每 1 分钟调用一次 = 60 * 1000(注意:流时间).现在,如果碰巧在接下来的 5 分钟内没有接收到数据,则根本不会调用 punctuate() —— 尽管您可能希望它被调用 5 次。为什么?同样,因为 punctuate() 依赖于 stream-time,而 stream-time 仅根据新接收的数据记录提前。

这是否会导致您所看到的行为?

展望 future :Kafka 项目中已经在讨论如何使 punctuate() 更灵活,例如不仅基于stream-time(默认为event-time)触发它,还基于processing-time

关于java - Kafka - TimestampExtractor 的问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39535201/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com