- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
我正在尝试在从主题中阅读消息后立即提交消息。我已点击此链接 ( https://www.confluent.io/blog/apache-kafka-spring-boot-application ) 使用 spring 创建一个 Kafka 消费者。通常它工作完美,消费者收到消息并等待另一个人进入队列。但问题是,当我处理这些消息时,它会花费很多时间(大约 10 分钟)kafka 队列认为消息没有被消费(提交)并且消费者一次又一次地读取它。我不得不说,当我的处理时间少于 5 分钟时,它运行良好,但当它持续时间更长时,它不会提交消息。
我已经四处寻找一些答案,但它对我没有帮助,因为我没有使用相同的源代码(当然还有不同的结构)。我尝试发送异步方法并异步提交消息,但我失败了。一些来源是:
Spring Boot Kafka: Commit cannot be completed since the group has already rebalanced
https://dzone.com/articles/kafka-clients-at-most-once-at-least-once-exactly-o
Kafka 0.10 Java consumer not reading message from topic
https://github.com/confluentinc/confluent-kafka-dotnet/issues/470
主类在这里:
@SpringBootApplication
@EnableAsync
public class SpringBootKafkaApp {
public static void main(String[] args) {
SpringApplication.run(SpringBootKafkaApp .class, args);
}
消费者类(我需要提交消息的地方)
@Service
public class Consumer {
@Autowired
AppPropert prop;
Consumer cons;
@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
public void consume(String message) throws IOException {
/*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */
Properties props=prope.startProp();//just getting my properties from my config-file
ControllerPRO pro = new ControllerPRO();
List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me
try {
CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method
/*This works fine when the processLaunch method takes less than 5 minutes,
if it takes longer the consumer will get the same message from the topic and start again with this operation
*/
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("End of consumer method ");
}
}
如何在从队列中读取消息后立即提交消息。
我想确保在收到消息时立即提交消息。现在,当我在 (System.out.println) 之后完成执行该方法时,消息已提交。那么有人可以告诉我该怎么做吗?
-----更新------
很抱歉回复晚了,但正如@GirishB 所建议的那样,我一直在寻找 GirishB 的配置,但我看不到在哪里可以定义我想从我的配置文件 (applications.yml) 中读取/收听的主题.我看到的所有示例都使用与此类似的结构 ( http://tutorials.jenkov.com/java-util-concurrent/blockingqueue.html )。是否有任何选项可以让我阅读在其他服务器中声明的主题?使用类似于 @KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
=========== 解决方案 1 ================================== ======
我听从了@victor gallet 的建议,并在 oder 中包含了混淆器属性的声明,以在 consume 方法中容纳“Acknowledgment”对象。我还点击此链接 (https://www.programcreek.com/java-api-examples/?code=SpringOnePlatform2016/grussell-spring-kafka/grussell-spring-kafka-master/s1p-kafka/src/main/java/org/s1p/CommonConfiguration.java) 获取我用于声明和设置所有属性(consumerProperties、consumerFactory、kafkaListenerContainerFactory)的所有方法。我发现的唯一问题是“new SeekToCurrentErrorHandler() ”声明,因为我遇到了一个错误,目前我无法解决它(如果有人向我解释它会很棒)。
@Service
public class Consumer {
@Autowired
AppPropert prop;
Consumer cons;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
//factory.setErrorHandler(new SeekToCurrentErrorHandler());//getting error here despite I've loaded the library
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProperties());
}
@Bean
public Map<String, Object> consumerProperties() {
Map<String, Object> props = new HashMap<>();
Properties propsManu=prop.startProperties();// here I'm getting my porperties file where I retrive the configuration from a remote server (you have to trust that this method works)
//props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configProperties.getBrokerAddress());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsManu.getProperty("bootstrap-servers"));
//props.put(ConsumerConfig.GROUP_ID_CONFIG, "s1pGroup");
props.put(ConsumerConfig.GROUP_ID_CONFIG, propsManu.getProperty("group-id"));
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
//props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("key-deserializer"));
//props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("value-deserializer"));
return props;
}
@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
public void consume(String message) throws IOException {
/*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */
acknowledgment.acknowledge();// commit immediately
Properties props=prop.startProp();//just getting my properties from my config-file
ControllerPRO pro = new ControllerPRO();
List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me
try {
CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method
/*This works fine when the processLaunch method takes less than 5 minutes,
if it takes longer the consumer will get the same message from the topic and start again with this operation
*/
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("End of consumer method ");
}
}
``````````````````````````````````````````````````````````
最佳答案
您必须将属性 enable.auto.commit
设置为 false 来修改您的消费者配置:
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
然后,您必须修改 Spring Kafka Listener 工厂并将 ack-mode 设置为 MANUAL_IMMEDIATE
。下面是一个 ConcurrentKafkaListenerContainerFactory
示例:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setErrorHandler(new SeekToCurrentErrorHandler());
return factory;
}
如文档中所述,MANUAL_IMMEDIATE
表示:当监听器调用 Acknowledgment.acknowledge() 方法时立即提交偏移量。
您可以找到所有提交方法here .
然后,在您的监听器代码中,您可以通过添加一个Acknowledgment
对象来手动提交偏移量,例如:
@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
public void consume(String message, Acknowledgment acknowledgment) {
// commit immediately
acknowledgment.acknowledge();
}
关于java - 在阅读主题后异步提交消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56354721/
我正在编写一个具有以下签名的 Java 方法。 void Logger(Method method, Object[] args); 如果一个方法(例如 ABC() )调用此方法 Logger,它应该
我是 Java 新手。 我的问题是我的 Java 程序找不到我试图用作的图像文件一个 JButton。 (目前这段代码什么也没做,因为我只是得到了想要的外观第一的)。这是我的主课 代码: packag
好的,今天我在接受采访,我已经编写 Java 代码多年了。采访中说“Java 垃圾收集是一个棘手的问题,我有几个 friend 一直在努力弄清楚。你在这方面做得怎么样?”。她是想骗我吗?还是我的一生都
我的 friend 给了我一个谜语让我解开。它是这样的: There are 100 people. Each one of them, in his turn, does the following
如果我将使用 Java 5 代码的应用程序编译成字节码,生成的 .class 文件是否能够在 Java 1.4 下运行? 如果后者可以工作并且我正在尝试在我的 Java 1.4 应用程序中使用 Jav
有关于why Java doesn't support unsigned types的问题以及一些关于处理无符号类型的问题。我做了一些搜索,似乎 Scala 也不支持无符号数据类型。限制是Java和S
我只是想知道在一个 java 版本中生成的字节码是否可以在其他 java 版本上运行 最佳答案 通常,字节码无需修改即可在 较新 版本的 Java 上运行。它不会在旧版本上运行,除非您使用特殊参数 (
我有一个关于在命令提示符下执行 java 程序的基本问题。 在某些机器上我们需要指定 -cp 。 (类路径)同时执行java程序 (test为java文件名与.class文件存在于同一目录下) jav
我已经阅读 StackOverflow 有一段时间了,现在我才鼓起勇气提出问题。我今年 20 岁,目前在我的家乡(罗马尼亚克卢日-纳波卡)就读 IT 大学。足以介绍:D。 基本上,我有一家提供簿记应用
我有 public JSONObject parseXML(String xml) { JSONObject jsonObject = XML.toJSONObject(xml); r
我已经在 Java 中实现了带有动态类型的简单解释语言。不幸的是我遇到了以下问题。测试时如下代码: def main() { def ks = Map[[1, 2]].keySet()
一直提示输入 1 到 10 的数字 - 结果应将 st、rd、th 和 nd 添加到数字中。编写一个程序,提示用户输入 1 到 10 之间的任意整数,然后以序数形式显示该整数并附加后缀。 public
我有这个 DownloadFile.java 并按预期下载该文件: import java.io.*; import java.net.URL; public class DownloadFile {
我想在 GUI 上添加延迟。我放置了 2 个 for 循环,然后重新绘制了一个标签,但这 2 个 for 循环一个接一个地执行,并且标签被重新绘制到最后一个。 我能做什么? for(int i=0;
我正在对对象 Student 的列表项进行一些测试,但是我更喜欢在 java 类对象中创建硬编码列表,然后从那里提取数据,而不是连接到数据库并在结果集中选择记录。然而,自从我这样做以来已经很长时间了,
我知道对象创建分为三个部分: 声明 实例化 初始化 classA{} classB extends classA{} classA obj = new classB(1,1); 实例化 它必须使用
我有兴趣使用 GPRS 构建车辆跟踪系统。但是,我有一些问题要问以前做过此操作的人: GPRS 是最好的技术吗?人们意识到任何问题吗? 我计划使用 Java/Java EE - 有更好的技术吗? 如果
我可以通过递归方法反转数组,例如:数组={1,2,3,4,5} 数组结果={5,4,3,2,1}但我的结果是相同的数组,我不知道为什么,请帮助我。 public class Recursion { p
有这样的标准方式吗? 包括 Java源代码-测试代码- Ant 或 Maven联合单元持续集成(可能是巡航控制)ClearCase 版本控制工具部署到应用服务器 最后我希望有一个自动构建和集成环境。
我什至不知道这是否可能,我非常怀疑它是否可能,但如果可以,您能告诉我怎么做吗?我只是想知道如何从打印机打印一些文本。 有什么想法吗? 最佳答案 这里有更简单的事情。 import javax.swin
我是一名优秀的程序员,十分优秀!