gpt4 book ai didi

Azure IoT Hub触发器按顺序处理数据

转载 作者:行者123 更新时间:2023-12-03 02:31:35 26 4
gpt4 key购买 nike

我有一个 IoT 设备 (ESP32),每 x 秒向 IoT 中心(免费层)发送数据包。每个数据包都包含一个整数属性PacketID,每次发送新数据包时该属性都会递增,从1开始,依此类推。当 IoT 中心收到数据时,将调用 Azure IoT 中心触发器(托管在门户上)并进一步处理数据 - 首先插入到 CosmosDB(从 StorageQueue 触发器),然后作为消息发送到 SignalR 服务(从另一个 StorageQueue 触发器) )并连接到客户端 Web 应用程序。

IoT 中心触发器如下所示:

    [FunctionName("FramesPacketAdd_IoTHubTrigger")]
public async Task Run(
[IoTHubTrigger("messages/events", Connection = "IoTHubConnectionString")] EventData message, ILogger _logger,
[Queue("dbinsert-frames-packet-queue", Connection = "AzureStorageAccountConnectionString")] IAsyncCollector<FramesPacket> dbInsertFramesPacketQueue,
[Queue("eventdata-parse-error-queue", Connection = "AzureStorageAccountConnectionString")] IAsyncCollector<string> eventDataParseErrorQueue)
{
try
{
_logger.LogInformation($"C# IoT Hub trigger function FramesPacketAdd processed a message: {Encoding.UTF8.GetString(message.Body.Array)}");

string messageBody = Encoding.UTF8.GetString(message.Body.Array);
var jsonObj = JsonConvert.DeserializeObject<FramesPacket>(messageBody);

var framesPacket = new FramesPacket
{
PacketID = jsonObj.PacketID,
SessionID = jsonObj.SessionID,
DeviceID = jsonObj.DeviceID,
Frames = jsonObj.Frames
};

await dbInsertFramesPacketQueue.AddAsync(framesPacket);
}

catch (Exception ex)
{
await eventDataParseErrorQueue.AddAsync($"[IoTHubTrigger] function [FramesPacketAdd] caught exception: {ex.Message} \nStackTrace: {ex.StackTrace} \nJSON: {Encoding.UTF8.GetString(message.Body.Array)}");

_logger.LogError($"[IoTHubTrigger] function [FramesPacketAdd_IoTHubTrigger] caught exception: {ex.Message} \nStackTrace: {ex.StackTrace} \nJSON: {Encoding.UTF8.GetString(message.Body.Array)}");
}
}

问题是,当数据包发送速度很快时,例如每个数据包之间间隔 200 毫秒,IoT 中心接收到的数据完全无序,这意味着有时数据包 #7 和 #8 在数据包 #1、# 之前被接收和处理。 2 和 #3,这对于我的客户端应用程序来说是一个问题,因为它依赖于按照从 ESP32 芯片发送的相同顺序接收数据包。我尝试以 1 秒的延迟发送每个数据包,但问题仍然存在,但它似乎只影响前三个数据包 - 有时它们按 3、1、2 的顺序接收,其余的顺序正确。更长的延迟似乎可以完全消除这个问题。我确实相信这是因为函数/触发器的异步性质?最后,我希望能够相对较快地发送它们,每个之间有 200-500 毫秒的延迟。

我对 Azure IoT 中心非常陌生,我的问题是,是否可以在门户端执行某些操作来确保按正确的顺序接收数据包,或者这是此类方法和此类情况的限制接收到数据后需要处理,可能在调用 IoT 中心触发器后使用存储队列或服务总线?

我希望我的问题有意义,否则我非常乐意提供更多详细信息。

提前致谢。

最佳答案

以下是两种可能的路径:

1.使用 EventHub 触发的 Azure Function 处理来自 IoT 中心的事件中心兼容终结点的消息。

保证来自特定设备的每条消息都将发送到同一分区。

分区中的消息是有序的,因此您需要的是每个分区的事件处理器。

您可以使用 Azure 函数来实现此目的,在本例中,Azure 函数所做的是为每个分区创建租约,这使得每个分区只有一个 Azure 函数实例。这意味着如果您有 10 个分区,则将有 10 个 Azure 函数实例处理来自每个分区的消息。现在您要做的是确保在Azure函数中,您按顺序处理消息批处理,例如:

[FunctionName("EventHubTrigger")]
public static async Task RunAsync([EventHubTrigger("ordered", Connection = "EventHub")] EventData[] eventDataSet, TraceWriter log)
{
log.Info($"Triggered batch of size {eventDataSet.Length}");

//processing event by event
foreach (var eventData in eventDataSet) {
try
{
// process the event here
}
catch
{
// handle event exception
}
}
}

这里,在 foreach 循环之前,除了 native 行为之外,您还可以根据 sequenceNumber(如果您要发送它)对 eventDataSet 进行排序,仅用于案例。

这是blog .

2.第二种方法非常相似,是关于使用服务总线队列和 session ,如以下代码示例所示:

public async Task Run(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnectionString", IsSessionsEnabled = true)]Message message,
ILogger log)
{
log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.UTF8.GetString(message.Body)}");
// process the message here
}

这是blog也是如此。

需要注意的重要一点是,如果任何 Azure 函数实例(在第一种情况下)失败,租约将被续订,并且它可能会处理某些消息两次。如果这是一个问题,那么您应该选择第二种情况。

关于Azure IoT Hub触发器按顺序处理数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64917114/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com