gpt4 book ai didi

python - 为 Pyspark 中的每一行计算列中不同的子字符串出现次数?

转载 作者:行者123 更新时间:2023-12-04 07:27:47 25 4
gpt4 key购买 nike

我的数据集看起来像这样,我在 col1 中有一组以逗号分隔的字符串值。和 col2 , 和 col3是连接在一起的两列。

+===========+========+===========
|col1 |col2 |col3
+===========+========+===========
|a,b,c,d |a,c,d |a,b,c,d,a,c,d
|e,f,g |f,g,h |e,f,g,f,g,h
+===========+========+===========
基本上,我想要做的是获取 col3 中以逗号分隔的所有值。并附加另一列,其中包含每个值及其计数。
基本上,我试图在 col4 中获得这种输出:
+===========+========+==============+======================
|col1 |col2 |col3 |col4
+===========+========+==============+======================
|a,b,c,d |a,c,d |a,b,c,d,a,c,d |a: 2, b: 1, c: 2, d: 2
|e,f,g |f,g,h |e,f,g,f,g,h |e: 1, f: 2, g: 2, h: 1
+===========+========+==============+======================
我已经想出了如何将列连接在一起以到达 col3 ,但我在访问 col4 时遇到了一些麻烦.这是我离开的地方,我有点不确定从哪里开始:
from pyspark.sql.functions import concat, countDistinct


df = df.select(concat(df.col1, df.col2).alias('col3'), '*')
df.agg(countDistinct('col3')).show()

+---------------------+ |count(DISTINCT col3)| +---------------------+ | 2| +---------------------+
问题:如何动态统计col3中以逗号分隔的子串并制作最后一列,显示数据集中所有行的每个子字符串的频率?

最佳答案

使用 UDF
这是使用 udfs 执行此操作的一种方法。
首先进行数据生成。

from pyspark.sql.types import StringType, StructType, StructField
from pyspark.sql.functions import concat_ws, udf

data = [("a,b,c,d", "a,c,d", "a,b,c,d,a,c,d"),
("e,f,g", "f,g,h", "e,f,g,f,g,h")
]
schema = StructType([
StructField("col1",StringType(),True),
StructField("col2",StringType(),True),
StructField("col3",StringType(),True),
])
df = spark.createDataFrame(data=data,schema=schema)
然后使用一些原生的 python 函数,如 Counter 和 json 来完成任务。
from collections import Counter
import json

@udf(StringType())
def count_occurances(row):
return json.dumps(dict(Counter(row.split(','))))

df.withColumn('concat', concat_ws(',', df.col1, df.col2, df.col3))\
.withColumn('counts', count_occurances('concat')).show(2, False)
结果是
+-------+-----+-------------+---------------------------+--------------------------------+
|col1 |col2 |col3 |concat |counts |
+-------+-----+-------------+---------------------------+--------------------------------+
|a,b,c,d|a,c,d|a,b,c,d,a,c,d|a,b,c,d,a,c,d,a,b,c,d,a,c,d|{"a": 4, "b": 2, "c": 4, "d": 4}|
|e,f,g |f,g,h|e,f,g,f,g,h |e,f,g,f,g,h,e,f,g,f,g,h |{"e": 2, "f": 4, "g": 4, "h": 2}|
+-------+-----+-------------+---------------------------+--------------------------------+
使用原生pyspark函数的解决方案
此解决方案比使用 udf 稍微复杂一些,但由于缺少 udf,性能可能更高。这个想法是连接三个字符串列并分解它们。为了知道每个分解的行来自哪里,我们添加了一个索引。双重分组将帮助我们获得所需的结果。最后,我们将结果连接回原始帧以获得所需的模式。
from pyspark.sql.functions import concat_ws, monotonically_increasing_id, split, explode, collect_list
df = df.withColumn('index', monotonically_increasing_id())

df.join(
df
.withColumn('concat', concat_ws(',', df.col1, df.col2, df.col3))\
.withColumn('arr_col', split('concat', ','))\
.withColumn('explode_col', explode('arr_col'))\
.groupBy('index', 'explode_col').count()\
.withColumn('concat_counts', concat_ws(':', 'explode_col', 'count'))\
.groupBy('index').agg(concat_ws(',', collect_list('concat_counts')).alias('grouped_counts')), on='index').show()
结果是
+-----------+-------+-----+-------------+---------------+
| index| col1| col2| col3| grouped_counts|
+-----------+-------+-----+-------------+---------------+
|42949672960|a,b,c,d|a,c,d|a,b,c,d,a,c,d|a:4,b:2,c:4,d:4|
|94489280512| e,f,g|f,g,h| e,f,g,f,g,h|h:2,g:4,f:4,e:2|
+-----------+-------+-----+-------------+---------------+
请注意,我们在 udf 部分创建的 json 通常比 grouped_counts 中的简单字符串更方便使用。列使用 native pyspark 函数。

关于python - 为 Pyspark 中的每一行计算列中不同的子字符串出现次数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68132560/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com