gpt4 book ai didi

node.js - 如何解决 amqplib Channel#consume 奇怪的签名?

转载 作者:搜寻专家 更新时间:2023-10-31 23:53:11 25 4
gpt4 key购买 nike

我正在编写一个使用 amqplib 的 Channel#consume 方法的 worker。我希望这个工作人员等待作业并在它们出现在队列中时立即处理它们。

我编写了自己的模块来抽象出 ampqlib,这里是获取连接、设置队列和使用消息的相关函数:

const getConnection = function(host) {
return amqp.connect(host);
};

const createChannel = function(conn) {
connection = conn;
return conn.createConfirmChannel();
};

const assertQueue = function(channel, queue) {
return channel.assertQueue(queue);
};

const consume = Promise.method(function(channel, queue, processor) {
processor = processor || function(msg) { if (msg) Promise.resolve(msg); };
return channel.consume(queue, processor)
});

const setupQueue = Promise.method(function setupQueue(queue) {
const amqp_host = 'amqp://' + ((host || process.env.AMQP_HOST) || 'localhost');

return getConnection(amqp_host)
.then(conn => createChannel(conn)) // -> returns a `Channel` object
.tap(channel => assertQueue(channel, queue));
});

consumeJob: Promise.method(function consumeJob(queue) {
return setupQueue(queue)
.then(channel => consume(channel, queue))
});

我的问题是 Channel#consume 的奇怪签名。来自 http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume :

#consume(queue, function(msg) {...}, [options, [function(err, ok) {...}]])

回调不是魔法发生的地方,消息的处理实际上应该在第二个参数中进行,这会破坏 promise 的流程。

这就是我计划使用它的方式:

return queueManager.consumeJob(queue)
.then(msg => {
// do some processing
});

但它不起作用。如果队列中没有消息,则 promise 被拒绝,然后如果队列中有消息被丢弃,则什么也不会发生。如果有一条消息,则只处理一条消息,然后 worker 停止,因为它从 Channel#consume 调用中退出了“处理器”函数。

我该怎么办?我想保留 queueManager 抽象,这样我的代码更容易推理,但我不知道该怎么做……有任何指示吗?

最佳答案

正如@idbehold 所说,Promises 只能解决一次。如果您想在消息传入时对其进行处理,那么除了使用该函数之外别无他法。 Channel#get 只会检查一次队列然后返回;它不适用于需要工作人员的场景。

关于node.js - 如何解决 amqplib Channel#consume 奇怪的签名?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34615091/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com