- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用 RxJS groupBy
运算符后跟 concatMap
根据某些键将记录收集到各个组中。
我注意到当concatMap
关注 groupBy
运算符,它似乎丢失了第一个之后出现的所有键的数据。
例如:
考虑以下代码块:
// DOES NOT WORK
const records = ['a:1', 'b:2', 'c:3', 'd:1', 'e:2', 'f:3', 'g:1'];
const clicks = new Subject();
const result = clicks.pipe(
groupBy(x => x.substr(2,1)),
concatMap(ev$ => ev$.pipe(map(x => ({key: ev$.key, value: x})))),
);
const subscription = result.subscribe(x => console.log(x));
records.forEach(x => clicks.next(x));
// Expected Output:
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// { key: '2', value: 'b:2' }
// { key: '2', value: 'e:2' }
// { key: '3', value: 'c:3' }
// { key: '3', value: 'f:3' }
//
// Actual Output:
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// ...Nothing more -- no results for key 2 and 3
concatMap
运营商本身,它的行为符合预期。
// WORKS
const records = ['a', 'b', 'c', 'd', 'e', 'f', 'g'];
const clicks = new Subject();
const result = clicks.pipe(
concatMap(ev => ev.subject$.pipe(take(4), map(x => ev.key + x))),
);
const subscription = result.subscribe(x => console.log(x));
records.forEach(x => clicks.next({key: x, subject$: interval(1000)}));
// Expected & Actual Output:
// a0
// a1
// a2
// a3
// b0
// b1
// b2
// b3
// c0
// c1
// c2
// c3
// d0
// d1
// d2
// d3
// e0
// e1
// e2
// e3
// f0
// f1
// f2
// f3
// g0
// g1
// g2
// g3
groupBy
和
concatMap
没有为我提供任何关于这里可能发生的事情的线索。而关于 RxJS
concatMap
的部分在
reactivex.io让我相信这应该有效。
最佳答案
我终于似乎弄清楚了这里的问题所在。
在上述问题的场景 #1 中,代码首先将源流通过管道传输到 groupBy
运算符,后跟 concatMap
运算符(operator)。而这种运营商的组合似乎导致了这个问题。groupBy
的内部结构和 mergeMap
通读the code for the groupBy
operator ,我意识到 groupBy
在内部创建一个新的 Subject
在源流中找到的每个键的实例。属于该键的所有值都会立即由该 Subject
发出。实例。
所有Subject
实例被包装到 GroupedObservale
s 并由 groupBy
向下游发出运算符(operator)。 GroupedObservable
这个流实例是 concatMap
的输入运算符(operator)。concatMap
运算符(operator)在内部调用 mergeMap
concurrency
的值为 1 的运算符,这意味着只有一个源 observable 被同时订阅。mergeMap
运算符仅订阅一个可观察对象,或 conccurency
允许的尽可能多的可观察对象参数,并将所有其他可观察对象保存在“缓冲区”中,直到第一个完成。
这如何产生问题?
首先,既然我已经阅读了这些运算符的代码,我不太确定这是否是一个“问题”。
尽管如此,我在问题中描述的行为还是发生了,因为 groupBy
运算符使用相应的 Subject
发出单个值立即实例,mergeMap
运营商不会订阅该特定 Subject
.因此,源流中使用 Subject
发出的所有值丢失了。
我试图用一个粗略的大理石图来说明这个问题:
这不是这些运算符的工作方式的“问题”,而是我理解这些运算符的方式以及可能的文档(特别是 concatMap
的文档,这对于 RxJS 的新手来说可能有点困惑)。
这可以通过获取 groupBy
轻松解决。运算符使用 ReplaySubject
而不是 Subject
发出分组的值。 groupBy
接受 subjectSelector
允许我们切换Subject
的参数带有 ReplaySubject
的实例实例。
以下代码有效:
// THIS VERSION WORKS
const records = ['a:1', 'b:2', 'c:3', 'd:1', 'e:2', 'f:3', 'g:1'];
const clicks = new Subject();
const result = clicks.pipe(
groupBy(x => x.substr(2,1), null, null, () => new ReplaySubject()),
concatMap(ev$ => ev$.pipe(map(x => ({key: ev$.key, value: x})))),
);
const subscription = result.subscribe(x => console.log(x));
records.forEach(x => clicks.next(x));
// We also need to explicity complete() the source
// stream to ensure that the observable stream for
// the first GroupedObservable completes allowing
// the concatMap operator to move to the second
// GroupedObservable.
clicks.complete();
// Expected and Actual output
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// { key: '2', value: 'b:2' }
// { key: '2', value: 'e:2' }
// { key: '3', value: 'c:3' }
// { key: '3', value: 'f:3' }
interval
只是创建一个 Observable 但不开始发射值。因此,当
mergeMap
时,该 Observable 的所有值都可用。终于订阅了。
关于rxjs6 - RXJS : Observable stream piped to groupBy() followed by concatMap(); data for subsequent keys lost,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52562029/
给定输入: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 将数字按奇数或偶数分组,然后按小于或大于 5 分组。 预期输出: [[1, 3, 5], [2, 4], [6, 8, 10
编辑: @coldspeed、@wen-ben、@ALollz 指出了我在字符串 np.nan 中犯的新手错误。答案很好,所以我不删除这个问题来保留那些答案。 原文: 我读过这个问题/答案 What'
我试图概括我提出的问题 here . mlb 数据框看起来像 Player Position Salary Year 0 Mike Wit
我认为我不需要共享整个数据框,但基本上,这是有问题的代码行(当然,已经导入了 pandas) divstack = df[df['Competitor']=='Emma Slabach'].group
我面临下一个问题:我有组(按 ID),对于所有这些组,我需要应用以下代码:如果组内位置之间的距离在 3 米以内,则需要将它们添加在一起,因此将创建一个新组(代码如何创建我在下面显示的组)。现在,我想要
我有以下数据: ,dateTime,magnitude,occurrence,dateTime_s 1,2017-11-20 08:00:09.052260,12861,1,2017-11-20 08
我按感兴趣的列对 df 进行分组: grouped = df.groupby('columnA') 现在我只想保留至少有 5 名成员的组: grouped.filter(lambda x: len(x
数据是一个时间序列,许多成员 ID 与许多类别相关联: data_df = pd.DataFrame({'Date': ['2018-09-14 00:00:22',
选择 u.UM_TOKEN_NO 、u.UM_FULLNAME、u.SECTOR、u.department_name、t.TS_PROJECT_CODE、sum(t.TS_TOTAL_HRS) 来自
我有这两个表: +---------------+-------------+---------------------+----------+---------+ | items_ordered |
我正在使用 groupby 和 sum 快速汇总两个数据集 一个包含: sequence shares 1 100 2 200 3 50 1 2
这个问题在这里已经有了答案: list around groupby results in empty groups (3 个答案) itertools groupby object not out
我有一组行,我想按标识符的值进行分组 - 存在于每一行中 - 然后对将作为结果的组进行进一步的隔离处理。 我的数据框是这样的: In [50]: df Out[50]: groupkey b
假设您要在全局范围内销售产品,并且希望在某个主要城市的某个地方设立销售办事处。您的决定将完全基于销售数字。 这将是您的(简化的)销售数据: df={ 'Product':'Chair', 'Count
我有一个将数据分组两次的查询: var query = (from a in Context.SetA() from b in Context.SetB().Where(x => x.aId == a
我有一个这种格式的数据框: value identifier 2007-01-01 0.087085 55 2007-01-01 0.703249
这个问题在这里已经有了答案: python groupby behaviour? (3 个答案) 关闭 4 年前。 我有一个这样的列表 [u'201003', u'200403', u'200803
在 Python 中,我可以使用 itertools.groupby 将具有相同键的连续元素分组。 : >>> items = [(1, 2), (1, 5), (1, 3), (2, 9), (3,
无法翻译以下 GroupBy 查询并将引发错误:不支持客户端 GroupBy IEnumerable ids = new List { 1, 2, 3 }; var q = db.Comments.W
考虑一个 Spark DataFrame,其中只有很少的列。目标是对其执行 groupBy 操作,而不将其转换为 Pandas DataFrame。等效的 Pandas groupBy 代码如下所示:
我是一名优秀的程序员,十分优秀!