- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
如何在 reducer 中使用 MultipleOutputs 类来编写多个输出,每个输出都可以有自己独特的配置? MultipleOutputs javadoc 中有一些文档,但它似乎仅限于文本输出。事实证明,MultipleOutputs 可以处理每个输出的输出路径、键类和值类,但尝试使用需要使用其他配置属性的输出格式失败。
(这个问题已经多次出现,但我试图回答它的尝试被挫败了,因为提问者实际上有一个不同的问题。由于这个问题已经花了我几天多的调查时间来回答,我正在回答根据 this Meta Stack Overflow question 的建议,我自己的问题在这里。)
最佳答案
我浏览了 MultipleOutputs 实现,发现它不支持任何具有除 outputDir、键类和值类之外的属性的 OutputFormatType。我尝试编写自己的 MultipleOutputs 类,但失败了,因为它需要调用 Hadoop 类中某处的私有(private)方法。
我只剩下一种似乎适用于所有情况以及输出格式和配置的所有组合的解决方法:编写我想使用的 OutputFormat 类的子类(这些子类最终证明是可重用的)。这些类了解其他 OutputFormats 正在同时使用,并且知道如何存储它们的属性。该设计利用了这样一个事实,即可以在请求其 RecordWriter 之前使用上下文配置 OutputFormat。
我已将其与 Cassandra 的 ColumnFamilyOutputFormat 一起使用:
package com.myorg.hadoop.platform;
import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
public abstract class ConcurrentColumnFamilyOutputFormat
extends ColumnFamilyOutputFormat
implements Configurable {
private static String[] propertyName = {
"cassandra.output.keyspace" ,
"cassandra.output.keyspace.username" ,
"cassandra.output.keyspace.passwd" ,
"cassandra.output.columnfamily" ,
"cassandra.output.predicate",
"cassandra.output.thrift.port" ,
"cassandra.output.thrift.address" ,
"cassandra.output.partitioner.class"
};
private Configuration configuration;
public ConcurrentColumnFamilyOutputFormat() {
super();
}
public Configuration getConf() {
return configuration;
}
public void setConf(Configuration conf) {
configuration = conf;
String prefix = "multiple.outputs." + getMultiOutputName() + ".";
for (int i = 0; i < propertyName.length; i++) {
String property = prefix + propertyName[i];
String value = conf.get(property);
if (value != null) {
conf.set(propertyName[i], value);
}
}
}
public void configure(Configuration conf) {
String prefix = "multiple.outputs." + getMultiOutputName() + ".";
for (int i = 0; i < propertyName.length; i++) {
String property = prefix + propertyName[i];
String value = conf.get(propertyName[i]);
if (value != null) {
conf.set(property, value);
}
}
}
public abstract String getMultiOutputName();
对于你想要的 reducer 的每个 Cassandra(在本例中)输出,你都有一个类:
package com.myorg.multioutput.ReadCrawled;
import com.myorg.hadoop.platform.ConcurrentColumnFamilyOutputFormat;
public class StrongOutputFormat extends ConcurrentColumnFamilyOutputFormat {
public StrongOutputFormat() {
super();
}
@Override
public String getMultiOutputName() {
return "Strong";
}
}
然后您将在映射器/缩减器配置类中配置它:
// This is how you'd normally configure the ColumnFamilyOutputFormat
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "Partner", "Strong");
ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");
// This is how you tell the MultipleOutput-aware OutputFormat that
// it's time to save off the configuration so no other OutputFormat
// steps all over it.
new StrongOutputFormat().configure(job.getConfiguration());
// This is where we add the MultipleOutput-aware ColumnFamilyOutputFormat
// to out set of outputs
MultipleOutputs.addNamedOutput(job, "Strong", StrongOutputFormat.class, ByteBuffer.class, List.class);
再举一个例子,FileOutputFormat 的 MultipleOutput 子类使用这些属性:
private static String[] propertyName = {
"mapred.output.compression.type" ,
"mapred.output.compression.codec" ,
"mapred.output.compress" ,
"mapred.output.dir"
};
并且将像上面的 ConcurrentColumnFamilyOutputFormat
一样实现,除了它会使用上述属性。
关于hadoop - 如何在 hadoop reducer 中写入不同格式的多个输出?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13574821/
我是一名优秀的程序员,十分优秀!