gpt4 book ai didi

apache-spark - 加入一个巨大而巨大的 Spark 数据框

转载 作者:行者123 更新时间:2023-12-04 04:45:01 24 4
gpt4 key购买 nike

我有两个数据帧,df1 有 600 万行,df2 有 10 亿行。

我尝试过标准的 df1.join(df2,df1("id")<=>df2("id2")) ,但内存不足。

df1 太大,无法放入广播连接。

我什至尝试过布隆过滤器,但它也太大了,无法放入广播中,但仍然有用。

我尝试过的唯一不会出错的方法是将 df1 分成 300,000 个行块并在 foreach 循环中与 df2 连接。但这比它可能需要的时间长一个数量级(可能是因为它太大而无法作为持久化,导致它重做分割到那个点)。重新组合结果也需要一段时间。

你是如何解决这个问题的?

一些注意事项:

df1 是 df2 的子集。 df1=df2.where("fin<1").selectExpr("id as id2").distinct() 我对 df2 中的所有行都感兴趣,这些行的 id 曾经有一个 fin<1,这意味着我不能一步完成。

df2 中大约有 2 亿个唯一 ID。

以下是一些相关的 Spark 设置:

spark.cores.max=1000
spark.executor.memory=15G
spark.akka.frameSize=1024
spark.shuffle.consolidateFiles=false
spark.task.cpus=1
spark.driver.cores=1
spark.executor.cores=1
spark.memory.fraction=0.5
spark.memory.storageFraction=0.3
spark.sql.shuffle.partitions=10000
spark.default.parallelism=10000

我得到的错误是:
16/03/11 04:36:07 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(11,1,ResultTask,FetchFailed(BlockManagerId(68dcb91c-1b45-437d-ac47-8e8c1e4bc386-S199, mapr, 46487),3,176,4750,org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException: /tmp/mesos/work/slaves/68dcb91c-1b45-437d-ac47-8e8c1e4bc386-S199/frameworks/c754216b-bf80-4d84-97f1-2e907030365e-2545/executors/16/runs/5a5a01c5-205e-4380-94d3-7fa0f6421b85/blockmgr-ea345692-05bb-4f42-9ba1-7b93311fb9d4/0e/shuffle_3_340_0.index (No such file or directory)


Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 465 in stage 6.3 failed 4 times, most recent failure: Lost task 465.3 in stage 6.3 (TID 114448, mapr): java.lang.OutOfMemoryError: Direct buffer memory

最佳答案

正如我所见,您有分区太大的问题(可能是由于数据较大)
您可以尝试几种方法:

  • 尝试将 spark.sql.shuffle.partitions 定义为 2048 甚至更多(默认为 200)。加入你的 df-s 时会有 shuffle。尝试使用此参数,以便更大数据的总量/此参数将约为 64Mb-100Mb(取决于文件格式)。一般来说,你应该在 spark UI 中看到每个任务(每个分区)处理“正常”的数据量(最大 64MB-100MB)
  • 如果第一个不起作用,我可以建议在 RDD api 中加入这个。将您的 df 转换为 RDD。然后通过 HashPartitioner(分区数)对两个 RDD 进行分区。什么时候应该像我之前描述的那样计算分区数。
  • 最近,spark 开发人员添加了新选项:您可以将巨大的表存储到 N 个存储桶中(即存储它以备加入)。存在的限制很少,但它可以完全消除混洗的大量数据。 bucketBy 仅支持 saveAsTable api 而不是 save 一个。在您使用 bucketBy 数据并将其分桶后,然后在下一次迭代中,您可以将此数据作为外部表加载,同时提供分桶规范(请参阅 https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-table.html )

    CREATE TABLE 巨大的
    --...在这里你必须指定架构
    使用 Parquet
    CLUSTERED BY (a,b,c) INTO N 个桶
    位置 'hdfs://your-path'

  • 然后,当您将巨大的表加载为分桶表时,您可以加载大表并将其重新分区为相同数量的存储桶和相同的列(df.repartition(N, a,b,c))

    关于apache-spark - 加入一个巨大而巨大的 Spark 数据框,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35948714/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com