gpt4 book ai didi

java - 使用Kafka消费者池是否正确?

转载 作者:行者123 更新时间:2023-12-02 01:37:45 28 4
gpt4 key购买 nike

有时,我需要从特定分区读取同一主题的特定偏移量的记录。

我每次都可以创建新的 kafka 消费者。但是,我可以创建消费者池并以这种方式使用它:

List<KafkaConsumer> consumers = new ArrayList<>();

// acquire consumer
KafkaConsumer consumer = consumers.get(0);
TopicPartition topicPartition = new TopicPartition("my-topic", 42);
consumer.assign(List.of(topicPartition));
consumer.seek(topicPartition, 13);

ConsumerRecords records = consumer.poll(0);
// process records
// .....

// release consumer
consumer.unsubscribe();

我应该建立消费者池吗?或者它没有效果,我应该为每次使用创建新的消费者。

最佳答案

您只需要一个消费者。只需取消订阅并将其重新分配给另一个 TopicPartition

String topic = "my-topic";
int partition = 42;
int offset = 13;
boolean running = true;

while(running) {
TopicPartition topicPartition = new TopicPartition(topic, partition);
consumer.assign(List.of(topicPartition));
consumer.seek(topicPartition, offset);

ConsumerRecords records = consumer.poll(0);
// process records
// .....

// release consumer
consumer.unsubscribe();
// Change topic, partition, offset as needed
}

关于java - 使用Kafka消费者池是否正确?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54981105/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com