gpt4 book ai didi

arrays - rxjs 从两个来源追上订阅

转载 作者:太空宇宙 更新时间:2023-11-03 23:36:04 25 4
gpt4 key购买 nike

我需要将追赶和订阅新提要结合起来。因此,首先我在数据库中查询我错过的所有新记录,然后切换到发布订阅以获取所有传入的新记录。

第一部分很容易进行查询,也许批量为 500 个,这将为您提供一个数组,您可以 rx.observeFrom 。

第二部分很简单,你只需在 pubsub 上放置一个 rx.observe 即可。

但我需要做的是按顺序进行,因此我需要在开始播放新唱片之前播放所有旧唱片。

我想我可以开始订阅 pubsub,将它们放入一个数组中,然后开始处理旧的,当我完成后,要么删除重复项(或者因为我做了重复项检查),要么允许少数重复项,但播放累积的记录,直到它们消失,然后一进一出。

我的问题是最好的方法是什么?我应该创建一个订阅来开始在数组中构建新记录,然后开始处理旧记录,然后在旧记录过程的“然后”中订阅另一个数组吗?

好的,这就是我到目前为止所拥有的。我需要构建测试并完成一些伪代码以查明它是否有效,更不用说是一个好的实现了。在我埋葬自己之前,请随意阻止我。

var catchUpSubscription = function catchUpSubscription(startFrom) {
EventEmitter.call(this);
var subscription = this.getCurrentEventsSubscription();
// calling map to start subscription and catch in an array.
// not sure if this is right
var events = rx.Observable.fromEvent(subscription, 'event').map(x=> x);
// getPastEvents gets batches of 500 iterates over and emits each
// till no more are returned, then resolves a promise
this.getPastEvents({count:500, start:startFrom})
.then(function(){
rx.Observable.fromArray(events).forEach(x=> emit('event', x));
});
};

我不知道这是最好的方法。有什么想法吗?

谢谢

最佳答案

我会避免不必要地混合不同的异步策略。您可以使用 concat 将两个序列连接在一起:

var catchUpSubscription = function catchUpSubscription(startFrom) {
var subscription = this.getCurrentEventsSubscription();

return Rx.Observable.fromPromise(this.getPastEvents({count:500, start:startFrom}))
.flatMap(x => x)
.concat(Rx.Observable.fromEvent(subscription, 'event'));

};

///Sometime later
catchUpSubscription(startTime).subscribe(x => /*handle event*/)

关于arrays - rxjs 从两个来源追上订阅,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32232969/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com