gpt4 book ai didi

scala - akka-http:从 http 路由向 akka sink 发送元素

转载 作者:行者123 更新时间:2023-12-05 00:52:43 25 4
gpt4 key购买 nike

如何从 Akka HTTP 路由向 Akka Sink 发送元素/消息?我的 HTTP 路由仍然需要返回一个正常的 HTTP 响应。

我想这需要一个流分支/连接点。正常的 HTTP 路由是来自 HttpRequest -> HttpResponse 的流。我想添加一个分支/连接点,以便 HttpRequests 可以将事件触发到我的单独接收器并生成正常的 HttpResponse。

下面是一个非常简单的单路由 akka-http 应用程序。为简单起见,我使用了一个简单的 println 接收器。我的生产用例,显然会涉及一个不那么琐碎的接收器。

def main(args: Array[String]): Unit = {
implicit val actorSystem = ActorSystem("my-akka-http-test")
val executor = actorSystem.dispatcher
implicit val materializer = ActorMaterializer()(actorSystem)

// I would like to send elements to this sink in response to HTTP GET operations.
val sink: Sink[Any, Future[Done]] = Sink.foreach(println)

val route: akka.http.scaladsl.server.Route =
path("hello" / Segment) { p =>
get {
// I'd like to send a message to an Akka Sink as well as return an HTTP response.
complete {
s"<h1>Say hello to akka-http. p=$p</h1>"
}
}
}

val httpExt: akka.http.scaladsl.HttpExt = Http(actorSystem)
val bindingFuture = httpExt.bindAndHandle(RouteResult.route2HandlerFlow(route), "localhost", 8080)

println("Server online at http://localhost:8080/")
println("Press RETURN to stop...")
scala.io.StdIn.readLine()

bindingFuture
.flatMap(_.unbind())(executor) // trigger unbinding from the port
.onComplete(_ => Await.result(actorSystem.terminate(), Duration.Inf))(executor) // and shutdown when done
}

编辑:或者在使用低级 akka-http API 时,如何将特定消息从特定路由处理程序发送到接收器?
def main(args: Array[String]): Unit = {
implicit val actorSystem = ActorSystem("my-akka-http-test")
val executor = actorSystem.dispatcher
implicit val materializer = ActorMaterializer()(actorSystem)

// I would like to send elements to this sink in response to HTTP GET operations.
val sink: Sink[Any, Future[Done]] = Sink.foreach(println)

val requestHandler: HttpRequest => HttpResponse = {
case HttpRequest(GET, Uri.Path("/"), _, _, _) =>
HttpResponse(entity = HttpEntity(
ContentTypes.`text/html(UTF-8)`,
"<html><body>Hello world!</body></html>"))

case HttpRequest(GET, Uri.Path("/ping"), _, _, _) =>
HttpResponse(entity = "PONG!")

case HttpRequest(GET, Uri.Path("/crash"), _, _, _) =>
sys.error("BOOM!")

case r: HttpRequest =>
r.discardEntityBytes() // important to drain incoming HTTP Entity stream
HttpResponse(404, entity = "Unknown resource!")
}

val serverSource = Http().bind(interface = "localhost", port = 8080)

val bindingFuture: Future[Http.ServerBinding] =
serverSource.to(Sink.foreach { connection =>
println("Accepted new connection from " + connection.remoteAddress)

connection handleWithSyncHandler requestHandler
// this is equivalent to
// connection handleWith { Flow[HttpRequest] map requestHandler }
}).run()

println("Server online at http://localhost:8080/")
println("Press RETURN to stop...")
scala.io.StdIn.readLine()

bindingFuture
.flatMap(_.unbind())(executor) // trigger unbinding from the port
.onComplete(_ => Await.result(actorSystem.terminate(), Duration.Inf))(executor) // and shutdown when done
}

最佳答案

如果 你要发全HttpRequest对于您的水槽,我想说最简单的方法是使用 alsoTo组合器。结果将是类似的东西

val mySink: Sink[HttpRequest, NotUsed] = ???

val handlerFlow = Flow[HttpRequest].alsoTo(mySink).via(RouteResult.route2HandlerFlow(route))

val bindingFuture = Http().bindAndHandle(handlerFlow, "localhost", 8080)

仅供引用: alsoTo实际上隐藏了一个 Broadcast阶段。

如果 相反,您需要有选择地从特定子路由向 Sink 发送消息,您别无选择,只能为每个传入请求实现一个新流。请参阅下面的示例
val sink: Sink[Any, Future[Done]] = Sink.foreach(println)

val route: akka.http.scaladsl.server.Route =
path("hello" / Segment) { p =>
get {

(extract(_.request) & extractMaterializer) { (req, mat) ⇒
Source.single(req).runWith(sink)(mat)

complete {
s"<h1>Say hello to akka-http. p=$p</h1>"
}
}
}
}

另外,请记住,您始终可以完全放弃高级 DSL,并使用 lower-level streams DSL 为您的整个路线建模。 .这将导致更冗长的代码 - 但会让您完全控制您的流实现。

编辑:下面的例子
val sink: Sink[Any, Future[Done]] = Sink.foreach(println)

val handlerFlow =
Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._

val partition = b.add(Partition[HttpRequest](2, {
case HttpRequest(GET, Uri.Path("/"), _, _, _) ⇒ 0
case _ ⇒ 1
}))
val merge = b.add(Merge[HttpResponse](2))

val happyPath = Flow[HttpRequest].map{ req ⇒
HttpResponse(entity = HttpEntity(
ContentTypes.`text/html(UTF-8)`,
"<html><body>Hello world!</body></html>"))
}

val unhappyPath = Flow[HttpRequest].map{
case HttpRequest(GET, Uri.Path("/ping"), _, _, _) =>
HttpResponse(entity = "PONG!")

case HttpRequest(GET, Uri.Path("/crash"), _, _, _) =>
sys.error("BOOM!")

case r: HttpRequest =>
r.discardEntityBytes() // important to drain incoming HTTP Entity stream
HttpResponse(404, entity = "Unknown resource!")
}

partition.out(0).alsoTo(sink) ~> happyPath ~> merge
partition.out(1) ~> unhappyPath ~> merge

FlowShape(partition.in, merge.out)
})

val bindingFuture = Http().bindAndHandle(handlerFlow, "localhost", 8080)

关于scala - akka-http:从 http 路由向 akka sink 发送元素,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42444054/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com