gpt4 book ai didi

system.reactive - 如何在另一个可观察对象发出 true 时缓冲项目,并在 false 时释放它们

转载 作者:行者123 更新时间:2023-12-04 17:52:34 26 4
gpt4 key购买 nike

我有一个源流,通常希望在项目到达时发出它们。但是还有另一个可观察到的——我们称之为“门”。当门关闭时,源项目应该缓冲并仅在门打开时释放。

我已经能够编写一个函数来执行此操作,但它似乎比需要的更复杂。我必须使用 Observable.Create 方法。我假设有一种方法可以使用 DelayBuffer 方法仅使用几行功能更强大的代码来实现我的目标,但我不知道如何实现。 Delay 似乎特别有前途,但我不知道如何有时延迟,有时允许一切立即通过(零延迟)。同样,我认为我可以使用 Buffer,然后使用 SelectMany;当门打开时,我会有长度为 1 的缓冲区,而当门关闭时,我会有更长的缓冲区,但我还是不知道如何让它工作。

这是我构建的适用于我所有测试的内容:

/// <summary>
/// Returns every item in <paramref name="source"/> in the order it was emitted, but starts
/// caching/buffering items when <paramref name="delay"/> emits true, and releases them when
/// <paramref name="delay"/> emits false.
/// </summary>
/// <param name="delay">
/// Functions as "gate" to start and stop the emitting of items. The gate is opened when true
/// and closed when false. The gate is open by default.
/// </param>

public static IObservable<T> DelayWhile<T>(this IObservable<T> source, IObservable<bool> delay) =>
Observable.Create<T>(obs =>
{
ImmutableList<T> buffer = ImmutableList<T>.Empty;
bool isDelayed = false;
var conditionSubscription =
delay
.DistinctUntilChanged()
.Subscribe(i =>
{
isDelayed = i;
if (isDelayed == false)
{
foreach (var j in buffer)
{
obs.OnNext(j);
}
buffer = ImmutableList<T>.Empty;
}
});
var sourceSubscription =
source
.Subscribe(i =>
{
if (isDelayed)
{
buffer = buffer.Add(i);
}
else
{
obs.OnNext(i);
}
});
return new CompositeDisposable(sourceSubscription, conditionSubscription);
});

这是另一个通过测试的选项。它非常简洁,但没有使用 Delay 或 Buffer 方法;我需要手动进行延迟/缓冲。

public static IObservable<T> DelayWhile<T>(this IObservable<T> source, IObservable<bool> delay) =>
delay
.StartWith(false)
.DistinctUntilChanged()
.CombineLatest(source, (d, i) => new { IsDelayed = d, Item = i })
.Scan(
seed: new { Items = ImmutableList<T>.Empty, IsDelayed = false },
accumulator: (sum, next) => new
{
Items = (next.IsDelayed != sum.IsDelayed) ?
(next.IsDelayed ? sum.Items.Clear() : sum.Items) :
(sum.IsDelayed ? sum.Items.Add(next.Item) : sum.Items.Clear().Add(next.Item)),
IsDelayed = next.IsDelayed
})
.Where(i => !i.IsDelayed)
.SelectMany(i => i.Items);

这些是我的测试:

[DataTestMethod]
[DataRow("3-a 6-b 9-c", "1-f", "3-a 6-b 9-c", DisplayName = "Start with explicit no_delay, emit all future items")]
[DataRow("3-a 6-b 9-c", "1-f 2-f", "3-a 6-b 9-c", DisplayName = "Start with explicit no_delay+no_delay, emit all future items")]
[DataRow("3-a 6-b 9-c", "1-t", "", DisplayName = "Start with explicit delay, emit nothing")]
[DataRow("3-a 6-b 9-c", "1-t 2-t", "", DisplayName = "Start with explicit delay+delay, emit nothing")]
[DataRow("3-a 6-b 9-c", "5-t 10-f", "3-a 10-b 10-c", DisplayName = "When delay is removed, all cached items are emitted in order")]
[DataRow("3-a 6-b 9-c 12-d", "5-t 10-f", "3-a 10-b 10-c 12-d", DisplayName = "When delay is removed, all cached items are emitted in order")]
public void DelayWhile(string source, string isDelayed, string expectedOutput)
{
(long time, string value) ParseEvent(string e)
{
var parts = e.Split('-');
long time = long.Parse(parts[0]);
string val = parts[1];
return (time, val);
}
IEnumerable<(long time, string value)> ParseEvents(string s) => s.Split(new char[] { ' ' }, StringSplitOptions.RemoveEmptyEntries).Select(ParseEvent);
var scheduler = new TestScheduler();
var sourceEvents = ParseEvents(source).Select(i => OnNext(i.time, i.value)).ToArray();
var sourceStream = scheduler.CreateHotObservable(sourceEvents);
var isDelayedEvents = ParseEvents(isDelayed).Select(i => OnNext(i.time, i.value == "t")).ToArray();
var isDelayedStream = scheduler.CreateHotObservable(isDelayedEvents);
var expected = ParseEvents(expectedOutput).Select(i => OnNext(i.time, i.value)).ToArray();
var obs = scheduler.CreateObserver<string>();
var result = sourceStream.DelayWhile(isDelayedStream);
result.Subscribe(obs);
scheduler.AdvanceTo(long.MaxValue);
ReactiveAssert.AreElementsEqual(expected, obs.Messages);
}

[TestMethod]
public void DelayWhile_SubscribeToSourceObservablesOnlyOnce()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateHotObservable<int>();
var delay = scheduler.CreateHotObservable<bool>();

// No subscriptions until subscribe
var result = source.DelayWhile(delay);
Assert.AreEqual(0, source.ActiveSubscriptions());
Assert.AreEqual(0, delay.ActiveSubscriptions());

// Subscribe once to each
var obs = scheduler.CreateObserver<int>();
var sub = result.Subscribe(obs);
Assert.AreEqual(1, source.ActiveSubscriptions());
Assert.AreEqual(1, delay.ActiveSubscriptions());

// Dispose subscriptions when subscription is disposed
sub.Dispose();
Assert.AreEqual(0, source.ActiveSubscriptions());
Assert.AreEqual(0, delay.ActiveSubscriptions());
}

[TestMethod]
public void DelayWhile_WhenSubscribeWithNoDelay_EmitCurrentValue()
{
var source = new BehaviorSubject<int>(1);
var emittedValues = new List<int>();
source.DelayWhile(Observable.Return(false)).Subscribe(i => emittedValues.Add(i));
Assert.AreEqual(1, emittedValues.Single());
}

// Subscription timing issue?
[TestMethod]
public void DelayWhile_WhenSubscribeWithDelay_EmitNothing()
{
var source = new BehaviorSubject<int>(1);
var emittedValues = new List<int>();
source.DelayWhile(Observable.Return(true)).Subscribe(i => emittedValues.Add(i));
Assert.AreEqual(0, emittedValues.Count);
}

[TestMethod]
public void DelayWhile_CoreScenario()
{
var source = new BehaviorSubject<int>(1);
var delay = new BehaviorSubject<bool>(false);
var emittedValues = new List<int>();

// Since no delay when subscribing, emit value
source.DelayWhile(delay).Subscribe(i => emittedValues.Add(i));
Assert.AreEqual(1, emittedValues.Single());

// Turn on delay and buffer up a few; nothing emitted
delay.OnNext(true);
source.OnNext(2);
source.OnNext(3);
Assert.AreEqual(1, emittedValues.Single());

// Turn off delay; should release the buffered items
delay.OnNext(false);
Assert.IsTrue(emittedValues.SequenceEqual(new int[] { 1, 2, 3 }));
}

最佳答案

编辑:我忘记了使用基于JoinJoin 的运算符(如WithLatestFrom) 当有两个冷的可观察对象时。不用说,下面提到的关于缺乏交易的批评比以往任何时候都更加明显。

我会推荐这个,它更像是我原来的解决方案,但使用了 Delay 重载。它通过了除 DelayWhile_WhenSubscribeWithDelay_EmitNothing 之外的所有测试。为了解决这个问题,我将创建一个接受起始默认值的重载:

public static IObservable<T> DelayWhile<T>(this IObservable<T> source, IObservable<bool> delay, bool isGateClosedToStart)
{
return source.Publish(_source => delay
.DistinctUntilChanged()
.StartWith(isGateClosedToStart)
.Publish(_delay => _delay
.Select(isGateClosed => isGateClosed
? _source.TakeUntil(_delay).Delay(_ => _delay)
: _source.TakeUntil(_delay)
)
.Merge()
)
);
}

public static IObservable<T> DelayWhile<T>(this IObservable<T> source, IObservable<bool> delay)
{
return DelayWhile(source, delay, false);
}

旧答案:

我最近读了一本书,批评 Rx 不支持事务,我第一次尝试解决这个问题就是一个很好的例子,为什么:

public static IObservable<T> DelayWhile<T>(this IObservable<T> source, IObservable<bool> delay)
{
return source.Publish(_source => delay
.DistinctUntilChanged()
.StartWith(false)
.Publish(_delay => _delay
.Select(isGateClosed => isGateClosed
? _source.Buffer(_delay).SelectMany(l => l)
: _source)
.Switch()
)
);
}

应该可以工作,除了有太多东西依赖于 delay observable,而且订阅顺序很重要:在这种情况下,SwitchBuffer 结束之前切换,因此当延迟门关闭时不会有任何结果。

这可以通过以下方式解决:

public static IObservable<T> DelayWhile<T>(this IObservable<T> source, IObservable<bool> delay)
{
return source.Publish(_source => delay
.DistinctUntilChanged()
.StartWith(false)
.Publish(_delay => _delay
.Select(isGateClosed => isGateClosed
? _source.TakeUntil(_delay).Buffer(_delay).SelectMany(l => l)
: _source.TakeUntil(_delay)
)
.Merge()
)
);
}

我的下一次尝试通过了你所有的测试,并且使用了你想要的 Observable.Delay 重载:

public static IObservable<T> DelayWhile<T>(this IObservable<T> source, IObservable<bool> delay)
{
return delay
.DistinctUntilChanged()
.StartWith(false)
.Publish(_delay => source
.Join(_delay,
s => Observable.Empty<Unit>(),
d => _delay,
(item, isGateClosed) => isGateClosed
? Observable.Return(item).Delay(, _ => _delay)
: Observable.Return(item)
)
.Merge()
);
}

Join 可以简化为 WithLatestFrom,如下所示:

public static IObservable<T> DelayWhile<T>(this IObservable<T> source, IObservable<bool> delay)
{
return delay
.DistinctUntilChanged()
.StartWith(false)
.Publish(_delay => source
.WithLatestFrom(_delay,
(item, isGateClosed) => isGateClosed
? Observable.Return(item).Delay(_ => _delay)
: Observable.Return(item)
)
.Merge()
);
}

关于system.reactive - 如何在另一个可观察对象发出 true 时缓冲项目,并在 false 时释放它们,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43421284/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com