gpt4 book ai didi

rxjs6 - RXJS : Observable stream piped to groupBy() followed by concatMap(); data for subsequent keys lost

转载 作者:行者123 更新时间:2023-12-04 18:19:33 27 4
gpt4 key购买 nike

我正在尝试使用 RxJS groupBy运算符后跟 concatMap根据某些键将记录收集到各个组中。

我注意到当concatMap关注 groupBy运算符,它似乎丢失了第一个之后出现的所有键的数据。

例如:

考虑以下代码块:

// DOES NOT WORK
const records = ['a:1', 'b:2', 'c:3', 'd:1', 'e:2', 'f:3', 'g:1'];
const clicks = new Subject();

const result = clicks.pipe(
groupBy(x => x.substr(2,1)),
concatMap(ev$ => ev$.pipe(map(x => ({key: ev$.key, value: x})))),
);
const subscription = result.subscribe(x => console.log(x));

records.forEach(x => clicks.next(x));

// Expected Output:
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// { key: '2', value: 'b:2' }
// { key: '2', value: 'e:2' }
// { key: '3', value: 'c:3' }
// { key: '3', value: 'f:3' }
//
// Actual Output:
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// ...Nothing more -- no results for key 2 and 3

但是,当我使用 concatMap运营商本身,它的行为符合预期。
// WORKS
const records = ['a', 'b', 'c', 'd', 'e', 'f', 'g'];
const clicks = new Subject();

const result = clicks.pipe(
concatMap(ev => ev.subject$.pipe(take(4), map(x => ev.key + x))),
);
const subscription = result.subscribe(x => console.log(x));

records.forEach(x => clicks.next({key: x, subject$: interval(1000)}));

// Expected & Actual Output:
// a0
// a1
// a2
// a3
// b0
// b1
// b2
// b3
// c0
// c1
// c2
// c3
// d0
// d1
// d2
// d3
// e0
// e1
// e2
// e3
// f0
// f1
// f2
// f3
// g0
// g1
// g2
// g3

阅读 RxJS 的文档 groupBy concatMap 没有为我提供任何关于这里可能发生的事情的线索。而关于 RxJS concatMap 的部分在 reactivex.io让我相信这应该有效。

谁能帮我理解这里的第一个场景发生了什么?我怎样才能让第一个场景起作用?

最佳答案

我终于似乎弄清楚了这里的问题所在。

在上述问题的场景 #1 中,代码首先将源流通过管道传输到 groupBy运算符,后跟 concatMap运算符(operator)。而这种运营商的组合似乎导致了这个问题。
groupBy 的内部结构和 mergeMap
通读the code for the groupBy operator ,我意识到 groupBy在内部创建一个新的 Subject在源流中找到的每个键的实例。属于该键的所有值都会立即由该 Subject 发出。实例。

所有Subject实例被包装到 GroupedObservale s 并由 groupBy 向下游发出运算符(operator)。 GroupedObservable 这个流实例是 concatMap 的输入运算符(operator)。
concatMap运算符(operator)在内部调用 mergeMap concurrency 的值为 1 的运算符,这意味着只有一个源 observable 被同时订阅。
mergeMap运算符仅订阅一个可观察对象,或 conccurency 允许的尽可能多的可观察对象参数,并将所有其他可观察对象保存在“缓冲区”中,直到第一个完成。

这如何产生问题?

首先,既然我已经阅读了这些运算符的代码,我不太确定这是否是一个“问题”。

尽管如此,我在问题中描述的行为还是发生了,因为 groupBy运算符使用相应的 Subject 发出单个值立即实例,mergeMap运营商不会订阅该特定 Subject .因此,源流中使用 Subject 发出的所有值丢失了。

我试图用一个粗略的大理石图来说明这个问题:
groupBy to concatMap issue

这不是这些运算符的工作方式的“问题”,而是我理解这些运算符的方式以及可能的文档(特别是 concatMap 的文档,这对于 RxJS 的新手来说可能有点困惑)。

这可以通过获取 groupBy 轻松解决。运算符使用 ReplaySubject而不是 Subject发出分组的值。 groupBy接受 subjectSelector允许我们切换Subject的参数带有 ReplaySubject 的实例实例。

以下代码有效:

// THIS VERSION WORKS
const records = ['a:1', 'b:2', 'c:3', 'd:1', 'e:2', 'f:3', 'g:1'];
const clicks = new Subject();

const result = clicks.pipe(
groupBy(x => x.substr(2,1), null, null, () => new ReplaySubject()),
concatMap(ev$ => ev$.pipe(map(x => ({key: ev$.key, value: x})))),
);
const subscription = result.subscribe(x => console.log(x));

records.forEach(x => clicks.next(x));

// We also need to explicity complete() the source
// stream to ensure that the observable stream for
// the first GroupedObservable completes allowing
// the concatMap operator to move to the second
// GroupedObservable.
clicks.complete();

// Expected and Actual output
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// { key: '2', value: 'b:2' }
// { key: '2', value: 'e:2' }
// { key: '3', value: 'c:3' }
// { key: '3', value: 'f:3' }

为什么方案 2 有效?

我的问题中的场景 2 工作正常,因为 interval只是创建一个 Observable 但不开始发射值。因此,当 mergeMap 时,该 Observable 的所有值都可用。终于订阅了。

关于rxjs6 - RXJS : Observable stream piped to groupBy() followed by concatMap(); data for subsequent keys lost,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52562029/

27 4 0