gpt4 book ai didi

rxjs - 每当另一个 Observable 发射时跳过来自源的下一次发射的运算符

转载 作者:行者123 更新时间:2023-12-04 08:03:26 25 4
gpt4 key购买 nike

我有一个用例,当另一个通知程序 Observable 发出时,我需要一个 Observable 跳过它的下一次发射。

source:    |---X---X---X---X---X---X---X---X---X---X--|>
notifier: |-------------X---------X----------X-------|>
result: |---X---X---X-------X---X-------X-------X--|>

基本上,我想要一个名为 skipNextWhen 的运算符它接收通知器 observable 并跳过来自源的下一次发射。

我尝试使用使用 pausable 的实现运算符(使用 switchMap 重新实现),但无法使其工作。

pausable.ts
import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import 'rxjs/add/observable/never';
import 'rxjs/add/operator/startWith';

declare module 'rxjs/Observable' {
interface Observable<T> {
pausable: typeof pausable;
}
}

function pausable<T>(notifier: Observable<boolean>): Observable<T> {
return notifier.startWith(false).switchMap((paused) => {
if (paused) {
return Observable.never();
} else {
const source = new Subject();
this.subscribe(source);
return source;
}
});
}

Observable.prototype.pausable = pausable;

skipNextWhen.ts
import { Observable } from 'rxjs/Observable';
import './pausable';

declare module 'rxjs/Observable' {
interface Observable<T> {
skipNextWhen: typeof skipNextWhen;
}
}

function skipNextWhen<T, R>(other: Observable<T>): Observable<R> {
const notifier = Observable.merge(this.map(() => false),
other.map(() => true));
return this.pausable(notifier);
}

Observable.prototype.skipNextWhen = skipNextWhen;

我应该考虑使用更合适的运算符吗?我在当前实现中看到的行为是结果 Observable 发出一次,然后再也不会发出 - 即使通知程序 Observable 从未发出。

最佳答案

我可以想到两种解决方案:

  • 使用 .filter() , .do()和一些副作用。

    即使不是那种“Rx”方式,这也可能更容易理解解决方案:
    function skipNextWhen(other) {
    let skipNext = false;

    return this.merge(other.do(() => skipNext = true).filter(() => false))
    .filter(val => {
    const doSkip = skipNext;
    skipNext = false;
    return !doSkip;
    });
    }

    我正在使用 merge()只是为了更新skipNext , other的值总是被忽略。
  • 使用 .scan() :

    该解决方案没有任何状态变量和副作用。
    function skipNextWhen(other) {
    const SKIP = 'skip';

    return this.merge(other.mapTo(SKIP))
    .scan((acc, val) => {
    if (acc === SKIP) {
    return null;
    } else if (val === SKIP) {
    return SKIP;
    } else {
    return val;
    }
    }, [])
    .filter(val => Boolean(val) && val !== SKIP);
    }

    基本上,当 SKIP到了我马上退回,因为它会在acc再次通过参数由scan()运算符,后来被 filter() 忽略.

    如果我收到一个正常值但以前的值是 SKIP我忽略它并返回 null后来过滤掉了。

  • 两种解决方案都给出了相同的结果:
    Observable.prototype.skipNextWhen = skipNextWhen;

    const source = Observable.range(1, 10)
    .concatMap(val => Observable.of(val).delay(100));

    source
    .skipNextWhen(Observable.interval(350))
    .subscribe(console.log);

    这将打印以下内容:
    1
    2
    3
    5
    6
    8
    9
    10

    请注意,您实际上并不是在创建新的运算符。您只有运算符(operator)链的快捷方式。例如,这不会让您取消订阅 other当源完成时。

    关于rxjs - 每当另一个 Observable 发射时跳过来自源的下一次发射的运算符,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44887844/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com