gpt4 book ai didi

java - 处理 Hadoop 中 reduce 步骤的大输出值

转载 作者:可可西里 更新时间:2023-11-01 14:27:11 26 4
gpt4 key购买 nike

在我的 MapReduce 程序的 Reduce 阶段,我正在执行的唯一操作是连接提供的 Iterator 中的每个值,如下所示:

public void reduce(Text key, Iterator<text> values,
OutputCollector<Text, Text> output, Reporter reporter) {
Text next;
Text outKey = new Text()
Text outVal = new Text();
StringBuilder sb = new StringBuilder();
while(values.hasNext()) {
next = values.next();
sb.append(next.toString());
if (values.hasNext())
sb.append(',');
}
outKey.set(key.toString());
outVal.set(sb.toSTring());
output.collect(outKey,outVal);
}

我的问题是一些reduce 输出值是巨大 行文本;如此之大以至于即使初始大小非常大,字符串缓冲区也必须将其大小增加(加倍)数倍以容纳迭代器的所有上下文,从而导致内存问题。

在传统的 Java 应用程序中,这表明缓冲写入文件是写入输出的首选方法。如何处理 Hadoop 中的超大输出键值对?我应该将结果直接流式传输到 HDFS 上的文件吗(每个 reduce 调用一个文件)?除了 output.collect 方法之外,还有其他方法可以缓冲输出吗?

注意:我已经最大限度地增加了我的内存/堆大小。此外,一些消息来源表明,增加 reducer 的数量有助于解决内存/堆问题,但这里的问题已直接追溯到 SringBuilder 在扩展其容量时的使用。

谢谢

最佳答案

并不是说我理解您为什么想要拥有巨大的值(value),但有一种方法可以做到这一点。

如果您编写自己的 OutputFormat,则可以修复 RecordWriter.write(Key, Value) 方法的行为,以根据 Key 值是否为空来处理值串联。

这样,在您的 reducer 中,您可以编写如下代码(键的第一个输出是实际键,之后的所有内容都是空键:

public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output, Reporter reporter) {
boolean firstKey = true;
for (Text value : values) {
output.collect(firstKey ? key : null, value);
firstKey = false;
}
}

实际的 RecordWriter.write() 然后具有以下逻辑来处理空键/值连接逻辑:

    public synchronized void write(K key, V value) throws IOException {

boolean nullKey = key == null || key instanceof NullWritable;
boolean nullValue = value == null || value instanceof NullWritable;
if (nullKey && nullValue) {
return;
}

if (!nullKey) {
// if we've written data before, append a new line
if (dataWritten) {
out.write(newline);
}

// write out the key and separator
writeObject(key);
out.write(keyValueSeparator);
} else if (!nullValue) {
// write out the value delimiter
out.write(valueDelimiter);
}

// write out the value
writeObject(value);

// track that we've written some data
dataWritten = true;
}

public synchronized void close(Reporter reporter) throws IOException {
// if we've written out any data, append a closing newline
if (dataWritten) {
out.write(newline);
}

out.close();
}

你会注意到 close 方法也被修改为在最后写出的记录中写入一个尾随换行符

完整的代码 list 可以在 pastebin 上找到,这是测试输出:

key1    value1
key2 value1,value2,value3
key3 value1,value2

关于java - 处理 Hadoop 中 reduce 步骤的大输出值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10140171/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com