gpt4 book ai didi

scala - 使用喷雾客户端进行多个请求时的 akka 超时

转载 作者:行者123 更新时间:2023-12-04 14:57:57 26 4
gpt4 key购买 nike

使用喷雾 1.3.2 和 akka 2.3.6。 (akka 仅用于喷雾)。
我需要读取大文件,并为每一行发出一个 http 请求。
我用迭代器逐行读取文件,并为每个项目发出请求。
它在某些线路上成功运行,但在某些时候它开始失败:akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://default/user/IO-HTTP#-35162984]] after [60000 ms] .
我首先以为我使服务过载,所以我将“spray.can.host-connector.max-connections”设置为 1。它运行得慢得多,但我遇到了同样的错误。
这里的代码:

import spray.http.MediaTypes._
val EdnType = register(
MediaType.custom(
mainType = "application",
subType = "edn",
compressible = true,
binary = false,
fileExtensions = Seq("edn")))

val pipeline = (
addHeader("Accept", "application/json")
~> sendReceive
~> unmarshal[PipelineResponse])

def postData(data: String) = {
val request = Post(pipelineUrl).withEntity(HttpEntity.apply(EdnType, data))
val responseFuture: Future[PipelineResponse] = pipeline(request)
responseFuture
}

dataLines.map { d =>
val f = postData(d)
f.onFailure { case e => println("Error - "+e)} // This is where the errors are display
f.map { p => someMoreLogic(d, p) }
}

aggrigateResults(dataLines)

我这样做是因为我不需要整个数据,只需要一些聚合。

我该如何解决这个问题并保持完全异步?

最佳答案

Akka ask timeout 是通过 firstCompletedOf 实现的,所以当 ask 初始化时定时器就开始了。

您似乎在做的是为每条线(在 map 期间)生成一个 Future - 因此您的所有调用几乎同时执行。当 future 被初始化时,超时开始计数,但是所有生成的 Actor 都没有留下执行器线程来完成他们的工作。因此,询问超时。

我建议使用更灵活的方法,而不是“一次性”处理——有点类似于使用迭代器或 akka-streams:Work Pulling Pattern . ( Github )

您提供您已经拥有的迭代器 Epic .介绍一个 Worker Actor ,它将执行调用和一些逻辑。如果您生成 N workers那么,最多会有N并行处理的行(并且处理管道可能涉及多个步骤)。通过这种方式,您可以确保不会使执行程序过载,并且不会发生超时。

关于scala - 使用喷雾客户端进行多个请求时的 akka 超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26958936/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com