gpt4 book ai didi

javascript - RxJs 避免外部状态但仍然访问以前的值

转载 作者:搜寻专家 更新时间:2023-10-31 23:11:53 25 4
gpt4 key购买 nike

我正在使用 RxJs 来监听 amqp 队列(不是很相关)。

我有一个函数 createConnection,它返回一个 Observable,它发出新的 connection 对象。建立连接后,我想每 1000 毫秒 通过它发送消息,并在发送 10 条消息后关闭连接。

我试图避免外部状态,但如果我不将连接存储在外部变量中,我该如何关闭它?看到我从连接开始,然后是 flatMap 和推送消息,所以在几个链之后我不再有连接对象。

这不是我的流程,但想象一下这样的事情:

createConnection()
.flatMap(connection => connection.createChannel())
.flatMap(channel => channel.send(message))
.do(console.log)
.subscribe(connection => connection.close()) <--- obviously connection isn't here

现在我明白这样做很愚蠢,但现在我该如何访问连接?我当然可以从 var connection = createConnection()

开始

后来以某种方式加入其中。但是我该怎么做呢?我什至不知道如何正确地问这个问题。底线,我有一个可观察的,它发出一个连接,在连接打开后我想要一个每 1000 毫秒发出消息的可观察的(使用 take(10)),然后关闭连接

最佳答案

您问题的直接答案是“您可以完成每一步”。例如,您可以替换这一行

.flatMap(connection => connection.createChannel())

用这个:

.flatMap(connection => ({ connection: connection, channel: connection.createChannel() }))

并一直保留对连接的访问​​权限。

但是还有另一种方法可以做您想做的事。假设您的 createConnection 和 createChannel 函数如下所示:

function createConnection() {
return Rx.Observable.create(observer => {
console.log('creating connection');
const connection = {
createChannel: () => createChannel(),
close: () => console.log('disposing connection')
};

observer.onNext(connection);

return Rx.Disposable.create(() => connection.close());
});
}

function createChannel() {
return Rx.Observable.create(observer => {
const channel = {
send: x => console.log('sending message: ' + x)
};

observer.onNext(channel);

// assuming no cleanup here, don't need to return disposable
});
}

createConnection(和 createChannel,但我们将重点关注前者)返回冷可观察对象;每个订阅者都将获得自己的包含单个连接的连接流,当该订阅到期时,将自动调用处理逻辑。

这允许你做这样的事情:

const subscription = createConnection()
.flatMap(connection => connection.createChannel())
.flatMap(channel => Rx.Observable.interval(1000).map(i => ({ channel: channel, data: i })))
.take(10)
.subscribe(x => x.channel.send(x.data))
;

实际上,您不必为了清理而处置订阅; take(10) 满足后,整个链将完成并触发清理。您需要在订阅上显式调用处置的唯一原因是,如果您想在 10 1000 毫秒间隔结束之前拆除内容。

请注意,此解决方案还包含对您的问题的直接回答的实例:我们将 channel 推到线路下方,以便我们可以在传递给订阅调用的 onNext lambda 中使用它(通常是此类代码出现的地方) .

这是整个工作过程:https://jsbin.com/korihe/3/edit?js,console,output

关于javascript - RxJs 避免外部状态但仍然访问以前的值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36612519/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com