作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我的环境:
kafka_2.10-0.10.0.0
kafka-clients-0.10.0.0
我的配置:
event_notification
20
1
event_cg01
false
根据要求,在我的应用程序启动期间,我必须根据标志将偏移量设置为开始或结束。为此,我使用以下代码:
final List<PartitionInfo> partitionsInfos = kafkaConsumer.partitionsFor(this.topic);
final List<TopicPartition> assignedPartitions = FluentIterable
.from(partitionsInfos)
.filter(Predicates.notNull())
.transform(new Function<PartitionInfo, TopicPartition>() {
@Override
public TopicPartition apply(final PartitionInfo input) {
return new TopicPartition(topic, input.partition());
}
}).toList();
switch (listenMode) {
case OLDEST:
kafkaConsumer.seekToBeginning(assignedPartitions);
break;
case LATEST:
kafkaConsumer.seekToEnd(assignedPartitions);
break;
default:
break;
}
此代码未按预期工作。它永远卡在 seekToBeginning
和 seekToEnd
调用上。
我错过了什么吗?
最佳答案
在使用 seek()
之前,您首先需要 subscribe()
某个主题或 assign()
主题的分区消费者。请注意,subscribe()
和 assign()
是惰性调用,因此,您还需要对 进行“虚拟调用” >poll()
,然后才能使用seek() 或seekToBeginning()
或seekToEnd()
。
关于java - 为什么调用Kafka的seekToBeginning和seekToEnd API永远挂起?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41997415/
当消费者组 A 的 Kafka 消费者连接到 Kafka 代理时,我想查找所有分区的末尾,即使在代理端存储了偏移量。如果更多的消费者正在为同一个消费者组连接,他们应该获取最新存储的偏移量。我正在做以下
我是一名优秀的程序员,十分优秀!