gpt4 book ai didi

java - Spring Cloud Stream 手动轮询器 Kafka

转载 作者:行者123 更新时间:2023-12-02 01:05:46 25 4
gpt4 key购买 nike

我不想使用@KafkaListener或@StreamListener,但我想手动轮询kafka。我正在使用 spring-cloud-starter-stream-kafka 库,并且我有以下 Kafka Producer

  @Autowired
private KafkaTemplate<byte[], byte[]> template;

public void sendMessages() {
IntStream.range(2)
.forEach(val -> {
template.send("kafka-topic", "hello".getBytes());
});
}

我想使用 spring-kafka 手动轮询相同的 kafka 主题。我尝试了以下消费者

 @Autowired
private ConsumerFactory consumerFactory;

public void processKafkaRecords() throws InterruptedException {
Consumer<byte[], byte[]> consumer = consumerFactory.createConsumer("0", "consumer-1");
consumer.subscribe(Arrays.asList("kafka-topic"));
ConsumerRecords<byte[], byte[]> poll = consumer.poll(Duration.ofMillis(1000));
poll.forEach(record -> {
log.info("record {}", record);
});
}

应用程序属性

spring.cloud.stream.bindings.pollableInput.destination=kafka-topic
spring.cloud.stream.bindings.pollableInput.group=kafka-topic
spring.cloud.stream.bindings.pollableInput.consumer.batch-mode=true
spring.cloud.stream.bindings.pollableInput.consumer.header-mode=none
spring.cloud.stream.bindings.pollableInput.consumer.use-native-decoding=true

spring.cloud.stream.kafka.bindings.pollableInput.consumer.autoCommitOffset=false

但是,消费者永远不会收到生产者发送的任何记录。有什么想法如何手动轮询 kafka 主题吗?

最佳答案

可能有以下几个原因:

  1. Duration.ofMillis(1000) - 尝试增加时间,在某些情况下 1 秒可能太短,除非您的客户端和 kafka 都在同一台计算机上运行。因为poll(Duration)的文档说如果超时,将返回一个空记录集
  2. 如果您先启动了生产者,然后启动了消费者,并且没有将偏移重置策略设置为最早,那么您将看不到任何消息,因为默认情况下消费者将从最新的偏移开始消费。因此,尝试设置以下 auto.offset.reset=earliest
  3. 来自同一消费者组的另一个消费者可能正在运行,并且只有 1 个分区,或者消费者组已经位于最后一个偏移处。这种情况,您可以尝试更改消费者组id。

关于java - Spring Cloud Stream 手动轮询器 Kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60083681/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com