gpt4 book ai didi

java - Kafka消费者未加入自定义groupId

转载 作者:行者123 更新时间:2023-12-02 12:17:44 24 4
gpt4 key购买 nike

我根据Spring Kafka文档设置了Kafka ConsumerFactory。然而groupId似乎没有被使用。也许我也把整个事情弄错了,所以我想让你知道我的经历。

这是我的配置,似乎不起作用:

@Bean
ConsumerFactory<String, KafkaEvent> kafkaEventConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(
getConsumerProperties(),
new StringDeserializer(),
new JsonDeserializer<>(KafkaEvent.class));
}

Map<String, Object> getConsumerProperties() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // TODO
props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);


props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000);

props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 45000);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 70000);

return props;
}

我有一个像这样配置的@KafkaEventListener,而无需再次显式指定groupId:

@KafkaListener(topics = KafkaEventPublisher.ORDER_TOPIC)
public class KafkaEventListener {

@Autowired
private ConsumerFactory<String, KafkaEvent> consumerFactory;

@KafkaHandler
public void listenTo(@Payload KafkaEvent event) {
LOGGER.error(LogMarker.KAFKA, consumerFactory.getConfigurationProperties().toString());
}

}

我还可以看到我的 groupId“myGroupId”包含在上面记录的错误日志中。然而,让我怀疑的是一些 ConsumerCoordinator 的 DEBUG 日志记录,它总是声明加入不同的 groupId,我有点担心这看起来是否正确。

2017-09-04 15:28:13.904 (    ) INFO consumer.internals.AbstractCoordinator             - Successfully joined group org.springframework.kafka.KafkaListenerEndpointContainer#0 with generation 40
2017-09-04 15:28:13.904 ( ) INFO consumer.internals.AbstractCoordinator - Successfully joined group org.springframework.kafka.KafkaListenerEndpointContainer#0 with generation 40
2017-09-04 15:28:13.906 ( ) INFO consumer.internals.ConsumerCoordinator - Setting newly assigned partitions [] for group org.springframework.kafka.KafkaListenerEndpointContainer#0
2017-09-04 15:28:13.907 ( ) INFO consumer.internals.ConsumerCoordinator - Setting newly assigned partitions [my-topic-0] for group org.springframework.kafka.KafkaListenerEndpointContainer#0

还在 Spring 启动时输出 ConsumerConfig。我可以看到 groupId 是错误的,但是其他属性被正确接管。

据我所知,我可以通过在 ConsumerFactory 上设置 groupId 或使用 spring.kafka.consumer.group-id 在 application.properties 中设置来全局设置 groupId。但这两种变体都不起作用。

仅当我使用 @KafkaListener 注释配置 groupId 时,日志才会表明消费者加入了正确的组:

2017-09-04 15:38:30.787 (    ) DEBUG consumer.internals.AbstractCoordinator             - Received successful JoinGroup response for group myGroupId: org.apache.kafka.common.requests.JoinGroupResponse@4c51c449

使用此配置:

@KafkaListener(topics = KafkaEventPublisher.ORDER_TOPIC, groupId = "myGroupId")

我们正在使用 Spring Boot 2.0.0.M3(因此,Spring Kafka 2.0.0.M3)

最佳答案

这是 M3 中的一个错误; fixed on master (2.0.3.BUILD-SNAPSHOT)(以及 1.3.0.M2 中)。我们预计在本周晚些时候发布 2.0.0.RC1 候选版本(等待 Spring Framework RC4)。

关于java - Kafka消费者未加入自定义groupId,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46038364/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com