gpt4 book ai didi

RxJS 缓冲并延迟直到另一个可观察对象第一次发出

转载 作者:行者123 更新时间:2023-12-05 02:08:29 26 4
gpt4 key购买 nike

我很难制作一个复杂的可观察管道,如果有人能帮助我,我将不胜感激……

上下文

我有一个数据流,通过蓝牙给我一些值,这些值是我必须解码的数据帧。

这是一个名为 RX$ 的 BehaviorSubject。

现在在 RX$ 上,我有时会收到即时数据 (INST),有时会收到历史数据 (HIST)。使用 INST,我除其他外还收到发送数据版本和型号的设备。我成功生成了一个 observable,它能够计算出一个包含设备版本和型号的 JSON 对象,只要两者都没有,它就不会发出,我们称它为 deviceVersionModelStream$

现在在另一边,我在一个流中批量接收 HIST 数据帧,我们将调用 historyStream$,因为有很多数据,我使用了 bufferTime(2000) 制作一个数据数组并依赖于我的嵌入式数据库批量插入(而不是一个接一个)。

到现在为止这一切都很好......

新用例

现在我的客户添加了一条新规则,他们有一个旧设备类型,无法为我提供特定案例的一些数据,但使用相同的模式我知道它还能给我什么。

因此,在解码帧并插入数据库之前,我需要有设备版本和型号。

我的问题是,只要 deviceVersionModelStream$ 发出一次(这是一个在其他地方也使用过的 HOT),我怎么能延迟 historyStream$ 的出现?发生时,我想生成某种具有原始框架和版本/模型的 JSON 对象。

但也逐渐发送此信息,以免像我的 bufferTime(2000) 之前那样淹没我的数据库批量插入?

我正在尝试使用缓冲区、mergeMap、延迟,但我很难实现这个……

也许 RX 强者可以帮助我?

非常感谢

最佳答案

这篇文章有点旧,但我最近在寻找“缓冲区 rxjs 可观察值直到另一个事件发生”时偶然发现了它,我想我会分享更新版本的样子。

我在 https://thinkrx.io 上构建了这个.这是我使用的代码:

const { rxObserver } = require('api/v0.3');
const { timer, of, combineLatest, concat } = require('rxjs');
const { delay, take, share, buffer, mergeAll, bufferTime, filter, takeUntil, skipUntil } = require('rxjs/operators');



const historyStream$ = timer(0, 10).pipe(
share(), take(10)
);

const versionModel$ = of("A").pipe(
delay(50),
take(1)
);

historyStream$.subscribe(rxObserver('History Stream'));
versionModel$.subscribe(rxObserver('Version Model'));

combineLatest([
versionModel$,
concat([
historyStream$.pipe(buffer(versionModel$)),
historyStream$.pipe(skipUntil(versionModel$))
]).pipe(
mergeAll()
)]
).subscribe(rxObserver("Buffer Window"));

这是输出:

Marble diagram of solution

这里发生的事情是我们将两个可观察对象传递给 concat:一个代表缓冲的事件集,另一个代表事件进入时的流。我们最终通过管道将所有将其合并到 mergeAll 中以获得适当的效果。

关于RxJS 缓冲并延迟直到另一个可观察对象第一次发出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60854222/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com