
hadoop - Pig: How to send all Tuples to a UDF to be Processed without Grouping them? Or, how to turn tuples into a bag without grouping?

Reposted. Author: 可可西里. Updated: 2023-11-01 14:34:09

This is what I'm trying to do:

A = LOAD '...' USING PigStorage(',') AS (
col1:int
,col2:chararray
);
B = ORDER A by col2;
C = CUSTOM_UDF(A);

CUSTOM_UDF iterates over tuples that must be in order. The UDF emits one aggregated tuple for every few input tuples; that is, it does not return tuples 1:1.
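In other words, the UDF performs an N:1 reduction over an ordered stream. A minimal plain-Java sketch of that contract, with a hypothetical `Row` record matching the `(col1:int, col2:chararray)` schema, a `List` standing in for Pig's DataBag, and a made-up aggregation (concatenating `col2` values over runs of equal `col1`) in place of the real one:

```java
import java.util.ArrayList;
import java.util.List;

public class AggregateSketch {
    // Hypothetical record matching the (col1:int, col2:chararray) schema.
    record Row(int col1, String col2) {}

    // Walk ordered rows and emit one combined value per run of equal col1
    // -- the "one output tuple per several input tuples" behavior described
    // above. The concatenation is a stand-in for the real aggregation.
    static List<String> aggregate(List<Row> ordered) {
        List<String> out = new ArrayList<>();
        StringBuilder current = null;
        int currentKey = 0;
        for (Row r : ordered) {
            if (current != null && r.col1() == currentKey) {
                current.append('+').append(r.col2());             // extend current aggregate
            } else {
                if (current != null) out.add(current.toString()); // emit finished aggregate
                current = new StringBuilder(r.col2());            // start a new one
                currentKey = r.col1();
            }
        }
        if (current != null) out.add(current.toString());         // emit the last run
        return out;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(new Row(1, "a"), new Row(1, "b"), new Row(2, "c"));
        System.out.println(aggregate(rows)); // 3 input tuples -> 2 outputs
    }
}
```

Note that the sketch only works because the input is already sorted by the run key; that is exactly the role the ORDER BY plays for the UDF.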

Essentially:

public class CustomUdf extends EvalFunc<Tuple> {
    public Tuple exec(Tuple input) throws IOException {
        Aggregate aggregatedOutput = null;

        DataBag values = (DataBag)input.get(0);
        for (Iterator<Tuple> iterator = values.iterator(); iterator.hasNext();) {
            Tuple tuple = iterator.next();
            ....
            if (some condition regarding current input tuple) {
                //do something to aggregatedOutput with information from input tuple
            } else {
                //Because input tuple does not apply to current aggregateOutput
                //return current aggregateOutput and apply input tuple
                //to new aggregateOutput
                Tuple returnTuple = aggregatedOutput.getTuple();
                aggregatedOutput = new Aggregate(tuple);
                return returnTuple;
            }
        }
    }

    // Establish the output Schema as a tuple
    public Schema outputSchema(Schema input) {
        Schema tupleSchema = new Schema();
        ...
        return new Schema(
            new FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
                tupleSchema,
                DataType.TUPLE));
    }

    /** This inner class is simply a wrapper for the output tuple **/
    class Aggregate {
        //member variables

        public Aggregate(Tuple input) {
            //set member variables to value of input's fields
        }

        public Tuple getTuple() {
            Tuple output = TupleFactory.getInstance().newTuple(5);
            //set tuple's fields to values of member variables
            return output;
        }
    }
}

I have been able to do something similar with:

A = LOAD '...' USING PigStorage(',') AS (
col1:int
,col2:chararray
);
B = ORDER A by col2;
C = GROUP B BY col1;
D = FOREACH C {
    GENERATE CUSTOM_UDF(B);
}

However, this does not appear to preserve the ORDER BY, and I cannot figure out how to order D, since I keep getting an invalid field projection error.

Also, I do not actually need the GROUP BY (it just happens to work for this use case); I simply want to send the alias B to CUSTOM_UDF as a bag of tuples.

How can I accomplish this?

Best Answer

I think the problem is in how CustomUdf is written. Based on your description, it sounds like it should be an EvalFunc<DataBag>, not an EvalFunc<Tuple>. Then, in the implementation, as you iterate through all the tuples of the input bag, you append each completed aggregate tuple to a DataBag that you return at the end of the method.

Your Pig code would look like the following. I don't believe ORDER BY preserves the ordering when it is a separate statement, as you have it; however, it does preserve the ordering inside a nested FOREACH, as shown below.

A = LOAD '...' USING PigStorage(',') AS (
col1:int
,col2:chararray
);
B = FOREACH (GROUP A ALL) {
    A_ordered = ORDER A BY col2;
    GENERATE FLATTEN(CUSTOM_UDF(A_ordered));
}

The exec method would look something like the modified version below. Note the changes I made.

public DataBag exec(Tuple input) throws IOException { // different return type
    Aggregate aggregatedOutput = null;

    DataBag result = BagFactory.getInstance().newDefaultBag(); // change here
    DataBag values = (DataBag)input.get(0);
    for (Iterator<Tuple> iterator = values.iterator(); iterator.hasNext();) {
        Tuple tuple = iterator.next();
        ....
        if (some condition regarding current input tuple) {
            //do something to aggregatedOutput with information from input tuple
        } else {
            //Because input tuple does not apply to current aggregateOutput,
            //add current aggregateOutput to the result bag and apply input
            //tuple to a new aggregateOutput
            Tuple returnTuple = aggregatedOutput.getTuple();
            aggregatedOutput = new Aggregate(tuple);
            result.add(returnTuple); // change here
        }
    }
    return result; // change here
}
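One caveat worth noting: as written, the loop only adds an aggregate to the bag when a *later* tuple fails the condition, so the final in-progress aggregate is never emitted unless it is added after the loop. A plain-Java sketch of the fixed shape, with `List<Object[]>` standing in for Pig's DataBag and a simple key-equality check standing in for the real condition:

```java
import java.util.ArrayList;
import java.util.List;

public class UdfSketch {
    // Mirrors the modified exec(): consume an ordered bag, accumulate a
    // running aggregate, and append each finished aggregate to the result
    // bag. Object[] pairs of {key, sum} stand in for Pig tuples, and
    // "same key" stands in for the real aggregation condition.
    public static List<Object[]> exec(List<Object[]> values) {
        List<Object[]> result = new ArrayList<>();   // ~ BagFactory...newDefaultBag()
        Object[] aggregated = null;                  // ~ the Aggregate wrapper
        for (Object[] tuple : values) {
            if (aggregated != null && aggregated[0].equals(tuple[0])) {
                aggregated[1] = (int) aggregated[1] + (int) tuple[1]; // fold into aggregate
            } else {
                if (aggregated != null) result.add(aggregated); // ~ result.add(returnTuple)
                aggregated = new Object[]{tuple[0], tuple[1]};  // ~ new Aggregate(tuple)
            }
        }
        // The loop only emits when a later tuple breaks the condition, so the
        // last in-progress aggregate must be added here or it is dropped.
        if (aggregated != null) result.add(aggregated);
        return result;
    }

    public static void main(String[] args) {
        List<Object[]> bag = List.of(
            new Object[]{"a", 1}, new Object[]{"a", 2}, new Object[]{"b", 5});
        for (Object[] t : exec(bag)) {
            System.out.println(t[0] + "," + t[1]);
        }
    }
}
```

The same tail-emit line would translate to a `result.add(aggregatedOutput.getTuple());` just before the `return result;` in the real UDF.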

Regarding "hadoop - Pig: How to send all Tuples to a UDF to be Processed without Grouping them? Or, how to turn tuples into a bag without grouping?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/21445730/
