gpt4 book ai didi

java - 如何在 Flink 中用 Java 将 AvroFile 读入 Tuple 类

转载 作者:行者123 更新时间:2023-12-02 01:44:57 25 4
gpt4 key购买 nike

我正在尝试读取 Avro 文件并对其执行一些操作,一切正常,但聚合函数,当我使用它们时,会出现以下异常:

aggregating on field positions is only possible on tuple data types

然后我更改我的类以实现 Tuple4 (因为我有 4 个字段),但是当我想收集结果时,会得到 AvroTypeException Unknown Type : T0

这是我的数据和工作类别:

public class Nation{

public Integer N_NATIONKEY;
public String N_NAME;
public Integer N_REGIONKEY;
public String N_COMMENT;

public Integer getN_NATIONKEY() {
return N_NATIONKEY;
}

public void setN_NATIONKEY(Integer n_NATIONKEY) {
N_NATIONKEY = n_NATIONKEY;
}

public String getN_NAME() {
return N_NAME;
}

public void setN_NAME(String n_NAME) {
N_NAME = n_NAME;
}

public Integer getN_REGIONKEY() {
return N_REGIONKEY;
}

public void setN_REGIONKEY(Integer n_REGIONKEY) {
N_REGIONKEY = n_REGIONKEY;
}

public String getN_COMMENT() {
return N_COMMENT;
}

public void setN_COMMENT(String n_COMMENT) {
N_COMMENT = n_COMMENT;
}
public Nation() {
}


public static void main(String[] args) throws Exception {
Configuration parameters = new Configuration();

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();


Path path2 = new Path("/Users/violet/Desktop/nation.avro");

AvroInputFormat<Nation> format = new AvroInputFormat<Nation>(path2,Nation.class);
format.configure(parameters);
DataSet<Nation> nation = env.createInput(format);
nation.aggregate(Aggregations.SUM,0);


JobExecutionResult res = env.execute();
}

这是元组类以及与上面相同的作业代码:

public class NationTuple extends Tuple4<Integer,String,Integer,String> {

Integer N_NATIONKEY(){ return this.f0;}
String N_NAME(){return this.f1;}
Integer N_REGIONKEY(){ return this.f2;}
String N_COMMENT(){ return this.f3;}

}

我尝试使用此类并得到 TypeException(到处使用 NationTuple 而不是 Nation)

最佳答案

我不认为让你的类实现 Tuple4 是正确的方法。相反,您应该向您的拓扑添加一个 MapFunction,将您的 NationTuple 转换为 Tuple4。

static Tuple4<Integer, String, Integer, String> toTuple(Nation nation) {
return Tuple4.of(nation.N_NATIONKEY, ...);
}

然后在您的拓扑调用中:

inputData.map(p -> toTuple(p)).returns(new TypeHint<Tuple4<Integer, String, Integer, String>(){});

唯一微妙的部分是您需要提供类型提示,以便 flink 可以确定您的函数返回哪种类型的元组。

另一个解决方案是在进行聚合时使用字段名称而不是元组字段索引。例如:

groupBy("N_NATIONKEY", "N_REGIONKEY")

这一切都在这里解释:https://ci.apache.org/projects/flink/flink-docs-stable/dev/api_concepts.html#specifying-keys

关于java - 如何在 Flink 中用 Java 将 AvroFile 读入 Tuple 类,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53829527/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com