
python - How to run a final 'print' statement once in a multi-step map-reduce program?


I am mainly trying to implement a recommendation system that scales out on Hadoop.

In the first step I try to calculate the similarity between every pair of items in the input file. If I simply store each result as

{item A, item B, similarity}

the output file becomes very large (for a 60 KB input I get an output file of about 6 MB).

So I wondered whether it would be better to store the results in a Python dict and print the dict only once, after the whole map-reduce program has finished. I have not managed to do this; please help me.

My Python code is:

#!/usr/bin/env python
from mrjob.job import MRJob
from math import sqrt

from itertools import combinations

PRIOR_COUNT = 10
PRIOR_CORRELATION = 0

prefs = {}


def correlation(size, dot_product, rating_sum,
                rating2sum, rating_norm_squared, rating2_norm_squared):
    '''
    The correlation between two vectors A, B is
      [n * dotProduct(A, B) - sum(A) * sum(B)] /
        sqrt{ [n * norm(A)^2 - sum(A)^2] [n * norm(B)^2 - sum(B)^2] }
    '''
    numerator = size * dot_product - rating_sum * rating2sum
    denominator = sqrt(size * rating_norm_squared - rating_sum * rating_sum) * \
        sqrt(size * rating2_norm_squared - rating2sum * rating2sum)

    return (numerator / float(denominator)) if denominator else 0.0


def regularized_correlation(size, dot_product, rating_sum,
                            rating2sum, rating_norm_squared,
                            rating2_norm_squared,
                            virtual_cont, prior_correlation):
    '''
    The Regularized Correlation between two vectors A, B

    RegularizedCorrelation = w * ActualCorrelation + (1 - w) * PriorCorrelation
    where w = # actualPairs / (# actualPairs + # virtualPairs).
    '''
    unregularized_correlation = correlation(size, dot_product, rating_sum,
                                            rating2sum, rating_norm_squared,
                                            rating2_norm_squared)

    w = size / float(size + virtual_cont)

    return w * unregularized_correlation + (1.0 - w) * prior_correlation


class SemicolonValueProtocol(object):

    # don't need to implement read() since we aren't using it

    def write(self, key, values):
        return ';'.join(str(v) for v in values)


class BooksSimilarities(MRJob):

    # OUTPUT_PROTOCOL = SemicolonValueProtocol

    def steps(self):
        return [
            self.mr(mapper=self.group_by_user_rating,
                    reducer=self.count_ratings_users_freq),
            self.mr(mapper=self.pairwise_items,
                    reducer=self.calculate_similarity),
            self.mr(mapper=self.calculate_ranking,
                    reducer=self.top_similar_items)]

    def group_by_user_rating(self, key, line):
        '''
        Emit the user_id and group by their ratings (item and rating):

        17  70,3
        35  21,1
        49  19,2
        49  21,1
        49  70,4
        87  19,1
        87  21,2
        98  19,2
        '''
        line = line.replace("\"", "")
        user_id, item_id, rating = line.split(',')

        yield user_id, (item_id, float(rating))

    def count_ratings_users_freq(self, user_id, values):
        '''
        For each user, emit a row containing their "postings"
        (item,rating pairs), along with the user's rating count and
        sum for use in later steps:

        17  1,3,(70,3)
        35  1,1,(21,1)
        49  3,7,(19,2 21,1 70,4)
        87  2,3,(19,1 21,2)
        98  1,2,(19,2)
        '''
        item_count = 0
        item_sum = 0
        final = []
        for item_id, rating in values:
            item_count += 1
            item_sum += rating
            final.append((item_id, rating))

        yield user_id, (item_count, item_sum, final)

    def pairwise_items(self, user_id, values):
        '''
        The output drops the user from the key entirely; instead it
        emits the pair of items as the key:

        19,21  2,1
        19,70  2,4
        21,70  1,4
        19,21  1,2
        '''
        item_count, item_sum, ratings = values
        for item1, item2 in combinations(ratings, 2):
            yield (item1[0], item2[0]), (item1[1], item2[1])

    def calculate_similarity(self, pair_key, lines):
        '''
        Sum the components of each co-rating pair across all users who
        rated both item x and item y, then calculate the pairwise
        Pearson similarity and co-rating counts. The similarities are
        normalized to the [0,1] scale because we do a numerical sort:

        19,21  0.4,2
        21,19  0.4,2
        19,70  0.6,1
        70,19  0.6,1
        21,70  0.1,1
        70,21  0.1,1
        '''
        sum_xx, sum_xy, sum_yy, sum_x, sum_y, n = (0.0, 0.0, 0.0, 0.0, 0.0, 0)
        item_xname, item_yname = pair_key
        for item_x, item_y in lines:
            sum_xy += item_x * item_y
            sum_y += item_y
            sum_x += item_x
            sum_xx += item_x * item_x
            sum_yy += item_y * item_y
            n += 1

        reg_corr_sim = regularized_correlation(n, sum_xy, sum_x,
                                               sum_y, sum_xx, sum_yy,
                                               PRIOR_COUNT, PRIOR_CORRELATION)

        yield (item_xname, item_yname), (reg_corr_sim, n)

    def calculate_ranking(self, item_keys, values):
        '''
        Emit items with the similarity in the key, for ranking:

        19,0.4  70,1
        19,0.6  21,2
        21,0.6  19,2
        21,0.9  70,1
        70,0.4  19,1
        70,0.9  21,1
        '''
        reg_corr_sim, n = values
        item_x, item_y = item_keys
        if int(n) > 0:
            yield (item_x, reg_corr_sim), (item_y, n)

    def top_similar_items(self, key_sim, similar_ns):
        '''
        For each item, emit the K closest items in a semicolon-separated
        file:

        De La Soul;A Tribe Called Quest;0.6;1
        De La Soul;2Pac;0.4;2
        '''
        item_x, reg_corr_sim = key_sim
        for item_y, n in similar_ns:
            # yield None, (item_x, item_y, reg_corr_sim, n)
            prefs.setdefault(item_x, {})
            prefs[item_x][item_y] = float(reg_corr_sim)
            prefs.setdefault(item_y, {})
            prefs[item_y][item_x] = float(reg_corr_sim)
        # this runs once for every key the reducer receives
        print "exiting"


if __name__ == '__main__':
    BooksSimilarities.run()

What I want after running

python thisfile.py < input.csv -r hadoop > output.txt

is a relatively small output file with no duplicates, containing just a single dictionary.

In short:

at the moment this program prints "exiting" n times, but I want it to print only once.

Apart from that, is there any better way to implement collaborative filtering that scales out on Hadoop?

Thanks in advance.

Best Answer

You are only guaranteed that values with the same key go to the same reducer. So if you run multiple reducers on a cluster, the work is split up, and you will get many "exiting" lines as the reducers work through their share of the keys.
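One way to see just one round of per-reducer output even on Hadoop (a sketch going beyond the answer above; it assumes your cluster honors the classic mapred.reduce.tasks property) is to force the whole job through a single reducer:

class BooksSimilarities(MRJob):

    # Assumption: with exactly one reduce task, every key is handled by
    # the same reducer process, so per-process work happens only once.
    # (Newer Hadoop spells this property mapreduce.job.reduces.)
    JOBCONF = {'mapred.reduce.tasks': '1'}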

Try running it locally first and verify that it works: python thisfile.py < input.csv > output.txt

Perhaps you can define a "reducer_final" in your steps() that receives all of the last step's reducer output and handles it however you like.

See: http://pythonhosted.org/mrjob/job.html#mrjob.job.MRJob.steps
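Here is a minimal, self-contained sketch of that idea (hedged: the job name, method names, and one-step flow are made up for illustration; older mrjob passes reducer_final directly to self.mr(), while newer versions take it via MRStep from mrjob.step):

#!/usr/bin/env python
from mrjob.job import MRJob

prefs = {}

class CollectThenDump(MRJob):

    def steps(self):
        return [self.mr(mapper=self.get_pair,
                        reducer=self.collect,
                        reducer_final=self.dump_prefs)]

    def get_pair(self, _, line):
        # assumes input lines of the form: item_x,item_y,similarity
        item_x, item_y, sim = line.split(',')
        yield (item_x, item_y), float(sim)

    def collect(self, pair, sims):
        # accumulate into the dict instead of yielding one line per pair
        item_x, item_y = pair
        for sim in sims:
            prefs.setdefault(item_x, {})[item_y] = sim
            prefs.setdefault(item_y, {})[item_x] = sim

    def dump_prefs(self):
        # runs once per reducer process, after it has seen all of its
        # keys; with a single reducer this emits the dict exactly once
        yield 'prefs', prefs

if __name__ == '__main__':
    CollectThenDump.run()

Applied to your job, top_similar_items would keep filling prefs (without the print), and a reducer_final on the last step would yield the finished dict; combined with a single reducer, you get one line of output instead of n.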

Kind regards,

Regarding "python - How to run a final 'print' statement once in a multi-step map-reduce program?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/15225146/
