gpt4 book ai didi

apache-spark - 外部覆盖后 Spark 和 Hive 表架构不同步

转载 作者:行者123 更新时间:2023-12-04 03:09:34 28 4
gpt4 key购买 nike

我在使用 Spark 2.1.0 和 Hive 2.1.1 的 Mapr 集群上的 Spark 和 Hive 之间的 Hive 表架构不同步时遇到问题。

我需要尝试专门针对托管表解决此问题,但可以使用非托管/外部表重现该问题。

步骤概述

  • 使用 saveAsTable将数据帧保存到给定的表。
  • 使用 mode("overwrite").parquet("path/to/table")覆盖先前保存的表的数据。我实际上是通过 Spark 和 Hive 外部的进程修改数据,但这会重现相同的问题。
  • 使用 spark.catalog.refreshTable(...)刷新元数据
  • spark.table(...).show() 查询表.原始数据帧和覆盖数据帧之间相同的任何列都将正确显示新数据,但不会显示仅在新表中的任何列。

  • 例子

    db_name = "test_39d3ec9"
    table_name = "overwrite_existing"
    table_location = "<spark.sql.warehouse.dir>/{}.db/{}".format(db_name, table_name)

    qualified_table = "{}.{}".format(db_name, table_name)
    spark.sql("CREATE DATABASE IF NOT EXISTS {}".format(db_name))

    另存为托管表

    existing_df = spark.createDataFrame([(1, 2)])
    existing_df.write.mode("overwrite").saveAsTable(table_name)

    请注意,使用以下内容保存为非托管表会产生相同的问题:

    existing_df.write.mode("overwrite") \
    .option("path", table_location) \
    .saveAsTable(qualified_table)

    查看表的内容

    spark.table(table_name).show()
    +---+---+
    | _1| _2|
    +---+---+
    | 1| 2|
    +---+---+

    直接覆盖 Parquet 文件

    new_df = spark.createDataFrame([(3, 4, 5, 6)], ["_4", "_3", "_2", "_1"])
    new_df.write.mode("overwrite").parquet(table_location)

    使用 parquet reader 查看内容,内容显示正确

    spark.read.parquet(table_location).show()
    +---+---+---+---+
    | _4| _3| _2| _1|
    +---+---+---+---+
    | 3| 4| 5| 6|
    +---+---+---+---+

    刷新表的 spark 元数据并作为表再次读入。将更新相同列的数据,但不会显示其他列。

    spark.catalog.refreshTable(qualified_table)
    spark.table(qualified_table).show()
    +---+---+
    | _1| _2|
    +---+---+
    | 6| 5|
    +---+---+

    我还尝试在调用 spark.catalog.refreshTable 之前更新 hive 中的架构在 hive shell 中使用以下命令:

    ALTER TABLE test_39d3ec9.overwrite_existing REPLACE COLUMNS (`_1` bigint, `_2` bigint, `_3` bigint, `_4` bigint);

    运行 ALTER 命令后,我运行 describe 并在 hive 中正确显示

    DESCRIBE test_39d3ec9.overwrite_existing
    OK
    _1 bigint
    _2 bigint
    _3 bigint
    _4 bigint

    在运行 alter 命令之前,它只按预期显示原始列

    DESCRIBE test_39d3ec9.overwrite_existing
    OK
    _1 bigint
    _2 bigint

    然后我跑了 spark.catalog.refreshTable但这并没有影响 spark 对数据的看法。

    补充说明

    在 spark 方面,我使用 PySpark 进行了大部分测试,但也在 spark-shell (scala) 和 sparksql shell 中进行了测试。在 Spark shell 中,我也尝试使用 HiveContext但没有用。

    import org.apache.spark.sql.hive.HiveContext
    import spark.sqlContext.implicits._
    val hiveObj = new HiveContext(sc)
    hiveObj.refreshTable("test_39d3ec9.overwrite_existing")

    在 hive shell 中执行 ALTER 命令后,我在 Hue 中验证了架构也在那里发生了变化。

    我还尝试使用 spark.sql("ALTER ...") 运行 ALTER 命令但是我们使用的 Spark 版本 (2.1.0) 不允许它,并且基于此问题,它看起来直到 Spark 2.2.0 才可用: https://issues.apache.org/jira/browse/SPARK-19261

    我也再次通读了 spark 文档,特别是本节: https://spark.apache.org/docs/2.1.0/sql-programming-guide.html#hive-metastore-parquet-table-conversion

    基于这些文档, spark.catalog.refreshTable应该管用。 spark.sql.hive.convertMetastoreParquet的配置通常是 false ,但我把它切换到 true用于测试,它似乎没有任何影响。

    任何帮助将不胜感激,谢谢!

    最佳答案

    我在 CDH 5.11.x 包中使用 spark 2.2.0 时遇到了类似的问题。

    spark.write.mode("overwrite").saveAsTable()当我发出 spark.read.table().show不会显示任何数据。

    在检查时,我发现这是 CDH spark 2.2.0 版本的一个已知问题。解决方法是在执行 saveAsTable 命令后运行以下命令。

    spark.sql("ALTER TABLE qualified_table set SERDEPROPERTIES ('path'='hdfs://{hdfs_host_name}/{table_path}')")

    spark.catalog.refreshTable("qualified_table")

    例如:如果您的表 LOCATION
    就像 hdfs://hdfsHA/user/warehouse/example.db/qualified_table
    然后分配 'path'='hdfs://hdfsHA/user/warehouse/example.db/qualified_table'

    这对我有用。试一试。我想现在你的问题已经解决了。如果没有你可以试试这个方法。

    解决方法来源: https://www.cloudera.com/documentation/spark2/2-2-x/topics/spark2_known_issues.html

    关于apache-spark - 外部覆盖后 Spark 和 Hive 表架构不同步,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49201436/

    28 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com