
Hadoop Streaming with a Java Mapper/Reducer


I am trying to run a Hadoop streaming job over some Wikipedia dumps (in compressed bz2 form) with a Java Mapper/Reducer. I am trying to use WikiHadoop, an interface Wikimedia released recently.

WikiReader_Mapper.java

package courseproj.example;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Mapper: emits ("article count", 1) for every article occurrence.
public class WikiReader_Mapper extends Mapper<Text, Text, Text, IntWritable> {

    // Reuse objects to save the overhead of object creation.
    private final static Text KEY = new Text();
    private final static IntWritable VALUE = new IntWritable(1);

    @Override
    public void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        KEY.set("article count");
        context.write(KEY, VALUE);
    }
}

WikiReader_Reducer.java

package courseproj.example;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Reducer: sums up all the counts.
public class WikiReader_Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final static IntWritable SUM = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        SUM.set(sum);
        context.write(key, SUM);
    }
}

The command I am running is:

hadoop jar lib/hadoop-streaming-2.0.0-cdh4.2.0.jar \
-libjars lib2/wikihadoop-0.2.jar \
-D mapreduce.input.fileinputformat.split.minsize=300000000 \
-D mapreduce.task.timeout=6000000 \
-D org.wikimedia.wikihadoop.previousRevision=false \
-input enwiki-latest-pages-articles10.xml-p000925001p001325000.bz2 \
-output out \
-inputformat org.wikimedia.wikihadoop.StreamWikiDumpInputFormat \
-mapper WikiReader_Mapper \
-reducer WikiReader_Reducer

The error message I am getting is:

Error: java.lang.RuntimeException: Error in configuring object
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:72)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:130)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:424)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:157)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:152)

Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:103)

Caused by: java.io.IOException: Cannot run program "WikiReader_Mapper": java.io.IOException: error=2, No such file or directory
at java.lang.ProcessBuilder.start(ProcessBuilder.java:460)
at org.apache.hadoop.streaming.PipeMapRed.configure(PipeMapRed.java:209)

I am more familiar with the new Hadoop API than with the old one. Since my mapper and reducer code live in two separate files, where do I define the JobConf configuration parameters for my job while still following the command structure of Hadoop streaming (which sets the mapper and reducer classes explicitly)? Is there a way to wrap the mapper and reducer code into a single class (one that extends Configured and implements Tool, as is done with the new API) and pass that class name to the Hadoop streaming command line, instead of setting the map and reduce classes separately?
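In other words, something roughly like the driver sketched below, which bundles the configuration with the job setup by extending Configured and implementing Tool (the WikiReader class name and the job wiring are only an illustration, and the WikiHadoop input format is left out):

package courseproj.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Illustrative driver: configures and submits the job from one class
// instead of going through the streaming jar.
public class WikiReader extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // Equivalent of the -D flags on the streaming command line.
        conf.setLong("mapreduce.input.fileinputformat.split.minsize", 300000000L);
        conf.setLong("mapreduce.task.timeout", 6000000L);

        Job job = Job.getInstance(conf);
        job.setJobName("wiki article count");
        job.setJarByClass(WikiReader.class);
        job.setMapperClass(WikiReader_Mapper.class);
        job.setReducerClass(WikiReader_Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // The WikiHadoop input format is omitted here; wiring it in
        // depends on which MapReduce API it targets.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new WikiReader(), args));
    }
}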

Best Answer

Streaming uses the old API (org.apache.hadoop.mapred), but your mapper and reducer classes extend the new API classes (org.apache.hadoop.mapreduce). Because streaming does not recognize them as old-API classes, it falls back to treating the -mapper argument as an external command to execute, which is why the stack trace ends with PipeMapRed trying to run a program named "WikiReader_Mapper".

Try changing your mapper to implement org.apache.hadoop.mapred.Mapper and your reducer to implement org.apache.hadoop.mapred.Reducer (extending MapReduceBase to pick up the default configure and close methods), for example:

package courseproj.example;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Mapper: emits ("article count", 1) for every article occurrence.
public class WikiReader_Mapper extends MapReduceBase implements Mapper<Text, Text, Text, IntWritable> {

    // Reuse objects to save the overhead of object creation.
    private final static Text KEY = new Text();
    private final static IntWritable VALUE = new IntWritable(1);

    @Override
    public void map(Text key, Text value, OutputCollector<Text, IntWritable> collector, Reporter reporter)
            throws IOException {
        KEY.set("article count");
        collector.collect(KEY, VALUE);
    }
}
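
The reducer needs the same treatment. A sketch of the old-API equivalent of your reducer (again extending MapReduceBase and using OutputCollector):

package courseproj.example;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Reducer: sums up all the counts (old-API version).
public class WikiReader_Reducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    private final static IntWritable SUM = new IntWritable();

    @Override
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> collector,
            Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        SUM.set(sum);
        collector.collect(key, SUM);
    }
}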

Regarding Hadoop Streaming with a Java Mapper/Reducer, there is a similar question on Stack Overflow: https://stackoverflow.com/questions/16043572/
