gpt4 book ai didi

scala - 如何将akka http与akka流绑定(bind)?

转载 作者:行者123 更新时间:2023-12-02 09:23:42 26 4
gpt4 key购买 nike

我尝试使用流而不是纯参与者来处理 http 请求,并且我提供了以下代码:

trait ImagesRoute {

val log = LoggerFactory.getLogger(this.getClass)

implicit def actorRefFactory: ActorRefFactory
implicit def materializer: ActorMaterializer

val source =
Source
.actorRef[Image](Int.MaxValue, OverflowStrategy.fail)
.via(Flow[Image].mapAsync(1)(ImageRepository.add))
.toMat(Sink.asPublisher(true))(Keep.both)

val route = {
pathPrefix("images") {
pathEnd {
post {
entity(as[Image]) { image =>

val (ref, publisher) = source.run()

val addFuture = Source.fromPublisher(publisher)

val future = addFuture.runWith(Sink.head[Option[Image]])

ref ! image

onComplete(future.mapTo[Option[Image]]) {
case Success(img) =>
complete(Created, img)

case Failure(e) =>
log.error("Error adding image resource", e)
complete(InternalServerError, e.getMessage)
}
}
}
}
}
}
}

我不确定这是否是正确的方法,或者即使这是一个好的方法,或者我是否应该使用参与者与路线交互,使用询问模式,然后在参与者内部进行流式传输一切。

有什么想法吗?

最佳答案

如果您只期望实体提供 1 个图像,则无需从 ActorRef 创建 Source,也不需要 Sink.asPublisher ,您可以简单地使用 Source.single:

def imageToComplete(img : Option[Image]) : StandardRoute = 
img.map(i => complete(Created, i))
.getOrElse {
log error ("Error adding image resource", e)
complete(InternalServerError, e.getMessage
}

...

entity(as[Image]) { image =>

val future : Future[StandardRoute] =
Source.single(image)
.via(Flow[Image].mapAsync(1)(ImageRepository.add))
.runWith(Sink.head[Option[Image]])
.map(imageToComplete)

onComplete(future)
}

进一步简化您的代码,您仅处理 1 个图像的事实意味着流是不必要的,因为仅 1 个元素不需要背压:

val future : Future[StandardRoute] = ImageRepository.add(image)
.map(imageToComplete)

onComplete(future)

在您指出的评论中

"this is just a simple example, but the stream pipeline should be bigger doing a lot of things like contacting external resources and eventually back pressure things"

这仅适用于您的实体是图像流的情况。如果您只为每个 HttpRequest 处理 1 个图像,则永远不会应用背压,并且您创建的任何流都将是 slower version of a Future

如果您的实体实际上是图像流,那么您可以将其用作流的一部分:

val byteStrToImage : Flow[ByteString, Image, _] = ???

val imageToByteStr : Flow[Image, Source[ByteString], _] = ???

def imageOptToSource(img : Option[Image]) : Source[Image,_] =
Source fromIterator img.toIterator

val route = path("images") {
post {
extractRequestEntity { reqEntity =>

val stream = reqEntity.via(byteStrToImage)
.via(Flow[Image].mapAsync(1)(ImageRepository.add))
.via(Flow.flatMapConcat(imageOptToSource))
.via(Flow.flatMapConcat(imageToByteStr))

complete(HttpResponse(status=Created,entity = stream))
}
}
}

关于scala - 如何将akka http与akka流绑定(bind)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39624578/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com