gpt4 book ai didi

azure - 更新 Azure 表存储记录时如何避免竞争条件

转载 作者:行者123 更新时间:2023-12-02 23:43:32 25 4
gpt4 key购买 nike

利用 Azure 表存储的 Azure Function

我有一个从 Azure 服务总线主题订阅触发的 Azure 函数,我们将其称为“处理文件信息”函数。

订阅上的消息包含要处理的文件信息。与此类似的东西:

{
"uniqueFileId": "adjsdakajksajkskjdasd",
"fileName":"mydocument.docx",
"sourceSystemRef":"System1",
"sizeBytes": 1024,
... and other data
}

该函数执行以下两个操作 -

  1. 检查单个文件存储表中是否存在该文件。如果存在,请更新该文件。如果是新文件,请将文件添加到存储表中(按每个系统|每个文件Id存储)。

  2. 捕获文件大小字节的指标并存储在第二个存储表中,称为指标(不断增加字节,存储在每个系统|每年/月 基础)。

下图简要总结了我的方法:

enter image description here

individualFileInfo表和fileMetric之间的区别在于,individualFileInfo表每个文件有一条记录,而metric表每月存储一条记录,并且不断变化。更新(增量)收集通过函数传递的总字节数。

fileMetrics表中的数据存储如下:

enter image description here

问题...

Azure 函数在扩展方面非常出色,在我的设置中,我每次最多运行 6 个这样的函数。假设处理的每条文件消息都是唯一的 - 更新(或插入)individualFileInfo 表中的记录可以正常工作,因为不存在竞争条件。

但是,更新 fileMetric 表被证明是有问题的,因为所有 6 个函数都会同时触发,它们都打算一次性更新指标表(不断增加新文件计数器或增加现有文件计数器)。

我尝试使用 etag 进行乐观更新,并在存储更新返回 412 响应时进行一点递归重试(下面的代码示例)。但我似乎无法避免这种竞争条件。有没有人对如何解决此限制或之前遇到类似的问题有任何建议?

在存储 fileMetric 更新的函数中执行的示例代码:

internal static async Task UpdateMetricEntry(IAzureTableStorageService auditTableService, 
string sourceSystemReference, long addNewBytes, long addIncrementBytes, int retryDepth = 0)
{
const int maxRetryDepth = 3; // only recurively attempt max 3 times
var todayYearMonth = DateTime.Now.ToString("yyyyMM");
try
{
// Attempt to get existing record from table storage.
var result = await auditTableService.GetRecord<VolumeMetric>("VolumeMetrics", sourceSystemReference, todayYearMonth);

// If the volume metrics table existing in storage - add or edit the records as required.
if (result.TableExists)
{
VolumeMetric volumeMetric = result.RecordExists ?
// Existing metric record.
(VolumeMetric)result.Record.Clone()
:
// Brand new metrics record.
new VolumeMetric
{
PartitionKey = sourceSystemReference,
RowKey = todayYearMonth,
SourceSystemReference = sourceSystemReference,
BillingMonth = DateTime.Now.Month,
BillingYear = DateTime.Now.Year,
ETag = "*"
};

volumeMetric.NewVolumeBytes += addNewBytes;
volumeMetric.IncrementalVolumeBytes += addIncrementBytes;

await auditTableService.InsertOrReplace("VolumeMetrics", volumeMetric);
}
}
catch (StorageException ex)
{
if (ex.RequestInformation.HttpStatusCode == 412)
{
// Retry to update the volume metrics.
if (retryDepth < maxRetryDepth)
await UpdateMetricEntry(auditTableService, sourceSystemReference, addNewBytes, addIncrementBytes, retryDepth++);
}
else
throw;
}
}

Etag 会跟踪冲突,如果此代码收到 412 Http 响应,它将重试,最多 3 次(尝试缓解该问题)。我的问题是,我无法保证该函数的所有实例的表存储更新。

感谢您提前提供任何提示!

最佳答案

您可以将工作的第二部分放入第二个队列和函数中,甚至可以在文件更新上放置触发器。

由于其他操作听起来可能会花费大部分时间,因此它也可以消除第二步中的一些热量。

然后,您可以通过仅关注该函数来解决任何剩余的竞争条件。您可以使用 session 来有效地限制并发数。在您的情况下,系统 ID 可能是一个可能的 session key 。如果您使用它,则一次只有一个 Azure Function 处理来自一个系统的数据,从而有效地解决竞争条件。

https://dev.to/azure/ordered-queue-processing-in-azure-functions-4h6c

编辑:如果无法使用 session 逻辑锁定资源,则可以通过 blob 存储使用锁:

https://www.azurefromthetrenches.com/acquiring-locks-on-table-storage/

关于azure - 更新 Azure 表存储记录时如何避免竞争条件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56508736/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com