gpt4 book ai didi

java - 如何在 RxJava 中将 AbstractOnSubscribe 转换为具有反压支持的 Operator?

转载 作者:行者123 更新时间:2023-11-30 03:15:56 26 4
gpt4 key购买 nike

我扩展了AbstractOnSubscribe创建我自己的OnSubscribeObservable.create(OnSubscribe<T>) 一起使用我命名为 OnSubscribeInputStreamToLines 基本上是 InputStream并调用onNext对于每一行。

问题是,我还需要使用 InputStream 来做到这一点成为他人的一部分Observable .

简单的解决方案是执行以下操作:

Observable<InputStream> isObservable = ...;

isObservable
.flatMap(is -> Observable.create(new OnSubscribeInputStreamToLines(is)));

问题是这不会真正有效,因为它会为每个 inputStream 创建一个 Observable。我想我可以使用 Observable.lift 来做到这一点.

有没有办法让我可以轻松转换我的 OnSubscribeInputStreamToLinesOperator

我最担心的是背压问题,因为我会调用onNext对于 InputStream 的每一行尽管AbstractOnSubscribe支持背压,我找不到AbstractOperator其作用相同。

谢谢

最佳答案

这里的区别在于,OnSubscribeInputStreamToLines 是 Observable 世界的入口点,而 lift 是现有序列的中间运算符。此外,整个吞吐量可能由 InputStream 后面的 IO 操作或操作中的字符串处理主导,因此我不会担心那个薄包装器。

AbstractOnSubscribe 捕获运算符的生成器方面,它可以帮助您构建背压感知值发射器(通常是冷源),您可以在其中起草如何、何时以及发射什么值。

另一方面,

AbstractOperator 不能以这种方式泛化,因为 Operator 可以更自由地与上游值和下游请求进行交互。它们针对特定任务进行了高度定制,并且几乎没有共同点。它们可以由一组原语构建,但仅此而已(我已经编写了数百个)。

所以不要害怕 flatMap ping 事物。

关于java - 如何在 RxJava 中将 AbstractOnSubscribe 转换为具有反压支持的 Operator?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32583342/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com