gpt4 book ai didi

c# - .Net 中的 Reactive Rx zip 队列

转载 作者:行者123 更新时间:2023-11-30 20:26:09 28 4
gpt4 key购买 nike

我对响应式编程的概念还很陌生。我正在使用 Bonsai ,它通过 C# 公开了一些但不是全部的 .Net rx 命令。

我正在尝试获得像这个弹珠图这样的行为:

input1: ---1--------2--------3--------4--------5--------6--------7
input2: -------abc----------------------------------def-----------
result: ------------a--------b--------c--------c---------d-------e

基本上,输入 2 生成应该存储在队列中的事件波。输入 1 用作从该队列发出单个项目的触发器。

当队列为空时,应发出队列的最后一项。我尝试了 zip 和 combineLatest 的各种组合,但无法获得所需的行为。

我还尝试了基于 this postWithLatestFrom 实现,但回想起来我意识到这也不会产生所需的行为。

public IObservable<Tuple<TSource, TOther>> Process<TSource, TOther>(
IObservable<TSource> source,
IObservable<TOther> other)
{


// return source1.WithLatestFrom(source2, (xs, ys) => Tuple.Create(xs, ys));
return source.Publish(os => other.Select(a => os.Select(b => Tuple.Create(b, a))).Switch());
}

是否有任何运算符或运算符组合会产生此行为?一旦我了解了要使用的运算符,我就可以对 Bonsai 进行实现。

更新 1:2018/05/18

根据 Sentinel 的帖子,我在 Bonsai 命名空间内编写了一个新类 DiscriminatedUnion。虽然我没有设法指定适当的类型。编译器声明“无法推断 Merge 的类型参数”(在 .Merge(input1.Select... 中)。在哪里添加正确的类型规范?

using System.Reactive.Linq;
using System.ComponentModel;
using System.Collections.Immutable;
namespace Bonsai.Reactive
{
[Combinator]
// [XmlType(Namespace = Constants.XmlNamespace)]
[Description("Implementation of Discriminated Union")]
public class DiscriminatedUnion
{
public IObservable<int?> Process<TInput1, TInput2>(
IObservable<TInput1> input1,
IObservable<TInput2> input2)
{
var merged =
input2.Select(s2 => Tuple.Create(2, (TInput2)s2))
.Merge(input1.Select(s1 => Tuple.Create(1, (TInput1)s1)))
.Scan(Tuple.Create((int?)null, new Queue<int>(), 0), (state, val) =>
{
int? next = state.Item1;
if (val.Item1 == 1)
{
if (state.Item2.Count > 0)
{
next = state.Item2.Dequeue();
}
}
else
{
state.Item2.Enqueue(val.Item2);
}
return Tuple.Create(next, state.Item2, val.Item1);
})
.Where(x => (x.Item1 != null && x.Item3 == 1))
.Select(x => x.Item1);
return merged;
}
}
}

最佳答案

这是使用 NuGet 包 Microsoft.Reactive.Testing 对您的问题(或弹珠图)进行测试的表示:

var scheduler = new TestScheduler();
var input1 = scheduler.CreateColdObservable<int>(
ReactiveTest.OnNext(1000.Ms(), 1),
ReactiveTest.OnNext(2000.Ms(), 2),
ReactiveTest.OnNext(3000.Ms(), 3),
ReactiveTest.OnNext(4000.Ms(), 4),
ReactiveTest.OnNext(5000.Ms(), 5),
ReactiveTest.OnNext(6000.Ms(), 6),
ReactiveTest.OnNext(7000.Ms(), 7)
);
var input2 = scheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(1400.Ms(), "a"),
ReactiveTest.OnNext(1500.Ms(), "b"),
ReactiveTest.OnNext(1600.Ms(), "c"),
ReactiveTest.OnNext(5500.Ms(), "d"),
ReactiveTest.OnNext(5600.Ms(), "e"),
ReactiveTest.OnNext(5700.Ms(), "f")
);

使用此扩展方法:

public static class TickExtensions
{
public static long Ms(this int ms)
{
return TimeSpan.FromMilliseconds(ms).Ticks;
}
}

该问题基本上是一个状态机问题,涉及两个不同类型的可观察对象。解决此问题的最佳方法是使用 Discriminated Union类型,它在 C# 中不存在,因此我们将创建一个。 @Sentinel 的回答是用一个元组来完成的,它也可以工作:

public class DUnion<T1, T2>
{
public DUnion(T1 t1)
{
Type1Item = t1;
Type2Item = default(T2);
IsType1 = true;
}

public DUnion(T2 t2)
{
Type2Item = t2;
Type1Item = default(T1);
IsType1 = false;
}

public bool IsType1 { get; }
public bool IsType2 => !IsType1;

public T1 Type1Item { get; }
public T2 Type2Item { get; }
}

然后我们可以将我们的两个不同类型的流,SelectMerge 合并到一个可区分的联合流中,我们可以在其中使用 Scan< 管理状态。您的状态逻辑有点棘手,但可行:

  • 如果一个号码到达并且队列中没有项目,则什么也不做
  • 如果一个数字到达并且队列中有项目,则发出队列中的第一个项目。
    • 如果有多个项目,请从队列中删除最近发布的项目。
    • 如果队列只有一个项目,不删除它,进入“假空”状态。
  • 如果字符串到达​​,将其放入队列中。
    • 如果队列为“假空”,弹出最后一项并退出“假空”状态。

这是生成的可观察对象(使用 NuGet 包 System.Collections.Immutable):

var result = input1.Select(i => new DUnion<int, string>(i))
.Merge(input2.Select(s => new DUnion<int, string>(s)))
.Scan((queue: ImmutableQueue<string>.Empty, item: (string)null, isFakeEmptyState: false, emit: false), (state, dItem) => dItem.IsType1
? state.queue.IsEmpty
? (state.queue, null, false, false) //Is integer, but empty queue, so don't emit item
: state.queue.Dequeue().IsEmpty //Is integer, at least one item: dequeue unless only one item, then emit either way
? (state.queue, state.queue.Peek(), true, true)
: (state.queue.Dequeue(), state.queue.Peek(), false, true)
: state.isFakeEmptyState //Is new string, just add to queue, don't emit
? (state.queue.Dequeue().Enqueue(dItem.Type2Item), null, false, false)
: (state.queue.Enqueue(dItem.Type2Item), (string)null, false, false)
)
.Where(t => t.emit)
.Select(t => t.item);

然后可以按如下方式进行测试:

var observer = scheduler.CreateObserver<string>();
result.Subscribe(observer);
scheduler.Start();
observer.Messages.Dump(); //Linqpad. Can replace with Console.Writeline loop.

更新:我稍微考虑了一下,我认为将一些运算符放在 Discriminated Union 功能周围是有意义的。这样您就不必明确处理类型:

public static class DUnionExtensions
{
public class DUnion<T1, T2>
{
public DUnion(T1 t1)
{
Type1Item = t1;
Type2Item = default(T2);
IsType1 = true;
}

public DUnion(T2 t2)
{
Type2Item = t2;
Type1Item = default(T1);
IsType1 = false;
}

public bool IsType1 { get; }
public bool IsType2 => !IsType1;

public T1 Type1Item { get; }
public T2 Type2Item { get; }
}

public static IObservable<DUnion<T1, T2>> Union<T1, T2>(this IObservable<T1> a, IObservable<T2> b)
{
return a.Select(x => new DUnion<T1, T2>(x))
.Merge(b.Select(x => new DUnion<T1, T2>(x)));
}

public static IObservable<TState> ScanUnion<T1, T2, TState>(this IObservable<DUnion<T1, T2>> source,
TState initialState,
Func<TState, T1, TState> type1Handler,
Func<TState, T2, TState> type2Handler)
{
return source.Scan(initialState, (state, u) => u.IsType1
? type1Handler(state, u.Type1Item)
: type2Handler(state, u.Type2Item)
);
}
}

有了这些扩展方法,解决方案就变成了这样,我认为这样读起来更好:

var result = input1
.Union(input2)
.ScanUnion((queue: ImmutableQueue<string>.Empty, item: (string)null, isFakeEmptyState: false, emit: false),
(state, _) => state.queue.IsEmpty
? (state.queue, null, false, false) //empty queue, so don't emit item
: state.queue.Dequeue().IsEmpty //At least one item: dequeue unless only one item, then emit either way
? (state.queue, state.queue.Peek(), true, true) //maintain last item, enter Fake-EmptyState
: (state.queue.Dequeue(), state.queue.Peek(), false, true),
(state, s) => state.isFakeEmptyState
? (state.queue.Dequeue().Enqueue(s), null, false, false)
: (state.queue.Enqueue(s), (string)null, false, false)
)
.Where(t => t.emit)
.Select(t => t.item);

如果您在命名元组语法方面遇到问题,那么您可以使用旧的元组:

var result = input1
.Union(input2)
.ScanUnion(Tuple.Create(ImmutableQueue<string>.Empty, (string)null, false, false),
(state, _) => state.Item1.IsEmpty
? Tuple.Create(state.Item1, (string)null, false, false) //empty queue, so don't emit item
: state.Item1.Dequeue().IsEmpty //At least one item: dequeue unless only one item, then emit either way
? Tuple.Create(state.Item1, state.Item1.Peek(), true, true) //maintain last item, enter Fake-EmptyState
: Tuple.Create(state.Item1.Dequeue(), state.Item1.Peek(), false, true),
(state, s) => state.Item3
? Tuple.Create(state.Item1.Dequeue().Enqueue(s), (string)null, false, false)
: Tuple.Create(state.Item1.Enqueue(s), (string)null, false, false)
)
.Where(t => t.Item4)
.Select(t => t.Item2);

关于c# - .Net 中的 Reactive Rx zip 队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50395951/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com