gpt4 book ai didi

nested - Pyspark - 使用 collect_list 时保留空值

转载 作者:行者123 更新时间:2023-12-04 20:29:46 25 4
gpt4 key购买 nike

根据接受的答案 pyspark collect_set or collect_list with groupby ,当你做 collect_list在某列上,null此列中的值被删除。我已经检查过,这是真的。

但在我的情况下,我需要保留空列——我怎样才能做到这一点?

我没有找到任何关于这种 collect_list 变体的信息。功能。

解释为什么我想要空值的背景上下文:

我有一个数据框 df如下:

cId   |  eId  |  amount  |  city
1 | 2 | 20.0 | Paris
1 | 2 | 30.0 | Seoul
1 | 3 | 10.0 | Phoenix
1 | 3 | 5.0 | null

我想使用以下映射将其写入 Elasticsearch 索引:
"mappings": {
"doc": {
"properties": {
"eId": { "type": "keyword" },
"cId": { "type": "keyword" },
"transactions": {
"type": "nested",
"properties": {
"amount": { "type": "keyword" },
"city": { "type": "keyword" }
}
}
}
}
}

为了符合上面的嵌套映射,我转换了我的 df 以便对于 eId 和 cId 的每个组合,我有一个这样的交易数组:
df_nested = df.groupBy('eId','cId').agg(collect_list(struct('amount','city')).alias("transactions"))
df_nested.printSchema()
root
|-- cId: integer (nullable = true)
|-- eId: integer (nullable = true)
|-- transactions: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- amount: float (nullable = true)
| | |-- city: string (nullable = true)

保存 df_nested作为一个 json 文件,我得到了 json 记录:
{"cId":1,"eId":2,"transactions":[{"amount":20.0,"city":"Paris"},{"amount":30.0,"city":"Seoul"}]}
{"cId":1,"eId":3,"transactions":[{"amount":10.0,"city":"Phoenix"},{"amount":30.0}]}

如您所见 - 当 cId=1eId=3 ,我的数组元素之一,其中 amount=30.0没有 city属性,因为这是一个 null在我的原始数据中( df )。当我使用 collect_list 时,空值被删除功能。

但是,当我尝试使用上述索引将 df_nested 写入 elasticsearch 时,由于模式不匹配而出错。这基本上就是为什么我想在应用 collect_list 后保留我的空值的原因。功能。

最佳答案

    from pyspark.sql.functions import create_map, collect_list, lit, col, to_json, from_json
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, HiveContext, SparkSession, types, Row
from pyspark.sql import functions as f
import os

app_name = "CollList"
conf = SparkConf().setAppName(app_name)
spark = SparkSession.builder.appName(app_name).config(conf=conf).enableHiveSupport().getOrCreate()

df = spark.createDataFrame([[1, 2, 20.0, "Paris"], [1, 2, 30.0, "Seoul"],
[1, 3, 10.0, "Phoenix"], [1, 3, 5.0, None]],
["cId", "eId", "amount", "city"])
print("Actual data")
df.show(10,False)
```
Actual data
+---+---+------+-------+
|cId|eId|amount|city |
+---+---+------+-------+
|1 |2 |20.0 |Paris |
|1 |2 |30.0 |Seoul |
|1 |3 |10.0 |Phoenix|
|1 |3 |5.0 |null |
+---+---+------+-------+
```
#collect_list that skips null columns
df1 = df.groupBy(f.col('city'))\
.agg(f.collect_list(f.to_json(f.struct([f.col(x).alias(x) for x in (c for c in df.columns if c != 'cId' and c != 'eId' )])))).alias('newcol')
print("Collect List Data - Missing Null Columns in the list")
df1.show(10, False)
```
Collect List Data - Missing Null Columns in the list
+-------+-------------------------------------------------------------------------------------------------------------------+
|city |collect_list(structstojson(named_struct(NamePlaceholder(), amount AS `amount`, NamePlaceholder(), city AS `city`)))|
+-------+-------------------------------------------------------------------------------------------------------------------+
|Phoenix|[{"amount":10.0,"city":"Phoenix"}] |
|null |[{"amount":5.0}] |
|Paris |[{"amount":20.0,"city":"Paris"}] |
|Seoul |[{"amount":30.0,"city":"Seoul"}] |
+-------+-------------------------------------------------------------------------------------------------------------------+
```
my_list = []
for x in (c for c in df.columns if c != 'cId' and c != 'eId' ):
my_list.append(lit(x))
my_list.append(col(x))

grp_by = ["eId","cId"]
df_nested = df.withColumn("transactions", create_map(my_list))\
.groupBy(grp_by)\
.agg(collect_list(f.to_json("transactions")).alias("transactions"))

print("collect list after create_map")
df_nested.show(10,False)
```
collect list after create_map
+---+---+--------------------------------------------------------------------+
|eId|cId|transactions |
+---+---+--------------------------------------------------------------------+
|2 |1 |[{"amount":"20.0","city":"Paris"}, {"amount":"30.0","city":"Seoul"}]|
|3 |1 |[{"amount":"10.0","city":"Phoenix"}, {"amount":"5.0","city":null}] |
+---+---+--------------------------------------------------------------------+
```

关于nested - Pyspark - 使用 collect_list 时保留空值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49395458/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com