gpt4 book ai didi

javascript - RX : unfold array to multiple streams

转载 作者:行者123 更新时间:2023-11-29 15:39:27 27 4
gpt4 key购买 nike

我有一个包含数组的流,其中的每个元素都有一个 id。我需要将其拆分为每个 ID 的流,当源流不再携带该 ID 时,它将完成。

例如具有这三个值的输入流序列

[{a:1}, {b:1}]    [{a:2}, {b:2}, {c:1}]     [{b:3}, {c:2}]

应该返回三个流

a -> 1 2 |
b -> 1 2 3
c -> 1 2

a 在第 3 个值上完成,因为它的 id 已经消失,而 c 在第 2 个值上创建,因为它的 id 已经出现。

我在尝试groupByUntil,有点像

 var input = foo.share();              
var output = input.selectMany(function (s) {
return rx.Observable.fromArray(s);
}).groupByUntil(
function (s) { return s.keys()[0]; },
null,
function (g) { return input.filter(
function (s) { return !findkey(s, g.key); }
); }
)

因此,按 id 进行分组,并在输入流不再具有该 id 时处理该组。这似乎可行,但输入的两种用法对我来说看起来很奇怪,就像在使用单个流来控制 groupByUntil 的输入和组的处置时可能存在奇怪的顺序依赖性。

有没有更好的办法?

更新

这里确实存在一个奇怪的时间问题。 fromArray 默认使用 currentThread 调度程序,这将导致来自该数组的事件与来自输入的事件交错。然后在错误的时间评估组的处置条件(在处理先前输入的组之前)。

一个可能的解决方法是执行 fromArray(.., rx.Scheduler.immediate),这将使分组的事件与输入保持同步。

最佳答案

是的,我能想到的唯一选择就是自己管理状态。不过我不知道这样更好。

var d = Object.create(null);
var output = input
.flatMap(function (s) {
// end completed groups
Object
.keys(d)
.filter(function (k) { return !findKey(s, k); })
.forEach(function (k) {
d[k].onNext(1);
d[k].onCompleted();
delete d[k];
});
return Rx.Observable.fromArray(s);
})
.groupByUntil(
function (s) { return s.keys()[0]; },
null,
function (g) { return d[g.key] = new Rx.AsyncSubject(); });

关于javascript - RX : unfold array to multiple streams,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21868213/

27 4 0