gpt4 book ai didi

java - 为什么多 Kafka Consumer 对象让它总是重新平衡,并且不起作用

转载 作者:行者123 更新时间:2023-11-30 06:06:22 25 4
gpt4 key购买 nike

以前我做出了错误的决定,即在一个线程(同一个group id)中操作多个消费者,但当时没有对应的使用场景,所以没有揭示这个问题。

现在,我意识到当我有多个消费者时,我的程序会出错,查询相应的信息,加上我自己的测试,得到了这三种场景。

  1. 同一个线程内,多个具有相同group id的consumer无法正常工作。
  2. 在一个线程内,多个具有不同组id的消费者可以正常工作。
  3. 单CPU运行2的程序,运行正常。

我想我知道怎么改,但是不知道为什么,kafka消费者在线程中存储什么样的东西会导致这种情况。

我的代码是这样的

Properties properties = new Properties();
properties.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
properties.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
properties.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "30000");
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "shoothzj.group");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
return properties;



private Map<String, KafkaConsumer<String, T>> consumerMap = new HashMap<>();


for (KafkaConsumer<String, T> consumer :consumerMap.values()) {
consumer.poll(1000);
//handle it
}

最佳答案

同一线程中不能有多个具有相同组 ID 的消费者。以下是Kafka:权威指南

一书的摘录

You can’t have multiple consumers that belong to the same group in one thread and you can’t have multiple threads safely use the same consumer. One consumer per thread is the rule. To run multiple consumers in the same group in one application, you will need to run each in its own thread. It is useful to wrap the consumer logic in its own object and then use Java’s ExecutorService to start multiple threads each with its own consumer. The Confluent blog has a tutorial that shows how to do just that.

我们在一项设计中遇到了类似的问题,并通过惨痛的教训吸取了教训。如果我没记错的话根本原因是这样的。假设同一个线程中有两个消费者。如果第一次轮询后的处理需要很长时间,则第二个消费者可能无法将心跳发送到组协调器,因为心跳的发送发生在轮询中( ) 方法,这会触发重新平衡,这会导致类似死锁的情况。因此,您的两个消费者都会陷入困境,等待重新平衡完成。

关于java - 为什么多 Kafka Consumer 对象让它总是重新平衡,并且不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51231381/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com