gpt4 book ai didi

.net - 响应式(Reactive)扩展同步订阅

转载 作者:行者123 更新时间:2023-12-05 00:25:20 25 4
gpt4 key购买 nike

有人可以帮我对 IObserver 进行同步订阅,以便调用方法将阻塞直到订阅完成。
例如:

出版商

public static class Publisher {
public static IObservable<string> NonBlocking()
{
return Observable.Create<string>(
observable =>
{
Task.Run(() =>
{
observable.OnNext("a");
Thread.Sleep(1000);
observable.OnNext("b");
Thread.Sleep(1000);
observable.OnCompleted();
Thread.Sleep(1000);
});

return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
});
}

}

订户
public static class Subscriber{
public static bool Subscribe()
{
Publisher.NonBlocking().Subscribe((s) =>
{
Debug.WriteLine(s);
}, () =>
{
Debug.WriteLine("Complete");
});
// This will currently return true before the subscription is complete
// I want to block and not Return until the Subscriber is Complete
return true;
}

}

最佳答案

您需要使用 System.Reactive.Threading.Task为了这:

把你的 observable 变成一个任务......

var source = Publisher.NonBlocking()
.Do(
(s) => Debug.WriteLines(x),
() => Debug.WriteLine("Completed")
)
.LastOrDefault()
.ToTask();
Do(...).Subscribe()就像 Subscribe(...) .所以 Do只是增加了一些副作用。
LastOrDefault在那里是因为 Task创建者 ToTask将只等待源中的第一项 Observable ,如果没有产生任何项目,它将失败(抛出)。所以, LastOrDefault有效地导致 Task等到源完成,不管它产生什么。

所以在我们有一个任务之后,就等着它吧:
task.Wait(); // blocking

或者使用异步/等待:
await task; // non-blocking

编辑:

Cory Nelson 提出了一个很好的观点:

在最新版本的 C# 和 Visual Studio 中,您实际上可以 await一个 IObservable<T> .这是一个非常酷的功能,但它的工作方式与等待 Task 稍有不同。 .

当您等待任务时,它会导致任务运行。如果多次等待某个任务的单个实例,则该任务将只执行一次。 Observables 略有不同。您可以将可观察对象视为具有多个返回值的异步函数……每次订阅可观察对象时,可观察对象/函数都会执行。因此,这两段代码具有不同的含义:

等待一个 Observable:
// Console.WriteLine will be invoked twice.
var source = Observable.Return(0).Do(Console.WriteLine);
await source; // Subscribe
await source; // Subscribe

通过任务等待 Observable:
// Console.WriteLine will be invoked once.
var source = Observable.Return(0).Do(Console.WriteLine);
var task = source.ToTask();
await task; // Subscribe
await task; // Just yield the task's result.

因此,从本质上讲,等待 Observable 的工作方式如下:
// Console.WriteLine will be invoked twice.
var source = Observable.Return(0).Do(Console.WriteLine);
await source.ToTask(); // Subscribe
await source.ToTask(); // Subscribe

但是, await observable语法在 Xamerin Studio 中不起作用(截至撰写本文时)。如果您使用的是 Xamerin Studio,我强烈建议您使用 ToTask在最后一刻,模拟 Visual Studio 的 await observable 的行为句法。

关于.net - 响应式(Reactive)扩展同步订阅,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24772612/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com