
java - Joins in Hadoop using Java



I have only been working with Hadoop for a short time and am trying to implement a join in Java. Whether it is map-side or reduce-side does not matter to me; I went with a reduce-side join because it is supposed to be easier to implement. I know Java is not the best choice for joins, aggregations, and so on, and that Hive or Pig would be a better fit (which I have already done), but I am working on a research project where I have to use all three so I can compare them.

Anyway, I have two input files with different structures. One is key|value, the other is key|value1;value2;value3;value4. One record from each input file looks like this (a short parsing sketch follows the list):

  • Input 1: 1;2010-01-10T00:00:01
  • Input 2: 1;23;Blue;2010-01-11T00:00:01;9999-12-31T23:59:59
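
For reference, here is a minimal sketch (not part of the original job) of how these two record layouts could be parsed in plain Java; the field order and the timestamp pattern yyyy-MM-dd'T'HH:mm:ss are assumptions based on the sample records above:

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class RecordParsingSketch {

    // Assumption: timestamps look like 2010-01-10T00:00:01, as in the samples above
    private static final SimpleDateFormat TIMESTAMP =
            new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

    public static void main(String[] args) throws ParseException {
        // Input 1: key;timestamp
        String cdrLine = "1;2010-01-10T00:00:01";
        String[] cdr = cdrLine.split(";");
        String cdrKey = cdr[0];
        Date transDate = TIMESTAMP.parse(cdr[1]);

        // Input 2: key;value1;value2;value3;value4 (last two fields are dates)
        String attrLine = "1;23;Blue;2010-01-11T00:00:01;9999-12-31T23:59:59";
        String[] attr = attrLine.split(";");
        String attrKey = attr[0];
        Date startDate = TIMESTAMP.parse(attr[3]);
        Date endDate = TIMESTAMP.parse(attr[4]);

        System.out.println(cdrKey + " -> " + transDate
                + " | " + attrKey + " valid from " + startDate + " to " + endDate);
    }
}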

I followed the example from the book Hadoop: The Definitive Guide, but it did not work for me. I am posting my Java files here so you can see what is wrong.

public class LookupReducer extends Reducer<TextPair, Text, Text, Text> {

    private String result = "";
    private String msisdn;
    private String attribute, product;
    private long trans_dt_long, start_dt_long, end_dt_long;
    private String trans_dt, start_dt, end_dt;

    @Override
    public void reduce(TextPair key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        context.progress();
        //value without key to remember
        Iterator<Text> iter = values.iterator();

        for (Text val : values) {

            Text recordNoKey = val; //new Text(iter.next());
            String valSplitted[] = recordNoKey.toString().split(";");

            //if the input is coming from CDR set corresponding values
            if (key.getSecond().toString().equals(CDR.CDR_TAG)) {
                trans_dt = recordNoKey.toString();
                trans_dt_long = dateToLong(recordNoKey.toString());
            }
            //if the input is coming from Attributes set corresponding values
            else if (key.getSecond().toString().equals(Attribute.ATT_TAG)) {
                attribute = valSplitted[0];
                product = valSplitted[1];
                start_dt = valSplitted[2];
                start_dt_long = dateToLong(valSplitted[2]);
                end_dt = valSplitted[3];
                end_dt_long = dateToLong(valSplitted[3]);
            }

            Text record = val; //iter.next();
            //System.out.println("RECORD: " + record);
            Text outValue = new Text(recordNoKey.toString() + ";" + record.toString());

            if (start_dt_long < trans_dt_long && trans_dt_long < end_dt_long) {
                //concat output columns
                result = attribute + ";" + product + ";" + trans_dt;

                context.write(key.getFirst(), new Text(result));
                System.out.println("KEY: " + key);
            }
        }
    }

    private static long dateToLong(String date) {
        DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date parsedDate = null;
        try {
            parsedDate = formatter.parse(date);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return parsedDate.getTime();
    }

    public static class TextPair implements WritableComparable<TextPair> {

        private Text first;
        private Text second;

        public TextPair() {
            set(new Text(), new Text());
        }

        public TextPair(String first, String second) {
            set(new Text(first), new Text(second));
        }

        public TextPair(Text first, Text second) {
            set(first, second);
        }

        public void set(Text first, Text second) {
            this.first = first;
            this.second = second;
        }

        public Text getFirst() {
            return first;
        }

        public void setFirst(Text first) {
            this.first = first;
        }

        public Text getSecond() {
            return second;
        }

        public void setSecond(Text second) {
            this.second = second;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            first.readFields(in);
            second.readFields(in);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            first.write(out);
            second.write(out);
        }

        @Override
        public int hashCode() {
            return first.hashCode() * 163 + second.hashCode();
        }

        @Override
        public boolean equals(Object o) {
            if (o instanceof TextPair) {
                TextPair tp = (TextPair) o;
                return first.equals(tp.first) && second.equals(tp.second);
            }
            return false;
        }

        @Override
        public String toString() {
            return first + ";" + second;
        }

        @Override
        public int compareTo(TextPair tp) {
            int cmp = first.compareTo(tp.first);
            if (cmp != 0)
                return cmp;
            return second.compareTo(tp.second);
        }

        public static class FirstComparator extends WritableComparator {

            protected FirstComparator() {
                super(TextPair.class, true);
            }

            @Override
            public int compare(WritableComparable comp1, WritableComparable comp2) {
                TextPair pair1 = (TextPair) comp1;
                TextPair pair2 = (TextPair) comp2;
                int cmp = pair1.getFirst().compareTo(pair2.getFirst());

                if (cmp != 0)
                    return cmp;

                return -pair1.getSecond().compareTo(pair2.getSecond());
            }
        }

        public static class GroupComparator extends WritableComparator {

            protected GroupComparator() {
                super(TextPair.class, true);
            }

            @Override
            public int compare(WritableComparable comp1, WritableComparable comp2) {
                TextPair pair1 = (TextPair) comp1;
                TextPair pair2 = (TextPair) comp2;

                return pair1.compareTo(pair2);
            }
        }
    }

}

public class Joiner extends Configured implements Tool {

    //Symbol used to separate the output data
    public static final String DATA_SEPERATOR = ";";
    //Number of reducer tasks to use
    public static final String NUMBER_OF_REDUCER = "1";
    //Set to "true" if the map output should be compressed, otherwise "false"
    public static final String COMPRESS_MAP_OUTPUT = "true";
    //Codec used for compressing the map output
    public static final String USED_COMPRESSION_CODEC = "org.apache.hadoop.io.compress.SnappyCodec";
    //Set to true when the job runs on the local machine,
    //false when it runs on the Telefonica Cloud
    public static final boolean JOB_RUNNING_LOCAL = true;
    //Folder where the output is saved; only needed if JOB_RUNNING_LOCAL = false
    public static final String OUTPUT_PATH = "/home/hduser";

    public static class KeyPartitioner extends Partitioner<TextPair, Text> {
        @Override
        public int getPartition(TextPair key, Text value, int numPartitions) {
            System.out.println("numPartitions: " + numPartitions);
            return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    private static Configuration hadoopconfig() {
        Configuration conf = new Configuration();

        conf.set("mapred.textoutputformat.separator", DATA_SEPERATOR);
        conf.set("mapred.compress.map.output", COMPRESS_MAP_OUTPUT);
        //conf.set("mapred.map.output.compression.codec", USED_COMPRESSION_CODEC);
        conf.set("mapred.reduce.tasks", NUMBER_OF_REDUCER);
        return conf;
    }

    @Override
    public int run(String[] args) throws Exception {
        if ((args.length != 3) && (JOB_RUNNING_LOCAL)) {
            System.err.println("Usage: Lookup <CDR-inputPath> <Attribute-inputPath> <outputPath>");
            System.exit(2);
        }

        //starting the Hadoop job
        Configuration conf = hadoopconfig();
        Job job = new Job(conf, "Join cdrs and attributes");
        job.setJarByClass(Joiner.class);

        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CDRMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, AttributeMapper.class);
        //FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //expecting a folder instead of a file

        if (JOB_RUNNING_LOCAL)
            FileOutputFormat.setOutputPath(job, new Path(args[2]));
        else
            FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        job.setPartitionerClass(KeyPartitioner.class);
        job.setGroupingComparatorClass(TextPair.FirstComparator.class);
        job.setReducerClass(LookupReducer.class);

        job.setMapOutputKeyClass(TextPair.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Joiner(), args);
        System.exit(exitCode);
    }
}

public class Attribute {

    public static final String ATT_TAG = "1";

    public static class AttributeMapper
            extends Mapper<LongWritable, Text, TextPair, Text> {

        private static Text values = new Text();
        //private Object output = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //partition the input line by the separator semicolon
            String[] attributes = value.toString().split(";");
            String valuesInString = "";

            if (attributes.length != 5)
                System.err.println("Input column number not correct. Expected 5, provided " + attributes.length
                        + "\n" + "Check the input file");
            if (attributes.length == 5) {
                //setting the values with the input values read above
                valuesInString = attributes[1] + ";" + attributes[2] + ";" + attributes[3] + ";" + attributes[4];
                values.set(valuesInString);
                //writing out the key and value pair
                context.write(new TextPair(new Text(String.valueOf(attributes[0])), new Text(ATT_TAG)), values);
            }
        }
    }

}

public class CDR {

    public static final String CDR_TAG = "0";

    public static class CDRMapper
            extends Mapper<LongWritable, Text, TextPair, Text> {

        private static Text values = new Text();
        private Object output = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //partition the input line by the separator semicolon
            String[] cdr = value.toString().split(";");

            //setting the values with the input values read above
            values.set(cdr[1]);
            //output = CDR_TAG + cdr[1];

            //writing out the key and value pair
            context.write(new TextPair(new Text(String.valueOf(cdr[0])), new Text(CDR_TAG)), values);
        }
    }

}

I would be glad if someone could at least post a link to a tutorial or a simple example implementing such a join. I have searched a lot, but either the code was incomplete or the explanation was insufficient.
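
As a rough illustration of the pattern being asked about, below is a stripped-down reduce-side join sketch. All class names are hypothetical, and unlike the code above it joins on the plain key and buffers one side in memory instead of using a composite TextPair key with a custom partitioner and grouping comparator, so it is only a starting point rather than a drop-in fix:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SimpleReduceSideJoin {

    //Tags each CDR line "key;timestamp" with the prefix C|
    public static class CdrMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable offset, Text line, Context ctx)
                throws IOException, InterruptedException {
            String[] parts = line.toString().split(";", 2);
            ctx.write(new Text(parts[0]), new Text("C|" + parts[1]));
        }
    }

    //Tags each attribute line "key;v1;v2;v3;v4" with the prefix A|
    public static class AttributeMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable offset, Text line, Context ctx)
                throws IOException, InterruptedException {
            String[] parts = line.toString().split(";", 2);
            ctx.write(new Text(parts[0]), new Text("A|" + parts[1]));
        }
    }

    //Buffers both sides for a key and emits every attribute/CDR combination
    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context ctx)
                throws IOException, InterruptedException {
            List<String> cdrs = new ArrayList<>();
            List<String> attrs = new ArrayList<>();
            for (Text v : values) {
                //copy to String because Hadoop reuses the Text object
                String s = v.toString();
                if (s.startsWith("C|")) cdrs.add(s.substring(2));
                else attrs.add(s.substring(2));
            }
            for (String a : attrs)
                for (String c : cdrs)
                    ctx.write(key, new Text(a + ";" + c));
        }
    }

    public static void main(String[] args) throws Exception {
        //args: <CDR input path> <attribute input path> <output path>
        Job job = Job.getInstance(new Configuration(), "simple reduce-side join");
        job.setJarByClass(SimpleReduceSideJoin.class);
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CdrMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, AttributeMapper.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

It would be run with three arguments: the CDR input path, the attribute input path, and the output path.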
