gpt4 book ai didi

c# - 使用 RX 组成命令总线

转载 作者:太空宇宙 更新时间:2023-11-03 20:28:04 25 4
gpt4 key购买 nike

我正在熟悉 RX,作为我的实验项目,我正在尝试创建一个概念上类似于此的简单命令总线:

class Bus
{
Subject<Command> commands;
IObservable<Invocation> invocations;

public Bus()
{
this.commands = new Subject<Command>();
this.invocations = commands.Select(x => new Invocation { Command = x }).Publish();
}

public IObserver<Command> Commands
{
get { return this.commands; }
}

public IObservable<Invocation> Invocations
{
get { return this.invocations; }
}
}

class Invocation
{
public Command Command { get; set; }
public bool Handled { get; set; }
}

这个想法是,模块可以在启动时使用 Invocations 属性安装命令处理程序,并且可以将他们希望的任何过滤应用于他们的订阅。另一方面,客户端可以通过调用 Commands.OnNext(command) 来触发命令执行。

但是,我希望总线能够保证提交的每个命令都将由一个处理程序处理。也就是说,理想情况下,OnNext 处理应该在第一个处理程序将 Invocation.Handled 设置为 true 时立即终止,并且应该抛出异常,如果在 OnNext() 结束时,Invocation.Handled 仍然是 false。

我试着创建自己的 ISubject、IObservable 和 IObserver 实现,但这感觉“又脏又便宜”;)

我正在努力了解 RX 提供的组合能力。以组合的方式,我怎样才能提供“恰好一次”的保证?

感谢您提供的任何见解。

最佳答案

实际上,您在这里的想法大体上是正确的。你只需要做实际的调度。为此,SelectMany 将提供帮助:

class Bus
{
Subject<Command> commands;
Subject<Invocation> invocations;

// TODO: Instantiate me
List<Func<Command, bool>> handlerList;

public Bus()
{
this.commands = new Subject<Command>();
this.invocations = new Subject<Invocation>();

commands.SelectMany(x => {
// This FirstOrDefault() is just good ol' LINQ
var passedHandler =
handlerList.FirstOrDefault(handler => handler(x) == true);

return passedHandler != null ?
Observable.Return(new Invocation() { Command = x, Handled = true}) :
Observable.Throw<Invocation>(new Exception("Unhandled!"));
}).Multicast(invocations).Connect();
}

/* ... snip ... */
}

但是,老实说,这并不能真正展示 Rx 的强大功能,因为它是同步执行处理程序列表的。让我们通过使其完全非阻塞来使其更具吸引力。

首先,我们将 Func 原型(prototype)更改为 Func<Command, IObservable<Invocation>> .这意味着,一种接受命令并产生 Future Invocation 结果的方法 (a-la Task<T>)。然后,我们可以获得相同的行为,但我们的处理程序通过此选择器异步(通过 TextArea 提前编码):

commands.SelectMany(x =>
handlerList.ToObservable()
.Select(h => Observable.Defer(() => h(x)))
.Concat()
.SkipWhile(x => x.Handled == false)
.TakeLast(1))
.Multicast(invocations).Connect();

这是对 Rx 的相当研究生级别的使用,但想法是,对于每个命令,我们将首先创建一个处理程序流并按顺序运行它们(这就是 Defer + Concat 所做的),直到我们找到一个 Handled 为 true 的,然后取出最后一个。

外层的 SelectMany 选择一个命令流到一个 future 结果流中(即类型是 IO<IO<Invocation>> 然后将它展平,所以它变成一个结果流。

从来没有阻塞,非常简洁,100% 可测试,类型安全的代码,只是表达了一个非常复杂的想法,如果用命令式编写的话会很难看。这就是 Rx 很酷的原因。

关于c# - 使用 RX 组成命令总线,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9246885/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com