gpt4 book ai didi

java - 反序列化异常后继续在reactor kafka中消费后续记录

转载 作者:行者123 更新时间:2023-12-04 04:01:40 24 4
gpt4 key购买 nike

我正在使用 reactor kafka 并有一个自定义的 AvroDeserializer 类来反序列化消息。

现在我有一个案例,对于某些有效负载,反序列化类抛出异常。我的 Kafka 监听器在尝试读取此类记录时立即死亡。我尝试使用 onErrorReturn 并使用(doOnErroronErrorContinue)的组合来处理此异常,但是,它有助于记录异常,但无法使用后续记录。

public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {
public T deserialize( String topic, byte[] data ) {
try {
//logic to deserialize
}
catch {
// throw new SerializationException("error deserializing" );
}
}
}

在听众端,我试图这样处理 ->

@EventListener(ApplicationStartedEvent.class)

public class listener() {
KakfaReceiver<String, Object> receiver; // kafka listener
receiver.receive()
.delayUntil(do something)//logic to update the record to db
.doOnError(handle error)//trying to handle the exceptions including desrialization exception - logging them to a system
.onErrorContinue((throwable, o) -> log.info("continuing"))
.doOnNext(r->r.receiverOffset().acknowledge())
.subscribe()

}

一种选择是不从 Deserializer 类中抛出异常,但我想在单独的系统中记录此类异常以供分析,因此希望在 kafka 监听器端处理此类记录。关于如何实现这一点有什么想法吗?

最佳答案

看起来像评论中关于使用的建议

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler=logAndContinue

在大多数情况下都有效。但是我想不出让它在 Reactor Spring Kafka 中工作的方法。现在,我继续采用不从反序列化器抛出异常并添加逻辑将其记录到那里的方法,这解决了 Kafka 消费者无法在毒记录上消费后续记录的问题

关于java - 反序列化异常后继续在reactor kafka中消费后续记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62961279/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com