gpt4 book ai didi

scala - 流静态加入 : How to refresh (unpersist/persist) static Dataframe periodically

转载 作者:行者123 更新时间:2023-12-03 21:07:00 25 4
gpt4 key购买 nike

我正在构建一个 Spark Structured Streaming 应用程序,我正在其中执行批处理流连接。批处理数据的源会定期更新。
因此,我计划定期对该批处理数据进行持久化/非持久化。
下面是我用来持久化和取消持久化批处理数据的示例代码。
流动:

  • 读取批量数据
  • 持久化批处理数据
  • 每隔一小时,取消持久化数据并读取批处理数据并再次持久化。

  • 但是,我没有看到批处理数据每小时刷新一次。
    代码:
    var batchDF = handler.readBatchDF(sparkSession)
    batchDF.persist(StorageLevel.MEMORY_AND_DISK)
    var refreshedTime: Instant = Instant.now()

    if (Duration.between(refreshedTime, Instant.now()).getSeconds > refreshTime) {
    refreshedTime = Instant.now()
    batchDF.unpersist(false)
    batchDF = handler.readBatchDF(sparkSession)
    .persist(StorageLevel.MEMORY_AND_DISK)
    }
    有没有更好的方法可以在 Spark 结构化流媒体作业中实现这种情况?

    最佳答案

    您可以通过使用 Structured Streaming 提供的流调度功能来实现这一点。
    您可以通过创建一个定期刷新静态数据帧的人工“速率”流来触发静态数据帧的刷新(非持久化 -> 加载 -> 持久化)。这个想法是:

  • 最初加载静态数据帧并保持为 var
  • 定义一个刷新静态数据框的方法
  • 使用按所需时间间隔(例如 1 小时)触发的“速率”流
  • 读取实际流数据并使用静态数据帧执行连接操作
  • 在该速率流中有一个 foreachBatch调用在步骤 2 中创建的 refresher 方法的接收器。

  • 以下代码在 Spark 3.0.1、Scala 2.12.10 和 Delta 0.7.0 上运行良好。
      // 1. Load the staticDataframe initially and keep as `var`
    var staticDf = spark.read.format("delta").load(deltaPath)
    staticDf.persist()

    // 2. Define a method that refreshes the static Dataframe
    def foreachBatchMethod[T](batchDf: Dataset[T], batchId: Long) = {
    staticDf.unpersist()
    staticDf = spark.read.format("delta").load(deltaPath)
    staticDf.persist()
    println(s"${Calendar.getInstance().getTime}: Refreshing static Dataframe from DeltaLake")
    }

    // 3. Use a "Rate" Stream that gets triggered at the required interval (e.g. 1 hour)
    val staticRefreshStream = spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .option("numPartitions", 1)
    .load()
    .selectExpr("CAST(value as LONG) as trigger")
    .as[Long]

    // 4. Read actual streaming data and perform join operation with static Dataframe
    // As an example I used Kafka as a streaming source
    val streamingDf = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test")
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load()
    .selectExpr("CAST(value AS STRING) as id", "offset as streamingField")

    val joinDf = streamingDf.join(staticDf, "id")

    val query = joinDf.writeStream
    .format("console")
    .option("truncate", false)
    .option("checkpointLocation", "/path/to/sparkCheckpoint")
    .start()

    // 5. Within that Rate Stream have a `foreachBatch` sink that calls refresher method
    staticRefreshStream.writeStream
    .outputMode("append")
    .foreachBatch(foreachBatchMethod[Long] _)
    .queryName("RefreshStream")
    .trigger(Trigger.ProcessingTime("5 seconds")) // or e.g. 1 hour
    .start()
    举一个完整的例子,增量表被创建并更新为新值,如下所示:
      val deltaPath = "file:///tmp/delta/table"

    import spark.implicits._
    val df = Seq(
    (1L, "static1"),
    (2L, "static2")
    ).toDF("id", "deltaField")

    df.write
    .mode(SaveMode.Overwrite)
    .format("delta")
    .save(deltaPath)

    关于scala - 流静态加入 : How to refresh (unpersist/persist) static Dataframe periodically,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66154867/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com