gpt4 book ai didi

javascript - RxJS bufferWithCount() 不会因超时而暂停

转载 作者:行者123 更新时间:2023-11-28 19:03:34 26 4
gpt4 key购买 nike

我正在尝试控制慢速订阅者的流入。在 NodeJS 中尝试了以下内容

var xmlNodeStream = Rx.Observable.from([1,2,3,4,5,6,7,8,9,10,11]);

var commJson = xmlNodeStream.bufferWithCount(2).publish();

var FastSubscriber = commJson.subscribe(
function (x) { console.log('----------\nFastSub: onNext: %s', x); },
function (e) { console.log('FastSub: onError: %s', e); },
function () { console.log('FastSub: onCompleted'); });

var slowSubscriber = commJson.subscribe(function (x) {
setTimeout(function () { console.log("============\nSlowsub called: ", x); }, 5000);
});

commJson.connect();

当我 run the above code ,我希望慢速订阅者在收到下一个数据批处理之前每次都会暂停 5 秒。

但这并没有发生。在最初的 5 秒延迟后,所有数据都会以 2 为一批洪泛slowSubscriber

控制流量流入的正确方法是什么,以便慢速订阅者可以慢慢来(最好是快速订阅者可以等待慢速订阅者完成)?

最佳答案

它不会暂停,因为 setTimeout 不会阻止执行,它只是安排稍后完成的工作,即 2 秒后,然后更多数据进入,并安排 2 秒+ 从现在开始一些微小的三 Angular 洲。结果是快速订阅者和慢速订阅者将同时完成,但慢速订阅者的结果直到 2 秒后才会可视化。

如果您的实际用例中的慢订阅者确实是非阻塞的,那么您有两种选择来控制事件流,要么您需要从消息源控制流,无论消息源在哪里。或者您需要使用背压运算符之一,例如 controlled()

var xmlNodeStream = Rx.Observable.from([1,2,3,4,5,6,7,8,9,10,11]);

var controller = xmlNodeStream.bufferWithCount(2).controlled();
var commJson = controller.publish().refCount();

var FastSubscriber = commJson.subscribe(
function (x) { console.log('----------\nFastSub: onNext: %s', x); },
function (e) { console.log('FastSub: onError: %s', e); },
function () { console.log('FastSub: onCompleted'); });

var slowSubscriber = commJson.subscribe(function (x) {
setTimeout(function () {
console.log("============\nSlowsub called: ", x);
controller.request(1);
}, 5000);
});

commJson.request(1);

关于javascript - RxJS bufferWithCount() 不会因超时而暂停,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32097094/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com