gpt4 book ai didi

scala - Spark Streaming dropDuplicates

转载 作者:可可西里 更新时间:2023-11-01 15:26:35 24 4
gpt4 key购买 nike

Spark 2.1.1 (scala api) 从 s3 位置流式传输 json 文件。

我想根据在 json 中为每条记录找到的 ID 列(“event_id”)对所有传入记录进行重复数据删除。我不在乎保留了哪份记录,即使记录只是部分重复。我正在使用追加模式,因为数据只是通过 spark.sql() 方法被丰富/过滤,没有分组依据/窗口聚合。然后我使用追加模式将 Parquet 文件写入 s3。

根据文档,我应该能够使用不加水印的 dropDuplicates 来进行重复数据删除(显然这在长时间运行的生产中无效)。但是,这失败并出现错误:

用户类抛出异常:org.apache.spark.sql.AnalysisException:当流数据帧/数据集上有流聚合时不支持附加输出模式

这个错误看起来很奇怪,因为我没有进行聚合(除非 dropDuplicates 或 sparkSQL 算作聚合?)。

我知道重复项不会在 3 天之外出现,所以我再次尝试添加水印(通过在删除重复项之前立即使用 .withWatermark())。不过好像要等到3天了再写数据。 (即因为今天是 7 月 24 日,所以只有 7 月 21 日同一时间的数据才会写入输出)。

由于没有聚合,我想在批处理处理后立即写入每一行,并简单地丢弃具有前 3 天发生的事件 ID 的所有行。有没有一种简单的方法可以做到这一点?

谢谢

最佳答案

在我的例子中,我曾经通过 DStream 以两种方式实现这一点:

一种方式:

  1. 加载 tmp_data(包含 3 天的唯一数据,见下文)
  2. 接收batch_data并使用tmp_data执行leftOuterJoin
  3. 在step2做filter并输出新的唯一数据
  4. 通过第 2 步的结果用新的唯一数据更新 tmp_data 并删除旧数据(超过 3 天)
  5. tmp_data 保存到 HDFS 或其他任何地方
  6. 一次又一次地重复上面的内容

另一种方式:

  1. 在 mysql 上创建一个表并在 event_id 上设置 UNIQUE INDEX
  2. 接收batch_data并保存event_id + event_time + whatever到mysql
  3. mysql会自动忽略重复

关于scala - Spark Streaming dropDuplicates,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45291335/

24 4 0