
java - Is it possible in Hadoop to specify the record separator for TextOutputFormat?


I see that there is a mechanism to override the separator between key and value using mapreduce.textoutputformat.separator (with the 1.0.3 API). But I would like to be able to control the separator between records. FYI, I am using ArrayWritable as the value and NullWritable as the key.
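For context, the mechanism referred to above looks roughly like this on the driver side (a minimal sketch; the property name is the one quoted in the question and varies across Hadoop versions, so treat it as an assumption):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SeparatorDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Overrides the key/value separator of TextOutputFormat
        // (default "\t"); some releases use
        // "mapred.textoutputformat.separator" instead.
        conf.set("mapreduce.textoutputformat.separator", ";");
        Job job = new Job(conf, "separator-demo");
        // ... input/output paths, mapper and reducer omitted.
        // There is no comparable built-in property for the separator
        // *between* records, which is what is being asked here.
    }
}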

Best Answer

As far as I know this is not possible out of the box, because TextOutputFormat uses toString() to get the textual representation of the value, and ArrayWritable does not implement toString(), so if you wrote an ArrayWritable to the output of your Reducer you would end up with the default Object.toString(). Or perhaps you meant to change the separator between lines, in which case it is the same issue: TextOutputFormat uses the \n character by default, as pointed out by climbage.

That said, you can do it by implementing a custom output format, where you define your own RecordWriter and read custom configuration properties in the getRecordWriter method. Here is a quick-and-dirty (untested) implementation of such a class that should do what you need: it lets you set the separator between the elements of an ArrayWritable via the property mapred.arraywritable.separator, and the separator between lines via mapred.line.separator:

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class ArrayTextOutputFormat<K, V> extends TextOutputFormat<K, V> {

    protected static class ArrayLineRecordWriter<K, V> extends
            LineRecordWriter<K, V> {

        private static final String utf8 = "UTF-8";
        private final byte[] arraySeparator;
        private final byte[] keyValueSeparator;
        private final byte[] lineSeparator;

        public ArrayLineRecordWriter(DataOutputStream out,
                String keyValueSeparator, String arraySeparator,
                String lineSeparator) {
            super(out);
            try {
                this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
                this.arraySeparator = arraySeparator.getBytes(utf8);
                this.lineSeparator = lineSeparator.getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8
                        + " encoding");
            }
        }

        // Text values are written as raw bytes; an ArrayWritable is
        // expanded element by element with the array separator between
        // elements (not after the last one); anything else falls back
        // to toString().
        private void writeObject(Object o) throws IOException {
            if (o instanceof Text) {
                Text to = (Text) o;
                out.write(to.getBytes(), 0, to.getLength());
            } else if (o instanceof ArrayWritable) {
                String[] elements = ((ArrayWritable) o).toStrings();
                for (int i = 0; i < elements.length; i++) {
                    if (i > 0) {
                        out.write(arraySeparator);
                    }
                    out.write(elements[i].getBytes(utf8));
                }
            } else {
                out.write(o.toString().getBytes(utf8));
            }
        }

        @Override
        public synchronized void write(K key, V value) throws IOException {
            boolean nullKey = key == null || key instanceof NullWritable;
            boolean nullValue = value == null || value instanceof NullWritable;
            if (nullKey && nullValue) {
                return;
            }
            if (!nullKey) {
                writeObject(key);
            }
            if (!(nullKey || nullValue)) {
                out.write(keyValueSeparator);
            }
            if (!nullValue) {
                writeObject(value);
            }
            // Configurable record separator instead of the "\n"
            // hardcoded in the stock LineRecordWriter.
            out.write(lineSeparator);
        }
    }

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        boolean isCompressed = getCompressOutput(job);
        String keyValueSeparator = conf.get(
                "mapred.textoutputformat.separator", "\t");
        String arraySeparator = conf.get("mapred.arraywritable.separator", "|");
        // Default to "\n" so the writer does not throw a
        // NullPointerException when the property is left unset.
        String lineSeparator = conf.get("mapred.line.separator", "\n");
        CompressionCodec codec = null;
        String extension = "";
        if (isCompressed) {
            Class<? extends CompressionCodec> codecClass =
                    getOutputCompressorClass(job, GzipCodec.class);
            codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass,
                    conf);
            extension = codec.getDefaultExtension();
        }
        Path file = getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        FSDataOutputStream fileOut = fs.create(file, false);
        if (!isCompressed) {
            return new ArrayLineRecordWriter<K, V>(fileOut, keyValueSeparator,
                    arraySeparator, lineSeparator);
        } else {
            return new ArrayLineRecordWriter<K, V>(new DataOutputStream(
                    codec.createOutputStream(fileOut)), keyValueSeparator,
                    arraySeparator, lineSeparator);
        }
    }
}
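Wiring it into a job would then look something like this (again an untested sketch in the same spirit; the job name, output path, and mapper/reducer setup are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ArrayOutputDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Elements of each ArrayWritable joined with ";", records
        // separated by a blank line instead of a single "\n".
        conf.set("mapred.arraywritable.separator", ";");
        conf.set("mapred.line.separator", "\n\n");

        Job job = new Job(conf, "array-output-demo");
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(ArrayWritable.class);
        job.setOutputFormatClass(ArrayTextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[0]));
        // ... mapper/reducer setup omitted.
        job.waitForCompletion(true);
    }
}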

Regarding java - Is it possible in Hadoop to specify the record separator for TextOutputFormat, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/16927929/
