gpt4 book ai didi

java - RxJava rx.exceptions.MissingBackpressureException 与过滤器和 map

转载 作者:塔克拉玛干 更新时间:2023-11-03 00:21:46 26 4
gpt4 key购买 nike


我刚开始接触 RxJava/RxAndroid,但在理解如何正确处理背压方面遇到了一些问题。

我有一个可观察到的文件扫描器,它可以扫描目录并发出文件。应尽快处理这些文件,不要跳过任何文件。

所以管道看起来像这样: Observable<File> -> Filter<File, Boolean> {check if file is of type .xyz}

不幸的是,我收到了 rx.exceptions.MissingBackpressureException 错误。所以我阅读了有关背压的信息,如果我理解正确的话,无损选项只是缓冲区和窗口。

我试过了 onBackpressureBuffer(), buffer() and window() .虽然所有 onBackpressureX()命令似乎没有效果,buffer() 将项目分组为 List<File> .我的问题是:

  1. 我应该如何过滤这些组? filter(<List<File>>, Boolean)没有意义...
  2. 我如何在我的文件扫描器中实现可观察到的背压处理,以便它等待我的管道/运营商/订阅者有容量?
  3. 使用例如map()进入 XYZ 实体并将它们存储在单独的列表中,而不是活跃的订阅者,但作为运营商的副作用?

一些反馈甚至提示会有很大帮助,我们将不胜感激。

最佳答案

我想我找到了问题的解决方案:此代码无效:

Observable<File> task = scanner.getProcessDirectoryTask(mountPoint);
Subscription _subscription = task
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.onBackpressureBuffer(10000)
.filter(new Func1<File, Boolean>() {
@Override
public Boolean call(File file) {
return file.getAbsolutePath().endsWith("xyz");
}
})
.buffer(100)
.subscribe(new Observer<List<File>>() { /*whatever you want to do*/ }

但是这段代码正在工作:

Observable<File> task = scanner.getProcessDirectoryTask(mountPoint);
Subscription _subscription = task
.onBackpressureBuffer(10000)
.filter(new Func1<File, Boolean>() {
@Override
public Boolean call(File file) {
return file.getAbsolutePath().endsWith("xyz");
}
})
.buffer(100)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<List<File>>() { /*whatever you want to do*/ }

看来 subscribeOn()observeOn() 的顺序有很大的不同!

我的第三个问题有点离题,但仍然悬而未决。也许有人可以对此发表评论。

关于java - RxJava rx.exceptions.MissingBackpressureException 与过滤器和 map ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34253798/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com