gpt4 book ai didi

pyspark - 在整行上使用 udf 过滤 Pyspark Dataframe

转载 作者:行者123 更新时间:2023-12-02 12:28:26 30 4
gpt4 key购买 nike

有没有办法选择整行作为列输入到 Pyspark 过滤器 udf 中?

我有一个复杂的过滤函数“my_filter”,我想将其应用于整个 DataFrame:

my_filter_udf = udf(lambda r: my_filter(r), BooleanType())
new_df = df.filter(my_filter_udf(col("*"))

但是

col("*")

抛出错误,因为这不是有效的操作。

我知道我可以将数据帧转换为 RDD,然后使用 RDD 的过滤方法,但我不想将其转换为 RDD,然后再转换回数据帧。我的 DataFrame 具有复杂的嵌套类型,因此当我尝试再次将 RDD 转换为 DataFrame 时,模式推断失败。

最佳答案

您应该静态写入所有列。例如:

from pyspark.sql import functions as F

# create sample df
df = sc.parallelize([
(1, 'b'),
(1, 'c'),

]).toDF(["id", "category"])

#simple filter function
@F.udf(returnType=BooleanType())
def my_filter(col1, col2):
return (col1>0) & (col2=="b")

df.filter(my_filter('id', 'category')).show()

结果:

+---+--------+
| id|category|
+---+--------+
| 1| b|
+---+--------+

如果您有很多列并且您确定要对列进行排序:

cols = df.columns
df.filter(my_filter(*cols)).show()

产生相同的输出。

关于pyspark - 在整行上使用 udf 过滤 Pyspark Dataframe,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52051985/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com