
java - MapReduce with Phoenix: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.NullWritable


I am trying to insert values into a table ("mea_interval") from data collected in another table ("mea_data"). The id is not unique; it identifies a data type. I use a MeasureWritable class to read from and write to the database; it implements DBWritable and Writable. When I run the jar I get the following error:

15/12/15 10:13:38 WARN mapred.LocalJobRunner: job_local957174264_0001
java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.NullWritable
at org.apache.phoenix.mapreduce.PhoenixRecordWriter.write(PhoenixRecordWriter.java:39)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:551)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:85)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:99)
at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:144)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:164)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:610)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:444)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:449)

I can read the values from the mea_data table; when I print them to the console they look fine. I think the error occurs when context.write is executed in the map, but I don't understand why.

I have attached the job configuration and the code of my mapper class below. If you want to see another part of my code, don't hesitate to ask.

Thank you in advance. :)
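For reference, MeasureWritable itself is not shown in the question. A value class passed to PhoenixMapReduceUtil.setInput/setOutput has to implement both org.apache.hadoop.mapreduce.lib.db.DBWritable and org.apache.hadoop.io.Writable. The following is only a hypothetical sketch of what such a class could look like, with field and column names guessed from the getters/setters used in the mapper and reducer; it is not the asker's actual implementation:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

// Hypothetical skeleton only -- the asker's real class is not shown in the question.
public class MeasureWritable implements DBWritable, Writable {

    // Fields guessed from the getters/setters used in the mapper and reducer.
    private long id;
    private long timestamp;
    private long startDate;
    private long endDate;
    private double value;

    // Called by the Phoenix input format: populate the object from one row of the SELECT query.
    @Override
    public void readFields(ResultSet rs) throws SQLException {
        // Column names are assumptions about the "mea_data" schema.
        id = rs.getLong("id_collection");
        timestamp = rs.getLong("timestamp");
        value = rs.getDouble("value");
    }

    // Called by the Phoenix output format: bind the object to the generated UPSERT
    // for "mea_interval" (id_collection, startDate, endDate, value).
    @Override
    public void write(PreparedStatement stmt) throws SQLException {
        stmt.setLong(1, id);
        stmt.setLong(2, startDate);
        stmt.setLong(3, endDate);
        stmt.setDouble(4, value);
    }

    // Hadoop serialization, used when the object is shuffled between map and reduce.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(id);
        out.writeLong(timestamp);
        out.writeLong(startDate);
        out.writeLong(endDate);
        out.writeDouble(value);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readLong();
        timestamp = in.readLong();
        startDate = in.readLong();
        endDate = in.readLong();
        value = in.readDouble();
    }

    public long getId() { return id; }
    public void setId(long id) { this.id = id; }
    public long getTimestamp() { return timestamp; }
    public double getValue() { return value; }
    public void setValue(double value) { this.value = value; }
    public void setStartingDate(long startDate) { this.startDate = startDate; }
    public void setEndingDate(long endDate) { this.endDate = endDate; }
}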

Job configuration:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import org.apache.phoenix.mapreduce.PhoenixInputFormat;
import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;

public class Application {
    public static void main(String[] args) {
        final Configuration configuration = HBaseConfiguration.create();
        final Job job;
        try {
            job = Job.getInstance(configuration, "phoenix-mr-job");
            final String selectQuery = "SELECT * FROM \"mea_data\" where \"timestamp\" > 1450168200";
            PhoenixMapReduceUtil.setInput(job, MeasureWritable.class, "mea_data", selectQuery);

            // Set the target Phoenix table and the columns
            PhoenixMapReduceUtil.setOutput(job, "\"mea_interval\"", "id_collection,startDate,endDate,value");

            job.setMapperClass(MeasureMapper.class);
            job.setReducerClass(MeasureReducer.class);

            job.setOutputFormatClass(PhoenixOutputFormat.class);
            // job.setInputFormatClass(PhoenixInputFormat.class);
            job.setNumReduceTasks(10);
            job.setMapOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(MeasureWritable.class);

            // TableMapReduceUtil.addDependencyJars(job);
            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Mapper class:
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MeasureMapper extends Mapper<NullWritable, MeasureWritable, LongWritable, Text> {

    @Override
    protected void map(NullWritable key, MeasureWritable measureWritable, Context context) throws IOException, InterruptedException {
        final long timestamp = measureWritable.getTimestamp();
        double val = measureWritable.getValue();
        final long id = measureWritable.getId();
        System.out.print("id : " + new LongWritable(id));
        System.out.print(" timestamp : " + timestamp);
        System.out.println(" val : " + val);
        try {
            context.write(new LongWritable(id), new Text(timestamp + ";" + val));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Reducer class:
import java.io.IOException;
import java.text.NumberFormat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MeasureReducer extends Reducer<LongWritable, Iterable<Text>, NullWritable, MeasureWritable> {

    protected void reduce(LongWritable key, Iterable<Text> valeurs, Context context) throws IOException, InterruptedException {
        MeasureWritable interval = new MeasureWritable();
        interval.setId(Long.valueOf(key.toString()).longValue());
        NumberFormat nf = NumberFormat.getInstance();
        for (Text valeur : valeurs) {
            String[] array = valeur.toString().split(";", -1);
            interval.setStartingDate(Long.valueOf(array[0]).longValue());
            interval.setEndingDate(Long.valueOf(array[0]).longValue());
            try {
                interval.setValue(nf.parse(array[1]).doubleValue());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        context.write(NullWritable.get(), interval);
    }
}

Best answer

Use LongWritable as the Mapper's input key, and as the first parameter of the map method, instead of NullWritable.
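A minimal sketch of that change, keeping the rest of the question's code as written (only the key type in the class declaration and the map signature is adjusted; this is an illustration of the suggestion above, not a verified full fix):

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MeasureMapper extends Mapper<LongWritable, MeasureWritable, LongWritable, Text> {

    @Override
    protected void map(LongWritable key, MeasureWritable measureWritable, Context context)
            throws IOException, InterruptedException {
        // Same body as in the question: emit the measurement id as the key and
        // "timestamp;value" as the text value.
        context.write(new LongWritable(measureWritable.getId()),
                new Text(measureWritable.getTimestamp() + ";" + measureWritable.getValue()));
    }
}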

Regarding java - MapReduce with Phoenix: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.NullWritable, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/34286230/
