gpt4 book ai didi

python - MapReduce 对多个文件中的所有行进行成对比较

转载 作者:太空宇宙 更新时间:2023-11-04 06:27:32 24 4
gpt4 key购买 nike

我开始使用 python 的 mrjob将我的一些长期运行的 python 程序转换为 MapReduce hadoop 作业。我已经得到了简单的字数统计示例,并且我在概念上理解了“文本分类”示例。

但是,我在确定解决问题所需的步骤时遇到了一些麻烦。

我有多个文件(大约 6000 个),每个文件都有 2 到 800 行。在这种情况下,每一行都是一个简单的以空格分隔的“信号”。我需要比较每个文件中的每一行与所有文件中的每一行(包括它本身)之间的相关性。然后根据相关系数输出结果。

一个文件的例子:

1 2 3 4 2 3 1 2 3 4 1 2
2 2 3 1 3 3 1 2 3 1 4 1
2 3 4 5 3 2 1 3 4 5 2 1
...

我需要生成此文件的每一行,并与每个其他文件的每一行配对......或者我可以将所有文件连接到一个文件中,如果这样可以使事情变得更容易,但我仍然需要成对迭代。

我了解如何进行计算以及如何使用最终的归约步骤来聚合和过滤结果。我遇到的困难是如何在不读取单个 setp 中的所有文件的情况下yield 所有成对的项目到连续的步骤?我想我可以提前准备一个使用 itertools.product 的输入文件,但这个文件会非常大。

最佳答案

好吧,由于没有人提出答案,我将发布我当前的解决方法,以防其他人需要它。我不确定这是多么“规范”或高效,但到目前为止它是有效的。

我将文件名作为文件每一行的第一项,然后是 \t,然后是其余数据。对于这个例子,我只是在每行上使用一个数字,然后对它们求平均值,就像一个非常简单的例子。

然后我在 mrjob 中进行了以下 map-reduce 步骤。

class MRAvgPairwiseLines(MRJob):

def input_mapper(self, _, value):
"""Takes each input line and converts it to (fnum, num) and a key of 'ALL'"""

fnum, val = value.split('\t')
yield 'ALL', (fnum, val)

def input_reducer(self, key, values):

for (fnum1, val1), (fnum2, val2) in product(values, repeat = 2):
yield fnum1, (fnum1, fnum2, val1, val2)

def do_avg(self, key, value):

fnum1, fnum2, val1, val2 = value
res = (float(val1)+float(val2))/float(2)
yield key, (fnum2, res)

def get_max_avg(self, key, values):

max_fnum, max_avg = max(values, key = lambda x: x[1])
yield key, (max_fnum, max_avg)

def steps(self):
return [self.mr(mapper=self.input_mapper, reducer=self.input_reducer),
self.mr(mapper=self.do_avg, reducer=self.get_max_avg)]

这样,input_mapper 函数的所有输出都被分组到相同的 input_reducer,然后 yield 连续对。然后将它们传递到适当的位置以最终返回最大的平均值(这实际上是所有其他文件中最大的项目)。

希望对某人有所帮助。

关于python - MapReduce 对多个文件中的所有行进行成对比较,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6643521/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com