gpt4 book ai didi

scala - 使用 Akka Http 转换 Slick Streaming 数据并发送分块响应

转载 作者:行者123 更新时间:2023-12-03 20:53:10 25 4
gpt4 key购买 nike

目的是从数据库流式传输数据,对这个数据块执行一些计算(这个计算返回某个案例类的 Future )并将这个数据作为分块响应发送给用户。目前,我可以在不执行任何计算的情况下流式传输数据并发送响应。但是,我无法执行此计算然后流式传输结果。

这是我实现的路线。

def streamingDB1 =
path("streaming-db1") {
get {
val src = Source.fromPublisher(db.stream(getRds))
complete(src)
}
}

函数 getRds 返回映射到 case 类中的表的行(使用 slick)。现在考虑将每一行作为输入并返回另一个案例类的 Future 的函数计算。就像是
def compute(x: Tweet) : Future[TweetNew] = ?

如何在变量 src 上实现此函数并将此计算的分块响应(作为流)发送给用户。

最佳答案

您可以使用 mapAsync 转换源:

val src =
Source.fromPublisher(db.stream(getRds))
.mapAsync(parallelism = 3)(compute)

complete(src)

根据需要调整并行度。

请注意,您可能需要配置一些设置,如 Slick documentation 中所述。 :

Note: Some database systems may require session parameters to be set in a certain way to support streaming without caching all data at once in memory on the client side. For example, PostgreSQL requires both .withStatementParameters(rsType = ResultSetType.ForwardOnly, rsConcurrency = ResultSetConcurrency.ReadOnly, fetchSize = n) (with the desired page size n) and .transactionally for proper streaming.



因此,例如,如果您使用的是 PostgreSQL,那么您的 Source可能如下所示:
val src =
Source.fromPublisher(
db.stream(
getRds.withStatementParameters(
rsType = ResultSetType.ForwardOnly,
rsConcurrency = ResultSetConcurrency.ReadOnly,
fetchSize = 10
).transactionally
)
).mapAsync(parallelism = 3)(compute)

关于scala - 使用 Akka Http 转换 Slick Streaming 数据并发送分块响应,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47991866/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com