- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
几年后我又回到了 Java,很高兴看到新的 java.net.http.HttpClient 中引入了非阻塞异步支持。在AWS Java SDK 2.0 .我多年前在 session 上听说过响应式编程的概念,但没有太多机会将这些想法应用到实践中。
我有一个问题似乎很适合使用这种编程风格:基本上我想通过 HTTP 下载一堆文件(比如 10,000 个)并将它们写回 S3。
我用过 failsafe实现非阻塞异步 http GET 的重试,并且通过 S3 异步客户端将这些与上传组合起来很简单(参见下面的草图)。
但是,我不确定如何正确地限制程序的内存使用:如果文件下载速度快于写回速度,则没有应用反压和防止内存不足异常的机制S3.
我熟悉针对此问题的一些传统阻塞解决方案 - 例如使用信号量来限制并发下载的数量,或者将下载写出到 S3 上传线程将从中拉出的某个有界阻塞队列。但是,如果我要使用这种阻塞机制来应用背压,那么它首先会让我质疑使用非阻塞 IO 的优势。
是否有更惯用的“响应式(Reactive)”方式来实现相同的目标?
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
public class BackupClient {
private static final Logger LOGGER = LoggerFactory.getLogger(BackupClient.class);
private final HttpClient httpClient = HttpClient.newBuilder().build();
private final S3AsyncClient s3AsyncClient = S3AsyncClient.create();
public runBackup(List<URI> filesToBackup) {
List<CompletableFuture<PutObjectResponse>> futures = filesToBackup.stream()
.map(backupClient::submitBackup)
.collect(Collectors.toList());
futures.forEach(CompletableFuture::join);
}
private CompletableFuture<PutObjectResponse> submitBackup(URI uri) {
return sendAsyncWithRetries(uri, HttpResponse.BodyHandlers.ofString())
.thenCompose(httpResponse -> s3AsyncClient.putObject(PutObjectRequest.builder()
.bucket("my-bucket")
.key(uri.toASCIIString())
.build(), AsyncRequestBody.fromString(httpResponse.body())));
}
private <T> CompletableFuture<HttpResponse<T>> sendAsyncWithRetries(URI uri, HttpResponse.BodyHandler<T> handler) {
final HttpRequest request = HttpRequest.newBuilder()
.uri(uri)
.timeout(Duration.ofMinutes(2))
.GET()
.build();
final var retryPolicy = new RetryPolicy<HttpResponse<T>>()
.withMaxRetries(4)
.withDelay(Duration.ofSeconds(1))
.handleResultIf(response -> 200 != response.statusCode());
return Failsafe.with(retryPolicy)
.getStageAsync(context -> {
if (context.getAttemptCount() > 0) {
LOGGER.error("Retry " + context.getAttemptCount() + " for " + uri);
}
return this.httpClient.sendAsync(request, handler);
});
}
}
最佳答案
既然您需要控制资源(内存)消耗,那么 Semaphore 是实现这一目标的合适工具。当你想使用非阻塞计算时,你所需要的只是异步信号量。流行的库(rxjava, react 流)在内部使用异步信号量来构造 react 流,但不将其作为单独的类提供。当 react 流的订阅者调用 Flow.Subscription.request(n) , 相当于 Semaphore.release(n) .类似于 Semaphore.acquire()然而,是隐藏的。它由发布者在内部调用。
这种设计方案的缺点是资源反馈只能在生产者和最近的消费者之间建立。如果有一条生产者和消费者的链条,那么每个环节的资源消耗都得单独控制,整体的资源消耗会变大N倍,其中N是环节数。
如果你负担得起,那么你可以使用 rxjava 或任何其他 react 流库的实现。如果没有,那么您必须使用唯一的异步库,它允许用户完全访问 asynchronous Semaphore implementation : DF4J (是的,我是作者)。它不包含对您的问题的直接解决方案,但是有一个示例,其中异步网络服务器通过异步信号量限制同时连接的数量,请参阅 ConnectionManager.java .
关于java - 非阻塞 Java 异步处理 - 如何限制内存使用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56568754/
我有一个 ServiceBusQueue(SBQ),它获取大量消息负载。我有一个具有 accessRights(manage) 的 ServiceBusTrigger(SBT),它不断轮询来自 SBQ
在下面给出的结果集中,有 2 个唯一用户 (id),并且查询中可能会出现更多此类用户: 这是多连接查询: select id, name, col1Code, col2Code, col2Va
我正在用 Python 2.7.3 编写一个带有 GRequests 的小脚本和 lxml 可以让我从各种网站收集一些收藏卡价格并进行比较。问题是其中一个网站限制了请求的数量,如果我超过它,就会发回
我想知道何时实际使用删除级联或删除限制以及更新级联或更新限制。我对使用它们或在我的数据库中应用感到很困惑。 最佳答案 在外键约束上使用级联运算符是一个热门话题。 理论上,如果您知道删除父对象也将自动删
下面是我的输出,我只想显示那些重复的名字。每个名字都是飞行员,数字是飞行员驾驶的飞机类型。我想显示驾驶不止一架飞机的飞行员的姓名。我正在使用 sql*plus PIL_PILOTNAME
我正在评估不同的移动框架,我认为 nativescript 是一个不错的选择。但我不知道开发过程是否存在限制。例如,我对样式有限制(这并不重要),但我想知道将来我是否可以有限制并且不能使用某些 nat
我正在尝试使用 grails 数据绑定(bind)将一些表单参数映射到我的模型中,但我认为在映射嵌入式集合方面可能存在一些限制。 例如,如果我提交一些这样的参数,那么映射工作正常: //this wo
是否可以将 django 自过滤器起的时间限制为 7 天。如果日期超过 7 天,则不应用过滤器 最佳答案 timesince 的源代码位于 django/django/utils/timesince.
我想在我的网站上嵌入一个 PayPal 捐赠按钮。但问题是我住在伊朗——这个国家受到制裁,人们不使用国际银行账户或主要信用卡。 有什么想法吗?请帮忙! 问候 沮丧 最佳答案 您可以在伊朗境内使用为伊朗
这是我的查询 select PhoneNumber as _data,PhoneType as _type from contact_phonenumbers where ContactID = 3
这个问题在这里已经有了答案: What is the maximum number of parameters passed to $in query in MongoDB? (4 个答案) 关闭
我的一个项目的 AndroidManifest.xml 变得越来越大(> 1000 行),因为我必须对某些文件类型使用react并且涵盖所有情况变得越来越复杂。我想知道 list 大小是否有任何限制。
在使用 Sybase、Infomix、DB2 等其他数据库产品多年后使用 MySQL 5.1 Enterprise 时;我遇到了 MySQL 不会做的事情。例如,它只能为 SELECT 查询生成 EX
这个问题在这里已经有了答案: What is the maximum number of parameters passed to $in query in MongoDB? (4 个回答) 关闭5年
通常我们是在{$apache}/conf/httpd.conf中设置Apache的参数,然而我们并没有发现可以设置日志文件大小的配置指令,通过参考http://httpd.apache.org/do
我正在搜索最大的 Android SharedPreferences 键值对,但找不到任何好的答案。其次,我想问一下,如果我有一个键,它的字符串值限制是多少。多少字符可以放入其中。如果我需要频繁更改值
我目前正在试验 SoundCloud API,并注意到我对/tracks 资源的 GET 请求一次从不返回超过 200 个结果。关于这个的几个问题: 这个限制是故意的吗? 有没有办法增加这个限制? 如
我正在与一家名为 Dwolla 的金融技术公司合作,该公司提供了一个 API,用于将银行信息附加到用户并收取/发送 ACH 付款。 他们需要我将我的 TLS 最低版本升级到 1.2(禁用 TLS 1.
我在 PHP 中有一个多维数组,如下所示: $array = Array ( [0] => Array ( [bill] => 1 ) [1] => Array ( [
我在获取下一个查询的第一行时遇到了问题: Select mar.Title MarketTitle, ololo.NUMBER, ololo.Title from Markets mar JOIN(
我是一名优秀的程序员,十分优秀!