gpt4 book ai didi

java - 有没有更好的方法来使用 RxJava 获取两个 Observable 的交集

转载 作者:行者123 更新时间:2023-11-30 10:50:43 24 4
gpt4 key购买 nike

使用 RxJava,我有一个源 Observable,它发出许多我希望与另一个发出相同类型的 Observable 相交的项目。在考虑了多种选择之后,看起来最连贯的结构方式应该是这样的:

Observable<String> source = ...emits 20 items

Observable.create(subscriber -> {
source
.buffer(5)
.subscribe(things -> {
tocheck.getMatches(things) //emits 3 matches
.subscribe(subscriber::onNext, subscriber::onError, () -> {});
}, subscriber::onError, subscriber::onCompleted));

这里的预期输出是当我订阅生成的 Observable 时,我发出了 12 个项目。由于 getMatches 的约定,我需要缓冲结果。

从表面上看,这似乎可行,但它似乎不是最干净的方法。过滤器似乎不适用于此处,因为出于性能原因我无法对每个项目运行相交检查。我试过使用 flatMap,但 getMatches 可观察对象完成了流,而不是来自源可观察对象的完成通知。

有没有更好的方法来构建它?

编辑:澄清这种代码风格发生了什么:

Observable<String> source = ...emits 20 items

source
.buffer(5)
.flatMap(this::getMatches); //final observable would emit a total of 12 items

这显然更简洁,但是当我添加一些日志记录时(假设数据大小与原始代码段相同:

source
.doOnEach(notification -> {
log.trace("Processing {}", notification.getValue());
})
.buffer(5)
.flatMap(this::getMatches)
.doOnEach(notification -> {
log.trace("Processing after match {}", notification.getValue());
});

我得到 20 个“正在处理”日志实例,然后奇怪的是只有几行来自“处理后”日志(我预计是 12 行)。它似乎比应有的时间更早地调用 on complete 。也许我的结构有问题?

最佳答案

所以看起来 AndroidEx 是正确的。我正在使用 Redis Lettuce 响应式(Reactive) API,但它看起来并不正常。上面添加的代码片段是构造两个 Observable 的交集的正确方法。

关于java - 有没有更好的方法来使用 RxJava 获取两个 Observable 的交集,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34968698/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com