gpt4 book ai didi

system.reactive - 从多个任务中创建一个可观察对象

转载 作者:行者123 更新时间:2023-12-04 07:07:37 24 4
gpt4 key购买 nike

我想创建一个接受 S 的方法并返回 Observable<T>在完成三个(异步)任务后,以一个值完成。

这三个任务都在队列上运行,在不同的线程中有自己的消费者。

这是我的想法:

  • 任务 1 收到 <S, Subject<T>>通过队列并计算来自 S 的值并设置在 Subject<T> 中与 onNext然后调用 onComplete() .
  • 任务 2 和 3 收到 <S, Subject<Void>>并调用onComplete()一旦他们完成了他们的工作。

任务 1、2 和 3 可以独立运行,并以随机顺序完成。

现在我有三个主题,一个是T , 和两个 Void .我想返回第一个,但只让它发出值 T一旦所有 任务完成。这是因为我不希望可观察对象的任何订阅者对 T 做任何事情。直到所有任务完成。

组合 Subjects 以实现此行为的正确方法是什么?我可以使用 CountDownLatch 轻松破解它们等等,但希望有一种 rx-native 的方法来解决这个问题。

我计划通过队列使用 Subjects 作为回调是正确的方法吗?我曾经使用 CompletableFuture<T>为此,但我想迁移到 RX。

最佳答案

我不完全确定主题在哪里发挥作用,但您可以使用 When + And + Then 同步这三个任务>

public IObservable<T> MyMethod<S, T>(S incoming) {

//Create a new plan
return Observable.When(

//Start with Task one which will return an T from an S
Observable.FromAsync(async () => await SomeTaskToTurnSIntoT(incoming))

//Add in Task two which returns a System.Reactive.Unit
.And(Observable.FromAsync(() => /*Do Task 2*/))

//Same for Task 3
.And(Observable.FromAsync(() => /*Do Task 3*/))

//Only emit the item from the first Task.
.Then((ret, _, __) => ret))

//Finally we only want this to process once, then we will reuse the
//existing value for subsequent subscribers
.PublishLast().RefCount();
}

上面的代码会等到所有三个项目都完成后才会发出。需要注意的一件事是,在 Rx 中,Void 对象实际上是一个 System.Reactive.Unit,因此如果没有值,您应该返回它。

关于system.reactive - 从多个任务中创建一个可观察对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32422201/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com