gpt4 book ai didi

scala - Akka HTTP 连接池在几个小时后挂起

转载 作者:行者123 更新时间:2023-12-01 13:37:58 24 4
gpt4 key购买 nike

我有一个 HTTP 连接池,它在运行几个小时后挂起:

private def createHttpPool(host: String): SourceQueue[(HttpRequest, Promise[HttpResponse])] = {
val pool = Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host)
Source.queue[(HttpRequest, Promise[HttpResponse])](config.poolBuffer, OverflowStrategy.dropNew)
.via(pool).toMat(Sink.foreach {
case ((Success(res), p)) => p.success(res)
case ((Failure(e), p)) => p.failure(e)
})(Keep.left).run
}

我将项目排队:

private def enqueue(uri: Uri): Future[HttpResponse] = {
val promise = Promise[HttpResponse]
val request = HttpRequest(uri = uri) -> promise

queue.offer(request).flatMap {
case Enqueued => promise.future
case _ => Future.failed(ConnectionPoolDroppedRequest)
}
}

然后像这样解析响应:

private def request(uri: Uri): Future[HttpResponse] = {
def retry = {
Thread.sleep(config.dispatcherRetryInterval)
logger.info(s"retrying")
request(uri)
}

logger.info("req-start")
for {
response <- enqueue(uri)

_ = logger.info("req-end")

finalResponse <- response.status match {
case TooManyRequests => retry
case OK => Future.successful(response)
case _ => response.entity.toStrict(10.seconds).map(s => throw Error(s.toString, uri.toString))
}
} yield finalResponse
}

如果 Future 成功,则此函数的结果总是被转换:

def get(uri: Uri): Future[Try[JValue]] = {
for {
response <- request(uri)
json <- Unmarshal(response.entity).to[Try[JValue]]
} yield json
}

一段时间内一切正常,然后我在日志中看到的只是 req-start 而没有 req-end。

我的akka​​配置是这样的:

akka {
actor.deployment.default {
dispatcher = "my-dispatcher"
}
}

my-dispatcher {
type = Dispatcher
executor = "fork-join-executor"

fork-join-executor {
parallelism-min = 256
parallelism-factor = 128.0
parallelism-max = 1024
}
}

akka.http {
host-connection-pool {
max-connections = 512
max-retries = 5
max-open-requests = 16384
pipelining-limit = 1
}
}

我不确定这是配置问题还是代码问题。我的并行度和连接数如此之高,因为没有它我的请求/秒率非常低(我想尽可能快地请求 - 我有其他速率限制代码来保护服务器)。

最佳答案

您没有使用从服务器返回的响应的实体。引用以下文档:

Consuming (or discarding) the Entity of a request is mandatory! If accidentally left neither consumed or discarded Akka HTTP will assume the incoming data should remain back-pressured, and will stall the incoming data via TCP back-pressure mechanisms. A client should consume the Entity regardless of the status of the HttpResponse.

实体以 Source[ByteString, _] 的形式出现,需要运行它以避免资源匮乏。

如果不需要读取实体,使用实体字节的最简单方法是丢弃它们,使用

res.discardEntityBytes()

(您可以通过添加 - 例如 - .future().map(...) 来附加回调)。

This page in the docs描述了所有替代方案,包括如何在需要时读取字节。

--- 编辑

在提供更多代码/信息后,很明显资源消耗不是问题。此实现中还有另一个大危险信号,即重试方法中的 Thread.sleep。这是一个阻塞调用,很可能会耗尽底层 actor 系统的线程基础设施。

docs 中提供了为什么这是危险的完整解释。 .

尝试更改它并使用 akka.pattern.after ( docs )。示例如下:

def retry = akka.pattern.after(200 millis, using = system.scheduler)(request(uri))

关于scala - Akka HTTP 连接池在几个小时后挂起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42481876/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com