gpt4 book ai didi

java - Hadoop mapreduce空输入格式

转载 作者:行者123 更新时间:2023-12-02 21:35:39 25 4
gpt4 key购买 nike

我只需要执行一次映射任务,并且在hdfs上没有输入目录。

OozieLauncherInputFormat 可以做到只执行一次 map,但它使用的是旧的 mapred API。

我想要实现mapreduce API。
于是我尝试仿照它的做法来实现。

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
*
*/
/**
 * An {@link InputFormat} for the new {@code org.apache.hadoop.mapreduce} API that
 * reads nothing from HDFS: it produces exactly one empty split whose reader emits
 * a single synthetic key/value pair, so the job runs exactly one map task.
 *
 * <p>NOTE(review): the key/value types are plain {@link Object}. The Mapper used
 * with this format must declare matching input type parameters
 * ({@code Mapper<Object, Object, ...>}); declaring e.g. {@code Long} makes the
 * framework's cast fail with a {@code ClassCastException}.
 */
public class EmptyInputFormat extends InputFormat<Object, Object> {

    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        // Exactly one split -> exactly one map task.
        List<InputSplit> splits = new ArrayList<>();
        splits.add(new EmptySplits());
        return splits;
    }

    @Override
    public RecordReader<Object, Object> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new RecordReader<Object, Object>() {
            // Per-reader flag. (Previously this was a field on the InputFormat
            // itself, so every reader created after the first one shared the
            // already-set flag and would emit no records at all.)
            private boolean isReadingDone = false;

            @Override
            public void initialize(InputSplit split,
                    TaskAttemptContext context) throws IOException, InterruptedException {
                // Nothing to initialize: there is no underlying data source.
            }

            @Override
            public boolean nextKeyValue() throws IOException, InterruptedException {
                // Yield exactly one record, then report end-of-input.
                if (isReadingDone) {
                    return false;
                }
                isReadingDone = true;
                return true;
            }

            @Override
            public Object getCurrentKey() throws IOException, InterruptedException {
                // The key carries no information; a fresh Object is enough.
                return new Object();
            }

            @Override
            public Object getCurrentValue() throws IOException, InterruptedException {
                // Likewise, the value is a meaningless placeholder.
                return new Object();
            }

            @Override
            public float getProgress() throws IOException, InterruptedException {
                // 0.0 before the single record is consumed, 1.0 after.
                return isReadingDone ? 1.0f : 0.0f;
            }

            @Override
            public void close() throws IOException {
                // No resources to release.
            }
        };
    }

    /**
     * A zero-length split with no preferred locations. {@link Writable} methods
     * are intentionally empty: the split carries no state to serialize.
     */
    public static class EmptySplits extends InputSplit implements Writable {

        @Override
        public long getLength() throws IOException, InterruptedException {
            return 0L;
        }

        @Override
        public String[] getLocations() throws IOException, InterruptedException {
            return new String[0];
        }

        @Override
        public void write(DataOutput out) throws IOException {
            // No state to write.
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // No state to read.
        }
    }
}

但是当我的 Mapper 执行时,出现了下面的错误:
Error: java.lang.ClassCastException: java.lang.Object cannot be cast to java.lang.Long
at ListenerMapper.map(ListenerMapper.java:19)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

ListenerMapper类
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.TaskAttemptID;

import lib.Parameters;
import exception.ListenerException;

/**
* Mapper class for Listener
*/
/**
 * Mapper for the Listener job. It is driven by {@code EmptyInputFormat}, which
 * delivers a single synthetic {@link Object} key/value pair, so the input type
 * parameters MUST be {@code Object}. (The original {@code Mapper<Long, Text, ...>}
 * declaration is what produced the
 * {@code ClassCastException: java.lang.Object cannot be cast to java.lang.Long}.)
 *
 * <p>The map method ignores its key/value, gathers the job configuration and the
 * task-attempt identifiers into a parameter map, and hands them to
 * {@code Main.listenerBootstrap}.
 */
public class ListenerMapper extends Mapper<Object, Object, Text, Text> {

    @Override
    protected void map(Object key, Object value, Context context)
            throws IOException, InterruptedException {
        Map<String, String> parameters = new HashMap<>();

        Configuration configuration = new Configuration();

        // Load the Oozie action configuration BEFORE copying entries into
        // `parameters`. (The original code called addResource() after the copy
        // loop, so the Oozie settings never reached `parameters` at all.)
        configuration.addResource(new Path("file:///",
                System.getProperty("oozie.action.conf.xml")));

        for (Map.Entry<String, String> entry : configuration) {
            parameters.put(entry.getKey(), entry.getValue());
        }

        TaskAttemptID attemptId = findAttemptId();
        if (attemptId != null) {
            parameters.put(Parameters.MAPREDUCE_ATTEMPT_ID,
                    attemptId.toString());
            parameters.put(Parameters.MAPREDUCE_TASK_ID,
                    attemptId.getTaskID().toString());
            parameters.put(Parameters.MAPREDUCE_JOB_ID,
                    attemptId.getJobID().toString());
        }
        System.out.println("parameters = " + parameters);
        try {
            Main.listenerBootstrap(parameters);
        } catch (ListenerException le) {
            throw new RuntimeException(le);
        }
    }

    /**
     * Extracts the task attempt id from the JVM command line (YARN passes it as
     * an {@code attempt_*} argument), or returns {@code null} if none is found.
     */
    private static TaskAttemptID findAttemptId() {
        String command = System.getProperties().getProperty("sun.java.command");
        for (String arg : command.split(" ")) {
            if (arg.startsWith("attempt_")) {
                return TaskAttemptID.forName(arg);
            }
        }
        return null;
    }
}

最佳答案

我通过在Mapper中使用通用类型Object来解决它

public class ListenerMapper extends Mapper<Object, Object, Object, Object>

关于java - Hadoop mapreduce空输入格式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32497038/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com