gpt4 book ai didi

java - Apache 弗林克 : How do I use a stream of Java Map (or Map containing DTOs)?

转载 作者:行者123 更新时间:2023-12-01 23:58:08 29 4
gpt4 key购买 nike

我正在使用 Flink,并且有一个 JSON 字符串流到达我的系统,其中包含动态更改的字段和嵌套字段。所以我不能模拟这个传入的 JSON 并将其转换为静态 POJO,我必须依赖于 Map。

我的第一个转换是使用 GSON 解析将 JSON 字符串流转换为 Map 对象流,然后将 map 包装在名为 Data 的 DTO 中。

(inside the first map transformation)
LinkedTreeMap map = gson.fromJson(input, LinkedTreeMap.class);

Data data = new Data(map); // Data has getters, setters for the map and implements Serializable

在此转换处理之后,我尝试将生成的流输入到我的自定义 Flink 接收器中时,就会出现问题。调用函数不会在接收器中被调用。但是,如果我将此包含 DTO 的 Map 更改为不带 Map 的原始或常规 DTO,则接收器可以工作。

我的 DTO 如下所示:

public class FakeDTO {
private String id;
private LinkedTreeMap map; // com.google.gson.internal

// getters and setters
// constructors, empty and with fields

我尝试了以下两种解决方案:

env.getConfig().addDefaultKryoSerializer(LinkedTreeMap.class,MapSerializer.class; 
env.getConfig().disableGenericTypes();

在这种情况下我可以使用任何专家建议吗?

最佳答案

我能够解决这个问题。在我的 Flink 日志中,我看到没有找到一个名为 ReflectionSerializerFactory 类的 Kryo 文件。我在 maven 中更新了 Kryo 版本,并为我的 map 使用了 Flink 文档说 Flink 支持的 Map 类型。

只需确保在代码中指定泛型类型,并在 map 的 POJO 中添加 getter 和 setter。

我还使用 .returns(xyz.class) 类型减速来避免类型删除的影响。

关于java - Apache 弗林克 : How do I use a stream of Java Map (or Map containing DTOs)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58194096/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com