gpt4 book ai didi

java - KafkaConsumer 0.10 Java API 错误信息 : No current assignment for partition

转载 作者:太空狗 更新时间:2023-10-29 22:43:09 26 4
gpt4 key购买 nike

我正在使用 KafkaConsumer 0.10 Java api。我想从特定分区和特定偏移量中使用。我查了一下,发现有一个seek方法,但是它抛出了一个异常。有人有类似的用例或解决方案吗?

代码:

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.seek(new TopicPartition("mytopic", 1), 4);

异常

java.lang.IllegalStateException: No current assignment for partition mytopic-1
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
at xx.xxx.xxx.Test.main(Test.java:182)

最佳答案

在您可以seek()之前,您首先需要subscribe()一个主题 assign() 将主题划分给消费者。还要记住,subscribe()assign() 是惰性的——因此,您还需要对 poll() 进行“虚拟调用” ,然后才能使用 seek()

Note: as of Kafka 2.0, the new poll(Duration timeout) is async and it's not guaranteed that you have a complete assignment when poll returns. Thus, you might need to check your assignment before using seek() and also poll again to refresh the assignment. (Cf. KIP-266 for details)

如果你使用subscribe(),你使用组管理:因此,你可以使用相同的group.id启动多个消费者,并且主题的所有分区都将被自动平均分配给组内的所有消费者(每个分区将分配给组中的单个消费者)。

如果要读取特定分区,需要通过assign()手动赋值。这使您可以执行任何您想要的任务。

顺便说一句:KafkaConsumer 有一个非常长的详细类 JavaDoc,包括示例。值得一读。

关于java - KafkaConsumer 0.10 Java API 错误信息 : No current assignment for partition,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41008610/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com