gpt4 book ai didi

spark-structured-streaming - Spark Structured streaming - dropDuplicates with watermark备选方案

转载 作者:行者123 更新时间:2023-12-05 07:28:01 26 4
gpt4 key购买 nike

我正在尝试使用带水印的 dropDuplicate 函数对流数据进行重复数据删除。我目前面临的问题是我必须为给定记录设置两个时间戳

  1. 一个是事件时间戳 - 从源创建记录的时间戳。
  2. 另一个是传输时间戳 - 来自负责流式传输数据的中间进程的时间戳。

重复是在中间阶段引入的,因此对于给定的记录重复,事件时间戳相同但传输时间戳不同。

对于水印,我喜欢使用传输时间戳,因为我知道在传输过程中重复的时间间隔不能超过 3 分钟。但我不能在 dropDuplicate 中使用它,因为它不会捕获重复项,因为重复项具有不同的传输时间戳。

这是一个例子,

Event 1:{ "EventString":"example1", "Eventtimestamp": "2018-11-29T10:00:00.00", "TransferTimestamp": "2018-11-29T10:05:00.00" }
Event 2 (duplicate): {"EventString":"example1", "Eventtimestamp": "2018-11-29T10:00:00.00", "TransferTimestamp": "2018-11-29T10:08:00.00"}

在这种情况下,副本是在原始事件发生 3 分钟后的传输过程中创建的

我的代码如下,

streamDataset.
.withWatermark("transferTimestamp", "4 minutes")
.dropDuplicates("eventstring","transferTimestamp");

上述代码不会删除重复项,因为 transferTimestamp 对于事件及其副本是唯一的。但目前,这是唯一的方法,因为 Spark 强制我在设置水印时将水印列包含在 dropDuplicates 函数中。

我真的很想看到像下面这样的 dropDuplicate 实现,它对于任何至少一次语义流都是有效的,在这种情况下我不必在 dropDuplicates 中使用水印字段,并且仍然支持基于水印的状态驱逐。但目前情况并非如此

streamDataset.
.withWatermark("transferTimestamp", "4 minutes")
.dropDuplicates("eventstring");

我不能使用事件时间戳,因为它没有排序并且时间范围变化很​​大(延迟事件和垃圾事件)。

如果有人在这种情况下有重复数据删除的替代解决方案或想法,请告诉我。

最佳答案

对于您的用例,您不能直接使用 dropDuplicates API。您必须使用一些像 flatmapgroupwithstate 这样的 spark API 来使用任意有状态操作

关于spark-structured-streaming - Spark Structured streaming - dropDuplicates with watermark备选方案,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53622632/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com