- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我对 ConnectableObservable
的用例感到好奇,并认为将昂贵的排放从冷可观察对象(如从数据库查询)转换为热排放可能会有所帮助。这样就可以避免昂贵的重播,并且可以将一组发射推送给所有运营商和订阅者。
然而,经过一些思想实验后,我担心 flatMaps 中的自引用可能会导致问题。
例如,假设我通过 ConnectableObservable
发出值 1 到 10。但是我用flatMap()
将每个值加到所有值的总和上,然后减去当前值。
ConnectableObservable<Integer> source = Observable.range(1,10)
.doOnNext(System.out::println)
.publish();
source.flatMap(i -> source.reduce(0,(x,y) -> x + y).map(sum -> sum - i))
.subscribe(sum -> System.out.println("SUM - i: " + sum));
source.connect();
我希望我能得到这个输出。
1
2
3
4
5
6
7
8
9
10
SUM - i: 54
SUM - i: 53
SUM - i: 52
SUM - i: 51
SUM - i: 50
SUM - i: 49
SUM - i: 48
SUM - i: 47
SUM - i: 46
SUM - i: 45
但是我得到了这个。
1
2
3
4
5
6
7
8
9
10
SUM - i: 53
SUM - i: 50
SUM - i: 46
SUM - i: 41
SUM - i: 35
SUM - i: 28
SUM - i: 20
SUM - i: 11
SUM - i: 1
SUM - i: -10
正如我担心的那样,flatMap()
看起来需要重放值,因为它无法处理源的热顺序性质。因此,如果我使用 cache()
运算符,则一切正常,因为缓存值将为每个 flatMap()
运算符重放。
Observable<Integer> source = Observable.range(1,10)
.doOnNext(System.out::println)
.cache();
source.flatMap(i -> source.reduce(0,(x,y) -> x + y).map(sum -> sum - i))
.subscribe(sum -> System.out.println("SUM - i: " + sum));
这些是我的问题:
ConnectableObservable
进程到底发生了什么?它看起来是确定性的,那么它是如何得出这些值的呢?
可以肯定地说 ConnectableObervable
在任何使用它的运算符中对自引用都是危险的吗?在这些情况下,cache()
应该是首选的热门运算符吗?
最佳答案
What exactly happened with this ConnectableObservable process? It looks to be deterministic so how did it come up with those values?
这种设置是不直观的,但会发生这样的情况,即在创建各自的起始值之前,内部和不存在,并且它们中的每一个在创建后只能看到原始序列的一个元素。例如,对于 1,内部求和将仅获取从 2 到 10 的事件。
Is it safe to say that ConnectableObervable can be dangerous to self-reference in any operators that use it? And cache() should be the go-to hot operator in these circumstances?
问题不在于 ConnectableObservable
,而在于 publish
,它对时间敏感且对 Subscriber
敏感:谁在那里接收事件,谁不在那里不会有任何追溯。
关于java - ConnectableObservable 与 flatMap() 自引用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33612564/
我有一个对象Foo,其中包含Bar列表。这些类的描述如下: class Foo { String name; List bars = new ArrayList(); Foo(
根据 Mozilla 开发者网站: flatMap() 方法首先使用映射函数映射每个元素,然后将结果展平到一个新数组中。它与 map 后跟深度为 1 的 flat 相同,但 flatMap 通常非常有
我对无法找到该问题的现有问题感到非常惊讶。这是为什么,鉴于: val p: Int => Option[Int] = Some(_) List(1, 2, 3).flatMap(p) 我得到: :14
关闭。此题需要details or clarity 。目前不接受答案。 想要改进这个问题吗?通过 editing this post 添加详细信息并澄清问题. 已关闭 5 年前。 Improve th
我想知道两种平面映射情况之间是否存在显着差异。 案例 1: someCollection .stream() .map(CollectionElement::getAnotherCol
以下是flatMap的定义取自 scala.util.Success。 final case class Success[+T](value: T) extends Try[T] { def fl
我正在寻找一个函数来展平列表数组。首先,我在 RDD 系统上使用 Apach Spark 函数 flatMap 实现了我的解决方案,但我想在本地执行此操作。但是,我无法找到 的等价物 samples
我想知道是否存在忽略 flatMap 中的结果的函数(在 scala 或 cat 中) .例如。 Some("ignore this").ignoreArgumentFlatMap(Some("res
我正在学习 Scala 并解决了 99 个 Scala 问题。对于以下练习: 展平嵌套列表结构。示例: scala> flatten(List(List(1, 1), 2, List(3, List(
当编译器进入无限循环时,是否有人遇到过使用此类 flatMap 链(或什至更长)的问题。 let what = Future.init { (promise) in promise(.succ
有没有更好的函数方式来写 flatMap ? def flatMap[A,B](list: List[A])(f: A => List[B]): List[B] = list.map(x =>
我试图从两个 中变出笛卡尔积潜在无限然后我通过 limit() 限制的流. 到目前为止,这(大约)是我的策略: @Test void flatMapIsLazy() { Stream.
为什么以下声明对 .map() 有效但不适用于 .flatMap() ? val tupled = input.map(x => (x*2, x*3)) //Compilation error:
我正在寻找可以同时映射和展平 Lists 和 Maybes 的代码。我在 this topic 中发现了这样一个 flatMap 函数: flatMap :: (t -> [a]) -> [t] ->
考虑在某些大小写匹配上编写的 flatMap。例如: list.flatMap( v => v match { case Cond1 => if(something) Some
我无法使用ListKOf平面映射T -> Option。 例如 listOf(1,2,3).k().flatMap { i -> if (i % 2 == 0) Some(i) else None
有人可以解释我如何在RxJava中通过flatMap运算符传递onComplete信号吗? 如果对flatMap运算符进行注释,则可以获取1到10的数字列表,这意味着toList将收到onComple
我正在做一个在线类(class)并误读了一个问题(这就是为什么我认为可以发布这个问题,因为答案与类(class)中的问题无关!)。 data class Trip( val drive
给定作为数据类的二维坐标列表 data class Point(val x: Int, val y:Int) val points: List 和 TornadoFX(Kotlin 中的 JavaFX
这个问题已经有答案了: What is the difference between .stream() and Stream.of? (5 个回答) 已关闭 3 年前。 我有以下代码: List p
我是一名优秀的程序员,十分优秀!