
scala - How to download an HTTP resource to a file with Akka Streams and HTTP?

Reposted. Author: 行者123. Updated: 2023-12-04 04:10:25

For the past few days I have been trying to figure out the best way to download an HTTP resource to a file using Akka Streams and HTTP.

Initially I started with a Future-Based Variant that looked something like this:


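import java.io.File
import scala.concurrent.Future
import akka.http.scaladsl.Http
import akka.http.scaladsl.client.RequestBuilding.Get
import akka.http.scaladsl.model.Uri
import akka.stream.scaladsl.FileIO

// Assumes implicit ActorSystem, Materializer and ExecutionContext (Akka Streams 2.0 API).
def downloadViaFutures(uri: Uri, file: File): Future[Long] = {
  val request = Get(uri)
  val responseFuture = Http().singleRequest(request)
  responseFuture.flatMap { response =>
    val source = response.entity.dataBytes
    source.runWith(FileIO.toFile(file))
  }
}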
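
For readers on a newer Akka, here is a minimal sketch of the same Future-based download. It assumes Akka 2.6 and Akka HTTP 10.2 on Scala 2.13, where FileIO.toPath replaces FileIO.toFile and materializes a Future[IOResult], and where an implicit ActorSystem supplies the Materializer. The object FutureDownload and its methods are hypothetical names. It also adds a status check that the original omits, so an error page is not silently saved as the download.

```scala
import java.nio.file.Path
import scala.concurrent.{ExecutionContext, Future}
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri}
import akka.stream.scaladsl.FileIO

// Hypothetical helper object; assumes Akka 2.6 / Akka HTTP 10.2 on Scala 2.13.
object FutureDownload {

  // Streams a response body into `target` and returns the byte count from IOResult.
  // Non-2xx responses are drained (so the pooled connection is freed) and then failed.
  def writeResponse(response: HttpResponse, target: Path)(implicit system: ActorSystem): Future[Long] = {
    implicit val ec: ExecutionContext = system.dispatcher
    if (response.status.isSuccess) {
      response.entity.dataBytes
        .runWith(FileIO.toPath(target)) // Materializer is derived from the implicit ActorSystem
        .map(_.count)
    } else {
      response.discardEntityBytes()
      Future.failed(new RuntimeException(s"Unexpected status ${response.status}"))
    }
  }

  // Same shape as downloadViaFutures: one request, then write the body to disk.
  def downloadViaFutures(uri: Uri, target: Path)(implicit system: ActorSystem): Future[Long] = {
    implicit val ec: ExecutionContext = system.dispatcher
    Http().singleRequest(HttpRequest(uri = uri)).flatMap(writeResponse(_, target))
  }
}
```

With an implicit ActorSystem in scope, FutureDownload.downloadViaFutures(Uri("https://example.com/file.zip"), Paths.get("file.zip")) would complete with the number of bytes written; the URL is a placeholder.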

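That worked fine, but once I learned more about pure Akka Streams, I wanted to try a Flow-Based Variant that builds a stream starting from a Source[HttpRequest]. At first this completely stumped me, until I stumbled upon the flatMapConcat flow transformation. This ended up being more verbose:
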
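import scala.util.Try
import akka.http.scaladsl.model.HttpResponse
import akka.stream.scaladsl.Source
import akka.util.ByteString

// Unwraps the pool's Try; a failed request rethrows here and fails the stream.
def responseOrFail[T](in: (Try[HttpResponse], T)): (HttpResponse, T) = in match {
  case (responseTry, context) => (responseTry.get, context)
}

def responseToByteSource[T](in: (HttpResponse, T)): Source[ByteString, Any] = in match {
  case (response, _) => response.entity.dataBytes
}

def downloadViaFlow(uri: Uri, file: File): Future[Long] = {
  val request = Get(uri)
  // superPool pairs each request with a context value; here the context is just ().
  val source = Source.single((request, ()))
  val requestResponseFlow = Http().superPool[Unit]()
  source
    .via(requestResponseFlow)
    .map(responseOrFail)
    .flatMapConcat(responseToByteSource)
    .runWith(FileIO.toFile(file))
}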
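
The flow version can be exercised without a network by feeding it a stand-in for superPool's output, that is, a Source of (Try[HttpResponse], T) pairs. This is a minimal sketch, again assuming Akka 2.6 / Akka HTTP 10.2; FlowDownload and bodyBytes are hypothetical names. bodyBytes combines responseOrFail and responseToByteSource into one reusable Flow. Because flatMapConcat fully consumes each response's dataBytes before pulling the next response, the bodies arrive back to back without interleaving.

```scala
import java.nio.file.Path
import scala.concurrent.Future
import scala.util.Try
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri}
import akka.stream.IOResult
import akka.stream.scaladsl.{FileIO, Flow, Source}
import akka.util.ByteString

// Hypothetical helper object; assumes Akka 2.6 / Akka HTTP 10.2 on Scala 2.13.
object FlowDownload {

  // responseOrFail + responseToByteSource as one Flow.
  // Try#get rethrows a failed request, which fails the whole stream.
  def bodyBytes[T]: Flow[(Try[HttpResponse], T), ByteString, NotUsed] =
    Flow[(Try[HttpResponse], T)]
      .map { case (responseTry, _) => responseTry.get }
      .flatMapConcat(_.entity.dataBytes) // one body at a time, in order

  def downloadViaFlow(uri: Uri, target: Path)(implicit system: ActorSystem): Future[IOResult] =
    Source.single((HttpRequest(uri = uri), ()))
      .via(Http().superPool[Unit]())
      .via(bodyBytes[Unit])
      .runWith(FileIO.toPath(target))
}
```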

Then I wanted to get a bit trickier and take the file name from the Content-Disposition header.

Going back to the Future-based variant:

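import akka.http.scaladsl.model.headers.ContentDisposition

def destinationFile(downloadDir: File, response: HttpResponse): File = {
  // .value is the whole header value (e.g. "attachment; filename=a.txt"), so read
  // the filename parameter instead. Both lookups throw if header or parameter is missing.
  val fileName = response.header[ContentDisposition].get.params("filename")
  val file = new File(downloadDir, fileName)
  file.createNewFile()
  file
}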

def downloadViaFutures2(uri: Uri, downloadDir: File): Future[Long] = {
  val request = Get(uri)
  val responseFuture = Http().singleRequest(request)
  responseFuture.flatMap { response =>
    val file = destinationFile(downloadDir, response)
    val source = response.entity.dataBytes
    source.runWith(FileIO.toFile(file))
  }
}
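
Choosing the file name can be tested on its own, without running a stream. This is a minimal sketch assuming Akka HTTP 10.2's ContentDisposition model, whose params map holds the filename parameter; FileNames, fileNameFor, destinationPath, and the fallback name download.bin are hypothetical. It falls back to the last URI path segment when the header is absent, and it keeps only the final path component, because the name comes from the server and a value such as ../../etc/passwd should not escape downloadDir.

```scala
import java.nio.file.{Path, Paths}
import scala.util.Try
import akka.http.scaladsl.model.{HttpResponse, Uri}
import akka.http.scaladsl.model.headers.ContentDisposition

// Hypothetical helper object; assumes Akka HTTP 10.2 on Scala 2.13.
object FileNames {
  private val Fallback = "download.bin" // hypothetical default name

  // Reduces a server-supplied name to its last path component, rejecting "", "." and "..".
  private def safeName(raw: String): Option[String] =
    Try(Paths.get(raw)).toOption
      .flatMap(p => Option(p.getFileName))
      .map(_.toString)
      .filter(n => n.nonEmpty && n != "." && n != "..")

  // Prefers the Content-Disposition filename parameter, then the last URI path segment.
  def fileNameFor(uri: Uri, response: HttpResponse): String = {
    val fromHeader = response.header[ContentDisposition].flatMap(_.params.get("filename"))
    val fromUri = uri.path.toString.split('/').lastOption
    fromHeader.flatMap(safeName)
      .orElse(fromUri.flatMap(safeName))
      .getOrElse(Fallback)
  }

  def destinationPath(downloadDir: Path, uri: Uri, response: HttpResponse): Path =
    downloadDir.resolve(fileNameFor(uri, response))
}
```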

But now I have no idea how to do this with the Flow-based variant. Here is what I've got:

def responseToByteSourceWithDest[T](in: (HttpResponse, T), downloadDir: File): Source[(ByteString, File), Any] = in match {
  case (response, _) =>
    val source = responseToByteSource(in)
    val file = destinationFile(downloadDir, response)
    source.map((_, file))
}

def downloadViaFlow2(uri: Uri, downloadDir: File): Future[Long] = {
  val request = Get(uri)
  val source = Source.single((request, ()))
  val requestResponseFlow = Http().superPool[Unit]()
  val sourceWithDest: Source[(ByteString, File), Unit] = source
    .via(requestResponseFlow)
    .map(responseOrFail)
    .flatMapConcat(responseToByteSourceWithDest(_, downloadDir))
  sourceWithDest.runWith(???)
}

So now I have a Source that emits one or more (ByteString, File) elements for each File. (I say "each File" because there is no reason the original Source has to contain a single HttpRequest.)

Is there some way to take these and route them to a dynamic Sink?

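I'm thinking of something analogous to flatMapConcat, such as:

// Hypothetical method on Source[Out, _]; Akka Streams has no such method.
def runWithMap[Mat2](f: Out => Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2 = ???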

so that I could finish downloadViaFlow2 with:

def destToSink(destination: File): Sink[(ByteString, File), Future[Long]] = {
  val sink = FileIO.toFile(destination, true)
  Flow[(ByteString, File)].map(_._1).toMat(sink)(Keep.right)
}

sourceWithDest.runWithMap {
  case (_, file) => destToSink(file)
}

Best answer

The solution doesn't require flatMapConcat. If you don't need any return values from the file writes, you can use Sink.foreach:

def writeFile(downloadDir: File)(httpResponse: HttpResponse): Future[Long] = {
  val file = destinationFile(downloadDir, httpResponse)
  httpResponse.entity.dataBytes.runWith(FileIO.toFile(file))
}

def downloadViaFlow2(uri: Uri, downloadDir: File): Future[Unit] = {
  val request = HttpRequest(uri = uri)
  val source = Source.single((request, ()))
  val requestResponseFlow = Http().superPool[Unit]()

  // Sink.foreach discards each writeFile Future, so the returned Future
  // completes when the stream ends, not when the file writes finish.
  source.via(requestResponseFlow)
    .map(responseOrFail)
    .map(_._1)
    .runWith(Sink.foreach(writeFile(downloadDir)))
}

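Note that Sink.foreach merely creates Futures from the writeFile function and does not wait for them, so the file writes exert no backpressure on the stream. writeFile may be slowed down by the hard drive, but the stream will keep generating Futures. To control this, you can use Flow.mapAsyncUnordered (or Flow.mapAsync, which preserves order):
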
val parallelism = 10

source.via(requestResponseFlow)
  .map(responseOrFail)
  .map(_._1)
  .mapAsyncUnordered(parallelism)(writeFile(downloadDir))
  .runWith(Sink.ignore)
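
To see the cap that mapAsyncUnordered puts on outstanding writes, here is a sketch that replaces writeFile with a hypothetical slowWrite taking about 20 ms and records how many calls are in flight at once. It assumes Akka 2.6 (akka.pattern.after provides the delay); BoundedWrites, maxInFlight, and slowWrite are hypothetical names. With parallelism = 4 the recorded peak should never exceed 4, since the stage only invokes the function while fewer than `parallelism` Futures are outstanding.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.scaladsl.{Sink, Source}

// Hypothetical helper object; assumes Akka 2.6 on Scala 2.13.
object BoundedWrites {

  // Runs `jobs` fake writes through mapAsyncUnordered and returns the highest
  // number of writes that were ever in flight at the same time.
  def maxInFlight(jobs: Int, parallelism: Int)(implicit system: ActorSystem): Future[Int] = {
    implicit val ec: ExecutionContext = system.dispatcher
    val inFlight = new AtomicInteger(0)
    val peak = new AtomicInteger(0)

    // Hypothetical stand-in for writeFile: completes after ~20 ms with a fake byte count.
    def slowWrite(i: Int): Future[Long] = {
      val now = inFlight.incrementAndGet()
      peak.accumulateAndGet(now, (a, b) => math.max(a, b))
      after(20.millis, system.scheduler)(Future.successful(i.toLong))
        .andThen { case _ => inFlight.decrementAndGet() } // runs before the stage sees completion
    }

    Source(1 to jobs)
      .mapAsyncUnordered(parallelism)(slowWrite)
      .runWith(Sink.ignore)
      .map(_ => peak.get())
  }
}
```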

If you want to accumulate the Long values into a total count, combine this with Sink.fold:

source.via(requestResponseFlow)
  .map(responseOrFail)
  .map(_._1)
  .mapAsyncUnordered(parallelism)(writeFile(downloadDir))
  .runWith(Sink.fold[Long, Long](0L)(_ + _))

The fold keeps a running sum and emits the final value once the source of requests is exhausted.
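
Putting mapAsyncUnordered and Sink.fold together, here is a self-contained sketch that runs against in-memory responses instead of superPool, so it needs no network. It assumes Akka 2.6 / Akka HTTP 10.2; FoldedDownloads, downloadAll, targetFor, and the download-<index>.bin fallback name are hypothetical. Its writeFile plays the role of the answer's writeFile but returns IOResult.count. The resulting Future[Long] completes only after every write has finished, because mapAsyncUnordered waits for each Future before passing its count on to the fold.

```scala
import java.nio.file.{Path, Paths}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.HttpResponse
import akka.http.scaladsl.model.headers.ContentDisposition
import akka.stream.scaladsl.{FileIO, Sink, Source}

// Hypothetical helper object; assumes Akka 2.6 / Akka HTTP 10.2 on Scala 2.13.
object FoldedDownloads {

  def downloadAll(responses: Source[HttpResponse, NotUsed], downloadDir: Path, parallelism: Int)
                 (implicit system: ActorSystem): Future[Long] = {
    implicit val ec: ExecutionContext = system.dispatcher

    // Hypothetical naming rule: the Content-Disposition filename (last path
    // component only), otherwise download-<index>.bin.
    def targetFor(response: HttpResponse, index: Long): Path = {
      val fromHeader = response.header[ContentDisposition]
        .flatMap(_.params.get("filename"))
        .flatMap(raw => Try(Paths.get(raw)).toOption)
        .flatMap(p => Option(p.getFileName))
        .map(_.toString)
        .filter(n => n.nonEmpty && n != "." && n != "..")
      downloadDir.resolve(fromHeader.getOrElse(s"download-$index.bin"))
    }

    // Counterpart of the answer's writeFile: stream one body, report bytes written.
    def writeFile(response: HttpResponse, index: Long): Future[Long] =
      response.entity.dataBytes
        .runWith(FileIO.toPath(targetFor(response, index)))
        .map(_.count)

    responses
      .zipWithIndex
      .mapAsyncUnordered(parallelism) { case (response, index) => writeFile(response, index) }
      .runWith(Sink.fold[Long, Long](0L)(_ + _)) // running total of bytes written
  }
}
```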

Regarding "scala - How to download an HTTP resource to a file with Akka Streams and HTTP?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/34912143/
