gpt4 book ai didi

apache-spark - Spark + Parquet + Snappy : Overall compression ratio loses after spark shuffles data

转载 作者:行者123 更新时间:2023-12-04 20:29:52 28 4
gpt4 key购买 nike

社区!

请帮助我了解如何使用 Spark 获得更好的压缩比?

让我描述一下案例:

  • 我有数据集,我们称它为 HDFS 上的 产品 ,它是使用 Sqoop ImportTool as-parquet-file 使用编解码器 snappy 导入的。作为导入的结果,我有 100 个文件,总共 46 GB du,文件大小不同(最小 11MB,最大 1.5GB,平均 ~ 500MB)。记录总数略高于 80 亿,有 84 列
  • 我正在使用 snappy 使用 Spark 进行简单的读取/重新分区/写入,结果我得到:

  • ~ 100 GB 输出大小,具有相同的文件数、相同的编解码器、相同的计数和相同的列。

    代码片段:
    val productDF = spark.read.parquet("/ingest/product/20180202/22-43/")

    productDF
    .repartition(100)
    .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
    .option("compression", "snappy")
    .parquet("/processed/product/20180215/04-37/read_repartition_write/general")
  • 使用 parquet-tools 我查看了来自摄取和处理的随机文件,它们如下所示:

  • 摄取:
    creator:                        parquet-mr version 1.5.0-cdh5.11.1 (build ${buildNumber}) 
    extra: parquet.avro.schema = {"type":"record","name":"AutoGeneratedSchema","doc":"Sqoop import of QueryResult","fields"

    and almost all columns looks like
    AVAILABLE: OPTIONAL INT64 R:0 D:1

    row group 1: RC:3640100 TS:36454739 OFFSET:4

    AVAILABLE: INT64 SNAPPY DO:0 FPO:172743 SZ:370515/466690/1.26 VC:3640100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 126518400000, max: 1577692800000, num_nulls: 2541633]

    处理:
    creator:                        parquet-mr version 1.5.0-cdh5.12.0 (build ${buildNumber}) 
    extra: org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields"

    AVAILABLE: OPTIONAL INT64 R:0 D:1
    ...

    row group 1: RC:6660100 TS:243047789 OFFSET:4

    AVAILABLE: INT64 SNAPPY DO:0 FPO:4122795 SZ:4283114/4690840/1.10 VC:6660100 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE ST:[min: -2209136400000, max: 10413820800000, num_nulls: 4444993]

    另一方面,如果没有重新分区或使用合并 - 大小仍然接近摄取数据大小。
  • 展望 future ,我做了以下工作:
  • 读取数据集并将其写回
    productDF
    .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
    .option("compression", "none")
    .parquet("/processed/product/20180215/04-37/read_repartition_write/nonewithoutshuffle")
  • 读取数据集,重新分区并将其写回
    productDF
    .repartition(500)
    .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
    .option("compression", "none")
    .parquet("/processed/product/20180215/04-37/read_repartition_write/nonewithshuffle")

  • 结果: 80 GB 没有和 283 GB 重新分区,输出文件数相同

    80GB parquet 元示例:
    AVAILABLE:                       INT64 UNCOMPRESSED DO:0 FPO:456753 SZ:1452623/1452623/1.00 VC:11000100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: -1735747200000, max: 2524550400000, num_nulls: 7929352]

    283 GB Parquet 元示例:
    AVAILABLE:                       INT64 UNCOMPRESSED DO:0 FPO:2800387 SZ:2593838/2593838/1.00 VC:3510100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: -2209136400000, max: 10413820800000, num_nulls: 2244255]

    看起来,即使没有未压缩的数据, Parquet 本身(带有编码?)也大大减少了数据的大小。如何 ? :)

    我试图读取未压缩的 80GB,重新分区并写回 - 我有 283GB
  • 对我来说,第一个问题是为什么 Spark 重新分区/洗牌后我的尺寸变大了?
  • 第二个是如何有效地在 spark 中打乱数据以有利于 parquet 编码/压缩(如果有的话)?

  • 一般来说,我不希望我的数据大小在 Spark 处理后增长,即使我没有改变任何东西。

    另外,我找不到 snappy 是否有任何可配置的压缩率,例如-1 ... -9?据我所知,gzip 有这个,但是在 Spark/Parquet 编写器中控制这个速率的方法是什么?

    感谢任何帮助!

    谢谢!

    最佳答案

    当您调用 repartition(n)在数据帧上,您正在进行循环分区。在重新分区之前存在的任何数据局部性都消失了,熵增加了。因此,运行长度和字典编码器以及压缩编解码器实际上并没有太多可使用的地方。

    所以当你重新分区时,你需要使用 repartition (n, col)版本。给它一个可以保留数据局部性的好列。

    此外,由于您可能正在为下游作业优化您的 sqooped 表,因此您可以 sortWithinPartition用于更快的扫描。
    df.repartition(100, $"userId").sortWithinPartitions("userId").write.parquet(...)

    关于apache-spark - Spark + Parquet + Snappy : Overall compression ratio loses after spark shuffles data,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48847660/

    28 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com