gpt4 book ai didi

apache-spark - 为什么 Spark 两次处理相同的数据?

转载 作者:行者123 更新时间:2023-12-04 05:20:55 25 4
gpt4 key购买 nike

当最简单的 Spark 应用程序似乎完成了两次相同的工作时,我陷入了一种奇怪的境地。

我所做的

应用程序本身执行查询:

SELECT date, field1, field2, ..., field10
FROM table1
WHERE field1 = <some number>
AND date BETWEEN date('2018-05-01') AND date('2018-05-30')
ORDER BY 1

并将结果存储到 HDFS 中。
table1 表是一堆存储在 HDFS 上的 parquet 文件,分区如下
/root/date=2018-05-01/hour=0/data-1.snappy.parquet
/root/date=2018-05-01/hour=0/data-2.snappy.parquet
...
/root/date=2018-05-01/hour=1/data-1.snappy.parquet
...
/root/date=2018-05-02/hour=0/data-1.snappy.parquet
...
etc.

所有 Parquet 文件的大小都从 700M 到 2G,并且具有相同的架构:10 个 intbigint 类型的非空字段。

应用程序的结果很小——只有几千行。

我的 spark 应用程序以集群模式在 YARN 上运行。基本 Spark 参数是
spark.driver.memory=2g
spark.executor.memory=4g
spark.executor.cores=4
spark.dynamicAllocation.enabled=true
spark.shuffle.service.enabled=true
spark.submit.deployMode=cluster

在执行期间,几个容器被抢占,没有错误和失败发生。一次尝试就完成了整个应用程序。

奇怪的事

Spark UI 截图:
  • main screen
  • stage 2
  • stage 4

  • 可以看出,第 2 阶段和第 4 阶段都处理了相同数量的输入行,但第 4 阶段也做了一些改组(那些是结果行)。失败的任务是那些容器被抢占的任务。

    所以看起来我的应用程序处理了两次相同的文件。

    我不知道这怎么可能以及发生了什么。请帮助我理解为什么 Spark 会做这么奇怪的事情。

    实际体力计划:
    == Physical Plan ==
    Execute InsertIntoHadoopFsRelationCommand InsertIntoHadoopFsRelationCommand hdfs://hadoop/root/tmp/1530123240802-PrQXaOjPoDqCBhfadgrXBiTtfvFrQRlB, false, CSV, Map(path -> /root/tmp/1530123240802-PrQXaOjPoDqCBhfadgrXBiTtfvFrQRlB), Overwrite, [date#10, field1#1L, field0#0L, field3#3L, field2#2L, field5#5, field4#4, field6#6L, field7#7]
    +- Coalesce 16
    +- *(2) Sort [date#10 ASC NULLS FIRST], true, 0
    +- Exchange rangepartitioning(date#10 ASC NULLS FIRST, 200)
    +- *(1) Project [date#10, field1#1L, field0#0L, field3#3L, field2#2L, field5#5, field4#4, field6#6L, field7#7]
    +- *(1) Filter (isnotnull(field1#1L) && (field1#1L = 1234567890))
    +- *(1) FileScan parquet default.table1[field0#0L,field1#1L,field2#2L,field3#3L,field4#4,field5#5,field6#6L,field7#7,date#10,hour#11] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hadoop/table1], PartitionCount: 714, PartitionFilters: [(date#10 >= 17652), (date#10 <= 17682)], PushedFilters: [IsNotNull(field1), EqualTo(field1,1234567890)], ReadSchema: struct<field0:bigint,field1:bigint,field2:bigint,field3:bigint,field4:int,field5:int,field6:bigint,field7:...

    以下是第 2 阶段和第 4 阶段的 DAG:
  • stage 2
  • stage 4
  • 最佳答案

    我遇到了这个完全相同的问题,事实证明这种行为是完全正常的。

    我在一个 Spark 作业中观察到这种行为,它只是从 HDFS 读取数据,进行一些轻量级处理,并使用 orderBy在写回 HDFS 之前对列进行排序的方法。在 Spark UI 中,我看到两个作业将扫描整个 6 TB 表,就像您所做的一样。第一个作业使用很少的内存,没有写入 shuffle 记录,也没有向 HDFS 写入任何记录。

    事实证明,根本原因是在实际对数据进行排序之前,Spark 执行了一个采样操作,帮助它定义了一个 RangePartitioner。它用于对其排序算法进行数据分区:它需要知道定义排序键的列中数据的大致范围才能定义一个好的RangePartitioner .

    这个操作在这个博客上提到:

    https://blog.cloudera.com/blog/2015/01/improving-sort-performance-in-apache-spark-its-a-double/

    这个 StackOverflow 帖子:

    How does Spark achieve sort order?

    以及霍顿·卡劳 (Holden Karau) 和雷切尔·沃兰 (Rachel Warran) 合着的伟大著作“高性能 Spark ”,第 1 页。 143.

    就我而言,我知道键的范围,所以我想到原则上我应该能够定义 RangePartitioner先验。然而,我在 Spark 源代码中挖掘了它的 sort方法,但我没有找到任何可以明确传递范围的解决方法。

    关于apache-spark - 为什么 Spark 两次处理相同的数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51083673/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com