- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
<分区>
我从很短的时间开始使用 Hadoop 并尝试在 Java 中实现连接。 Map-Side 或 Reduce-Side 并不重要。我采用了 Reduce-Side join,因为它应该更容易实现。我知道 Java 不是连接、聚合等的最佳选择,最好选择我已经做过的 Hive 或 Pig。但是,我正在进行一个研究项目,我必须使用所有这 3 种语言才能进行比较。
无论如何,我有两个结构不同的输入文件。一个是键|值,另一个是键|值1;值2;值3;值4。每个输入文件中的一条记录如下所示:
1;2010-01-10T00:00:01
1;23;Blue;2010-01-11T00:00:01;9999-12-31T23:59:59
我遵循了 Hadoop Definitve Guide 一书中的示例,但它对我不起作用。我在这里发布了我的 java 文件,所以你可以看到哪里出了问题。
public class LookupReducer extends Reducer<TextPair,Text,Text,Text> {
private String result = "";
private String msisdn;
private String attribute, product;
private long trans_dt_long, start_dt_long, end_dt_long;
private String trans_dt, start_dt, end_dt;
@Override
public void reduce(TextPair key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
context.progress();
//value without key to remember
Iterator<Text> iter = values.iterator();
for (Text val : values) {
Text recordNoKey = val; //new Text(iter.next());
String valSplitted[] = recordNoKey.toString().split(";");
//if the input is coming from CDR set corresponding values
if(key.getSecond().toString().equals(CDR.CDR_TAG))
{
trans_dt = recordNoKey.toString();
trans_dt_long = dateToLong(recordNoKey.toString());
}
//if the input is coming from Attributes set corresponding values
else if(key.getSecond().toString().equals(Attribute.ATT_TAG))
{
attribute = valSplitted[0];
product = valSplitted[1];
start_dt = valSplitted[2];
start_dt_long = dateToLong(valSplitted[2]);
end_dt = valSplitted[3];
end_dt_long = dateToLong(valSplitted[3]);;
}
Text record = val; //iter.next();
//System.out.println("RECORD: " + record);
Text outValue = new Text(recordNoKey.toString() + ";" + record.toString());
if(start_dt_long < trans_dt_long && trans_dt_long < end_dt_long)
{
//concat output columns
result = attribute + ";" + product + ";" + trans_dt;
context.write(key.getFirst(), new Text(result));
System.out.println("KEY: " + key);
}
}
}
private static long dateToLong(String date){
DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date parsedDate = null;
try {
parsedDate = formatter.parse(date);
} catch (ParseException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
long dateInLong = parsedDate.getTime();
return dateInLong;
}
public static class TextPair implements WritableComparable<TextPair> {
private Text first;
private Text second;
public TextPair(){
set(new Text(), new Text());
}
public TextPair(String first, String second){
set(new Text(first), new Text(second));
}
public TextPair(Text first, Text second){
set(first, second);
}
public void set(Text first, Text second){
this.first = first;
this.second = second;
}
public Text getFirst() {
return first;
}
public void setFirst(Text first) {
this.first = first;
}
public Text getSecond() {
return second;
}
public void setSecond(Text second) {
this.second = second;
}
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
first.readFields(in);
second.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
first.write(out);
second.write(out);
}
@Override
public int hashCode(){
return first.hashCode() * 163 + second.hashCode();
}
@Override
public boolean equals(Object o){
if(o instanceof TextPair)
{
TextPair tp = (TextPair) o;
return first.equals(tp.first) && second.equals(tp.second);
}
return false;
}
@Override
public String toString(){
return first + ";" + second;
}
@Override
public int compareTo(TextPair tp) {
// TODO Auto-generated method stub
int cmp = first.compareTo(tp.first);
if(cmp != 0)
return cmp;
return second.compareTo(tp.second);
}
public static class FirstComparator extends WritableComparator {
protected FirstComparator(){
super(TextPair.class, true);
}
@Override
public int compare(WritableComparable comp1, WritableComparable comp2){
TextPair pair1 = (TextPair) comp1;
TextPair pair2 = (TextPair) comp2;
int cmp = pair1.getFirst().compareTo(pair2.getFirst());
if(cmp != 0)
return cmp;
return -pair1.getSecond().compareTo(pair2.getSecond());
}
}
public static class GroupComparator extends WritableComparator {
protected GroupComparator()
{
super(TextPair.class, true);
}
@Override
public int compare(WritableComparable comp1, WritableComparable comp2)
{
TextPair pair1 = (TextPair) comp1;
TextPair pair2 = (TextPair) comp2;
return pair1.compareTo(pair2);
}
}
}
}
public class Joiner extends Configured implements Tool {
public static final String DATA_SEPERATOR =";"; //Define the symbol for seperating the output data
public static final String NUMBER_OF_REDUCER = "1"; //Define the number of the used reducer jobs
public static final String COMPRESS_MAP_OUTPUT = "true"; //if the output from the mapping process should be compressed, set COMPRESS_MAP_OUTPUT = "true" (if not set it to "false")
public static final String
USED_COMPRESSION_CODEC = "org.apache.hadoop.io.compress.SnappyCodec"; //set the used codec for the data compression
public static final boolean JOB_RUNNING_LOCAL = true; //if you run the Hadoop job on your local machine, you have to set JOB_RUNNING_LOCAL = true
//if you run the Hadoop job on the Telefonica Cloud, you have to set JOB_RUNNING_LOCAL = false
public static final String OUTPUT_PATH = "/home/hduser"; //set the folder, where the output is saved. Only needed, if JOB_RUNNING_LOCAL = false
public static class KeyPartitioner extends Partitioner<TextPair, Text> {
@Override
public int getPartition(/*[*/TextPair key/*]*/, Text value, int numPartitions) {
System.out.println("numPartitions: " + numPartitions);
return (/*[*/key.getFirst().hashCode()/*]*/ & Integer.MAX_VALUE) % numPartitions;
}
}
private static Configuration hadoopconfig() {
Configuration conf = new Configuration();
conf.set("mapred.textoutputformat.separator", DATA_SEPERATOR);
conf.set("mapred.compress.map.output", COMPRESS_MAP_OUTPUT);
//conf.set("mapred.map.output.compression.codec", USED_COMPRESSION_CODEC);
conf.set("mapred.reduce.tasks", NUMBER_OF_REDUCER);
return conf;
}
@Override
public int run(String[] args) throws Exception {
// TODO Auto-generated method stub
if ((args.length != 3) && (JOB_RUNNING_LOCAL)) {
System.err.println("Usage: Lookup <CDR-inputPath> <Attribute-inputPath> <outputPath>");
System.exit(2);
}
//starting the Hadoop job
Configuration conf = hadoopconfig();
Job job = new Job(conf, "Join cdrs and attributes");
job.setJarByClass(Joiner.class);
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CDRMapper.class);
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, AttributeMapper.class);
//FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //expecting a folder instead of a file
if(JOB_RUNNING_LOCAL)
FileOutputFormat.setOutputPath(job, new Path(args[2]));
else
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
job.setPartitionerClass(KeyPartitioner.class);
job.setGroupingComparatorClass(TextPair.FirstComparator.class);
job.setReducerClass(LookupReducer.class);
job.setMapOutputKeyClass(TextPair.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Joiner(), args);
System.exit(exitCode);
}
}
public class Attribute {
public static final String ATT_TAG = "1";
public static class AttributeMapper
extends Mapper<LongWritable, Text, TextPair, Text>{
private static Text values = new Text();
//private Object output = new Text();
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//partition the input line by the separator semicolon
String[] attributes = value.toString().split(";");
String valuesInString = "";
if(attributes.length != 5)
System.err.println("Input column number not correct. Expected 5, provided " + attributes.length
+ "\n" + "Check the input file");
if(attributes.length == 5)
{
//setting the values with the input values read above
valuesInString = attributes[1] + ";" + attributes[2] + ";" + attributes[3] + ";" + attributes[4];
values.set(valuesInString);
//writing out the key and value pair
context.write( new TextPair(new Text(String.valueOf(attributes[0])), new Text(ATT_TAG)), values);
}
}
}
}
public class CDR {
public static final String CDR_TAG = "0";
public static class CDRMapper
extends Mapper<LongWritable, Text, TextPair, Text>{
private static Text values = new Text();
private Object output = new Text();
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//partition the input line by the separator semicolon
String[] cdr = value.toString().split(";");
//setting the values with the input values read above
values.set(cdr[1]);
//output = CDR_TAG + cdr[1];
//writing out the key and value pair
context.write( new TextPair(new Text(String.valueOf(cdr[0])), new Text(CDR_TAG)), values);
}
}
}
如果有人至少可以发布教程链接或实现此类连接功能的简单示例,我会很高兴。找了很多,要么代码不全,要么解释不够。
我正在编写一个具有以下签名的 Java 方法。 void Logger(Method method, Object[] args); 如果一个方法(例如 ABC() )调用此方法 Logger,它应该
我是 Java 新手。 我的问题是我的 Java 程序找不到我试图用作的图像文件一个 JButton。 (目前这段代码什么也没做,因为我只是得到了想要的外观第一的)。这是我的主课 代码: packag
好的,今天我在接受采访,我已经编写 Java 代码多年了。采访中说“Java 垃圾收集是一个棘手的问题,我有几个 friend 一直在努力弄清楚。你在这方面做得怎么样?”。她是想骗我吗?还是我的一生都
我的 friend 给了我一个谜语让我解开。它是这样的: There are 100 people. Each one of them, in his turn, does the following
如果我将使用 Java 5 代码的应用程序编译成字节码,生成的 .class 文件是否能够在 Java 1.4 下运行? 如果后者可以工作并且我正在尝试在我的 Java 1.4 应用程序中使用 Jav
有关于why Java doesn't support unsigned types的问题以及一些关于处理无符号类型的问题。我做了一些搜索,似乎 Scala 也不支持无符号数据类型。限制是Java和S
我只是想知道在一个 java 版本中生成的字节码是否可以在其他 java 版本上运行 最佳答案 通常,字节码无需修改即可在 较新 版本的 Java 上运行。它不会在旧版本上运行,除非您使用特殊参数 (
我有一个关于在命令提示符下执行 java 程序的基本问题。 在某些机器上我们需要指定 -cp 。 (类路径)同时执行java程序 (test为java文件名与.class文件存在于同一目录下) jav
我已经阅读 StackOverflow 有一段时间了,现在我才鼓起勇气提出问题。我今年 20 岁,目前在我的家乡(罗马尼亚克卢日-纳波卡)就读 IT 大学。足以介绍:D。 基本上,我有一家提供簿记应用
我有 public JSONObject parseXML(String xml) { JSONObject jsonObject = XML.toJSONObject(xml); r
我已经在 Java 中实现了带有动态类型的简单解释语言。不幸的是我遇到了以下问题。测试时如下代码: def main() { def ks = Map[[1, 2]].keySet()
一直提示输入 1 到 10 的数字 - 结果应将 st、rd、th 和 nd 添加到数字中。编写一个程序,提示用户输入 1 到 10 之间的任意整数,然后以序数形式显示该整数并附加后缀。 public
我有这个 DownloadFile.java 并按预期下载该文件: import java.io.*; import java.net.URL; public class DownloadFile {
我想在 GUI 上添加延迟。我放置了 2 个 for 循环,然后重新绘制了一个标签,但这 2 个 for 循环一个接一个地执行,并且标签被重新绘制到最后一个。 我能做什么? for(int i=0;
我正在对对象 Student 的列表项进行一些测试,但是我更喜欢在 java 类对象中创建硬编码列表,然后从那里提取数据,而不是连接到数据库并在结果集中选择记录。然而,自从我这样做以来已经很长时间了,
我知道对象创建分为三个部分: 声明 实例化 初始化 classA{} classB extends classA{} classA obj = new classB(1,1); 实例化 它必须使用
我有兴趣使用 GPRS 构建车辆跟踪系统。但是,我有一些问题要问以前做过此操作的人: GPRS 是最好的技术吗?人们意识到任何问题吗? 我计划使用 Java/Java EE - 有更好的技术吗? 如果
我可以通过递归方法反转数组,例如:数组={1,2,3,4,5} 数组结果={5,4,3,2,1}但我的结果是相同的数组,我不知道为什么,请帮助我。 public class Recursion { p
有这样的标准方式吗? 包括 Java源代码-测试代码- Ant 或 Maven联合单元持续集成(可能是巡航控制)ClearCase 版本控制工具部署到应用服务器 最后我希望有一个自动构建和集成环境。
我什至不知道这是否可能,我非常怀疑它是否可能,但如果可以,您能告诉我怎么做吗?我只是想知道如何从打印机打印一些文本。 有什么想法吗? 最佳答案 这里有更简单的事情。 import javax.swin
我是一名优秀的程序员,十分优秀!