gpt4 book ai didi

Azure服务总线队列: How To Read Individual Messages From A Queue

转载 作者:行者123 更新时间:2023-12-03 02:39:08 27 4
gpt4 key购买 nike

我有一个应用程序,我可以在流程的一部分中以 JSON 格式将消息写入 Azure 服务总线队列。我有一个下游进程,我希望将该消息从队列中弹出,将 json 转换为对象,然后处理该对象。

我将消息推送到队列中没有问题,但我找不到任何一次从队列中弹出消息或循环弹出消息的示例。我在 Microsoft 或 Github 上看到的每个示例都是一个控制台应用程序(在 Web 应用程序中无用),它设置某种监听器来捕获队列中的所有消息并写入控制台消息。我没有找到弹出消息然后对数据进行一些处理的示例。有没有人有关于如何从队列中弹出消息然后处理它或调用另一个方法来对消息中的数据执行某些操作的示例?

更新:我使用了下面由 Guru Pasupathy 提供的 WindowsAzure.ServiceBus 示例,并使用 Azure Service Bus Queues: How To Read Individual Messages From A Queue 中的以下代码片段结束。从 BrokeredMessage 对象获取消息文本:

Stream stream = message.GetBody<Stream>();
StreamReader reader = new StreamReader(stream);
string messageBody = reader.ReadToEnd();

然后我可以获取 messageBody 并将嵌入的 JSON 反序列化为 POCO 对象,然后我就可以了!现在,我可以在应用程序中更有效地使用队列来执行各种任务。

最佳答案

您可以使用 Peek Lock 接收模式从队列中获取消息,处理它,然后您可以根据您的业务逻辑选择放弃它或完成它。

如果您使用WindowsAzure.ServiceBus nuget包发送/接收消息,则可以使用以下方法基于循环或单次调用逐一使用队列中的消息,而无需使用监听器。

        public void Receive()
{
QueueClient myQueueClient = QueueClient.CreateFromConnectionString("<connectionString>;<queueName>", ReceiveMode.PeekLock);

int someCount = 2; //some random value for testing
try
{
for (int i = 0; i < someCount; i++)
{
BrokeredMessage message = myQueueClient.Receive();
Console.WriteLine("The message is " + message);
message.Complete();
}
}
catch(Exception e)
{
//Handle your expection
}

}

如果您使用的是 Microsoft.Azure.Service nuget 包,那么我找不到在不使用监听器的情况下仅读取单个消息的直接方法。我看到监听器将继续轮询和处理队列,直到没有更多消息为止。

如果您的要求是停止轮询并继续获取所有可用消息,那么作为一种解决方法,您可以在读取一条消息后关闭 QueueClient 实例,并在进程准备好接受下一条消息时打开 qc 并注册处理程序.

        public async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
Console.WriteLine($"Received message: {Encoding.UTF8.GetString(message.Body)}");

BinaryFormatter bf = new BinaryFormatter();
using (MemoryStream ms = new MemoryStream(message.Body))
{
Payload payload = (Payload) bf.Deserialize(ms);

//Based on your needs you may have a condition here based on which you could Abandon or Complete the mesage

await qc.CompleteAsync(message.SystemProperties.LockToken);
Console.WriteLine("Completed the message --> " + payload.Message + " -- Id --> " + payload.Id);

//await qc.AbandonAsync(message.SystemProperties.LockToken);
//Console.WriteLine("Abandon the message --> " + payload.Message + " -- Id --> " + payload.Id);

qc.CloseAsync(); //If you close the QueueClient instance here, no more messages will be picked up from queue.
}
}

以上示例基于 https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues#receive-messages-from-the-queue ,我刚刚在末尾添加了 qc.CloseAsync() 调用。如果没有这一行,监听器将继续处理,直到队列中不再有消息为止。我不确定是否有更好的方法来实现这一点,但想到分享。

希望这有帮助

编辑-

发送消息时,如果您使用自定义类型,可以使用以下内容

                BrokeredMessage message = new BrokeredMessage(new Payload() { Id = 4332, Message = "WindowsAzure package" });
myQueueClient.Send(message);

在接收时,您应该使用 GetBody,如下所示

                BrokeredMessage message = myQueueClient.Receive();
var incoming = message.GetBody<Payload>();
Console.WriteLine("The message is " + incoming.Id + " and " + incoming.Message);
message.Complete();

以下是供您引用的自定义对象

    [Serializable]
public class Payload
{
public int Id { get; set; }
public string Message { get; set; }
}

同样适用于字符串

接收时您可以使用

var incoming = message.GetBody<string>();

发送时您可以发送为

BrokeredMessage message = new BrokeredMessage("WindowsAzure package" );

您可以通过以下链接获取有关不同内容格式的更多详细信息 https://abhishekrlal.com/2012/03/30/formatting-the-content-for-service-bus-messages/

关于Azure服务总线队列: How To Read Individual Messages From A Queue,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61977591/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com