gpt4 book ai didi

python - 如何根据 PySpark 中的数组值进行过滤?

转载 作者:太空狗 更新时间:2023-10-29 19:37:21 26 4
gpt4 key购买 nike

我的架构:

|-- Canonical_URL: string (nullable = true)
|-- Certifications: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Certification_Authority: string (nullable = true)
| | |-- End: string (nullable = true)
| | |-- License: string (nullable = true)
| | |-- Start: string (nullable = true)
| | |-- Title: string (nullable = true)
|-- CompanyId: string (nullable = true)
|-- Country: string (nullable = true)
|-- vendorTags: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- score: double (nullable = true)
| | |-- vendor: string (nullable = true)

我尝试了以下查询以从 vendorTags

中选择嵌套字段
df3 = sqlContext.sql("select vendorTags.vendor from globalcontacts")

如何在 PySpark 中查询 where 子句中的嵌套字段

df3 = sqlContext.sql("select vendorTags.vendor from globalcontacts where vendorTags.vendor = 'alpha'")

df3 = sqlContext.sql("select vendorTags.vendor from globalcontacts where vendorTags.score > 123.123456")

像这样的..

我尝试了上面的查询只是为了得到下面的错误

df3 = sqlContext.sql("select vendorTags.vendor from globalcontacts where vendorTags.vendor = 'alpha'")
16/03/15 13:16:02 INFO ParseDriver: Parsing command: select vendorTags.vendor from globalcontacts where vendorTags.vendor = 'alpha'
16/03/15 13:16:03 INFO ParseDriver: Parse Completed
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/spark/python/pyspark/sql/context.py", line 583, in sql
return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
File "/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 51, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"cannot resolve '(vendorTags.vendor = cast(alpha as double))' due to data type mismatch: differing types in '(vendorTags.vendor = cast(alpha as double))' (array<string> and double).; line 1 pos 71"

最佳答案

对于基于相等的查询,您可以使用array_contains:

df = sc.parallelize([(1, [1, 2, 3]), (2, [4, 5, 6])]).toDF(["k", "v"])
df.createOrReplaceTempView("df")

# With SQL
sqlContext.sql("SELECT * FROM df WHERE array_contains(v, 1)")

# With DSL
from pyspark.sql.functions import array_contains
df.where(array_contains("v", 1))

如果你想使用更复杂的谓词,你必须explode 或使用 UDF,例如像这样:

from pyspark.sql.types import BooleanType
from pyspark.sql.functions import udf

def exists(f):
return udf(lambda xs: any(f(x) for x in xs), BooleanType())

df.where(exists(lambda x: x > 3)("v"))

在 Spark 2.4 中。或者以后也可以使用高阶函数

from pyspark.sql.functions import expr

df.where(expr("""aggregate(
transform(v, x -> x > 3),
false,
(x, y) -> x or y
)"""))

df.where(expr("""
exists(v, x -> x > 3)
"""))

Python 包装器应该在 3.1 ( SPARK-30681 ) 中可用。

关于python - 如何根据 PySpark 中的数组值进行过滤?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36014082/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com