gpt4 book ai didi

javascript - rxjs 在处理数据之前检查流是否为空

转载 作者:太空宇宙 更新时间:2023-11-04 16:32:14 24 4
gpt4 key购买 nike

我们有以下流。

const recorders = imongo.listCollections('recorders')
.flatMapConcat(names => {
const recorders = names
.map(entry => entry.name)
.filter(entry => !_.contains(
['recorders.starts',
'recorders.sources',
'system.indexes',
'system.users'],
entry));
console.log(recorders);
return Rx.Observable.fromArray(recorders);
});

recorders.isEmpty()
.subscribe(
empty => {
if(empty) {
logger.warn('No recorders found.');
}
},
() => {}
);

recorders.flatMapConcat(createRecorderIntervals)
.finally(() => process.exit(0))
.subscribe(
() => {},
e => logger.error('Error while updating: %s', e, {}),
() => logger.info('Finished syncing all recorders')
);

如果流为空,那么我们不想createRecorderIntervals。上面的代码正在运行。但是,检查流是否为空会导致 console.log 被执行两次。为什么会发生这种情况?我能以某种方式修复它吗?

编辑:所以,在重新考虑了@Martin的回答后,我采取了以下方式

const recorders = imongo.listCollections('recorders')
.flatMapConcat(names => {
const recorders = names
.map(entry => entry.name)
.filter(entry => !_.contains(
['recorders.starts',
'recorders.sources',
'system.indexes',
'system.users'],
entry));

if(!recorders.length) {
logger.warn('No recorders found.');
return Rx.Observable.empty();
}

return Rx.Observable.fromArray(recorders);
})
.flatMapConcat(createRecorderIntervals)
.finally(() => scheduleNextRun())
.subscribe(
() => {},
e => logger.error('Error while updating: %s', e, {}),
() => logger.info('Finished syncing all recorders')
);

最佳答案

当您在 Observable 上调用 subscribe() 方法时,它会创建整个运算符链,然后调用 imongo.listCollections('recorders ') 在你的情况下两次。

您可以在调用 flatMapConcat(createRecorderIntervals) 之前插入一个运算符来检查结果是否为空。我特别考虑其中一个,但可能还有其他更适合您的需求:

  • takeWhile() - 将谓词作为参数,并在返回 false 时发出 onComplete

那么你的代码将如下所示:

const recorders = imongo.listCollections('recorders')
.flatMapConcat(names => {
...
return Rx.Observable.fromArray(recorders);
})
.takeWhile(function(result) {
// condition
})
.flatMapConcat(createRecorderIntervals)
.finally(() => process.exit(0))
.subscribe(...);

我不知道你的代码到底是做什么的,但我希望你明白。

编辑:如果您想在整个 Observable 为空时收到通知,有多种方法:

  • do()运算符和自定义Observer目的。您将编写一个自定义观察者,并在 .flatMapConcat(createRecorderIntervals) 之前使用 do() 运算符将其放置。该对象将计算其 next 回调被调用的次数,并且当前面的 Observable 完成时,您可以判断是否至少有一个结果或根本没有结果。

  • 创建 ConnectableObservable 。这可能与我们一开始所做的最相似。您将使用 publish() 运算符将您的 recorders 转换为 ConnectableObservable。然后就可以订阅多个Observer,而不需要触发算子链。当您订阅了所有观察者后,您可以调用 connect() ,它将按顺序向所有观察者发送值:

    var published = recorders.publish();

    published.subscribe(createObserver('SourceA'));
    published.subscribe(createObserver('SourceB'));

    // Connect the source
    var connection = published.connect();

    在您的情况下,您将创建两个 Subjects (因为它们同时充当 Observable 和 Observer),并将其中一个与 isEmpty() 链接起来,第二个是 flatMapConcat()。有关更多信息,请参阅文档:http://reactivex.io/documentation/operators/connect.html

我认为第一个选项实际上对您来说更容易。

关于javascript - rxjs 在处理数据之前检查流是否为空,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39698287/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com