gpt4 book ai didi

spring - 当与 Apache Kafka Server 的连接丢失时,如何使用 Spring Kafka Listener 停止微服务?

转载 作者:行者123 更新时间:2023-12-04 03:03:14 25 4
gpt4 key购买 nike

我目前正在实现一个微服务,它从 Apache Kafka 主题中读取数据。我正在为微服务使用“spring-boot,版本:1.5.6.RELEASE”,为同一微服务中的监听器使用“spring-kafka,版本:1.2.2.RELEASE”。这是我的卡夫卡配置:

    @Bean
public Map<String, Object> consumerConfigs() {
return new HashMap<String, Object>() {{
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
}};
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}

我已经通过@KafkaListener注解实现了监听器:

@KafkaListener(topics = "${kafka.dataSampleTopic}")
public void receive(ConsumerRecord<String, String> payload) {
//business logic
latch.countDown();
}

我需要能够在监听器与 Apache Kafka 服务器断开连接时关闭微服务。

当我终止 kafka 服务器时,我在 spring boot 日志中收到以下消息:

2017-11-01 19:58:15.721  INFO 16800 --- [      0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Marking the coordinator 192.168.0.4:9092 (id: 2145482646 rack: null) dead for group TestGroup

当我启动 kafka sarver 时,我得到:

2017-11-01 20:01:37.748  INFO 16800 --- [      0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator 192.168.0.4:9092 (id: 2145482646 rack: null) for group TestGroup.

很明显,我的微服务中的 Spring Kafka Listener 能够检测到 Kafka 服务器何时启动并运行,何时停止运行。在书中由 confluent Kafka The Definitive GuideBut How Do We Exit? 一章中提到需要在 Consumer 上调用 wakeup() 方法,以便 WakeupException会被抛出。因此,我尝试使用 @EventListener 标记捕获这两个事件(Kafka 服务器关闭和 Kafka 服务器启动),如 Spring for Apache Kafka documentation 中所述。 ,然后调用 wakeup()。但是文档中的示例是关于如何检测空闲消费者的,这不是我的情况。有人可以帮我解决这个问题吗?提前致谢。

最佳答案

我不知道如何获得服务器停机情况的通知(根据我的经验,消费者会在 poll() 中进入一个紧密循环)。

但是,如果您弄清楚了,您可以停止监听器容器,这将唤醒消费者并退出紧密循环...

@Autowired
private KafkaListenerEndpointRegistry registry;

...

this.registry.stop();

2017-11-01 16:29:54.290 INFO 21217 --- [ad | so47062346] o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator localhost:9092 (id: 2147483647 rack: null) dead for group so47062346

2017-11-01 16:29:54.346 WARN 21217 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : Connection to node 0 could not be established. Broker may not be available.

...

2017-11-01 16:30:00.643 WARN 21217 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : Connection to node 0 could not be established. Broker may not be available.

2017-11-01 16:30:00.680 INFO 21217 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Consumer stopped

您可以通过添加 reconnect.backoff.ms 来改进紧密循环,但是 poll() 永远不会退出,因此我们无法发出空闲事件。

spring:
kafka:
consumer:
enable-auto-commit: false
group-id: so47062346
properties:
reconnect.backoff.ms: 1000

我想您可以启用空闲事件并使用计时器来检测您是否在一段时间内没有收到任何数据(或空闲事件),然后停止容器。

关于spring - 当与 Apache Kafka Server 的连接丢失时,如何使用 Spring Kafka Listener 停止微服务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47062346/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com