gpt4 book ai didi

c# - .NET Disruptor 异步模式

转载 作者:行者123 更新时间:2023-12-02 17:15:11 24 4
gpt4 key购买 nike

我正在使用 Disruptor-net在 C# 应用程序中。我在理解如何在破坏者模式中执行异步操作时遇到了一些困难。

假设我有几个事件处理程序,并且链中的最后一个将消息传递给我的业务逻辑处理器,我如何处理我的业务逻辑处理器内部的异步操作?当我的业务逻辑需要做一些数据库插入时,它是否将一条消息传递给我的输出中断器,它执行插入,然后在我的输入中断器上发布一条新消息,并包含所有状态以继续事务?

此外,在我的输出中断器中,我会使用任务吗?我 99.9% 确定我想使用任务,这样我就不会有大量事件处理程序阻塞异步操作。那么,这如何适应破坏者模式呢?在我的 EventHandler 中做这样的事情似乎有点奇怪..

void OnEvent(MyEvent evt, long sequence, bool endOfBatch)
{
db.InsertAsync(evt).ContinueWith(task => inputDisruptor.Publish(task));
}

最佳答案

Disruptor 具有以下特点:

  • 专用线程,可以固定/屏蔽/优先处理以获得更好的性能。
  • 显式队列,可以监控并产生背压。
  • 按顺序处理消息。
  • 无堆分配,这有助于减少 GC 暂停,如果您自己的代码不生成堆分配,甚至可以将其移除。

您的代码示例并未真正遵循 Disruptor 理念:

  • Task.ContinueWith 默认异步运行,因此延续将使用线程池线程。
  • 因为您使用的是线程池,所以您无法保证继续执行的顺序。即使您使用 TaskContinuationOptions.ExecuteSynchronously,也不能保证 InsertAsync 会按顺序调用延续。
  • 您正在创建一个包含所有未决插入操作的隐式队列。此队列是隐藏的,不会产生背压。

我将搁置您的代码正在生成堆分配这一事实。您不会从“无 GC 暂停”效果中受益,但对于您的用例来说它可能是非常可接受的。

另请注意,批处理对于支持 IO 操作的高吞吐量至关重要。你真的应该在你的事件处理程序中使用 Disruptor 批处理。

我将问题简化为 3 个事件处理程序:

  • PreInsertEventHandler:预插入逻辑(此处未显示)
  • InsertEventHandler:插入逻辑
  • PostInsertEventHandler:插入后逻辑

当然,我假设插入后逻辑必须仅在插入完成后运行。

如果您的目标是在 InsertEventHandler 中保存事件并在下一个处理程序中处理事件之前阻塞直到完成,您可能应该在 InsertEventHandler 中等待。

插入事件处理程序:

void OnEvent(MyEvent evt, long sequence, bool endOfBatch)
{
_pendingInserts.Add((evt, task: db.InsertAsync(evt)));
if (endOfBatch)
{
var insertSucceeded = Task.WaitAll(_pendingInserts.Select(x => x.task).ToArray(), _insertTimeout);
foreach (var (pendingEvent, _) in _pendingInserts)
{
pendingEvent.InsertSucceeded = insertSucceeded;
}
_pendingInserts.Clear();
}
}

当然,如果您的数据库 API 公开了批量插入方法,最好将事件添加到列表中并在批处理结束时将它们全部保存。

还有许多其他选项,比如在 PostInsertEventHandler 中等待,或者在另一个 Disruptor 中对插入结果进行排队,每个选项都有自己的优缺点。 SO 答案可能不是讨论和分析所有这些问题的最佳场所。

关于c# - .NET Disruptor 异步模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46780319/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com