gpt4 book ai didi

java - 如何消费多个主题的消息?

转载 作者:行者123 更新时间:2023-12-02 09:46:09 24 4
gpt4 key购买 nike

我正在尝试使用 assign() 方法使用来自多个主题的消息。通过我的实现,有时我能够使用来自所有主题的消息,有时我只能使用一个主题的消息。经过一番研究,我发现 Kafka 默认使用 Range 分配器。因此它不会总是分配所有分区。

对于我的用例,我应该能够从所有主题和分区进行消费。

我尝试过设置 RoundRobin 分配器。但这并没有帮助

List<TopicPartition> topicPartitions = new ArrayList<>();
KafkaConsumer kafkaConsumer = new KafkaConsumer<>(consumerConfig);
for (String topic : topics) {
topicPartitions.add(new TopicPartition(topic, 0);
}
kafkaConsumer.assign(topicPartitions);
ConsumerRecords<String, String> records = kafkaConsumer.poll(600);`

最佳答案

KafkaConsumer.assign 通常用于复杂的用例,在这些用例中,您不仅想要控制主题,还想要控制您使用的分区。如果您只想从多个主题(及其所有分区)进行消费,您应该使用 KafkaConsumer.subscribe。

consumer.subscribe(Arrays.asList("topic1", "topic2"));

查看 javadoc javadoc其中还显示了代码示例。

编辑:如果您需要控制分区分配,那么您确实需要使用 allocate() 方法,但在您的(不完整)代码示例中,看起来您分配了每个主题的分区 0 ;因此您将只使用来自分区 0 的消息。

如果您需要手动控制偏移量,您仍然可以使用订阅,但您可以禁用自动提交并使用seek()和commitSync()或commitAsync()来控制偏移量。

关于java - 如何消费多个主题的消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56634416/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com