- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
请遵守以下单元测试:
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
namespace UnitTests
{
[TestClass]
public class TestRx
{
public const int UNIT_TEST_TIMEOUT = 5000;
private static IObservable<int> GetObservable(int count = 100, int msWait = 10)
{
return Observable.Create<int>(async (obs, cancellationToken) =>
{
for (int i = 0; i < count && !cancellationToken.IsCancellationRequested; ++i)
{
int value = i;
obs.OnNext(await Task.Factory.StartNew(() =>
{
Thread.Sleep(msWait);
return value;
}));
}
});
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void Subscribe()
{
var tcs = new TaskCompletionSource<object>();
int i = 0;
GetObservable().Subscribe(n =>
{
Assert.AreEqual(i, n);
++i;
}, e => Assert.Fail(), () =>
{
Assert.AreEqual(100, i);
tcs.TrySetResult(null);
});
tcs.Task.Wait();
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void SubscribeCancel()
{
var tcs = new TaskCompletionSource<object>();
var cts = new CancellationTokenSource();
int i = 0;
GetObservable().Subscribe(n =>
{
Assert.AreEqual(i, n);
++i;
if (i == 5)
{
cts.Cancel();
}
}, e =>
{
Assert.IsTrue(i < 100);
tcs.TrySetResult(null);
}, () =>
{
Assert.IsTrue(i < 100);
tcs.TrySetResult(null);
}, cts.Token);
tcs.Task.Wait();
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void SubscribeThrow()
{
var tcs = new TaskCompletionSource<object>();
int i = 0;
GetObservable().Subscribe(n =>
{
Assert.AreEqual(i, n);
++i;
if (i == 5)
{
throw new Exception("xo-xo");
}
}, e =>
{
Assert.AreEqual("xo-xo", e.Message);
tcs.TrySetResult(null);
}, Assert.Fail);
tcs.Task.Wait();
}
}
}
SubscribeCancel
和
SubscribeThrow
超时,因为从未调用
OnError
回调,因此对任务的等待永远不会结束。
private static IObservable<int> GetObservable(int count = 100, int msWait = 10)
{
return Observable.Create<int>(async (obs, cancellationToken) =>
{
try
{
for (int i = 0; i < count && !cancellationToken.IsCancellationRequested; ++i)
{
int value = i;
obs.OnNext(await Task.Factory.StartNew(() =>
{
Thread.Sleep(msWait);
return value;
}));
}
obs.OnCompleted();
}
catch (Exception exc)
{
obs.OnError(exc);
}
});
}
var tcs = new TaskCompletionSource<object>();
GetObservable().Subscribe(n =>
{
try
{
...
}
catch (Exception e)
{
DoErrorLogic();
tcs.TrySetException(e);
}
}, e =>
{
DoErrorLogic();
tcs.TrySetException(e);
}, () =>
{
DoCompletedLogic();
tcs.TrySetResult(null);
});
tcs.Task.Wait();
最佳答案
此行为是设计使然。如果订阅者抛出异常(顺便说一句,这是一种不好的做法),则Rx框架会正确地说明它已死,并不再与之通信。如果取消订阅,这也不是错误-只是请求不发送任何其他类型的事件-Rx很荣幸。
编辑以回应评论
我认为文档中没有简单的参考要指出的-您所看到的行为是内在的,它是隐式的。我能得到的最接近的结果是将您指向AnonymousSafeObserver和AutoDetatchObserver的源代码。后者有一个说明性的场景,可能会有所帮助,但其中有些涉及。
也许类比会有所帮助。想象一下,数据流事件是由新闻通讯社发送报纸。订户是家庭。
订阅者引发异常
报社愉快地分发报纸,直到一天,其中一名订户-琼斯先生-放下汽油,他的房屋爆炸而杀死琼斯先生并摧毁了房屋(引发了未经处理的例外)。报社意识到,他不能再将报纸交付给琼斯先生,也不能发送终止通知,报刊供应也没有问题(因此OnError或OnCompleted不合适),报社的订阅人数减少了一个。
将此与报纸印刷商不经意间使用易燃墨水进行对比,并导致工厂起火。现在,糟糕的新闻发布商确实必须向所有无限期停止供应的订户发送解释性说明(OnError)。
订阅者取消订阅
琼斯先生正在接受订阅的报纸,直到有一天他认为自己厌倦了无数令人沮丧的故事,并要求取消订阅。报社有义务。他没有给琼斯先生发任何便条,以说明报纸已停止印刷版本(没有OnCompleted),但没有。他也没有给琼斯先生发送任何说明该报纸已经倒闭的便条(没有OnError)-他只是按照琼斯先生的要求停止发送报纸。
对Edit3的回应
我同情你的斗争。我注意到在您的整个代码中,您一直在尝试将TPL(任务)惯用语与Rx网格化。这样的尝试通常感觉很笨拙,因为它们实际上是完全不同的世界。像这样的段落很难评论:
我开始相信,当将异步可观察序列集成到其他方式的同步代码中时,应该编写这样的代码(通常在一个地方或另一个地方在服务器端就是这种情况):
与布兰登精心设计的断言完全一致,我无法想到在您尝试使用服务器端在服务器端集成异步代码和同步代码的真正实例。对我来说,这感觉像是设计的气味。习惯上讲,人们会尝试使代码保持被动状态-进行订阅,并让订阅者以被动方式处理工作。我想不起来有必要以您描述的方式过渡到同步代码。
当然,查看您在Edit3中编写的代码,尚不清楚您要实现的目标。对订户中的错误做出反应不是源的责任。这是尾巴在摇狗。需要在那里以确保订阅者的服务连续性的异常处理程序应该在订阅处理代码中,而不是在可观察的源中-异常处理程序应该只关注免受流氓观察者行为的影响。这种逻辑在上面链接的AnonymousSafeObserver中实现,并且由大多数Rx提供的运算符使用。可观察的对象很可能具有处理其源数据连续性的逻辑-但这是另一个问题,并不是您在代码中要解决的问题。
无论您试图通过调用ToTask
或Wait
桥接到同步代码的任何地方,都有可能需要仔细考虑您的设计。
我认为提供更具体的问题说明(可能是从您要解决的现实情况中得出)可以为您提供更多有用的建议。您说的“ SqlDataReader”示例...
最终,人们可以通过订阅它来直接使用可观察的[包装SqlDataReader],但是由于周围的大多数代码仍然是同步的,因此他们不得不在某个时候等待结束(阻塞线程)。
...突出显示了您所处的设计困境。在这种情况下,您推断使用IEnumerable<T>
界面-或要求使用IObservable<List<T>>
显然会更好。但是关键是要放宽眼界,实际上您试图将SqlDataReader包装在一个可观察的包装器中实际上是一种设计气味-因为这是为响应特定的一次性请求而提供的固定数据。这可能是异步情况,但实际上不是被动情况。与更典型的反应性情况相反,例如“在获得股票X时将价格发送给我”,在这种情况下,您将完全根据订户的源头设置将来的数据流,然后订阅者做出反应。
关于c# - 为什么从给定的订阅者抛出时从未调用过OnError回调?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23862676/
来自 java docs 公共(public) FileWriter(String fileName) 抛出 IOException 抛出: IOException - 如果指定的文件存在但它是目录而
我使用以下代码将我的 .net 客户端(基于 CQL)连接到 3 节点 Cassandra 集群。我以 30 条记录/秒的速度(从 RabbitMQ)获取数据,并且它们顺利地存储在 cassandra
如果在读取文件时缺少字段,我应该捕获 NoSuchElementException。如果缺少一个字段,我只需要跳到文件的下一行。我的问题是,我在哪里实现我的 try/catch 代码来做到这一点?这是
我正在尝试使用 ASP.NET MVC 实现 OpeinID 登录。我正在尝试按照 http://blog.nerdbank.net/2008/04/add-openid-login-support-
学习使用 Java 进行 xml 解析,并且正在编写一个测试程序来尝试各种东西。所有测试 System.out.println() 都是我在控制台中所期望的,除了 childElement 返回 [n
我正在尝试使用 SwingUtilities 创建 JFrame Thread tt = new Thread(new Runnable() { public void run
关闭。这个问题是not reproducible or was caused by typos .它目前不接受答案。 这个问题是由于错别字或无法再重现的问题引起的。虽然类似的问题可能是on-topi
我写了这段代码: MethodInfo method2 = typeof(IntPtr).GetMethod( "op_Explicit", Bind
我开始学习 Java,并且正在根据书本做一些练习。在执行此操作时,我遇到了以下错误:线程“main”java.util.InputMismatchException 中出现异常。我正在编写一个简单的程
我有一个文本文件,其中前两行是整数 m 和 n,然后有 m 行,每行都有 n 管道分隔值。我编写了一个程序,读取文件并使用文件中的值创建 m*n 数组,它工作了无数次,然后突然,使用相同的代码,使用相
所以我尝试使用在另一个类中生成的 bean 以在主应用程序中使用 package com.simon.spring.basics.properties; import org.spri
我还没有完成这个应用程序,但我希望在我的手机上看到它的样子。但是,它会强制关闭并引发 InstantiationException。 logcat 异常: 09-19 20:13:47.987: D/
我想从 UIViewController 加载一个基于 SwiftUI 的 View ,该 View 读取包本地的 json。仅 swiftUI 项目中的代码和绑定(bind)工作正常,当我利用 UI
'java.net.SocketTimeoutException:连接超时' 循环一段时间后我收到此错误。为什么我会收到 SocketTimeoutException?我该如何修复这个错误? @Ove
当有 null 值时抛出 ArgumentNullException() 是个好主意吗? This thread 没有提到在 null 上抛出的最明显的异常。 谢谢 最佳答案 ArgumentNull
我得到这个异常: NullReferenceException Object reference not set to an instance of an object at Namespace
所以其中一个方法的描述如下: public BasicLinkedList addToFront(T data) This operation is invalid for a sorted list
我正在使用 Intellij Idea,当我去生成 JavaDocs(通过工具 -> 生成 JavaDoc)时,我抛出了一个 IllegealArgumentException,没有关于发生了什么问题
我正在学习 C++ 中的互斥锁,但以下代码(摘自 N. Josuttis 的“C++ 标准库”)有问题。 我不明白为什么它会阻塞/抛出除非我在主线程中添加this_thread::sleep_for(
我正在试验 JavaFX 标签和组,通过鼠标拖动将它们移动到屏幕上。新节点从一些线程添加到动画组。但是,有时我会突然看到以下异常 - 我假设,当某些节点重叠时。但是不知道是什么问题……因为不涉及我的代
我是一名优秀的程序员,十分优秀!