gpt4 book ai didi

python - PySpark 使用字典映射创建新列

转载 作者:太空狗 更新时间:2023-10-29 18:14:53 25 4
gpt4 key购买 nike

使用 Spark 1.6,我有一个 Spark DataFrame 列(命名为 col1),其值为 A、B、C、DS、DNS、E、F、G和 H。我想用下面的 dict 中的值创建一个新列(比如 col2)。我如何映射这个? (例如,“A”需要映射到“S”等)

dict = {'A': 'S', 'B': 'S', 'C': 'S', 'DS': 'S', 'DNS': 'S', 'E': 'NS', 'F': 'NS', 'G': 'NS', 'H': 'NS'}

最佳答案

UDF 的低效解决方案(独立于版本):

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

def translate(mapping):
def translate_(col):
return mapping.get(col)
return udf(translate_, StringType())

df = sc.parallelize([('DS', ), ('G', ), ('INVALID', )]).toDF(['key'])
mapping = {
'A': 'S', 'B': 'S', 'C': 'S', 'DS': 'S', 'DNS': 'S',
'E': 'NS', 'F': 'NS', 'G': 'NS', 'H': 'NS'}

df.withColumn("value", translate(mapping)("key"))

结果:

+-------+-----+
| key|value|
+-------+-----+
| DS| S|
| G| NS|
|INVALID| null|
+-------+-----+

更有效(Spark >= 2.0,Spark < 3.0)是创建一个 MapType 文字:

from pyspark.sql.functions import col, create_map, lit
from itertools import chain

mapping_expr = create_map([lit(x) for x in chain(*mapping.items())])

df.withColumn("value", mapping_expr.getItem(col("key")))

同样的结果:

+-------+-----+
| key|value|
+-------+-----+
| DS| S|
| G| NS|
|INVALID| null|
+-------+-----+

但更高效的执行计划:

== Physical Plan ==
*Project [key#15, keys: [B,DNS,DS,F,E,H,C,G,A], values: [S,S,S,NS,NS,NS,S,NS,S][key#15] AS value#53]
+- Scan ExistingRDD[key#15]

与 UDF 版本相比:

== Physical Plan ==
*Project [key#15, pythonUDF0#61 AS value#57]
+- BatchEvalPython [translate_(key#15)], [key#15, pythonUDF0#61]
+- Scan ExistingRDD[key#15]

Spark >= 3.0 中,getItem 应替换为 __getitem__ ([]),即:

from pyspark.sql.functions import col, create_map, lit
from itertools import chain

mapping_expr = create_map([lit(x) for x in chain(*mapping.items())])

df.withColumn("value", mapping_expr[col("key")])

关于python - PySpark 使用字典映射创建新列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42980704/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com