gpt4 book ai didi

scala - map 和 mapAsync 的区别

转载 作者:行者123 更新时间:2023-12-03 12:27:58 24 4
gpt4 key购买 nike

谁能解释一下 map 和 mapAsync w.r.t AKKA 流之间的区别? In the documentation据说

Stream transformations and side effects involving external non-stream based services can be performed with mapAsync or mapAsyncUnordered



为什么我们不能简单地在这里映射?我假设 Flow、Source、Sink 本质上都是 Monadic 的,因此 map 应该可以正常工作,w.r.t 这些性质的延迟?

最佳答案

签名

signatures 中最能突出显示差异。 : Flow.map接受一个返回类型 T 的函数而Flow.mapAsync接受一个返回类型 Future[T] 的函数.

实例

举个例子,假设我们有一个函数可以根据用户 id 在数据库中查询用户的全名:

type UserID   = String
type FullName = String

val databaseLookup : UserID => FullName = ??? //implementation unimportant

给定一个 akka 流 SourceUserID我们可以使用的值 Flow.map在流中查询数据库并将全名打印到控制台:
val userIDSource : Source[UserID, _] = ???

val stream =
userIDSource.via(Flow[UserID].map(databaseLookup))
.to(Sink.foreach[FullName](println))
.run()

这种方法的一个限制是该流一次只能进行 1 db 查询。这种串行查询将成为“瓶颈”,并且可能会阻止我们的流中的最大吞吐量。

我们可以尝试通过使用 Future 的并发查询来提高性能。 :
def concurrentDBLookup(userID : UserID) : Future[FullName] = 
Future { databaseLookup(userID) }

val concurrentStream =
userIDSource.via(Flow[UserID].map(concurrentDBLookup))
.to(Sink.foreach[Future[FullName]](_ foreach println))
.run()

这个简单的附录的问题在于我们有效地消除了背压。

Sink 只是拉入 Future 并添加一个 foreach println ,与数据库查询相比,速度相对较快。流将不断地将需求传播到源并在 Flow.map 内产生更多 future 。 .因此, databaseLookup 的数量没有限制。同时运行。不受约束的并行查询最终可能会使数据库过载。
Flow.mapAsync救援;我们可以同时访问并发数据库,​​同时限制同时查找的数量:
val maxLookupCount = 10

val maxLookupConcurrentStream =
userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(concurrentDBLookup))
.to(Sink.foreach[FullName](println))
.run()

另请注意 Sink.foreach变得更简单了,它不再需要 Future[FullName]但只是一个 FullName反而。

无序异步映射

如果不需要将 UserID 保持到 FullNames 的顺序,那么您可以使用 Flow.mapAsyncUnordered .例如:您只需将所有名称打印到控制台,但不关心它们的打印顺序。

关于scala - map 和 mapAsync 的区别,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35146418/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com