gpt4 book ai didi

c# - 如何修改 IObservable 以便我收集字符,直到一段时间内没有字符为止?

转载 作者:行者123 更新时间:2023-11-30 19:38:55 25 4
gpt4 key购买 nike

我想编写一个接受 IObvservable<char> 的 Rx 查询并产生 IObservable<string> .应缓冲字符串,直到在指定时间内没有生成任何字符。

数据源是一个串口,我从中捕获了DataReceived。事件并从中产生一个 IObservable<char> .我处理的协议(protocol)基本上是基于字符的,但它的实现不是很一致,所以我需要以各种不同的方式观察字符流。在某些情况下,有一个响应结束终止符(但不是换行符),在一种情况下,我得到一个长度未知的字符串,我知道它已经全部到达的唯一方法是在几百毫秒内没有其他任何东西到达.这就是我要解决的问题。

我发现了

var result = source.Buffer(TimeSpan.FromMilliseconds(200))
.Select(s=>new string(s.ToArray()));

Buffer(TimeSpan)几乎是我所需要的,但不完全是。我需要在每次新字符到达时重置计时器,以便仅在自上一个字符以来经过足够的时间后才生成缓冲区。

拜托,任何人都可以就如何实现这一点提出建议吗?

[更新]当我在等待答案时,我想出了一个我自己的解决方案,它基本上重新发明了 Throttle:

    public virtual IObservable<string> BufferUntilQuiescentFor(IObservable<char> source, TimeSpan quietTime)
{
var shared = source.Publish().RefCount();
var timer = new Timer(quietTime.TotalMilliseconds);
var bufferCloser = new Subject<Unit>();
// Hook up the timer's Elapsed event so that it notifies the bufferCloser sequence
timer.Elapsed += (sender, args) =>
{
timer.Stop();
bufferCloser.OnNext(Unit.Default); // close the buffer
};
// Whenever the shared source sequence produces a value, reset the timer, which will prevent the buffer from closing.
shared.Subscribe(value =>
{
timer.Stop();
timer.Start();
});
// Finally, return the buffered sequence projected into IObservable<string>
var sequence = shared.Buffer(() => bufferCloser).Select(s=>new string(s.ToArray()));
return sequence;
}

我没听懂Throttle正确地,我认为它的行为与实际不同——现在我已经用“大理石图”向我解释了它并且我理解正确,我相信它实际上是一个比我想出的更优雅的解决方案(我也还没有测试我的代码)。不过,这是一个有趣的练习 ;-)

最佳答案

这一切都归功于 Enigmativity - 我只是在这里重复它以配合我正在添加的解释。

var dueTime = TimeSpan.FromMilliseconds(200);
var result = source
.Publish(o => o.Buffer(() => o.Throttle(dueTime)))
.Select(cs => new string(cs.ToArray()));

它的工作方式如下图所示(其中dueTime 对应三个破折号的时间):

source:    -----h--el--l--o----wo-r--l-d---|
throttled: ------------------o------------d|
buffer[0]: -----h--el--l--o--|
buffer[1]: -wo-r--l-d--|
result: ------------------"hello"------"world"

Publish的使用只是为了确保 BufferThrottle共享标的单次认购source .来自 Throttle 的文档:

Ignores the values from an observable sequence which are followed by another value before due time...

Buffer 的过载被使用需要一系列“缓冲区关闭”。每次序列发出一个值时,当前缓冲区结束并开始下一个缓冲区。

关于c# - 如何修改 IObservable<char> 以便我收集字符,直到一段时间内没有字符为止?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30336702/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com