
java - Why does this Pig UDF result in "Error: Java heap space" given that I am spilling the DataBag to disk?


Here is my UDF:

public DataBag exec(Tuple input) throws IOException {
    Aggregate aggregatedOutput = null;

    int spillCount = 0;

    DataBag outputBag = BagFactory.getInstance().newDefaultBag();
    DataBag values = (DataBag) input.get(0);
    for (Iterator<Tuple> iterator = values.iterator(); iterator.hasNext();) {
        Tuple tuple = iterator.next();
        //spillCount++;
        ...
        if (some condition regarding current input tuple) {
            //do something to aggregatedOutput with information from input tuple
        } else {
            //Because input tuple does not apply to current aggregatedOutput,
            //return current aggregatedOutput and apply input tuple
            //to new aggregatedOutput
            Tuple returnTuple = aggregatedOutput.getTuple();
            outputBag.add(returnTuple);
            spillCount++;
            aggregatedOutput = new Aggregate(tuple);

            if (spillCount == 1000) {
                outputBag.spill();
                spillCount = 0;
            }
        }
    }
    return outputBag;
}

Note that the bag is spilled to disk every 1000 input tuples. I have set this number as low as 50 and as high as 100,000, but I still receive the memory error:

Pig logfile dump:

Backend error message
---------------------
Error: Java heap space

Pig Stack Trace
---------------
ERROR 2997: Unable to recreate exception from backed error: Error: Java heap space

What can I do to resolve this? It is processing roughly a million rows.

Here is the solution.

Use the Accumulator interface:

public class Foo extends EvalFunc<DataBag> implements Accumulator<DataBag> {
    private DataBag outputBag = null;
    private Aggregate currentAggregation = null;

    public void accumulate(Tuple input) throws IOException {
        DataBag values = (DataBag) input.get(0);
        Aggregate aggregatedOutput = null;
        outputBag = BagFactory.getInstance().newDefaultBag();

        for (Iterator<Tuple> iterator = values.iterator(); iterator.hasNext();) {
            Tuple tuple = iterator.next();
            ...
            if (some condition regarding current input tuple) {
                //do something to aggregatedOutput with information from input tuple
            } else {
                //Because input tuple does not apply to current aggregatedOutput,
                //return current aggregatedOutput and apply input tuple
                //to new aggregatedOutput
                outputBag.add(aggregatedOutput.getTuple());
                aggregatedOutput = new Aggregate(tuple);
            }
        }
    }

    // Called when all tuples from current key have been passed to accumulate
    public DataBag getValue() {
        //Add final current aggregation
        outputBag.add(currentAggregation.getTuple());
        return outputBag;
    }

    // This is called after getValue()
    // Not sure if these commands are necessary as they are repeated in beginning of accumulate
    public void cleanup() {
        outputBag = null;
        currentAggregation = null;
    }

    public DataBag exec(Tuple input) throws IOException {
        // Same as above ^^ but this doesn't appear to ever be called.
    }

    public Schema outputSchema(Schema input) {
        try {
            return new Schema(new FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), bagSchema, DataType.BAG));
        } catch (FrontendException e) {
            e.printStackTrace();
            return null;
        }
    }

    class Aggregate {
        ...
        public Tuple getTuple() {
            Tuple output = TupleFactory.getInstance().newTuple(OUTPUT_TUPLE_SIZE);
            try {
                output.set(0, val);
                ...
            } catch (ExecException e) {
                e.printStackTrace();
                return null;
            }
            return output;
        }
        ...
    }
}

Best answer

You should increment spillCount every time you append to outputBag, not every time you get a tuple from the iterator. You only spill when spillCount is a multiple of 1000 and your if condition is not met, which may not happen very often (depending on the logic). That would explain why you see little difference between the various spill thresholds.
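As a minimal sketch of that suggestion, assuming a plain Tuple stands in for the question's Aggregate class, a hypothetical belongsToCurrentGroup() stands in for the elided condition, and SPILL_THRESHOLD is an assumed constant, the counter and the spill check both sit right next to the outputBag.add(...) call:

import java.io.IOException;
import java.util.Iterator;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

public class SpillOnAppendUdf extends EvalFunc<DataBag> {
    // The question used 1000; the exact value is an assumption here.
    private static final int SPILL_THRESHOLD = 1000;

    @Override
    public DataBag exec(Tuple input) throws IOException {
        DataBag outputBag = BagFactory.getInstance().newDefaultBag();
        DataBag values = (DataBag) input.get(0);

        int spillCount = 0;
        Tuple current = null;                 // stands in for the question's Aggregate

        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
            Tuple tuple = it.next();
            if (current == null || belongsToCurrentGroup(current, tuple)) {
                current = tuple;              // placeholder for "update the aggregate"
            } else {
                outputBag.add(current);       // only appends advance the counter
                current = tuple;

                spillCount++;
                if (spillCount >= SPILL_THRESHOLD) {
                    outputBag.spill();        // hint Pig to push the bag's contents to disk
                    spillCount = 0;
                }
            }
        }
        if (current != null) {
            outputBag.add(current);           // flush the final aggregate
        }
        return outputBag;
    }

    // Hypothetical stand-in for "some condition regarding current input tuple".
    private boolean belongsToCurrentGroup(Tuple current, Tuple next) {
        return false;
    }
}

With the counter tied to appends, the bag is asked to spill after roughly every SPILL_THRESHOLD aggregates actually held in memory, regardless of how many input tuples were merged into each one.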

If that does not solve your problem, I would try extending AccumulatorEvalFunc<DataBag>. In your case you do not actually need access to the whole bag; your logic fits an accumulator-style implementation because you only need access to the current tuple. This may reduce memory usage. Essentially, you would have an instance variable of type DataBag that accumulates the final output, and another instance variable, aggregatedOutput, that holds the current aggregation. A call to accumulate() would either 1) update the current aggregation, or 2) add the current aggregation to aggregatedOutput and start a new one. This essentially follows the body of your for loop.
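A skeleton of that accumulator-style approach might look like the sketch below; the belongsToCurrentAggregate() check and the use of a plain Tuple in place of the question's Aggregate class are placeholders for the aggregation logic elided above:

import java.io.IOException;
import java.util.Iterator;

import org.apache.pig.AccumulatorEvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

public class FooAccumulator extends AccumulatorEvalFunc<DataBag> {
    private DataBag outputBag;          // accumulates the final output across accumulate() calls
    private Tuple currentAggregation;   // stands in for the question's Aggregate

    @Override
    public void accumulate(Tuple input) throws IOException {
        if (outputBag == null) {
            outputBag = BagFactory.getInstance().newDefaultBag();
        }
        DataBag values = (DataBag) input.get(0);
        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
            Tuple tuple = it.next();
            if (currentAggregation == null || belongsToCurrentAggregate(currentAggregation, tuple)) {
                currentAggregation = tuple;          // 1) update the current aggregation
            } else {
                outputBag.add(currentAggregation);   // 2) flush it and start a new aggregation
                currentAggregation = tuple;
            }
        }
    }

    @Override
    public DataBag getValue() {
        if (outputBag == null) {
            outputBag = BagFactory.getInstance().newDefaultBag();
        }
        if (currentAggregation != null) {
            outputBag.add(currentAggregation);       // add the final, still-open aggregation
        }
        return outputBag;
    }

    @Override
    public void cleanup() {
        outputBag = null;
        currentAggregation = null;
    }

    // Hypothetical stand-in for "some condition regarding current input tuple".
    private boolean belongsToCurrentAggregate(Tuple current, Tuple next) {
        return false;
    }
}

Extending AccumulatorEvalFunc rather than implementing Accumulator directly on an EvalFunc means only accumulate(), getValue(), and cleanup() need to be supplied by the subclass.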

Regarding "java - Why does this Pig UDF result in "Error: Java heap space" given that I am spilling the DataBag to disk?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/21567307/
