gpt4 book ai didi

c# - 使用 Azure 服务总线队列监听器定期收到 RenewToken 上的未经授权的访问错误

转载 作者:行者123 更新时间:2023-12-02 05:56:08 27 4
gpt4 key购买 nike

我的队列接收器定期收到 Microsoft.Azure.ServiceBus.ServiceBusException(下面的消息已删除敏感信息)。 SAS 键具有发送/监听访问权限,并且随着处理正常进行,该错误似乎无关紧要。然而,该消息在我的仪表板中造成了信噪比问题(每天收到 10-70 个错误)。关于为什么会发生这种情况有什么想法吗?监听器正在 Azure 应用服务中运行,但我认为这并不重要。我已经调整了重试逻辑,以使用 RetryExponential 进行 1 秒到 1 分钟的退避,重试 5 次。

Request for guidance from SDK developers

软件包

.NET Core 3.1

Microsoft.Azure.ServiceBus, Version=4.1.3.0, Culture=neutral, PublicKeyToken=7e34167dcc6d6d8c

错误消息

The link 'xxx;xxx:xxx:xxx:source(address:xxx):xxx' is force detached. Code: RenewToken. Details: Unauthorized access. 'Listen' claim(s) are required to perform this operation. Resource: 'sb://xxx.servicebus.windows.net/xxx'.. TrackingId:xxx, SystemTracker:xxx, Timestamp:2020-04-27T09:36:04 The link 'xxx;xxx:xxx:xxx:source(address:xxx):xxx' is force detached. Code: RenewToken. Details: Unauthorized access. 'Listen' claim(s) are required to perform this operation. Resource: 'sb://xxx.servicebus.windows.net/xxx'.. TrackingId:xxx, SystemTracker:xxx, Timestamp:2020-04-27T09:36:04

源代码

/// <summary>
/// Factory delegate that produces a client of type <typeparamref name="TClient"/> from a
/// connection string, an entity path and a retry policy.
/// </summary>
/// <typeparam name="TClient">Type of the client returned by the factory.</typeparam>
/// <param name="connectionString">Connection string passed to the factory.</param>
/// <param name="entityPath">Entity path passed to the factory.</param>
/// <param name="retryPolicy">Retry policy passed to the factory.</param>
/// <returns>The client created by the factory.</returns>
internal delegate TClient ClientFactory<out TClient>(string connectionString, string entityPath,
RetryPolicy retryPolicy);

/// <summary>
/// Asynchronous callback that receives a single message of type <typeparamref name="TMessage"/>.
/// </summary>
/// <typeparam name="TMessage">Message type; must implement <c>ICorrelative</c>.</typeparam>
/// <param name="message">The message handed to the callback.</param>
/// <param name="cancellationToken">Token the callback can observe for cancellation.</param>
/// <returns>A task representing the callback's work.</returns>
internal delegate Task OnMessageCallback<in TMessage>(TMessage message,
CancellationToken cancellationToken = default) where TMessage : ICorrelative;

/// <summary>
/// Wraps an <see cref="IReceiverClient"/>: converts each received message to
/// <typeparamref name="TMessage"/>, hands it to a caller-supplied callback and completes the
/// message once the callback has finished. Exceptions whose message matches the
/// "RenewToken ... force detached" pattern are logged as warnings instead of being reported to
/// the failure callback.
/// </summary>
/// <typeparam name="TMessage">Converted message type; must implement <c>ICorrelative</c>.</typeparam>
internal sealed class ReceiverClientWrapper<TMessage> : IReceiverClientWrapper<TMessage>
    where TMessage : ICorrelative
{
    // Matches one or more repetitions of the "link is force detached / RenewToken" error text.
    // The separator after the timestamp is optional so that a single, non-repeated message
    // (which has nothing following the timestamp) is recognised as well.
    // ReSharper disable once StaticMemberInGenericType
    private static readonly Regex TransientConnectionErrorRegex =
        new Regex(
            @"(The link '([a-f0-9-]+);([0-9]*:)*source\(address:([a-z0-9_]+)\):([a-z0-9_]+)' is force detached. Code: RenewToken. Details: Unauthorized access. 'Listen' claim\(s\) are required to perform this operation. Resource: 'sb:\/\/([a-z0-9-_.\/]+)'.. TrackingId:([a-z0-9_]+), SystemTracker:([a-z0-9]+), Timestamp:([0-9]{4}(-[0-9]{2}){2}T([0-9]{2}:){2}[0-9]{2}) ?)+",
            RegexOptions.Compiled | RegexOptions.Multiline | RegexOptions.IgnoreCase);

    private readonly IReceiverClient _receiverClient;
    private readonly IMessageConverter<TMessage> _messageConverter;
    private readonly ILogger _logger;
    private readonly int _maximumConcurrency;

    /// <summary>Creates a wrapper around the given receiver client.</summary>
    /// <param name="receiverClient">Client used to register the handler, complete messages and close.</param>
    /// <param name="messageConverter">Converter applied to every received message.</param>
    /// <param name="logger">Logger used for the transient-error warning.</param>
    /// <param name="maximumConcurrency">Value assigned to <c>MaxConcurrentCalls</c> of the handler options.</param>
    public ReceiverClientWrapper(IReceiverClient receiverClient, IMessageConverter<TMessage> messageConverter,
        ILogger logger, int maximumConcurrency)
    {
        _receiverClient = receiverClient;
        _messageConverter = messageConverter;
        _logger = logger;
        _maximumConcurrency = maximumConcurrency;
    }

    /// <summary>
    /// Registers a message handler on the underlying client. The registration itself is
    /// synchronous, so an already completed task is returned.
    /// </summary>
    /// <param name="onMessageCallback">Invoked with every converted message.</param>
    /// <param name="onFailureCallback">Invoked with exceptions that are not treated as transient.</param>
    /// <param name="cancellationToken">Token forwarded to both callbacks.</param>
    /// <returns>A completed task.</returns>
    public Task SubscribeAsync(OnMessageCallback<TMessage> onMessageCallback,
        OnFailureCallback onFailureCallback, CancellationToken cancellationToken = default)
    {
        var messageHandlerOptions = CreateMessageHandlerOptions(onFailureCallback, cancellationToken);

        async Task Handler(Message message, CancellationToken token)
        {
            var convertedMessage = _messageConverter.Convert(message);

            // Link the subscription token with the token supplied to the handler so the
            // callback observes cancellation from either source instead of only the former.
            using (var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, token))
            {
                await onMessageCallback(convertedMessage, linkedSource.Token);
            }

            // AutoComplete is disabled in the handler options, so the message is completed
            // explicitly and only after the callback finished without throwing.
            await _receiverClient.CompleteAsync(message.SystemProperties.LockToken);
        }

        _receiverClient.RegisterMessageHandler(Handler, messageHandlerOptions);

        return Task.CompletedTask;
    }

    /// <summary>
    /// Builds the handler options: manual completion, the configured concurrency, and an
    /// exception handler that filters the known transient error before delegating.
    /// </summary>
    private MessageHandlerOptions CreateMessageHandlerOptions(OnFailureCallback onFailureCallback,
        CancellationToken cancellationToken)
    {
        async Task HandleExceptionAsync(ExceptionReceivedEventArgs arguments)
        {
            var exception = arguments.Exception;

            if (TransientConnectionErrorRegex.IsMatch(exception.Message))
            {
                // Known noise: log at warning level and do not escalate to the failure callback.
                _logger.LogWarning(exception, @"Transient connectivity error occurred");

                return;
            }

            await onFailureCallback(exception, cancellationToken);
        }

        return new MessageHandlerOptions(HandleExceptionAsync)
        {
            AutoComplete = false,
            MaxConcurrentCalls = _maximumConcurrency
        };
    }

    /// <summary>Closes the underlying receiver client.</summary>
    public async ValueTask DisposeAsync()
    {
        await _receiverClient.CloseAsync();
    }
}

/// <summary>
/// Wraps an <see cref="ISenderClient"/>: converts messages of type
/// <typeparamref name="TMessage"/> with the supplied converter and sends them through the client.
/// </summary>
/// <typeparam name="TMessage">Message type; must implement <c>ICorrelative</c>.</typeparam>
internal sealed class SenderClientWrapper<TMessage> : ISenderClientWrapper<TMessage> where TMessage : ICorrelative
{
    private readonly ISenderClient _senderClient;
    private readonly IMessageConverter<TMessage> _messageConverter;

    /// <summary>Creates a wrapper around the given sender client.</summary>
    /// <param name="senderClient">Client used to send and to close.</param>
    /// <param name="messageConverter">Converter applied to every outgoing message.</param>
    public SenderClientWrapper(ISenderClient senderClient, IMessageConverter<TMessage> messageConverter)
    {
        _senderClient = senderClient;
        _messageConverter = messageConverter;
    }

    /// <summary>Converts and sends a single message.</summary>
    /// <param name="message">Message to convert and send.</param>
    /// <param name="cancellationToken">
    /// Checked before any work is done; if it is already cancelled a cancelled task is returned
    /// and nothing is sent. The send call used here takes no token, so cancellation cannot be
    /// observed once the send has started.
    /// </param>
    /// <returns>The task returned by the underlying client, or a cancelled task.</returns>
    public Task SendAsync(TMessage message, CancellationToken cancellationToken = default)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return Task.FromCanceled(cancellationToken);
        }

        var internalMessage = _messageConverter.Convert(message);

        return _senderClient.SendAsync(internalMessage);
    }

    /// <summary>Converts and sends a sequence of messages in one call to the client.</summary>
    /// <param name="messages">Messages to convert; the sequence is enumerated once.</param>
    /// <param name="cancellationToken">
    /// Checked before any work is done; if it is already cancelled a cancelled task is returned
    /// and nothing is converted or sent.
    /// </param>
    /// <returns>The task returned by the underlying client, or a cancelled task.</returns>
    public Task SendAsync(IEnumerable<TMessage> messages, CancellationToken cancellationToken = default)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return Task.FromCanceled(cancellationToken);
        }

        var internalMessages = messages
            .Select(_messageConverter.Convert)
            .ToImmutableArray();

        return _senderClient.SendAsync(internalMessages);
    }

    /// <summary>Closes the underlying sender client.</summary>
    public async ValueTask DisposeAsync()
    {
        await _senderClient.CloseAsync();
    }
}

/// <summary>
/// Shared base for the client wrapper factories. Resolves the entity path from the message
/// type's <c>AbstractMessageAttribute</c>, builds the exponential retry policy and invokes the
/// supplied client factory with the configured connection string.
/// </summary>
internal abstract class AbstractClientWrapperFactory
{
    // Retry settings handed to RetryExponential: back-off between 1 second and 1 minute, 5 retries.
    private const int MaximumRetryCount = 5;
    private static readonly TimeSpan MinimumRetryBackOff = TimeSpan.FromSeconds(1);
    private static readonly TimeSpan MaximumRetryBackOff = TimeSpan.FromMinutes(1);

    /// <summary>Stores the message bus options for use by derived factories.</summary>
    protected AbstractClientWrapperFactory(IOptions<MessageBusConfiguration> options) => Options = options;

    /// <summary>Message bus configuration supplied at construction.</summary>
    protected IOptions<MessageBusConfiguration> Options { get; }

    /// <summary>
    /// Returns the <c>EntityName</c> declared by the <c>AbstractMessageAttribute</c> on
    /// <typeparamref name="TMessage"/>.
    /// </summary>
    /// <exception cref="ArgumentException">The message type carries no such attribute.</exception>
    protected static string GetEntityPath<TMessage>() where TMessage : class
    {
        var attribute = typeof(TMessage).GetCustomAttribute<AbstractMessageAttribute>();

        return attribute is null
            ? throw new ArgumentException($@"Message requires {nameof(AbstractMessageAttribute)}")
            : attribute.EntityName;
    }

    /// <summary>
    /// Invokes <paramref name="clientFactory"/> with the configured connection string, the
    /// entity path of <typeparamref name="TMessage"/> and a fresh retry policy.
    /// </summary>
    protected TClient CreateClientEntity<TMessage, TClient>(ClientFactory<TClient> clientFactory)
        where TMessage : class
    {
        // Resolve path and policy first; the connection string is read last, at the call site.
        var path = GetEntityPath<TMessage>();
        var policy = CreateRetryPolicy();

        return clientFactory(Options.Value.ConnectionString, path, policy);
    }

    /// <summary>Client factory that creates a <see cref="QueueClient"/>.</summary>
    protected static IQueueClient QueueClientFactory(string connectionString, string entityPath,
        RetryPolicy retryPolicy)
        => new QueueClient(connectionString, entityPath, retryPolicy: retryPolicy);

    /// <summary>Creates the exponential retry policy from the constants above.</summary>
    private static RetryPolicy CreateRetryPolicy()
        => new RetryExponential(MinimumRetryBackOff, MaximumRetryBackOff, MaximumRetryCount);
}

/// <summary>
/// Creates <see cref="SenderClientWrapper{TMessage}"/> instances backed by either a topic
/// client or a queue client.
/// </summary>
internal sealed class SenderClientWrapperFactory : AbstractClientWrapperFactory, ISenderClientWrapperFactory
{
    private readonly IMessageConverterFactory _messageConverterFactory;

    /// <summary>Stores the converter factory and passes the options to the base class.</summary>
    public SenderClientWrapperFactory(IMessageConverterFactory messageConverterFactory,
        IOptions<MessageBusConfiguration> options) : base(options)
        => _messageConverterFactory = messageConverterFactory;

    /// <summary>Creates a sender wrapper over a topic client for <typeparamref name="TEvent"/>.</summary>
    public ISenderClientWrapper<TEvent> CreateTopicClient<TEvent>() where TEvent : class, IEvent
        => CreateWrapper<TEvent, ITopicClient>(TopicClientFactory);

    /// <summary>Creates a sender wrapper over a queue client for <typeparamref name="TRequest"/>.</summary>
    public ISenderClientWrapper<TRequest> CreateQueueClient<TRequest>() where TRequest : class, IRequest
        => CreateWrapper<TRequest, IQueueClient>(QueueClientFactory);

    /// <summary>
    /// Builds the client through the base class, then the converter, and combines both into a wrapper.
    /// </summary>
    private ISenderClientWrapper<TMessage> CreateWrapper<TMessage, TClient>(ClientFactory<TClient> clientFactory)
        where TMessage : class, ICorrelative
        where TClient : ISenderClient
    {
        var client = CreateClientEntity<TMessage, TClient>(clientFactory);
        var converter = _messageConverterFactory.Create<TMessage>();

        return new SenderClientWrapper<TMessage>(client, converter);
    }

    /// <summary>Client factory that creates a <see cref="TopicClient"/>.</summary>
    private static ITopicClient TopicClientFactory(string connectionString, string entityPath,
        RetryPolicy retryPolicy)
        => new TopicClient(connectionString, entityPath, retryPolicy);
}

/// <summary>
/// Creates <see cref="ReceiverClientWrapper{TMessage}"/> instances backed by either a
/// subscription client or a queue client.
/// </summary>
internal sealed class ReceiverClientWrapperFactory : AbstractClientWrapperFactory, IReceiverClientWrapperFactory
{
    private readonly IMessageConverterFactory _messageConverterFactory;
    private readonly ILogger<ReceiverClientWrapperFactory> _logger;

    /// <summary>Stores the collaborators and passes the options to the base class.</summary>
    public ReceiverClientWrapperFactory(IOptions<MessageBusConfiguration> options,
        IMessageConverterFactory messageConverterFactory,
        ILogger<ReceiverClientWrapperFactory> logger) : base(options)
    {
        _logger = logger;
        _messageConverterFactory = messageConverterFactory;
    }

    /// <summary>Creates a receiver wrapper over a subscription client for <typeparamref name="TEvent"/>.</summary>
    public IReceiverClientWrapper<TEvent> CreateTopicClient<TEvent>() where TEvent : class, IEvent
        => CreateReceiverClientWrapper<TEvent, ISubscriptionClient>(SubscriptionClientFactory);

    /// <summary>Creates a receiver wrapper over a queue client for <typeparamref name="TRequest"/>.</summary>
    public IReceiverClientWrapper<TRequest> CreateQueueClient<TRequest>() where TRequest : class, IRequest
        => CreateReceiverClientWrapper<TRequest, IQueueClient>(QueueClientFactory);

    /// <summary>
    /// Builds the client through the base class, then the converter, and combines them with the
    /// logger and the configured maximum concurrency into a wrapper.
    /// </summary>
    private IReceiverClientWrapper<TMessage> CreateReceiverClientWrapper<TMessage, TClient>(
        ClientFactory<TClient> clientFactory)
        where TMessage : class, ICorrelative
        where TClient : IReceiverClient
    {
        var client = CreateClientEntity<TMessage, TClient>(clientFactory);
        var converter = _messageConverterFactory.Create<TMessage>();

        return new ReceiverClientWrapper<TMessage>(client, converter, _logger,
            Options.Value.MaximumConcurrency);
    }

    /// <summary>
    /// Client factory that creates a <see cref="SubscriptionClient"/> using the configured
    /// subscriber name. Not static because it reads <c>Options</c>.
    /// </summary>
    private ISubscriptionClient SubscriptionClientFactory(string connectionString, string entityPath,
        RetryPolicy retryPolicy)
        => new SubscriptionClient(connectionString, entityPath, Options.Value.SubscriberName,
            retryPolicy: retryPolicy);
}

/// <summary>
/// Publishes and subscribes to requests of type <typeparamref name="TRequest"/> through
/// lazily created queue sender and receiver wrappers.
/// </summary>
/// <typeparam name="TRequest">Request message type.</typeparam>
internal sealed class RequestService<TRequest> : IRequestService<TRequest> where TRequest : class, IRequest
{
    // NOTE(review): PublicationOnly lets racing threads each run the factory; only one result is
    // published and the discarded ones are not closed here — confirm this is acceptable.
    private readonly Lazy<ISenderClientWrapper<TRequest>> _senderClient;
    private readonly Lazy<IReceiverClientWrapper<TRequest>> _receiverClient;

    /// <summary>Sets up lazy creation of the queue sender and receiver wrappers.</summary>
    /// <param name="senderClientWrapperFactory">Factory used on first publish.</param>
    /// <param name="receiverClientWrapperFactory">Factory used on first subscribe.</param>
    public RequestService(ISenderClientWrapperFactory senderClientWrapperFactory,
        IReceiverClientWrapperFactory receiverClientWrapperFactory)
    {
        _senderClient =
            new Lazy<ISenderClientWrapper<TRequest>>(senderClientWrapperFactory.CreateQueueClient<TRequest>,
                LazyThreadSafetyMode.PublicationOnly);

        _receiverClient
            = new Lazy<IReceiverClientWrapper<TRequest>>(receiverClientWrapperFactory.CreateQueueClient<TRequest>,
                LazyThreadSafetyMode.PublicationOnly);
    }

    /// <summary>Sends a single request through the sender wrapper.</summary>
    public Task PublishRequestAsync(TRequest requestMessage, CancellationToken cancellationToken = default)
    {
        return _senderClient.Value.SendAsync(requestMessage, cancellationToken);
    }

    /// <summary>Sends a sequence of requests through the sender wrapper.</summary>
    public Task PublishRequestAsync(IEnumerable<TRequest> requestMessages,
        CancellationToken cancellationToken = default)
    {
        return _senderClient.Value.SendAsync(requestMessages, cancellationToken);
    }

    /// <summary>Subscribes the given callbacks on the receiver wrapper.</summary>
    /// <param name="onRequestCallback">Invoked for every received request.</param>
    /// <param name="onFailureCallback">Passed through to the receiver wrapper.</param>
    /// <param name="cancellationToken">Passed through to the receiver wrapper.</param>
    public Task SubscribeAsync(OnRequestCallback<TRequest> onRequestCallback, OnFailureCallback onFailureCallback,
        CancellationToken cancellationToken = default)
    {
        // Forward the token the receiver wrapper hands to the callback rather than the captured
        // outer token, so whatever cancellation the wrapper provides reaches the request callback.
        return _receiverClient
            .Value
            .SubscribeAsync((message, token) => onRequestCallback(message, token), onFailureCallback,
                cancellationToken);
    }

    /// <summary>
    /// Disposes whichever wrappers have been created. The receiver is disposed even when
    /// disposing the sender throws.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        try
        {
            if (_senderClient.IsValueCreated)
            {
                await _senderClient.Value.DisposeAsync();
            }
        }
        finally
        {
            if (_receiverClient.IsValueCreated)
            {
                await _receiverClient.Value.DisposeAsync();
            }
        }
    }

    /// <summary>
    /// Sends an empty batch through the sender wrapper; presumably used as a readiness probe
    /// that surfaces creation or client-state errors — confirm against the SDK's behavior.
    /// </summary>
    public Task ThrowIfNotReadyAsync(CancellationToken cancellationToken = default)
    {
        return _senderClient.Value.SendAsync(ImmutableArray<TRequest>.Empty, cancellationToken);
    }
}

最佳答案

使用原来的 NuGet 包(Microsoft.Azure.ServiceBus)时,这个问题始终没有得到解决;不过新的 Azure.Messaging.ServiceBus 包似乎不存在这个问题,因此我选择迁移到新的包。

关于c# - 使用 Azure 服务总线队列监听器定期收到 RenewToken 上的未经授权的访问错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61459576/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com