gpt4 book ai didi

apache-spark - 为什么Spark saveAsTable 和bucketBy 创建了数千个文件?

转载 作者:行者123 更新时间:2023-12-02 07:04:52 30 4
gpt4 key购买 nike

上下文

Spark 2.0.1,集群模式下的spark-submit。我正在从 hdfs 读取 Parquet 文件:

val spark = SparkSession.builder
.appName("myApp")
.config("hive.metastore.uris", "thrift://XXX.XXX.net:9083")
.config("spark.sql.sources.bucketing.enabled", true)
.enableHiveSupport()
.getOrCreate()

val df = spark.read
.format("parquet")
.load("hdfs://XXX.XX.X.XX/myParquetFile")

我正在保存 df到一个包含 50 个桶的 Hive 表,按 userid 分组:

df0.write
.bucketBy(50, "userid")
.saveAsTable("myHiveTable")

现在,当我查看我的 hdfs 上的 hive 仓库时 /user/hive/warehouse有一个名为 myHiveTable 的文件夹。里面有一堆part-*.parquet文件。我预计会有 50 个文件。但不,有3201文件!!!!每个分区有64个文件,为什么?对于我保存为配置单元表的不同文件,每个分区有不同数量的文件。所有文件都非常小,每个只有几十Kb!

让我补充一下,不同的 userid 的数量关于1 000 000myParquetFile .

问题

为什么文件夹里有3201个文件而不是50个!它们是什么?

当我将该表读回 DataFrame 并打印分区数时:

val df2 = spark.sql("SELECT * FROM myHiveTable") 
println(df2.rdd.getNumPartitions)

分区数正确为50,我确认数据按userid正确分区.

对于我的一个 3Tb 大型数据集,我创建了一个包含 1000 个分区的表,这实际上创建了约百万个文件!它超出了目录项限制 1048576 并给出 org.apache.hadoop.hdfs.protocol.FSLimitException$MaxDirectoryItemsExceededException

问题

创建的文件数量取决于什么?

问题

有没有办法限制创建的文件数量?

问题

我应该担心这些文件吗?这是否会影响 df2 的性能拥有所有这些文件?人们总是说我们不应该创建太多分区,因为这会带来问题。

问题

我找到了此信息HIVE Dynamic Partitioning tips文件的数量可能与映射器的数量有关。建议使用distribute by插入到配置单元表时。我怎样才能在 Spark 中做到这一点?

问题

如果问题确实如上面的链接所示,这里How to control the file numbers of hive table after inserting data on MapR-FS他们建议使用诸如 hive.merge.mapfiles 之类的选项或hive.merge.mapredfiles在 MapReduce 作业后合并所有小文件。 Spark 中有此选项吗?

最佳答案

请使用spark sql,它将使用HiveContext将数据写入Hive表,因此它将使用您在表架构中配置的存储桶数量。

 SparkSession.builder().
config("hive.exec.dynamic.partition", "true").
config("hive.exec.dynamic.partition.mode", "nonstrict").
config("hive.execution.engine","tez").
config("hive.exec.max.dynamic.partitions","400").
config("hive.exec.max.dynamic.partitions.pernode","400").
config("hive.enforce.bucketing","true").
config("optimize.sort.dynamic.partitionining","true").
config("hive.vectorized.execution.enabled","true").
config("hive.enforce.sorting","true").
enableHiveSupport().getOrCreate()

spark.sql(s"insert into hiveTableName partition (partition_column) select * from myParquetFile")

spark 的存储桶实现不遵守指定数量的存储桶大小。每个分区都写入一个单独的文件,因此每个存储桶最终都会有很多文件。

请引用此链接https://www.slideshare.net/databricks/hive-bucketing-in-apache-spark-with-tejas-patil

enter image description here希望这会有所帮助。

拉维

关于apache-spark - 为什么Spark saveAsTable 和bucketBy 创建了数千个文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48585744/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com