gpt4 book ai didi

apache-flink - Flink 中是否弃用了 JSONDeserializationSchema()?

转载 作者:行者123 更新时间:2023-12-04 12:38:19 25 4
gpt4 key购买 nike

我是 Flink 的新手,正在做与以下链接非常相​​似的事情。

Cannot see message while sinking kafka stream and cannot see print message in flink 1.2

我还尝试添加 JSONDeserializationSchema() 作为我的 Kafka 输入 JSON 消息的反序列化器,该消息没有 key 。

但我发现 JSONDeserializationSchema() 不存在。

如果我做错了什么,请告诉我。

enter image description here

最佳答案

JSONDeserializationSchema在 Flink 1.8 中被删除,之前被弃用。

推荐的方法是编写一个实现 DeserializationSchema<T> 的解串器。 .这是我从 Flink Operations Playground 复制的一个例子:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;

/**
* A Kafka {@link DeserializationSchema} to deserialize {@link ClickEvent}s from JSON.
*
*/
public class ClickEventDeserializationSchema implements DeserializationSchema<ClickEvent> {

private static final long serialVersionUID = 1L;

private static final ObjectMapper objectMapper = new ObjectMapper();

@Override
public ClickEvent deserialize(byte[] message) throws IOException {
return objectMapper.readValue(message, ClickEvent.class);
}

@Override
public boolean isEndOfStream(ClickEvent nextElement) {
return false;
}

@Override
public TypeInformation<ClickEvent> getProducedType() {
return TypeInformation.of(ClickEvent.class);
}
}

对于 Kafka 生产者,您需要实现 KafkaSerializationSchema<T> ,你会在同一个项目中找到这样的例子。

关于apache-flink - Flink 中是否弃用了 JSONDeserializationSchema()?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62067772/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com