gpt4 book ai didi

kotlin - 为什么 Flux.flatMap() 不等待内部发布者完成?

转载 作者:行者123 更新时间:2023-12-02 13:16:29 25 4
gpt4 key购买 nike

您能否解释一下 HttpClient.response() 返回的 Flux/Mono 到底发生了什么?我认为 http 客户端生成的值在 Mono 完成之前不会传递到下游,但我看到生成了大量请求,最终以 reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquirePendingLimitException: Pending acquire queue has reached它的最大大小为 8 异常。如果我用 Mono.fromCallable { } 替换对 testRequest() 的调用,它会按预期工作(项目正在一个接一个地处理)。

我错过了什么?

测试代码:

import org.asynchttpclient.netty.util.ByteBufUtils
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
import reactor.netty.resources.ConnectionProvider

class Test {
private val client = HttpClient.create(ConnectionProvider.create("meh", 4))

fun main() {
Flux.fromIterable(0..99)
.flatMap { obj ->
println("Creating request for: $obj")
testRequest()
.doOnError { ex ->
println("Failed request for: $obj")
ex.printStackTrace()
}
.map { res ->
obj to res
}
}
.doOnNext { (obj, res) ->
println("Created request for: $obj ${res.length} characters")
}
.collectList().block()!!
}

fun testRequest(): Mono<String> {
return client.get()
.uri("https://projectreactor.io/docs/netty/release/reference/index.html#_connection_pool")
.responseContent()
.reduce(StringBuilder(), { sb, buf ->
val str= ByteBufUtils.byteBuf2String(Charsets.UTF_8, buf)
sb.append(str)
})
.map { it.toString() }
}
}

最佳答案

当您像这样 ConnectionProvider.create("meh", 4) 创建 ConnectionProvider 时,这意味着连接池的最大连接数为 4,最大挂起请求数为 8。请参阅here有关此的更多信息。

当您使用 flatMap 时,这意味着将此 Flux 发出的元素异步转换为 Publisher,然后通过合并将这些内部发布者扁平化为单个 Flux,这允许它们交错 参见 here有关此的更多信息。

那么发生的情况是您尝试同时运行所有请求。

所以你有两个选择:

  • 如果您想使用 flatMap,请增加待处理请求的数量。
  • 如果你想保留待处理请求的数量,你可以考虑使用 concatMap 而不是 flatMap,这意味着 转换由此发出的元素Flux 异步进入 Publisher,然后将这些内部发布者扁平化为单个 Flux,按顺序并使用连接保持顺序。查看更多here关于这个。

关于kotlin - 为什么 Flux.flatMap() 不等待内部发布者完成?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63424997/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com