gpt4 book ai didi

java - 非阻塞 Java 异步处理 - 如何限制内存使用?

转载 作者:塔克拉玛干 更新时间:2023-11-02 08:30:47 25 4
gpt4 key购买 nike

几年后我又回到了 Java,很高兴看到新的 java.net.http.HttpClient 中引入了非阻塞异步支持。在AWS Java SDK 2.0 .我多年前在 session 上听说过响应式编程的概念,但没有太多机会将这些想法应用到实践中。

我有一个问题似乎很适合使用这种编程风格:基本上我想通过 HTTP 下载一堆文件(比如 10,000 个)并将它们写回 S3。

我用过 failsafe实现非阻塞异步 http GET 的重试,并且通过 S3 异步客户端将这些与上传组合起来很简单(参见下面的草图)。

但是,我不确定如何正确地限制程序的内存使用:如果文件下载速度快于写回速度,则没有应用反压和防止内存不足异常的机制S3.

我熟悉针对此问题的一些传统阻塞解决方案 - 例如使用信号量来限制并发下载的数量,或者将下载写出到 S3 上传线程将从中拉出的某个有界阻塞队列。但是,如果我要使用这种阻塞机制来应用背压,那么它首先会让我质疑使用非阻塞 IO 的优势。

是否有更惯用的“响应式(Reactive)”方式来实现相同的目标?

import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class BackupClient {
private static final Logger LOGGER = LoggerFactory.getLogger(BackupClient.class);
private final HttpClient httpClient = HttpClient.newBuilder().build();
private final S3AsyncClient s3AsyncClient = S3AsyncClient.create();

public runBackup(List<URI> filesToBackup) {
List<CompletableFuture<PutObjectResponse>> futures = filesToBackup.stream()
.map(backupClient::submitBackup)
.collect(Collectors.toList());

futures.forEach(CompletableFuture::join);
}

private CompletableFuture<PutObjectResponse> submitBackup(URI uri) {
return sendAsyncWithRetries(uri, HttpResponse.BodyHandlers.ofString())
.thenCompose(httpResponse -> s3AsyncClient.putObject(PutObjectRequest.builder()
.bucket("my-bucket")
.key(uri.toASCIIString())
.build(), AsyncRequestBody.fromString(httpResponse.body())));
}


private <T> CompletableFuture<HttpResponse<T>> sendAsyncWithRetries(URI uri, HttpResponse.BodyHandler<T> handler) {
final HttpRequest request = HttpRequest.newBuilder()
.uri(uri)
.timeout(Duration.ofMinutes(2))
.GET()
.build();

final var retryPolicy = new RetryPolicy<HttpResponse<T>>()
.withMaxRetries(4)
.withDelay(Duration.ofSeconds(1))
.handleResultIf(response -> 200 != response.statusCode());

return Failsafe.with(retryPolicy)
.getStageAsync(context -> {
if (context.getAttemptCount() > 0) {
LOGGER.error("Retry " + context.getAttemptCount() + " for " + uri);
}
return this.httpClient.sendAsync(request, handler);
});
}
}

最佳答案

既然您需要控制资源(内存)消耗,那么 Semaphore 是实现这一目标的合适工具。当你想使用非阻塞计算时,你所需要的只是异步信号量。流行的库(rxjava, react 流)在内部使用异步信号量来构造 react 流,但不将其作为单独的类提供。当 react 流的订阅者调用 Flow.Subscription.request(n) , 相当于 Semaphore.release(n) .类似于 Semaphore.acquire()然而,是隐藏的。它由发布者在内部调用。

这种设计方案的缺点是资源反馈只能在生产者和最近的消费者之间建立。如果有一条生产者和消费者的链条,那么每个环节的资源消耗都得单独控制,整体的资源消耗会变大N倍,其中N是环节数。

如果你负担得起,那么你可以使用 rxjava 或任何其他 react 流库的实现。如果没有,那么您必须使用唯一的异步库,它允许用户完全访问 asynchronous Semaphore implementation : DF4J (是的,我是作者)。它不包含对您的问题的直接解决方案,但是有一个示例,其中异步网络服务器通过异步信号量限制同时连接的数量,请参阅 ConnectionManager.java .

关于java - 非阻塞 Java 异步处理 - 如何限制内存使用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56568754/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com