gpt4 book ai didi

java - RxJava 2 也许不产生输出

转载 作者:行者123 更新时间:2023-12-02 02:08:28 25 4
gpt4 key购买 nike

我有一个消费者和多个生产者 - 在消费者尝试执行一些昂贵的操作之前需要对记录进行批处理。

这是我尝试使用 RxJava 窗口和归约函数实现上述内容:

import io.reactivex.Maybe;
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class Rxtest {
private PublishSubject<String> bus = PublishSubject.create();
private Maybe<Observable<String>> maybe;

public Rxtest() {
maybe = bus
.window(10, TimeUnit.SECONDS, 10)
.reduce( (fObs, sObs) -> fObs.zipWith(sObs, (f, s) -> f + s));

maybe.subscribe(obs -> obs.subscribe(System.out::println, Throwable::printStackTrace,
() -> System.out.println("Done here")));
}

public static void main(String[] args) throws InterruptedException {
Rxtest test = new Rxtest();
Stream.iterate(0, i -> i+1)
.forEach(i -> {
test.bus.onNext(String.valueOf(i));
return;
});
Thread.sleep(100000L);
}
}

我期望的输出:

12345678910
11121314151617181920

....
....
Done here

相反,我什么也没得到。

最佳答案

以下内容将批量处理 10 秒的请求并处理它们。

observable
.buffer( 10, TimeUnit.SECONDS )
.subscribe( listOfString -> process( listOfString ) );

这将批量处理 10 秒或 100 个项目并处理它们:

observable
.buffer( 10, TimeUnit.SECONDS, 100 )
.subscribe( listOfString -> process( listOfString ) );

如果这两种情况都不能涵盖的话,真的不清楚你想要什么。

关于java - RxJava 2 也许不产生输出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50401179/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com