gpt4 book ai didi

apache-spark - 读取拼花文件时刷新 Dataframe 的元数据

转载 作者:行者123 更新时间:2023-12-04 04:17:03 27 4
gpt4 key购买 nike

我正在尝试将 Parquet 文件作为将定期更新的数据框读取(路径为 /folder_name 。每当新数据出现时,旧 Parquet 文件路径( /folder_name )将被重命名为临时路径,然后我们将两者结合起来新数据和旧数据将存储在旧路径中(/folder_name)

发生的情况是假设我们有一个 Parquet 文件 hdfs://folder_name/part-xxxx-xxx.snappy.parquet更新前和更新后更改为 hdfs://folder_name/part-00000-yyyy-yyy.snappy.parquet
发生的问题是当我尝试在更新完成时读取 Parquet 文件时

sparksession.read.parquet("filename") => 它采用旧路径 hdfs://folder_name/part-xxxx-xxx.snappy.parquet (路径存在)

当对数据帧调用操作时,它会尝试从 hdfs://folder_name/part-xxxx-xxx.snappy.parquet 读取数据但是由于更新,文件名发生了变化,我遇到了以下问题

java.io.FileNotFoundException:文件不存在:hdfs://folder_name/part-xxxx-xxx.snappy.parquet底层文件可能已更新。您可以通过在 SQL 中运行“REFRESH TABLE tableName”命令或通过重新创建所涉及的数据集/数据帧来显式地使 Spark 中的缓存无效。

我正在使用 Spark 2.2

谁能帮助我如何刷新元数据?

最佳答案

当您尝试读取不存在的文件时会发生该错误。

如果我错了,请纠正我,但我怀疑您在保存新数据帧时覆盖了所有文件(使用 .mode("overwrite") )。在此进程运行时,您正在尝试读取已删除的文件并抛出该异常 - 这使得该表在一段时间内(更新期间)不可用。

据我所知,没有你想要的“刷新元数据”的直接方法。

解决这个问题的两种(几种可能的)方法:

1 - 使用附加模式

如果您只想将新数据帧附加到旧数据帧,则无需创建临时文件夹并覆盖旧文件夹。您可以将保存模式从覆盖更改为追加。通过这种方式,您可以将分区添加到现有 Parquet 文件中,而无需重写现有分区。

df.write
.mode("append")
.parquet("/temp_table")

这是迄今为止最简单的解决方案,无需读取已存储的数据。但是,如果您必须更新旧数据(例如:如果您正在执行更新插入),这将不起作用。为此,您有选项 2:

2 - 使用 Hive View

您可以创建配置单元表并使用 View 指向最新的(和可用的)表。

以下是此方法背后逻辑的示例:

第 1 部分
  • 如果查看 <table_name>不存在我们创建一个名为的新表<table_name>_alpha0存储新数据
  • 创建表后
    我们创建一个 View <table_name>select * from
    <table_name>_alpha0

  • 第 2 部分
  • 如果查看 <table_name>存在我们需要查看它指向哪个表 (<table_name>_alphaN)
  • 您对新数据执行所有您想要的操作,将其保存为名为 <table_name>_alpha(N+1) 的表。
  • 创建表后,我们更改 View <table_name>select * from <table_name>_alpha(N+1)

  • 和一个代码示例:
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    import org.apache.spark.sql.types._
    import spark.implicits._


    //This method verifies if the view exists and returns the table it is pointing to (using the query 'describe formatted')

    def getCurrentTable(spark: SparkSession, databaseName:String, tableName: String): Option[String] = {
    if(spark.catalog.tableExists(s"${databaseName}.${tableName}")) {

    val rdd_desc = spark.sql(s"describe formatted ${databaseName}.${tableName}")
    .filter("col_name == 'View Text'")
    .rdd

    if(rdd_desc.isEmpty()) {
    None
    }
    else {
    Option(
    rdd_desc.first()
    .get(1)
    .toString
    .toLowerCase
    .stripPrefix("select * from ")
    )
    }
    }
    else
    None
    }

    //This method saves a dataframe in the next "alpha table" and updates the view. It maintains 'rounds' tables (default=3). I.e. if the current table is alpha2, the next one will be alpha0 again.

    def saveDataframe(spark: SparkSession, databaseName:String, tableName: String, new_df: DataFrame, rounds: Int = 3): Unit ={
    val currentTable = getCurrentTable(spark, databaseName, tableName).getOrElse(s"${databaseName}.${tableName}_alpha${rounds-1}")
    val nextAlphaTable = currentTable.replace(s"_alpha${currentTable.last}",s"_alpha${(currentTable.last.toInt + 1) % rounds}")

    new_df.write
    .mode("overwrite")
    .format("parquet")
    .option("compression","snappy")
    .saveAsTable(nextAlphaTable)

    spark.sql(s"create or replace view ${databaseName}.${tableName} as select * from ${nextAlphaTable}")
    }

    //An example on how to use this:

    //SparkSession: spark
    val df = Seq((1,"I"),(2,"am"),(3,"a"),(4,"dataframe")).toDF("id","text")
    val new_data = Seq((5,"with"),(6,"new"),(7,"data")).toDF("id","text")
    val dbName = "test_db"
    val tableName = "alpha_test_table"

    println(s"Current table: ${getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")}")
    println("Saving dataframe")

    saveDataframe(spark, dbName, tableName, df)

    println("Dataframe saved")
    println(s"Current table: ${getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")}")
    spark.read.table(s"${dbName}.${tableName}").show

    val processed_df = df.unionByName(new_data) //Or other operations you want to do

    println("Saving new dataframe")
    saveDataframe(spark, dbName, tableName, processed_df)

    println("Dataframe saved")
    println(s"Current table: ${getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")}")
    spark.read.table(s"${dbName}.${tableName}").show

    结果:
    Current table: Table does not exist
    Saving dataframe
    Dataframe saved
    Current table: test_db.alpha_test_table_alpha0
    +---+---------+
    | id| text|
    +---+---------+
    | 3| a|
    | 4|dataframe|
    | 1| I|
    | 2| am|
    +---+---------+

    Saving new dataframe
    Dataframe saved
    Current table: test_db.alpha_test_table_alpha1
    +---+---------+
    | id| text|
    +---+---------+
    | 3| a|
    | 4|dataframe|
    | 5| with|
    | 6| new|
    | 7| data|
    | 1| I|
    | 2| am|
    +---+---------+

    通过这样做,您可以保证 View 版本 <table_name>将永远可用。这也具有维护表的先前版本的优点(或不具有,取决于您的情况)。即 <table_name_alpha1> 的先前版本将是 <table_name_alpha0>
    3 - 奖金

    如果可以选择升级 Spark 版本,请查看 Delta Lake (最低 Spark 版本:2.4.2)

    希望这可以帮助 :)

    关于apache-spark - 读取拼花文件时刷新 Dataframe 的元数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58762158/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com