gpt4 book ai didi

scala - 在 Play Scala 中使用迭代器和枚举器将数据流式传输到 S3

转载 作者:行者123 更新时间:2023-12-04 18:40:18 24 4
gpt4 key购买 nike

我正在 Scala 中构建一个 Play Framework 应用程序,我想在其中将字节数组流式传输到 S3。我正在使用 Play-S3库来做到这一点。文档部分的“分段文件上传”在这里是相关的:

// Retrieve an upload ticket
val result:Future[BucketFileUploadTicket] =
bucket initiateMultipartUpload BucketFile(fileName, mimeType)

// Upload the parts and save the tickets
val result:Future[BucketFilePartUploadTicket] =
bucket uploadPart (uploadTicket, BucketFilePart(partNumber, content))

// Complete the upload using both the upload ticket and the part upload tickets
val result:Future[Unit] =
bucket completeMultipartUpload (uploadTicket, partUploadTickets)

我试图在我的应用程序中做同样的事情,但使用 Iteratee s 和 Enumerator s。

流和异步性使事情有点复杂,但这是我目前所拥有的(注 uploadTicket 在代码前面定义):
val partNumberStream = Stream.iterate(1)(_ + 1).iterator
val partUploadTicketsIteratee = Iteratee.fold[Array[Byte], Future[Vector[BucketFilePartUploadTicket]]](Future.successful(Vector.empty[BucketFilePartUploadTicket])) { (partUploadTickets, bytes) =>
bucket.uploadPart(uploadTicket, BucketFilePart(partNumberStream.next(), bytes)).flatMap(partUploadTicket => partUploadTickets.map( _ :+ partUploadTicket))
}
(body |>>> partUploadTicketsIteratee).andThen {
case result =>
result.map(_.map(partUploadTickets => bucket.completeMultipartUpload(uploadTicket, partUploadTickets))) match {
case Success(x) => x.map(d => println("Success"))
case Failure(t) => throw t
}
}

一切都编译并运行,没有发生任何事故。事实上, "Success"被打印出来,但没有文件出现在 S3 上。

最佳答案

您的代码可能存在多个问题。由于 map 导致有点不可读方法调用。您将来的作文可能有问题。另一个问题可能是由于所有块(除了最后一个)应该至少为 5MB 的事实引起的。

下面的代码未经测试,但显示了不同的方法。迭代方法是一种您可以创建小的构建块并将它们组合成操作管道的方法。

为了使代码编译,我添加了一个特征和一些方法

trait BucketFilePartUploadTicket
val uploadPart: (Int, Array[Byte]) => Future[BucketFilePartUploadTicket] = ???
val completeUpload: Seq[BucketFilePartUploadTicket] => Future[Unit] = ???
val body: Enumerator[Array[Byte]] = ???

在这里我们创建了几个部分
// Create 5MB chunks
val chunked = {
val take5MB = Traversable.takeUpTo[Array[Byte]](1024 * 1024 * 5)
Enumeratee.grouped(take5MB transform Iteratee.consume())
}

// Add a counter, used as part number later on
val zipWithIndex = Enumeratee.scanLeft[Array[Byte]](0 -> Array.empty[Byte]) {
case ((counter, _), bytes) => (counter + 1) -> bytes
}

// Map the (Int, Array[Byte]) tuple to a BucketFilePartUploadTicket
val uploadPartTickets = Enumeratee.mapM[(Int, Array[Byte])](uploadPart.tupled)

// Construct the pipe to connect to the enumerator
// the ><> operator is an alias for compose, it is more intuitive because of
// it's arrow like structure
val pipe = chunked ><> zipWithIndex ><> uploadPartTickets

// Create a consumer that ends by finishing the upload
val consumeAndComplete =
Iteratee.getChunks[BucketFilePartUploadTicket] mapM completeUpload

运行它是通过简单地连接零件来完成的
// This is the result, a Future[Unit]
val result = body through pipe run consumeAndComplete

请注意,我没有测试任何代码,可能在我的方法中犯了一些错误。然而,这显示了处理问题的不同方式,应该可以帮助您找到一个好的解决方案。

请注意,此方法会等待一个部分完成上传,然后再进行下一部分。如果从您的服务器到 amazon 的连接比从浏览器到您的服务器的连接慢,则此机制会减慢输入速度。

您可以采取另一种方法,无需等待 Future部分上传完成。这将导致您使用 Future.sequence 的另一个步骤。将上传 future 的序列转换为包含结果序列的单个 future 。结果将是一种机制,一旦您有足够的数据,就会将零件发送到亚马逊。

关于scala - 在 Play Scala 中使用迭代器和枚举器将数据流式传输到 S3,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27586997/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com