gpt4 book ai didi

akka - 将 Akka HTTP 连接到 Akka 流

转载 作者:行者123 更新时间:2023-12-02 09:19:29 30 4
gpt4 key购买 nike

我想使用 Akka HTTP 构建一个连接到现有接收器(带有 Kafka react 流)的 REST 服务,但我不知道如何将 HTTP 流链接到 Akka 流接收器...

我应该选择使用 Flows 的低级 Akka HTTP API 吗?

我的要求是:

  • 整个流程的背压
  • 当所有事件都被 kafka 接收器确认时,响应代码为 200
  • 当背压太高时为 500 ?是否可以 ?

这是我的代码当前代码

// flow to split group of lines into lines
val splitLines = Flow[String].mapConcat(_.split("\n").toList)

// sink to produce kafka records in kafka
val kafkaSink = Flow[String]
.map(new ProducerRecord[Array[Byte], String](topic, _))
.toMat(Producer.plainSink(ProducerSettings(system,new ByteArraySerializer, new StringSerializer)))(Keep.right)

val routes = {
path("ingest") {
post {
logger.info("starting ingestion")
entity(as[GenericEvent]) { eventIngest =>
????
}~
entity(as[GenericEventList]) { eventIngestList =>
????
}
}
}
}

Http(actorSystem).bindAndHandle(routes, config.httpInterface, config.httpPort)

最佳答案

有几种方法可以解决这个问题。一种建议是将数据直接从您的请求实体流式传输到您的 kafka 接收器中。 extractDataBytes 指令可以帮助您做到这一点(更多信息 here )。

尝试按照下面的示例进行一些操作。我添加了一个 ??? 流程,以允许您进行特定于案例的转换,以正确拆分/转换您的请求实体字节。您可以使用 Framing.delimiter 之类的工具来分割实体字节流(更多信息 here )。

  (extractDataBytes & extractMaterializer) { (byteSrc, mat) =>
val f = byteSrc.via(???).runWith(kafkaSink)(mat)
onComplete(f){
case Success(value) => complete(s"OK")
case Failure(ex) => complete((StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}"))
}
}

或者,如果您想将实体解码到某个域对象,您可以执行类似的操作

  (entity(as[Event]) & extractMaterializer) { (event, mat) =>
val f = Source.single(event).via(???).runWith(kafkaSink)(mat)
onComplete(f){
case Success(value) => complete(s"OK")
case Failure(ex) => complete((StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}"))
}
}

回到你的最后一个问题,如果卡夫卡背压,你的流将永远不会完成。您应该期望服务器在配置的请求超时后返回 500(引用下面的文档):

A default request timeout is applied globally to all routes and can be configured using the akka.http.server.request-timeout setting (which defaults to 20 seconds).

关于akka - 将 Akka HTTP 连接到 Akka 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42484553/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com