gpt4 book ai didi

apache-spark - spark sql中的有状态udfs,或者如何在spark sql中获得mapPartitions的性能优势?

转载 作者:行者123 更新时间:2023-12-03 16:54:18 27 4
gpt4 key购买 nike

在转换导致创建或加载昂贵资源的情况下,在映射分区上使用映射可以显着提高性能(例如 - 对外部服务进行身份验证或创建数据库连接)。

mapPartition 允许我们每个分区初始化一次昂贵的资源,而标准映射是每行初始化一次。

但是如果我使用数据帧,我应用自定义转换的方式是通过指定用户定义的函数逐行操作 - 所以我失去了使用 mapPartitions 为每个块执行一次繁重工作的能力。

在 spark-sql/dataframe 中是否有解决方法?

更具体地说 :

我需要对一堆文档执行特征提取。我有一个输入文档并输出一个向量的函数。

计算本身涉及初始化与外部服务的连接。我不想或不需要为每个文档初始化它。这在规模上具有非平凡的开销。

最佳答案

一般来说,你有三个选择:

  • 转换 DataFrameRDD并申请 mapPartitions直接地。由于您使用 Python udf您已经破坏了某些优化并支付 serde 成本并使用 RDD平均不会让它变得更糟。
  • 懒惰 initialize required resources (另见 How to run a function on all Spark workers before processing data in PySpark? )。
  • 如果数据可以使用 Arrow 进行序列化,请使用矢量化 pandas_udf (Spark 2.3 及更高版本)。不幸的是,您不能直接通过 VectorUDT 使用它,因此您必须展开向量并稍后折叠,因此这里的限制因素是向量的大小。此外,您必须小心控制分区的大小。

  • 请注意,使用 UserDefinedFunctions可能需要 promoting objects to non-deterministic变种。

    关于apache-spark - spark sql中的有状态udfs,或者如何在spark sql中获得mapPartitions的性能优势?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49558146/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com