gpt4 book ai didi

c# - 为什么我的 Rx.NET observable 似乎生成了整个序列两次?

转载 作者:太空宇宙 更新时间:2023-11-03 12:16:44 27 4
gpt4 key购买 nike

我有一个随机失败的单元测试,我无法解释。这涉及使用 Rx.NET 的可观察序列和我为转换序列而制作的扩展方法。首先,让我展示一下测试是如何失败的:

Machine.Specifications.SpecificationException:   Expected: System.Collections.Generic.List`1[System.Int32]:{  [8],  [10],  [11]}  But was:  System.Collections.Generic.List`1[System.Int32]:{  [8],  [10],  [11],  [8],  [10],  [11]}

OK, so you see, I get the entire sequence twice instead of once. Here's the test:

[Subject(typeof(ObservableExtensions), "Shutter Current Readings")]
internal class when_a_shutter_current_reading_is_received
{
Establish context = () => source = "Z8\nZ10\nZ11\n".ToObservable();
Because of = () => source
.ShutterCurrentReadings().Trace("Unbelievable")
.SubscribeAndWaitForCompletion(item => elementHistory.Add(item));
It should_receive_the_current_readings = () => elementHistory.ShouldEqual(expectedElements);
static List<int> elementHistory = new List<int>();
static List<int> expectedElements = new List<int> {8, 10, 11};
static IObservable<char> source;

}

SubscribeAndWaitForCompletion() 是一个扩展方法,定义如下:

public static void SubscribeAndWaitForCompletion<T>(this IObservable<T> sequence, Action<T> observer)
{
var sequenceComplete = new ManualResetEvent(false);
var subscription = sequence.Subscribe(
onNext: observer,
onCompleted: () => sequenceComplete.Set()
);
sequenceComplete.WaitOne();
subscription.Dispose();
sequenceComplete.Dispose();
}

你会注意到那里有一个 .Trace() 调用,另一个在扩展方法中,这会通过 NLog 生成关于可观察序列的日志记录,这是跟踪输出:

20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: Subscribe()20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: Subscribe()20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: OnNext(8)20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: OnNext(8)20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: OnNext(10)20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: OnNext(10)20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: OnNext(11)20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: OnNext(11)20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: OnCompleted()20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: OnCompleted()20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: Dispose()20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: Dispose()Child test failed

This is pretty much what I would expect. I get one trace output from inside my extension method, then another on the transformed sequence outside the extension method. Each element in the sequence flows through the system exactly once, just as expected. And yet, I get the entire sequence captured twice in my test.

I had better provide the extension method so we can see what it does. Here it is:

    public static IObservable<int> ShutterCurrentReadings(this IObservable<char> source)
{
const string shutterCurrentPattern = @"^Z(?<Current>\d{1,2})[^0-9]";
var shutterCurrentRegex =
new Regex(shutterCurrentPattern, RegexOptions.Compiled | RegexOptions.ExplicitCapture);
var buffers = source.Publish(s => s.BufferByPredicates(p => p == 'Z', q => !char.IsDigit(q)));
var shutterCurrentValues = from buffer in buffers
let message = new string(buffer.ToArray())
let patternMatch = shutterCurrentRegex.Match(message)
where patternMatch.Success
let shutterCurrent = int.Parse(patternMatch.Groups["Current"].Value)
select shutterCurrent;
return shutterCurrentValues.Trace("ShutterCurrent");
}

因此这里的目的是从数据流中挑选出电流传感器的读数。读数采用 Znn 格式(字面量“Z”后跟一个或两个十进制数字后跟一个换行符。扩展方法将原始输入字符序列转换为表示当前读数的整数序列。过滤器使用 Rx Buffer 运算符缓冲它认为可能是有效传感器读数的字符。缓冲区在看到“Z”字符时打开,在看到非数字字符时关闭。这是通过匹配和解析进行双重检查在正则表达式中,然后如果结果全部通过,则将其转换为整数并在输出序列中发出。

谁能看出为什么我的结果中可能会出现双重数据?

更新:与调查相关的附加代码。

    public static IObservable<IList<char>> BufferByPredicates(this IObservable<char> source,
Predicate<char> bufferOpening, Predicate<char> bufferClosing)
{
return source.Buffer(source.Where(c => bufferOpening(c)), x => source.Where(c => bufferClosing(c)));
}

Trace 扩展方法可在 NuGet 包 TA.ASCOM.ReactiveCommunications(我的一个)中找到,但这是来源:

    public static IObservable<TSource> Trace<TSource>(this IObservable<TSource> source, string name)
{
var log = LogManager.GetLogger(name);
var id = 0;
return Observable.Create<TSource>(observer =>
{
var idClosure = ++id;
Action<string, object> trace = (m, v) => log.Debug("{0}[{1}]: {2}({3})", name, idClosure, m, v);
trace("Subscribe", "");
var disposable = source.Subscribe(
v =>
{
trace("OnNext", v);
observer.OnNext(v);
},
e =>
{
trace("OnError", "");
observer.OnError(e);
},
() =>
{
trace("OnCompleted", "");
observer.OnCompleted();
});
return () =>
{
trace("Dispose", "");
disposable.Dispose();
};
});
}

我怀疑我可能从其他人那里复制了这段代码,但我似乎没有记下是谁。

最佳答案

编辑:

这是一种在 LinqPad 中模拟问题的方法,无需使用 MSpec/NChrunch (?) runner:

void Main()
{
//static initializers
List<int> expectedElements = new List<int> { 8, 10, 11 };
List<int> elementHistory = new List<int>();
IObservable<char> source;

//simulated continuous running of MSpec test
for (int i = 0; i < 20; i++)
{

//establish
source = "Z8\nZ10\nZ11\n".ToObservable();

//because
source
.ShutterCurrentReadings()
.Trace("Unbelievable")
.SubscribeAndWaitForCompletion(item => elementHistory.Add(item));

//it
elementHistory.Dump(i.ToString()); //Linqpad
if(elementHistory.Count > 3)
throw new Exception("Assert.ShouldNotHappen");
}
}

public static class Extensions
{
public static IObservable<int> ShutterCurrentReadings(this IObservable<char> source)
{
const string shutterCurrentPattern = @"^Z(?<Current>\d{1,2})[^0-9]";
var shutterCurrentRegex =
new Regex(shutterCurrentPattern, RegexOptions.Compiled | RegexOptions.ExplicitCapture);
var buffers = source.Publish(s => s.BufferByPredicates(p => p == 'Z', q => !char.IsDigit(q)));
var shutterCurrentValues = from buffer in buffers
let message = new string(buffer.ToArray())
let patternMatch = shutterCurrentRegex.Match(message)
where patternMatch.Success
let shutterCurrent = int.Parse(patternMatch.Groups["Current"].Value)
select shutterCurrent;
return shutterCurrentValues.Trace("ShutterCurrent");
}

public static void SubscribeAndWaitForCompletion<T>(this IObservable<T> sequence, Action<T> observer)
{
var sequenceComplete = new ManualResetEvent(false);
var subscription = sequence.Subscribe(
onNext: observer,
onCompleted: () => sequenceComplete.Set()
);
sequenceComplete.WaitOne();
subscription.Dispose();
sequenceComplete.Dispose();
}

public static IObservable<TSource> Trace<TSource>(this IObservable<TSource> source, string name)
{
var log = LogManager.GetLogger(name);
var id = 0;
return Observable.Create<TSource>(observer =>
{
var idClosure = ++id;
Action<string, object> trace = (m, v) => log.Debug("{0}[{1}]: {2}({3})", name, idClosure, m, v);
trace("Subscribe", "");
var disposable = source.Subscribe(
v =>
{
trace("OnNext", v);
observer.OnNext(v);
},
e =>
{
trace("OnError", "");
observer.OnError(e);
},
() =>
{
trace("OnCompleted", "");
observer.OnCompleted();
});
return () =>
{
trace("Dispose", "");
disposable.Dispose();
};
});
}

public static IObservable<IList<char>> BufferByPredicates(this IObservable<char> source,
Predicate<char> bufferOpening, Predicate<char> bufferClosing)
{
return source.Buffer(source.Where(c => bufferOpening(c)), x => source.Where(c => bufferClosing(c)));
}
}

这失败了,就像你的场景一样。

我最好的修复建议是将 elementHistory 的初始化移动到 Establish 步骤。您还可以将 source 变量从 establish 移开,这样您的测试将如下所示:

internal class when_a_shutter_current_reading_is_received
{
Establish context = () => elementHistory = new List<int>();
Because of = () => "Z8\nZ10\nZ11\n".ToObservable()
.ShutterCurrentReadings()
.Trace("Unbelievable")
.SubscribeAndWaitForCompletion(item => elementHistory.Add(item));
It should_receive_the_current_readings = () => elementHistory.ShouldEqual(expectedElements);
static List<int> elementHistory;
static List<int> expectedElements = new List<int> { 8, 10, 11 };

}

您可能还想查看 Microsoft.Reactive.Testing,它为 Rx 查询提供了一些更强大的测试,尽管它不会像您的测试那样简单。


旧答案:

由于缺少 TraceShouldEqualBufferByPredicates 函数,我无法编译您的代码。如果它们来自外部来源,请记录来源。

我猜测问题源于 BufferByPredicates 实现、Trace 实现、Publish 后缺少 Connect ,或静态 elementHistory

我最好的猜测是静态 elementHistory:如果该测试同时运行两次,您就会遇到竞争条件,并且您最终可能会得到双重结果(Establish运行两次,然后因为运行两次,然后会失败。


关于c# - 为什么我的 Rx.NET observable 似乎生成了整个序列两次?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49416593/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com