gpt4 book ai didi

node.js - Node Redis XREAD 阻塞订阅

转载 作者:行者123 更新时间:2023-12-03 06:42:31 27 4
gpt4 key购买 nike

我正在将 redis 发布/订阅系统转换为 redis 流,以便我可以为服务器发送的事件添加一些容错。

订阅传统方式是微不足道的:

import { createClient } from 'redis';

const redisOptions = {
url: `${process.env.REDIS_URL}/0`
}
const redis = createClient(redisOptions);
redis.setMaxListeners(100000);

redis.on("message", (channel, message) => {
console.log(channel);
console.log(message);
});

redis.subscribe('foo');

这将永久阻止,并保持连接打开。在这种情况下,发布到 redis 将添加到您的日志中。

const json = { a: 1, b: 2 };

redis.publish('foo', JSON.stringify(json));

切换到流,您使用 XREAD而不是订阅,和 XADD而不是发布,并且数据有很大的不同。我正在努力解决的部分是阻塞。

redis.xread('BLOCK', 0, 'STREAMS', 'foo', '$', (err, str) => {
if (err) return console.error('Error reading from stream:', err);

str.forEach(message => {
console.log(message);
});
}

发送消息时,我的“订阅”会接收第一个消息,但不会记录更多消息。

最佳答案

说实话,我问这个问题只是因为谷歌对我不好,我找不到其他人发布关于这个问题的帖子。希望这可以帮助!
所以,XREAD仅在初始调用时阻塞。它将坐等一段时间(或者无限期,如果您将时间设置为 0),但是一旦它接收到数据,就认为它的职责已完成,并且它会解除阻塞。要使“订阅”保持事件状态,您需要调用 XREAD再次,使用来自流的最新 id。这取代了最初的 $我们通过它的值(value)。
递归似乎是一个完美的解决方案:

const xread = ({ stream, id }) => {
redis.xread('BLOCK', 0, 'STREAMS', stream, id, (err, str) => {
if (err) return console.error('Error reading from stream:', err);

str[0][1].forEach(message => {
id = message[0];
console.log(id);
console.log(message[1]);
});

setTimeout(() => xread({ stream, id }), 0)
});
}

xread({ stream: 'asdf', id: '$' })

关于node.js - Node Redis XREAD 阻塞订阅,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62179656/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com