gpt4 book ai didi

scala - Spark 数据帧 saveAsTable 正在使用单个任务

转载 作者:行者123 更新时间:2023-12-02 01:30:13 34 4
gpt4 key购买 nike

我们有一个初始阶段可以适当扩展的管道 - 每个使用几十个 worker 。

最后阶段之一是

dataFrame.write.format(outFormat).mode(saveMode).
partitionBy(partColVals.map(_._1): _*).saveAsTable(tname)

对于这个阶段,我们最终只有一个 worker 。这显然对我们不起作用——事实上,worker 用完了磁盘空间——而且速度很慢。

enter image description here

为什么该命令最终只会在单个工作人员/单个任务上运行?

更新 输出格式为 parquet .分区列的数量不影响结果(尝试一列和几列)。

另一个更新 不符合以下任何条件(如下面的答案所假定):
  • coalescepartitionBy声明
  • window/解析函数
  • Dataset.limit
  • sql.shuffle.partitions
  • 最佳答案

    该问题不太可能与 saveAsTable 有任何关系。 .

    阶段中的单个任务表示输入数据( DatasetRDD )只有一个分区。这与有多个任务但一个或多个任务的执行时间明显更长的情况形成对比,这通常对应于包含正偏斜键的分区。此外,您应该混淆具有低 CPU 利用率的单个任务场景。前者通常是 IO 吞吐量不足的结果(高 CPU 等待时间是最明显的迹象),但在极少数情况下可以追溯到使用具有低级同步原语的共享对象。

    由于标准数据源不会在写入时混洗数据(包括使用 partitionBybucketBy 选项的情况),因此可以安全地假设数据已在上游代码中的某处重新分区。通常这意味着发生了以下情况之一:

  • 数据已使用 coalesce(1) 显式移动到单个分区或 repartition(1) .
  • 数据已隐式移动到单个分区,例如:
  • Dataset.limit
  • 缺少窗口定义的窗口函数应用 PARTITION BY条款。
    df.withColumn(
    "row_number",
    row_number().over(Window.orderBy("some_column"))
    )
  • sql.shuffle.partitions选项设置为 1,上游代码包括对 Dataset 的非本地操作.
  • Dataset是应用全局聚合函数的结果(没有 GROUP BY caluse)。这通常不是问题,除非函数是非归约的(collect_list 或类似的)。

  • 虽然没有证据表明这是这里的问题,但在一般情况下,您也应该有可能,数据只包含一个分区,一直到源。这通常在获取输入时发生 using JDBC source ,但第 3 方格式可以表现出相同的行为。

    要确定问题的根源,您应该检查输入 Dataset 的执行计划。 ( explain(true) ) 或检查 Spark Web UI 的 SQL 选项卡。

    关于scala - Spark 数据帧 saveAsTable 正在使用单个任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34733732/

    34 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com