gpt4 book ai didi

c# - 控制在使用 SubscribedOn 之后处理 Rx 订阅的线程

转载 作者:太空宇宙 更新时间:2023-11-03 11:10:44 25 4
gpt4 key购买 nike

我有一个 Rx 订阅,我 SubscribeOn 使用不同的线程来防止它阻塞。但是,由于资源管理问题,我希望阻止该订阅的处理。我一直无法弄清楚如何在控制台应用程序或 winforms 应用程序的上下文中完成此操作(我有两个用例)。下面是一个简化案例的工作代码,它模拟了我正在做的事情:

internal class Program
{

private static void Log(string msg)
{
Console.WriteLine("[{0}] " + msg, Thread.CurrentThread.ManagedThreadId.ToString());
}

private static void Main(string[] args)
{

var foo = Observable.Create<long>(obs =>
{
Log("Subscribing starting.. this will take a few seconds..");
Thread.Sleep(TimeSpan.FromSeconds(2));
var sub =
Observable.Interval(TimeSpan.FromSeconds(1))
.Do(_ => Log("I am polling..."))
.Subscribe(obs);
return Disposable.Create(() =>
{
Thread.Sleep(TimeSpan.FromSeconds(3));
sub.Dispose();
Log("Disposing is really done now!");
});
});

Log("I am subscribing..");
var disp = foo.SubscribeOn(NewThreadScheduler.Default).Subscribe(i => Log("Processing " + i.ToString()));
Log("I have returned from subscribing...");

// SC.Current is null in a ConsoleApp :/ Can I get a SC that uses my current thread?
//var dispSynced = new ContextDisposable(SynchronizationContext.Current, disp);
Thread.Sleep(TimeSpan.FromSeconds(5));
Log("I'm going to dispose...");
//dispSynced.Dispose();
disp.Dispose();
Log("Disposed has returned...");
Console.ReadKey();
}
}

当上面的代码运行时,我得到:

[10] I am subscribing..
[10] I have returned from subscribing...
[11] Subscribing starting.. this will take a few seconds..
[6] I am polling...
[6] Processing 0
[6] I am polling...
[6] Processing 1
[10] I'm going to dispose...
[10] Disposed has returned...
[13] I am polling...
[6] I am polling...
[13] I am polling...
[14] Disposing is really done now!

因此,我要做的就是让 [10] Disposed has returned... 成为打印的最后一行,表明 Dispose 调用正在阻塞。

Rx 附带的 ContextDisposable 似乎非常适合我的用例,但我不知道如何获得代表我当前线程的 SynchronizationContext。有没有一种方法可以使用 ContextDisposable 来做我想做的事情,或者我是否需要一种完全不同的方法?

最佳答案

如果您查看 SubscribeOn 的源代码,就会发现 dispose 函数将在指定的调度程序上进行调度。尝试这样的事情:

private static IObservable<long> GetObservable(IScheduler scheduler)
{
return Observable.Create<long>(obs =>
{
var disposables = new CompositeDisposable();

disposables.Add(
Disposable.Create(() =>
{
Thread.Sleep(TimeSpan.FromSeconds(3));
Log("Disposing is really done now!");
}));

disposables.Add(
scheduler.Schedule(() =>
{
Log("Subscribing starting.. this will take a few seconds..");
Thread.Sleep(TimeSpan.FromSeconds(2));
disposables.Add(
Observable.Interval(TimeSpan.FromSeconds(1)).Do(_ => Log("I am polling...")).Subscribe(obs));
}));

return disposables;
});


private static void Main(string[] args)
{
var foo = GetObservable(NewThreadScheduler.Default);

Log("I am subscribing..");
var disp = foo.Subscribe(i => Log("Processing " + i.ToString()));
Log("I have returned from subscribing...");

// SC.Current is null in a ConsoleApp :/ Can I get a SC that uses my current thread?
//var dispSynced = new ContextDisposable(SynchronizationContext.Current, disp);
Thread.Sleep(TimeSpan.FromSeconds(5));
Log("I'm going to dispose...");
//dispSynced.Dispose();
disp.Dispose();
Log("Disposed has returned...");
Console.ReadKey();
}

关于c# - 控制在使用 SubscribedOn 之后处理 Rx 订阅的线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14131306/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com