gpt4 book ai didi

c# - 无法取消订阅 Rx

转载 作者:太空宇宙 更新时间:2023-11-03 21:12:46 25 4
gpt4 key购买 nike

背景

我正在编写一些执行以下操作的软件:

  1. 用户点击“开始”。
  2. 启动一个任务来执行一些工作并启动事件以更新 GUI。
  3. 可观察对象使用来自任务的事件,并将数据打印到 GUI 中的富文本框。

第一次点击“开始”时一切正常,但之后就不行了。第一次单击开始时,我得到如下所示的输出:

Clicking START once

这看起来不错,这里没有问题。但是,当我第二次单击“开始”时,我得到以下输出。

Clicking START again

现在,我相信我知道为什么会发生这种情况。据我所知,我的观察者从我第一次单击“开始”开始就从未取消订阅,因此所有内容都会打印两次。单击开始按钮时,会发生以下情况:

    /// <summary>
/// Starts the test.
/// </summary>
/// <param name="sender">The "start" button.</param>
/// <param name="e">Clicking on the "start" button.</param>
private void button_go_Click(object sender, RoutedEventArgs e)
{
var uiContext = SynchronizationContext.Current;
var results = Observable.FromEventPattern<TestResultHandler, TestResultArgs>(
handler => (s, a) => handler(s, a),
handler => this.myTest.Results += handler,
handler => this.myTest.Results -= handler)
.ObserveOn(uiContext)
.Subscribe(x => this.richTextBox_testResults.Document.Blocks.Add(new Paragraph(new Run(x.EventArgs.Results))));

// start running the test
this.runningTest = new Task(() => { this.myTest.Run(); });
this.runningTest.Start();

// block on purpose, wait for the task to finish
this.runningTest.Wait();
}

我不熟悉 .NET 中的响应式扩展(使用它还不到一小时)。我基本上使用了 Stephen Cleary 在 C# 中的并发指南中的示例,以便从这里开始。尽管如此,我相信我有点知道问题出在哪里......

拟议计划

如果我可以取消订阅 observable,那么我认为这个问题就会消失。不过,我认为这有点复杂,基于 ReactiveX.io 中使用的 react 模式。听起来我实际上应该只在任务实际存在期间巧妙地观察任务,然后自然取消订阅。这是我在调试这个问题时所得到的,我不能 100% 确定这个取消订阅的东西真的能解决问题。

问题

有什么方法可以取消订阅 Observable 吗?或者,有没有一种方法可以让我只在任务存在期间观察它?

最佳答案

Rx 使用 IDisposable 接口(interface)使您能够取消订阅尚未自然结束的可观察订阅。如果您的 Observable 发送了 OnCompletedOnError 通知,那么订阅会自动为您处理。

这种方法的一个明显优势是您可以创建一个 CompositeDisposable 来聚合您的所有订阅以启用单个退订。这比删除事件处理程序所需的大杂烩要好得多。

在您的代码中,您并没有结束订阅,因此每次点击 button_go 都会创建一个新的订阅。

您可以使用四种解决方案,每种都有所不同。

(1)

请记住,需要调用 .Dispose() 的关键是您的可观察订阅是否自然结束并且您希望它结束​​。因此,您可以简单地在查询中添加一个 .Take(1),使其在产生一个值后自然结束。

private void button_go_Click(object sender, RoutedEventArgs e)
{
var uiContext = SynchronizationContext.Current;
var results = Observable.FromEventPattern<TestResultHandler, TestResultArgs>(
handler => (s, a) => handler(s, a),
handler => this.myTest.Results += handler,
handler => this.myTest.Results -= handler)
.Take(1)
.ObserveOn(uiContext)
.Subscribe(x => this.richTextBox_testResults.Document.Blocks.Add(new Paragraph(new Run(x.EventArgs.Results))));

// start running the test
this.runningTest = new Task(() => { this.myTest.Run(); });
this.runningTest.Start();

// block on purpose, wait for the task to finish
this.runningTest.Wait();
}

订阅会自动为您处理。

(2)

您可以使用 SerialDisposable 来管理每个订阅。 MSDN 文档将其描述为:

Represents a disposable whose underlying disposable can be swapped for another disposable which causes the previous underlying disposable to be disposed.

您的代码将如下所示:

private SerialDisposable _results = new SerialDisposable();

private void button_go_Click(object sender, RoutedEventArgs e)
{
var uiContext = SynchronizationContext.Current;
_results.Disposable = Observable.FromEventPattern<TestResultHandler, TestResultArgs>(
handler => (s, a) => handler(s, a),
handler => this.myTest.Results += handler,
handler => this.myTest.Results -= handler)
.ObserveOn(uiContext)
.Subscribe(x => this.richTextBox_testResults.Document.Blocks.Add(new Paragraph(new Run(x.EventArgs.Results))));

// start running the test
this.runningTest = new Task(() => { this.myTest.Run(); });
this.runningTest.Start();

// block on purpose, wait for the task to finish
this.runningTest.Wait();
}

(3)

您始终可以确保只创建一次订阅。

private IDisposable _results = null;

private void button_go_Click(object sender, RoutedEventArgs e)
{
if (_results == null)
{
var uiContext = SynchronizationContext.Current;
_results = Observable.FromEventPattern<TestResultHandler, TestResultArgs>(
handler => (s, a) => handler(s, a),
handler => this.myTest.Results += handler,
handler => this.myTest.Results -= handler)
.ObserveOn(uiContext)
.Subscribe(x => this.richTextBox_testResults.Document.Blocks.Add(new Paragraph(new Run(x.EventArgs.Results))));
}

// start running the test
this.runningTest = new Task(() => { this.myTest.Run(); });
this.runningTest.Start();

// block on purpose, wait for the task to finish
this.runningTest.Wait();
}

(4)

最后一种方法是将整个操作封装在一个可观察对象中,并针对每个新订阅发起。这是使用 Rx 的正确方法。 Observable 应该保持自己的状态,以便订阅可以 100% 相互独立。

现在,您的代码使用 this.myTestthis.runningTest,因此它显然具有状态。您应该尝试删除这些。但如果不这样做,您的代码将如下所示:

private void button_go_Click(object sender, RoutedEventArgs e)
{
var uiContext = SynchronizationContext.Current;
Observable
.Create<System.Reactive.EventPattern<TestResultArgs>>(o =>
{
var subscription =
Observable
.FromEventPattern<TestResultHandler, TestResultArgs>(
h => this.myTest.Results += h,
h => this.myTest.Results -= h)
.Take(1)
.Subscribe(o);
this.runningTest = new Task(() => { this.myTest.Run(); });
this.runningTest.Start();
return subscription;
})
.ObserveOn(uiContext)
.Subscribe(x => this.richTextBox_testResults.Document.Blocks.Add(new Paragraph(new Run(x.EventArgs.Results))));

// block on purpose, wait for the task to finish
this.runningTest.Wait();
}

理想情况下,您应该将 myTest 的创建和销毁合并到可观察对象中。

所以,我会倾向于做这样的事情:

private void button_go_Click(object sender, RoutedEventArgs e)
{
var uiContext = SynchronizationContext.Current;
Observable
.Create<System.Reactive.EventPattern<TestResultArgs>>(o =>
{
var myTest = new MyTest();
var subscription =
Observable
.FromEventPattern<TestResultHandler, TestResultArgs>(
h => myTest.Results += h,
h => myTest.Results -= h)
.Take(1)
.Subscribe(o);
myTest.Run();
return subscription;
})
.ObserveOn(uiContext)
.Subscribe(x => this.richTextBox_testResults.Document.Blocks.Add(new Paragraph(new Run(x.EventArgs.Results))));
}

这最后一个实际上是您问题的答案:“或者,有没有一种方法可以让我只在任务存在期间观察它?”

关于c# - 无法取消订阅 Rx,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36580286/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com