gpt4 book ai didi

apache-kafka - FlinkKafkaConsumer 中事件时间顺序的保证

转载 作者:行者123 更新时间:2023-12-03 08:51:43 25 4
gpt4 key购买 nike

TL;DR:目前 Flink 中保证事件时间顺序的最佳解决方案是什么?

我使用 Flink 1.8.0 和 Kafka 2.2.1。我需要通过事件时间戳保证事件的正确顺序。我每 1 秒生成一次周期性水印。我将 FlinkKafkaConsumer 与 AscendingTimestampExtractor 结合使用:

val rawConsumer = new FlinkKafkaConsumer[T](topicName, deserializationSchema, kafkaConsumerConfig)
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[T] {
override def extractAscendingTimestamp(element: T): Long =
timestampExtractor(element)
})
.addSource(consumer)(deserializationSchema.getProducedType).uid(sourceId).name(sourceId)

然后处理:

myStream
.keyBy(ev => (ev.name, ev.group))
.mapWithState[ResultEvent, ResultEvent](DefaultCalculator.calculateResultEventState)

我意识到,对于在同一毫秒或几毫秒后发生的无序事件,Flink 不会纠正顺序。我在文档中发现的内容:

the watermark triggers computation of all windows where the maximum timestamp (which is end-timestamp - 1) is smaller than the new watermark

https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#interaction-of-watermarks-and-windows

因此我准备了额外的处理步骤来保证事件时间顺序:

myStream
.timeWindowAll(Time.milliseconds(100))
.apply((window, input, out: Collector[MyEvent]) => input
.toList.sortBy(_.getTimestamp)
.foreach(out.collect) // this windowing guarantee correct order by event time
)(TypeInformation.of(classOf[MyEvent]))
.keyBy(ev => (ev.name, ev.group))
.mapWithState[ResultEvent, ResultEvent](DefaultScoring.calculateResultEventState)

但是,我发现这个解决方案很丑陋,它看起来像是一个解决方法。我也关心per-partition watermarks of KafkaSource

理想情况下,我想将顺序保证放在 KafkaSource 中,并为每个 kafka 分区保留它,就像每个分区的水印一样。可以这样做吗? 目前Flink中保证事件时间顺序的最佳方案是什么?

最佳答案

Flink 不保证按事件时间顺序处理记录。分区内的记录将按其原始顺序进行处理,但是当两个或多个分区合并到新分区时(由于流的重新分区或合并),Flink 会随机将这些分区的记录合并到新分区中。其他一切都会效率低下并导致更高的延迟。

例如,如果您的作业有一个从两个 Kafka 分区读取的源任务,则两个分区的记录将以某种随机的锯齿形模式合并。

但是,Flink 保证所有事件都根据生成的水印正确处理。这意味着水印永远不会超过记录。例如,如果您的 Kafka 源生成每个分区的水印,则即使多个分区的记录合并后,水印仍然有效。水印用于收集和处理时间戳小于水印的所有记录。因此,它保证了输入数据的完整性。

这是按时间戳对记录进行排序的先决条件。你可以用一个翻滚的 window 来做到这一点。但是,您应该注意的是

  1. 一个窗口将在单个任务中执行(即,它不是并行的)。如果每个键的顺序足够,您应该使用常规的翻滚窗口,甚至更好地实现 KeyedProcessFunction,这样效率更高。
  2. 由于重新分区或更改并行性而重新组织流时,顺序将被破坏。

关于apache-kafka - FlinkKafkaConsumer 中事件时间顺序的保证,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58539379/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com