gpt4 book ai didi

scala - 非法状态异常 : _spark_metadata/0 doesn't exist while compacting batch 9

转载 作者:行者123 更新时间:2023-12-03 14:48:44 25 4
gpt4 key购买 nike

我们使用 Spark Structured Streaming 实现了流应用程序,它尝试从 Kafka 主题读取数据并将其写入 HDFS 位置。
有时应用程序失败并出现异常:

_spark_metadata/0 doesn't exist while compacting batch 9
java.lang.IllegalStateException: history/1523305060336/_spark_metadata/9.compact doesn't exist when compacting batch 19 (compactInterval: 10)
我们无法解决此问题。
我找到的唯一解决方案是删除检查点位置文件,一旦我们再次运行应用程序,这将使作业从头开始读取主题/数据。然而,这对于生产应用来说并不是一个可行的解决方案。
有没有人在不删除检查点的情况下解决此错误,以便我可以从上次运行失败的地方继续?
应用示例代码:
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", <server list>)
.option("subscribe", <topic>)
.load()

[...] // do some processing

dfProcessed.writeStream
.format("csv")
.option("format", "append")
.option("path",hdfsPath)
.option("checkpointlocation","")
.outputmode(append)
.start

最佳答案

错误信息

_spark_metadata/n.compact doesn't exist when compacting batch n+10
当你出现时可以出现
  • 将一些数据处理到启用检查点的 FileSink 中,然后
  • 停止您的流媒体作业,然后
  • 更改 FileSink 的输出目录,同时保持相同的 checkpointLocation,然后
  • 重新启动流作业

  • 解决方案
    由于您不想删除检查点文件,您可以简单地从 复制丢失的 spark 元数据文件。旧 文件接收器输出路径到 新品 输出路径。请参阅下文以了解什么是“丢失的 Spark 元数据文件”。
    背景
    要理解为什么会这样 IllegalStateException正在抛出,我们需要了解在提供的文件输出路径中幕后发生了什么。让 outPathBefore是此路径的名称。当您的流作业正在运行并处理数据时,该作业会创建一个文件夹 outPathBefore/_spark_metadata .在该文件夹中,您将找到一个以微批次标识符命名的文件,其中包含数据已写入的文件(分区文件)列表,例如:
    /home/mike/outPathBefore/_spark_metadata$ ls
    0 1 2 3 4 5 6 7
    在这种情况下,我们有 8 个微批次的详细信息。其中一个文件的内容看起来像
    /home/mike/outPathBefore/_spark_metadata$ cat 0
    v1
    {"path":"file:///tmp/file/before/part-00000-99bdc705-70a2-410f-92ff-7ca9c369c58b-c000.csv","size":2287,"isDir":false,"modificationTime":1616075186000,"blockReplication":1,"blockSize":33554432,"action":"add"}
    默认情况下,每十个微批次,这些文件都会被压缩,这意味着文件 0、1、2、...、9 的内容将存储在名为 9.compact 的压缩文件中。 .
    此过程在随后的十个批次中连续,即在微批次 19 中,作业聚合了最后 10 个文件,即 9.compact, 10, 11, 12, ..., 19。
    现在,假设您的流作业一直运行到微批处理 15,这意味着该作业已创建以下文件:
    /home/mike/outPathBefore/_spark_metadata/0
    /home/mike/outPathBefore/_spark_metadata/1
    ...
    /home/mike/outPathBefore/_spark_metadata/8
    /home/mike/outPathBefore/_spark_metadata/9.compact
    /home/mike/outPathBefore/_spark_metadata/10
    ...
    /home/mike/outPathBefore/_spark_metadata/15
    在第十五个微批处理之后,您停止了流式作业并将文件接收器的输出路径更改为 outPathAfter .当您保持相同的 checkpointLocation 时,流作业将继续使用 micro-batch 16。但是,它现在在新的输出路径中创建元数据文件:
    /home/mike/outPathAfter/_spark_metadata/16
    /home/mike/outPathAfter/_spark_metadata/17
    ...
    现在,这就是抛出异常的地方:当达到微批次 19 时,该作业会尝试压缩 spark 元数据文件夹中的第 10 个最新文件。但是,它只能找到文件 16, 17, 18 但它没有找到 9.compact, 10 等。因此错误消息说:
    java.lang.IllegalStateException: history/1523305060336/_spark_metadata/9.compact doesn't exist when compacting batch 19 (compactInterval: 10)
    文档
    结构化流编程指南在 Recovery Semantics after Changes in a Streaming Query 上进行了解释:

    "Changes to output directory of a file sink are not allowed: sdf.writeStream.format("parquet").option("path", "/somePath") to sdf.writeStream.format("parquet").option("path", "/anotherPath")"


    Databricks 还在文章 Streaming with File Sink: Problems with recovery if you change checkpoint or output directories 中写了一些细节。

    关于scala - 非法状态异常 : _spark_metadata/0 doesn't exist while compacting batch 9,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56390492/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com