gpt4 book ai didi

javascript - 具有异步订阅者功能的 RxJS Observable

转载 作者:搜寻专家 更新时间:2023-10-31 23:45:55 27 4
gpt4 key购买 nike

我正在尝试做一些感觉应该很简单的事情,但事实证明却出奇地困难。

我有一个订阅 RabbitMQ 队列的函数。具体来说,这是 Channel.consume 函数:http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume

它返回一个使用订阅 ID 解决的 promise - 稍后需要取消订阅 - 并且还有一个回调参数,当消息从队列中拉出时调用。

当我想取消订阅队列时,我需要在此处使用 Channel.cancel 函数取消消费者:http://www.squaremobius.net/amqp.node/channel_api.html#channel_cancel .这采用之前返回的订阅 ID。

我想将所有这些东西包装在一个 Observable 中,该 Observable 在订阅 observable 时订阅队列,并在取消订阅 observable 时取消订阅。然而,由于调用的“双异步”性质(我的意思是说它们既有回调又有返回 promise ),这被证明有些困难。

理想情况下,我希望能够编写的代码是:

return new Rx.Observable(async (subscriber) => {
var consumeResult = await channel.consume(queueName, (message) => subscriber.next(message));
return async () => {
await channel.cancel(consumeResult.consumerTag);
};
});

但是,这是不可能的,因为此构造函数不支持异步订阅者函数或拆卸逻辑。

我一直没能弄明白这一点。我在这里错过了什么吗?为什么这么难?

干杯,亚历克斯

最佳答案

创建的可观察对象不需要等待 channel.consume promise 解析,因为观察者(它是传递的观察者,而不是订阅者)仅从您提供的函数中调用.

但是,您返回的取消订阅函数必须等待该 promise 解决。它可以在内部做到这一点,就像这样:

return new Rx.Observable((observer) => {
var consumeResult = channel.consume(queueName, (message) => observer.next(message));
return () => {
consumeResult.then(() => channel.cancel(consumeResult.consumerTag));
};
});

关于javascript - 具有异步订阅者功能的 RxJS Observable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43242578/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com