- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
据我了解,下游的其余部分需要在线程池中的线程上处理(我将其设置为1024)
这是我的代码。
Flux<String> ips =
Flux.fromIterable(items).map(Item::getIp);
ips
.publishOn(Schedulers.fromExecutor(Executors.newFixedThreadPool(1024)))
.map(ip -> {
try {
Request request = new Request.Builder().url("https://" + ip + ":443").build();
Response response = okHttpClient.newCall(request).execute();
return response.code();
} catch (Exception e) {
}
return -1;
})
.subscribe(System.out::println);
出于某种原因,与以下代码相比,此代码非常慢:
appRules
.stream()
.parallel()
.map(Item::getIp)
.forEach(ip -> {
try {
Request request = new Request.Builder().url("https://" + ip + ":443").build();
Response response = okHttpClient.newCall(request).execute();
System.out.println(response.code());
} catch (Exception e) {
}
System.out.println(-1);
});
为什么?当您受到 IO 限制时,同时处理项目流的正确方法是什么? (而不是CPU)
最佳答案
执行速度较慢的原因是Reactor管道执行默认是单线程的。因此,当您使用 Flux.publishOn 运算符时,您只需说您希望在给定线程池中的线程上执行管道的这一部分,但它不会同时在单独的线程上执行每个项目。
实现并发的一种选择是使用 parallel Flux ,它创建了所谓的“rails”,其中数据可以并行流动,但它主要用于 CPU 密集型操作。
更好的选择是将阻塞代码包装在 Mono 中并将其委托(delegate)给专用线程池,类似于您所做的,只是这次每个任务都会获得自己的线程:
private static void reactorProcess()
{
ExecutorService executor = Executors.newFixedThreadPool(1024);
Flux.range(1, 1024)
.flatMap(a -> Mono.fromRunnable(() -> simulateHttpCall())
.subscribeOn(Schedulers.fromExecutor(executor)))
.blockLast();
executor.shutdown();
}
private static void simulateHttpCall()
{
try
{
Thread.sleep(100);
System.out.println(Thread.currentThread().getName() + ": " + ZonedDateTime.now());
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
我还要指出,Java 并行流并不是这种处理的可行替代方案。它使用 ForkJoinPool默认情况下,它也适用于 CPU 密集型操作,并且仅使用与计算机中 CPU 核心数量相同的线程。
除此之外,如果您想充分利用响应式(Reactive)编程的功能,您应该考虑使用支持非阻塞 IO 的 HTTP 客户端,例如 WebClient从 Spring 开始。通过使用非阻塞 HTTP 客户端,您无需再担心定义线程池,因为不会阻塞任何线程,并且固定的少量线程将能够服务数千个并发请求。
关于java - react 堆项目 : downstream is slow,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57168517/
我已阅读 [PCA 文档]( http://scikit-learn.org/stable/modules/generated/sklearn.decomposition.PCA.html)sciki
据我了解,下游的其余部分需要在线程池中的线程上处理(我将其设置为1024) 这是我的代码。 Flux ips = Flux.fromIterable(items).map(Item::
我正在尝试在 Electron 中保护离线内容的播放。 我正在尝试使用: https://github.com/castlabs/downstream_electron 连同他们与 Widevine
我已经开始使用 Git 并遇到了“上游”和“下游”这两个术语。我以前见过这些,但从未完全理解它们。这些术语在 SCM(Software Configuration Management 工具)和源代码
据我所知,“聚合下游测试结果”功能并没有按预期工作(并且很难找到有用的文档)。我想实现非常相似的功能: 作业Build并行触发作业T1、T2(其中T1执行FindBugs,T2 PMD)。 场景 1:
事件文档 http://docs.jboss.org/netty/3.2/api/org/jboss/netty/channel/ChannelEvent.html 例如:可以同时获取两个事件吗? @
我目前有一个多分支项目,我希望“开发”分支构建触发另一个顶级 Maven Jenkins 作业。多分支项目中的目标保持在最低限度(构建和单元测试),而顶级 Maven 项目被配置为运行各种报告(“站点
有人可以向我解释一下,如何在 netty 中处理“下游异常”?根据 javadoc 没有下游异常: http://docs.jboss.org/netty/3.1/api/org/jboss/nett
我在两个 Amazon EC2 实例(版本 1.8.0)上有一个实时 Couchbase 集群,以及大约 5 个应用程序服务器,每个服务器上都运行 PHP 并带有 moxi 客户端。有时,Moxi 在
TLDR:使用错误的参数顺序调用上游函数。我如何确保这是通过测试捕获的? 这是我的设置的最小示例: # functions.py def inner(age, name): if age >
我正在编写一个 SBT 插件。我想使用 Circe JSON 库,但它需要 Scala 2.10 上的 Macro Paradise 编译器插件。 通常,您将编译器插件添加到 build.sbt 中,
我有一个(Flowable)项目流要使用单个公共(public)资源并行处理,并且之后必须处置该资源。我尝试使用 Single.using() 运算符,但它甚至在处理流中的第一个项目之前就释放了资源。
我是 RxJava 的新手,有一个问题我无法找到合适的解决方案,即使经过数小时的研究。 它是这样的:我正在开发一个银行应用程序,它连接到某个服务器以接收定价更新、处理这些更新并将它们保存在某个内部数据
我的 Hudson 项目似乎没有正确聚合下游测试结果,我想知道我是否错过了某个配置步骤。我有两个项目,Foo 和 Foo-Tests,这两个项目都是自由式工作。 在项目 Foo 我有以下配置: 选中“
如果我启动我的 phoenix 应用程序的一个实例并用请求点击它,我的插件将适本地停止。但是,在测试环境中做同样的事情,停止并不会阻止下游的插件被调用,这会导致我的测试失败。我认为问题可能来自我在测试
我是一名优秀的程序员,十分优秀!