gpt4 book ai didi

python - Spark pandas_udf 并不更快

转载 作者:行者123 更新时间:2023-12-02 03:15:08 24 4
gpt4 key购买 nike

我面临着繁重的数据转换。简而言之,我有数据列,每个数据列都包含与一些序数相对应的字符串。例如,。我的目标是将这些字符串映射到整数以保留顺序。在本例中,为LOW -> 0MID -> 1HIGH -> 2

这是一个生成此类数据的简单函数:

def fresh_df(N=100000, seed=None):
np.random.seed(seed)
feat1 = np.random.choice(["HI", "LO", "MID"], size=N)
feat2 = np.random.choice(["SMALL", "MEDIUM", "LARGE"], size=N)

pdf = pd.DataFrame({
"feat1": feat1,
"feat2": feat2
})
return spark.createDataFrame(pdf)

我的第一个方法是:

feat1_dict = {"HI": 1, "MID": 2, "LO": 3}
feat2_dict = {"SMALL": 0, "MEDIUM": 1, "LARGE": 2}

mappings = {
"feat1": F.create_map([F.lit(x) for x in chain(*feat1_dict.items())]),
"feat2": F.create_map([F.lit(x) for x in chain(*feat2_dict.items())])
}

for col in df.columns:
col_map = mappings[col]
df = df.withColumn(col+"_mapped", col_map[df[col]])

这按预期工作,但实际上它变得很慢,我想优化该过程。我读到pandas_udf这给了我希望。下面是修改后的代码:

feats_dict = {
"feat1": feat1_dict,
"feat2": feat2_dict
}

for col_name in df.columns:
@F.pandas_udf('integer', F.PandasUDFType.SCALAR)
def map_map(col):
return col.map(feats_dict[col_name])
df = df.withColumn(col_name + "_mapped", map_map(df[col_name]))

唉!比较这两个版本时,执行时间没有任何改进。我在 Spark 本地实例(使用 docker)和 5 节点 EMR 集群(使用默认配置)上对两者进行了比较。

我创建了一个notebook您可以在其中看到所有代码。一般来说,我使用了以下导入:

import numpy as np
import pandas as pd

from itertools import chain
from pyspark.sql import functions as F

我错过了什么?为什么这个过程如此缓慢,为什么使用 pandas_udf 时没有任何改进?

最佳答案

为什么这么慢?因为 Spark 在 JVM 中运行,而 pyspark 则不在 JVM 中运行(因为它是一个 Python 进程),为了使该进程成为可能,需要将所有数据序列化和反序列化移动到 JVM。

您可以使用 whenotherwise 函数映射值,避免序列化和反序列化过程,从而提高性能。

import numpy as np
import pandas as pd
import pyspark.sql.functions as f
from pyspark.shell import spark


def fresh_df(n=100000, seed=None):
np.random.seed(seed)
feat1 = np.random.choice(["HI", "LO", "MID"], size=n)
feat2 = np.random.choice(["SMALL", "MEDIUM", "LARGE"], size=n)

pdf = pd.DataFrame({
"feat1": feat1,
"feat2": feat2
})
return spark.createDataFrame(pdf)


df = fresh_df()
df = df.withColumn('feat1_mapped', f
.when(df.feat1 == f.lit('HI'), 1)
.otherwise(f.when(df.feat1 == f.lit('MID'), 2).otherwise(3)))

df = df.withColumn('feat2_mapped', f
.when(df.feat2 == f.lit('SMALL'), 0)
.otherwise(f.when(df.feat2 == f.lit('MEDIUM'), 1).otherwise(2)))
df.show(n=20)

输出

+-----+------+------------+------------+
|feat1| feat2|feat1_mapped|feat2_mapped|
+-----+------+------------+------------+
| LO| SMALL| 3| 0|
| LO|MEDIUM| 3| 1|
| MID|MEDIUM| 2| 1|
| MID| SMALL| 2| 0|
| MID| LARGE| 2| 2|
| MID| SMALL| 2| 0|
| LO| SMALL| 3| 0|
| MID| LARGE| 2| 2|
| MID| LARGE| 2| 2|
| MID| SMALL| 2| 0|
| MID|MEDIUM| 2| 1|
| LO| LARGE| 3| 2|
| HI|MEDIUM| 1| 1|
| LO| SMALL| 3| 0|
| HI|MEDIUM| 1| 1|
| MID| SMALL| 2| 0|
| MID|MEDIUM| 2| 1|
| HI| SMALL| 1| 0|
| HI| LARGE| 1| 2|
| MID| LARGE| 2| 2|
+-----+------+------------+------------+

关于python - Spark pandas_udf 并不更快,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56930974/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com