gpt4 book ai didi

node.js - NodeJS Kafka Consumer 收到重复消息?

转载 作者:太空宇宙 更新时间:2023-11-04 01:27:36 27 4
gpt4 key购买 nike

我在 NodeJS 应用程序中使用 Kafka-Node 来生成和使用消息。我启动了一个正在等待我的主题的消费者。然后我启动生产者并将消息发送到 Kafka。我的消费者正在将这些消息中的每一条插入到 Postgres 数据库中。

对于单个消费者来说,这工作得很好。

当我停止消费者并继续生产时,我会在大约 30 秒后重新启动消费者。我注意到大约有十几条消息已经从原始消费者插入到数据库中。

我假设当我杀死消费者时,还有尚未提交的抵消,这就是第二个消费者选择它们的原因?

处理这种情况的最佳方法是什么?

var kafka = require('kafka-node');
var utilities = require('./utilities');
var topics = ['test-ingest', 'test-ingest2'];
var groupName = 'test';

var options = {
groupId: groupName,
autoCommit: false,
sessionTimeout: 15000,
fetchMaxBytes: 1024 * 1024,
protocol: ['roundrobin'],
fromOffset: 'latest',
outOfRangeOffset: 'earliest'
};

var consumerGroup = new kafka.ConsumerGroup(options, topics);

// Print the message
consumerGroup.on('message', function (message) {

// Submit our message into postgres - return a promise
utilities.storeRecord(message).then((dbResult) => {

// Commit the offset
consumerGroup.commit((error, data) => {
if (error) {
console.error(error);
} else {
console.log('Commit success: ', data);
}
});

});

});

最佳答案

我不知道为什么 fromOffset: 'latest' 不适合你。一个简单的解决方法是使用 offset.fetchLatestOffsets 来获取最新的偏移量,然后从该点开始使用。

关于node.js - NodeJS Kafka Consumer 收到重复消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56975170/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com