gpt4 book ai didi

java - 如何跳过 KafkaStreams API 中的 Avro 序列化异常?

转载 作者:行者123 更新时间:2023-12-01 19:30:23 25 4
gpt4 key购买 nike

我有一个由 KafkaStreams Java api 编写的 Kafka 应用程序。它从 Mysql binlog 读取数据并执行一些与我的问题无关的操作。问题是某一特定行在 avro 反序列化过程中产生错误。我可以深入研究 Avro 架构文件并找到问题,但总的来说,我需要的是一个宽容的异常处理程序,在遇到此类错误时不会使整个应用程序停止。这是我的流应用程序的主要部分:

StreamsBuilder streamsBuilder = watchForCourierUpdate(builder);

KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
kafkaStreams.start();
Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
}

private static StreamsBuilder watchForCourierUpdate(StreamsBuilder builder){
CourierUpdateListener courierUpdateListener = new CourierUpdateListener(builder);
courierUpdateListener.start();
return builder;
}

private static Properties configProperties(){

Properties streamProperties = new Properties();

streamProperties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, Configs.getConfig("schemaRegistryUrl"));
streamProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "courier_app");
streamProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Configs.getConfig("bootstrapServerUrl"));
streamProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
streamProperties.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/state_dir");
streamProperties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "3");
streamProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
streamProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
streamProperties.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
streamProperties.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
CourierSerializationException.class);

return streamProperties;

}

这是我的 CourierSerializationException 类:

public class CourierSerializationException implements ProductionExceptionHandler {
@Override
public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> producerRecord, Exception e) {
Logger.logError("Failed to de/serialize entity from " + producerRecord.topic() + " topic.\n" + e);
return ProductionExceptionHandlerResponse.CONTINUE;
}

@Override
public void configure(Map<String, ?> map) {

}
}

不过,每当 avro 反序列化异常发生时,流就会关闭并且应用程序不会继续。我是不是错过了什么!

最佳答案

您是否尝试过使用kafka提供的default.deserialization.exception.handler来执行此操作?您可以使用 LogAndContinueExceptionHandler 它将记录并继续。

我可能是错的,但我认为通过实现 ProductionExceptionHandler 创建 CustomException 仅适用于 kafka 端的网络相关错误。

将其添加到属性中,看看会发生什么:

> props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);

关于java - 如何跳过 KafkaStreams API 中的 Avro 序列化异常?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59907421/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com