gpt4 book ai didi

java - 类型删除和 Flink : what causes run time error?

转载 作者:行者123 更新时间:2023-12-04 13:23:40 31 4
gpt4 key购买 nike

我有一个抽象类,它的抽象方法创建了一个 SourceFunction ,因此派生类可以返回简单或更复杂的源(例如 KafkaConsumers 等)。 ChangeMe是通过编译 AvroSchema 创建的一个简单的自动生成类。

public SourceFunction<ChangeMe> createSourceFunction(ParameterTool params) {
FromElementsFunction<ChangeMe> dataSource = null;

List<ChangeMe> changeMeList = Arrays.asList(
ChangeMe.newBuilder().setSomeField("Some field 1").build(),
ChangeMe.newBuilder().setSomeField("Some field 2").build(),
ChangeMe.newBuilder().setSomeField("Some field 3").build()
);
try {
dataSource = new FromElementsFunction<>(new AvroSerializer<>(ChangeMe.class), changeMeList);
}
catch (IOException ex){

}

return dataSource;
}

在我的 Flink 工作中,我基本上有这个:
SourceFunction<ChangeMe> source = createSourceFunction(params);
DataStream<T> sourceDataStream = streamExecutionEnvironment.addSource(source);


DataStream<ChangeMe> changeMeEventsStream = this.getSourceDataStream(); // gets sourceDataStream above
changeMeEventsStream.print();

当我运行作业时,我收到有关调用 print() 的错误:
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'Custom Source' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
……
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'T' in 'class org.apache.flink.streaming.api.functions.source.FromElementsFunction' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).

我使用的是 Eclipse 编译器,所以我以为会包含类型信息(虽然我认为这只是用于 lambdas,上面没有)。我需要做什么才能让它正确运行?

最佳答案

如果想直接实例化一个FromElementsFunction ,那么你必须手动提供一个 TypeInformation ChangeMe 的实例调用时的类 addSource .这是 Flink 了解元素类型所必需的。

以下代码片段应该可以解决问题:

SourceFunction<ChangeMe> source = createSourceFunction();

TypeInformation<ChangeMe> typeInfo = TypeInformation.of(ChangeMe.class);
DataStream<ChangeMe> sourceDataStream = env.addSource(source, typeInfo);

DataStream<ChangeMe> changeMeEventsStream = sourceDataStream;
changeMeEventsStream.print();

关于java - 类型删除和 Flink : what causes run time error?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44724251/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com