gpt4 book ai didi

nservicebus - NEventStore NServiceBus 设置

转载 作者:行者123 更新时间:2023-12-04 18:51:30 31 4
gpt4 key购买 nike

使用 NEventStore 时如何集成到 NServiceBus?
我是 NSB ans ES 的新手,并试图在使用 ES 和 CQRS 时找出 NSB 的最佳设置。
我以与示例中的 DispatchCommit 相同的方式连接到 NSB,
https://github.com/joliver/EventStore/blob/master/doc/EventStore.Example/MainProgram.cs

  • 你发布整个 Commit 还是 Commit.Events?
  • 您是否为您的消息创建了一个包装器,因为 NSB 需要在您的消息上使用 IMessage?那么如何发布到正确的队列呢?例如,与 OrderSubmittedEvent 相比,wrapper 是通用的。如果可能的话,我不希望我的事件依赖 NSB,因为那样我的域中也有。

  • 非常感谢一些代码或指导。

    最佳答案

    这是我在生产中使用的:

    public sealed class NServiceBusPublisher : IPublishMessages
    {
    private const string AggregateIdKey = "AggregateId";
    private const string CommitVersionKey = "CommitVersion";
    private const string EventVersionKey = "EventVersion";
    private const string BusPrefixKey = "Bus.";
    private readonly IBus bus;

    public NServiceBusPublisher(IBus bus)
    {
    this.bus = bus;
    }

    public void Dispose()
    {
    GC.SuppressFinalize(this);
    }

    public void Publish(Commit commit)
    {
    for (var i = 0; i < commit.Events.Count; i++)
    {
    var eventMessage = commit.Events[i];
    var busMessage = eventMessage.Body as IMessage;
    AppendHeaders(busMessage, commit.Headers); // optional
    AppendHeaders(busMessage, eventMessage.Headers); // optional
    AppendVersion(commit, i); // optional
    this.bus.Publish(busMessage);
    }
    }
    private static void AppendHeaders(IMessage message, IEnumerable<KeyValuePair<string, object>> headers)
    {
    headers = headers.Where(x => x.Key.StartsWith(BusPrefixKey));
    foreach (var header in headers)
    {
    var key = header.Key.Substring(BusPrefixKey.Length);
    var value = (header.Value ?? string.Empty).ToString();
    message.SetHeader(key, value);
    }
    }
    private static void AppendVersion(Commit commit, int index)
    {
    var busMessage = commit.Events[index].Body as IMessage;
    busMessage.SetHeader(AggregateIdKey, commit.StreamId.ToString());
    busMessage.SetHeader(CommitVersionKey, commit.StreamRevision.ToString());
    busMessage.SetHeader(EventVersionKey, GetSpecificEventVersion(commit, index).ToString());
    }
    private static int GetSpecificEventVersion(Commit commit, int index)
    {
    // e.g. (StreamRevision: 120) - (5 events) + 1 + (index @ 4: the last index) = event version: 120
    return commit.StreamRevision - commit.Events.Count + 1 + index;
    }
    }

    关于nservicebus - NEventStore NServiceBus 设置,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5564130/

    31 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com