gpt4 book ai didi

java - 将 Akka 流传递给上游服务以进行填充

转载 作者:行者123 更新时间:2023-12-01 09:39:48 25 4
gpt4 key购买 nike

我需要调用上游服务(Azure Blob 服务)将数据推送到 OutputStream,然后我需要通过 akka 将其转回客户端。如果没有 akka(只有 servlet 代码),我只会获取 ServletOutputStream 并将其传递给 azure 服务的方法。

我可以尝试偶然发现的最接近的,显然这是错误的,是这样的

        Source<ByteString, OutputStream> source = StreamConverters.asOutputStream().mapMaterializedValue(os -> {
blobClient.download(os);
return os;
});

ResponseEntity resposeEntity = HttpEntities.create(ContentTypes.APPLICATION_OCTET_STREAM, preAuthData.getFileSize(), source);

sender().tell(new RequestResult(resposeEntity, StatusCodes.OK), self());

这个想法是我正在调用上游服务以通过调用填充输出流
blobClient.download(os);

似乎 lambda 函数被调用并返回,但随后它失败了,因为没有数据或其他东西。好像我不应该让那个 lambda 函数做这项工作,但也许返回一些做这项工作的对象?没有把握。

如何做到这一点?

最佳答案

这里真正的问题是 Azure API 不是为背压而设计的。输出流无法向 Azure 发回信号,表明它尚未准备好接收更多数据。换句话说:如果 Azure 推送数据的速度比您能够使用它的速度快,那么某处肯定会出现一些难看的缓冲区溢出故障。

接受这个事实,我们可以做的下一件最好的事情是:

  • 使用 Source.lazySource仅在有下游需求时才开始下载数据(也就是正在运行源并且正在请求数据)。
  • download调用其他线程,以便它继续执行而不会阻止返回源。一种方法是使用 Future (我不确定 Java 最佳实践是什么,但应该可以正常工作)。尽管最初并不重要,但您可能需要选择 system.dispatcher 以外的执行上下文。 - 这完全取决于是否download是否阻塞。

  • 如果此 Java 代码格式错误,我提前道歉 - 我将 Akka 与 Scala 一起使用,所以这一切都来自查看 Akka Java API 和 Java 语法引用。

    ResponseEntity responseEntity = HttpEntities.create(
    ContentTypes.APPLICATION_OCTET_STREAM,
    preAuthData.getFileSize(),

    // Wait until there is downstream demand to intialize the source...
    Source.lazySource(() -> {
    // Pre-materialize the outputstream before the source starts running
    Pair<OutputStream, Source<ByteString, NotUsed>> pair =
    StreamConverters.asOutputStream().preMaterialize(system);

    // Start writing into the download stream in a separate thread
    Futures.future(() -> { blobClient.download(pair.first()); return pair.first(); }, system.getDispatcher());

    // Return the source - it should start running since `lazySource` indicated demand
    return pair.second();
    })
    );

    sender().tell(new RequestResult(responseEntity, StatusCodes.OK), self());

    关于java - 将 Akka 流传递给上游服务以进行填充,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61037458/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com