gpt4 book ai didi

python - Spark 非确定性 Pandas UDF 会出现什么问题

转载 作者:行者123 更新时间:2023-12-04 04:31:16 26 4
gpt4 key购买 nike

我正在编写一个需要为基于某些标准匹配的某些组生成 UUID 的过程。我让我的代码正常工作,但我担心在我的 UDF 中创建 UUID 的潜在问题(从而使其不确定)。这是一些代码的简化示例来说明:

from uuid import uuid1

from pyspark.sql import SparkSession
from pyspark.sql.functions import PandasUDFType, pandas_udf

spark = (
SparkSession.builder.master("local")
.appName("Word Count")
.config("spark.some.config.option", "some-value")
.getOrCreate()
)
df = spark.createDataFrame([["j", 3], ["h", 3], ["a", 2]], ["name", "age"])


@pandas_udf("name string, age integer, uuid string", PandasUDFType.GROUPED_MAP)
def create_uuid(df):
df["uuid"] = str(uuid1())
return df


>>> df.groupby("age").apply(create_uuid).show()
+----+---+--------------------+
|name|age| uuid|
+----+---+--------------------+
| j| 3|1f8f48ac-0da8-430...|
| h| 3|1f8f48ac-0da8-430...|
| a| 2|d5206d03-bcce-445...|
+----+---+--------------------+

这目前适用于 AWS Glue 上超过 20 万条记录的一些数据处理,我还没有发现任何错误。

我用 uuid1因为它使用节点信息来生成 UUID,从而确保没有 2 个节点生成相同的 id。

我的一个想法是将UDF注册为非确定性的:
udf = pandas_udf(
create_uuid, "name string, age integer, uuid string", PandasUDFType.GROUPED_MAP
).asNondeterministic()

但这给了我以下错误:
py4j.protocol.Py4JJavaError: An error occurred while calling o60.flatMapGroupsInPandas.
: org.apache.spark.sql.AnalysisException: nondeterministic expressions are only allowed in
Project, Filter, Aggregate or Window, found:
`age`,create_uuid(name, age),`name`,`age`,`uuid`
in operator FlatMapGroupsInPandas [age#1L], create_uuid(name#0, age#1L), [name#7, age#8, uuid#9]
;;
FlatMapGroupsInPandas [age#1L], create_uuid(name#0, age#1L), [name#7, age#8, uuid#9]
+- Project [age#1L, name#0, age#1L]
+- LogicalRDD [name#0, age#1L], false

我的问题是:
  • 这可能会遇到哪些潜在问题?
  • 如果它确实有潜在的问题,有什么说法可以让我确定这一点?
  • 为什么不能将 GROUPED_MAP 函数标记为非确定性函数?
  • 最佳答案

    您的函数是非确定性的,但 Spark 将其视为确定性,即 "Due to optimization, duplicate invocations maybe eliminated" .但是,每次调用 pandas_udf将是唯一的输入(按键分组的行),因此对 pandas_udf 的重复调用的优化不会被触发。因此,asNondeterministic抑制这种优化的方法对于 pandas_udf 来说是多余的。的 GROUPED_MAP类型。在我看来,这解释了为什么 GroupedData.apply函数未编码为接受 pandas_udf标记为非确定性。这是没有意义的,因为没有抑制的优化机会。

    关于python - Spark 非确定性 Pandas UDF 会出现什么问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61847837/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com