gpt4 book ai didi

java - 使用Flink时Kafka中json数据不清楚如何反序列化

转载 作者:行者123 更新时间:2023-12-02 10:57:00 26 4
gpt4 key购买 nike

我想通过Flink计算Kafka中的数据,但问题是Kafka中的JASON数据可能会发生变化。

像这样:

{"data":{"template":25,"name":"name"}}

或者这个:

{"data"{"type":"type1","props":"props","strem":"stream1"}

而且我无法提前知道这个JSON中包含了多少数据。所以在使用Flink时存在一个问题:

streamExecutionEnvironment.addSource(new FlinkKafkaConsumer010<KafkaEvent>("flink", new KafkaEventSchema(),
kafkaProps))
.flatMap(new KafkaEventToRow()).returns(getReturnType());

那么当Json数据是这样的时候,如何定义pojo类型和mapFuncation呢?

最佳答案

您必须定义一个更通用的反序列化架构,例如 Map

定义架构

class CustomSchema implements DeserializationSchema {

private ObjectMapper mapper = new ObjectMapper();

@Override
public Map<String,Object> deserialize(byte[] bytes) throws IOException {
Map<String,Object> t = null;
t = mapper.readValue(bytes, Map.class);
return t;
}

@Override
public boolean isEndOfStream(Object o) {
return false;
}

@Override
public TypeInformation<Map> getProducedType() {
return TypeInformation.of(new TypeHint<Map>() {
});
}
}

现在使用它作为架构

streamExecutionEnvironment
.addSource(new FlinkKafkaConsumer010<KafkaEvent>("flink", new CustomSchema(),......

现在你得到一个通用的 Map,它可以包含任何数据结构

关于java - 使用Flink时Kafka中json数据不清楚如何反序列化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51648705/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com