gpt4 book ai didi

apache-kafka - kafka 异步提交请求失败

转载 作者:行者123 更新时间:2023-12-05 07:25:04 29 4
gpt4 key购买 nike

我观察到,kafka 消费者延迟在运行数小时/数天后突然开始增加。

检查日志后,我看到了很多异常:

org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.

我的 ConsumerThread 类:

public class ConsumerThread implements Runnable {
private final KafkaConsumer<String, Map<String, Object>> consumer;
public ConsumerThread(
this.consumer = new KafkaConsumer<>(getConsumerConfig(kafkaConfiguration));
}

@Override
public void run() {
try {
consumer.subscribe(topicList);

while (true) {
ConsumerRecords<String, Map<String, Object>> records =
consumer.poll(Duration.ofMillis(kafkaConfiguration.getPollIntervalMs()));

long startPerPoll = System.nanoTime();
for (final ConsumerRecord<String, Map<String, Object>> record : records) {
// message processing logic here
}


consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
//log.error(exception.getMessage());
log.info("exception while committing offset, consumerThread: {}, exception: {}", Thread.currentThread().getName(), exception);
exception.printStackTrace();
}
});


}
} catch (Exception e) {
// ignore for shutdown
log.info("exception in run for consumerThread: {}", e);
} finally {
try {
if (Objects.nonNull(consumer)) {
consumer.commitSync();
}
} finally {
if (Objects.nonNull(consumer)) {
consumer.close();
}
}
}
}

我的kafka配置:

groupId: cep-cg
autoCommitEnabled: false
sessionTimeoutMs: 30000
heartBeatIntervalMs: 10000
autoOffsetReset: latest
maxPollRecord: 250
maxPollIntervalMs: 180000
requestTimeoutMs: 240000
pollIntervalMs: 3000

我检查了 stackoverflow 上的其他答案并做了一些调整,但它们似乎都不起作用。

我想知道的是:

  1. 关于滞后为何突然增加的任何线索?

  2. 是否有可能有很多 commitAsync 请求在代理上挂起并且可能在某个时间(由代理上的某些配置定义)commitAsync 请求开始失败?

  3. 假设 consumerThread 处理消息花费的时间超过 max.poll.interval.ms。在这种情况下,它将被踢出组并触发重新平衡。现在所有在代理上挂起的 commitAsync 请求都因 CommitFailedException 而失败,因为该分区现在属于组中的其他某个消费者。在上面的代码中,消费者会跳出无限循环,永远关闭。这是正确的方法吗?或者我应该捕获 CommitFailedException 并再次恢复循环以保持消费者存活?

最佳答案

想象一下,我们发送了一个提交偏移量 2000 的请求。存在暂时的通信问题,所以代理永远不会收到请求,因此永远不会响应。同时,我们处理了另一批并成功提交了偏移量 3000。现在重试先前批处理失败的提交,并且在异常中,它显示相同的消息。在重新平衡的情况下,这将导致更多重复

A.延迟时间在增加

由于消费者不再不断地消费记录,而生产者不断地生产记录,因此重新平衡发生得更加频繁。

B. commitAsync 请求超时

只允许组中的活跃成员提交偏移量。如果消费者在尝试提交偏移量时被踢出组,它将抛出 CommitFailedException

c.再平衡

当重新平衡开始时,消费者必须在 session 超时到期之前完成它当前正在执行的任何处理、提交偏移量并重新加入组。

我们应该实现 consumerRebalanceListener 并使用 onPartitionsRevoked() 在失去分区所有权之前提交偏移量以提交当前偏移量。

ma​​x.poll.interval.msma​​x.poll.records 设置为相当低的值,同时保持 session.timeout.ms 较低,以便故障检测时间不会不需要牺牲。

从 commitSync() 抛出的 CommitFailedException。这保证了只允许该组的活跃成员提交偏移量。如果消费者被踢出组,那么它的分区将被分配给另一个成员,该成员将提交自己的偏移量。

关于apache-kafka - kafka 异步提交请求失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55064077/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com