gpt4 book ai didi

spring-kafka - 重新排队kafka主题中的失败记录

转载 作者:行者123 更新时间:2023-12-05 02:48:10 24 4
gpt4 key购买 nike

我有一个用例,记录要保存在表中,该表本身有外键。

例子:

z对象{uid,姓名, parent 身份

parent uid 也存在于同一个表中,任何具有不存在的 parentuid 的对象将无法持久化。

有时记录放在主题中的方式使得依赖关系不在列表的头部,而是在依赖记录存在之后

这将导致处理记录失败。我使用了 seektocurrenterrorhandler,它实际上会为给定的退避重试相同的失败记录,但由于不满足依赖关系而失败。

有什么方法可以让我在主题末尾重新排队记录以满足依赖性?如果即使在入队后仍然失败了 5 天,记录可以被推送到 DLT。

谢谢,拉贾塞卡

最佳答案

没有内置任何东西;但是,您可以使用 DeadLetterPublishingRecoverer 中的自定义目标解析器,根据失败记录中的 header 来确定要发布到哪个主题。

参见 https://docs.spring.io/spring-kafka/docs/2.6.2/reference/html/#dead-letters

编辑

@SpringBootApplication
public class So64646996Application {

public static void main(String[] args) {
SpringApplication.run(So64646996Application.class, args);
}

@Bean
public NewTopic topic() {
return TopicBuilder.name("so64646996").partitions(1).replicas(1).build();
}

@Bean
public NewTopic dlt() {
return TopicBuilder.name("so64646996.DLT").partitions(1).replicas(1).build();
}

@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template,
(rec, ex) -> {
org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
if (retries == null) {
retries = new RecordHeader("retries", new byte[] { 1 });
rec.headers().add(retries);
}
else {
retries.value()[0]++;
}
return retries.value()[0] > 5
? new TopicPartition("so64646996.DLT", rec.partition())
: new TopicPartition("so64646996", rec.partition());
}), new FixedBackOff(0L, 0L));
}


@KafkaListener(id = "so64646996", topics = "so64646996")
public void listen(String in,
@Header(KafkaHeaders.OFFSET) long offset,
@Header(name = "retries", required = false) byte[] retry) {

System.out.println(in + "@" + offset + ":" + retry[0]);
throw new IllegalStateException();
}

@KafkaListener(id = "so64646996.DLT", topics = "so64646996.DLT")
public void listenDLT(String in,
@Header(KafkaHeaders.OFFSET) long offset,
@Header(name = "retries", required = false) byte[] retry) {

System.out.println("DLT: " + in + "@" + offset + ":" + retry[0]);
}


@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> System.out.println(template.send("so64646996", "foo").get(10, TimeUnit.SECONDS)
.getRecordMetadata());
}

}

关于spring-kafka - 重新排队kafka主题中的失败记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64646996/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com