java - Spark Streaming: avoid checkpointLocation check


I am writing a library to integrate Apache Spark with a custom environment, and I am implementing custom streaming sources and streaming writers.

Some of the sources I am developing are not recoverable, at least not after an application crash: if the application restarts, all the data has to be reloaded.
We would therefore like to spare users from having to explicitly set the "checkpointLocation" option.
But if the option is not provided, we see the following error:

org.apache.spark.sql.AnalysisException: checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...);

However, if I use the console stream output instead, everything works fine.

Is there a way to get the same behavior?

Note: we use the Spark v2 interfaces for our stream readers/writers.

Spark log:
18/06/29 16:36:48 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/C:/mydir/spark-warehouse/').
18/06/29 16:36:48 INFO SharedState: Warehouse path is 'file:/C:/mydir/spark-warehouse/'.
18/06/29 16:36:48 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
org.apache.spark.sql.AnalysisException: checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...);
at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$3.apply(StreamingQueryManager.scala:213)
at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$3.apply(StreamingQueryManager.scala:208)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:207)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:299)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:296)
...
18/06/29 16:36:50 INFO SparkContext: Invoking stop() from shutdown hook

This is how I start the streaming job:
spark.readStream().format("mysource").load()
.writeStream().format("mywriter").outputMode(OutputMode.Append()).start();

Everything works fine if, instead, I run:
spark.readStream().format("mysource").load()
.writeStream().format("console").outputMode(OutputMode.Append()).start();

I cannot share the full code of my data writer. Anyway, I did something like this:
class MySourceProvider extends DataSourceRegister with StreamWriteSupport {
  def createStreamWriter(queryId: String, schema: StructType, mode: OutputMode, options: DataSourceOptions): StreamWriter = {
    new MyStreamWriter(...)
  }
  def shortName(): String = {
    "mywriter"
  }
}

class MyStreamWriter(...) extends StreamWriter {
  def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
  def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
  def createWriterFactory(): DataWriterFactory[Row] = {
    new MyDataWriterFactory()
  }
}
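
For context: for format("mywriter") to resolve to this provider by its short name, Spark discovers DataSourceRegister implementations through Java's ServiceLoader. A minimal sketch of the registration file, assuming the provider lives in a hypothetical com.example package:

# src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
com.example.MySourceProvider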

Best Answer

You need to add the checkpointLocation in your code:

option("checkpointLocation", "/tmp/vaquarkhan/checkpoint"). // <-- checkpoint directory



Example:
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._

val q = records.
  writeStream.
  format("console").
  option("truncate", false).
  option("checkpointLocation", "/tmp/vaquarkhan/checkpoint"). // <-- checkpoint directory
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Update).
  start

Regarding your question, you have the following three options:

.option("startingOffsets", "latest") // read data from the end of the stream


  • earliest — start reading from the beginning of the stream. This does not include data that has already been deleted from Kafka because it is older than the retention period ("aged out" data).
  • latest — start now, processing only new data that arrives after the query has started.
  • per-partition assignment — specify the exact offset to start from for every partition, allowing fine-grained control over where processing should start. For example, this option can be used to pick up exactly where some other system or query left off (see the sketch after this list).
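
A minimal sketch of the three variants against the built-in Kafka source (the broker address and the "events" topic are illustrative; in the JSON form, -2 means earliest and -1 means latest):

// Helper: build a Kafka stream for a given startingOffsets value
// (broker address and topic name are assumptions for this sketch).
def kafkaStream(startingOffsets: String) = spark.readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("subscribe", "events").
  option("startingOffsets", startingOffsets).
  load()

val fromEarliest = kafkaStream("earliest")                       // beginning of the stream
val fromLatest   = kafkaStream("latest")                         // only new data (the default)
val fromOffsets  = kafkaStream("""{"events":{"0":23,"1":-2}}""") // exact per-partition offsets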

  If a directory name for the checkpoint location cannot be found, createQuery reports an AnalysisException:
    checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...)

Here is the Apache Spark source code:
private def createQuery(
    userSpecifiedName: Option[String],
    userSpecifiedCheckpointLocation: Option[String],
    df: DataFrame,
    extraOptions: Map[String, String],
    sink: BaseStreamingSink,
    outputMode: OutputMode,
    useTempCheckpointLocation: Boolean,
    recoverFromCheckpointLocation: Boolean,
    trigger: Trigger,
    triggerClock: Clock): StreamingQueryWrapper = {
  var deleteCheckpointOnStop = false
  val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
    new Path(userSpecified).toUri.toString
  }.orElse {
    df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
      new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toUri.toString
    }
  }.getOrElse {
    if (useTempCheckpointLocation) {
      // Delete the temp checkpoint when a query is being stopped without errors.
      deleteCheckpointOnStop = true
      Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
    } else {
      throw new AnalysisException(
        "checkpointLocation must be specified either " +
          """through option("checkpointLocation", ...) or """ +
          s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
    }
  }
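
As the snippet shows, when no location is given, Spark only falls back to a temporary checkpoint directory if useTempCheckpointLocation is true, and in Spark 2.x DataStreamWriter.start() enables that flag only for built-in sinks such as console, so a custom writer cannot opt in through the public API. A workaround that spares users the per-query option, sketched below with illustrative paths, is to set the session-wide default that the error message mentions; createQuery then derives a per-query subdirectory from the query name or a random UUID:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

val spark = SparkSession.builder().appName("mysource-demo").getOrCreate()

// Session-wide default: each query gets its own subdirectory under this path,
// so option("checkpointLocation", ...) can be omitted on individual queries.
spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/my-checkpoints")

spark.readStream.format("mysource").load().
  writeStream.format("mywriter").outputMode(OutputMode.Append()).start()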

Regarding "java - Spark Streaming: avoid checkpointLocation check", the original question can be found on Stack Overflow: https://stackoverflow.com/questions/50936964/
