gpt4 book ai didi

java - Flink DataStream如何将自定义的POJO合并到另一个DataStream中

转载 作者:行者123 更新时间:2023-12-01 18:04:53 25 4
gpt4 key购买 nike

我想将 DataStream 转换为带有架构信息的 DataStream

输入

args[0] 数据流

{"fields":["China","Beijing"]}

args[1] 架构

message spark_schema {
optional binary country (UTF8);
optional binary city (UTF8);
}

预期输出

{"country":"china", "city":"beijing"}

我的代码是这样的

public DataStream<String> convert(DataStream source, MessageType messageType) {

SingleOutputStreamOperator<String> dataWithSchema = source.map((MapFunction<Row, String>) row -> {
JSONObject data = new JSONObject();
this.fields = messageType.getFields().stream().map(Type::getName).collect(Collectors.toList());
for (int i = 0; i < fields.size(); i++) {
data.put(fields.get(i), row.getField(i));
}
return data.toJSONString();
});
return dataWithSchema;
}

异常错误

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object com.xxxx.ParquetDataSourceReader$$Lambda$64/1174881426@d78795 is not serializable
at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:180)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1823)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:188)
at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:590)

但是下面的代码工作正常

public DataStream<String> convert(DataStream source, MessageType messageType) {
if (this.fields == null) {
throw new RuntimeException("The schema of AbstractRowStreamReader is null");
}

List<String> field = messageType.getFields().stream().map(Type::getName).collect(Collectors.toList());
SingleOutputStreamOperator<String> dataWithSchema = source.map((MapFunction<Row, String>) row -> {
JSONObject data = new JSONObject();
for (int i = 0; i < field.size(); i++) {
data.put(field.get(i), row.getField(i));
}
return data.toJSONString();
});
return dataWithSchema;
}

Flink的map算子如何组合外部复杂的POJO?

最佳答案

为了让 Flink 跨任务分发代码,代码需要完整 Serializable 。在你的第一个例子中,它不是;第二个是。特别是Type::getName将生成一个不是 Serializable 的 lambda .

获得一个 lambda Serializable ,您需要将其显式转换为可序列化接口(interface)(例如 Flink MapFunction )或将其与 (Serializable & Function) 一起使用

由于第二个也可以节省计算量,因此无论如何它都会更好。 Convert 在作业编译期间只会执行一次,而 DataStream#map为每条记录调用。如果不清楚,我建议在 IDE 中执行它并使用断点。

关于java - Flink DataStream如何将自定义的POJO合并到另一个DataStream中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60573782/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com