gpt4 book ai didi

c# - 使用 Reactive Extensions 我可以创建一个可观察对象的订阅者,它会阻塞直到满足某些条件或发生超时

转载 作者:行者123 更新时间:2023-11-30 21:44:37 25 4
gpt4 key购买 nike

我可以/应该为此使用响应式扩展吗?基本上我有一个 ESB,我想监视消息(这基本上是我的热可观察对象位于顶部的位置),我想创建一堆订阅者,它们将仅在特定条件下在创建它们的线程中阻塞/暂停满足和/或发生超时。

我知道我可以使用 Tasks/TaskCompletionSource 和一个取消标记来实现这一点,但我认为 RX 似乎很合适。

编辑:超时不是可观察的,而是应该在订阅者处,一旦满足超时/条件,他们可以取消订阅并处理自己,以先到者为准。

编辑 2:订阅者和可观察对象可能会在不同的线程上处理 - 但不一定。如果在订阅者上使用异步/等待,我希望能够在订阅者所在的位置暂停/阻止执行。基本上,一些工作完成的信号

我所追求的是利用订阅者生命周期管理,因此我无需执行任何操作。 IConnectableObservable 似乎在这方面提供了一些希望。

也许这是方钉圆孔的领地。

最佳答案

我认为这就是您所追求的,但不完全确定。示例代码始终是一个有用的澄清器。诀窍是让您的运营商不在源头,而是通过订阅:

async void Main()
{
var hotObservable = new Subject<string>();

Func<string, bool> sub1_Condition = s => s != "Disconnect Sub1";
var subscription1 = hotObservable
.Timeout(TimeSpan.FromSeconds(5))
.TakeWhile(sub1_Condition)
.Subscribe(s => Console.WriteLine($"Sub 1: {s}"), _ => Console.WriteLine("Sub1 Timeout."), () => Console.WriteLine("Sub1 condition met or source ended."));

Func<string, bool> sub2_Condition = s => s != "Disconnect Sub2";
var subscription2 = hotObservable
.Timeout(TimeSpan.FromSeconds(2))
.TakeWhile(sub2_Condition)
.Subscribe(s => Console.WriteLine($"Sub 2: {s}"), _ => Console.WriteLine("Sub2 Timeout."), () => Console.WriteLine("Sub2 condition met or source ended."));

Func<string, bool> sub3_Condition = s => s != "Disconnect Sub3";
var subscription3 = hotObservable
.Timeout(TimeSpan.FromSeconds(7))
.TakeWhile(sub2_Condition)
.Subscribe(s => Console.WriteLine($"Sub 3: {s}"), _ => Console.WriteLine("Sub3 Timeout."), () => Console.WriteLine("Sub3 condition met or source ended."));

hotObservable.OnNext("Hello");
hotObservable.OnNext("Disconnect Sub1");

await Task.Delay(TimeSpan.FromSeconds(3));

hotObservable.OnNext("Just Sub 3 should be left");
hotObservable.OnCompleted();
}

这会产生以下输出:

Sub 1: Hello
Sub 2: Hello
Sub 3: Hello
Sub1 condition met or source ended.
Sub 2: Disconnect Sub1
Sub 3: Disconnect Sub1
Sub2 Timeout.
Sub 3: Just Sub 3 should be left
Sub3 condition met or source ended.

关于c# - 使用 Reactive Extensions 我可以创建一个可观察对象的订阅者,它会阻塞直到满足某些条件或发生超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40745167/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com