
c# - BrokeredMessage disposed after being accessed from a different thread

Reposted. Author: 行者123. Updated: 2023-12-03 05:56:28

This may be a duplicate of this question, but that one is muddled by a discussion of batching database updates and still has no proper answer.

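In a simple example using an Azure Service Bus queue, I cannot access a BrokeredMessage after it has been placed into an in-memory queue; whenever I read that queue from another thread, the message has already been disposed.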

Sample code:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

class Program {
    private static string _serviceBusConnectionString = "XXX";

    // In-memory hand-off between the Service Bus callback and the worker task
    private static BlockingCollection<BrokeredMessage> _incomingMessages = new BlockingCollection<BrokeredMessage>();
    private static CancellationTokenSource _cancelToken = new CancellationTokenSource();

    private static QueueClient _client;

    static void Main(string[] args) {

        // Set up a few listeners on different threads
        Task.Run(async () => {
            while (!_cancelToken.IsCancellationRequested) {
                var msg = _incomingMessages.Take(_cancelToken.Token);
                if (msg != null) {
                    try {
                        await msg.CompleteAsync();
                        Console.WriteLine($"Completed Message Id: {msg.MessageId}");
                    } catch (ObjectDisposedException) {
                        Console.WriteLine("Message was disposed!?");
                    }
                }
            }
        });

        // Now set up our service bus reader
        _client = GetQueueClient("test");

        // The callback only enqueues the message and then returns
        _client.OnMessageAsync(async (message) => {
            await Task.Run(() => _incomingMessages.Add(message));
        },
        new OnMessageOptions() {
            AutoComplete = false
        });

        // Now start sending
        Task.Run(async () => {
            int sent = 0;
            while (!_cancelToken.IsCancellationRequested) {
                var msg = new BrokeredMessage();
                await _client.SendAsync(msg);
                Console.WriteLine($"Sent {++sent}");
                await Task.Delay(1000);
            }
        });

        Console.ReadKey();
        _cancelToken.Cancel();
    }

    private static QueueClient GetQueueClient(string queueName) {

        // Create the queue on first use
        var namespaceManager = NamespaceManager.CreateFromConnectionString(_serviceBusConnectionString);
        if (!namespaceManager.QueueExists(queueName)) {
            var settings = new QueueDescription(queueName);
            settings.MaxDeliveryCount = 10;
            settings.LockDuration = TimeSpan.FromSeconds(5);
            settings.EnableExpress = true;
            settings.EnablePartitioning = true;
            namespaceManager.CreateQueue(settings);
        }

        var factory = MessagingFactory.CreateFromConnectionString(_serviceBusConnectionString);
        factory.RetryPolicy = new RetryExponential(minBackoff: TimeSpan.FromSeconds(0.1), maxBackoff: TimeSpan.FromSeconds(30), maxRetryCount: 100);
        var queueClient = factory.CreateQueueClient(queueName);

        return queueClient;
    }
}

I have tried adjusting the settings, but cannot get it to work. Any ideas?

Best answer

Answering my own question, based on a reply from Serkant Karaca @ Microsoft here:

Very basic rule and I am not sure if this is documented. The received message needs to be processed in the callback function's life time. In your case, messages will be disposed when async callback completes, this is why your complete attempts are failing with ObjectDisposedException in another thread.

I don't really see how queuing messages for further processing helps on the throughput. This will add more burden to client for sure. Try processing the message in the async callback, that should be performant enough.
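The advice above can be sketched roughly as follows: do the work and settle the message inside the callback, so that nothing touches the BrokeredMessage after the callback's task has finished. This is only a sketch under assumptions, not the answerer's code. `InCallbackProcessing`, `Start`, and `ProcessAsync` are hypothetical names, and the `MaxConcurrentCalls` value of 4 is an arbitrary choice for illustration.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

static class InCallbackProcessing {
    // Registers a handler that finishes all work on the message,
    // including settlement, before the callback's Task completes.
    public static void Start(QueueClient client) {
        var options = new OnMessageOptions {
            AutoComplete = false,    // settle explicitly, as in the question
            MaxConcurrentCalls = 4   // assumed value: lets the pump run callbacks in parallel
        };

        // Surface errors raised by the message pump itself.
        options.ExceptionReceived += (sender, e) =>
            Console.WriteLine($"Pump error during {e.Action}: {e.Exception.Message}");

        client.OnMessageAsync(async message => {
            try {
                await ProcessAsync(message);   // hypothetical application work
            } catch (Exception ex) {
                Console.WriteLine($"Processing failed: {ex.Message}");
                await message.AbandonAsync();  // release the lock so the message can be redelivered
                return;
            }

            // The callback has not returned yet, so the message is still usable here.
            await message.CompleteAsync();
            Console.WriteLine($"Completed Message Id: {message.MessageId}");
        }, options);
    }

    // Hypothetical placeholder for the real work.
    private static Task ProcessAsync(BrokeredMessage message) => Task.Delay(10);
}
```

With the question's code, this would replace both the `BlockingCollection` worker and the existing `OnMessageAsync` registration, for example `InCallbackProcessing.Start(GetQueueClient("test"));`. Parallelism then comes from `MaxConcurrentCalls` rather than from a separate consumer thread. Note that the queue in the question is created with a 5 second `LockDuration`, so work that runs longer than that would probably need the lock renewed (for example with `RenewLockAsync`) or a longer lock duration.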


Regarding c# - BrokeredMessage disposed after being accessed from a different thread, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/40739063/
