gpt4 book ai didi

c# - 如何将异常转换为事件并重新订阅有故障的 IObservable?

转载 作者:行者123 更新时间:2023-11-30 15:28:42 25 4
gpt4 key购买 nike

如何将 IObservable 流中的异常转换为普通域对象并透明地重新订阅流?

附录:作为James在评论中指出,我的用例想法类似于在不可靠的来源上有一个应该是连续的流,例如一个网络。如果出现故障,只需尝试重新连接到源,但通知下游处理器。

事实上,这与我在 Translating a piece of asynchronous C# code to F# (with Reactive Extensions and FSharpx) 的另一个问题有关,这又源于 How to implement polling using Observables? .

事实上,现在我想到了,我可以先使用 How to write a generic, recursive extension method in F#? 处的代码("RetryAfterDelay ")(使用更多参数来调整 RetryAfterDelay 行为)并将其与此实现链接起来。当重试次数耗尽时,将产生域错误并重新启动轮询器。诚然,可能会有更有效的方法,但无论如何。 :) ... 或者只提供一个回调函数来记录错误而不是将它们转换为领域事件,好吧,选择比比皆是...

但是回到原来的代码...

例如,如果我有

public enum EventTypeEnum
{
None = 0,
Normal = 1,
Faulted = 2
}

public class Event
{
public EventTypeEnum Type { get; set; }
}

private static IObservable<int> FaultingSequence1()
{
var subject = new ReplaySubject<int>();
subject.OnNext(1);
subject.OnNext(2);
subject.OnError(new InvalidOperationException("Something went wrong!"));

return subject;
}

private static IEnumerable<int> FaultingSequence2()
{
for(int i = 0; i < 3; ++i)
{
yield return 1;
}

throw new InvalidOperationException("Something went wrong!");
}

//Additional pondering: Why isn't FaultingSequence2().ToObservable() too be procted by Catch?
//
//This part is for illustratory purposes here. This is the piece I'd like
//behave so that exceptions would get transformed to Events with EventTypeEnum.Faulted
//and passed along to the stream that's been subscribed to while resubscribing to
//FaultingSequence1. That is, the subscribed would learn about the fault through a
//domain event type.
//Retry does the resubscribing, but only on OnError.
var stream = FaultingSequence1().Catch<int, Exception>(ex =>
{
Console.WriteLine("Exception: {0}", ex);
return Observable.Throw<int>(ex);
}).Retry().Select(i => new Event { Type = EventTypeEnum.Normal });

//How to get this to print "Event type: Normal", "Event type: Normal", "Event type: Faulted"?
stream.Subscribe(i => Console.WriteLine("Event type: {0}", i.Type));

这个问题现在真的让我很头疼!有什么建议吗?

最佳答案

有一个运算符叫做 Materialize 它将每个事件转换为 Notification<T> :

OnNext:
OnNext a Notification<T> with Kind OnNext containing a value.

OnError:
OnNext a Notification<T> with Kind OnError containing an exception.
OnCompleted.

OnCompleted:
OnNext a Notification<T> with Kind OnCompleted
OnCompleted.

因此,当调用 OnError 或 OnCompleted 时,订阅仍会完成,但永远不会在订阅者上调用 OnError。所以你可以做这样的事情......

source
.Materialize()
.Repeat();

但是,即使原始订阅自然完成(通过 OnCompleted),这也会重新订阅源

所以也许您仍然希望调用 OnError,但您还希望原始 OnError 中的异常通过 Notification<T> 内的 OnNext 传递。 .为此,您可以使用这样的东西:

source
.Materialize()
.SelectMany(notification =>
notification.Kind == NotificationKind.OnError
? Observable.Return(notification).Concat(Observable.Exception(notification.Exception))
: Observable.Return(notification)
)
.Retry();

以这种方式,如果订阅自然完成(通过 OnCompleted),则不会重新订阅源。

设置好之后,就足以将每种类型的通知映射到您要使用的任何域对象:

source
.Materialize()
.SelectMany(notification =>
notification.Kind == NotificationKind.OnError
? Observable.Return(notification).Concat(Observable.Exception(notification.Exception))
: Observable.Return(notification)
)
.Retry()
.Map(notification => {
switch (notification.Kind) {
case (NotificationKind.OnNext): return // something.
case (NotificationKind.OnError): return // something.
case (NotificationKind.OnCompleted): return // something.
default: throw new NotImplementedException();
}
});

关于c# - 如何将异常转换为事件并重新订阅有故障的 IObservable?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25228706/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com