gpt4 book ai didi

javascript - 如何在 rxjs 中链接可观察对象

转载 作者:行者123 更新时间:2023-11-29 21:34:42 25 4
gpt4 key购买 nike

我有一个可观察对象,它从服务器拉取事件,过滤应用程序类型的事件,然后订阅事件并将其分派(dispatch)给一个或多个处理程序进行处理。

然后处理程序开始对数据库进行一些异步更新,我发现可观察对象会如此快速地发出事件,以至于更新相互影响。这是我应该预料到的。

所以我认为我需要我的处理程序每​​个都使用自己的可观察对象来充当一个队列,该队列将处理一个事件并等待确认。

所以我的问题是,如何创建一个连续接收消息并一次发送一条消息以等待确认然后再发布下一条消息的可观察对象?

此外,我认为可观察对象必须是冷的,因为我不能丢失消息。

最佳答案

我认为运算符 concatMap 所做的事情与您正在寻找的东西很接近。您可以在此处查看以前的答案,以说明 concatMap 的类似用例: RxJS queueing dependent tasks

它很接近但不完全是您想要的,因为不需要等待 ACK 信号来释放下一个值。相反,concatMap 使用当前“已执行”可观察对象的完成信号来订阅下一个。如果您的可观察对象在某处包含对数据库执行更新,那么这些更新将按顺序执行。例如:

function handler (source$) {
// source$ is your source of events from which you generate the update calls
return source$.concatMap(function (event){
return updateDB(event);
})
}

function updateDB(event) {
return Rx.Observable.create(function(observer){
// do the update in the db
// you probably have a success and error handler
// you plug the observer notification into those handlers
if (success) {
// if you need to pass down some value from the update
observer.onNext(someValue);
// In any case, signal completion to allow concatMap to move to next update
observer.onCompleted();
}
if (error) {observer.onError(error);}
})
}

这是专门针对您正在使用的库的通用代码。根据数据库更新函数的 API,您可以直接使用操作符 fromNodeCallbackfromCallback

尽管如此,请注意,在执行当前可观察对象时,可能会涉及一些缓冲以保持下一个可观察对象,并且该缓冲区只能是有限的,因此如果生产者之间的速度确实存在显着差异和消费者,或内存限制,您可能希望以不同的方式处理事情。

此外,如果您使用的是 RxJS v5,onError 变为 erroronComplete 变为 completeonNext 变为 next(参见 new observer interface)。

最后的评论,你的流的有损/无损性质是一个不同于流的热与冷性质的概念。你可以看看illustrated subscription and data flows对于这两种类型的流。

关于javascript - 如何在 rxjs 中链接可观察对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35158387/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com