gpt4 book ai didi

java - 如何使用 spring-kafka 暂停和恢复 @KafkaListener

转载 作者:行者123 更新时间:2023-12-02 09:26:04 39 4
gpt4 key购买 nike

我已经实现了 Kafka 消费者,现在我有了一个场景。

  1. 从Kafka流中读取数据2.2.5.通过Srpingboot发布
  2. 加载数据库表1
  3. 将数据从表1复制到表2
  4. 清空表格1

要执行上述操作,我需要使用使用quartz的调度作业(已编写)来暂停/恢复Kafka消费者,该作业将数据从表1复制到表2。但是在此 Activity 期间,我希望我的Kafka监听器暂停,一旦复制完成,它应该恢复。

我的实现:

@KafkaListener(topicPartitions =
{ @TopicPartition(topic = "data_pipe", partitions = { "0" })})
public void listen(ConsumerRecord<String, String> cr) throws Exception {

最佳答案

如果您使用'kafkaListener注释'自动创建KafkaListenerEndpointRegistry bean,那么,您可以像这样的代码使用它:

@Component
public class KafkaManager {

private final KafkaListenerEndpointRegistry registry;

public KafkaManager(KafkaListenerEndpointRegistry registry) {
this.registry = registry;
}
public void pause() {
registry.getListenerContainers().forEach(MessageListenerContainer::pause);
}

public void resume() {
registry.getListenerContainers().forEach(MessageListenerContainer::resume);
}
}

文档:https://docs.spring.io/spring-kafka/reference/html/#pause-resume

关于java - 如何使用 spring-kafka 暂停和恢复 @KafkaListener,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58336973/

39 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com