gpt4 book ai didi

domain-driven-design - EventSourced Saga实现

转载 作者:行者123 更新时间:2023-12-04 18:36:55 25 4
gpt4 key购买 nike

我已经写了一个事件源聚合,现在实现了一个事件源传奇...我注意到两者都是相似的,并创建了一个事件源对象作为两者的派生基类。

我在http://blog.jonathanoliver.com/cqrs-sagas-with-event-sourcing-part-ii-of-ii/上看到了一个演示,但是感觉可能存在问题,因为在发送崩溃命令时,如果进程崩溃,命令可能会丢失?

public void Save(ISaga saga)
{
var events = saga.GetUncommittedEvents();
eventStore.Write(new UncommittedEventStream
{
Id = saga.Id,
Type = saga.GetType(),
Events = events,
ExpectedVersion = saga.Version - events.Count
});

foreach (var message in saga.GetUndispatchedMessages())
bus.Send(message); // can be done in different ways

saga.ClearUncommittedEvents();
saga.ClearUndispatchedMessages();
}

相反,我使用的是格雷格·杨(Greg Young)的EventStore,当我保存EventSourcedObject(聚合或传奇)时,序列如下:
  • 存储库获取新的MutatingEvents的列表。
  • 将它们写入流。
  • 当将流写入并提交到流时,
  • EventStore会触发新事件。
  • 我们监听EventStore中的事件,并在EventHandlers中处理它们。

  • 我正在实现传奇的两个方面:
  • 接受事件,这些事件可以转换状态,从而可以发出命令。
  • 要发出警报,以便将来在某个时间点(通过外部计时器服务)可以被回叫)。

  • 问题
  • 据我所知,事件处理程序不应发出命令(如果命令失败会发生什么情况?)-但我可以接受上述命令,因为Saga是通过此事件控制命令创建(响应事件)的实际内容代理,并且命令发送的任何失败都可以在外部处理(在处理CommandEmittedFromSaga并在命令失败时重新发送的外部EventHandler中)?
  • 还是我忘记包装事件并将 native CommandsEvents存储在同一流中(与基类Message混合-Saga会同时使用Command和Events,而Aggregate将只使用Event)?
  • 网上是否有其他引用资料可用于实现事件来源的Sagas?我可以理智地检查我的想法吗?

  • 下面是一些背景代码。

    Saga发出要运行的命令(包装在CommandEmittedFromSaga事件中)

    下面的命令包含在事件中:
    public class CommandEmittedFromSaga : Event
    {
    public readonly Command Command;
    public readonly Identity SagaIdentity;
    public readonly Type SagaType;

    public CommandEmittedFromSaga(Identity sagaIdentity, Type sagaType, Command command)
    {
    Command = command;
    SagaType = sagaType;
    SagaIdentity = sagaIdentity;
    }
    }

    Saga在将来的某个时间点请求回调(AlarmRequestedBySaga事件)

    警报回调请求包装在事件旁边,并且将在请求的时间或之后触发并向Saga回传事件:
    public class AlarmRequestedBySaga : Event
    {
    public readonly Event Event;
    public readonly DateTime FireOn;
    public readonly Identity Identity;
    public readonly Type SagaType;

    public AlarmRequestedBySaga(Identity identity, Type sagaType, Event @event, DateTime fireOn)
    {
    Identity = identity;
    SagaType = sagaType;
    Event = @event;
    FireOn = fireOn;
    }
    }

    或者,我可以将命令和事件存储在基本类型Message 的同一流中
    public abstract class EventSourcedSaga
    {
    protected EventSourcedSaga() { }

    protected EventSourcedSaga(Identity id, IEnumerable<Message> messages)
    {
    Identity = id;

    if (messages == null) throw new ArgumentNullException(nameof(messages));

    var count = 0;

    foreach (var message in messages)
    {
    var ev = message as Event;
    var command = message as Command;

    if(ev != null) Transition(ev);
    else if(command != null) _messages.Add(command);
    else throw new Exception($"Unsupported message type {message.GetType()}");

    count++;
    }

    if (count == 0)
    throw new ArgumentException("No messages provided");

    // All we need to know is the original number of events this
    // entity has had applied at time of construction.
    _unmutatedVersion = count;
    _constructing = false;
    }

    readonly IEventDispatchStrategy _dispatcher = new EventDispatchByReflectionStrategy("When");
    readonly List<Message> _messages = new List<Message>();
    readonly int _unmutatedVersion;
    private readonly bool _constructing = true;
    public readonly Identity Identity;

    public IList<Message> GetMessages()
    {
    return _messages.ToArray();
    }

    public void Transition(Event e)
    {
    _messages.Add(e);
    _dispatcher.Dispatch(this, e);
    }

    protected void SendCommand(Command c)
    {
    // Don't add a command whilst we are in the constructor. Message
    // state transition during construction must not generate new
    // commands, as those command will already be in the message list.
    if (_constructing) return;

    _messages.Add(c);
    }

    public int UnmutatedVersion() => _unmutatedVersion;
    }

    最佳答案

    我相信前两个问题是对流程管理者的错误理解所致(又名Sagas,请参阅底部的术语说明)。

    转变思维

    似乎您正在尝试将其建模为逆聚合(就像我曾经做的那样)。这样做的问题是:集合的“社会契约”是其输入(命令)可以随时间变化(因为系统必须能够随时间变化),而其输出(事件)却不能。写入事件后,事件就成为历史,系统必须始终能够处理这些事件。有了该条件,就可以从不可变事件流中可靠地加载聚合。

    如果您只是尝试将输入和输出作为流程管理器实现反转,那么它的输出就不会成为记录问题,因为随着时间的推移,命令可能会过时并从系统中删除。当您尝试使用已删除的命令加载流时,它将崩溃。因此,无法可靠地从不可变消息流中重新加载建模为反向聚合的流程管理器。 (好吧,我确定您可以设计一种方法……但这是明智的吗?)

    因此,让我们考虑一下它所取代的过程,以考虑实现一个流程管理器。以一个管理订单履行等流程的员工为例。为此用户要做的第一件事是在用户界面中设置一个 View ,供他们查看。您要做的第二件事是在UI中创建按钮,以使用户执行操作以响应他们在 View 上看到的内容。前任。 “此行包含PaymentFailed,所以我单击CancelOrder。此行包含PaymentSucceededOrderItemOutOfStock,所以我单击ChangeToBackOrder。此顺序为Pending且已使用1天,所以我单击FlagOrderForReview“...等。一旦确定了正确的决策流程并开始占用用户太多时间,您就可以自动执行此流程。为了使它自动化,其他所有东西都可以保持不变( View ,甚至包括某些UI,以便您可以对其进行检查),但是用户已经变成了一段代码。

    “走开,否则我将用一个非常小的Shell脚本代替您。”

    现在,流程管理器代码会定期读取 View ,并且如果存在某些数据条件,则可以发出命令。本质上,Process Manager的最简单版本是一些可在计时器上运行的代码(例如,每小时一次),并且取决于特定的 View 。那是我开始的地方...您已经拥有的东西( View / View 更新器)和最少的添加(定期运行的代码)。即使您以后决定针对某些用例需要不同的功能,“Future You”也将更好地了解需要解决的特定缺陷。

    这是一个使您想起Gall's law以及YAGNI的好地方。

    1. Any other reference material on the net for implementation of event sourced Sagas? Anything I can sanity check my ideas against?


    很难找到好的 Material ,因为这些概念具有非常可延展的实现,并且有各种各样的示例,其中许多示例出于通用目的而进行了过度设计。但是,这是我在答案中使用的一些引用。

    DDD - Evolving Business Processes
    DDD/CQRS Google Group(大量阅读 Material )

    请注意,术语Saga具有与Process Manager不同的含义。常见的传奇实现基本上是每个步骤都包含一个路由 list ,并且其相应的故障补偿都包含在 list 中。这取决于路由选择 list 的每个接收者执行在路由选择 list 上指定的内容,并将其成功传递到下一跳,或者执行故障补偿并向后路由。当处理由不同组管理的多个系统时,这可能有点过于乐观,因此通常使用流程管理器代替。 See this SO question了解更多信息。

    关于domain-driven-design - EventSourced Saga实现,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33429626/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com