gpt4 book ai didi

apache-spark - Spark 是否知道 DataFrame 的分区键?

转载 作者:行者123 更新时间:2023-12-03 21:28:19 27 4
gpt4 key购买 nike

我想知道 Spark 是否知道 parquet 文件的分区键并使用此信息来避免洗牌。

上下文:

运行 Spark 2.0.1 运行本地 SparkSession。我有一个 csv 数据集,我将其保存为磁盘上的 Parquet 文件,如下所示:

val df0 = spark
.read
.format("csv")
.option("header", true)
.option("delimiter", ";")
.option("inferSchema", false)
.load("SomeFile.csv"))


val df = df0.repartition(partitionExprs = col("numerocarte"), numPartitions = 42)

df.write
.mode(SaveMode.Overwrite)
.format("parquet")
.option("inferSchema", false)
.save("SomeFile.parquet")

我正在按列创建 42 个分区 numerocarte .这应该将多个 numerocarte 分组。到同一个分区。我不想在 write 上做 partitionBy("numerocarte")时间,因为我不希望每张卡有一个分区。这将是数以百万计的人。

之后在另一个脚本中我读到了 SomeFile.parquet parquet 文件并对其进行一些操作。特别是我正在运行 window function在它上面分区是在 parquet 文件被重新分区的同一列上完成的。
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df2 = spark.read
.format("parquet")
.option("header", true)
.option("inferSchema", false)
.load("SomeFile.parquet")

val w = Window.partitionBy(col("numerocarte"))
.orderBy(col("SomeColumn"))

df2.withColumn("NewColumnName",
sum(col("dollars").over(w))

之后 read我可以看到 repartition按预期工作,DataFrame df2有 42 个分区,每个分区都有不同的卡。

问题:
  • Spark 是否知道数据框 df2由列 numerocarte 划分?
  • 如果它知道,那么窗口函数中就不会有洗牌了。真的?
  • 如果它不知道,它将在窗口函数中进行随机播放。真的?
  • 如果它不知道,我如何告诉 Spark 数据已经被右列分区了?
  • 如何检查 DataFrame 的分区键?有这个命令吗?我知道如何检查分区数,但如何查看分区键?
  • 当我在每一步之后打印文件中的分区数时,read 之后我有 42 个分区和 withColumn 之后的 200 个分区这表明 Spark 重新分区了我的 DataFrame .
  • 如果我有两个使用同一列重新分区的不同表,连接会使用该信息吗?
  • 最佳答案

    我正在回答我自己的问题以供将来引用什么有效。

    根据@user8371915 的建议,bucketBy 有效!

    我正在保存我的 DataFrame df :

    df.write
    .bucketBy(250, "userid")
    .saveAsTable("myNewTable")

    然后当我需要加载这个表时:
    val df2 = spark.sql("SELECT * FROM myNewTable")

    val w = Window.partitionBy("userid")

    val df3 = df2.withColumn("newColumnName", sum(col("someColumn")).over(w)
    df3.explain

    我确认当我在 df2 上执行窗口功能时由 userid 划分没有洗牌!谢谢@user8371915!

    我在调查它时学到的一些东西
  • myNewTable 看起来像一个普通的 Parquet 文件,但它不是。您可以使用 spark.read.format("parquet").load("path/to/myNewTable") 正常阅读。但是 DataFrame以这种方式创建的不会保留原来的分区!您必须使用 spark.sql select正确分区 DataFrame .
  • 您可以使用 spark.sql("describe formatted myNewTable").collect.foreach(println) 查看表格内部.这将告诉您哪些列用于分桶以及有多少个桶。
  • 利用分区的窗口函数和连接通常也需要排序。您可以在写入时使用 .sortBy() 对存储桶中的数据进行排序。并且排序也将保留在配置单元表中。 df.write.bucketBy(250, "userid").sortBy("somColumnName").saveAsTable("myNewTable")
  • 在本地模式下工作时,表 myNewTable保存到 spark-warehouse我本地 Scala SBT 项目中的文件夹。通过 spark-submit 使用 mesos 以集群模式保存时,保存到hive仓库。对我来说,它位于 /user/hive/warehouse .
  • 当做spark-submit您需要添加到您的 SparkSession两个选项:.config("hive.metastore.uris", "thrift://addres-to-your-master:9083").enableHiveSupport() .否则,您创建的配置单元表将不可见。
  • 如果要将表保存到特定数据库,请执行 spark.sql("USE your database")装桶前。

  • 2018 年 5 月 2 日更新

    我在使用 spark 分桶和创建 Hive 表时遇到了一些问题。请引用 Why is Spark saveAsTable with bucketBy creating thousands of files? 中的问题、回复和评论

    关于apache-spark - Spark 是否知道 DataFrame 的分区键?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48459208/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com