python - Error when running MapReduce code even though the Python scripts work locally


I have a mapper.py and a reducer.py to process an input file, which is just a regular Linux text file in the following format:

ID \t time \t duration \t Description \t status
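
For example, the records might look like this (hypothetical values for illustration; the five fields are separated by single tabs):

101    1393632000    1320    scheduled maintenance    done
101    1393633320    660     unexpected reboot        FAILED
205    1393640000    2000    disk replacement         ok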

Basically, I want to group my IDs in the reducer, so I structured the mapper as follows:
#!/usr/bin/env python

import sys
import re

for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into portions
    portions = re.split(r'\t+', line)
    # take the first column (which is the block number) to emit as the key
    block = portions[0]
    print '%s\t%s\t%s\t%s\t%s' % (block, portions[1], portions[2], portions[3], portions[4])
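
Note that re.split(r'\t+', ...) treats a run of consecutive tabs as a single separator, so empty fields between tabs are silently merged, unlike a plain str.split('\t'); a quick interpreter check:

>>> import re
>>> re.split(r'\t+', 'a\t\tb\tc')
['a', 'b', 'c']
>>> 'a\t\tb\tc'.split('\t')
['a', '', 'b', 'c']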

Then in the reducer, I do the data processing as follows:
#!/usr/bin/env python

from operator import itemgetter
import sys

bitmapStr = ""
current_block = None
block = start = duration = precision = status = ""
round = 0  # interval is every 11 mins, i.e. 660 seconds

for line in sys.stdin:
    line = line.strip()
    block, start, duration, precision, status = line.split('\t')

    if current_block == block:
        duration = int(duration)
        while round < duration:
            if status.islower():
                bitmapStr = bitmapStr + "1"
            else:
                bitmapStr = bitmapStr + "0"
            round = round + 660
        # amount of time that exceeds this block record
        round = round - duration
    else:
        if current_block:
            print '%s\t%s' % (current_block, bitmapStr)
        round = 0
        bitmapStr = ""
        current_block = block
        duration = int(duration)
        while round < duration:
            if status.islower():
                bitmapStr = bitmapStr + "1"
            else:
                bitmapStr = bitmapStr + "0"
            round = round + 660
        # amount of time that exceeds this block record
        round = round - duration

# emit the last block
if current_block == block:
    print '%s\t%s' % (current_block, bitmapStr)
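
To make the interval logic concrete, here is a minimal standalone trace of the inner loop (the values are assumed for illustration): one bit is appended per started 660-second interval, "1" while status is lowercase, and the leftover time stays in round so it carries into the next record of the same block:

duration = 2000       # assumed duration of one record, in seconds
round = 0
bitmapStr = ""
while round < duration:
    bitmapStr += "1"  # status assumed lowercase
    round += 660      # 0 -> 660 -> 1320 -> 1980 -> 2640
round -= duration
print bitmapStr       # "1111": four started 660-second intervals
print round           # 640: carried into the next record of this block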

I tested the mapper and the reducer locally by doing the following:
cat small_data_sample | ./mapper.py | sort -k1,1 | ./reducer.py
# output is as I expect

However, when I try to run it through Hadoop, it produces the following error:
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:430)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
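
The streaming stack trace only says that the subprocess exited with code 1; the actual Python traceback ends up in the task attempt's stderr log (visible in the job's web UI or via the YARN logs). As a sketch of one way to surface the offending record instead of letting it kill the task, the mapper could trap the exception and report it on sys.stderr:

#!/usr/bin/env python
# hypothetical debugging variant of the mapper
import sys
import re

for line in sys.stdin:
    try:
        portions = re.split(r'\t+', line.strip())
        print '%s\t%s\t%s\t%s\t%s' % (portions[0], portions[1],
                                      portions[2], portions[3], portions[4])
    except IndexError, e:
        # an uncaught traceback would end the task with exit code 1;
        # stderr is preserved in the per-attempt logs
        sys.stderr.write('bad record %r: %s\n' % (line, e))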

The exact command I use to run Hadoop is as follows:
bin/hadoop jar hadoop-streaming.jar \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-D mapred.text.key.partitioner.options='-k1,1' \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator -D mapred.text.key.comparator.options='-k1,1 -k2,2n' \
-D stream.num.map.output.key.fields=2 \
-input $hadoop_dir/data/sample \
-output $hadoop_dir/data/data_test1-output \
-mapper $dir/calBitmap_mapper.py \
-reducer $dir/calBitmap_reducer.py \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

$hadoop_dir is the path to my HDFS location, and $dir is where my mapper and reducer Python scripts live.

Please let me know what I need to do to correct the error. Thank you in advance!

*Edit: I tried a different (much smaller) input file and it seems to work fine. So I don't know why MapReduce breaks when the input file is larger.

Best Answer

I found the solution to my error. It was in the mapper, where I didn't pay attention to other kinds of input. The first few lines of some of my input files are comments, so the portions array failed with an index-out-of-range error. To fix this, I added a check:

if len(portions) == 5:  # make sure it has 5 elements in there
    print '%s\t%s\t%s\t%s\t%s' % (block, portions[1], portions[2], portions[3], portions[4])
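
Applied to the original mapper, the guarded version would look roughly like this (a sketch; comment lines and other malformed records simply fail the length check and are skipped):

#!/usr/bin/env python

import sys
import re

for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into portions
    portions = re.split(r'\t+', line)
    # skip comments and malformed records instead of crashing
    if len(portions) == 5:
        # the first column (the block number) is the key
        print '%s\t%s\t%s\t%s\t%s' % (portions[0], portions[1], portions[2],
                                      portions[3], portions[4])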

Regarding "python - Error when running MapReduce code even though the Python scripts work locally", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/28513455/
