- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
Monix 使用 Ack 来同步发出的消息,但是如果我使用 groupBy 和 flatMap,内部 Observable 不会跟随 source
的背压.
请参阅此测试代码:
import java.util.concurrent.TimeUnit
import monix.execution.Scheduler.Implicits.global
import monix.execution.Ack.Continue
import monix.reactive.{Observable, OverflowStrategy}
import org.junit.Test
class MonixBackpressureWithGroupByTest2 {
@Test
def test(): Unit = {
val source = Observable.range(0,130)
val backPressuredStream = source.map(x => {
println("simple log first map - " + x)
x
})
.asyncBoundary(OverflowStrategy.BackPressure(5))
.map { i =>
println("after backpressure map, and Rim 3 operation of source - " + ((i % 3) toString) -> i)
((i % 3) toString) -> i
}
.groupBy{case (k, v) => k}
.flatMap(x => {
val mapWithSleep = x.map{case groupedMsg@(key, value) =>
Thread.sleep(2000)
println("inner Observable after group by rim 3. sleep 2 second for every message - " + groupedMsg)
groupedMsg
}
mapWithSleep
})
backPressuredStream.share.subscribe(
(keyAndValue: (String, Long)) => Continue
)
global.scheduleWithFixedDelay(0L, 1000L, TimeUnit.MILLISECONDS, () => {
println("========sleep 1 second ============")
})
Thread.currentThread().join()
}
}
...
========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,72)
(after backpressure map, and Rim 3 operation of source - 1,73)
(after backpressure map, and Rim 3 operation of source - 2,74)
(after backpressure map, and Rim 3 operation of source - 0,75)
========sleep 1 second ============
========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,75)
(after backpressure map, and Rim 3 operation of source - 1,76)
(after backpressure map, and Rim 3 operation of source - 2,77)
(after backpressure map, and Rim 3 operation of source - 0,78)
========sleep 1 second ============
========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,78)
(after backpressure map, and Rim 3 operation of source - 1,79)
...
sleep 2 second for every message ...
背压给三项
after backpressure map - ...
sleep 2 second for every message ...
与
after backpressure map - ...
有一对一的关系在背压方面?
sleep 2 second for every message
的日志输出
(0, 72), (0, 75), (0,78)
但是这样的事情
(0, 72), (1, 73), (2,74)
?
"io.monix" %% "monix" % "3.0.0-RC1"
最佳答案
您所看到的行为正是您所期望的。
为了快速总结您的应用程序的作用,让我用我的话解释一下:
您有一个 Observable
生成数字,并对每个元素做一些副作用。
接下来,按 _ % 3
对元素进行分组.
接下来,您在每个组的 Observable
内执行更多副作用( sleep 和写入控制台)。 .
那么,你flatMap
各组的Observable
,导致一个单一的,平坦的 Observable
.
那么为什么您一开始只看到第一组(其中 _ % 3 == 0
)将内容打印到控制台? ***
答案在flatMap
: 当看documentation为 Observable
,您会发现以下对 flatMap
的描述:
final def flatMap[B](f: (A) ⇒ Observable[B]): Observable[B]
Alias for concatMap.
[...]
Observable
就像你会想到的
List
s 一秒钟:当您连接时
List
s,你最终会得到一个
List
首先包含第一个
List
的元素,后跟第二个
List
的元素, 等等。
Observable
实现了相同的行为。通过等待第一个
Observable
在
flatMap
内生产(阅读:
concatMap
)操作发送“完成” - 信号。只有这样,第二个
Observable
被消耗等等。
flatMap
关心产生的序列 Observable
s。
Observable
s 在您的
flatMap
操作“完成”?为此我们必须了解如何
groupBy
有效 - 因为那是它们的来源。
groupBy
工作虽然
Observable
s 被懒惰地评估,它必须将传入的元素存储在缓冲区中。我对此不是 100% 确定,但如果
groupBy
对于任何分组
Observable
就像我认为的那样工作拉下一个元素,通过原来的
Observable
无限期直到它找到属于该组的元素,将属于该缓冲区中其他组的所有先前(但不是必需的)元素保存以备后用。
groupBy
在源
Observable
之前无法知道是否已找到组的所有元素信号完成,然后它将使用所有剩余的缓冲元素,然后将完成信号发送到分组
Observable
s。
Observable
s 由 groupBy
制作直到来源Observable
才完成完成。
Observable.range(0, 130)
)完成时,第一个分组
Observable
也将完成,因为
flatMap
只有然后所有其他分组
Observable
s 将被使用。
flatMap
构建一个网络套接字。是个坏主意 - 您的来源
Observable
传入的请求永远不会完成,有效地只为您将遇到的第一个 IP 地址提供服务。
mergeMap
. 与
concatMap
相比时
mergeMap
不关心元素的顺序,而是“先到先得” - 规则适用。
groupBy
和
flatMap
有效,你会明白我为什么写“一开始”!
关于scala - Monix 如何通过 flatMap 运算符使用背压?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55843648/
我想为 monix.reactive.Observable 编写一个拆分函数.它应该拆分一个源 Observable[A]变成一对新的 (Observable[A], Observable[A]) ,
我正在尝试理解 Monix 中的任务调度原则。以下代码(来源:https://slides.com/avasil/fp-concurrency-scalamatsuri2019#/4/3)按预期仅生成
我正在尝试使用 monix 来并行化某些操作,然后执行错误处理 假设我正在尝试解析和验证这样的几个对象 def parseAndValidateX(x: X) Task[X] 和 def parseA
我正在使用 monix 任务,我正在 try catch Throwable,然后将其转换为自定义错误。我已删除/更改代码以使其简单且相关。这是代码(问题跟在代码片段之后): import io.ne
我正在使用 monix 任务,我正在 try catch Throwable,然后将其转换为自定义错误。我已删除/更改代码以使其简单且相关。这是代码(问题跟在代码片段之后): import io.ne
我正在尝试使用 monix 3.0.0-RC1 构建响应式应用程序。 比如a有一个Int的Seq,第二个元素是错误的。我可以使用 Oservable.raiseError(...) 来处理这个问题:
我尝试执行拆分单 Observable在 Monix 中按键,然后分组到最后 n每个 GrouppedObservable 中的事件并将其发送以进行进一步处理。问题是要分组的键的数量可能是无限的,这会
Monix 使用 Ack 来同步发出的消息,但是如果我使用 groupBy 和 flatMap,内部 Observable 不会跟随 source 的背压. 请参阅此测试代码: import java
我有一个分页资源,我想用 Monix 递归地使用它。我想要一个 Observable,它将发出下载的元素并递归地使用页面。这是一个简单的例子。它当然不起作用。它发出第一页,然后是第一页 + 第二页,然
我目前正在致力于实现对 API 的客户端 http 请求,并决定为此任务探索 sttp 和 monix。由于我是 Monix 的新手,我仍然不确定如何运行任务并检索它们的结果。我的目标是获得一系列 h
我是一名优秀的程序员,十分优秀!