gpt4 book ai didi

java - Kafka CommitFailedException 消费者异常

转载 作者:塔克拉玛干 更新时间:2023-11-03 03:41:35 26 4
gpt4 key购买 nike

创建多个消费者(使用 Kafka 0.9 java API)并启动每个线程后,出现以下异常

Consumer has failed with exception: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
class com.messagehub.consumer.Consumer is shutting down.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:546)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:487)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:681)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:654)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:350)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:288)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:157)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:352)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:936)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:905)

然后开始正常消费消息,我想知道是什么导致了这个异常以便修复它。

最佳答案

也尝试调整以下参数:

  • heartbeat.interval.ms - 这告诉 Kafka 在它认为消费者将被视为“死亡”之前等待指定的毫秒数
  • ma​​x.partition.fetch.bytes - 这将限制消费者在轮询时收到的消息量(最多)。

我注意到,如果消费者在心跳超时之前没有提交给 Kafka,就会发生重新平衡。如果提交发生在消息被处理之后,处理它们的时间量将决定这些参数。因此,减少消息数量和增加心跳时间将有助于避免重新平衡。

还要考虑使用更多分区,这样会有更多线程处理您的数据,即使每次轮询的消息更少。

我编写了这个小应用程序来进行测试。希望对您有所帮助。

https://github.com/ajkret/kafka-sample

更新

Kafka 0.10.x 现在提供了一个新参数来控制接收消息的数量: - ma​​x.poll.records - 单次调用 poll() 时返回的最大记录数。

更新

Kafka 提供了一种暂停队列的方法。当队列暂停时,您可以在单独的线程中处理消息,允许您调用 KafkaConsumer.poll() 发送心跳。然后在处理完成后调用KafkaConsumer.resume()。通过这种方式,您可以缓解由于不发送心跳而导致重新平衡的问题。以下是您可以执行的操作的概述:

while(true) {
ConsumerRecords records = consumer.poll(Integer.MAX_VALUE);
consumer.commitSync();

consumer.pause();
for(ConsumerRecord record: records) {

Future<Boolean> future = workers.submit(() -> {
// Process
return true;
});


while (true) {
try {
if (future.get(1, TimeUnit.SECONDS) != null) {
break;
}
} catch (java.util.concurrent.TimeoutException e) {
getConsumer().poll(0);
}
}
}

consumer.resume();
}

关于java - Kafka CommitFailedException 消费者异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35658171/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com