gpt4 book ai didi

java - 卡夫卡流-TimeoutException : Expiring * record(s) for TOPIC:* ms has passed since batch creation

转载 作者:行者123 更新时间:2023-12-01 19:14:38 27 4
gpt4 key购买 nike

流应用程序在生产中推出,10 天后,在 CustomProductionExceptionHandler 中观察到属于旧日窗口的过期事务的错误/警告。

流程:

输入主题 --> 流媒体应用程序(生成统计数据并在日窗口关闭后发出)--> 输出主题

生产者不断尝试将记录发布到已在旧窗口中过期的 OUTPUT 主题,并将错误记录到 CustomProductionExceptionHandler 中。

我减少了批量大小并保留默认值,但此更改尚未推广到生产。

CustomProductionExceptionHandler实现:避免流由于NeworkException、TimeOutException而死掉。

使用此实现,生产者不会重试,如果出现任何异常,它会继续..在返回失败时的另一侧..流线程死亡并且不会自动重新启动..需要建议..

public class CustomProductionExceptionHandler implements ProductionExceptionHandler {

@Override
public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
String recordKey = new String(record.key());
String recordVal = new String(record.value());
String recordTopic = record.topic();
logger.error("Kafka message marked as processed although it failed. Message: [{}:{}], destination topic: [{}]", recordKey,recordVal,recordTopic,exception);
return ProductionExceptionHandlerResponse.CONTINUE;
}
}

异常(exception):

2019-12-20 16:31:37.576 ERROR com.jpmc.gpg.exception.CustomProductionExceptionHandler.handle(CustomProductionExceptionHandler.java:19) kafka-producer-network-thread | profile-day-summary-generator-291e69b1-5a3d-4d49-8797-252c2ae05607-StreamThread-19-producerid - Kafka message marked as processed although it failed. Message: [{"statistics":{}], destination topic: [OUTPUT-TOPIC]

org.apache.kafka.common.errors.TimeoutException: Expiring * record(s) for TOPIC:1086149 ms has passed since batch creation

尝试获得以下问题的答案。

1) 为什么生产者试图将较旧的交易发布到已关闭的日期窗口的 OUTPUT 主题?

示例 - 生产者尝试发送 12/09 天窗口交易,但当前打开的窗口是 12/20

2) 如果没有 CustomProductionExceptionHandler,流线程可能会被终止 --> ProductionExceptionHandlerResponse.CONTINUE。 我们是否有任何方法可以让 Producer 在出现 NetworkException 或 TimeoutException 的情况下进行重试? 然后继续而不是流线程死掉? 在中指定 ProductionExceptionHandlerResponse.CONTINUE 的问题 CustomProductionExceptionHandler 是 - 如果出现任何异常,它会跳过 该记录发布到输出主题并继续下一个记录。 没有弹性。

最佳答案

1) 如果不知道你的程序是做什么的,就不可能回答这个问题。请注意,一般来说,Kafka Streams 在事件时间上工作并处理无序数据。

2) 您可以通过在传入的 Properties 中指定相应的客户端配置来配置 Kafka Streams 应用程序的所有内部使用的客户端(即消费者、生产者、管理客户端和恢复消费者) KafkaStreams。如果您希望为不同的客户端使用不同的配置,则可以为它们添加相应的前缀,即 Producer.retries 而不是 retries 。查看文档了解更多详细信息:https://docs.confluent.io/current/streams/developer-guide/config-streams.html#ak-consumers-producer-and-admin-client-configuration-parameters

关于java - 卡夫卡流-TimeoutException : Expiring * record(s) for TOPIC:* ms has passed since batch creation,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59432031/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com