gpt4 book ai didi

Azure EventProcessorHost 和辅助角色

转载 作者:行者123 更新时间:2023-12-02 06:34:13 25 4
gpt4 key购买 nike

我希望获得有关如何将 EventProcessorHost 与辅助角色一起使用的指导。基本上,我希望让 EventProcessorHost 并行处理分区,我想知道应该在辅助角色中放置此类代码,以及是否缺少任何关键内容。

    var manager = NamespaceManager.CreateFromConnectionString(connectionString);
var desc = manager.CreateEventHubIfNotExistsAsync(path).Result;
var client = Microsoft.ServiceBus.Messaging.EventHubClient.CreateFromConnectionString(connectionString, path);
var host = new EventProcessorHost(hostname, path, consumerGroup, connectionString, blobStorageConnectionString);
EventHubProcessorFactory<EventData> factory = new EventHubProcessorFactory<EventData>();
host.RegisterEventProcessorFactoryAsync(factory);

我读到的所有内容都说 EventProcessorHost 将自行划分分区,但是上面的代码足以异步处理所有分区吗?

最佳答案

以下是我们如何从辅助角色处理事件中心的简化版本。我们将该实例保留在 mainWorker 角色中,并调用 IEventProcessor 来开始处理它。

这样我们就可以在 Worker 响应关闭事件等时调用它并关闭它。

编辑:

至于并行处理,IEventProcessor 类在处理完当前事件后,只会从事件中心再获取 10 个事件。为您处理所有精美的隔断租赁事宜。

这是一个同步工作流程,当我扩展到多个辅助角色时,我开始看到分区在实例之间分割,并且速度变得更快等。如果您希望它在其中处理事件中心,则必须推出自己的解决方案另一种方式。

public class WorkerRole : RoleEntryPoint
{
private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
private readonly ManualResetEvent _runCompleteEvent = new ManualResetEvent(false);
private EventProcessorHost _eventProcessorHost;

public override bool OnStart()
{
ThreadPool.SetMaxThreads(4096, 2048);
ServicePointManager.DefaultConnectionLimit = 500;
ServicePointManager.UseNagleAlgorithm = false;
ServicePointManager.Expect100Continue = false;

var eventClient = EventHubClient.CreateFromConnectionString("consumersConnectionString",
"eventHubName");
_eventProcessorHost = new EventProcessorHost(Dns.GetHostName(), eventClient.Path,
eventClient.GetDefaultConsumerGroup().GroupName,
"consumersConnectionString", "blobLeaseConnectionString");
return base.OnStart();
}

public override void Run()
{
try
{
RunAsync(this._cancellationTokenSource.Token).Wait();
}
finally
{
_runCompleteEvent.Set();
}
}

private async Task RunAsync(CancellationToken cancellationToken)
{
// starts processing here
await _eventProcessorHost.RegisterEventProcessorAsync<EventProcessor>();
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromMinutes(1));
}
}

public override void OnStop()
{
_eventProcessorHost.UnregisterEventProcessorAsync().Wait();
_cancellationTokenSource.Cancel();
_runCompleteEvent.WaitOne();
base.OnStop();
}
}

我对特定分区有多个处理器(您可以通过这种方式保证 FIFO),但是您可以轻松实现自己的逻辑,即在我的示例中跳过使用 EventDataProcessor 类和字典查找,只需在其中实现一些逻辑ProcessEventsAsync 方法。

public class EventProcessor : IEventProcessor
{
private readonly Dictionary<string, IEventDataProcessor> _eventDataProcessors;

public EventProcessor()
{
_eventDataProcessors = new Dictionary<string, IEventDataProcessor>
{
{"A", new EventDataProcessorA()},
{"B", new EventDataProcessorB()},
{"C", new EventDataProcessorC()}
}
}

public Task OpenAsync(PartitionContext context)
{
return Task.FromResult<object>(null);
}

public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach(EventData eventData in messages)
{
// implement your own logic here, you could just process the data here, just remember that they will all be from the same partition in this block
try
{
IEventDataProcessor eventDataProcessor;
if(_eventDataProcessors.TryGetValue(eventData.PartitionKey, out eventDataProcessor))
{
await eventDataProcessor.ProcessMessage(eventData);
}
}
catch (Exception ex)
{
_//log exception
}
}
await context.CheckpointAsync();
}

public async Task CloseAsync(PartitionContext context, CloseReason reason)
{
if (reason == CloseReason.Shutdown)
await context.CheckpointAsync();
}
}

我们的 EventDataProcessor 之一的示例

public interface IEventDataProcessor
{
Task ProcessMessage(EventData eventData);
}

public class EventDataProcessorA : IEventDataProcessor
{
public async Task ProcessMessage(EventData eventData)
{
// Do Something specific with data from Partition "A"
}
}

public class EventDataProcessorB : IEventDataProcessor
{
public async Task ProcessMessage(EventData eventData)
{
// Do Something specific with data from Partition "B"
}
}

希望这有帮助,到目前为止它对我们来说一直坚如磐石,并且可以轻松扩展到多个实例

关于Azure EventProcessorHost 和辅助角色,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30248007/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com