gpt4 book ai didi

apache-flink - 在 Flink 中解析 JSON 时如何处理异常

转载 作者:行者123 更新时间:2023-12-05 00:48:24 54 4
gpt4 key购买 nike

我正在使用 flink 1.4.2 从 Kafka 读取数据,并使用 JSONDeserializationSchema 将它们解析为 ObjectNode。如果传入记录不是有效的 JSON,那么我的 Flink 作业将失败。我想跳过破记录而不是失败。

FlinkKafkaConsumer010<ObjectNode> kafkaConsumer =
new FlinkKafkaConsumer010<>(TOPIC, new JSONDeserializationSchema(), consumerProperties);
DataStream<ObjectNode> messageStream = env.addSource(kafkaConsumer);
messageStream.print();

如果 Kafka 中的数据不是有效的 JSON,我会收到以下异常。

Job execution switched to status FAILING.
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'This': was expecting ('true', 'false' or 'null')
at [Source: [B@4f522623; line: 1, column: 6]
Job execution switched to status FAILED.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.

最佳答案

最简单的解决方案是实现自己的 DeserializationSchema 并包装 JSONDeserializationSchema。然后,您可以捕获异常并忽略它或执行自定义操作。

关于apache-flink - 在 Flink 中解析 JSON 时如何处理异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51301549/

54 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com