gpt4 book ai didi

apache-spark - Spark Partitionby 无法按预期扩展

转载 作者:行者123 更新时间:2023-12-03 16:22:16 27 4
gpt4 key购买 nike

输入:
输入数据集包含以 Parquet 形式存储的多个文件中的 1000 万笔交易。包括所有文件在内的整个数据集的大小范围为 6 到 8GB。

问题说明:
根据客户 ID 对交易进行分区,这将为每个客户 ID 创建一个文件夹,每个文件夹包含该特定客户完成的所有交易。

HDFS 对根目录中可以创建的子目录数量有 640 万个硬性限制,因此使用客户 ID 的最后两位数字(范围从 00、01、02...到 99)来创建顶级目录和每个顶级目录将包含所有以该特定两位数字结尾的客户 ID。

示例输出目录结构:

00/cust_id=100900/part1.csv
00/cust_id=100800/part33.csv
01/cust_id=100801/part1.csv
03/cust_id=100803/part1.csv

代码:

// Reading input file and storing in cache
val parquetReader = sparksession.read
.parquet("/inputs")
.persist(StorageLevel.MEMORY_ONLY) //No spill will occur has enough memory

// Logic to partition
var customerIdEndingPattern = 0
while (cardAccountEndingPattern < 100) {
var idEndPattern = customerIdEndingPattern + ""
if (customerIdEndingPattern < 10) {
idEndPattern = "0" + customerIdEndingPattern
}

parquetReader
.filter(col("customer_id").endsWith(idEndPattern))
.repartition(945, col("customer_id"))
.write
.partitionBy("customer_id")
.option("header", "true")
.mode("append")
.csv("/" + idEndPattern)
customerIdEndingPattern = customerIdEndingPattern + 1
}

Spark 配置:
亚马逊 EMR 5.29.0(Spark 2.4.4 和 Hadoop 2.8.5)

1 个主节点和 10 个从节点,每个节点都有 96 个 vCore 和 768GB RAM(Amazon AWS R5.24xlarge 实例)。硬盘是 EBS,具有 3000 IOPS 的半身像,持续 30 分钟。
            'spark.hadoop.dfs.replication': '3',
'spark.driver.cores':'5',
'spark.driver.memory':'32g',
'spark.executor.instances': '189',
'spark.executor.memory': '32g',
'spark.executor.cores': '5',
'spark.executor.memoryOverhead':'8192',
'spark.driver.memoryOverhead':'8192',
'spark.default.parallelism':'945',
'spark.sql.shuffle.partitions' :'945',
'spark.serializer':'org.apache.spark.serializer.KryoSerializer',
'spark.dynamicAllocation.enabled': 'false',
'spark.memory.fraction':'0.8',
'spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version':'2',
'spark.memory.storageFraction':'0.2',
'spark.task.maxFailures': '6',
'spark.driver.extraJavaOptions': '-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=12 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError="kill -9 %p"
'spark.executor.extraJavaOptions': '-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=12 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError="kill -9 %p"

缩放问题:
  • 从 10 到一直到 40 个从站进行实验(相应地调整 spark 配置),但结果仍然相同,作业需要 2 小时以上才能完成(如第一张图片所示,每个作业需要超过一分钟,而 while 循环运行 99次)。此外,来自远程执行程序的读取几乎不存在(这很好),大多数是本地进程。
  • 分区似乎工作正常(请参阅第二张图片)每个实例有 5 个 RDD 块,并且始终运行 5 个任务(每个实例有 5 个内核,每个从节点有 19 个实例)。 GC 也进行了优化。
  • 在 while 循环中写入的每个 partitionby 任务都需要一分钟或更长时间才能完成。

  • 指标:

    几个工作的样本持续时间我们总共有 99 个工作

    Duration for each of the jobs(totally 99)

    分区似乎没问题

    Partition seems okay

    1 个作业的摘要基本上是一个分区执行

    Summary of 1 job

    完整作业完成后的一些实例摘要,因此 RDD 块为零,第一行是驱动程序。
    enter image description here


    所以问题是如何进一步优化它以及为什么它没有扩大规模?有没有更好的方法来解决它?我已经达到最高性能了吗?假设我可以访问更多的硬件资源,还有什么我可以做得更好的吗?欢迎任何建议。

    最佳答案

    触摸每条记录 100 次是非常低效的,即使数据可以缓存在内存中并且不会被下游逐出。更何况一个人坚持是昂贵的

    相反,您可以添加一个虚拟列

    import org.apache.spark.sql.functions.substring

    val df = sparksession.read
    .parquet("/inputs")
    .withColumn("partition_id", substring($"customer_id", -2, 2))

    并稍后用于分区

    df
    .write
    .partitionBy("partition_id", "customer_id")
    .option("header", "true")
    .mode("append")
    .csv("/")

    avoid to many small files您可以先使用更长的后缀重新分区

    val nParts: Int = ???
    val suffixLength: Int = ??? // >= suffix length used for write partitions

    df
    .repartitionByRange(
    nParts,
    substring($"customer_id", -suffixLength, suffixLength)
    .write
    .partitionBy("partition_id", "customer_id")
    .option("header", "true")
    .mode("append")
    .csv("/")

    此类更改将允许您一次性处理所有数据,而无需任何显式缓存。

    关于apache-spark - Spark Partitionby 无法按预期扩展,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60165484/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com