gpt4 book ai didi

scala - Monix 如何通过 flatMap 运算符使用背压?

转载 作者:行者123 更新时间:2023-12-01 03:06:58 27 4
gpt4 key购买 nike

Monix 使用 Ack 来同步发出的消息,但是如果我使用 groupBy 和 flatMap,内部 Observable 不会跟随 source 的背压.

请参阅此测试代码:

import java.util.concurrent.TimeUnit

import monix.execution.Scheduler.Implicits.global
import monix.execution.Ack.Continue
import monix.reactive.{Observable, OverflowStrategy}
import org.junit.Test


class MonixBackpressureWithGroupByTest2 {
@Test
def test(): Unit = {
val source = Observable.range(0,130)

val backPressuredStream = source.map(x => {
println("simple log first map - " + x)
x
})
.asyncBoundary(OverflowStrategy.BackPressure(5))
.map { i =>

println("after backpressure map, and Rim 3 operation of source - " + ((i % 3) toString) -> i)
((i % 3) toString) -> i
}
.groupBy{case (k, v) => k}
.flatMap(x => {
val mapWithSleep = x.map{case groupedMsg@(key, value) =>
Thread.sleep(2000)
println("inner Observable after group by rim 3. sleep 2 second for every message - " + groupedMsg)
groupedMsg
}

mapWithSleep

})

backPressuredStream.share.subscribe(
(keyAndValue: (String, Long)) => Continue
)

global.scheduleWithFixedDelay(0L, 1000L, TimeUnit.MILLISECONDS, () => {
println("========sleep 1 second ============")
})

Thread.currentThread().join()

}

}

输出:
...

========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,72)
(after backpressure map, and Rim 3 operation of source - 1,73)
(after backpressure map, and Rim 3 operation of source - 2,74)
(after backpressure map, and Rim 3 operation of source - 0,75)
========sleep 1 second ============
========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,75)
(after backpressure map, and Rim 3 operation of source - 1,76)
(after backpressure map, and Rim 3 operation of source - 2,77)
(after backpressure map, and Rim 3 operation of source - 0,78)
========sleep 1 second ============
========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,78)
(after backpressure map, and Rim 3 operation of source - 1,79)
...

其中出现一些背压不匹配:
之后: sleep 2 second for every message ...背压给三项 after backpressure map - ...
怎么可以 sleep 2 second for every message ...after backpressure map - ... 有一对一的关系在背压方面?

另一个疑惑:为什么 sleep 2 second for every message 的日志输出 (0, 72), (0, 75), (0,78)但是这样的事情 (0, 72), (1, 73), (2,74) ?

谢谢。

莫尼克斯版本:
"io.monix" %% "monix" % "3.0.0-RC1"

最佳答案

您所看到的行为正是您所期望的。

为了快速总结您的应用程序的作用,让我用我的话解释一下:

您有一个 Observable生成数字,并对每个元素做一些副作用。

接下来,按 _ % 3 对元素进行分组.

接下来,您在每个组的 Observable 内执行更多副作用( sleep 和写入控制台)。 .

那么,你flatMap各组的Observable ,导致一个单一的,平坦的 Observable .

那么为什么您一开始只看到第一组(其中 _ % 3 == 0 )将内容打印到控制台? ***

答案在flatMap : 当看documentationObservable ,您会发现以下对 flatMap 的描述:

final def flatMap[B](f: (A) ⇒ Observable[B]): Observable[B]

Alias for concatMap.

[...]

想想 Observable就像你会想到的 List s 一秒钟:当您连接时 List s,你最终会得到一个 List首先包含第一个 List 的元素,后跟第二个 List 的元素, 等等。

在 Monix 中,对 Observable 实现了相同的行为。通过等待第一个 ObservableflatMap 内生产(阅读: concatMap )操作发送“完成” - 信号。只有这样,第二个 Observable被消耗等等。

或者,简单地说,flatMap关心产生的序列 Observable s。

但是什么时候做 Observable s 在您的 flatMap操作“完成”?为此我们必须了解如何 groupBy有效 - 因为那是它们的来源。

对于 groupBy工作虽然 Observable s 被懒惰地评估,它必须将传入的元素存储在缓冲区中。我对此不是 100% 确定,但如果 groupBy对于任何分组 Observable 就像我认为的那样工作拉下一个元素,通过原来的 Observable无限期直到它找到属于该组的元素,将属于该缓冲区中其他组的所有先前(但不是必需的)元素保存以备后用。

所有这一切意味着 groupBy在源 Observable 之前无法知道是否已找到组的所有元素信号完成,然后它将使用所有剩余的缓冲元素,然后将完成信号发送到分组 Observable s。

简单来说:Observable s 由 groupBy 制作直到来源Observable才完成完成。

将所有这些信息放在一起时,您会明白只有当源 Observable(您的 Observable.range(0, 130))完成时,第一个分组 Observable也将完成,因为 flatMap只有然后所有其他分组 Observable s 将被使用。

因为我从你的最后一个问题中知道你正在尝试使用 flatMap 构建一个网络套接字。是个坏主意 - 您的来源 Observable传入的请求永远不会完成,有效地只为您将遇到的第一个 IP 地址提供服务。

你必须做的是使用 mergeMap . concatMap 相比时 mergeMap不关心元素的顺序,而是“先到先得” - 规则适用。

*** : 当你完成我的解释并希望理解如何 groupByflatMap有效,你会明白我为什么写“一开始”!

关于scala - Monix 如何通过 flatMap 运算符使用背压?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55843648/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com