gpt4 book ai didi

c# - 事件处理器主机未接收消息

转载 作者:行者123 更新时间:2023-12-03 04:23:10 26 4
gpt4 key购买 nike

我有一个 azure 辅助角色,其事件处理器主机连接到 azure 事件中心。由于某种未知的原因 - 它不会收到任何消息。

日志显示它为每个分区打开一个 EventProcessor - 并且没有错误 - 但从未调用 ProcessEventsAsync

使用 Service Bus Explorer,我可以看到它在处理器关闭时接收消息,而当处理器打开时,它会抛出接收器打开的异常。

  • 我确实让它工作过一次,但重新启动后它没有继续工作

我不知道接下来该看哪里 - 但这是 worker 角色的代码

public class WorkerRole : RoleEntryPoint
{
private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
private readonly ManualResetEvent _runCompleteEvent = new ManualResetEvent(false);

private EventProcessorHost _eventProcessorHost;
private IEventProcessorFactory _processorFactory;
private ConfigurationProvider configuration = new ConfigurationProvider();
private string _eventHubConnectionString;
private string _storageAccountConnectionString;
private string _dbConnectionString;

public override void Run()
{
Trace.TraceInformation("EventHubWorker is running");


try
{
RunAsync(_cancellationTokenSource.Token).Wait();
}
finally
{
_runCompleteEvent.Set();
}
}

public override bool OnStart()
{
Trace.TraceInformation("EventHubWorker is starting");
CompositeResolver.RegisterAndSetAsDefault(FormattersResolver.Instance, ContractlessStandardResolver.Instance, StandardResolver.Instance);
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
SqlMapper.AddTypeHandler(new DateTimeHandler());
_eventHubConnectionString = configuration.EventHubConnectionString;
_dbConnectionString = configuration.DbConnectionString;
_storageAccountConnectionString = configuration.StorageConnectionString;
string hostName = Guid.NewGuid().ToString();
var eventClient = EventHubClient.CreateFromConnectionString(_eventHubConnectionString, configuration.EventHubName);

_eventProcessorHost = new EventProcessorHost(hostName, eventClient.Path, configuration.ConsumerGroupName,
_eventHubConnectionString, _storageAccountConnectionString);

var partitionOptions = new PartitionManagerOptions()
{
LeaseInterval = new TimeSpan(0, 5, 0)
};
_processorFactory = new EventProcessorFactory(/* some data for dependency injection */);

return base.OnStart();
}

public override void OnStop()
{
Trace.TraceInformation("EventHubWorker is stopping");

_cancellationTokenSource.Cancel();
_runCompleteEvent.WaitOne();
base.OnStop();

Trace.TraceInformation("EventHubWorker has stopped");
}

private async Task RunAsync(CancellationToken cancellationToken)
{
int retryCount = 0;
var exceptions = new List<Exception>();
async Task StartProcessing()
{
if (retryCount > 5)
{
throw new AggregateException($"failed to run service, tried {retryCount} times",exceptions);
}
try
{
await _eventProcessorHost.RegisterEventProcessorFactoryAsync(_processorFactory, new EventProcessorOptions
{
InitialOffsetProvider = o => DateTime.UtcNow,
MaxBatchSize = 100,
PrefetchCount = 10,
ReceiveTimeOut = TimeSpan.FromSeconds(20),
});
}
catch(MessagingException e) when (e.IsTransient)
{
retryCount++;
exceptions.Add(e);
await StartProcessing();
}
}
var options = new EventProcessorOptions();
options.ExceptionReceived += Options_ExceptionReceived;

await StartProcessing();

cancellationToken.WaitHandle.WaitOne();
await _eventProcessorHost.UnregisterEventProcessorAsync();
}

private void Options_ExceptionReceived(object sender, ExceptionReceivedEventArgs e)
{
Trace.TraceError(e.Exception.Message);
}
}

这是 EventProcessor 代码 - 工厂本身似乎无关

class EventProcessor : IEventProcessor
{
public async Task CloseAsync(PartitionContext context, CloseReason reason)
{
//never logged
Trace.TraceInformation($"Partition {context.Lease.PartitionId} Closed");
if (reason == CloseReason.Shutdown)
{
await context.CheckpointAsync();
}
else
{
Trace.TraceError(reason.ToString());
}
}

public Task OpenAsync(PartitionContext context)
{
//always logs with the expected lease information
Trace.TraceInformation($"Partition {context.Lease.PartitionId} initiailized with epoch {context.Lease.Epoch}");
return Task.FromResult<object>(null);
}

public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
Trace.TraceInformation("processing event"); //never called
// processing code
}

最佳答案

PartitionManagerOptions 的最大租约间隔为 60 秒(与 blob 租约相同)EventProcessorHost 在最初获取租约时不会抛出异常。尝试将租用间隔设置为 60 秒而不是 5 分钟。

关于c# - 事件处理器主机未接收消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46621968/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com