作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
在我们的应用程序中,我们使用 kafka 消费者来确定发送电子邮件。
前几天我们遇到了一个问题,kafka 分区在读取和处理其所有记录之前超时。结果,它循环回到分区的开始,无法完成它收到的记录集,并且循环开始后生成的新数据从未得到处理。
我的团队建议我们可以在读取每条消息后告诉 Kafka 提交,但是我不知道如何从 Spring-kakfa 做到这一点。
应用程序使用 spring-kafka 2.1.6,消费者代码有点像这样。
@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.groupId}")
public void consume(String message, @Header("kafka_offset") int offSet) {
try{
EmailData data = objectMapper.readValue(message, EmailData.class);
if(isEligableForEmail(data)){
emailHandler.sendEmail(data)
}
} catch (Exception e) {
log.error("Error: "+e.getMessage(), e);
}
}
注意:sendEmail 函数使用 CompletableFutures,因为它必须在发送电子邮件之前调用不同的 API。
配置:(消费者的 yaml 文件片段,以及生产者的一部分)
consumer:
max.poll.interval.ms: 3600000
producer:
retries: 0
batch-size: 100000
acks: 0
buffer-memory: 33554432
request.timeout.ms: 60000
linger.ms: 10
max.block.ms: 5000
最佳答案
如果你想要手动确认,那么你可以在方法参数中提供确认
@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.groupId}")
public void consume(String message, Acknowledgment ack, @Header("kafka_offset") int offSet) {
When using manual AckMode, you can also provide the listener with the Acknowledgment. The following example also shows how to use a different container factory.
来自文档的示例 @KafkaListener Annotation
@KafkaListener(id = "cat", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
关于java - 如何在 spring-kafka 中的每条 kafka 消息后提交?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58566465/
我是一名优秀的程序员,十分优秀!