gpt4 book ai didi

javascript - 在 RxJS 中,如何在没有主题的情况下直接链接 pub 子系统中的生产者和消费者?

转载 作者:行者123 更新时间:2023-12-03 07:12:24 25 4
gpt4 key购买 nike

编辑

我认为这个问题的实现相当复杂,到现在3个月了还没有弄清楚。我在另一个问题中重新表述了它:RxJS5 - How can I cache the last value of a aggregate stream, without using Subjects and whilst excluding those that have completed? ,希望它能更简洁地总结我所追求的内容。谢谢大家的帮助!

原版

我现在已经学会了如何通过扫描将一系列部分应用的状态操作函数映射到我的状态来处理 RxJS 中的状态。然而,我现在想更进一步,通过数据平面(pubsub 层)在“生产者”和“消费者”之间以声明方式连接流,而不通过主题对它们进行管道传输。

网上有很多教程对此进行了介绍,但它们基本上都只是在每个流名称或 channel 的中间层中强制使用 publish 作为Subject.next。这是我当前的实现,但它需要为每个永不结束的流创建一个 ReplaySubject(1) (用于缓存迟到的消费者的值),因为任何获取该流的消费者都会收到一个引用,如果主题被删除,该引用将无效当当前没有制作人流向该名称的 channel 时。

我想直接连接流,并让消费者接收特定名称的所有事件已发布流的聚合流。这仍然需要一个主题在初始注册生产者流中进行管道传输(生产者流的传入流,格式为 {name, stream})。

然后,我想将所有同名流合并为一个流,每个注册消费者接收该流作为引用(想象一个过滤器服务接收对 filter.add 的引用,它是以下内容的合并)所有活跃的生产者创建过滤器) - 但如果具有相同名称的新生产者也注册了,则以 react 方式重新合并该流,并且消费者到该流的链接仍然有效。任何迟到的消费者还应该收到该聚合流的最后缓存值。

通过这种方式,每次在 pubsub 层上公开新流时,都需要动态重新评估每个聚合流,因此将流包装在“getActive”函数(如 here )中不起作用,因为这样是必要的,并且仅在首次获取流时发生一次,而不是在每次发布新流时为所有消费者重新评估。

结果应该是一个流:

  • 永远不会完成;
  • 聚合特定名称的所有事件流的结果,并丢弃不再事件的流;
  • 延迟更新已收到对该流的引用的所有使用者,这样,如果发布了同名的新流,使用者的引用仍然有效,并会重新评估以合并到该新流中;
  • 缓存最后的合并结果,以便新的消费者将收到订阅时最后发出的值。

基本上,我需要“trimToActiveOnly”函数。

function getStream(all$, name) {
return all$
.filter(x => x.name === name)
.map(x => x.stream)
.map(trimToActiveOnly) // in this way, should be dynamically re-evaluated for all
// consumers every time new stream is published or stream ends,
// not just run once when the stream is first 'got'
.flatMapLatest(x => Rx.Observable.merge(x)) // where x is all currently active streams for a particular name, with finished/errored ones discarded
.publish().refCount(); //this is re-evaluated when a new stream is published or when one of the aggregated streams concludes. So the aggregate stream itself never concludes but may go cold if nothing is subscribed.
}

// desired behavior as followed
const publishStream$ = new Rx.Subject();

const foo$ = getStream(publishStream$, 'foo');
const bar$ = getStream(publishStream$, 'bar');

const fooSourceA$ = new Rx.Subject();
const fooSourceB$ = new Rx.Subject();
const barSourceA$ = new Rx.Subject();
const barSourceB$ = new Rx.Subject();
publishStream$.onNext({ name: 'foo', stream: fooSourceA$ });
publishStream$.onNext({ name: 'foo', stream: fooSourceB$ });
publishStream$.onNext({ name: 'bar', stream: barSourceA$ });
fooSourceA$.onNext('hello');
fooSourceA$.onNext('world');
barSourceA$.onNext('testing');

const fooSub = foo$.subscribe(x => console.log('foo: ' + x)); // should receive cached 'world'
const barSub = bar$.subscribe(x => console.log('bar: ' + x)); // should receive cached 'testing'

publishStream$.onNext({ name: 'bar', stream: barSourceB$ });
barSourceB$.onNext('123'); // barSub should now receive '123' as the aggregated active streams are dynamically re-evaluated on each publish of a new stream!

我也有一个这样的 JSBin here .

最佳答案

虽然不完全相同,但与Rx: a zip-like operator that continues after one of the streams ended?有相似之处

您有一个 Observable<{name, <Observable>}> 。您可以申请filter从而将其减少为仅满足特定条件的子流。然后您可以使用map要排除名称,请访问 Observable<Observable> ,然后您可以使用任何您喜欢的归约来组合这些值(例如 zip 按索引进行匹配, combineLatest 在子流产生值时重新发出某种聚合,或者 flatMapLatest 进行展平到单个流中)。我想您会想要使用类似 active 的东西链接中的转换,因为您需要将已完成的流踢出,因为它们会干扰许多组合方法。

例如,考虑以下方法:

function getStream(all$, name) {
return active(all$.filter(x => x.name === name).map(x => x.stream))
.flatMapLatest(x => Rx.Observable.merge(x));
}

从这里,您可以像这样进行设置:

const publishStream$ = new Rx.Subject();
const foo$ = getStream(publishStream$, 'foo');
const bar$ = getStream(publishStream$, 'bar');

然后您可以订阅:

const fooSub = foo$.subscribe(x => console.log('foo: ' + x));
const barSub = bar$.subscribe(x => console.log('bar: ' + x));

您可以推送数据(您不会真正使用主题,这只是示例):

const fooSourceA$ = new Rx.Subject();
const fooSourceB$ = new Rx.Subject();
const barSourceA$ = new Rx.Subject();
const barSourceB$ = new Rx.Subject();
publishStream$.onNext({ name: 'foo', stream: fooSourceA$ });
publishStream$.onNext({ name: 'foo', stream: fooSourceB$ });
publishStream$.onNext({ name: 'bar', stream: barSourceA$ });
publishStream$.onNext({ name: 'bar', stream: barSourceB$ });
fooSourceA$.onNext('hello');
fooSourceA$.onNext('world');
barSourceA$.onNext('testing');
barSourceB$.onNext('123');

如果我正确理解了你想要什么,这就能完成工作。为了解释这里发生了什么,getStream获取包含名称和流的对象流,仅筛选具有指定名称的对象,丢弃名称并仅保留流,然后通过 active 运行它。来自链接答案的函数,其功能类似于扫描缩减。结果将是流数组的流,我们随后使用 flatMapLatest 将其展平为单个流。和merge 。这意味着给定名称的所有流将聚合到一个流中,可以根据需要订阅该流。

这是整个工作过程: https://jsbin.com/liwiga/2/edit?js,console,output

需要注意的是使用 flatMapLatestmerge输出为active如果输入冷的可观察值,这样的方法将无法正常工作(因为每次发出事件可观察值数组时,订阅都会重新开始)。

关于javascript - 在 RxJS 中,如何在没有主题的情况下直接链接 pub 子系统中的生产者和消费者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36579699/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com