gpt4 book ai didi

node.js - 如何使用 kafka-node 控制消费的 kafka 消息的提交

转载 作者:搜寻专家 更新时间:2023-11-01 00:05:56 24 4
gpt4 key购买 nike

我第一次使用 Node 和 kafka,使用 kafka-node。使用消息需要调用外部 API,这甚至可能需要一秒钟的时间来响应。我希望克服我的消费者突然失败的情况,如果一个消费者失败,另一个将消费并取代它的消费者将收到相同的消息,表明其工作未完成。

我正在使用 kafka 0.10 并尝试使用 ConsumerGroup。

我想到了在选项中设置 autoCommit: false,并仅在其工作完成后才提交消息(就像我之前对一些 Java 代码所做的那样)。

但是,我似乎无法确定如何在消息完成后才正确提交消息。我该如何提交?

我担心的另一个问题是,由于回调,似乎在上一条消息完成之前正在读取下一条消息。恐怕如果消息 x+2 在消息 x+1 之前完成,那么偏移量将设置为 x+2,因此在失败的情况下 x+1 将永远不会被重新执行。

这基本上是我到目前为止所做的:

var options = {
host: connectionString,
groupId: consumerGroupName,
id: clientId,
autoCommit: false
};

var kafka = require("kafka-node");
var ConsumerGroup = kafka.ConsumerGroup;

var consumerGroup = new ConsumerGroup(options, topic);

consumerGroup.on('connect', function() {
console.log("Consuming Kafka %s, topic=%s", JSON.stringify(options), topic);
});

consumerGroup.on('message', function(message) {
console.log('%s read msg Topic="%s" Partition=%s Offset=%d', this.client.clientId, message.topic, message.partition, message.offset);
console.log(message.value);
doSomeStuff(function() {
// HOW TO COMMIT????
consumerGroup.commit(function(err, data) {
console.log("------ Message done and committed ------");
});
});
});

consumerGroup.on('error', function(err) {
console.log("Error in consumer: " + err);
close();
});

process.once('SIGINT', function () {
close();
});

var close = function() {
// SHOULD SEND 'TRUE' TO CLOSE ???
consumerGroup.close(true, function(error) {
if (error) {
console.log("Consuming closed with error", error);
} else {
console.log("Consuming closed");
}
});
};

最佳答案

您可以在这里做的一件事是为您处理的每条消息设置重试机制。

你可以引用我在这个帖子上的回答: https://stackoverflow.com/a/44328233/2439404

我使用 kafka-consumer 消费来自 Kafka 的消息,使用 async/cargo 将它们批处理在一起,然后将它们放入 async/queue(在-内存队列)。队列将工作函数作为参数,我将向其传递 async/retryable

对于你的问题,你可以只使用 retryable 来处理你的消息。 https://caolan.github.io/async/docs.html#retryable

这可能会解决您的问题。

关于node.js - 如何使用 kafka-node 控制消费的 kafka 消息的提交,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40111532/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com