gpt4 book ai didi

error-handling - 如何在Spring Kafka异常处理程序中获取原始记录

转载 作者:行者123 更新时间:2023-12-03 08:38:59 28 4
gpt4 key购买 nike

我正在为spring kafka项目实现异常处理,我有自己的DeadLetterPublishingRecoverer,它可以处理Kafka侦听器中发生的异常,整个流程是完美的,即当我在逻辑中抛出任何异常时,我可以记录并发送给DLQ根据框架实现的主题。消费者记录出现问题,如果我更改了消费者记录值中的任何内容,则相同的消费者记录将被发布到DLQ主题,而更改实际上在我的senairo中是错误的,我想记录原始消息。
...

       @KafkaListener(topics = "${message.topic.name}",containerFactory = "kafkaListenerContainerFactory")  
public void listenGroupFoo(ConsumerRecord<String, MyEvent> consumerRecord) throws InterruptedException, ExecutionException {

System.out.println("Received Message" + consumerRecord.value());

MyEvent consumedEvent=consumerRecord.value();
consumedEvent.setQuantity(1000);

throw new InvalidProductCodeException();

}
...
我发送给主题的实际消息仅包含10个数量,并且我正在更改一些内容,例如将数量更改为1000并引发一些异常(例如处理过程中发生的任何异常情况)
...
  Received MessageOrderObject CreatedDate=Mon Sep 14 19:38:15 IST 2020, productCode=TES, price=10, quantity=10, customerId=0002
...
我抛出错误后,我的记录将是
...
Received MessageOrderObject [createdDate=Tue Sep 15 13:20:16 IST 2020, productCode=TES, price=10, quantity=10, customerId=0002]
Error Record OrderObject [createdDate=Tue Sep 15 13:20:16 IST 2020, productCode=TES, price=10, quantity=1000, customerId=0002]
这是我的DLQ子类
...
 @Component
public class DeadLetterSubclass extends DeadLetterPublishingRecoverer {

@Value(value="${error.topic.name}")
static
String errorTopicName;


BusinessUtils utils=new BusinessUtils();
private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
MY_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition("stock-order-error-topic", cr.partition());

public DeadLetterSubclass(KafkaOperations<? extends Object, ? extends Object> template) {
super(template,MY_DESTINATION_RESOLVER);
this.setHeadersFunction((consumerRecord,exception)->{
System.out.println("Error Record "+consumerRecord.value());
return consumerRecord.headers();
});

}
...
我想记录发生异常时要发布的原始事件对象(ConsumerRecord)。在我的情况下,我的实际订购数量是10,但是订购的数量是1000,这不是实际订单。

最佳答案

由于您对原始ConsumerRecord有严格的引用,因此,如果您需要在错误处理程序中保持不变,则不应对其进行突变。
该框架无法预期您可能会改变侦听器中的值。如果“以防万一”,那么制作一份副本将会有太多的开销。
您不应该更改原始值-克隆它,然后更改克隆。

关于error-handling - 如何在Spring Kafka异常处理程序中获取原始记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63897535/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com