gpt4 book ai didi

java - 为什么调用Kafka的seekToBeginning和seekToEnd API永远挂起?

转载 作者:行者123 更新时间:2023-11-30 02:40:09 26 4
gpt4 key购买 nike

我的环境:

  • Kafka 版本:kafka_2.10-0.10.0.0
  • Kafka Java API 版本(客户端):kafka-clients-0.10.0.0

我的配置:

  • 主题:event_notification
  • 分区:20
  • 客户端消费者线程:1
  • 消费者组 ID:event_cg01
  • 自动提交标志:false

根据要求,在我的应用程序启动期间,我必须根据标志将偏移量设置为开始结束。为此,我使用以下代码:

final List<PartitionInfo> partitionsInfos = kafkaConsumer.partitionsFor(this.topic);

final List<TopicPartition> assignedPartitions = FluentIterable
.from(partitionsInfos)
.filter(Predicates.notNull())
.transform(new Function<PartitionInfo, TopicPartition>() {
@Override
public TopicPartition apply(final PartitionInfo input) {
return new TopicPartition(topic, input.partition());
}
}).toList();

switch (listenMode) {
case OLDEST:
kafkaConsumer.seekToBeginning(assignedPartitions);
break;
case LATEST:
kafkaConsumer.seekToEnd(assignedPartitions);
break;
default:
break;
}

此代码未按预期工作。它永远卡在 seekToBeginningseekToEnd 调用上。

我错过了什么吗?

最佳答案

在使用 seek() 之前,您首先需要 subscribe() 某个主题或 assign() 主题的分区消费者。请注意,subscribe()assign() 是惰性调用,因此,您还需要对 进行“虚拟调用” >poll(),然后才能使用seek() 或seekToBeginning()seekToEnd()

关于java - 为什么调用Kafka的seekToBeginning和seekToEnd API永远挂起?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41997415/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com