作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我不想使用@KafkaListener或@StreamListener,但我想手动轮询kafka。我正在使用 spring-cloud-starter-stream-kafka 库,并且我有以下 Kafka Producer
@Autowired
private KafkaTemplate<byte[], byte[]> template;
public void sendMessages() {
IntStream.range(2)
.forEach(val -> {
template.send("kafka-topic", "hello".getBytes());
});
}
我想使用 spring-kafka 手动轮询相同的 kafka 主题。我尝试了以下消费者
@Autowired
private ConsumerFactory consumerFactory;
public void processKafkaRecords() throws InterruptedException {
Consumer<byte[], byte[]> consumer = consumerFactory.createConsumer("0", "consumer-1");
consumer.subscribe(Arrays.asList("kafka-topic"));
ConsumerRecords<byte[], byte[]> poll = consumer.poll(Duration.ofMillis(1000));
poll.forEach(record -> {
log.info("record {}", record);
});
}
应用程序属性
spring.cloud.stream.bindings.pollableInput.destination=kafka-topic
spring.cloud.stream.bindings.pollableInput.group=kafka-topic
spring.cloud.stream.bindings.pollableInput.consumer.batch-mode=true
spring.cloud.stream.bindings.pollableInput.consumer.header-mode=none
spring.cloud.stream.bindings.pollableInput.consumer.use-native-decoding=true
spring.cloud.stream.kafka.bindings.pollableInput.consumer.autoCommitOffset=false
但是,消费者永远不会收到生产者发送的任何记录。有什么想法如何手动轮询 kafka 主题吗?
最佳答案
可能有以下几个原因:
Duration.ofMillis(1000)
- 尝试增加时间,在某些情况下 1 秒可能太短,除非您的客户端和 kafka 都在同一台计算机上运行。因为poll(Duration)
的文档说如果超时,将返回一个空记录集auto.offset.reset=earliest
关于java - Spring Cloud Stream 手动轮询器 Kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60083681/
我是一名优秀的程序员,十分优秀!