gpt4 book ai didi

javascript - 根据流中的值暂停 RxJS 流

转载 作者:太空狗 更新时间:2023-10-29 16:54:29 25 4
gpt4 key购买 nike

我有一个简单的组件,带有一个按钮,用于启动和暂停由 RxJS 计时器生成的数字流。

import { Component, OnInit } from '@angular/core';
import { BehaviorSubject, Observable, timer, merge } from 'rxjs';
import { filter, bufferToggle, windowToggle, mergeMap, mergeAll, share } from 'rxjs/operators';

@Component({
selector: 'my-app',
template: `<button (click)="toggle()">{{ (active$ | async) ? 'Pause' : 'Play' }}</button>`,
styleUrls: [ './app.component.css' ]
})
export class AppComponent implements OnInit {
active$ = new BehaviorSubject<boolean>(true);

ngOnInit(): void {
const on$ = this.active$.pipe(filter(v => v));
const off$ = this.active$.pipe(filter(v => !v));

const stream$ = timer(500, 500).pipe(share());

const out$ = merge(
stream$.pipe(
bufferToggle(off$, () => on$),
mergeAll(),
),
stream$.pipe(
windowToggle(on$, () => off$),
mergeAll(),
),
);

out$.subscribe(v => console.log(v));
}

toggle(): void {
this.active$.next(!this.active$.value);
}
}

enter image description here

这很完美,但我需要再添加一项功能!

我需要根据流中满足条件的值自动暂停流。

例如,如果最新值是 5 的倍数,则暂停流。


你有什么想法吗?

这是一个关于 stackblitz 的可运行示例 https://stackblitz.com/edit/angular-6hjznn

最佳答案

可以 (1) 扩展当前的 bufferToggle/windowToggle 方法或 (2) 使用自定义缓冲区实现。

1。扩展 bufferToggle/windowToggle 方法

您可以在 bufferToggle 之后向运算符队列添加一个数组。

  1. bufferToggle 发出时,将这些值追加到数组中。
  2. 从数组中取值,直到数组中的某个元素匹配停止条件。
  3. 发出这些值并暂停您的流。

可暂停(Demo)

pausable 运算符将发出与暂停条件匹配的值,然后立即停止流。

export function pausable<T, O>(
on$: Observable<any>, // when on$ emits 'pausable' will emit values from the buffer and all incoming values
off$: Observable<O>, // when off$ emits 'pausable' will stop emitting and buffer incoming values
haltCondition: (value: T) => boolean, // if 'haltCondition' returns true for a value in the stream the stream will be paused
pause: () => void, // pauses the stream by triggering the given on$ and off$ observables
spread: boolean = true // if true values from the buffer will be emitted separately, if 'false' values from the buffer will be emitted in an array
) {
return (source: Observable<T>) => defer(() => { // defer is used so that each subscription gets its own buffer
let buffer: T[] = [];
return merge(
source.pipe(
bufferToggle(off$, () => on$),
tap(values => buffer = buffer.concat(values)), // append values to your custom buffer
map(_ => buffer.findIndex(haltCondition)), // find the index of the first element that matches the halt condition
tap(haltIndex => haltIndex >= 0 ? pause() : null), // pause the stream when a value matching the halt condition was found
map(haltIndex => buffer.splice(0, haltIndex === -1 ? customBuffer.length : haltIndex + 1)), // get all values from your custom buffer until a haltCondition is met
mergeMap(toEmit => spread ? from(toEmit) : toEmit.length > 0 ? of(toEmit) : EMPTY) // optional value spread (what your mergeAll did)
),
source.pipe(
windowToggle(on$, () => off$),
mergeMap(x => x),
tap(value => haltCondition(value) ? pause() : null), // pause the stream when an unbuffered value matches the halt condition
),
);
});
}

您可以根据您的特定需求调整此运算符,例如使用较少的输入参数并将 share 合并到其中,参见 this version with less parameters .

用法

active$ = new BehaviorSubject<boolean>(true);
on$ = this.active$.pipe(filter(v => v));
off$ = this.active$.pipe(filter(v => !v));

interval(500).pipe(
share(),
pausable(on$, off$, v => this.active$.value && this.pauseOn(v), () => this.active$.next(false))
).subscribe(console.log);

pauseOn = (value: number) => value > 0 && value % 10 === 0

2。完全自定义的缓冲区

您可以采用一种完全自定义的方法,仅使用一个类似于 Brandon's approach 的输入可观察对象.

bufferIf ( Demo )

bufferIf 将在给定的 condition 发出 true 时缓冲传入值,并在 时发出缓冲区中的所有值或传递新值code>conditionfalse

export function bufferIf<T>(condition: Observable<boolean>) {
return (source: Observable<T>) => defer(() => {
const buffer: T[] = [];
let paused = false;
let sourceTerminated = false;
return merge( // add a custon streamId to values from the source and the condition so that they can be differentiated later on
source.pipe(map(v => [v, 0]), finalize(() => sourceTerminated = true)),
condition.pipe(map(v => [v, 1]))
).pipe( // add values from the source to the buffer or set the paused variable
tap(([value, streamId]) => streamId === 0 ? buffer.push(value as T) : paused = value as boolean),
switchMap(_ => new Observable<T>(s => {
setTimeout(() => { // map to a stream of values taken from the buffer, setTimeout is used so that a subscriber to the condition outside of this function gets the values in the correct order (also see Brandons answer & comments)
while (buffer.length > 0 && !paused) s.next(buffer.shift())
}, 0)
})), // complete the stream when the source terminated and the buffer is empty
takeWhile(_ => !sourceTerminated || buffer.length > 0, true)
);
})
}

用法

pause$ = new BehaviorSubject<boolean>(false);

interval(500).pipe(
bufferIf(this.pause$),
tap(value => this.pauseOn(value) ? this.pause$.next(true) : null)
).subscribe(console.log);

pauseOn = (value: number) => value > 0 && value % 10 === 0

关于javascript - 根据流中的值暂停 RxJS 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56747633/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com