- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我需要使用 webclient 和 Flux 调用其余分页 api。我尝试过以阻塞方式(一个接一个),但我想让它并行。假设一次 10 个并行调用。每次调用都会获取 1000 条记录。
我已经在调用第 0 个请求以从 header 获取总记录数。请求完成后,我需要调用 POST api 发送此响应(1000 条记录)。
如果任何请求完成,则将发送第 11 个请求,依此类推。我已经看到了 asyncRestTemplate
和可监听 futures 的其他示例,但是 asyncRestTemplate
已经被弃用,替代方案是 spring-webflux
还有
如rest template即将被弃用
我做了什么。
totalpages
计数,那么它会给我 500 内部服务器错误)ObjectMapper objmapper = new ObjectMapper();
HttpHeaders headers = partsService.getHeaders();
long totalCount = Long.parseLong(headers.get("total-count").get(0));
log.info(totalCount);
long totalPages = (long) Math.ceil((double) totalCount / 1000);
log.info(totalPages);
// List<Mono<List<Parts>>> parts = new ArrayList<>();
for (long i = 1; i <= 5; i++) {
partsService.fetchAllParts(1000L, i).log().subscribe(partList -> {
try {
// post each request response to another API
log.info(objmapper.writeValueAsString(partList));
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
});
log.info("Page Number:" + i);
}
我希望并行执行而没有任何 outOfmemoryerror
并且不给调用 api 带来太多负担。另外,我尝试一次获取所有页面,但收到 500 内部服务器错误。
我是 Flux(项目 react 器)的新手
Implemented below solution
它不是并行运行的,单个请求大约需要 2 分钟时间,这意味着所有 10 个(并发级别)应该同时完成。
try {
fetchTotalCount().log()
.flatMapMany(totalCount -> createPageRange(totalCount, 1000)).log()
.flatMap(pageNumber -> fetch(1000, pageNumber), 10).log()
.flatMap(response -> create(response))
.subscribe();
} catch (Exception e) {
e.printStackTrace();
}
Logs
2019-07-29T09:00:14,477 INFO [scheduling-1] r.u.Loggers$Slf4JLogger: request(10)
2019-07-29T09:00:14,478 INFO [scheduling-1] r.u.Loggers$Slf4JLogger: request(10)
2019-07-29T09:00:14,479 INFO [scheduling-1] r.u.Loggers$Slf4JLogger: request(unbounded)
2019-07-29T09:00:14,679 INFO [scheduling-1] c.o.q.l.Logging: fetch() execution time: 546 ms
2019-07-29T09:00:17,028 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: onNext(74577)
2019-07-29T09:00:17,042 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: onNext(1)
2019-07-29T09:00:17,068 INFO [reactor-http-nio-1] c.o.q.l.Logging: fetch(1000,1) execution time: 24 ms
2019-07-29T09:00:17,078 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: onNext(2)
2019-07-29T09:00:17,080 INFO [reactor-http-nio-1] c.o.q.l.Logging: fetch(1000,2) execution time: 2 ms
2019-07-29T09:00:17,083 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: onNext(3)
2019-07-29T09:00:17,087 INFO [reactor-http-nio-1] c.o.q.l.Logging: fetch(1000,3) execution time: 2 ms
2019-07-29T09:00:17,096 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: onNext(4)
2019-07-29T09:00:17,098 INFO [reactor-http-nio-1] c.o.q.l.Logging: fetch(1000,4) execution time: 1 ms
2019-07-29T09:00:17,100 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: onNext(5)
2019-07-29T09:00:17,101 INFO [reactor-http-nio-1] c.o.q.l.Logging: fetch(1000,5) execution time: 1 ms
2019-07-29T09:00:17,103 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: onNext(6)
2019-07-29T09:00:17,106 INFO [reactor-http-nio-1] c.o.q.l.Logging: fetch(1000,6) execution time: 3 ms
2019-07-29T09:00:17,108 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: onNext(7)
2019-07-29T09:00:17,110 INFO [reactor-http-nio-1] c.o.q.l.Logging: fetch(1000,7) execution time: 2 ms
2019-07-29T09:00:17,113 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: onNext(8)
2019-07-29T09:00:17,115 INFO [reactor-http-nio-1] c.o.q.l.Logging: fetch(1000,8) execution time: 1 ms
2019-07-29T09:00:17,116 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: onNext(9)
2019-07-29T09:00:17,118 INFO [reactor-http-nio-1] c.o.q.l.Logging: fetch(1000,9) execution time: 1 ms
2019-07-29T09:00:17,119 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: onNext(10)
2019-07-29T09:00:17,121 INFO [reactor-http-nio-1] c.o.q.l.Logging: fetch(1000,10) execution time: 1 ms
2019-07-29T09:00:17,123 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: onComplete()
2019-07-29T09:09:03,295 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: request(1)
2019-07-29T09:09:03,296 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: onNext(11)
2019-07-29T09:09:03,296 INFO [reactor-http-nio-1] c.o.q.l.Logging: fetch(1000,11) execution time: 0 ms
2019-07-29T09:09:03,730 INFO [reactor-http-nio-1] c.o.q.s.Scheduler: 200 OK
2019-07-29T09:09:03,730 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: request(1)
2019-07-29T09:09:05,106 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: onNext(// data print)
2019-07-29T09:09:05,196 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: request(1)
2019-07-29T09:09:05,196 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: onNext(12)
2019-07-29T09:09:05,198 INFO [reactor-http-nio-1] c.o.q.l.Logging: fetch(1000,12) execution time: 1 ms
2019-07-29T09:09:05,466 INFO [reactor-http-nio-1] c.o.q.s.Scheduler: 200 OK
2019-07-29T09:09:05,466 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: request(1)
2019-07-29T09:09:09,565 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: onNext(// data print)
2019-07-29T09:09:09,730 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: request(1)
2019-07-29T09:09:09,730 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: onNext(13)
2019-07-29T09:09:09,731 INFO [reactor-http-nio-1] c.o.q.l.Logging: fetch(1000,13) execution time: 0 ms
2019-07-29T09:09:10,049 INFO [reactor-http-nio-1] c.o.q.s.Scheduler: 200 OK
Update
更正调用 API 后,记录即将到来,但在获取最后一页(75)后,我收到 404 未找到错误。
2019-07-30T14:07:50,071 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: onNext(75)
2019-07-30T14:07:50,075 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: onComplete()
2019-07-30T14:07:50,322 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: onNext(200 OK)
2019-07-30T14:07:50,323 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: request(1)
2019-07-30T14:07:51,973 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: onNext(//data)
2019-07-30T14:07:52,440 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: onNext(200 OK)
2019-07-30T14:07:52,440 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: request(1)
2019-07-30T14:07:54,522 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: onNext(//data)
2019-07-30T14:07:54,699 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: onNext(//data)
2019-07-30T14:07:55,075 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: onNext(200 OK)
2019-07-30T14:07:55,076 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: request(1)
2019-07-30T14:07:55,371 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: onNext(200 OK)
2019-07-30T14:07:55,371 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: request(1)
2019-07-30T14:07:55,471 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: cancel()
2019-07-30T14:07:55,472 INFO [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: cancel()
2019-07-30T14:07:55,473 ERROR [reactor-http-nio-1] r.u.Loggers$Slf4JLogger: onError(java.lang.Exception: 4XX received from API)
2019-07-30T14:07:55,473 ERROR [reactor-http-nio-1] r.u.Loggers$Slf4JLogger:
java.lang.Exception: 4XX received from API
最佳答案
Flux.flatMap
有一个参数来设置并发级别,让您可以协调并行化。
在下面的示例中,我使用了虚拟 URL、示例中的一些片段以及一些其他简单代码来演示如何实现此目的:
public static void main(String[] args)
{
fetchTotalCount()
.flatMapMany(totalCount -> createPageRange(totalCount))
.flatMap(pageNumber -> fetch(pageNumber), 5) // 5 is the concurrency level = how many pages we query concurrently
.flatMap(response -> process(response))
.subscribe();
}
private static Mono<Integer> fetchTotalCount()
{
return webClient.get()
.uri("http://www.example.com/get-total-count")
.exchange()
.map(ClientResponse::headers)
.map(headers -> headers.asHttpHeaders().get("total-count").get(0))
.map(Integer::valueOf);
}
private static Flux<Integer> createPageRange(int totalCount)
{
int totalPages = (int) Math.ceil((double) totalCount / 1000);
return Flux.range(1, totalPages);
}
private static Mono<Response> fetch(int pageNumber)
{
return webClient.get()
.uri("http://www.example.com/fetch?page=" + pageNumber)
.retrieve()
.bodyToMono(Response.class);
}
private static Mono<Response> process(Response response)
{
// todo send other http request for the post api here
return Mono.just(response);
}
private static class Response
{
}
关于java - 使用 webclient 和 Flux 进行多个异步剩余分页调用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57231837/
我正在编写一个类库来在我无法控制的站点上执行操作。该网站正在接受表单帖子作为输入。 谁能告诉我这两种方法除了上传数据的形式之外是否有区别? System.Net.WebClient.Uploa
用于工作的代码。 有问题的网址是 https://yobit.net/api/3/info 它适用于 IE。它曾经与 webclient 一起使用。它现在在 webclient 中不起作用。我想知道问
因此,我将我的 WebClient 包装在一个 using 语句中。但是我突然想知道,如果我的对象实现了 IDisposable 并且包装在 using 语句中,我是否需要取消订阅事件? 下面是我当前
我正在 VS15 测试版中工作并尝试使用 WebClient。虽然 System.Net 被引用,并且智能感知建议 WebClient 类可用,但在构建时我收到以下错误: The type or na
我想知道是否可以将 cookie 从一个 Web 客户端复制到另一个 Web 客户端。 原因 我正在使用并行 Web 请求,它会在每个新线程上创建 Web 客户端的新实例。 问题 信息敏感,需要使用p
我正在尝试使用 WebClient,但它给我错误,所以我检查了几个论坛(包括这个),他们告诉我把它放在哪里 在文件的顶部: using System.Net 在我想使用 WebClient 的地方之后
我正在尝试使用 WebClient 实现以下场景。使用 RestTemplate 很简单,但我不能再这样做了。 伪java代码中Spring Controller 的相关部分: Mono t1 = w
我正在使用 Spring WebClient 调用休息服务。如下所述的 post 调用代码。 Mono response = client.post()
正在尝试使用 WebClient在 Blazor 项目中。 得到以下错误: 在 blazor.webassembly.js:1 WASM: System.Net.WebException: An ex
我正在使用 ASP.NET Core 并尝试将文件下载到绝对路径。 但我遇到的问题是文件总是被下载到项目目录,文件名本身得到整个路径的名称。 我的代码: string path = @"C:\User
我需要自动化涉及使用登录表单的网站的流程。我需要在登录页面之后的页面中捕获一些数据。 我知道如何对普通页面进行屏幕抓取,但不知道如何抓取安全站点背后的页面。 这可以通过 .NET WebClient
我正在尝试逐步下载一系列序列化数据。目标是从服务器发送一个大块,并在下载时在客户端对其进行部分处理。 我正在使用 System.Net.WebClient 类并将其 AllowReadStreamBu
我在 Windows 桌面应用程序上使用此代码来获取组合框的值,之后我需要选择哪个组合框将使用 JavaScript 使用新信息更新页面 private WebBrowser withEventsFi
我正在尝试通过 C# 代码获取网站的 HTML 源代码。当我使用 Windows 身份验证访问站点时,以下代码有效: using (WebClient client = new WebClient()
我只是使用WebClient.DownloadString(),但速度慢得惊人。最大速度为 40kbs 我尝试将 WebClient.Proxy 设置为 null,但这不起作用,而且我还没有达到最大互
为了利用新的 WebClient API,我在我的 Intellij 项目中包含了 spring-webflux。 dependencies { implementation 'org.spr
我已经开始使用 WebClient,并使用过滤器方法添加请求/响应日志记录: WebClient.builder() .baseUrl(properties.getEndpoint())
我正在使用 WebClient.DownloadFile 将图像下载到本地存储库,如下所示: WebClient myWC = new WebClient();
我尝试使用网络客户端非阻塞方法验证验证码响应。所以它的工作,但我需要我的方法返回 boolean 值而不是异常。我如何从订阅返回值? webClient
这个问题已经有答案了: What does a "Cannot find symbol" or "Cannot resolve symbol" error mean? (18 个回答) 已关闭 3 年
我是一名优秀的程序员,十分优秀!