python - Pyspark: using sql.transform to nullify all empty strings in a column containing an array of structs


I have a column in a pyspark df that contains an array of maps like the one below:

[{"address": "Fadden", "city": "", "country": "", "note": "", "stateProvince": "Queensland"}]

df.printSchema() returns the following for the column:

 |-- constituencies: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- address: string (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- note: string (nullable = true)
 |    |    |-- stateProvince: string (nullable = true)

I want to nullify all of those empty strings. So I thought this would be a perfect problem to solve with F.transform(col, f).
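
F.transform(col, f) applies f to every element of the array column col. A minimal sketch of the call shape, using a toy dataframe that is not part of the original question:

import pyspark.sql.functions as F

# Toy dataframe with a single array-of-strings column.
demo = spark.createDataFrame([(["a ", " b", ""],)], ["arr"])

# f is applied element-wise; here it trims each string.
demo.select(F.transform("arr", lambda x: F.trim(x)).alias("trimmed"))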

So I created this function, which I then use inside the transform expression, as shown below:

def nullify_vals(d):
    def nullify_string(str_):
        if str_.strip() == "":
            return None
        return str_.strip()

    return dict((k, nullify_string(v)) for k, v in d.items())

Note that the above works when tested on a dictionary:

dd = {"my": "map", "is": "", "not": "   ", "entierly": "  empty , right?"}
d_cln = nullify_vals(dd)
d_cln["not"] is None # returns True

But when I use it in Pyspark, it gives me an error:

import pyspark.sql.functions as F
result = kyclean.select(F.transform("constituencies", nullify_vals))

TypeError: 'Column' object is not callable

These are the last lines of the stack trace:

---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
File <command-899394298900126>:1, in <module>
----> 1 result = kyclean.select(F.transform("constituencies", nullify_vals))

File /databricks/spark/python/pyspark/sql/functions.py:4260, in transform(col, f)
4214 def transform(col, f):
4215 """
4216 Returns an array of elements after applying a transformation to each element in the input array.
4217
(...)
4258 +--------------+
4259 """
-> 4260 return _invoke_higher_order_function("ArrayTransform", [col], [f])

File /databricks/spark/python/pyspark/sql/functions.py:4209, in _invoke_higher_order_function(name, cols, funs)
4206 expr = getattr(expressions, name)
4208 jcols = [_to_java_column(col).expr() for col in cols]
-> 4209 jfuns = [_create_lambda(f) for f in funs]
4211 return Column(sc._jvm.Column(expr(*jcols + jfuns)))

Best Answer

Your function nullify_vals should take a Column object of type StructType, because your array elements are structs. But you are treating it as a plain Python object.
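
To see where the TypeError comes from (an illustrative sketch, not part of the original answer): F.transform hands your function a Column, on which attribute access like d.items is interpreted as struct-field access and returns another Column, so calling d.items() fails:

from pyspark.sql import functions as F

d = F.col("constituencies")[0]   # what transform passes in: a Column, not a dict
d.items                          # resolves to a field-access Column named "items"
# d.items() therefore raises: TypeError: 'Column' object is not callable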

Try rewriting it like this:

from typing import List

from pyspark.sql import functions as F, Column


def nullify_vals(struct_col: Column, fields: List[str]) -> Column:
    for f in fields:
        struct_col = struct_col.withField(
            f,
            F.when(F.trim(struct_col[f]) == "", None).otherwise(struct_col[f])
        )

    return struct_col

For each field in the inner struct, we use the column method withField to update it, setting it to null if (after trimming) it equals the empty string.
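
In isolation, the pattern looks like this (a minimal sketch with a made-up one-field struct):

from pyspark.sql import functions as F

s = F.struct(F.lit("  ").alias("city"))   # struct column with a single field
cleaned = s.withField(                    # rewrite that field in place
    "city",
    F.when(F.trim(s["city"]) == "", None).otherwise(s["city"])
)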

Applied to your input example:

json_str = '{"constituencies":[{"address":"Fadden","city":"","country":"","note":"","stateProvince":"Queensland"}]}'
df = spark.read.json(spark.sparkContext.parallelize([json_str]))

You can get the list of the constituencies struct fields from the dataframe like this:

constituencies_fields = df.selectExpr("inline(constituencies)").columns
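
As an aside (not from the original answer), the same list can be read straight off the schema, without expanding the array with inline():

# Assumes the schema shown earlier: array<struct<address, city, ...>>
constituencies_fields = [
    f.name for f in df.schema["constituencies"].dataType.elementType.fields
]
# ['address', 'city', 'country', 'note', 'stateProvince']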

df1 = df.withColumn(
    "constituencies",
    F.transform("constituencies", lambda x: nullify_vals(x, constituencies_fields))
)

df1.show(truncate=False)
#+----------------------------------------+
#|constituencies |
#+----------------------------------------+
#|[{Fadden, null, null, null, Queensland}]|
#+----------------------------------------+
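
As a quick sanity check (not in the original answer), you can expand the cleaned array and confirm that the empty strings are now nulls:

df1.selectExpr("inline(constituencies)").show()
# city, country and note should display as null rather than empty strings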

Regarding python - Pyspark: using sql.transform to nullify all empty strings in a column containing an array of structs, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/71193469/
