gpt4 book ai didi

spring - 错误处理-消费者-apache kafka和spring

转载 作者:行者123 更新时间:2023-12-03 07:38:51 26 4
gpt4 key购买 nike

我正在学习使用 kafka,我有两个服务,生产者和消费者。

生产者产生需要处理的消息(对服务和数据库的查询)。这些消息被消费者接收,它负责处理它们并将结果保存在数据库中

制作人

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
...
kafkaTemplate.send(topic, message);

消费者

@KafkaListener(topics = "....")
public void listen(@Payload String message) {
....
}

我希望消费者能够正确处理所有消息。我不知道在这种情况下如何处理消费者方面的错误。例如,数据库可能暂时禁用并且无法处理某些消息。

遇到这些情况怎么办?

我知道责任属于消费者。我可以重试,但如果数据库已关闭,则连续重试几次似乎不是一个好主意。如果我继续消费消息,索引会增加,我会丢失无法错误处理的事件。

最佳答案

您可以通过提交读取记录的偏移量的形式来控制 kafka 消费者。除非提交偏移量,否则 Kafka 将继续返回相同的记录。您可以将偏移提交设置为手动,并根据您的业务逻辑的成功决定是否提交。请参阅下面的示例

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
consumer.commitSync();
buffer.clear();
}
}

Consumer.commitsync() 提交偏移量。

另请参阅 kakfa 消费者文档以了解消费者抵消 here .

关于spring - 错误处理-消费者-apache kafka和spring,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57066995/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com