gpt4 book ai didi

pyspark - pyspark 数据帧中数组的访问字段

转载 作者:行者123 更新时间:2023-12-04 21:32:05 27 4
gpt4 key购买 nike

我正在开发基于一组 ORC 文件的 spark 数据框的 sql 查询。程序是这样的:

from pyspark.sql import SparkSession
spark_session = SparkSession.builder.appName("test").getOrCreate()
sdf = spark_session.read.orc("../data/")
sdf.createOrReplaceTempView("test")

现在我有一个名为“test”的表。如果我做这样的事情:
spark_session.sql("select count(*) from test")

那么结果会很好。但是我需要在查询中获取更多列,包括数组中的一些字段。
In [8]: sdf.take(1)[0]["person"]
Out[8]:
[Row(name='name', value='tom'),
Row(name='age', value='20'),
Row(name='gender', value='m')]

我尝试过类似的事情:
spark_session.sql("select person.age, count(*) from test group by person.age")

但这不起作用。我的问题是:如何访问“person”数组中的字段?

谢谢!

编辑:

sdf.printSchema() 的结果
In [3]: sdf.printSchema()
root
|-- person: integer (nullable = true)
|-- customtags: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- value: string (nullable = true)

错误信息:
AnalysisException: 'No such struct field age in name, value; line 16 pos 8'

最佳答案

我不知道如何仅使用 PySpark-SQL 来做到这一点,但这里有一种使用 PySpark DataFrames 来做到这一点的方法。

基本上,我们可以将 struct 列转换为 MapType()使用 create_map()功能。然后我们可以使用字符串索引直接访问字段。

考虑以下示例:

定义架构

schema = StructType([
StructField('person', IntegerType()),
StructField(
'customtags',
ArrayType(
StructType(
[
StructField('name', StringType()),
StructField('value', StringType())
]
)
)
)
]
)

创建示例数据帧

data = [
(
1,
[
{'name': 'name', 'value': 'tom'},
{'name': 'age', 'value': '20'},
{'name': 'gender', 'value': 'm'}
]
),
(
2,
[
{'name': 'name', 'value': 'jerry'},
{'name': 'age', 'value': '20'},
{'name': 'gender', 'value': 'm'}
]
),
(
3,
[
{'name': 'name', 'value': 'ann'},
{'name': 'age', 'value': '20'},
{'name': 'gender', 'value': 'f'}
]
)
]
df = sqlCtx.createDataFrame(data, schema)
df.show(truncate=False)
#+------+------------------------------------+
#|person|customtags |
#+------+------------------------------------+
#|1 |[[name,tom], [age,20], [gender,m]] |
#|2 |[[name,jerry], [age,20], [gender,m]]|
#|3 |[[name,ann], [age,20], [gender,f]] |
#+------+------------------------------------+

将结构列转换为映射

from operator import add
import pyspark.sql.functions as f

df = df.withColumn(
'customtags',
f.create_map(
*reduce(
add,
[
[f.col('customtags')['name'][i],
f.col('customtags')['value'][i]] for i in range(3)
]
)
)
)\
.select('person', 'customtags')

df.show(truncate=False)
#+------+------------------------------------------+
#|person|customtags |
#+------+------------------------------------------+
#|1 |Map(name -> tom, age -> 20, gender -> m) |
#|2 |Map(name -> jerry, age -> 20, gender -> m)|
#|3 |Map(name -> ann, age -> 20, gender -> f) |
#+------+------------------------------------------+

这里的问题是你必须先验地知道 ArrayType() 的长度。 (在本例中为 3),因为我不知道动态循环的方法。这也假设数组的所有行的长度都相同。

我不得不使用 reduce(add, ...)这里是因为 create_map()期望 (key, value) 形式的元素对.

按 map 列中的字段分组

df.groupBy((f.col('customtags')['name']).alias('name')).count().show()
#+-----+-----+
#| name|count|
#+-----+-----+
#| ann| 1|
#|jerry| 1|
#| tom| 1|
#+-----+-----+

df.groupBy((f.col('customtags')['gender']).alias('gender')).count().show()
#+------+-----+
#|gender|count|
#+------+-----+
#| m| 2|
#| f| 1|
#+------+-----+

关于pyspark - pyspark 数据帧中数组的访问字段,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48834188/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com