gpt4 book ai didi

scala - 在流中链接 Akka-http-client 请求

转载 作者:行者123 更新时间:2023-12-04 06:33:03 24 4
gpt4 key购买 nike

我想使用 akka-http-client 作为 Stream 链接 http 请求。链中的每个 http 请求都取决于先前请求的成功/响应,并使用它来构建新请求。如果请求不成功,Stream 应返回不成功请求的响应。

如何在 akka-http 中构建这样的流?
我应该使用哪个 akka-http 客户端级 API?

最佳答案

如果您正在制作网络爬虫,请查看 this post .这个答案解决了一个更简单的情况,例如下载分页资源,其中下一页的链接位于当前页面响应的标题中。

您可以使用 Source.unfoldAsync 创建一个链接源 - 一个项目通向下一个项目方法。这需要一个函数,它接受一个元素 S并返回 Future[Option[(S, E)]]确定流是否应该继续发射 E 类型的元素,将状态传递给下一次调用。

在你的情况下,这有点像:

  • 取首字母 HttpRequest
  • 生产 Future[HttpResponse]
  • 如果响应指向另一个 URL,则返回 Some(request -> response) , 否则 None

  • 然而 ,有一个问题,那就是如果它不包含指向下一个请求的指针,它将不会从流中发出响应。

    为了解决这个问题,您可以将函数传递给 unfoldAsync返回 Future[Option[(Option[HttpRequest], HttpResponse)]] .这使您可以处理以下情况:
  • 当前响应是错误
  • 当前响应指向另一个请求
  • 当前响应未指向另一个请求

  • 下面是一些带注释的代码,概述了这种方法,但首先是初步的:

    在 Akka 流中将 HTTP 请求流式传输到响应时,您需要确保响应正文被消耗掉,否则会发生不好的事情(死锁等)。如果您不需要正文,您可以忽略它,但在这里我们使用一个函数来转换 HttpEntity从(潜在)流到严格实体:
    import scala.concurrent.duration._

    def convertToStrict(r: HttpResponse): Future[HttpResponse] =
    r.entity.toStrict(10.minutes).map(e => r.withEntity(e))

    接下来,使用几个函数来创建 Option[HttpRequest]来自 HttpResponse .这个例子使用了类似 Github 的分页链接的方案,其中 Links header 包含,例如: <https://api.github.com/...> rel="next" :
    def nextUri(r: HttpResponse): Seq[Uri] = for {
    linkHeader <- r.header[Link].toSeq
    value <- linkHeader.values
    params <- value.params if params.key == "rel" && params.value() == "next"
    } yield value.uri

    def getNextRequest(r: HttpResponse): Option[HttpRequest] =
    nextUri(r).headOption.map(next => HttpRequest(HttpMethods.GET, next))

    接下来,我们将传递给 unfoldAsync 的实际函数.它使用 Akka HTTP Http().singleRequest() API 取一个 HttpRequest并产生一个 Future[HttpResponse] :
    def chainRequests(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] =
    reqOption match {
    case Some(req) => Http().singleRequest(req).flatMap { response =>
    // handle the error case. Here we just return the errored response
    // with no next item.
    if (response.status.isFailure()) Future.successful(Some(None -> response))

    // Otherwise, convert the response to a strict response by
    // taking up the body and looking for a next request.
    else convertToStrict(response).map { strictResponse =>
    getNextRequest(strictResponse) match {
    // If we have no next request, return Some containing an
    // empty state, but the current value
    case None => Some(None -> strictResponse)

    // Otherwise, pass on the request...
    case next => Some(next -> strictResponse)
    }
    }
    }
    // Finally, there's no next request, end the stream by
    // returning none as the state.
    case None => Future.successful(None)
    }

    请注意,如果我们收到错误响应,则流将不会继续,因为我们返回 None在下一个状态。

    您可以调用它来获取 HttpResponse 的流像这样的对象:
    val initialRequest = HttpRequest(HttpMethods.GET, "http://www.my-url.com")
    Source.unfoldAsync[Option[HttpRequest], HttpResponse](
    Some(initialRequest)(chainRequests)

    至于返回最后一个(或错误的)响应的值,您只需要使用 Sink.last ,因为流将在成功完成或第一个错误响应时结束。例如:
    def getStatus: Future[StatusCode] = Source.unfoldAsync[Option[HttpRequest], HttpResponse](
    Some(initialRequest))(chainRequests)
    .map(_.status)
    .runWith(Sink.last)

    关于scala - 在流中链接 Akka-http-client 请求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39437718/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com