gpt4 book ai didi

apache-spark - Spark Parquet 装载机 : Reduce number of jobs involved in listing a dataframe's files

转载 作者:行者123 更新时间:2023-12-03 07:45:44 34 4
gpt4 key购买 nike

我正在通过

将 Parquet 数据加载到数据框中
spark.read.parquet('hdfs:///path/goes/here/...')

由于 parquet 分区,该路径中约有 50k 个文件。当我运行该命令时,spark 会生成数十个小作业,这些作业总体上需要几分钟才能完成。以下是 Spark UI 中作业的样子:

enter image description here

正如您所看到的,虽然每个作业都有大约 2100 个任务,但它们执行速度很快,大约需要 2 秒。启动如此多的“迷你作业”效率很低,导致此文件列出步骤需要大约 10 分钟(其中集群资源大部分处于空闲状态,并且集群主要处理分散的任务或管理作业/任务的开销)。

如何将这些任务合并为更少的作业,而每个作业又包含更多的任务? 也适用于 pyspark 的解决方案的奖励积分。

我正在 hadoop 2.8.3 上通过 pyspark 运行 Spark 2.2.1。

最佳答案

我相信您遇到了一个错误,我的一位前同事已为此提交了票证并提出了拉取请求。您可以查看here 。如果它适合您的问题,您最好的办法可能是对该问题进行投票并在邮件列表上发出一些关于它的声音。

您可能想要做的是调整 spark.sql.sources.parallelPartitionDiscovery.thresholdspark.sql.sources.parallelPartitionDiscovery.parallelism 配置参数(使用前者在链接票证中被引用)以适合您工作的方式。

您可以看看herehere查看如何使用配置 key 。为了完整起见,我将在此处分享相关片段。

spark.sql.sources.parallelPartitionDiscovery.threshold

// Short-circuits parallel listing when serial listing is likely to be faster.
if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
return paths.map { path =>
(path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
}
}

spark.sql.sources.parallelPartitionDiscovery.parallelism

// Set the number of parallelism to prevent following file listing from generating many tasks
// in case of large #defaultParallelism.
val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)

此配置的阈值默认值为 32,并行度默认值为 10000(相关代码 here )。

<小时/>

就您而言,我想说您可能想要做的是设置阈值,以便进程在不生成并行作业的情况下运行。

注意

链接的源来自撰写本文时最新的可用标记版本 2.3.0。

关于apache-spark - Spark Parquet 装载机 : Reduce number of jobs involved in listing a dataframe's files,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49133228/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com