gpt4 book ai didi

scala - Akka Streams 与 Akka HTTP 服务器和客户端

转载 作者:行者123 更新时间:2023-12-02 02:55:19 25 4
gpt4 key购买 nike

我正在尝试在我的 Akka Http 服务器上创建一个端点,它使用外部服务告诉用户它的 IP 地址(我知道这可以更容易地执行,但我这样做是一个挑战)。

最上层不使用流的代码是这样的:

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val requestHandler: HttpRequest => Future[HttpResponse] = {
case HttpRequest(GET, Uri.Path("/"), _, _, _) =>
Http().singleRequest(HttpRequest(GET, Uri("http://checkip.amazonaws.com/"))).flatMap { response =>
response.entity.dataBytes.runFold(ByteString(""))(_ ++ _) map { string =>
HttpResponse(entity = HttpEntity(MediaTypes.`text/html`,
"<html><body><h1>" + string.utf8String + "</h1></body></html>"))
}
}

case _: HttpRequest =>
Future(HttpResponse(404, entity = "Unknown resource!"))
}

Http().bindAndHandleAsync(requestHandler, "localhost", 8080)

并且运行良好。然而,作为一个挑战,我想限制自己只使用流(没有 Future's)。

这是我想用于这种方法的布局:Source[Request] -> Flow[Request, Request] -> Flow[Request, Response] ->Flow[Response, Response] 并容纳 404 路由,还有 Source[Request] -> 流程[请求,响应]。现在,如果我的 Akka Stream 知识对我有用,我需要使用 Flow.fromGraph 来完成这样的事情,但是,这就是我陷入困境的地方。

future 我可以为各种端点做一个简单的 map 和平面 map ,但在流中这意味着将流分成多个流,我不太确定我会怎么做那。我考虑过使用 UnzipWith 和 Options 或通用的 Broadcast。

任何有关此主题的帮助将不胜感激。

<小时/>

我不知道这是否有必要? -- http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0-M2/scala/stream-customize.html

最佳答案

您不需要使用Flow.fromGraph。相反,使用 flatMapConcat 的单个 Flow 将起作用:

//an outgoing connection flow
val checkIPFlow = Http().outgoingConnection("checkip.amazonaws.com")

//converts the final html String to an HttpResponse
def byteStrToResponse(byteStr : ByteString) =
HttpResponse(entity = new Default(MediaTypes.`text/html`,
byteStr.length,
Source.single(byteStr)))

val reqResponseFlow = Flow[HttpRequest].flatMapConcat[HttpResponse]( _ match {
case HttpRequest(GET, Uri.Path("/"), _, _, _) =>
Source.single(HttpRequest(GET, Uri("http://checkip.amazonaws.com/")))
.via(checkIPFlow)
.mapAsync(1)(_.entity.dataBytes.runFold(ByteString(""))(_ ++ _))
.map("<html><body><h1>" + _.utf8String + "</h1></body></html>")
.map(ByteString.apply)
.map(byteStrToResponse)

case _ =>
Source.single(HttpResponse(404, entity = "Unknown resource!"))
})

然后可以使用此流绑定(bind)到传入请求:

Http().bindAndHandle(reqResponseFlow, "localhost", 8080)

而且一切都没有 future ......

关于scala - Akka Streams 与 Akka HTTP 服务器和客户端,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34253591/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com