- r - 以节省内存的方式增长 data.frame
- ruby-on-rails - ruby/ruby on rails 内存泄漏检测
- android - 无法解析导入android.support.v7.app
- UNIX 域套接字与共享内存(映射文件)
该应用程序使用 .NET 4.6.1 和 Microsoft.Azure.ServiceBus.EventProcessorHost nuget package v2.0.2 ,以及它的依赖项 WindowsAzure.ServiceBus package v3.0.1处理 Azure 事件中心消息。
该应用程序具有 IEventProcessor
的实现.当从 ProcessEventsAsync
抛出未处理的异常时方法EventProcessorHost
永远不会将这些消息重新发送到正在运行的 IEventProcessor
实例. (有趣的是,如果托管应用程序停止并重新启动,或者租约丢失并重新获得,它将重新发送。)
有没有办法强制导致异常的事件消息由 EventProcessorHost
重新发送?到 IEventProcessor
执行?
此评论中针对几乎相同的问题提供了一种可能的解决方案:
Redeliver unprocessed EventHub messages in IEventProcessor.ProcessEventsAsync
该评论建议保留最后成功处理的事件消息的副本,并在 ProcessEventsAsync
中发生异常时使用该消息显式检查点。 .但是,在实现并测试了这样的解决方案后,EventProcessorHost
仍然不重新发送。实现非常简单:
private EventData _lastSuccessfulEvent;
public async Task ProcessEventsAsync(
PartitionContext context,
IEnumerable<EventData> messages)
{
try
{
await ProcessEvents(context, messages); // does actual processing, may throw exception
_lastSuccessfulEvent = messages
.OrderByDescending(ed => ed.SequenceNumber)
.First();
}
catch(Exception ex)
{
await context.CheckpointAsync(_lastSuccessfulEvent);
}
}
最佳答案
TLDR : 唯一可靠的方法将失败的一批事件重播到 IEventProcessor.ProcessEventsAsync
是- Shutdown
EventProcessorHost
(又名 EPH
)立即 - 使用 eph.UnregisterEventProcessorAsync()
或通过 terminating the process - 视情况而定。这会让其他 EPH
实例获取此分区的租约并从上一个检查点开始。
在解释这个之前 - 我想指出,这是一个 好问题 确实,这是我们必须为 EPH
做出的最艰难的设计选择之一.在我看来,这是一种权衡黑白:usability
/supportability
的EPH
框架,vs Technical-Correctness
.
理想情况本来是:当用户代码在 IEventProcessorImpl.ProcessEventsAsync
中时抛出异常 - EPH
图书馆不应该捕获这个。应该让这个Exception
- 使进程和 crash-dump
崩溃清楚地显示了 callstack
负责任的。我仍然相信-这是最technically-correct
解决方案。
现状 : 契约(Contract)IEventProcessorImpl.ProcessEventsAsync
API & EPH
是,
EventData
可以从 EventHubs 服务接收 - 继续使用 IEventProcessorImplementation.ProcessEventsAsync
调用用户回调( EventData's
) & 如果用户回调在调用时抛出错误,通知 EventProcessorOptions.ExceptionReceived
. IEventProcessorImpl.ProcessEventsAsync
应该处理所有错误并合并 Retry's
有必要的。 EPH
不会在此回调上设置任何超时,以让用户完全控制处理时间。 EventData
具有特殊属性 - 例如:type= poison-event
并重新发送到相同的 EventHub
(包括指向实际事件的指针,将这些 EventData.Offset
和 SequenceNumber
复制到新的 EventData.ApplicationProperties
中)或将其转发到 SERVICEBUS 队列或将其存储在其他地方,基本上,识别并推迟处理中毒事件 . Exceptions
- 捕获他们并关闭 EPH
或 failfast
有这个异常(exception)的过程。当EPH
回来 - 它将从它离开的地方开始。EPH
一般):
EPH
正在为每个 EventHub 消费者组分区的接收器运行一个泵 - 其工作是从给定的
checkpoint
启动接收器(如果存在)并创建
IEventProcessor
的专用实例实现然后
receive
来自指定的 EventHub 分区来自指定的
Offset
在检查点(如果不存在 -
EventProcessorOptions.initialOffsetProvider
)并最终调用
IEventProcessorImpl.ProcessEventsAsync
.
Checkpoint
的目的是为了能够可靠地开始处理消息,当
EPH
process Shutsdown 和 Partition 的所有权转移到另一个
EPH
实例。所以,
checkpoint
只会在启动 PUMP 时消耗,并且会
不是 读,一旦泵启动。
EPH
是在版本
2.2.10 .
关于c# - 强制 EventProcessorHost 重新传递失败的 Azure 事件中心 eventData 到 IEventProcessor.ProcessEvents 方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41006498/
我目前正在使用 EventProcessorHost 和简单的 IEventProcessor 实现来实现事件中心读取器。我已经确认正在使用 Paolo Salvatori 的优秀 Service B
我正忙于为 azure 的 EventBus 客户端实现 EventProcessorHost 客户端。 我有一个实现 IEventProcessor 的类,如下所示: public class M
我在工作角色方面遇到问题,该角色似乎成功启动并注册了 EventProcessor 类实现(EventProcessorA 和 EventProcessorB),但它们都没有拾取任何事件。我的意思是,
我有以下问题。 我们使用事件中心。在下面的类中,我们从 IEventProcessor 继承,如您所见,我们使用 Service Locator 。我们不能让它与构造函数/属性注入(inject)一起
我们对CloseAsync(PartitionContext, CloseReason)有以下实现我们的 IEventProcessor 中的方法: public async Task CloseAs
我正在为 Azure EventHub 编写一个工作程序,我希望在 EventProcessorHost 之上进行构建;但是,我在 general 中找不到一些可能相关的详细信息。 documenta
我有 1 个带有 2 个分区的 eventhub,我想聚合我的数据一分钟并将该数据保存到数据库,我正在使用 IEventProcessor 从 eventhub 读取事件。 我能够将数据按原样保存到数
我有 1 个带有 2 个分区的 eventhub,我想聚合我的数据一分钟并将该数据保存到数据库,我正在使用 IEventProcessor 从 eventhub 读取事件。 我能够将数据按原样保存到数
该应用程序使用 .NET 4.6.1 和 Microsoft.Azure.ServiceBus.EventProcessorHost nuget package v2.0.2 ,以及它的依赖项 Win
我是一名优秀的程序员,十分优秀!