- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个源流,通常希望在项目到达时发出它们。但是还有另一个可观察到的——我们称之为“门”。当门关闭时,源项目应该缓冲并仅在门打开时释放。
我已经能够编写一个函数来执行此操作,但它似乎比需要的更复杂。我必须使用 Observable.Create
方法。我假设有一种方法可以使用 Delay
或 Buffer
方法仅使用几行功能更强大的代码来实现我的目标,但我不知道如何实现。 Delay
似乎特别有前途,但我不知道如何有时延迟,有时允许一切立即通过(零延迟)。同样,我认为我可以使用 Buffer
,然后使用 SelectMany
;当门打开时,我会有长度为 1 的缓冲区,而当门关闭时,我会有更长的缓冲区,但我还是不知道如何让它工作。
这是我构建的适用于我所有测试的内容:
/// <summary>
/// Returns every item in <paramref name="source"/> in the order it was emitted, but starts
/// caching/buffering items when <paramref name="delay"/> emits true, and releases them when
/// <paramref name="delay"/> emits false.
/// </summary>
/// <param name="delay">
/// Functions as "gate" to start and stop the emitting of items. The gate is opened when true
/// and closed when false. The gate is open by default.
/// </param>
public static IObservable<T> DelayWhile<T>(this IObservable<T> source, IObservable<bool> delay) =>
Observable.Create<T>(obs =>
{
ImmutableList<T> buffer = ImmutableList<T>.Empty;
bool isDelayed = false;
var conditionSubscription =
delay
.DistinctUntilChanged()
.Subscribe(i =>
{
isDelayed = i;
if (isDelayed == false)
{
foreach (var j in buffer)
{
obs.OnNext(j);
}
buffer = ImmutableList<T>.Empty;
}
});
var sourceSubscription =
source
.Subscribe(i =>
{
if (isDelayed)
{
buffer = buffer.Add(i);
}
else
{
obs.OnNext(i);
}
});
return new CompositeDisposable(sourceSubscription, conditionSubscription);
});
这是另一个通过测试的选项。它非常简洁,但没有使用 Delay 或 Buffer 方法;我需要手动进行延迟/缓冲。
public static IObservable<T> DelayWhile<T>(this IObservable<T> source, IObservable<bool> delay) =>
delay
.StartWith(false)
.DistinctUntilChanged()
.CombineLatest(source, (d, i) => new { IsDelayed = d, Item = i })
.Scan(
seed: new { Items = ImmutableList<T>.Empty, IsDelayed = false },
accumulator: (sum, next) => new
{
Items = (next.IsDelayed != sum.IsDelayed) ?
(next.IsDelayed ? sum.Items.Clear() : sum.Items) :
(sum.IsDelayed ? sum.Items.Add(next.Item) : sum.Items.Clear().Add(next.Item)),
IsDelayed = next.IsDelayed
})
.Where(i => !i.IsDelayed)
.SelectMany(i => i.Items);
这些是我的测试:
[DataTestMethod]
[DataRow("3-a 6-b 9-c", "1-f", "3-a 6-b 9-c", DisplayName = "Start with explicit no_delay, emit all future items")]
[DataRow("3-a 6-b 9-c", "1-f 2-f", "3-a 6-b 9-c", DisplayName = "Start with explicit no_delay+no_delay, emit all future items")]
[DataRow("3-a 6-b 9-c", "1-t", "", DisplayName = "Start with explicit delay, emit nothing")]
[DataRow("3-a 6-b 9-c", "1-t 2-t", "", DisplayName = "Start with explicit delay+delay, emit nothing")]
[DataRow("3-a 6-b 9-c", "5-t 10-f", "3-a 10-b 10-c", DisplayName = "When delay is removed, all cached items are emitted in order")]
[DataRow("3-a 6-b 9-c 12-d", "5-t 10-f", "3-a 10-b 10-c 12-d", DisplayName = "When delay is removed, all cached items are emitted in order")]
public void DelayWhile(string source, string isDelayed, string expectedOutput)
{
(long time, string value) ParseEvent(string e)
{
var parts = e.Split('-');
long time = long.Parse(parts[0]);
string val = parts[1];
return (time, val);
}
IEnumerable<(long time, string value)> ParseEvents(string s) => s.Split(new char[] { ' ' }, StringSplitOptions.RemoveEmptyEntries).Select(ParseEvent);
var scheduler = new TestScheduler();
var sourceEvents = ParseEvents(source).Select(i => OnNext(i.time, i.value)).ToArray();
var sourceStream = scheduler.CreateHotObservable(sourceEvents);
var isDelayedEvents = ParseEvents(isDelayed).Select(i => OnNext(i.time, i.value == "t")).ToArray();
var isDelayedStream = scheduler.CreateHotObservable(isDelayedEvents);
var expected = ParseEvents(expectedOutput).Select(i => OnNext(i.time, i.value)).ToArray();
var obs = scheduler.CreateObserver<string>();
var result = sourceStream.DelayWhile(isDelayedStream);
result.Subscribe(obs);
scheduler.AdvanceTo(long.MaxValue);
ReactiveAssert.AreElementsEqual(expected, obs.Messages);
}
[TestMethod]
public void DelayWhile_SubscribeToSourceObservablesOnlyOnce()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateHotObservable<int>();
var delay = scheduler.CreateHotObservable<bool>();
// No subscriptions until subscribe
var result = source.DelayWhile(delay);
Assert.AreEqual(0, source.ActiveSubscriptions());
Assert.AreEqual(0, delay.ActiveSubscriptions());
// Subscribe once to each
var obs = scheduler.CreateObserver<int>();
var sub = result.Subscribe(obs);
Assert.AreEqual(1, source.ActiveSubscriptions());
Assert.AreEqual(1, delay.ActiveSubscriptions());
// Dispose subscriptions when subscription is disposed
sub.Dispose();
Assert.AreEqual(0, source.ActiveSubscriptions());
Assert.AreEqual(0, delay.ActiveSubscriptions());
}
[TestMethod]
public void DelayWhile_WhenSubscribeWithNoDelay_EmitCurrentValue()
{
var source = new BehaviorSubject<int>(1);
var emittedValues = new List<int>();
source.DelayWhile(Observable.Return(false)).Subscribe(i => emittedValues.Add(i));
Assert.AreEqual(1, emittedValues.Single());
}
// Subscription timing issue?
[TestMethod]
public void DelayWhile_WhenSubscribeWithDelay_EmitNothing()
{
var source = new BehaviorSubject<int>(1);
var emittedValues = new List<int>();
source.DelayWhile(Observable.Return(true)).Subscribe(i => emittedValues.Add(i));
Assert.AreEqual(0, emittedValues.Count);
}
[TestMethod]
public void DelayWhile_CoreScenario()
{
var source = new BehaviorSubject<int>(1);
var delay = new BehaviorSubject<bool>(false);
var emittedValues = new List<int>();
// Since no delay when subscribing, emit value
source.DelayWhile(delay).Subscribe(i => emittedValues.Add(i));
Assert.AreEqual(1, emittedValues.Single());
// Turn on delay and buffer up a few; nothing emitted
delay.OnNext(true);
source.OnNext(2);
source.OnNext(3);
Assert.AreEqual(1, emittedValues.Single());
// Turn off delay; should release the buffered items
delay.OnNext(false);
Assert.IsTrue(emittedValues.SequenceEqual(new int[] { 1, 2, 3 }));
}
最佳答案
编辑:我忘记了使用基于Join
和Join
的运算符(如WithLatestFrom
) 当有两个冷的可观察对象时。不用说,下面提到的关于缺乏交易的批评比以往任何时候都更加明显。
我会推荐这个,它更像是我原来的解决方案,但使用了 Delay
重载。它通过了除 DelayWhile_WhenSubscribeWithDelay_EmitNothing
之外的所有测试。为了解决这个问题,我将创建一个接受起始默认值的重载:
public static IObservable<T> DelayWhile<T>(this IObservable<T> source, IObservable<bool> delay, bool isGateClosedToStart)
{
return source.Publish(_source => delay
.DistinctUntilChanged()
.StartWith(isGateClosedToStart)
.Publish(_delay => _delay
.Select(isGateClosed => isGateClosed
? _source.TakeUntil(_delay).Delay(_ => _delay)
: _source.TakeUntil(_delay)
)
.Merge()
)
);
}
public static IObservable<T> DelayWhile<T>(this IObservable<T> source, IObservable<bool> delay)
{
return DelayWhile(source, delay, false);
}
旧答案:
我最近读了一本书,批评 Rx 不支持事务,我第一次尝试解决这个问题就是一个很好的例子,为什么:
public static IObservable<T> DelayWhile<T>(this IObservable<T> source, IObservable<bool> delay)
{
return source.Publish(_source => delay
.DistinctUntilChanged()
.StartWith(false)
.Publish(_delay => _delay
.Select(isGateClosed => isGateClosed
? _source.Buffer(_delay).SelectMany(l => l)
: _source)
.Switch()
)
);
}
那应该可以工作,除了有太多东西依赖于 delay
observable,而且订阅顺序很重要:在这种情况下,Switch
在 Buffer
结束之前切换,因此当延迟门关闭时不会有任何结果。
这可以通过以下方式解决:
public static IObservable<T> DelayWhile<T>(this IObservable<T> source, IObservable<bool> delay)
{
return source.Publish(_source => delay
.DistinctUntilChanged()
.StartWith(false)
.Publish(_delay => _delay
.Select(isGateClosed => isGateClosed
? _source.TakeUntil(_delay).Buffer(_delay).SelectMany(l => l)
: _source.TakeUntil(_delay)
)
.Merge()
)
);
}
我的下一次尝试通过了你所有的测试,并且使用了你想要的 Observable.Delay
重载:
public static IObservable<T> DelayWhile<T>(this IObservable<T> source, IObservable<bool> delay)
{
return delay
.DistinctUntilChanged()
.StartWith(false)
.Publish(_delay => source
.Join(_delay,
s => Observable.Empty<Unit>(),
d => _delay,
(item, isGateClosed) => isGateClosed
? Observable.Return(item).Delay(, _ => _delay)
: Observable.Return(item)
)
.Merge()
);
}
Join
可以简化为 WithLatestFrom
,如下所示:
public static IObservable<T> DelayWhile<T>(this IObservable<T> source, IObservable<bool> delay)
{
return delay
.DistinctUntilChanged()
.StartWith(false)
.Publish(_delay => source
.WithLatestFrom(_delay,
(item, isGateClosed) => isGateClosed
? Observable.Return(item).Delay(_ => _delay)
: Observable.Return(item)
)
.Merge()
);
}
关于system.reactive - 如何在另一个可观察对象发出 true 时缓冲项目,并在 false 时释放它们,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43421284/
我正在使用 MediaPlayer 在我的应用程序中播放在线 mp3 文件中的一些声音。 但是,即使在播放完成后,我也会在控制台中收到一行又一行的回调和缓冲。 10-24 08:08:48.467
我有一个简单的多边形。 dfr p = st_polygon(list(as.matrix(dfr))) > pbuf = st_buffer(p, .4) > plot(pbuf) > plot(
这可能又是一些愚蠢的问题,也许这确实是我所缺少的东西,但我很难让 glMultiDrawArrays 在 OpenGL4 中工作。 我发现了很多这样的解释: for (int i = 0; i #i
这仅仅是根据网络速度调整预缓冲内容量的问题吗?你是否在一开始就为此调整一次,每秒......? 或者它更复杂 - 对您的网络速度记录历史进行采样并取平均值/中值并对其进行调整? 最佳答案 您的第二段总
嗨,我正在使用 FFmpeg Autogen C#。当我使用 mkv 输出作为文件并使用 h264 rtsp 流作为输入时,一切正常。编解码器是 libx264 ffmpeg.avio_open(
我需要多次遍历几个文本文件的行。目前这是通过多个 with open("file.txt") as f: for line in f: # do something 虽然性能还
昨天给同学们写了一个xinetd小练习:做一个反向回显程序。 为了学习新东西,我尝试实现一个 Haskell 解决方案。琐碎的main = forever $ interact reverse不起作用
我正在阅读《实时渲染第三版》中的遮挡剔除部分,但我无法理解它是如何工作的。一些问题: “Z 金字塔”有何贡献?为什么我们需要多种分辨率的 Z 缓冲区?在书中,它的显示如下(左侧): 八叉树结构与用于一
我通过串行端口与设备通信。 我已成功获取 InputStream 并读取设备发送的内容。 但问题是,我根本不知道何时停止阅读并继续执行另一项任务。 这是简化的代码: inputStream = ser
我有以下代码: func (q *Queue) GetStreams(qi *QueueInfo) { channel := make(chan error, len(qi.AudioChun
在我调用 -play 之前,有没有办法让 MPMusicPlayerController 缓冲内容?还是在您设置队列时默认执行此操作? AVAudioPlayer 有 -prepareToPlay 方
我正在编写一个数据库 备份函数,从System.Diagnostics.Process 对象 读取StandardOutput (StreamReader) 属性。我已成功写入普通文件。 //This
我有一个 wpf 应用程序,其中所有 viewModel 都继承自实现 INotifyPropertyChanged 的类 NotifyPropertyChangeClass(见下文)。 我想限制
我需要类似于 withLatestFrom 的东西,对应于下图: ---------A-----------------B-- -1-2-3------4------5-6-7-8---- -----
有没有办法缓冲 OutputStream,在返回之前修改它?这是我的代码片段: public ServletOutputStream getOutputStream() throws IOExcept
目前我们有实现服务器通信协议(protocol)缓冲的需求。如果有人对此有任何意见,他们可以向我提供任何意见吗。 最佳答案 请查看以下 Protocol Buffer 链接。 http://code.
所以我目前正在开发一个 Java 应用程序,该应用程序应该将特定事件记录到数据库中。我希望每分钟最多有 15 到 20 次插入,基本上我想知道我是否应该为每个插入语句建立一个新连接,或者只要应用程序正
请考虑以下代码,包括两个线程 buffering_thread(用一条消息填充缓冲区指针)和 sending_thread(清空缓冲区): #include "msg.cpp" msg * buffe
是否可以在线播放由两个或多个视频文件组成的视频? 由于我原来的帖子不够清楚,这里有扩展的解释和问题。 我的站点托管在 Linux/Apache/PHP 服务器上。我有 FLV/F4V 格式的视频文件。
这是我用于缓冲和转换传入事件的代码: public Publisher> logs(String eventId) { ConnectableObservable connectableObs
我是一名优秀的程序员,十分优秀!