
apache-spark - Applying a function to groupBy data with pyspark

I am trying to get word counts from a csv while grouping on another column. My csv has three columns: id, message and user_id. I read this in, then split the message and store a list of unigrams:

+-----------------+--------------------+--------------------+
| id| message| user_id|
+-----------------+--------------------+--------------------+
|10100720363468236|[i'm, sad, to, mi...|dceafb541a1b8e894...|
|10100718944611636|[what, does, the,...|dceafb541a1b8e894...|
|10100718890699676|[at, the, oecd, w...|dceafb541a1b8e894...|
+-----------------+--------------------+--------------------+
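(The split step itself is elided in the question; purely as an illustration, a whitespace-based version might look like the sketch below. The lower-casing and the regex are assumptions, not the poster's actual tokenizer.)

from pyspark.sql import functions as F

# Illustrative only: lower-case the raw text and split on whitespace so that
# `message` becomes an array<string> column of unigrams.
df = df.withColumn("message", F.split(F.lower(F.col("message")), r"\s+"))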

Next, given my DataFrame df, I want to group by user_id and then get counts for each unigram. As a simple first pass I tried grouping by user_id and getting the length of the grouped message field:

from collections import Counter
from pyspark.sql.types import ArrayType, StringType, IntegerType
from pyspark.sql.functions import udf

df = self.session.read.csv(self.corptable, header=True,
                           mode="DROPMALFORMED",)

# split my messages ....
# message is now ArrayType(StringType())

grouped = df.groupBy(df["user_id"])
counter = udf(lambda l: len(l), ArrayType(StringType()))
grouped.agg(counter(df["message"]))
print(grouped.collect())

I get the following error:
pyspark.sql.utils.AnalysisException: "expression '`message`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;"

I'm not sure how to get around this error. In general, how do you apply a function to one column while grouping by another? Do I always have to create a user-defined function? Very new to Spark.
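(A note on the general question: this particular first pass does not need a UDF at all. A minimal sketch, assuming message is already an array<string> column, using only built-in functions:)

from pyspark.sql import functions as F

# Total token count per user, using the built-in size() and sum() aggregates.
token_totals = df.groupBy("user_id").agg(F.sum(F.size("message")).alias("n_tokens"))

# Per-user unigram counts, by exploding the array into one row per word.
word_counts = (df
               .select("user_id", F.explode("message").alias("word"))
               .groupBy("user_id", "word")
               .count())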

Edit: Here is how I ended up solving the problem, given a tokenizer in a separate Python file:

from collections import Counter
from operator import add

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

group_field = "user_id"
message_field = "message"

context = SparkContext()
session = SparkSession\
    .builder\
    .appName("dlastk")\
    .getOrCreate()

# add tokenizer
context.addPyFile(tokenizer_path)
from tokenizer import Tokenizer
tokenizer = Tokenizer()
spark_tokenizer = udf(tokenizer.tokenize, ArrayType(StringType()))

df = session.read.csv("myFile.csv", header=True)
df = df[group_field, message_field]

# tokenize the message field
df = df.withColumn(message_field, spark_tokenizer(df[message_field]))

# create ngrams from tokenized messages
n = 1
grouped = df.rdd.map(lambda row: (row[0], Counter([" ".join(x) for x in zip(*[row[1][i:] for i in range(n)])]))).reduceByKey(add)

# flatten the rdd so that each row contains (group_id, ngram, count, relative frequency)
flat = grouped.flatMap(lambda row: [[row[0], x, y, y / sum(row[1].values())] for x, y in row[1].items()])

# rdd -> DF
flat = flat.toDF()
flat.write.csv("myNewCSV.csv")
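(The zip expression in the map step is the densest part of this snippet; a pure-Python illustration of the same trick, with made-up tokens, shows how it yields n-grams for n = 2:)

tokens = ["at", "the", "oecd", "with"]
n = 2

# zip(*[tokens[i:] for i in range(n)]) pairs each token with its n-1 successors,
# stopping when the shortest shifted list runs out.
ngrams = [" ".join(x) for x in zip(*[tokens[i:] for i in range(n)])]
print(ngrams)  # ['at the', 'the oecd', 'oecd with']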

The data looks like this:
# after read
+--------------------+--------------------+
| user_id| message|
+--------------------+--------------------+
|00035fb0dcfbeaa8b...|To the douchebag ...|
|00035fb0dcfbeaa8b...| T minus 1 week...|
|00035fb0dcfbeaa8b...|Last full day of ...|
+--------------------+--------------------+

# after tokenize
+--------------------+--------------------+
| user_id| message|
+--------------------+--------------------+
|00035fb0dcfbeaa8b...|[to, the, doucheb...|
|00035fb0dcfbeaa8b...|[t, minus, 1, wee...|
|00035fb0dcfbeaa8b...|[last, full, day,...|
+--------------------+--------------------+

# grouped: after 1grams extracted and Counters added
[('00035fb0dcfbeaa8bb70ffe24d614d4dcee446b803eb4063dccf14dd2a474611', Counter({'!': 545, '.': 373, 'the': 306, '"': 225, ...

# flat: after calculating sum and relative frequency for each 1gram
[['00035fb0dcfbeaa8bb70ffe24d614d4dcee446b803eb4063dccf14dd2a474611', 'face', 3, 0.000320547066994337], ['00035fb0dcfbeaa8bb70ffe24d614d4dcee446b803eb4063dccf14dd2a474611', 'was', 26, 0.002778074580617587] ....

# after flat RDD to DF
+--------------------+---------+---+--------------------+
| _1| _2| _3| _4|
+--------------------+---------+---+--------------------+
|00035fb0dcfbeaa8b...| face| 3| 3.20547066994337E-4|
|00035fb0dcfbeaa8b...| was| 26|0.002778074580617587|
|00035fb0dcfbeaa8b...| how| 22|0.002350678491291...|
+--------------------+---------+---+--------------------+
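(As a side note, not in the original: the default _1 … _4 column names can be avoided by passing explicit names to toDF() when converting the flattened RDD, for example:)

# Hypothetical variant of the last two lines of the edit above.
flat_df = flat.toDF(["user_id", "ngram", "count", "rel_freq"])
flat_df.write.csv("myNewCSV.csv", header=True)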

Best Answer

A natural approach is to group the words into a single list, and then use the Python function Counter() to generate word counts. For both steps we will use udfs. The first one flattens the nested list that results from collect_list() over multiple arrays:

unpack_udf = udf(
    lambda l: [item for sublist in l for item in sublist]
)

The second one generates the word-count tuples, or in our case structs:

from pyspark.sql.types import *
from collections import Counter

# We need to specify the schema of the return object
schema_count = ArrayType(StructType([
    StructField("word", StringType(), False),
    StructField("count", IntegerType(), False)
]))

count_udf = udf(
    lambda s: Counter(s).most_common(),
    schema_count
)

Putting them together:

from pyspark.sql.functions import collect_list

(df.groupBy("id")
 .agg(collect_list("message").alias("message"))
 .withColumn("message", unpack_udf("message"))
 .withColumn("message", count_udf("message"))).show(truncate=False)
+-----------------+------------------------------------------------------+
|id |message |
+-----------------+------------------------------------------------------+
|10100718890699676|[[oecd,1], [the,1], [with,1], [at,1]] |
|10100720363468236|[[what,3], [me,1], [sad,1], [to,1], [does,1], [the,1]]|
+-----------------+------------------------------------------------------+
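(Illustrative follow-up, not part of the accepted answer: if one row per word is preferred over an array of structs, the chained result can be bound to a name and then exploded.)

from pyspark.sql.functions import collect_list, explode, col

counted = (df.groupBy("id")
           .agg(collect_list("message").alias("message"))
           .withColumn("message", unpack_udf("message"))
           .withColumn("message", count_udf("message")))

# One row per (id, word, count) instead of an array of structs per id.
per_word = (counted
            .select("id", explode("message").alias("wc"))
            .select("id", col("wc.word"), col("wc.count")))
per_word.show()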

Data:

df = sc.parallelize([(10100720363468236, ["what", "sad", "to", "me"]),
                     (10100720363468236, ["what", "what", "does", "the"]),
                     (10100718890699676, ["at", "the", "oecd", "with"])]).toDF(["id", "message"])

Regarding apache-spark - Applying a function to groupBy data with pyspark, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/40983095/
