gpt4 book ai didi

python - pySpark forEachPartition - 代码在哪里执行

转载 作者:行者123 更新时间:2023-11-30 21:55:53 33 4
gpt4 key购买 nike

我正在使用 2.3 版本的 pySpark(在我当前的开发系统中无法更新到 2.4),并且有以下关于 foreachPartition 的问题.

首先介绍一点背景信息:据我了解,pySpark-UDF 强制 Python 代码在 Python 实例中的 Java 虚拟机 (JVM) 之外执行,从而降低了性能。由于我需要将一些 Python 函数应用于我的数据并希望最大限度地减少开销成本,因此我有一个想法,至少将一组可处理的数据加载到驱动程序中并将其作为 Pandas-DataFrame 进行处理。无论如何,这都会导致 Spark 失去并行性优势。然后我读到 foreachPartition 将一个函数应用于分区内的所有数据,因此允许并行处理。

我现在的问题是:

  1. 当我通过 foreachPartition 应用 Python 函数时,Python 执行是否在驱动程序进程内进行(因此分区数据通过网络传输到我的驱动程序) ?

  2. 数据是在 foreachPartition 中逐行处理(意味着每个 RDD 行都被一一传输到 Python 实例),还是一次性处理分区数据(举例来说,这意味着整个分区都会传输到实例并由一个 Python 实例作为一个整体进行处理)?

预先感谢您的输入!

<小时/>

编辑:

我之前使用过的驱动程序解决方案看起来像这样,取自SO here :

for partition in rdd.mapPartitions(lambda partition: [list(partition)]).toLocalIterator():
# Do stuff on the partition

可以从 docs 中读取rdd.toLocalIterator() 提供必要的功能:

Return an iterator that contains all of the elements in this RDD. The iterator will consume as much memory as the largest partition in this RDD.

最佳答案

幸运的是,我偶然发现了 Mrinal 对 mapPartitions 的精彩解释(回答 here)。

mapPartitions 在 RDD 的每个分区上应用一个函数。因此,如果分区分布在不同的节点上,则可以使用并行化。在这些节点上创建处理 Python 函数所需的相应 Python 实例。虽然 foreachPartition 仅应用一个函数(例如,将数据写入 .csv 文件),但 mapPartitions 还会返回一个新的 RDD。因此,使用 foreachPartition 对我来说是错误的选择。

为了回答我的第二个问题:像 mapUDFs 这样的函数创建一个新的 Python 实例并从 DataFrame/RDD 逐行传递数据,导致大量的开销。 foreachPartitionmapPartitions(都是 RDD 函数)将整个分区传输到 Python 实例。

此外,使用生成器还可以减少迭代此传输的分区数据所需的内存量(分区作为迭代器对象处理,然后通过迭代此对象来处理每一行)。

示例可能如下所示:

def generator(partition):
"""
Function yielding some result created by some function applied to each row of a partition (in this case lower-casing a string)

@partition: iterator-object of partition
"""

for row in partition:
yield [word.lower() for word in row["text"]]


df = spark.createDataFrame([(["TESTA"], ), (["TESTB"], )], ["text"])
df = df.repartition(2)
df.rdd.mapPartitions(generator).toDF(["text"]).show()


#Result:
+-----+
| text|
+-----+
|testa|
|testb|
+-----+

希望这可以帮助面临类似问题的人:)

关于python - pySpark forEachPartition - 代码在哪里执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55654982/

33 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com