gpt4 book ai didi

spring-boot - ConcurrentMessageListenerContainer 不是并发的

转载 作者:行者123 更新时间:2023-12-04 02:52:28 24 4
gpt4 key购买 nike

我正在尝试创建一个多线程监听器,但所有消息都在同一个线程中执行。运行时,线程 ID 始终相同,甚至认为 KafkaListerContainerFactory 是(正确地)我实例化的那个。如果我几乎同时发送 7 条消息,我希望前三个并发处理,然后是接下来的三个并发,然后是最后一个。我看到的是第一个过程完成,然后是第二个,然后是第三个,依此类推。我是不是误解了什么,或者只是配置错误?

这是我的听众:

@Component
public class ExampleKafkaController {
Log log = Log.getLog(ExampleKafkaController.class);

@Autowired
private KafkaListenerContainerFactory kafkaListenerContainerFactory;

@KafkaListener(topics = "${kafka.example.topic}")
public void listenForMessage(ConsumerRecord<?, ?> record) {
log.info("Got record:\n" + record.value());
System.out.println("Kafka Thread: " + Thread.currentThread());
System.out.println(kafkaListenerContainerFactory);

log.info("Waiting...");
waitSleep(10000);

log.info("Done!");
}

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Value("${kafka.example.topic}")
public String topic;

public void send(String payload) {
log.info("sending payload='" + payload + "' to topic='" + topic + "'");
kafkaTemplate.send(topic, payload);
}

private void waitSleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

这是我使用新配置的应用程序:

@SpringBootApplication
@ComponentScan("net.reigrut.internet.services.example.*")
@EntityScan("net.reigrut.internet.services.example.*")
@EnableKafka
@Configuration
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

@Autowired
ConsumerFactory<Integer,String> consumerFactory;

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(3);
System.out.println("===========>" + consumerFactory);
System.out.println(factory);
return factory;
}
}

最佳答案

对于 Kafka,并发性受限于主题中的分区数。如果只有一个分区,则无论容器的并发设置如何,您都只会在一个线程上接收消息。

您应该提供大于或等于您想要的并发数的分区数。如果分区数大于并发数,分区将分布在消费者线程中。

一组中只有一个消费者可以从一个分区消费。

关于spring-boot - ConcurrentMessageListenerContainer 不是并发的,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47878258/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com