gpt4 book ai didi

javascript - RxJS:如何组成一棵响应式观察者树?

转载 作者:行者123 更新时间:2023-11-29 14:44:06 25 4
gpt4 key购买 nike

我正在尝试使用响应式编程来创建一组可重用的组件来组装处理树,例如决策树。

树中的每个节点都可以有子节点。当前节点可以依次将当前流值传递给子节点,并对结果进行一些操作,决定是继续下一个子节点,还是直接返回上游。

       parent
|
+----+-----+
O1 O2 O3
|
+--+--+
C1 C2 C3

假设我有一个 Observable 事件。我想将一组观察者序列应用于事件,并连接结果:

event stream:    --e1---------e2--------e3------------------------->
observer stream: --O1---O2---O3----|-->
vvvvvvvvvvvv ????? vvvvvvvvvvvvvvv
--O1(e1)--O2(e1)--O3(e1)--O1(e2)--O2(e2)--O3(e3)-->

这可能吗?这应该是可组合的,例如,O1 可以由子观察者 [C1, C2, C3] 组成,它会发出如下流:

                 --C1(O1(e1))--C2(O1(e1))--C3(O1(e1))--O2(e1)-->

O1 需要能够阻止其子项发出值。例如,如果 C1 返回失败代码,我们不应该执行 C2。然后 O1 会发出一个值,parent 会决定它是否应该“发出”来自 O1 的结果,或者继续到 O2。

Here is a codepen这演示了我的目的,您需要打开控制台才能看到错误。

我的计划是使用像 takeUntil() 和 takeLast() 这样的方法来执行“继续或返回”逻辑,如下所示:

// apply event to children until one of them returns 'SUCCESS'
nodeStream(childStreams) {
return eventStream // Where does this come from?
.concatMap(childStreams)
.takeWhile(event => event.result !== 'SUCCESS')
.takeLast();
}

我已经达到了这样的程度,但我无法全神贯注于如何连接观察者(O1 等):

function node(eventSource) {
let childOne = new Rx.Subject().map(e => e + '1');
let childTwo = new Rx.Subject().map(e => e + '2');

let children = Rx.Observable.from([childOne, childTwo]);

let resultSource =
// Here be dragons
// How to concatenate the child observers?
eventSource.concatMap(function(e) {
return children;
});

return resultSource;
}

// Expected Output:
// onNext: event1
// onNext: event2
// onComplete
function testEvent() {
// This will be hot in the real world
let eventSource = Rx.Observable.of('event');

let resultSource = node(eventSource);

resultSource.subscribe(
function(x) {
console.log('onNext:', x);
},
function(e) {
console.log('onError:', e);
},
function() {
console.log('onComplete');
}
);
}

我对如何应用 child 感到困惑。我希望能够构建多个节点流的层次结构(参见上面的 ascii 图),所以显然上面的示例不起作用,因为在每个节点的构建时源流和子流都不能存在:

let c1 = nodeStream();
let c2 = nodeStream();
let c3 = nodeStream();
let o1 = nodeStream([c1, c2, c3]);
let o2 = nodeStream();
let o3 = nodeStream();
let parent = nodeStream([o1, o2, o3]);

eventSource.subscribe(parent);

我正在使用 RxJS,但我认为解决方案与所有响应式库都类似。

更新

对于任何好奇的人,我现在通过让我的树中的每个节点返回一个 Promise 来解决这个问题。对于这种类型的结构, promise 模型更容易推理。然后,我用 Rx.Observable.fromPromise()

包裹了整棵树

最佳答案

关于您具体要问的问题(我不太清楚,如果我误解了什么,请原谅我),您的观察者是函数,对吗?

以下内容应该会为您提供接近您想要的内容。

var observerArray = [f1, f2, f3];
return this.eventSource
.concatMap(function(Ex){
return observerArray.map(function (fx){return fx(Ex);});
})

你的输出应该是f1(E1), f2(E1), f3(E1), f1(E2), f2(E2), f3(E2)...

更新:刚刚看到您在问题中添加了详细信息。答案是一样的。发生的事情是你将拥有一个树数据结构而不是一个数组。只要该树数据结构具有遍历树并将所有节点收集到一个数组中的映射函数,您就可以使用相同的代码。

var observerTree = //some object or library that implements a tree structure;
return this.eventSource
.concatMap(function(Ex){
return observerTree.map(function (fx){return fx(Ex);});
})

现在如何编写 map 取决于您的树数据结构如何,但它应该非常简单。

关于javascript - RxJS:如何组成一棵响应式观察者树?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34755761/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com