
arrays - PySpark: replace values in ArrayType(String)


I currently have the following code:

def _join_intent_types(df):
    mappings = {
        'PastNews': 'ContextualInformation',
        'ContinuingNews': 'News',
        'KnownAlready': 'OriginalEvent',
        'SignificantEventChange': 'NewSubEvent',
    }
    return df.withColumn('Categories', posexplode('Categories').alias('i', 'val'))\
        .when(col('val').isin(mappings), mappings[col('i')])\
        .otherwise(col('val'))

but I'm not sure the syntax is correct. What I want is to operate on a column of lists, e.g.:

['EmergingThreats', 'Factoid', 'KnownAlready']

and replace the strings in that array according to the supplied dictionary, i.e.

['EmergingThreats', 'Factoid', 'OriginalEvent']

I know this can be done with a UDF, but I'm worried about how that would affect performance and scalability.

An example of the original table:

+------------------+-----------------------------------------------------------+
|postID            |Categories                                                 |
+------------------+-----------------------------------------------------------+
|266269932671606786|[EmergingThreats, Factoid, KnownAlready]                   |
|266804609954234369|[Donations, ServiceAvailable, ContinuingNews]              |
|266250638852243457|[EmergingThreats, Factoid, ContinuingNews]                 |
|266381928989589505|[EmergingThreats, MultimediaShare, Factoid, ContinuingNews]|
|266223346520297472|[EmergingThreats, Factoid, KnownAlready]                   |
+------------------+-----------------------------------------------------------+

I'd like the code to replace the strings in these arrays with their new mappings, provided they exist in the dictionary; anything not in the dictionary should stay as-is:

+------------------+-------------------------------------------------+
|postID            |Categories                                       |
+------------------+-------------------------------------------------+
|266269932671606786|[EmergingThreats, Factoid, OriginalEvent]        |
|266804609954234369|[Donations, ServiceAvailable, News]              |
|266250638852243457|[EmergingThreats, Factoid, News]                 |
|266381928989589505|[EmergingThreats, MultimediaShare, Factoid, News]|
|266223346520297472|[EmergingThreats, Factoid, OriginalEvent]        |
+------------------+-------------------------------------------------+

Best answer

Using explode + collect_list is expensive. The following is untested, but should work on Spark 2.4+:

from pyspark.sql.functions import expr

for k, v in mappings.items():
    # Rewrite each element of Categories, applying one mapping per pass.
    df = df.withColumn(
        'Categories',
        expr("transform(sequence(0, size(Categories)-1), "
             "x -> replace(Categories[x], '{k}', '{v}'))".format(k=k, v=v))
    )
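For illustration, the first loop iteration builds an expression like this (note the single quotes around the SQL string literals):

# Generated for k='PastNews', v='ContextualInformation':
# transform(sequence(0, size(Categories)-1),
#           x -> replace(Categories[x], 'PastNews', 'ContextualInformation'))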

You can also turn the mappings into a chain of CASE/WHEN clauses and apply it with the Spark SQL transform function:

sql_expr = "transform(Categories, x -> CASE x {} ELSE x END)".format(
    " ".join("WHEN '{}' THEN '{}'".format(k, v) for k, v in mappings.items())
)
# this yields the following SQL expression:
# transform(Categories, x ->
#   CASE x
#     WHEN 'PastNews' THEN 'ContextualInformation'
#     WHEN 'ContinuingNews' THEN 'News'
#     WHEN 'KnownAlready' THEN 'OriginalEvent'
#     WHEN 'SignificantEventChange' THEN 'NewSubEvent'
#     ELSE x
#   END
# )

df.withColumn('Categories', expr(sql_expr)).show(truncate=False)
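If you are on PySpark 3.1+, the same per-element CASE/WHEN rewrite can be written without raw SQL strings, using pyspark.sql.functions.transform with a chained when(). This is a minimal sketch, assuming the same mappings dict as above:

from pyspark.sql import functions as F

def remap(x):
    # Chain one WHEN clause per mapping; unmatched values fall through unchanged.
    items = list(mappings.items())
    cases = F.when(x == items[0][0], items[0][1])
    for k, v in items[1:]:
        cases = cases.when(x == k, v)
    return cases.otherwise(x)

df = df.withColumn('Categories', F.transform('Categories', remap))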

For older versions of Spark, a udf may be preferred.
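A minimal sketch of such a udf, assuming the same mappings dict; dict.get(x, x) falls back to the original value for anything not in the dictionary:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

@udf(returnType=ArrayType(StringType()))
def remap_categories(categories):
    # Map each element through the dict, keeping unmapped values unchanged.
    if categories is None:
        return None
    return [mappings.get(x, x) for x in categories]

df = df.withColumn('Categories', remap_categories('Categories'))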

Regarding "arrays - PySpark: replace values in ArrayType(String)", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/61268325/
