gpt4 book ai didi

scala - Akka Http - 主机级客户端 API Source.queue 模式

转载 作者:行者123 更新时间:2023-12-04 21:33:42 24 4
gpt4 key购买 nike

我们开始实现 Source.queue[HttpRequest]文档中提到的模式:http://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#examples

这是文档中的(简化的)示例

val poolClientFlow = Http()
.cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")

val queue =
Source.queue[(HttpRequest, Promise[HttpResponse])](
QueueSize, OverflowStrategy.dropNew
)
.via(poolClientFlow)
.toMat(Sink.foreach({
case ((Success(resp), p)) => p.success(resp)
case ((Failure(e), p)) => p.failure(e)
}))(Keep.left)
.run()

def queueRequest(request: HttpRequest): Future[HttpResponse] = {
val responsePromise = Promise[HttpResponse]()
queue.offer(request -> responsePromise).flatMap {
case QueueOfferResult.Enqueued => responsePromise.future
case QueueOfferResult.Dropped => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
case QueueOfferResult.Failure(ex) => Future.failed(ex)
case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))
}
}

val responseFuture: Future[HttpResponse] = queueRequest(HttpRequest(uri = "/"))

文档说明使用 Source.single(request)是一种反模式,应该避免。然而,它并没有阐明使用 Source.queue 的原因和含义。 .

At this place we previously showed an example that used the Source.single(request).via(pool).runWith(Sink.head). In fact, this is an anti-pattern that doesn’t perform well. Please either supply requests using a queue or in a streamed fashion as shown below.



Source.queue 的优点
  • 流程仅实现一次(可能是性能提升?)。但是,如果我正确理解了 akka-http 的实现,每个连接都会具体化一个新的流程,所以这似乎不是什么大问题
  • 使用 OverflowStrategy 进行显式背压处理并匹配 QueueOfferResult

  • Source.queue 的问题

    这些是当我们开始在我们的应用程序中实现这种模式时出现的问题。

    Source.queue 不是线程安全的

    The queue implementation is not thread safe .当我们在不同的路由/参与者中使用队列时,我们有这样的场景:

    入队的请求可以覆盖最新的入队请求,从而导致 Unresolved Future。

    更新

    此问题已在 akka/akka/issues/23081 中得到解决.队列实际上是线程安全的。

    过滤?

    当请求被过滤时会发生什么?例如。当有人更改实现时
    Source.queue[(HttpRequest, Promise[HttpResponse])](
    QueueSize, OverflowStrategy.dropNew)
    .via(poolClientFlow)
    // only successful responses
    .filter(_._1.isSuccess)
    // failed won't arrive here
    .to(Sink.foreach({
    case ((Success(resp), p)) => p.success(resp)
    case ((Failure(e), p)) => p.failure(e)
    }))

    future 不会解决吗?使用单个请求流,这很简单:
    Source.single(request).via(poolClientFlow).runWith(Sink.headOption)

    队列大小与最大打开请求?
    QueueSize的区别和 max-open-requests不清楚。最后,两者都是缓冲区。我们的实现最终使用了 QueueSize == max-open-requests
    Source.single() 的缺点是什么?

    到目前为止,我发现了两个使用 Source.queue 的原因在 Source.single
  • 性能 - 仅实现一次流程。然而根据this answer这应该不是问题
  • 显式配置背压并处理故障情况。在我看来,ConnectionPool 对过多的负载有足够的处理能力。可以映射产生的 future 并处理异常。

  • 提前致谢,
    向木

    最佳答案

    我将直接回答您的每个问题,然后对整个问题给出一般性的间接回答。

    可能是性能提升?

    你是对的,有一个 Flow为每个实现IncomingConnection但是如果一个 Connection 有多个请求来自它,仍然可以获得性能提升。

    当请求被过滤时会发生什么?

    通常,流在源元素和接收元素之间没有 1:1 的映射。可能有 1:0,如您的示例中所示,或者如果单个请求以某种方式产生多个响应,则可能有 1:many。

    队列大小与最大打开请求?

    该比率取决于向队列提供元素的速度以及将 http 请求处理为响应的速度。没有预先定义的理想解决方案。

    一般重新设计

    大多数情况下是 Source.queue使用是因为某些上游函数正在动态创建输入元素,然后将它们提供给队列,例如

    val queue = ??? //as in the example in your question

    queue.offer(httpRequest1)
    queue.offer(httpRequest2)
    queue.offer(httpRequest3)

    这是糟糕的设计,因为用于创建每个输入元素的任何实体或函数本身都可能是流源的一部分,例如
    val allRequests = Iterable(httpRequest1, httpRequest2, httpRequest3)

    //no queue necessary
    val allResponses : Future[Seq[HttpResponse]] =
    Source(allRequests)
    .via(poolClientFlow)
    .to(Sink.seq[HttpResponse])
    .run()

    现在无需担心队列、最大队列大小等。一切都捆绑到一个漂亮的紧凑流中。

    即使请求的来源是动态的,您通常仍然可以使用 Source。假设我们从控制台标准输入获取请求路径,这仍然可以是一个完整的流:
    import scala.io.{Source => ioSource}

    val consoleLines : () => Iterator[String] =
    () => ioSource.stdin.getLines()

    Source
    .fromIterator(consoleLines)
    .map(consoleLine => HttpRequest(GET, uri = Uri(consoleLine)))
    .via(poolClientFlow)
    .to(Sink.foreach[HttpResponse](println))
    .run()

    现在,即使每行以随机时间间隔输入控制台,流仍然可以在没有队列的情况下 react 性地运行。

    我见过的唯一一个队列,或者 Source.ActorRef ,因为当您必须创建传递给第三方 API 的回调函数时,这是绝对必要的。此回调函数必须将传入元素提供给队列。

    关于scala - Akka Http - 主机级客户端 API Source.queue 模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44241920/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com