Azure Service Bus ReceiveAndDelete

Reposted. Author: 行者123. Updated: 2023-12-02 23:07:44

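I created a sample application that sends messages to a queue and then reads them back. The application uses the "ReceiveAndDelete" receive mode. The sample code is below.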

Creating a message

// Returns Task (not void) so that Main can await it.
private static async Task CreateMessage(string queueName, string textMessage)
{
    // create a Service Bus client
    await using (ServiceBusClient client = new ServiceBusClient(connectionString))
    {
        // create a sender for the queue
        ServiceBusSender sender = client.CreateSender(queueName);

        // create a message that we can send
        ServiceBusMessage message = new ServiceBusMessage(textMessage);

        // send the message
        await sender.SendMessageAsync(message);
        Console.WriteLine($"Sent a single message to the queue: {queueName}");
    }
}

Receiving messages

// handle received messages
static async Task MessageHandler(ProcessMessageEventArgs args)
{
    string body = args.Message.Body.ToString();
    Console.WriteLine($"Received: {body}");

    // complete the message; the message is deleted from the queue.
    await args.CompleteMessageAsync(args.Message);
}

// handle any errors when receiving messages
static Task ErrorHandler(ProcessErrorEventArgs args)
{
    Console.WriteLine(args.Exception.ToString());
    return Task.CompletedTask;
}

static async Task ReceiveMessagesAsync()
{
    var processorOptions = new ServiceBusProcessorOptions
    {
        AutoCompleteMessages = false,
        MaxConcurrentCalls = 1,
        MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(10),
        ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete,
        PrefetchCount = 1
    };

    await using (ServiceBusClient client = new ServiceBusClient(connectionString))
    {
        // create a processor that we can use to process the messages
        ServiceBusProcessor processor = client.CreateProcessor(queueName, processorOptions);

        // add handler to process messages
        processor.ProcessMessageAsync += MessageHandler;

        // add handler to process any errors
        processor.ProcessErrorAsync += ErrorHandler;

        // start processing
        await processor.StartProcessingAsync();

        Console.WriteLine("Wait for a minute and then press any key to end the processing");
        Console.ReadKey();

        // stop processing
        Console.WriteLine("\nStopping the receiver...");
        await processor.StopProcessingAsync();
        Console.WriteLine("Stopped receiving messages");
    }
}

Main method

static string connectionString = "***";
static string queueName = "firstqueue";

static async Task Main(string[] args)
{
    try
    {
        await CreateMessage(queueName, "Message 1 to test 'ReceiveAndDelete'");
        await CreateMessage(queueName, "Message 2 to test 'ReceiveAndDelete'");

        await ReceiveMessagesAsync();
    }
    catch (Exception ex)
    {
        throw;
    }
    Console.ReadKey();
}

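Everything works, but as soon as the application calls "await processor.StartProcessingAsync();", all messages are read from the queue, even though they have not all been processed yet. In my example there are two messages in the queue, but when "await processor.StartProcessingAsync();" is called, the message count drops to zero (the messages have effectively been dequeued), and the processor then starts handling them one by one. My understanding is that a message should stay in the queue until its processing has started. In this example, only one message should be removed from the queue, and the second message should still be visible in the queue.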

Is this the expected behavior, or am I missing something here?

Best answer

Is this the expected behavior or am I missing something here?

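This is the expected behavior of ReceiveAndDelete mode. As soon as a message is sent to the client, Service Bus deletes it, regardless of whether the client is able to process it.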

From the linked documentation:

This operation receives a message from a queue or subscription, and removes the message from that queue or subscription in one atomic operation.

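If you want to control this behavior, you may want to receive messages in PeekLock mode, process each message, and then call the message's Complete method to delete it if processing succeeds.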
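The PeekLock approach can be sketched roughly as follows. This is a minimal sketch, assuming the same Azure.Messaging.ServiceBus package used in the question; the class name PeekLockExample is hypothetical, and the connection string and queue name are placeholders. It needs a real Service Bus namespace to run.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

class PeekLockExample
{
    // Placeholders (assumptions): replace with real values.
    const string ConnectionString = "connection-string";
    const string QueueName = "queue-name";

    static async Task Main()
    {
        var options = new ServiceBusProcessorOptions
        {
            // PeekLock locks a received message instead of deleting it on receipt.
            ReceiveMode = ServiceBusReceiveMode.PeekLock,
            // Settle messages explicitly in the handler.
            AutoCompleteMessages = false,
            MaxConcurrentCalls = 1
        };

        await using var client = new ServiceBusClient(ConnectionString);
        await using ServiceBusProcessor processor = client.CreateProcessor(QueueName, options);

        processor.ProcessMessageAsync += async args =>
        {
            try
            {
                // Stand-in for real processing work.
                Console.WriteLine($"Received: {args.Message.Body}");

                // Processing succeeded: delete the message from the queue.
                await args.CompleteMessageAsync(args.Message);
            }
            catch (Exception)
            {
                // Processing failed: release the lock so the message can be redelivered.
                await args.AbandonMessageAsync(args.Message);
            }
        };

        processor.ProcessErrorAsync += args =>
        {
            Console.WriteLine(args.Exception);
            return Task.CompletedTask;
        };

        await processor.StartProcessingAsync();
        Console.WriteLine("Press any key to stop");
        Console.ReadKey();
        await processor.StopProcessingAsync();
    }
}
```

In this mode a message stays in the queue (locked) until the handler completes it, so a crash before completion should not lose the message.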

Update

I tried your code, and here are my observations:

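  1. With PrefetchCount = 1, two messages are fetched from the queue and deleted the first time. After that, one message at a time is fetched and deleted. A possible explanation is that one message is prefetched and one message is fetched on request.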

  2. With PrefetchCount = 0 (or with it omitted from "processorOptions"), one message at a time is fetched and deleted.

Please try the following code:

using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

namespace SO67076189
{
    class Program
    {
        static string connectionString = "connection-string";
        static string queueName = "queue-name";

        static async Task Main(string[] args)
        {
            try
            {
                await CreateMessage(queueName, "Message 1 to test 'ReceiveAndDelete'");
                await CreateMessage(queueName, "Message 2 to test 'ReceiveAndDelete'");
                await CreateMessage(queueName, "Message 3 to test 'ReceiveAndDelete'");
                await CreateMessage(queueName, "Message 4 to test 'ReceiveAndDelete'");
                await CreateMessage(queueName, "Message 5 to test 'ReceiveAndDelete'");

                await ReceiveMessagesAsync();
            }
            catch (Exception ex)
            {
                throw;
            }
            Console.ReadKey();
        }

        private static async Task CreateMessage(string queueName, string textMessage)
        {
            // create a Service Bus client
            await using (ServiceBusClient client = new ServiceBusClient(connectionString))
            {
                // create a sender for the queue
                ServiceBusSender sender = client.CreateSender(queueName);

                // create a message that we can send
                ServiceBusMessage message = new ServiceBusMessage(textMessage);

                // send the message
                await sender.SendMessageAsync(message);
                Console.WriteLine($"Sent a single message to the queue: {queueName}");
            }
        }

        // handle received messages
        static async Task MessageHandler(ProcessMessageEventArgs args)
        {
            string body = args.Message.Body.ToString();
            Console.WriteLine($"Received: {body}");

            // No explicit completion here: in ReceiveAndDelete mode the message
            // has already been deleted from the queue when it is received.
            //await args.CompleteMessageAsync(args.Message);
        }

        // handle any errors when receiving messages
        static Task ErrorHandler(ProcessErrorEventArgs args)
        {
            Console.WriteLine(args.Exception.ToString());
            return Task.CompletedTask;
        }

        static async Task ReceiveMessagesAsync()
        {
            var processorOptions = new ServiceBusProcessorOptions
            {
                //AutoCompleteMessages = false,
                //MaxConcurrentCalls = 1,
                //MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(10),
                ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete,
                //PrefetchCount = 1
            };

            await using (ServiceBusClient client = new ServiceBusClient(connectionString))
            {
                // create a processor that we can use to process the messages
                ServiceBusProcessor processor = client.CreateProcessor(queueName, processorOptions);

                // add handler to process messages
                processor.ProcessMessageAsync += MessageHandler;

                // add handler to process any errors
                processor.ProcessErrorAsync += ErrorHandler;

                // start processing
                await processor.StartProcessingAsync();

                Console.WriteLine("Wait for a minute and then press any key to end the processing");
                Console.ReadKey();

                // stop processing
                Console.WriteLine("\nStopping the receiver...");
                await processor.StopProcessingAsync();
                Console.WriteLine("Stopped receiving messages");
            }
        }
    }
}
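
If the goal is to pull messages strictly on demand rather than through a processor, a plain receiver is one alternative. The sketch below is not part of the original answer: it assumes the same Azure.Messaging.ServiceBus package, uses a hypothetical class name PullOneAtATime with placeholder connection settings, and needs a real Service Bus namespace to run. With PrefetchCount = 0, each ReceiveMessageAsync call should request a single message, which ReceiveAndDelete mode removes from the queue on receipt.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

class PullOneAtATime
{
    // Placeholders (assumptions): replace with real values.
    const string ConnectionString = "connection-string";
    const string QueueName = "queue-name";

    static async Task Main()
    {
        await using var client = new ServiceBusClient(ConnectionString);
        await using ServiceBusReceiver receiver = client.CreateReceiver(
            QueueName,
            new ServiceBusReceiverOptions
            {
                ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete,
                // No prefetching: nothing is pulled ahead of an explicit receive call.
                PrefetchCount = 0
            });

        while (true)
        {
            // Returns null if no message arrives within the wait time.
            ServiceBusReceivedMessage message =
                await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(5));
            if (message == null)
            {
                break;
            }

            Console.WriteLine($"Received: {message.Body}");
        }
    }
}
```

Because the message is gone from the queue as soon as it is received, a failure while handling it loses the message; PeekLock mode avoids that at the cost of an explicit completion step.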

Regarding Azure Service Bus ReceiveAndDelete, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/67076189/
