gpt4 book ai didi

java - Spring Kafka 并发与 spring-integration

转载 作者:行者123 更新时间:2023-11-30 10:15:58 26 4
gpt4 key购买 nike

我不知道我是否遗漏了一些明显的东西,或者 spring-integration-kafka:3.0.1 中存在一个错误,试图让多个消费者运行一个主题。该场景是一个具有 10 个分区的 Kafka 主题,以及一个监听它的 springboot-app。相关配置为:

应用程序.yml:

spring:
kafka:
consumer:
group-id: test-consumer
auto-offset-reset: earliest
listener:
concurrency: 4

配置:

@Configuration
@EnableIntegration
@IntegrationComponentScan("com.test")
public class MessageConfig {
@Bean
public MessageChannel testReceiveChannel() {
return MessageChannels.direct().get();
}

@Bean
public IntegrationFlow testReceiveFlow(@Qualifier("kafkaConsumerFactory") final ConsumerFactory<?, ?> kafkaConsumer, final MessageChannel testReceiveChannel) {
return IntegrationFlows
.from(Kafka.messageDrivenChannelAdapter(kafkaConsumer, ListenerMode.record, "test-topic"))
.transform(new JsonToObjectTransformer(EventMessage.class))
.channel(testReceiveChannel)
.get();
}
}

听众:

@Component
public class EventListener {
private static final Logger LOG = LoggerFactory.getLogger(EventListener.class);

@ServiceActivator(inputChannel = "testReceiveChannel")
public void processMessage(final EventMessage message) {
LOG.info("Got message {} on {}", message.getValue(), Thread.currentThread().getName());
}
}

启动时,我只有 1 个容器在所有 10 个分区上监听。我可以看到 ConcurrentKafkaListenerContainerFactory 设置了正确的并发值,但它似乎从未调用过 initializeContainer 方法(如果我理解,它将应用于实际的消费者它正确)。然而,我可能看到了完全错误的东西。

知道我忽略了什么吗?

最佳答案

Spring Boot KafkaProperties(例如 spring.kafka.listener.concurrency = 4)和提到的 ConcurrentKafkaListenerContainerFactory 应用于 @KafkaListener 组件。与 Spring Integration 完全无关。至少是自动的。

您需要手动执行此操作:

Kafka.messageDrivenChannelAdapter(kafkaConsumer, ListenerMode.record, "test-topic")
.configureListenerContainer(c ->
c.concurrency(this.kafkaProperties.getListener().getConcurrency()))

关于java - Spring Kafka 并发与 spring-integration,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50235548/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com