gpt4 book ai didi

python - Hadoop 中 Map 函数的输入拆分

转载 作者:可可西里 更新时间:2023-11-01 15:35:53 26 4
gpt4 key购买 nike

这是我在 Hadoop 中的第一个实现。我正在尝试在 Map Reduce 中实现我的概率数据集算法。在我的数据集中,最后一列将有一些 id(数据集中唯一 id 的数量等于我的集群中的节点数量)。我必须根据此列值划分我的数据集,并且每组记录都应由集群中的每个节点处理。

例如,如果我的集群中有三个节点,对于下面的数据集,一个节点应该处理所有 id=1 的记录,另一个节点处理 id=2,另一个节点处理 id=3

name time  dept  id
--------------------
b1 2:00pm z1 1
b2 3:00pm z2 2
c1 4:00pm y2 1
b3 3:00pm z3 3
c4 4:00pm x2 2

我的 map 函数应该将每个拆分作为输入并在每个节点中并行处理。

我只是想了解,在 Hadoop 中可以采用哪种方法。将此数据集输入为我的 map 函数的输入,并使用 map 传递一个附加参数以根据 id 值拆分数据。或者预先将数据拆分为“n”(节点数)个子集并将其加载到节点中,如果这是正确的方法,如何根据值拆分数据并加载到不同的节点中。因为,我从阅读中了解到,hadoop 根据指定的大小将数据分成 block 。我们如何在加载时指定特定条件。加起来,我正在用 python 编写我的程序。

有人请指教。谢谢

最佳答案

对你来说最简单的事情可能是让 mapper 输出以 id 作为键的数据,这将保证一个 reducer 将获取特定 id 的所有记录,然后在 reducer 阶段进行处理。

例如,

输入数据:

 b1  2:00pm z1   1
b2 3:00pm z2 2
c1 4:00pm y2 1
b3 3:00pm z3 3
c4 4:00pm x2 2

映射器代码:

#!/usr/bin/env python
import sys
for line in sys.stdin:
line = line.strip()
cols = line.split("\t")
key = cols[-1]
print key + "\t" + line

map 输出:

 1  b1  2:00pm z1   1
2 b2 3:00pm z2 2
1 c1 4:00pm y2 1
3 b3 3:00pm z3 3
2 c4 4:00pm x2 2

Reducer 1 输入:

 1  b1  2:00pm z1   1
1 c1 4:00pm y2 1

Reducer 2 输入:

 2  b2  3:00pm z2   2

Reducer 3 输入:

 3  b3  3:00pm z3   3

reducer 代码:

#!/usr/bin/env python
import sys
for line in sys.stdin:
line = line.strip()
cols = line.split("\t")
orig_line = "\t".join(cols[1:])
# do stuff...

请注意,通过这种方式,单个 reducer 可能会获得多个键,但数据将被排序,您可以使用 mapred.reduce.tasks 选项控制 reducer 的数量。

编辑如果你想按键在 reducer 中收集数据,你可以这样做(不确定它会按原样运行,但你明白了)

#!/usr/bin/env python
import sys
def process_data(key_id, data_list):
# data_list has all the lines for key_id

last_key = None
data = []
for line in sys.stdin:
line = line.strip()
cols = line.split("\t")
key = cols[0]
if last_key and key != last_key:
process_data(last_key, data)
data = []
orig_line = "\t".join(cols[1:])
data.append(orig_line)
last_key = key
process_data(last_key, data)

如果您不担心在 reducer 步骤中耗尽内存,您可以像这样简化代码:

#!/usr/bin/env python
import sys
from collections import defaultdict
def process_data(key_id, data_list):
# data_list has all the lines for key_id

all_data = defaultdict(list)
for line in sys.stdin:
line = line.strip()
cols = line.split("\t")
key = cols[0]
orig_line = "\t".join(cols[1:])
all_data[key].append(orig_line)
for key, data in all_data.iteritems():
process_data(key, data)

关于python - Hadoop 中 Map 函数的输入拆分,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25720178/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com