- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
如何将 IObservable
流中的异常转换为普通域对象并透明地重新订阅流?
附录:作为James在评论中指出,我的用例想法类似于在不可靠的来源上有一个应该是连续的流,例如一个网络。如果出现故障,只需尝试重新连接到源,但通知下游处理器。
事实上,这与我在 Translating a piece of asynchronous C# code to F# (with Reactive Extensions and FSharpx) 的另一个问题有关,这又源于 How to implement polling using Observables? .
事实上,现在我想到了,我可以先使用 How to write a generic, recursive extension method in F#? 处的代码("RetryAfterDelay ")(使用更多参数来调整 RetryAfterDelay
行为)并将其与此实现链接起来。当重试次数耗尽时,将产生域错误并重新启动轮询器。诚然,可能会有更有效的方法,但无论如何。 :) ... 或者只提供一个回调函数来记录错误而不是将它们转换为领域事件,好吧,选择比比皆是...
但是回到原来的代码...
例如,如果我有
public enum EventTypeEnum
{
None = 0,
Normal = 1,
Faulted = 2
}
public class Event
{
public EventTypeEnum Type { get; set; }
}
private static IObservable<int> FaultingSequence1()
{
var subject = new ReplaySubject<int>();
subject.OnNext(1);
subject.OnNext(2);
subject.OnError(new InvalidOperationException("Something went wrong!"));
return subject;
}
private static IEnumerable<int> FaultingSequence2()
{
for(int i = 0; i < 3; ++i)
{
yield return 1;
}
throw new InvalidOperationException("Something went wrong!");
}
//Additional pondering: Why isn't FaultingSequence2().ToObservable() too be procted by Catch?
//
//This part is for illustratory purposes here. This is the piece I'd like
//behave so that exceptions would get transformed to Events with EventTypeEnum.Faulted
//and passed along to the stream that's been subscribed to while resubscribing to
//FaultingSequence1. That is, the subscribed would learn about the fault through a
//domain event type.
//Retry does the resubscribing, but only on OnError.
var stream = FaultingSequence1().Catch<int, Exception>(ex =>
{
Console.WriteLine("Exception: {0}", ex);
return Observable.Throw<int>(ex);
}).Retry().Select(i => new Event { Type = EventTypeEnum.Normal });
//How to get this to print "Event type: Normal", "Event type: Normal", "Event type: Faulted"?
stream.Subscribe(i => Console.WriteLine("Event type: {0}", i.Type));
这个问题现在真的让我很头疼!有什么建议吗?
最佳答案
有一个运算符叫做 Materialize
它将每个事件转换为 Notification<T>
:
OnNext:
OnNext a Notification<T> with Kind OnNext containing a value.
OnError:
OnNext a Notification<T> with Kind OnError containing an exception.
OnCompleted.
OnCompleted:
OnNext a Notification<T> with Kind OnCompleted
OnCompleted.
因此,当调用 OnError 或 OnCompleted 时,订阅仍会完成,但永远不会在订阅者上调用 OnError。所以你可以做这样的事情......
source
.Materialize()
.Repeat();
但是,即使原始订阅自然完成(通过 OnCompleted),这也会重新订阅源。
所以也许您仍然希望调用 OnError,但您还希望原始 OnError 中的异常通过 Notification<T>
内的 OnNext 传递。 .为此,您可以使用这样的东西:
source
.Materialize()
.SelectMany(notification =>
notification.Kind == NotificationKind.OnError
? Observable.Return(notification).Concat(Observable.Exception(notification.Exception))
: Observable.Return(notification)
)
.Retry();
以这种方式,如果订阅自然完成(通过 OnCompleted),则不会重新订阅源。
设置好之后,就足以将每种类型的通知映射到您要使用的任何域对象:
source
.Materialize()
.SelectMany(notification =>
notification.Kind == NotificationKind.OnError
? Observable.Return(notification).Concat(Observable.Exception(notification.Exception))
: Observable.Return(notification)
)
.Retry()
.Map(notification => {
switch (notification.Kind) {
case (NotificationKind.OnNext): return // something.
case (NotificationKind.OnError): return // something.
case (NotificationKind.OnCompleted): return // something.
default: throw new NotImplementedException();
}
});
关于c# - 如何将异常转换为事件并重新订阅有故障的 IObservable?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25228706/
假设我有一个返回 IObservable 的函数并且这个函数需要初始状态。 let myObservable (initialState: T) :IObservable = (...) 但我只能从另
假设我有一个返回 IObservable 的函数并且这个函数需要初始状态。 let myObservable (initialState: T) :IObservable = (...) 但我只能从另
我有一个鼠标左键状态流: var leftMouseButton = mouse.Select(x => x.LeftButton).DistinctUntilChanged(); 然后我Window
我有一个“值(value)观”IObservable这是返回 T必须按顺序组合成可变长度数组的元素,我有一个“控制”IObservable这告诉我下一个数组必须有多长。删除一个元素、重复它或将结果打乱
微软推出了 IObservable interface到 BCL 与 .NET Framework 4,我想,“太棒了,终于,我必须使用它!”因此,我深入挖掘并阅读帖子和文档,甚至实现了该模式。 这样
我有一个 IObservable 类型的流(热可观察)并想将其分成 IObservable 的两个可观察对象和 IObservable 我天真地尝试了以下但我只得到 flowStream人口稠密。 I
在 Windows Phone 7 上,IObservable 有一个新版本的 BufferWithTimeOrCount 扩展方法,它返回“流的流”而不是以前的“列表流”。我在尝试使用新方法或旧方法
更新:查看底部的示例 我需要在课间发消息。发布者将无限循环,调用一些方法来获取数据,然后将该调用的结果传递给 OnNext .可以有很多订阅者,但应该只有一个 IObservable 和一个长期运行的
我想创建一个可用于表示动态计算值的类,而另一个表示值的类可以是这些动态计算值的源(主题)。目标是当主题发生变化时,计算值会自动更新。 在我看来,使用 IObservable/IObserver 是可行
我有一个返回接口(interface) IObservable 的方法(在 silverlight 中)并希望将其转换为另一个 IObservable ? 那么我需要用什么来代替“CONVERT_SO
这两种方法有什么区别,每种方法的最佳情况是什么?我知道他们都能够附加一个函数来处理来自 IObservable 的排放,但我并不真正理解除此之外的差异。 编辑 对不起,我应该指定的。 IObserva
我的系统有很多状态对象——连接状态、CPU 负载、登录用户等等。所有此类事件都合并到单个可观察流中。 我想制作一个管理实用程序来显示系统的实际状态并显示所有这些计数器。 我如何创建一个包含所有计数器的
我有一个 IObservable我把它变成一个IObservable使用一些中间步骤: var observedXDocuments = from b in observedBytes
Although it is possible to attach an observer to multiple providers, the recommended pattern is to a
在我当前正在开发的系统中,我有许多被定义为接口(interface)和基类的组件。系统的每个部分都有一些与系统其他部分交互的特定点。 例如,数据准备组件准备了一些数据,最终需要进入数据处理部分,通信组
我有一个 IObservable包含 XML 文档(的片段)。我想把一个变成另一个。因此,例如,假设我有以下从我的 IObservable 推送的片段(每行包含一个片段): 获取以下文件: 我一
我有一个 IObservable这给了我字节数组中不确定数量的字节。我想知道我是如何从那开始返回 IObservable 的每个字节数组中有一定数量的字节。假设我们一次需要 10 个字节。 也就是说,
我在我的一个项目中使用了 IObserver/IObservable 接口(interface)。 CommandReader 是一个 IObservable,它不断从流中读取数据,然后将其传递给它的
例如,考虑一下: public IDisposable Subscribe(IObserver observer) { return eventStream.Where
当我写 .Subscribe 时我经常发现 Resharper 为我选择了以下重载,位于 mscorlib 中,Version=4.0.0.0: namespace System { public
我是一名优秀的程序员,十分优秀!