gpt4 book ai didi

c# - 强制 EventProcessorHost 重新传递失败的 Azure 事件中心 eventData 到 IEventProcessor.ProcessEvents 方法

转载 作者:IT王子 更新时间:2023-10-29 04:06:41 25 4
gpt4 key购买 nike

该应用程序使用 .NET 4.6.1 和 Microsoft.Azure.ServiceBus.EventProcessorHost nuget package v2.0.2 ,以及它的依赖项 WindowsAzure.ServiceBus package v3.0.1处理 Azure 事件中心消息。

该应用程序具有 IEventProcessor 的实现.当从 ProcessEventsAsync 抛出未处理的异常时方法EventProcessorHost永远不会将这些消息重新发送到正在运行的 IEventProcessor 实例. (有趣的是,如果托管应用程序停止并重新启动,或者租约丢失并重新获得,它将重新发送。)

有没有办法强制导致异常的事件消息由 EventProcessorHost 重新发送?到 IEventProcessor执行?

此评论中针对几乎相同的问题提供了一种可能的解决方案:
Redeliver unprocessed EventHub messages in IEventProcessor.ProcessEventsAsync

该评论建议保留最后成功处理的事件消息的副本,并在 ProcessEventsAsync 中发生异常时使用该消息显式检查点。 .但是,在实现并测试了这样的解决方案后,EventProcessorHost仍然不重新发送。实现非常简单:

private EventData _lastSuccessfulEvent;

public async Task ProcessEventsAsync(
PartitionContext context,
IEnumerable<EventData> messages)
{
try
{
await ProcessEvents(context, messages); // does actual processing, may throw exception
_lastSuccessfulEvent = messages
.OrderByDescending(ed => ed.SequenceNumber)
.First();
}
catch(Exception ex)
{
await context.CheckpointAsync(_lastSuccessfulEvent);
}
}

行动中的事物分析:
enter image description here

部分日志示例可在此处获得: https://gist.github.com/ttbjj/4781aa992941e00e4e15e0bf1c45f316#file-gistfile1-txt

最佳答案

TLDR : 唯一可靠的方法将失败的一批事件重播到 IEventProcessor.ProcessEventsAsync是- Shutdown EventProcessorHost (又名 EPH)立即 - 使用 eph.UnregisterEventProcessorAsync()或通过 terminating the process - 视情况而定。这会让其他 EPH实例获取此分区的租约并从上一个检查点开始。

在解释这个之前 - 我想指出,这是一个 好问题 确实,这是我们必须为 EPH 做出的最艰难的设计选择之一.在我看来,这是一种权衡黑白:usability/supportabilityEPH框架,vs Technical-Correctness .

理想情况本来是:当用户代码在 IEventProcessorImpl.ProcessEventsAsync 中时抛出异常 - EPH图书馆不应该捕获这个。应该让这个Exception - 使进程和 crash-dump 崩溃清楚地显示了 callstack负责任的。我仍然相信-这是最technically-correct解决方案。

现状 : 契约(Contract)IEventProcessorImpl.ProcessEventsAsync API & EPH是,

  • 只要EventData可以从 EventHubs 服务接收 - 继续使用 IEventProcessorImplementation.ProcessEventsAsync 调用用户回调( EventData's ) & 如果用户回调在调用时抛出错误,通知 EventProcessorOptions.ExceptionReceived .
  • 里面的用户代码 IEventProcessorImpl.ProcessEventsAsync应该处理所有错误并合并 Retry's有必要的。 EPH不会在此回调上设置任何超时,以让用户完全控制处理时间。
  • 如果特定事件是问题的原因 - 标记 EventData具有特殊属性 - 例如:type= poison-event并重新发送到相同的 EventHub (包括指向实际事件的指针,将这些 EventData.OffsetSequenceNumber 复制到新的 EventData.ApplicationProperties 中)或将其转发到 SERVICEBUS 队列或将其存储在其他地方,基本上,识别并推迟处理中毒事件 .
  • 如果您处理了所有可能的情况并且仍然遇到 Exceptions - 捕获他们并关闭 EPHfailfast有这个异常(exception)的过程。当EPH回来 - 它将从它离开的地方开始。


  • 为什么检查点“旧事件”不起作用 (阅读 this 以了解 EPH 一般):

    幕后制作, EPH正在为每个 EventHub 消费者组分区的接收器运行一个泵 - 其工作是从给定的 checkpoint 启动接收器(如果存在)并创建 IEventProcessor 的专用实例实现然后 receive来自指定的 EventHub 分区来自指定的 Offset在检查点(如果不存在 - EventProcessorOptions.initialOffsetProvider)并最终调用 IEventProcessorImpl.ProcessEventsAsync . Checkpoint的目的是为了能够可靠地开始处理消息,当 EPH process Shutsdown 和 Partition 的所有权转移到另一个 EPH实例。所以, checkpoint只会在启动 PUMP 时消耗,并且会 不是 读,一旦泵启动。

    在我写这篇文章时, EPH是在版本 2.2.10 .

    more general reading on Event Hubs...

    关于c# - 强制 EventProcessorHost 重新传递失败的 Azure 事件中心 eventData 到 IEventProcessor.ProcessEvents 方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41006498/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com