gpt4 book ai didi

java - 如何在没有主题的 RX 中创建反馈循环?

转载 作者:行者123 更新时间:2023-12-02 05:21:39 26 4
gpt4 key购买 nike

更新:我已经完全重新表述了问题和代码以响应 Alexei Kaigorodov 的答案,并且在他更新之前它并不对应。

这里存在一些关于 SO 的现有问题以及互联网上的文章,通常涉及如何在 RX 流中创建某种反馈循环。其中大多数问题的解决方法是改变对根本问题的观点,从而导致现有运营商可以处理的流程的不同设计。例如:

  • 真正的反馈循环可以通过使用 Subject 来实现,但不鼓励这样做,因为它会牺牲功能纯度及其优点。
  • 反馈循环通常只是状态的伪装,可以通过scan运算符来维护。
  • 反馈循环可能只是嵌套流上下文的一种伪装,这可以通过嵌套 flatMaps 来实现
然而,我有时会遇到一个问题,我认为这是一个基本的反馈循环,应该这样建模,所以我一直在思考这个问题,并且我对这个一般性问题感到好奇:真正的反馈循环在考虑到 RX 模型的功能纯度和单向性质,理论上与它最不兼容?或者在这种情况下我是否需要像 Alexei 建议的那样转向通用数据流库?

我尝试了许多有趣的技巧,如何将反馈偷偷带入 RX 流中。除了使用 Subject 之外,我能做的最好的事情就是使用 defer 运算符,如下所示:

public class Main {
public static void main(String[] args) throws InterruptedException {
// simulate hot observable emitting external events
final Observable<Message> sourceStream = Observable
.interval(200, MILLISECONDS)
.map(signal -> new SourceMessage(signal));

// feedback needs to be referencable through final variable in the
// next step, but we also can't define it yet, so prepare a mutable
// container for it
final MutableObservableHolder feedback = new MutableObservableHolder();

// accumulate source and feedback messages in the state
final Observable<State> stateStream = sourceStream
.mergeWith(Observable.defer(() -> feedback.get()))
.scan(new State(), (state, message) -> state.update(message))
.share(); // prevents infinite subscription loops

feedback.set(stateStream
.flatMap(state -> {
if (state.isAddNextSumAgainWithDelay()) {
return stateStream
.take(1)
.map(nextState -> new FeedbackMessage(nextState.getPayload()))
.delay(100, MILLISECONDS);
} else {
return Observable.empty();
}
}));

Disposable subscription = stateStream
.take(10)
.subscribe(System.out::println);

while (!subscription.isDisposed()) {
Thread.sleep(1000);
}
}
}

(整个可运行项目发布于 https://github.com/calaveraInfo/literate-octo-happiness )

有趣的是,很难提炼出阻碍普通 RX 运算符(operator)解决某些问题的本质。上面的示例非常接近我能想到的最正交的示例:对上游消息的有效负载进行求和并生成中间和流,并且在某些情况下将总和本身添加到总和中,但将其作为单独的步骤进行(我添加了延迟请突出显示它)。然而,如果可以将其添加到 RX,那么即使这也可以通过某种 scanFlatMap 或 flatMapScan 运算符来实现。

我的反馈循环的实现是否安全,还是一个危险的地雷黑客行为,在某些情况下可能会炸毁我的脸?为什么没有 flatMapScan 运算符,如果它看起来如此基本,并且请求它是一个好主意吗? RX 可以建模的内容是否存在一些固有的限制,或者每个问题始终只是正确运算符(operator)的问题?

更新:我所说的 flatMapScan 是指将 flatMap 和扫描作为一个步骤进行的可能性。动机是,当它们是两个不同的运算符时,它们的组合功能在这两种情况下都会有所减少: 1. .flatMap().scan() 可以将每个上游消息扩展为流,然后进行扫描,但 flatMap 不能使用扫描状态来决定如何展开消息,2.在.scan().flatMap()中,展开可以基于扫描状态,但展开的流不会被扫描。

但是我不确定 API 会是什么样子,因为它需要同时处理几件事。

最佳答案

你的代码看起来很神秘。即使你设法让它发挥作用,它仍然很难维护。 Rx 和 Nio2 使用不同的异步计算模型,因此将 Rx 与 Nio2 一起使用只会使编程变得复杂。

我建议要么使用纯Nio2,要么使用我的异步库df4j ,它具有 Nio2 和 rx-java2 的适配器。

关于java - 如何在没有主题的 RX 中创建反馈循环?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56257922/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com