gpt4 book ai didi

apache-spark - 结构化流 : watermark vs. 恰好一次语义

转载 作者:行者123 更新时间:2023-12-04 11:19:24 24 4
gpt4 key购买 nike

编程指南说结构化流使用适当的源/接收器保证端到端的恰好一次语义。

但是,当作业崩溃并且我们应用了水印时,我不明白这是如何工作的。

下面是我目前想象它如何工作的一个例子,请纠正我的任何误解。提前致谢!

示例:

Spark 作业:计算每 1 小时窗口中的 # 个事件,带有 1 小时水印。

留言:

  • A - 时间戳 10am
  • B - 时间戳 10:10am
  • C - 时间戳 10:20am
  • X - 时间戳中午 12 点
  • Y - 时间戳 12:50pm
  • Z - 时间戳晚上 8 点

  • 我们开始工作,从 Source 读取 A、B、C,在我们将它们写到我们的 Sink 之前,工作在上午 10:30 崩溃。

    下午 6 点,作业恢复并知道使用保存的检查点/WAL 重新处理 A、B、C。上午 10 点至 11 点窗口的最终计数为 3。

    接下来,它并行读取来自 Kafka、X、Y、Z 的新消息,因为它们属于不同的分区。 Z 首先被处理,因此最大事件时间戳被设置为晚上 8 点。当作业读取 X 和 Y 时,它们现在位于水印之后(晚上 8 点 - 1 小时 = 晚上 7 点),因此它们作为旧数据被丢弃。晚上 8 点到 9 点的最终计数为 1,作业在下午 12 点到 1 点窗口不报告任何内容。我们丢失了 X 和 Y 的数据。

    ---结束示例---

    这个场景准确吗?
    如果是这样,当从 Kafka-Sspark 正常流动时,1 小时水印可能足以处理延迟/乱序数据,但在 Spark 作业宕机/Kafka 连接长时间丢失时则不然。避免数据丢失的唯一选择是使用比您预期的工作持续时间更长的水印吗?

    最佳答案

    水印在小批量期间是一个固定值。在您的示例中,由于 X、Y 和 Z 在同一个小批量中处理,因此用于此记录的水印将为上午 9:20。完成后,小批量水印将更新到晚上 7 点。
    以下引用来自 design doc用于功能 SPARK-18124它实现了水印功能:

    To calculate the drop boundary in our trigger based execution, we have to do the following.

    • In every trigger, while aggregate the data, we also scan for the max value of event time in the trigger data
    • After trigger completes, compute watermark = MAX(event time before trigger, max event time in trigger) - threshold

    可能模拟会更多描述:
    import org.apache.hadoop.fs.Path
    import java.sql.Timestamp
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.streaming.ProcessingTime

    val dir = new Path("/tmp/test-structured-streaming")
    val fs = dir.getFileSystem(sc.hadoopConfiguration)
    fs.mkdirs(dir)

    val schema = StructType(StructField("vilue", StringType) ::
    StructField("timestamp", TimestampType) ::
    Nil)

    val eventStream = spark
    .readStream
    .option("sep", ";")
    .option("header", "false")
    .schema(schema)
    .csv(dir.toString)

    // Watermarked aggregation
    val eventsCount = eventStream
    .withWatermark("timestamp", "1 hour")
    .groupBy(window($"timestamp", "1 hour"))
    .count

    def writeFile(path: Path, data: String) {
    val file = fs.create(path)
    file.writeUTF(data)
    file.close()
    }

    // Debug query
    val query = eventsCount.writeStream
    .format("console")
    .outputMode("complete")
    .option("truncate", "false")
    .trigger(ProcessingTime("5 seconds"))
    .start()

    writeFile(new Path(dir, "file1"), """
    |A;2017-08-09 10:00:00
    |B;2017-08-09 10:10:00
    |C;2017-08-09 10:20:00""".stripMargin)

    query.processAllAvailable()
    val lp1 = query.lastProgress

    // -------------------------------------------
    // Batch: 0
    // -------------------------------------------
    // +---------------------------------------------+-----+
    // |window |count|
    // +---------------------------------------------+-----+
    // |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3 |
    // +---------------------------------------------+-----+

    // lp1: org.apache.spark.sql.streaming.StreamingQueryProgress =
    // {
    // ...
    // "numInputRows" : 3,
    // "eventTime" : {
    // "avg" : "2017-08-09T10:10:00.000Z",
    // "max" : "2017-08-09T10:20:00.000Z",
    // "min" : "2017-08-09T10:00:00.000Z",
    // "watermark" : "1970-01-01T00:00:00.000Z"
    // },
    // ...
    // }


    writeFile(new Path(dir, "file2"), """
    |Z;2017-08-09 20:00:00
    |X;2017-08-09 12:00:00
    |Y;2017-08-09 12:50:00""".stripMargin)

    query.processAllAvailable()
    val lp2 = query.lastProgress

    // -------------------------------------------
    // Batch: 1
    // -------------------------------------------
    // +---------------------------------------------+-----+
    // |window |count|
    // +---------------------------------------------+-----+
    // |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3 |
    // |[2017-08-09 12:00:00.0,2017-08-09 13:00:00.0]|2 |
    // |[2017-08-09 20:00:00.0,2017-08-09 21:00:00.0]|1 |
    // +---------------------------------------------+-----+

    // lp2: org.apache.spark.sql.streaming.StreamingQueryProgress =
    // {
    // ...
    // "numInputRows" : 3,
    // "eventTime" : {
    // "avg" : "2017-08-09T14:56:40.000Z",
    // "max" : "2017-08-09T20:00:00.000Z",
    // "min" : "2017-08-09T12:00:00.000Z",
    // "watermark" : "2017-08-09T09:20:00.000Z"
    // },
    // "stateOperators" : [ {
    // "numRowsTotal" : 3,
    // "numRowsUpdated" : 2
    // } ],
    // ...
    // }

    writeFile(new Path(dir, "file3"), "")

    query.processAllAvailable()
    val lp3 = query.lastProgress

    // -------------------------------------------
    // Batch: 2
    // -------------------------------------------
    // +---------------------------------------------+-----+
    // |window |count|
    // +---------------------------------------------+-----+
    // |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3 |
    // |[2017-08-09 12:00:00.0,2017-08-09 13:00:00.0]|2 |
    // |[2017-08-09 20:00:00.0,2017-08-09 21:00:00.0]|1 |
    // +---------------------------------------------+-----+

    // lp3: org.apache.spark.sql.streaming.StreamingQueryProgress =
    // {
    // ...
    // "numInputRows" : 0,
    // "eventTime" : {
    // "watermark" : "2017-08-09T19:00:00.000Z"
    // },
    // "stateOperators" : [ ],
    // ...
    // }

    query.stop()
    fs.delete(dir, true)
    注意 Batch 0 是如何从水印开始的 1970-01-01 00:00:00而第 1 批以水印开始 2017-08-09 09:20:00 (批次 0 的最大事件时间减去 1 小时)。批次 2,虽然为空,但使用了水印 2017-08-09 19:00:00 .

    关于apache-spark - 结构化流 : watermark vs. 恰好一次语义,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45579100/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com