gpt4 book ai didi

java - 在阅读主题后异步提交消息

转载 作者:塔克拉玛干 更新时间:2023-11-02 08:30:50 26 4
gpt4 key购买 nike

我正在尝试在从主题中阅读消息后立即提交消息。我已点击此链接 ( https://www.confluent.io/blog/apache-kafka-spring-boot-application ) 使用 spring 创建一个 Kafka 消费者。通常它工作完美,消费者收到消息并等待另一个人进入队列。但问题是,当我处理这些消息时,它会花费很多时间(大约 10 分钟)kafka 队列认为消息没有被消费(提交)并且消费者一次又一次地读取它。我不得不说,当我的处理时间少于 5 分钟时,它运行良好,但当它持续时间更长时,它不会提交消息。

我已经四处寻找一些答案,但它对我没有帮助,因为我没有使用相同的源代码(当然还有不同的结构)。我尝试发送异步方法并异步提交消息,但我失败了。一些来源是:

Spring Boot Kafka: Commit cannot be completed since the group has already rebalanced

https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/

https://dzone.com/articles/kafka-clients-at-most-once-at-least-once-exactly-o

Kafka 0.10 Java consumer not reading message from topic

https://github.com/confluentinc/confluent-kafka-dotnet/issues/470

主类在这里:


@SpringBootApplication
@EnableAsync
public class SpringBootKafkaApp {

public static void main(String[] args) {
SpringApplication.run(SpringBootKafkaApp .class, args);
}


消费者类(我需要提交消息的地方)

@Service
public class Consumer {

@Autowired
AppPropert prop;

Consumer cons;
@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
public void consume(String message) throws IOException {
/*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */

Properties props=prope.startProp();//just getting my properties from my config-file
ControllerPRO pro = new ControllerPRO();

List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me
try {

CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method
/*This works fine when the processLaunch method takes less than 5 minutes,
if it takes longer the consumer will get the same message from the topic and start again with this operation
*/

} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("End of consumer method ");

}

}


如何在从队列中读取消息后立即提交消息。

我想确保在收到消息时立即提交消息。现在,当我在 (System.out.println) 之后完成执行该方法时,消息已提交。那么有人可以告诉我该怎么做吗?

-----更新------

很抱歉回复晚了,但正如@GirishB 所建议的那样,我一直在寻找 GirishB 的配置,但我看不到在哪里可以定义我想从我的配置文件 (applications.yml) 中读取/收听的主题.我看到的所有示例都使用与此类似的结构 ( http://tutorials.jenkov.com/java-util-concurrent/blockingqueue.html )。是否有任何选项可以让我阅读在其他服务器中声明的主题?使用类似于 @KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")

=========== 解决方案 1 ================================== ======

我听从了@victor gallet 的建议,并在 oder 中包含了混淆器属性的声明,以在 consume 方法中容纳“Acknowledgment”对象。我还点击此链接 (https://www.programcreek.com/java-api-examples/?code=SpringOnePlatform2016/grussell-spring-kafka/grussell-spring-kafka-master/s1p-kafka/src/main/java/org/s1p/CommonConfiguration.java) 获取我用于声明和设置所有属性(consumerProperties、consumerFactory、kafkaListenerContainerFactory)的所有方法。我发现的唯一问题是“new SeekToCurrentErrorHandler() ”声明,因为我遇到了一个错误,目前我无法解决它(如果有人向我解释它会很棒)。


@Service
public class Consumer {

@Autowired
AppPropert prop;

Consumer cons;


@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();

factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
//factory.setErrorHandler(new SeekToCurrentErrorHandler());//getting error here despite I've loaded the library
return factory;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProperties());
}

@Bean
public Map<String, Object> consumerProperties() {
Map<String, Object> props = new HashMap<>();
Properties propsManu=prop.startProperties();// here I'm getting my porperties file where I retrive the configuration from a remote server (you have to trust that this method works)
//props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configProperties.getBrokerAddress());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsManu.getProperty("bootstrap-servers"));
//props.put(ConsumerConfig.GROUP_ID_CONFIG, "s1pGroup");
props.put(ConsumerConfig.GROUP_ID_CONFIG, propsManu.getProperty("group-id"));
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
//props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("key-deserializer"));
//props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("value-deserializer"));
return props;
}




@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
public void consume(String message) throws IOException {
/*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */
acknowledgment.acknowledge();// commit immediately
Properties props=prop.startProp();//just getting my properties from my config-file
ControllerPRO pro = new ControllerPRO();

List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me
try {

CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method
/*This works fine when the processLaunch method takes less than 5 minutes,
if it takes longer the consumer will get the same message from the topic and start again with this operation
*/

} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("End of consumer method ");

}

}

``````````````````````````````````````````````````````````

最佳答案

您必须将属性 enable.auto.commit 设置为 false 来修改您的消费者配置:

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

然后,您必须修改 Spring Kafka Listener 工厂并将 ack-mode 设置为 MANUAL_IMMEDIATE。下面是一个 ConcurrentKafkaListenerContainerFactory 示例:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setErrorHandler(new SeekToCurrentErrorHandler());
return factory;
}

如文档中所述,MANUAL_IMMEDIATE 表示:当监听器调用 Acknowledgment.acknowledge() 方法时立即提交偏移量。

您可以找到所有提交方法here .

然后,在您的监听器代码中,您可以通过添加一个Acknowledgment对象来手动提交偏移量,例如:

@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
public void consume(String message, Acknowledgment acknowledgment) {
// commit immediately
acknowledgment.acknowledge();
}

关于java - 在阅读主题后异步提交消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56354721/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com