gpt4 book ai didi

scala - Akka Stream 在 Flow 中使用 HttpResponse

转载 作者:行者123 更新时间:2023-12-05 00:52:12 24 4
gpt4 key购买 nike

我想利用一个简单的 Flow 从 http 服务收集一些额外的数据,并用结果增强我的数据对象。下图说明了这个想法:

val httpClient = Http().superPool[User]()

val cityRequest = Flow[User].map { user=>
(HttpRequest(uri=Uri(config.getString("cityRequestEndpoint"))), User)
}

val cityResponse = Flow[(Try[HttpResponse], User)].map {
case (Failure(ex), user) => user
case (Success(resp), user) => {
// << What to do here to get the value >> //
val responseData = processResponseSomehowToGetAValue?
val enhancedUser = new EnhancedUser(user.data, responseData)
enhancedUser
}
}

val processEnhancedUser = Flow[EnhancedUser].map {
// e.g.: Asynchronously save user to a database
}

val useEnhancementGraph = userSource
.via(getRequest)
.via(httpClient)
.via(getResponse)
.via(processEnhancedUser)
.to(Sink.foreach(println))

我有一个问题来理解机制和之间的区别
Flow 中的流媒体性质和物化/ future 。

以下想法没有向我解释:
  • http://doc.akka.io/docs/akka-http/current/scala/http/implications-of-streaming-http-entity.html
  • akka HttpResponse read body as String scala

  • 我如何从响应中获取值到新的用户对象中,
    所以我可以在以下步骤中处理该对象。

    感谢帮助。

    更新:

    我正在使用远程 akka http 服务器评估代码,使用下面的代码进行解析,在立即到 10 秒之间响应请求。
    这导致了一些“EnhancedUser”实例在最后出现的效果,但那些花费太长时间回答的实例却缺少它们的值。

    我在某个时候将 .async 添加到 cityResponse 解析器的末尾,结果输出花费了更长的时间,但是是正确的。

    这种行为的原因是什么,它如何与接受的答案结合在一起?
    val cityResponse = Flow[(Try[HttpResponse], User)].map {
    case (Failure(ex), member) => member
    case (Success(response), member) => {
    Unmarshal(response.entity).to[String] onComplete {
    case Success(s) => member.city = Some(s)
    case Failure(ex) => member.city = None
    }
    }
    member
    }.async // <<-- This changed the behavior to be correct, why?

    最佳答案

    根据您从“cityRequestEndpoint”获取的实体的性质,您可以使用两种不同的策略:

    基于流

    处理这种情况的典型方法是始终假设来自源端点的实体可以包含 N 条数据,其中 N 事先未知。这通常是要遵循的模式,因为它是现实世界中最通用的,因此也是“最安全的”。

    第一步是转换HttpResponse从端点到数据源:

    val convertResponseToByteStrSource : (Try[HttpResponse], User) => Source[(Option[ByteString], User), _] = 
    (response, user) => response match {
    case Failure(_) => Source single (None -> user)
    case Success(r) => r.entity.dataBytes map (byteStr => Some(byteStr) -> user)
    }

    上面的代码是我们不假设 N 大小的地方, r.entity.dataBytes可能是 0 ByteString 的来源值,或者可能是无限数量的值。但我们的逻辑不在乎!

    现在我们需要组合来自 Source 的数据。这是 Flow.flatMapConcat 的一个很好的用例它采用源流并将其转换为值流(类似于 Iterables 的 flatMap):
    val cityByteStrFlow : Flow[(Try[HttpResponse], User), (Option[ByteString], User), _] = 
    Flow[(Try[HttpResponse], User)] flatMapConcat convertResponseToByteStrSource

    剩下要做的就是转换 (ByteString, User) 的元组进入 EnhancedUser .注意:我假设低于 UserEnhancedUser 的子类这是从问题逻辑推断出来的:
    val convertByteStringToUser : (Option[ByteString], User) => EnhancedUser = 
    (byteStr, user) =>
    byteStr
    .map(s => EnhancedUser(user.data, s))
    .getOrElse(user)

    val cityUserFlow : Flow[(Option[ByteString], User), EnhancedUser, _] =
    Flow[(ByteString, User)] map convertByteStringToUser

    现在可以组合这些组件:
    val useEnhancementGraph =
    userSource
    .via(cityRequest)
    .via(httpClient)
    .via(cityByteStrFlow)
    .via(cityUserFlow)
    .via(processEnhancedUser)
    .to(Sink foreach println)

    基于 future

    我们可以使用 Futures 来解决问题,类似于您在原始问题中引用的堆栈问题。我不推荐这种方法有两个原因:
  • 它假设只有 1 个 ByteString 来自端点。如果端点将多个值作为 ByteStrings 发送,那么它们都会连接在一起,并且在创建 EnhancedUser 时可能会出现错误。 .
  • 它在 ByteString 数据的实现上设置了人为超时,类似于 Async.await (这几乎总是应该避免的)。

  • 要使用基于 Future 的方法,对原始代码的唯一重大更改是使用 Flow.mapAsync 而不是 Flow.map处理 Future 的事实正在函数中创建:
    val parallelism = 10

    val timeout : FiniteDuration = ??? //you need to specify the timeout limit

    val convertResponseToFutureByteStr : (Try[HttpResponse], User) => Future[EnhancedUser] =
    _ match {
    case (Failure(ex), user) =>
    Future successful user
    case (Success(resp), user) =>
    resp
    .entity
    .toStrict(timeout)
    .map(byteStr => new EnhancedUser(user.data, byteStr))
    }

    val cityResponse : Flow[(Try[HttpResponse], User), EnhancedUser, _] =
    Flow[(Try[HttpResponse], User)].mapAsync(parallelism)(convertResponseToFutureByteStr)

    关于scala - Akka Stream 在 Flow 中使用 HttpResponse,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43236367/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com