gpt4 book ai didi

python - 将新的键/值对添加到 Spark MapType 列

转载 作者:行者123 更新时间:2023-11-28 17:10:03 25 4
gpt4 key购买 nike

我有一个带有 MapType 字段的 Dataframe。

>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import *
>>> fields = StructType([
... StructField('timestamp', TimestampType(), True),
... StructField('other_field', StringType(), True),
... StructField('payload', MapType(
... keyType=StringType(),
... valueType=StringType()),
... True), ])
>>> import datetime
>>> rdd = sc.parallelize([[datetime.datetime.now(), 'this should be in', {'akey': 'aValue'}]])
>>> df = rdd.toDF(fields)
>>> df.show()
+--------------------+-----------------+-------------------+
| timestamp| other_field| payload|
+--------------------+-----------------+-------------------+
|2018-01-10 12:56:...|this should be in|Map(akey -> aValue)|
+--------------------+-----------------+-------------------+

我想将 other_field 添加为 payload 字段中的键。

我知道我可以使用 udf:

>>> def _add_to_map(name, value, map_field):
... map_field[name] = value
... return map_field
...
>>> add_to_map = udf(_add_to_map, MapType(StringType(),StringType()))
>>> df.select(add_to_map(lit('other_field'), 'other_field', 'payload')).show(1, False)
+------------------------------------------------------+
|PythonUDF#_add_to_map(other_field,other_field,payload)|
+------------------------------------------------------+
|Map(other_field -> this should be in, akey -> aValue) |
+------------------------------------------------------+

有没有没有 udf 的方法?

最佳答案

如果您提前知道 key ,这里有一种不使用 udf 的方法。使用 create_map 函数。至于这是否更高效,我不知道。

from pyspark.sql.functions import col, lit, create_map

df.select(
create_map(
lit('other_field'),
col('other_field'),
lit('akey'),
col('payload')['akey']
)
).show(n=1, truncate=False)

输出:

+-----------------------------------------------------+
|map(other_field, other_field, akey, payload['akey']) |
+-----------------------------------------------------+
|Map(other_field -> this should be in, akey -> aValue)|
+-----------------------------------------------------+

更新

这是一种无需对字典键进行硬编码的方法。不幸的是,它涉及一个 collect() 操作。

模拟一些数据

首先,让我们修改您的原始数据框,在 MapType() 字段中再包含一个键值对。

from pyspark.sql.functions import col, lit, create_map
import datetime
rdd = sc.parallelize(
[
[
datetime.datetime.now(),
'this should be in',
{'akey': 'aValue', 'bkey': 'bValue'}
]
]
)
df = rdd.toDF(fields)
df.show(n=1, truncate=False)

创建以下内容:

+--------------------------+-----------------+-----------------------------------+
|timestamp |other_field |payload |
+--------------------------+-----------------+-----------------------------------+
|2018-01-10 17:37:58.859603|this should be in|Map(bkey -> bValue, akey -> aValue)|
+--------------------------+-----------------+-----------------------------------+

获取 map 的键

使用 explode()collect(),您可以这样获取 key :

from pyspark.sql.functions import explode

keys = [
x['key'] for x in (df.select(explode("payload"))
.select("key")
.distinct()
.collect())
]

创建一个包含所有字段的新 map

现在像上面那样使用create_map,但是使用来自keys 的信息来动态创建键值对。我使用 reduce(add, ...) 因为 create_map 期望输入按顺序是键值对 - 我想不出另一种方法来展平列表.

from operator import add
df.select(
create_map
(
*([lit('other_field'), col('other_field')] + reduce(add, [[lit(k), col('payload').getItem(k)] for k in keys]))
)
).show(n=1, truncate=False)

最终结果:

+---------------------------------------------------------------------------+
|map(other_field, other_field, akey, payload['akey'], bkey, payload['bkey'])|
+---------------------------------------------------------------------------+
|Map(other_field -> this should be in, akey -> aValue, bkey -> bValue) |
+---------------------------------------------------------------------------+

引用资料

  1. pyspark: Create MapType Column from existing columns

  2. PySpark converting a column of type 'map' to multiple columns in a dataframe

关于python - 将新的键/值对添加到 Spark MapType 列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48196203/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com