gpt4 book ai didi

java - CustomArrayWritable 类的 toString() 方法中的 NullPointerException,MapReduce

转载 作者:可可西里 更新时间:2023-11-01 15:58:55 25 4
gpt4 key购买 nike

我正在尝试使用 Time_Ant10s(自定义 ArrayWritable 类)作为 Reducer 的输出。

我参考了这个很好的问题:MapReduce Output ArrayWritable,但在 Reducer 最后一行的 context.write() 处遇到了 NullPointerException。

我想 Time_Ant10s.toString() 中的 get() 可能会返回 null,但我不知道为什么会这样。你能帮帮我吗?

主要方法

/**
 * Configures and submits the job, then exits with 0 on success or 1 on failure.
 *
 * @param args {@code args[0]} is the input path and {@code args[1]} the output path;
 *             any further arguments are ignored
 * @throws Exception if job configuration or execution fails
 */
public static void main(String[] args) throws Exception {
    // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException
    // when a path argument is missing.
    if (args.length < 2) {
        System.err.println("Usage: CommutingTime1 <input path> <output path>");
        System.exit(2);
    }

    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "something");

    // general
    job.setJarByClass(CommutingTime1.class);
    job.setMapperClass(Mapper1.class);
    job.setReducerClass(Reducer1.class);
    job.setNumReduceTasks(1);
    job.setInputFormatClass(TextInputFormat.class);

    // mapper output
    job.setMapOutputKeyClass(Date_Uid.class);
    job.setMapOutputValueClass(Time_Ant10.class);

    // reducer output
    job.setOutputFormatClass(CommaTextOutputFormat.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Time_Ant10s.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

映射器

/**
 * Mapper stage of the job: declared to read {@code <LongWritable, Text>} records and
 * emit {@code <Date_Uid, Time_Ant10>} pairs. The implementation is not shown here.
 */
public static class Mapper1 extends Mapper<LongWritable, Text, Date_Uid, Time_Ant10> {
/* map as <date_uid, time_ant10> */
// omitted
}
}

reducer

/**
 * Reducer as posted in the question: gathers the Time_Ant10 values of one Date_Uid key
 * into a Time_Ant10s and writes it under the key's date.
 *
 * NOTE(review): this version walks the values Iterable twice (once to count, once to
 * copy). The value Iterable handed to reduce() cannot be iterated a second time, so the
 * copy loop presumably never runs and the array stays filled with nulls, which matches
 * the NullPointerException reported from Time_Ant10s.toString().
 */
public static class Reducer1 extends Reducer<Date_Uid, Time_Ant10, IntWritable, Time_Ant10s> {
/* <date_uid, time_ant10> -> <date, time_ant10s> */

// Output key instance, overwritten on every reduce() call.
private IntWritable date = new IntWritable();

@Override
protected void reduce(Date_Uid date_uid, Iterable<Time_Ant10> time_ant10s, Context context) throws IOException, InterruptedException {

date.set(date_uid.getDate());

// First pass over the values: count them so the array can be sized.
int num = 0;
for(Time_Ant10 time_ant10 : time_ant10s){
num++;
}

if(num>=1){
Time_Ant10[] temp = new Time_Ant10[num];

// Second pass over the same Iterable: intended to copy each value into temp.
// NOTE(review): see the class comment; this pass is the suspected defect.
int i=0;
for(Time_Ant10 time_ant10 : time_ant10s){
String time = time_ant10.getTimeStr();
int ant10 = time_ant10.getAnt10();
temp[i] = new Time_Ant10(time, ant10);
i++;
}

// The stack trace in the question shows toString() of the written value being
// invoked from inside this call.
context.write(date, new Time_Ant10s(temp));
}
}
}

Writer(输出格式)

/**
 * Text output format for {@code <IntWritable, Time_Ant10s>} records that writes to a
 * ".txt" work file and hands "," to the line writer as its separator argument.
 */
public static class CommaTextOutputFormat extends TextOutputFormat<IntWritable, Time_Ant10s> {

    /**
     * Opens the task's default work file (with a ".txt" extension) and wraps it in a
     * {@code LineRecordWriter}.
     *
     * @param job the task attempt whose configuration and work directory are used
     * @return a record writer bound to the newly created file
     * @throws IOException if the work file cannot be resolved or created
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    public RecordWriter<IntWritable, Time_Ant10s> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        final Configuration configuration = job.getConfiguration();
        final Path workFile = getDefaultWorkFile(job, ".txt");
        // The second argument to create() is passed as false, exactly as before
        // (presumably the overwrite flag; confirm against FileSystem.create).
        final FSDataOutputStream stream =
                workFile.getFileSystem(configuration).create(workFile, false);
        return new LineRecordWriter<IntWritable, Time_Ant10s>(stream, ",");
    }
}

自定义可写

// Time
/**
 * Writable time of day stored as three ints: hour, minute and second.
 */
public static class Time implements Writable {
    private int h;
    private int m;
    private int s;

    /** Creates an instance whose three fields keep their default value of zero. */
    public Time() {
    }

    /**
     * Creates an instance from explicit components.
     *
     * @param h hour
     * @param m minute
     * @param s second
     */
    public Time(int h, int m, int s) {
        this.h = h;
        this.m = m;
        this.s = s;
    }

    /**
     * Creates an instance by parsing a colon-separated string.
     *
     * @param time text whose first three ':'-separated parts are integers
     */
    public Time(String time) {
        parseInto(time);
    }

    /** Overwrites all three components. */
    public void set(int h, int m, int s) {
        this.h = h;
        this.m = m;
        this.s = s;
    }

    /** Overwrites all three components from a colon-separated string. */
    public void set(String time) {
        parseInto(time);
    }

    /** Splits on ':' and assigns hour, minute, second in that order. */
    private void parseInto(String time) {
        final String[] parts = time.split(":");
        this.h = Integer.parseInt(parts[0]);
        this.m = Integer.parseInt(parts[1]);
        this.s = Integer.parseInt(parts[2]);
    }

    /** @return a new three-element array holding hour, minute, second */
    public int[] getTime() {
        return new int[] {this.h, this.m, this.s};
    }

    /** @return the components formatted as zero-padded two-digit fields joined by ':' */
    public String getTimeStr() {
        return String.format("%1$02d:%2$02d:%3$02d", this.h, this.m, this.s);
    }

    /** @return hour * 10000 + minute * 100 + second */
    public int getTimeInt() {
        return (this.h * 100 + this.m) * 100 + this.s;
    }

    /** Reads hour, minute, second as three consecutive ints. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.h = in.readInt();
        this.m = in.readInt();
        this.s = in.readInt();
    }

    /** Writes hour, minute, second as three consecutive ints. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.h);
        out.writeInt(this.m);
        out.writeInt(this.s);
    }
}

// Time_Ant10
/**
 * Writable pair of a {@code Time} and an int named ant10.
 */
public static class Time_Ant10 implements Writable {
    private Time time;
    private int ant10;

    /** Creates an instance with a fresh default {@code Time} and ant10 left at zero. */
    public Time_Ant10() {
        this(new Time(), 0);
    }

    /**
     * Creates an instance that keeps a reference to the given time.
     *
     * @param time  the time component (stored as-is, not copied)
     * @param ant10 the ant10 value
     */
    public Time_Ant10(Time time, int ant10) {
        this.time = time;
        this.ant10 = ant10;
    }

    /**
     * Creates an instance from a colon-separated time string.
     *
     * @param time  text accepted by {@code Time(String)}
     * @param ant10 the ant10 value
     */
    public Time_Ant10(String time, int ant10) {
        this(new Time(time), ant10);
    }

    /** Replaces both components; the given time is stored as-is. */
    public void set(Time time, int ant10) {
        this.time = time;
        this.ant10 = ant10;
    }

    /** Replaces both components, parsing the time from a colon-separated string. */
    public void set(String time, int ant10) {
        final Time parsed = new Time(time);
        this.time = parsed;
        this.ant10 = ant10;
    }

    /** @return the time component as returned by {@code Time.getTime()} */
    public int[] getTime() {
        return time.getTime();
    }

    /** @return the time component as returned by {@code Time.getTimeStr()} */
    public String getTimeStr() {
        return time.getTimeStr();
    }

    /** @return the time component as returned by {@code Time.getTimeInt()} */
    public int getTimeInt() {
        return time.getTimeInt();
    }

    /** @return the ant10 value */
    public int getAnt10() {
        return ant10;
    }

    /** Reads the time fields first, then ant10 as one int. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.time.readFields(in);
        this.ant10 = in.readInt();
    }

    /** Writes the time fields first, then ant10 as one int. */
    @Override
    public void write(DataOutput out) throws IOException {
        this.time.write(out);
        out.writeInt(this.ant10);
    }
}

// Time_Ant10s
/**
 * ArrayWritable specialised for {@code Time_Ant10} elements, with a comma-separated
 * text form.
 */
public static class Time_Ant10s extends ArrayWritable {

    /** Creates an instance that declares {@code Time_Ant10} as its element class. */
    public Time_Ant10s() {
        super(Time_Ant10.class);
    }

    /**
     * Creates an instance wrapping the given elements.
     *
     * @param time_ant10s the elements to hold
     */
    public Time_Ant10s(Time_Ant10[] time_ant10s) {
        super(Time_Ant10.class, time_ant10s);
    }

    /** @return the array from {@code super.get()}, cast to {@code Time_Ant10[]} */
    @Override
    public Time_Ant10[] get() {
        return (Time_Ant10[]) super.get();
    }

    /**
     * Renders every element as "timeInt,ant10," and concatenates the pieces, so a
     * non-empty result ends with a comma.
     *
     * @return the concatenated text, or "" when there are no elements
     * @throws NullPointerException if the array or any element in it is null
     */
    @Override
    public String toString() {
        final Time_Ant10[] entries = get();
        final StringBuilder text = new StringBuilder();

        for (int idx = 0; idx < entries.length; idx++) {
            final Time_Ant10 entry = entries[idx];
            final int timeValue = entry.getTimeInt();
            final int antValue = entry.getAnt10();
            text.append(timeValue).append(",").append(antValue).append(",");
        }

        return text.toString();
    }
}

// Date_Uid
/**
 * Map output key of the job. The reducer calls {@code getDate()} on it and stores the
 * result in an IntWritable. The implementation is not shown here.
 */
public static class Date_Uid implements WritableComparable<Date_Uid> {
// omitted
}

错误信息

java.lang.Exception: java.lang.NullPointerException
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: java.lang.NullPointerException
at CommutingTime1$Time_Ant10s.toString(CommutingTime1.java:179)
at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat$LineRecordWriter.writeObject(TextOutputFormat.java:85)
at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat$LineRecordWriter.write(TextOutputFormat.java:104)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
at CommutingTime1$Reducer1.reduce(CommutingTime1.java:323)
at CommutingTime1$Reducer1.reduce(CommutingTime1.java:291)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

最佳答案

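我发现问题在于 reduce 中的 Iterable 不能迭代两次:第一次循环(计数)已经把所有值消耗完,第二次循环不会再执行,数组 temp 中的元素全部为 null,于是 Time_Ant10s.toString() 抛出了 NullPointerException。因此我参考了 this page,并按如下方式修改了 reducer 和 Time_Ant10s。现在一切正常。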

@redflar3:非常感谢你给我提示。我完全误解了我的代码哪里有错误。

reducer

/**
 * Gathers every {@code Time_Ant10} value of one {@code Date_Uid} key into a single
 * {@code Time_Ant10s} and writes it under the key's date.
 */
public static class Reducer1 extends Reducer<Date_Uid, Time_Ant10, IntWritable, Time_Ant10s> {
    /** Output key instance, overwritten on every reduce() call. */
    private final IntWritable date = new IntWritable();

    /**
     * Copies the values in a single pass and emits them as one array.
     *
     * @param date_uid    the grouping key; only its date is written out
     * @param time_ant10s the values for the key, traversed exactly once
     * @param context     the sink for the {@code <date, Time_Ant10s>} record
     * @throws IOException          if writing the record fails
     * @throws InterruptedException if writing the record is interrupted
     */
    @Override
    protected void reduce(Date_Uid date_uid, Iterable<Time_Ant10> time_ant10s, Context context)
            throws IOException, InterruptedException {
        date.set(date_uid.getDate());

        // The values can only be walked once, so collect them into a list in that
        // single pass. Each element is copied into a new object rather than stored
        // directly (Hadoop is documented to reuse the value instance between steps).
        ArrayList<Time_Ant10> copies = new ArrayList<Time_Ant10>();
        for (Time_Ant10 value : time_ant10s) {
            // Copy the numeric components directly instead of formatting the time
            // to a string and parsing it back.
            int[] hms = value.getTime();
            copies.add(new Time_Ant10(new Time(hms[0], hms[1], hms[2]), value.getAnt10()));
        }

        if (!copies.isEmpty()) {
            Time_Ant10[] asArray = copies.toArray(new Time_Ant10[copies.size()]);
            context.write(date, new Time_Ant10s(asArray));
        }
    }
}

Time_Ant10s

/**
 * ArrayWritable specialised for {@code Time_Ant10} elements, with a comma-separated
 * text form used when the value is written as text output.
 */
public static class Time_Ant10s extends ArrayWritable {

    /** Creates an instance that declares {@code Time_Ant10} as its element class. */
    public Time_Ant10s() {
        super(Time_Ant10.class);
    }

    /**
     * Creates an instance wrapping the given elements.
     *
     * @param time_ant10s the elements to hold
     */
    public Time_Ant10s(Time_Ant10[] time_ant10s) {
        super(Time_Ant10.class, time_ant10s);
    }

    /**
     * Returns the held elements typed as {@code Time_Ant10[]}.
     *
     * @return the backing array when it already is a {@code Time_Ant10[]}, a typed copy
     *         when it is some other {@code Writable[]}, or {@code null} when no array
     *         has been set
     */
    @Override
    public Time_Ant10[] get() {
        Writable[] raw = super.get();
        if (raw == null || raw instanceof Time_Ant10[]) {
            return (Time_Ant10[]) raw;
        }
        // A blind cast would throw ClassCastException for a plain Writable[].
        // NOTE(review): ArrayWritable.readFields is understood to rebuild the backing
        // array as Writable[]; confirm against the Hadoop version in use.
        return java.util.Arrays.copyOf(raw, raw.length, Time_Ant10[].class);
    }

    /**
     * Renders every element as "timeInt,ant10," and concatenates the pieces, so a
     * non-empty result ends with a comma.
     *
     * @return the concatenated text, or "" when there is no array or it is empty
     * @throws NullPointerException if an element of the array is null
     */
    @Override
    public String toString() {
        Time_Ant10[] entries = get();
        if (entries == null) {
            // Instance built with the no-arg constructor and never populated.
            return "";
        }

        // StringBuilder avoids re-copying the accumulated text on every element.
        StringBuilder output = new StringBuilder();
        for (Time_Ant10 entry : entries) {
            output.append(entry.getTimeInt()).append(",").append(entry.getAnt10()).append(",");
        }
        return output.toString();
    }
}

关于java - CustomArrayWritable 类的 toString() 方法中的 NullPointerException,MapReduce,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40404979/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com