gpt4 book ai didi

java - Kafka 消费者分配返回空集

转载 作者:行者123 更新时间:2023-12-03 21:20:40 27 4
gpt4 key购买 nike

我已经订阅了一个 Kafka 主题,如下所示。只有在为消费者分配了分区后,我才需要运行一些逻辑。

然而consumer.assignment()无论我等多久,都会以空集的形式返回。如果我没有 while 循环,然后执行 consumer.poll()我确实从该主题中获得了记录。有人可以告诉我为什么会这样吗?

    consumer.subscribe(topics);
Set<TopicPartition> assigned=Collections.emptySet();
while(isAssigned) {
assigned = consumer.assignment();
if(!assigned.isEmpty()) {
isAssigned= false;
}
}

//consumer props
Properties props = new Properties();
props.put("bootstrap.servers", "xxx:9092,yyy:9092");
props.put("group.id", groupId);
props.put("enable.auto.commit", "false");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://xxx:8081");
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("max.poll.records", "100");

最佳答案

直到您调用 poll() ,消费者只是在闲置。

仅在 poll() 之后被调用时,它将启动与集群的连接,获取分配的分区并尝试获取消息。

这在 Javadoc 中有所提及:

After subscribing to a set of topics, the consumer will automatically join the group when poll(Duration) is invoked.



一旦您开始调用 poll() , 你可以使用 assignment()甚至注册一个 ConsumerRebalanceListener 在您的消费者实例分配或撤销分区时收到通知。

关于java - Kafka 消费者分配返回空集,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54316951/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com