gpt4 book ai didi

exception - 如何跳过在 Kafka 中产生运行时异常的记录并保持流运行?

转载 作者:行者123 更新时间:2023-12-05 04:36:14 25 4
gpt4 key购买 nike

我已经实现了kafka流应用。假设流当前正在处理的对象字段之一包含数字而不是字符串值。当前,当处理逻辑中抛出异常时,例如。 .transform() 方法,整个流被终止,我的应用程序停止处理数据。

我想跳过这样的无效记录并继续处理输入主题上可用的下一条记录。 此外,我不想在我的流处理代码中实现任何 try-catch 语句。

为了实现这一点,我实现了 StreamsUncaughtExceptionHandler,因此它返回 StreamThreadExceptionResponse.REPLACE_THREAD 枚举,以便生成新线程并继续处理等待输入主题的下一条记录。然而,事实证明流消费者偏移量没有提交,当新线程启动时,它会获取刚刚杀死前一个流线程的旧记录......由于逻辑相同,新线程也将无法处理错误记录并再次失败。某种产生新线程并每次都在同一条记录上失败的循环。

是否有任何干净的方法来跳过失败的记录并保持流处理下一个记录?

请注意,我不是在询问 DeserializationExceptionHandlerProductionExceptionHandler

最佳答案

当谈到应用程序级代码时,主要取决于应用程序如何处理异常。这个用例以前出现过。请参阅这些以前的 Stack Overflow 线程。

Example on handling processing exception in Spring Cloud Streams with Kafka Streams Binder and the functional style processor

How to stop sending to kafka topic when control goes to catch block Functional kafka spring

尝试看看这些答案是否适用于您的场景。

关于exception - 如何跳过在 Kafka 中产生运行时异常的记录并保持流运行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70883014/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com