- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我试图使用 RxJava 来理解 ReactiveX,但我无法理解整个 Reactive 的想法。我的案例如下:
我有 Task
类。它有 perform()
方法,该方法执行 HTTP 请求并通过 executeRequest()
方法获取响应。该请求可以执行多次(定义的重复次数)。我想获取 executeRequest()
的所有结果并将它们合并到 Flowable
数据流中,这样我就可以在 perform 中返回这个
方法。所以最后我希望我的方法返回我的 Flowable
()Task
执行的请求的所有结果。
executeRequest()
返回 Single
因为它只执行一个请求,并且可能只提供一个响应或根本不提供响应(在超时的情况下)。在perform()
中,我为每次重复创建Flowable
数字范围。订阅此 Flowable
我每次重复都会执行一个请求。我还订阅了每个响应 Single
,以便记录响应并将其收集到集合中以供以后使用。现在我有一组 Single
,如何将它们合并到 Flowable
中以在 perform()
中返回它?我尝试使用像 merge()
这样的运算符,但我不理解它的参数类型。
我在网上阅读了一些指南,但它们都非常笼统,或者没有根据我的情况提供示例。
public Flowable<HttpClientResponse> perform() {
Long startTime = System.currentTimeMillis();
List<HttpClientResponse> responses = new ArrayList<>();
List<Long> failedRepetitionNumbers = new ArrayList<>();
Flowable.rangeLong(0, repetitions)
.subscribe(repetition -> {
logger.debug("Performing repetition {} of {}", repetition + 1, repetitions);
Long currentTime = System.currentTimeMillis();
if (durationCap == 0 || currentTime - startTime < durationCap) {
Single<HttpClientResponse> response = executeRequest(method, url, headers, body);
response.subscribe(successResult -> {
logger.info("Received response with code {} in the {}. repetition.", successResult
.statusCode(), repetition + 1);
responses.add(successResult);
},
error -> {
logger.error("Failed to receive response from {}.", url);
failedRepetitionNumbers.add(repetition);
});
waitInterval(minInterval, maxInterval);
} else {
logger.info("Reached duration cap of {}ms for task {}.", durationCap, this);
}
});
return Flowable.merge(???);
}
和executeRequest()
private Single<HttpClientResponse> executeRequest(HttpMethod method, String url, LinkedMultiValueMap<String, String>
headers, JsonNode body) {
CompletableFuture<HttpClientResponse> responseFuture = new CompletableFuture<>();
HttpClient client = vertx.createHttpClient();
HttpClientRequest request = client.request(method, url, responseFuture::complete);
headers.forEach(request::putHeader);
request.write(body.toString());
request.setTimeout(timeout);
request.end();
return Single.fromFuture(responseFuture);
}
最佳答案
不要在 perform
方法中订阅每个可观察量(每个 HTTP 请求),只需继续像这样链接可观察量即可。您的代码可以简化为类似的内容。
public Flowable<HttpClientResponse> perform() {
// Here return a flowable , which can emit n number of times. (where n = your number of HTTP requests)
return Flowable.rangeLong(0, repetitions) // start a counter
.doOnNext(repetition -> logger.debug("Performing repetition {} of {}", repetition + 1, repetitions)) // print the current count
.flatMap(count -> executeRequest(method, url, headers, body).toFlowable()) // get the executeRequest as Flowable
.timeout(durationCap, TimeUnit.MILLISECONDS); // apply a timeout policy
}
最后,您可以在实际需要执行这一切的地方订阅perform
,如下所示
perform()
.subscribeWith(new DisposableSubscriber<HttpClientResponse>() {
@Override
public void onNext(HttpClientResponse httpClientResponse) {
// onNext will be triggered each time, whenever a request has executed and ready with result
// if you had 5 HTTP request, this can trigger 5 times with each "httpClientResponse" (if all calls were success)
}
@Override
public void onError(Throwable t) {
// any error during the execution of these request,
// including a TimeoutException in case timeout happens in between
}
@Override
public void onComplete() {
// will be called finally if no errors happened and onNext delivered all the results
}
});
关于java - 将多个 ReactiveX 流合并为一个结果流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50759639/
我想从树中创建一个可观察对象。每个节点都会生成一个依赖于其父节点的可观察对象(例如通过 switchMap)。在树的每一层,每个 child 的可观察值都需要合并。 我想转换这段代码,以便它使用可观察
RxJS Observables 是可以随时间同步或异步返回零到无限值的函数。 但是异步行为究竟是如何实现的呢?它是以某种方式使用单独的线程,还是使用 Web API 和 JS 事件循环? 最佳答案
我正在学习 Rx(尤其是 RxSwift),我对架构、层和边界有疑问。 我习惯于分层架构(数据、域、表示),通常是在 MVP 或 VIPER 上。对于这个项目,我使用的是 MVVM,这是 Reacti
我正在使用 Rx.Net 并且我有 Observable 发出时间序列点( double ,时间戳)。每次新点到达时,我想计算过去 30 秒的平均值。我想我需要某种不是基于计数而是基于时间戳的重叠窗口
public class ReactiveExample { public static void main(String[] args) throws InterruptedExceptio
我正在尝试制作带有背压的流动性。我的想法是,在当前项目之一完成处理之前,不会发出新的可流动项目。我正在使用 ResourceSubscriber 和 subscribeWith() 方法来实现这一点。
我很好奇谁开始了 ReactiveX/RxJS? Rx 似乎在很多语言/平台中实现,我很好奇它的起源是什么?首先是哪个项目? RxJS 在 Angular Web 框架中大量使用。它背后是否有像微软、
我试图使用 RxJava 来理解 ReactiveX,但我无法理解整个 Reactive 的想法。我的案例如下: 我有 Task 类。它有 perform() 方法,该方法执行 HTTP 请求并通过
我如何利用 ReactiveX按顺序执行异步调用?即,在第一个调用完成后执行第二个调用。 更具体地说,我正在使用 RxSwift在 iOS 中,我想链接在一起的异步是 UIView 动画(而不是在第一
我在我的 iOS 应用程序中使用 RxSwift 进行缓存,并且有一段这样的代码: let observable = Observable.of(cache.getItem(itemID), netw
我正在使用 ReactiveX在 iOS/Swift 中(RxSwift)。 假设我有一个可观察对象: let dataUpdates = ... 我订阅的: dataUpdates.subscrib
我正在学习 ReactiveX。我删除了错误检查、日志记录和其他部分,以便于阅读。 我有一个以 JSON 格式返回对象集合的服务: getPanels() { return this.http
寻找一种干净的方式来转换源 Observable发出单个 null (或哨兵值)在一段时间内不发射项目后。 例如,如果源 observable 发出 1, 2, 3然后停止发射 10 秒,然后发射 4
我想知道在 Rx 中解决以下问题的规范方法是什么:假设我有两个可观察对象,mouse_down和 mouse_up ,其元素表示鼠标按钮按下。在一个非常简单的场景中,如果我想检测长按,我可以通过以下方
我在创建以下可观察值时遇到问题。 我希望它接收预定义的值数组 我想按不同的事物进行过滤,并能够将它们作为单独的可观察值进行处理。 然后,当需要合并这些过滤后的可观察值时,我想保留原始观察值的顺序 //
我有一个 Observable .我想把它变成一个 Map它告诉我每个不同字符串的出现次数。 Observable 包含约 10 亿个元素,其中 1000 个是不同的(因此不能将整个数据集存储在 RA
swipeRefreshLayout.setOnRefreshListener(() -> { swipeRefreshLayout.setRefreshing(true); retrieveDa
我是 ReactiveX 的新手。我是通过阅读源代码来学习它的。一切都那么清晰,但突然间我得到了一个名为“Consumer”的词,它是一个接口(interface)。它被用来代替观察者。 谁能告诉我它
如何在不使用任何加载器的旧 javascript 应用程序中加载 RxJS? 对于 RxJS 4.x,我可以简单地这样做: RxJS 5 怎么样?他们的文档假设您正在使用某种类型的加载器来处理所有事
我正在寻找一个与 debounce(ms) 类似的运算符,但它触发第一个事件,然后等待 ms 然后触发第一个事件,依此类推上。 基本上,我想过滤发生时间非常接近的事件(就时间而言),但不是等到最后一个
我是一名优秀的程序员,十分优秀!