gpt4 book ai didi

scala - akka-stream + akka-http 生命周期

转载 作者:行者123 更新时间:2023-12-04 02:27:03 27 4
gpt4 key购买 nike

TLDR:当我将传出 http 请求作为流的一部分时,是按请求具体化一个流(即使用短期流)还是跨请求使用单个流具体化?

详细信息:我有一个典型的服务,它接受一个 HTTP 请求,将它分散到几个 3rd 方下游服务(不受我控制)并在将结果发回之前聚合结果。我正在使用 akka-http 进行客户端实现并使用 Spray 作为服务器(旧版,随着时间的推移将转移到 akka-http)。示意图:
request -> map -1-*-> map -> 3rd party http -> map -*-1> aggregation -> response
这可以通过实现每个请求的流或实现(部分)流一次并跨请求共享来实现。

实现每个请求会产生实现开销 1 并且不清楚如何利用连接池。问题描述here (许多实现可以耗尽池)。我可以将池包装在长时间运行的 http 流中,例如 here并包裹在 mapAsync 中“上游”,但我不清楚错误处理策略。当单个请求失败并且流终止时,它是否也会关闭池?更多,似乎我需要协调请求和响应,因为它们不是按顺序返回的。

// example of stream per request

val connectionFlow = Http().cachedHostConnectionPool[UUID](host, port)
val httpFlow: Flow[HttpRequest, Try[HttpResponse], NotUsed] =
Flow[HttpRequest]
.map(req => req -> UUID.randomUUID()) // I don't care about id because it's a single request per stream.
.via(connectionFlow)
.map { case (response, _) => response }

val result = Range(1 to 5).foreach{ i => {
Source.single(i)
.map(HttpRequest(...))
.via(httpFlow)
.mapAsync(1) {
// response handling logic
}
.runWith(Sink.last)
})


// example of stream per request with long running http stream

// as defined in http://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#using-the-host-level-api-with-a-queue
def queueRequest(request: HttpRequest): Future[HttpResponse]

val result = Range(1 to 5).foreach{ i => {
Source.single(i)
.map(HttpRequest(...))
.mapAsync(1)(queueRequest)
.mapAsync(1) {
// somehow reconcile request with response?
// response handling logic
}
.runWith(Sink.last)
})

跨请求共享流有一个类似的错误处理问题 - 似乎有一些故障模式可以在所有请求进行中降低该流。代码将类似于 host level API ,但队列在整个流前面。

在这种情况下哪种方式更好?

我确实尝试实现这两种解决方案,但在实现的每个阶段都有很多设计选择,因此即使在“正确”的道路上也很容易搞砸。

1虽然我认为它可以忽略不计,它与akka-http服务器的运行方式相同。

最佳答案

一般来说,使用单个连接要好得多 Flow并通过单个 Flow 发送您的所有请求。主要原因是因为新的实现实际上可能会导致新的 Connection每次都形成(取决于您的连接池设置)。

您是正确的,这会导致一些并发症:

订购 :通过提供随机UUID作为元组中的第二个值
您正在传递给连接流,您正在消除将请求与响应相关联的能力。额外的 T元组中的值可以用作“相关ID”来知道哪个HttpResponse你从流动中得到。在您的特定示例中,您可以使用初始 Int来自 Range你创造了:

val responseSource : Source[(Try[HttpResponse], Int), _] = 
Source
.fromIterator( () => Iterator range (0,5) )
.map(i => HttpRequest(...) -> i)
.via(connectionFlow)

现在每个响应都带有原始 Int 值,您可以使用它来处理响应。

错误处理 :您在陈述“单个请求失败并且流被终止”方面是不正确的。单个请求失败不一定会导致流失败。相反,您只会得到一个 (Failure(exception), Int)来自连接流的值。您现在知道哪个 Int 导致了失败,并且您从流程中获得了异常。

关于scala - akka-stream + akka-http 生命周期,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46435647/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com