gpt4 book ai didi

scala - 在 Spray 中完成请求处理后是否可以安装回调?

转载 作者:行者123 更新时间:2023-12-05 00:32:45 26 4
gpt4 key购买 nike

我正在尝试从 Spray 提供大型临时文件。 HTTP 请求完成后,我需要删除这些文件。到目前为止,我找不到这样做的方法......

我使用的代码类似于 this或者这个:

          respondWithMediaType(`text/csv`) {
path("somepath" / CsvObjectIdSegment) {
id =>
CsvExporter.export(id) { // loan pattern to provide temp file for this request
file =>
encodeResponse(Gzip) {
getFromFile(file)
}
}
}
}

所以本质上它调用 getFromFile 来完成 Future 中的路由。问题在于,即使 Future 已完成,网络请求仍未完成。我尝试编写一个类似于 getFromFile 的函数,我会在 FutureonComplete 中调用 file.delete() > 但它有同样的问题 - 如果文件足够大,Future 会在客户端完成文件下载之前完成。

下面是Spray的getFromFile供引用:

/**
* Completes GET requests with the content of the given file. The actual I/O operation is
* running detached in a `Future`, so it doesn't block the current thread (but potentially
* some other thread !). If the file cannot be found or read the request is rejected.
*/
def getFromFile(file: File)(implicit settings: RoutingSettings,
resolver: ContentTypeResolver,
refFactory: ActorRefFactory): Route =
getFromFile(file, resolver(file.getName))

我不能使用 file.deleteOnExit(),因为 JVM 可能暂时不会重新启动,并且临时文件将一直放置在周围浪费磁盘空间。

另一方面,这是一个更普遍的问题——有没有办法在 Spray 中安装回调,以便在请求处理完成时可以释放资源或更新统计/日志等。

最佳答案

感谢@VladimirPetrosyan 的指点。以下是我的实现方式:

路线是这样的:

trait MyService extends HttpService ... with CustomMarshallers {

override def routeSettings = implicitly[RoutingSettings]

...

get {
respondWithMediaType(`text/csv`) {
path("somepath" / CsvObjectIdSegment) {
filterInstanceId => // just an ObjectId
val tempResultsFile = CsvExporter.saveCsvResultsToTempFile(filterInstanceId)
respondWithLastModifiedHeader(tempResultsFile.lastModified) {
encodeResponse(Gzip) {
complete(tempResultsFile)
}
}
}
}

我混合在其中的特征进行解码产生分块响应:

import akka.actor._
import spray.httpx.marshalling.{MarshallingContext, Marshaller}
import spray.http.{MessageChunk, ChunkedMessageEnd, HttpEntity, ContentType}
import spray.can.Http
import spray.http.MediaTypes._
import scala.Some
import java.io.{RandomAccessFile, File}
import spray.routing.directives.FileAndResourceDirectives
import spray.routing.RoutingSettings
import math._

trait CustomMarshallers extends FileAndResourceDirectives {

implicit def actorRefFactory: ActorRefFactory
implicit def routeSettings: RoutingSettings

implicit val CsvMarshaller =
Marshaller.of[File](`text/csv`) {
(file: File, contentType: ContentType, ctx: MarshallingContext) =>

actorRefFactory.actorOf {
Props {
new Actor with ActorLogging {
val defaultChunkSize = min(routeSettings.fileChunkingChunkSize, routeSettings.fileChunkingThresholdSize).toInt

private def getNumberOfChunks(file: File): Int = {
val randomAccessFile = new RandomAccessFile(file, "r")
try {
ceil(randomAccessFile.length.toDouble / defaultChunkSize).toInt
} finally {
randomAccessFile.close
}
}

private def readChunk(file: File, chunkIndex: Int): String = {
val randomAccessFile = new RandomAccessFile(file, "r")
val byteBuffer = new Array[Byte](defaultChunkSize)
try {
val seek = chunkIndex * defaultChunkSize
randomAccessFile.seek(seek)
val nread = randomAccessFile.read(byteBuffer)
if(nread == -1) ""
else if(nread < byteBuffer.size) new String(byteBuffer.take(nread))
else new String(byteBuffer)
} finally {
randomAccessFile.close
}
}

val chunkNum = getNumberOfChunks(file)

val responder: ActorRef = ctx.startChunkedMessage(HttpEntity(contentType, ""), Some(Ok(0)))(self)

sealed case class Ok(seq: Int)

def stop() = {
log.debug("Stopped CSV download handler actor.")
responder ! ChunkedMessageEnd
file.delete()
context.stop(self)
}

def sendCSV(seq: Int) =
if (seq < chunkNum)
responder ! MessageChunk(readChunk(file, seq)).withAck(Ok(seq + 1))
else
stop()

def receive = {
case Ok(seq) =>
sendCSV(seq)
case ev: Http.ConnectionClosed =>
log.debug("Stopping response streaming due to {}", ev)
}
}
}
}
}
}

创建临时文件,然后 actor 开始流式传输 block 。每当收到来自客户端的响应时,它都会发送一个 block 。每当客户端断开连接时,都会删除临时文件并关闭 actor。

这需要您在spray-can 中运行您的应用,我认为如果您在容器中运行它是行不通的。

一些有用的链接: example1 , example2 , docs

关于scala - 在 Spray 中完成请求处理后是否可以安装回调?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22827029/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com