我需要将追赶和订阅新提要结合起来。因此,首先我在数据库中查询我错过的所有新记录,然后切换到发布订阅以获取所有传入的新记录。
第一部分很容易进行查询,也许批量为 500 个,这将为您提供一个数组,您可以 rx.observeFrom 。
第二部分很简单,你只需在 pubsub 上放置一个 rx.observe 即可。
但我需要做的是按顺序进行,因此我需要在开始播放新唱片之前播放所有旧唱片。
我想我可以开始订阅 pubsub,将它们放入一个数组中,然后开始处理旧的,当我完成后,要么删除重复项(或者因为我做了重复项检查),要么允许少数重复项,但播放累积的记录,直到它们消失,然后一进一出。
我的问题是最好的方法是什么?我应该创建一个订阅来开始在数组中构建新记录,然后开始处理旧记录,然后在旧记录过程的“然后”中订阅另一个数组吗?
好的,这就是我到目前为止所拥有的。我需要构建测试并完成一些伪代码以查明它是否有效,更不用说是一个好的实现了。在我埋葬自己之前,请随意阻止我。
var catchUpSubscription = function catchUpSubscription(startFrom) {
EventEmitter.call(this);
var subscription = this.getCurrentEventsSubscription();
// calling map to start subscription and catch in an array.
// not sure if this is right
var events = rx.Observable.fromEvent(subscription, 'event').map(x=> x);
// getPastEvents gets batches of 500 iterates over and emits each
// till no more are returned, then resolves a promise
this.getPastEvents({count:500, start:startFrom})
.then(function(){
rx.Observable.fromArray(events).forEach(x=> emit('event', x));
});
};
我不知道这是最好的方法。有什么想法吗?
谢谢
我会避免不必要地混合不同的异步策略。您可以使用 concat
将两个序列连接在一起:
var catchUpSubscription = function catchUpSubscription(startFrom) {
var subscription = this.getCurrentEventsSubscription();
return Rx.Observable.fromPromise(this.getPastEvents({count:500, start:startFrom}))
.flatMap(x => x)
.concat(Rx.Observable.fromEvent(subscription, 'event'));
};
///Sometime later
catchUpSubscription(startTime).subscribe(x => /*handle event*/)
我是一名优秀的程序员,十分优秀!