gpt4 book ai didi

java - 如何在Spring Kafka中动态创建多个消费者

转载 作者:行者123 更新时间:2023-11-30 01:53:22 28 4
gpt4 key购买 nike

我有两个 Kafka 集群,我从数据库动态获取这些 IP。我正在使用 @KafkaListener 来创建监听器。现在我想根据引导服务器属性(逗号分隔值)在运行时创建多个 Kafka 监听器,每个监听器监听一个集群。您能建议我如何实现这一目标吗?

版本信息:Spring Boot 2.1.3.RELEASE,Kafka 2.0.1,Java 8

最佳答案

您的要求尚不清楚,但假设您希望使用相同的监听器配置来监听多个集群,这里有一个解决方案:将监听器 bean 声明为原型(prototype)作用域,并在创建每个监听器实例之前,为容器工厂设置指向对应集群的消费者工厂...

/**
 * Spring Boot application that creates one {@code MyListener} instance per
 * configured Kafka cluster.
 *
 * <p>For every entry returned by {@link ClusterProperties#getClusters()} the
 * shared container factory is given a new consumer factory that targets that
 * entry, and a fresh prototype-scoped {@code listener} bean is then requested
 * from the application context.
 */
@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application {

    /** Listener instances created at startup, keyed by their generated group id. */
    private final Map<String, MyListener> listeners = new HashMap<>();

    public static void main(String[] args) {
        SpringApplication.run(So55311070Application.class, args);
    }

    /**
     * Builds the startup runner that creates one listener per configured cluster.
     *
     * @param props            holder of the configured cluster addresses
     * @param cf               consumer factory whose configuration is used as the base
     *                         for each per-cluster consumer configuration
     * @param containerFactory container factory whose consumer factory is replaced
     *                         before each listener bean is requested
     * @param context          application context used to obtain prototype listeners
     * @param registry         registry queried afterwards to print each container's group id
     * @return the runner executed on application startup
     */
    @Bean
    public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
            ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
            ApplicationContext context, KafkaListenerEndpointRegistry registry) {

        return args -> {
            String[] clusters = props.getClusters();
            for (int index = 0; index < clusters.length; index++) {
                // Group ids are numbered in configuration order: group0, group1, ...
                String groupId = "group" + index;

                // Start from the base consumer settings and override only the
                // bootstrap servers and the group id for this cluster.
                Map<String, Object> overrides = new HashMap<>(cf.getConfigurationProperties());
                overrides.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusters[index]);
                overrides.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

                // The factory must point at this cluster before the prototype bean
                // is requested.
                containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(overrides));
                MyListener created = context.getBean("listener", MyListener.class);
                this.listeners.put(groupId, created);
            }
            // Original note: 2.2.5 snapshot only
            registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId()));
        };
    }

    /**
     * Prototype-scoped listener bean; each lookup yields a new instance.
     *
     * @return a new {@code MyListener}
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public MyListener listener() {
        return new MyListener();
    }

}

/**
 * Listener for the {@code so55311070} topic that echoes each message to
 * standard output.
 */
class MyListener {

    /**
     * Prints a received message.
     *
     * @param in the message payload delivered to this listener
     */
    @KafkaListener(topics = "so55311070")
    public void listen(String in) {
        final String payload = in;
        System.out.println(payload);
    }

}

/**
 * Configuration properties bound under the {@code kafka} prefix.
 *
 * <p>Holds the cluster addresses; each array element is used by the
 * application as the bootstrap-servers value of one consumer.
 */
@ConfigurationProperties(prefix = "kafka")
public class ClusterProperties {

    /** Cluster addresses, one array element per cluster. */
    private String[] clusters;

    /**
     * Replaces the configured cluster addresses.
     *
     * @param clusters the cluster addresses to store
     */
    public void setClusters(String[] clusters) {
        this.clusters = clusters;
    }

    /**
     * Returns the configured cluster addresses.
     *
     * @return the stored array, exactly as it was set
     */
    public String[] getClusters() {
        return clusters;
    }

}
kafka.clusters=localhost:9092,localhost:9093

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false

结果

group0
group1
...
2019-03-23 11:43:25.993 INFO 74869 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: partitions assigned: [so55311070-0]
2019-03-23 11:43:25.994 INFO 74869 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: partitions assigned: [so55311070-0]

编辑

添加代码以重试启动失败的容器。

事实证明,我们不需要监听器的本地映射,注册表有所有容器的映射,包括启动失败的容器。

/**
 * Spring Boot application that creates one {@code MyListener} per configured
 * Kafka cluster and periodically retries containers that are not running.
 *
 * <p>For every entry returned by {@link ClusterProperties#getClusters()} the
 * shared container factory is pointed at that entry and a prototype-scoped
 * {@code listener} bean is requested. If any request fails, a task is scheduled
 * every 60 seconds that tries to start every container in the registry that is
 * not running; the task cancels itself after a pass with no start failures.
 */
@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application {

    /**
     * Set when a listener or container failed to start. Written by the runner
     * thread and by the scheduler thread, hence {@code volatile}.
     */
    private volatile boolean atLeastOneFailure;

    /**
     * Handle of the periodic restart task. Assigned by the runner thread and
     * read by the task itself on the scheduler thread, hence {@code volatile}.
     */
    private volatile ScheduledFuture<?> restartTask;

    public static void main(String[] args) {
        SpringApplication.run(So55311070Application.class, args);
    }

    /**
     * Builds the startup runner that creates the per-cluster listeners and, when
     * at least one of them failed, schedules the periodic restart task.
     *
     * @param props            holder of the configured cluster addresses
     * @param cf               consumer factory whose configuration is used as the base
     *                         for each per-cluster consumer configuration
     * @param containerFactory container factory whose consumer factory is replaced
     *                         before each listener bean is requested
     * @param context          application context used to obtain prototype listeners
     * @param registry         registry holding the listener containers
     * @param scheduler        scheduler that runs the periodic restart task
     * @return the runner executed on application startup
     */
    @Bean
    public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
            ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
            ApplicationContext context, KafkaListenerEndpointRegistry registry, TaskScheduler scheduler) {

        return args -> {
            AtomicInteger instance = new AtomicInteger();
            Arrays.stream(props.getClusters()).forEach(cluster -> {
                Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
                consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
                String groupId = "group" + instance.getAndIncrement();
                consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                attemptStart(containerFactory, context, consumerProps, groupId);
            });
            // Original note: 2.2.5 snapshot only
            registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId()));
            if (this.atLeastOneFailure) {
                this.restartTask = scheduler.scheduleAtFixedRate(
                        () -> restartStoppedContainers(registry),
                        Instant.now().plusSeconds(60),
                        Duration.ofSeconds(60));
            }
        };
    }

    /**
     * Runs one retry pass: tries to start every container that is not running
     * and cancels the periodic task when no start attempt failed.
     */
    private void restartStoppedContainers(KafkaListenerEndpointRegistry registry) {
        // Reset ONCE per pass. Resetting inside the loop would erase a failure
        // recorded for an earlier container as soon as the next one is visited,
        // cancelling the task while containers are still down.
        this.atLeastOneFailure = false;
        registry.getListenerContainers().forEach(c -> {
            if (!c.isRunning()) {
                System.out.println("Attempting restart of " + c.getGroupId());
                try {
                    c.start();
                }
                catch (Exception e) {
                    System.out.println("Failed to start " + e.getMessage());
                    this.atLeastOneFailure = true;
                }
            }
        });
        if (!this.atLeastOneFailure) {
            ScheduledFuture<?> task = this.restartTask;
            if (task != null) {
                task.cancel(false);
            }
        }
    }

    /**
     * Points the container factory at the given consumer configuration and
     * requests a new prototype listener. A creation failure is reported and
     * recorded so that the restart task gets scheduled.
     *
     * @param containerFactory factory to reconfigure for this listener
     * @param context          application context used to obtain the listener
     * @param consumerProps    consumer configuration for this listener's cluster
     * @param groupId          group id of this listener, used in the failure message
     */
    private void attemptStart(ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
            ApplicationContext context, Map<String, Object> consumerProps, String groupId) {

        containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
        try {
            context.getBean("listener", MyListener.class);
        }
        catch (BeanCreationException e) {
            // Report instead of swallowing silently; the restart task retries later.
            System.out.println("Failed to start listener for " + groupId + ": " + e.getMessage());
            this.atLeastOneFailure = true;
        }
    }

    /**
     * Prototype-scoped listener bean; each lookup yields a new instance.
     *
     * @return a new {@code MyListener}
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public MyListener listener() {
        return new MyListener();
    }

    /**
     * Scheduler used to run the periodic restart task.
     *
     * @return a new {@code ThreadPoolTaskScheduler}
     */
    @Bean
    public TaskScheduler scheduler() {
        return new ThreadPoolTaskScheduler();
    }

}

/**
 * Listener for the {@code so55311070} topic that writes every received
 * message to standard output.
 */
class MyListener {

    /**
     * Echoes one received message.
     *
     * @param in the message payload delivered to this listener
     */
    @KafkaListener(topics = "so55311070")
    public void listen(String in) {
        final String received = in;
        System.out.println(received);
    }

}

关于java - 如何在Spring Kafka中动态创建多个消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55311070/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com