gpt4 book ai didi

c# - 在向订阅者提供服务之前和之后修改 C# 中的 Rx 事件

转载 作者:行者123 更新时间:2023-11-30 15:17:29 24 4
gpt4 key购买 nike

每次观察到事件时,我都需要对其进行预处理和后处理。这是我想出的方法,有效:

var subj = new Subject<Event>();
var observable = Observable.Create<Event>(obs =>
{
subj.Finally(obs.OnCompleted);
return subj.Subscribe(e =>
{
try
{
Preprocess(e);
obs.OnNext(e);
Postprocess(e);
}
catch (Exception ex) { obs.OnError(ex); }
});
});

我的问题:这是正确的做法吗,或者是否有更好的模板/扩展方法?

最佳答案

分析

这里有一些微妙之处。

你的问题的标题是提到修改事件 - 但你不应该真的这样做。我假设事件的预处理和后处理报告,但不要修改它。如果您需要这样做,最好更改 Do下面到 Select并让 Preprocess 返回值的修改副本 -这种不可变的方法更明确,更不容易出错。目前尚不清楚事件的后修改在可观察流中的意义 - 我会避免它并让订阅者报告观察。下面的解决方案中没有任何内容可以从技术上阻止您改变处理方法中的事件 - 但是如果您正在改变并且有多个下游订阅者的行为确实需要非常小心地处理。这种突变不是惯用的 Rx,也不是一般良好的编程习惯。

有许多引入异步性的下游运算符,因此如果您尝试记录最终订阅者的观察结果,则无法保证它会在您的后处理被调用时发生。例如,插入一个简单的 Delay在订阅者可能破坏你之前,因为 Postprocess 将在订阅者看到事件之前被调用。您唯一的保证是在直接下游订阅者从 OnNext 返回后调用 Postprocess。 - 就是这样。你不能说超过那个点的时间,所以它只是非常狭义的后期处理。这就是为什么我会在订阅者中放置“后处理”,执行您认为的“最终”操作或(如果该订阅者是同步的)紧接在它之前。

您调用 Finally不会做任何事情 - 它不会修改它所应用的可观察对象,它会返回一个新的可观察对象,其中包含您要丢弃的行为。

您正在捕获订阅者抛出的异常。这是特别微妙的 - 你不应该这样做,或者发送 OnError之后发送给订阅者,因为订阅者现在处于未知错误状态。我在 How to handle exceptions thrown by observer's onNext in RxJava? 中详细讨论了这个问题(答案适用于 .NET)。您的问题并不清楚您是否应该在订户失败的情况下进行后处理,但是由于您的实现没有尝试这样做,所以我也没有。

您的实现没有通过 OnCompletedOnError来自上游源的事件(在您的情况下为 subj)。

解决方案

尽管有上述注意事项,但这里有一种方法可能对您有用。

您可以使用 Do (或者对于突变,Select 将预处理更改为 Func<T>,如上所述)来处理事物的预处理方面,这使事情变得更容易。这是一个方便的自定义运算符来管理它:

public static class ObservableExtensions
{
public static IObservable<T> Process<T>(
this IObservable<T> source,
Action<T> preprocess,
Action<T> postprocess)
{
return Observable.Create<T>(o =>
source.Do(preprocess).Subscribe(x =>
{
o.OnNext(x);
try
{
postprocess(x);
}
catch (Exception e)
{
o.OnError(e);
}
},
o.OnError,
o.OnCompleted)
);
}
}

Do 会将预处理错误正确传播为 OnError try-catch 将处理后处理错误。如上所述,我们故意不处理订户中的错误。 Create方法将正确执行其余的 Rx 语法。

像这样使用它:

subj.Process(Preprocess, PostProcess)
.Subscribe(/* observer or handlers etc. */);

测试

这里使用响应式(Reactive)测试框架(nuget rx-testing)和断言库Shouldly(nuget shouldly)对这个算子进行一些单元测试:

public class TestProcess : ReactiveTest
{
[Test]
public void ErrorFreeStreamProcessedCorrectly()
{
var expected = new List<string>
{
"Preprocess(1)", "1", "Postprocess(1)",
"Preprocess(2)", "2", "Postprocess(2)"
};

var actual = new List<string>();

var scheduler = new TestScheduler();

var xs = scheduler.CreateColdObservable<int>(
OnNext(100, 1),
OnNext(200, 2),
OnCompleted<int>(300)
);

var sut = xs.Process(
x => actual.Add($"Preprocess({x})"),
x => actual.Add($"Postprocess({x})"));

var result = scheduler.CreateObserver<int>();

sut.Do(x => actual.Add(x.ToString())).Subscribe(result);

scheduler.Start();

result.Messages.AssertEqual(
OnNext(100, 1),
OnNext(200, 2),
OnCompleted<int>(300)
);

actual.ShouldBe(expected);
}

[Test]
public void ErrorInPreprocessHandledCorrectly()
{
var expected = new List<string>
{
"Preprocess(1)", "1", "Postprocess(1)",
"Error"
};

var expectedException = new ApplicationException("Error");

var actual = new List<string>();

var scheduler = new TestScheduler();

var xs = scheduler.CreateColdObservable<int>(
OnNext(100, 1),
OnNext(200, 2),
OnCompleted<int>(300)
);

var sut = xs.Process(
x => actual.Add(x == 1 ? $"Preprocess({x})" : throw expectedException),
x => actual.Add($"Postprocess({x})"));

var result = scheduler.CreateObserver<int>();

sut.Do(x => actual.Add(x.ToString()),
e => actual.Add(e.Message)).Subscribe(result);

scheduler.Start();

result.Messages.AssertEqual(
OnNext(100, 1),
OnError<int>(200, expectedException)
);

actual.ShouldBe(expected);
}

[Test]
public void ErrorInPostprocessHandledCorrectly()
{
var expected = new List<string>
{
"Preprocess(1)", "1", "Postprocess(1)",
"Preprocess(2)", "2", "Error"
};

var expectedException = new ApplicationException("Error");

var actual = new List<string>();

var scheduler = new TestScheduler();

var xs = scheduler.CreateColdObservable<int>(
OnNext(100, 1),
OnNext(200, 2),
OnCompleted<int>(300)
);

var sut = xs.Process(
x => actual.Add($"Preprocess({x})"),
x => actual.Add(x == 1 ? $"Postprocess({x})" : throw expectedException));

var result = scheduler.CreateObserver<int>();

sut.Do(x => actual.Add(x.ToString()),
e => actual.Add(e.Message)).Subscribe(result);

scheduler.Start();

result.Messages.AssertEqual(
OnNext(100, 1),
OnNext(200, 2),
OnError<int>(200, expectedException)
);

actual.ShouldBe(expected);
}

[Test]
public void ErrorInSubscriberHandledCorrectly()
{
var expected = new List<string>
{
"Preprocess(1)", "1", "Postprocess(1)",
"Preprocess(2)"
};

var expectedException = new ApplicationException("Error");

var actual = new List<string>();

var scheduler = new TestScheduler();

var xs = scheduler.CreateColdObservable<int>(
OnNext(100, 1),
OnNext(200, 2),
OnCompleted<int>(300)
);

var sut = xs.Process(
x => actual.Add($"Preprocess({x})"),
x => actual.Add($"Postprocess({x})"));

var result = scheduler.CreateObserver<int>();

sut.Subscribe(
x => { if (x != 1) throw expectedException; else actual.Add(x.ToString()); result.OnNext(x); },
result.OnError,
result.OnCompleted);

try
{
scheduler.Start();
}
catch
{

}

result.Messages.AssertEqual(
OnNext(100, 1)
);

actual.ShouldBe(expected);
}
}

关于c# - 在向订阅者提供服务之前和之后修改 C# 中的 Rx 事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46250468/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com