- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
背景
我正在编写一些执行以下操作的软件:
第一次点击“开始”时一切正常,但之后就不行了。第一次单击开始时,我得到如下所示的输出:
这看起来不错,这里没有问题。但是,当我第二次单击“开始”时,我得到以下输出。
现在,我相信我知道为什么会发生这种情况。据我所知,我的观察者从我第一次单击“开始”开始就从未取消订阅,因此所有内容都会打印两次。单击开始按钮时,会发生以下情况:
/// <summary>
/// Starts the test.
/// </summary>
/// <param name="sender">The "start" button.</param>
/// <param name="e">Clicking on the "start" button.</param>
private void button_go_Click(object sender, RoutedEventArgs e)
{
var uiContext = SynchronizationContext.Current;
var results = Observable.FromEventPattern<TestResultHandler, TestResultArgs>(
handler => (s, a) => handler(s, a),
handler => this.myTest.Results += handler,
handler => this.myTest.Results -= handler)
.ObserveOn(uiContext)
.Subscribe(x => this.richTextBox_testResults.Document.Blocks.Add(new Paragraph(new Run(x.EventArgs.Results))));
// start running the test
this.runningTest = new Task(() => { this.myTest.Run(); });
this.runningTest.Start();
// block on purpose, wait for the task to finish
this.runningTest.Wait();
}
我不熟悉 .NET 中的响应式扩展(使用它还不到一小时)。我基本上使用了 Stephen Cleary 在 C# 中的并发指南中的示例,以便从这里开始。尽管如此,我相信我有点知道问题出在哪里......
拟议计划
如果我可以取消订阅 observable,那么我认为这个问题就会消失。不过,我认为这有点复杂,基于 ReactiveX.io 中使用的 react 模式。听起来我实际上应该只在任务实际存在期间巧妙地观察任务,然后自然取消订阅。这是我在调试这个问题时所得到的,我不能 100% 确定这个取消订阅的东西真的能解决问题。
问题
有什么方法可以取消订阅 Observable 吗?或者,有没有一种方法可以让我只在任务存在期间观察它?
最佳答案
Rx 使用 IDisposable
接口(interface)使您能够取消订阅尚未自然结束的可观察订阅。如果您的 Observable 发送了 OnCompleted
或 OnError
通知,那么订阅会自动为您处理。
这种方法的一个明显优势是您可以创建一个 CompositeDisposable
来聚合您的所有订阅以启用单个退订。这比删除事件处理程序所需的大杂烩要好得多。
在您的代码中,您并没有结束订阅,因此每次点击 button_go
都会创建一个新的订阅。
您可以使用四种解决方案,每种都有所不同。
(1)
请记住,需要调用 .Dispose()
的关键是您的可观察订阅是否自然结束并且您希望它结束。因此,您可以简单地在查询中添加一个 .Take(1)
,使其在产生一个值后自然结束。
private void button_go_Click(object sender, RoutedEventArgs e)
{
var uiContext = SynchronizationContext.Current;
var results = Observable.FromEventPattern<TestResultHandler, TestResultArgs>(
handler => (s, a) => handler(s, a),
handler => this.myTest.Results += handler,
handler => this.myTest.Results -= handler)
.Take(1)
.ObserveOn(uiContext)
.Subscribe(x => this.richTextBox_testResults.Document.Blocks.Add(new Paragraph(new Run(x.EventArgs.Results))));
// start running the test
this.runningTest = new Task(() => { this.myTest.Run(); });
this.runningTest.Start();
// block on purpose, wait for the task to finish
this.runningTest.Wait();
}
订阅会自动为您处理。
(2)
您可以使用 SerialDisposable
来管理每个订阅。 MSDN 文档将其描述为:
Represents a disposable whose underlying disposable can be swapped for another disposable which causes the previous underlying disposable to be disposed.
您的代码将如下所示:
private SerialDisposable _results = new SerialDisposable();
private void button_go_Click(object sender, RoutedEventArgs e)
{
var uiContext = SynchronizationContext.Current;
_results.Disposable = Observable.FromEventPattern<TestResultHandler, TestResultArgs>(
handler => (s, a) => handler(s, a),
handler => this.myTest.Results += handler,
handler => this.myTest.Results -= handler)
.ObserveOn(uiContext)
.Subscribe(x => this.richTextBox_testResults.Document.Blocks.Add(new Paragraph(new Run(x.EventArgs.Results))));
// start running the test
this.runningTest = new Task(() => { this.myTest.Run(); });
this.runningTest.Start();
// block on purpose, wait for the task to finish
this.runningTest.Wait();
}
(3)
您始终可以确保只创建一次订阅。
private IDisposable _results = null;
private void button_go_Click(object sender, RoutedEventArgs e)
{
if (_results == null)
{
var uiContext = SynchronizationContext.Current;
_results = Observable.FromEventPattern<TestResultHandler, TestResultArgs>(
handler => (s, a) => handler(s, a),
handler => this.myTest.Results += handler,
handler => this.myTest.Results -= handler)
.ObserveOn(uiContext)
.Subscribe(x => this.richTextBox_testResults.Document.Blocks.Add(new Paragraph(new Run(x.EventArgs.Results))));
}
// start running the test
this.runningTest = new Task(() => { this.myTest.Run(); });
this.runningTest.Start();
// block on purpose, wait for the task to finish
this.runningTest.Wait();
}
(4)
最后一种方法是将整个操作封装在一个可观察对象中,并针对每个新订阅发起。这是使用 Rx 的正确方法。 Observable 应该保持自己的状态,以便订阅可以 100% 相互独立。
现在,您的代码使用 this.myTest
和 this.runningTest
,因此它显然具有状态。您应该尝试删除这些。但如果不这样做,您的代码将如下所示:
private void button_go_Click(object sender, RoutedEventArgs e)
{
var uiContext = SynchronizationContext.Current;
Observable
.Create<System.Reactive.EventPattern<TestResultArgs>>(o =>
{
var subscription =
Observable
.FromEventPattern<TestResultHandler, TestResultArgs>(
h => this.myTest.Results += h,
h => this.myTest.Results -= h)
.Take(1)
.Subscribe(o);
this.runningTest = new Task(() => { this.myTest.Run(); });
this.runningTest.Start();
return subscription;
})
.ObserveOn(uiContext)
.Subscribe(x => this.richTextBox_testResults.Document.Blocks.Add(new Paragraph(new Run(x.EventArgs.Results))));
// block on purpose, wait for the task to finish
this.runningTest.Wait();
}
理想情况下,您应该将 myTest
的创建和销毁合并到可观察对象中。
所以,我会倾向于做这样的事情:
private void button_go_Click(object sender, RoutedEventArgs e)
{
var uiContext = SynchronizationContext.Current;
Observable
.Create<System.Reactive.EventPattern<TestResultArgs>>(o =>
{
var myTest = new MyTest();
var subscription =
Observable
.FromEventPattern<TestResultHandler, TestResultArgs>(
h => myTest.Results += h,
h => myTest.Results -= h)
.Take(1)
.Subscribe(o);
myTest.Run();
return subscription;
})
.ObserveOn(uiContext)
.Subscribe(x => this.richTextBox_testResults.Document.Blocks.Add(new Paragraph(new Run(x.EventArgs.Results))));
}
这最后一个实际上是您问题的答案:“或者,有没有一种方法可以让我只在任务存在期间观察它?”
关于c# - 无法取消订阅 Rx,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36580286/
我用 Cocoapods 创建了一个简单的项目并安装了 RxSwift 和 RxCocoa。 我写了这段代码: import UIKit import RxSwift class ViewContro
我对 Rx 真的很陌生,只是想知道 subscribeOn 的顺序如何影响 Observable //This will not print anything Observable.just("wha
我有一个 PublishSubject 每 X 秒发出一个信号,我想只考虑 Y 秒后发出的第一个项目。 例子 observable A 每秒发出一次“滴答声” observable B 应该每 5 秒
我有以下流。 Observable.just(Unit) // execute immediately .mergeWith(tryAgainRelay) // execute again w
我可以看到 RX 适用于 Android 和 UI 事件处理。我很难看到 RX 在后端提供了什么好处。 RX Java 是为后端处理而设计的,还是这个概念太过分了? 最佳答案 实际上,RxJava 最
我有一个关于什么是更好的 RxJava 模式以保持可观察状态的问题。 为简单起见,假设我们有一个 StateManager需要跟踪系统中某个状态(假设它是一个简单的 bool 标志)并以可观察的方式公
我遇到了以下问题。我有一个从具有 isMember 属性的服务器请求的基本对象。如果该属性为 true,我想向端点 1、2 和 3 发出请求。 如果该属性为 false,我只想向端点 1 发出请求。
我正在使用 RxJava,但也许能够翻译另一个实现的答案。 我有一个 Observable 发出一系列项目,我想将它们分成 10 组。如下所示: observable .buffer(10)
有没有办法计算 RxAndroid 中流中已处理的元素数量? 我有类似的事情: Observable.fromArray(new String[]{"these", "are", "my", "val
如果我的客户端断开连接(错误)超过 10 秒,我会尝试显示一个弹出窗口。但是,当重新连接时,我也会关闭弹出窗口(true)。如果为真,我需要立即关闭弹出窗口。 我认为我需要做的是根据值(假)进行去抖,
收到 onNext() 后如何自动退订? 现在我使用这个代码: rxObservable .compose(bindToLifecycle()) // unsubscribe automaticall
我在 Ettus x310 上有一个简单的 C++ 测试程序,以前可以用,但现在不行了。我试图简单地设置单个 USRP 的两个 channel 的两个中心频率。当我尝试在第二个 channel 上设置
我有一个返回 Observable 的网络调用,我有另一个网络调用,它不是依赖于第一个 Observable 的 rx,我需要以某种方式转换这一切都与 Rx 相关。 Observable respon
我正在尝试根据 Rxjava 中的某些条件创建组列表。 以下是我的回复: { "dates":[ { "date":18, "value":
RX 6800 XT是最近很强的一款显卡,让很多的用户都很喜爱,详细还有很多没有入手的用户对他不是很了解吧,下面就带来了RX 6800 XT评测、跑分、价格、参数、图片,快来一起看看吧。 RX
我有一个重复元素列表,比如: Observable source = Observable.just("A", "B", "A", "C", "C", "A"); 我想按它们的值以及它们出现的次数对它
如果我将 RX-main 包添加到 WPF 应用程序,我会安装以下依赖包: 接收接口(interface) 接收核心 Rx-Linq Rx 平台服务 主要接收 添加 Rx-main 后,可以安装两个与
com.myapp.test.debug E/MessageQueue-JNI:rx.exceptions.OnErrorNotImplementedException 在 rx.Observable
这个问题在这里已经有了答案: RxJava operator that emits if different from last (1 个回答) 关闭 5 年前。 我有一个 bool 型 Obser
首先,我在两个 shell 中同时运行 vnstat -tr -i eth0 和 iftop。奇怪的是,vnstat 中的 rx 比 iftop 中的峰值 rx 大得多。我该如何解决这个问题? vns
我是一名优秀的程序员,十分优秀!