- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我认为这个问题的实现相当复杂,到现在3个月了还没有弄清楚。我在另一个问题中重新表述了它:RxJS5 - How can I cache the last value of a aggregate stream, without using Subjects and whilst excluding those that have completed? ,希望它能更简洁地总结我所追求的内容。谢谢大家的帮助!
我现在已经学会了如何通过扫描将一系列部分应用的状态操作函数映射到我的状态来处理 RxJS 中的状态。然而,我现在想更进一步,通过数据平面(pubsub 层)在“生产者”和“消费者”之间以声明方式连接流,而不通过主题对它们进行管道传输。
网上有很多教程对此进行了介绍,但它们基本上都只是在每个流名称或 channel 的中间层中强制使用 publish
作为Subject.next。这是我当前的实现,但它需要为每个永不结束的流创建一个 ReplaySubject(1) (用于缓存迟到的消费者的值),因为任何获取该流的消费者都会收到一个引用,如果主题被删除,该引用将无效当当前没有制作人流向该名称的 channel 时。
我想直接连接流,并让消费者接收特定名称的所有事件已发布流的聚合流。这仍然需要一个主题在初始注册生产者流中进行管道传输(生产者流的传入流,格式为 {name, stream}
)。
然后,我想将所有同名流合并为一个流,每个注册消费者接收该流作为引用(想象一个过滤器服务接收对 filter.add
的引用,它是以下内容的合并)所有活跃的生产者创建过滤器) - 但如果具有相同名称的新生产者也注册了,则以 react 方式重新合并该流,并且消费者到该流的链接仍然有效。任何迟到的消费者还应该收到该聚合流的最后缓存值。
通过这种方式,每次在 pubsub 层上公开新流时,都需要动态重新评估每个聚合流,因此将流包装在“getActive”函数(如 here )中不起作用,因为这样是必要的,并且仅在首次获取流时发生一次,而不是在每次发布新流时为所有消费者重新评估。
结果应该是一个流:
基本上,我需要“trimToActiveOnly”函数。
function getStream(all$, name) {
return all$
.filter(x => x.name === name)
.map(x => x.stream)
.map(trimToActiveOnly) // in this way, should be dynamically re-evaluated for all
// consumers every time new stream is published or stream ends,
// not just run once when the stream is first 'got'
.flatMapLatest(x => Rx.Observable.merge(x)) // where x is all currently active streams for a particular name, with finished/errored ones discarded
.publish().refCount(); //this is re-evaluated when a new stream is published or when one of the aggregated streams concludes. So the aggregate stream itself never concludes but may go cold if nothing is subscribed.
}
// desired behavior as followed
const publishStream$ = new Rx.Subject();
const foo$ = getStream(publishStream$, 'foo');
const bar$ = getStream(publishStream$, 'bar');
const fooSourceA$ = new Rx.Subject();
const fooSourceB$ = new Rx.Subject();
const barSourceA$ = new Rx.Subject();
const barSourceB$ = new Rx.Subject();
publishStream$.onNext({ name: 'foo', stream: fooSourceA$ });
publishStream$.onNext({ name: 'foo', stream: fooSourceB$ });
publishStream$.onNext({ name: 'bar', stream: barSourceA$ });
fooSourceA$.onNext('hello');
fooSourceA$.onNext('world');
barSourceA$.onNext('testing');
const fooSub = foo$.subscribe(x => console.log('foo: ' + x)); // should receive cached 'world'
const barSub = bar$.subscribe(x => console.log('bar: ' + x)); // should receive cached 'testing'
publishStream$.onNext({ name: 'bar', stream: barSourceB$ });
barSourceB$.onNext('123'); // barSub should now receive '123' as the aggregated active streams are dynamically re-evaluated on each publish of a new stream!
我也有一个这样的 JSBin here .
最佳答案
虽然不完全相同,但与Rx: a zip-like operator that continues after one of the streams ended?有相似之处
您有一个 Observable<{name, <Observable>}>
。您可以申请filter
从而将其减少为仅满足特定条件的子流。然后您可以使用map
要排除名称,请访问 Observable<Observable>
,然后您可以使用任何您喜欢的归约来组合这些值(例如 zip
按索引进行匹配, combineLatest
在子流产生值时重新发出某种聚合,或者 flatMapLatest
进行展平到单个流中)。我想您会想要使用类似 active
的东西链接中的转换,因为您需要将已完成的流踢出,因为它们会干扰许多组合方法。
例如,考虑以下方法:
function getStream(all$, name) {
return active(all$.filter(x => x.name === name).map(x => x.stream))
.flatMapLatest(x => Rx.Observable.merge(x));
}
从这里,您可以像这样进行设置:
const publishStream$ = new Rx.Subject();
const foo$ = getStream(publishStream$, 'foo');
const bar$ = getStream(publishStream$, 'bar');
然后您可以订阅:
const fooSub = foo$.subscribe(x => console.log('foo: ' + x));
const barSub = bar$.subscribe(x => console.log('bar: ' + x));
您可以推送数据(您不会真正使用主题,这只是示例):
const fooSourceA$ = new Rx.Subject();
const fooSourceB$ = new Rx.Subject();
const barSourceA$ = new Rx.Subject();
const barSourceB$ = new Rx.Subject();
publishStream$.onNext({ name: 'foo', stream: fooSourceA$ });
publishStream$.onNext({ name: 'foo', stream: fooSourceB$ });
publishStream$.onNext({ name: 'bar', stream: barSourceA$ });
publishStream$.onNext({ name: 'bar', stream: barSourceB$ });
fooSourceA$.onNext('hello');
fooSourceA$.onNext('world');
barSourceA$.onNext('testing');
barSourceB$.onNext('123');
如果我正确理解了你想要什么,这就能完成工作。为了解释这里发生了什么,getStream
获取包含名称和流的对象流,仅筛选具有指定名称的对象,丢弃名称并仅保留流,然后通过 active
运行它。来自链接答案的函数,其功能类似于扫描缩减。结果将是流数组的流,我们随后使用 flatMapLatest
将其展平为单个流。和merge
。这意味着给定名称的所有流将聚合到一个流中,可以根据需要订阅该流。
这是整个工作过程: https://jsbin.com/liwiga/2/edit?js,console,output
需要注意的是使用 flatMapLatest
和merge
输出为active
如果输入冷的可观察值,这样的方法将无法正常工作(因为每次发出事件可观察值数组时,订阅都会重新开始)。
关于javascript - 在 RxJS 中,如何在没有主题的情况下直接链接 pub 子系统中的生产者和消费者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36579699/
Rust 支持 pub和 pub(super) . pub使父模块可以访问项目...和pub(super)似乎也做同样的事情。我试过玩下面的例子,并交换 pub和 pub(super)似乎没有效果:
我正在尝试运行 this拖放 API。当我尝试运行 dart 文件(使用 dart 编辑器)时,出现以下错误: could not start pub serve or connect to pub
根据pub get docs pub get 和 pub Upgrade 之间的主要区别是: If a lockfile already exists, pub get uses the versio
我see函数的可见性可以在模块内声明为 pub(self)。这与没有 pub 属性的私有(private)函数有何不同?如果它们没有区别,为什么会存在这种语法? 最佳答案 pub(restricted
我正在运行 pub install 以便我可以使用作为 dart sdk 一部分的 intl 包(我需要使用 DateFormat 类)。我已经在我的项目中安装了其他包,例如 XML 解析器和演示文稿
假设我在 Rust 包中有这个文件层次结构: src/... src/m1/mod.rs src/m1/path/m2.rs 拥有以下行的实际区别是什么: pub mod path::m2; 在我的文
其中一个比另一个更安全吗? 最佳答案 id_rsa.pub 和 id_dsa.pub 是 id_rsa 和 id_dsa 的公钥。 如果您询问与 SSH 相关的问题,id_rsa 是 RSA key
pub 是 Dart 的包管理器。 Flutter 是一个使用 Dart 的移动应用 SDK。如何创建依赖于或以 Flutter 为目标的包? 最佳答案 要从 pub 包中声明对 Flutter 的依
id_rsa.pub 文件基本上是 Linux 主文件夹下 .ssh 文件夹中的一些加密文本,用于公钥加密。它使用 .pub 文件格式。但为什么这种文件格式也恰好是 Microsoft Publish
使用当前版本的angular.dart.tutorial , 使用 Chapter_04 对于“pub serve”,main.dart.js 是一个 42337 行的文件,示例运行良好。 对于“pu
假设我正在运行我自己的 pub-dartlang 实例对于私有(private)酒吧供稿;如何在 pubspec.yaml 中注明哪些软件包来自私有(private)提要与 pub.dartlang.
我们通过运行脚本来实现DART自动化,该脚本导航到项目文件夹(.yaml文件所在的位置)并运行“pub get”和“pub build”。它工作了一段时间,但现在(两种情况下)我们都得到了: Unha
我刚刚安装了 Dart Editor 并创建了一个简单的 Web 应用程序,当我单击运行时它显示此错误并且没有任何 react 。 “无法启动 pub 服务或连接到 pub” dart editor
当我尝试在 android studio 中打开大部分已下载的 flutter 应用程序时,它一直告诉我“Pub get”尚未运行,当我按下“获取依赖项”或升级依赖项时,它会向我显示该错误。我已经把a
在哪里可以找到用于搜索和列出已安装的打包版本和可用升级版本的命令行工具? 我想不出任何其他的软件包管理工具,它们不包含列出已安装版本或查找可用升级的命令,而无需实际升级。 例如,如果我的pubspec
前两天升级到 Flutter 2.0 开始面对 Execution failed for task ':app:compileFlutterBuildDebug'由于一些包裹。 然后我降级了,但错误仍
下面提供了相关的程序代码。我经常在开源项目中看到这样的代码,可以在Linux和Windows上运行。有人告诉我,这是为了避免编译警告。真的是这样吗? class Base { public:
我有一个带有简单测试代码的函数,例如: exports.helloPubSub = (event, context) => { const message = event.data
无法解析 URL“https://pub.dartlang.org”。错误 (69):无法“发布升级”flutter 工具。五秒后重试...(还剩 9 次尝试) 这是我创建或升级flutter时flu
我正在尝试制作一个脚本来控制农场中的 Android 设备,adb connect 通过 OpenSTF 建立连接。 .设备接受来自客户端的连接的要求之一是在 OpenSTF 中提供 adbkey.p
我是一名优秀的程序员,十分优秀!