gpt4 book ai didi

python - 将没有返回值的 Python Lambda 函数转换为 Pyspark

转载 作者:太空宇宙 更新时间:2023-11-04 04:07:00 25 4
gpt4 key购买 nike

我在 Python 中有一个可用的 lambda 函数,它计算数据集 1 中的每个字符串与数据集 2 中的字符串之间的最高相似度。在迭代期间,它将字符串、最佳匹配和相似度以及一些其他信息写入 bigquery。没有返回值,因为该函数的目的是将一行插入到 bigquery 数据集中。这个过程需要相当长的时间,这就是为什么我想使用 Pyspark 和 Dataproc 来加速这个过程。

将 pandas 数据帧转换为 spark 很容易。我无法注册我的 udf,因为它没有返回值,而 pyspark 需要一个。此外,我不明白如何将 python 中的“应用”函数映射到 pyspark 变体。所以基本上我的问题是如何转换下面的 python 代码以在 spark 数据帧上工作。

以下代码适用于常规 Python 环境:

def embargomatch(name, code, embargo_names):
find best match
insert best match and additional information to bigquery

customer_names.apply(lambda x: embargoMatch(x['name'], x['customer_code'],embargo_names),axis=1)

因为 pyspark 需要返回类型,所以我将“return 1”添加到 udf 并尝试了以下操作:


customer_names = spark.createDataFrame(customer_names)

from pyspark.sql.types import IntegerType
embargo_match_udf = udf(lambda x: embargoMatch(x['name'], x['customer_code'],embargo_names), IntegerType())

现在我无法尝试应用选择功能,因为我不知道要提供什么参数。

最佳答案

我怀疑您对如何将多列传递给 udf 感到困惑——这里是该问题的一个很好的答案:Pyspark: Pass multiple columns in UDF .

与其基于包装函数的 lambda 创建 udf,不如考虑通过直接基于 embargomatch 创建 udf 来简化。

embargo_names = ...

# The parameters here are the columns passed into the udf
def embargomatch(name, customer_code):
pass
embargo_match_udf = udf(embargomatch, IntegerType())
customer_names.select(embargo_match_udf(array('name', 'customer_code')).alias('column_name'))

话虽这么说,但怀疑您的 udf 没有返回任何内容——我通常将 udfs 视为向数据帧添加列的一种方式,但不会产生副作用。如果您想将记录插入到 bigquery 中,请考虑执行以下操作:

customer_names.select('column_name').write.parquet('gs://some/path')
os.system("bq load --source_format=PARQUET [DATASET].[TABLE] gs://some/path")

关于python - 将没有返回值的 Python Lambda 函数转换为 Pyspark,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57113621/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com