gpt4 book ai didi

javascript - kafka-node 有没有办法在崩溃后请求收到的消息?

转载 作者:太空宇宙 更新时间:2023-11-04 03:21:19 29 4
gpt4 key购买 nike

我一直在查看码头,现在想知道这是否是 kafka 或 node-kafka 的一个功能。

因此,我的消费者收到来自 kafka 的消息,当我处理该消息时,我喜欢将其标记为在处理完成之前我没有收到它。这样,如果服务器在处理时崩溃,我可以重新连接并接收我的应用程序在崩溃之前收到的消息。

高级示例。

  1. Kafka消息是一个用户id;
  2. 收到后,我会从数据库中获取用户信息。
  3. 我向用户发送电子邮件。
  4. 发送电子邮件时。我将其记录到数据库中。
  5. 我将消息标记为完成。 (我不确定这一步是否可行)

这样,如果任何步骤在第五步之前崩溃,当服务器恢复时,它会再次收到失败的消息。

我正在使用 node.js LTS 8.10.0 和 npm 包 kafka-node@2.4.1

最佳答案

我感谢 void,因为他的回答帮助我在 kafka-node 中找到了我需要的东西。这只是 node.js 答案/示例。

const kafkaConfig = config.get('kafka');
const kafkaClient = new kafka.KafkaClient({kafkaHost: kafkaConfig.host});
const consumer = new kafka.Consumer(kafkaClient, [{topic: kafkaConfig.topic}], {autoCommit: false});
const offset = new kafka.Offset(kafkaClient);

// very import to retrieve the offset
offset.fetchLatestOffsets([kafkaConfig.topic], (error, offsets) => {
const latestOffset = offsets[kafkaConfig.topic][0];
consumer.setOffset(kafkaConfig.topic, 0, latestOffset);
});

consumer.on('message', message => {
// do stuff
Log.insert({message})
.then(() => {
// commit the success so that kafka will update the offset
consumer.commit((err, data) => console.log(err, data));
})
});

关于javascript - kafka-node 有没有办法在崩溃后请求收到的消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49820799/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com