gpt4 book ai didi

python - Spark,关于reduceByKey的小问题

转载 作者:太空宇宙 更新时间:2023-11-03 17:15:05 24 4
gpt4 key购买 nike

我是 Spark 新手。我正在尝试实现 tf-idf。我需要计算每个单词在每个文档中出现的次数以及每个文档中的总单词数。

我想进行归约,可能还想进行另一个操作,但我还不知道如何做。这是我的输入:

对的形式为 (documentName , (word, wordCount)) 例如。

("doc1", ("a", 3)), ("doc1", ("the", 2)), ("doc2", ("a", 5)),
("doc2",("have", 5))

键是文档,值是单词以及该单词在该文档中出现的次数。我想计算每个文档中的总字数,并可能计算该字数的百分比。

我想要的输出:

("doc1", (("a", 3), 5)) , ("doc1", (("the", 2), 5)),
("doc2", (("a", 5),10)), ("doc2", (("have", 5),10))

我得到的效果是

corpus.join(corpus.reduceByKey(lambda x, y : x[1]+y[1]))

起点:

collect_of_docs = [(doc1,text1), (doc2, text2),....]

def count_words(x):
l = []
words = x[1].split()
for w in words:
l.append(((w, x[0]), 1))
return l

sc = SparkContext()
corpus = sc.parallelize(collect_of_docs)
input = (corpus
.flatMap(count_words)
.reduceByKey(add)
.map(lambda ((x,y), z) : (y, (x,z))))

如果可能的话,我只想用一个棘手的运算符进行一次归约操作。感谢任何帮助:) 提前致谢。

最佳答案

一般来说,仅仅为了稍后收集数据而使用 flatMap 是没有意义的。我假设您的数据或多或少看起来像这样:

collect_of_docs = sc.parallelize([
(1, "Lorem ipsum dolor sit amet, consectetur adipiscing elit."),
(2, "Mauris magna sem, vehicula sed dictum finibus, posuere id ipsum."),
(3, "Duis eleifend molestie dolor, quis fringilla eros facilisis ac.")])

首先,我们需要一些使用基本正则表达式和计数器的助手:

from __future__ import division  # If for some reason you use Python 2.x
import re
from collections import Counter

def count_words(doc, pattern=re.compile("\w+")):
"""Given a tuple (doc_id, text)
return a tuple (doc_id, tokens_count

>>> count_words((1, "Foo bar bar."))
(1, Counter({'Foo': 1, 'bar': 2}))
"""
(doc_id, text) = doc
return (doc_id, Counter(pattern.findall(text)))

def compute_tf(cnt):
"""Convert term counter to term frequency

>>> compute_tf(Counter({'Foo': 1, 'bar': 2}))
{'Foo': 0.3333333333333333, 'bar': 0.6666666666666666}
"""
n = sum(cnt.values())
return {k: v / n for (k, v) in cnt.items()}

最终结果:

tfs = (collect_of_docs
.map(count_words)
.mapValues(compute_tf))

tfs.sortByKey().first()

## (1,
## {'Lorem': 0.125,
## 'adipiscing': 0.125,
## 'amet': 0.125,
## 'consectetur': 0.125,
## 'dolor': 0.125,
## 'elit': 0.125,
## 'ipsum': 0.125,
## 'sit': 0.125})

使用上述文档频率可以计算如下:

from operator import add

dfs = (tfs
.values()
.flatMap(lambda kv: ((k, 1) for k in kv.keys()))
.reduceByKey(add))

dfs.sortBy(lambda x: -x[1]).take(5)

## [('ipsum', 2),
## ('dolor', 2),
## ('consectetur', 1),
## ('finibus', 1),
## ('fringilla', 1)]

关于python - Spark,关于reduceByKey的小问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33724074/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com