gpt4 book ai didi

Azure服务总线队列: re-schedule messages while increasing deliveryCount

转载 作者:行者123 更新时间:2023-12-03 06:15:19 25 4
gpt4 key购买 nike

我使用 Azure 服务总线队列来收集需要发布到 API 的消息。我在队列上使用Azure 触发器函数来处理传入消息。不幸的是,该 API 不可靠,因此我正在努力添加自己的重试计划。

例如,第一次无法处理消息,我想安排在 15 分钟内再次重试,然后再安排 1 小时、2 小时等。达到 maxDeliveryCount 后,消息应发送到 DLQ。

消息的调度正在工作,但是,每个调度的消息都被视为一条新消息,这意味着 deliveryCount 始终为 1。我正在使用 @azure/service -bus NPM包,用于在Azure功能中进行重新调度。

因此,我的问题是:如何在 Azure 触发器函数中重新安排 SB 队列消息,同时增加这些消息的 deliveryCount

(我知道我可以将自己的 DeliveryCount 属性添加到消息中,但我不想用元数据污染消息正文。)

为了提供更多详细信息,这是 MRE:

const serviceBusQueueTrigger: AzureFunction = async function (
context: Context,
msg: any
): Promise<void> {
context.log("ServiceBus queue trigger function processed message", msg);


const serviceBusClient = new ServiceBusClient(<connection_string>);
const sender = serviceBusClient.createSender(<queue_name>);

context.log("dequeueCount: ", context.bindingData.deliveryCount);

try {
// POST to API
} catch (error) {

// if an error is thrown here, trigger resubmits immediately
// I want to wait proportionally to current deliveryCount

await sender.scheduleMessages(
{
body: msgData,
contentType: "application/json",
},
// just an example of schedule time
new Date(Date.now() + Math.pow(2, context.bindingData.deliveryCount) * 60 * 1000)
);
}

await sender.close();
}

===

编辑:虽然 Microsoft 文档仍然包含有关内置重试机制的(模糊)详细信息(@Manish 在他的答案中链接),但该功能似乎不再受支持。相关讨论可参见here .

因此,我最初的问题是,如何使用 Azure 服务总线触发器功能构建此机制?谢谢!

主机.json

{
"version": "2.0",
"logging": {
"applicationInsights": {
"samplingSettings": {
"isEnabled": true,
"excludedTypes": "Request"
}
}
},
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.0.0, 5.0.0)"
},
"concurrency": {
"dynamicConcurrencyEnabled": true,
"snapshotPersistenceEnabled": true
},
"extensions": {
"serviceBus": {
"clientRetryOptions": {
"mode": "exponential",
"tryTimeout": "00:01:00",
"delay": "00:01:00",
"maxDelay": "03:00:00",
"maxRetries": 3
}
}
}
}

函数.json

{
"bindings": [
{
"name": "incoming",
"type": "serviceBusTrigger",
"direction": "in",
"queueName": "incoming",
"connection": "SERVICEBUS"
}
],
"retry": {
"strategy": "exponentialBackoff",
"maxRetryCount": 5,
"minimumInterval": "00:00:10",
"maximumInterval": "00:15:00"
},
"scriptFile": "../dist/trigger/index.js"
}

最佳答案

问题是,每当您在队列中发送消息时,它都会将其视为新消息。
如何修复它?
您不必将新消息推送到队列中,只需删除 try-catch 即可,所有发送代码和功能应用程序将负责其余工作。

const serviceBusQueueTrigger: AzureFunction = async function (
context: Context,
msg: any
): Promise<void> {
context.log("ServiceBus queue trigger function processed message", msg);
context.log("dequeueCount: ", context.bindingData.deliveryCount);
// POST to API

}

请确保您的 function.json 文件如下所示(从下面的链接复制的代码)
{
"bindings": [
{
"queueName": "testqueue",
"connection": "MyServiceBusConnection",
"name": "myQueueItem",
"type": "serviceBusTrigger",
"direction": "in"
}
],
"disabled": false
}

了解更多信息 -> https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-service-bus-trigger?tabs=python-v2%2Cin-process%2Cextensionv5&pivots=programming-language-javascript


对于指数重试,请确保您具有如下所示的 host.json 设置
{
"version": "2.0",
"extensions": {
"serviceBus": {
"clientRetryOptions":{
"mode": "exponential",
"tryTimeout": "00:01:00",
"delay": "00:00:00.80",
"maxDelay": "00:01:00",
"maxRetries": 3
},
"prefetchCount": 0,
"transportType": "amqpWebSockets",
"webProxy": "https://proxyserver:8080",
"autoCompleteMessages": true,
"maxAutoLockRenewalDuration": "00:05:00",
"maxConcurrentCalls": 16,
"maxConcurrentSessions": 8,
"maxMessageBatchSize": 1000,
"sessionIdleTimeout": "00:01:00",
"enableCrossEntityTransactions": false
}
}
}

enter image description here了解更多信息->https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-service-bus?tabs=in-process%2Cextensionv5%2Cextensionv3&pivots=programming-language-javascript


编辑1:阅读github issue后,我们可以尝试使用重试

{
"disabled": false,
"bindings": [
{
....
}
],
"retry": {
"strategy": "exponentialBackoff",
"maxRetryCount": 5,
"minimumInterval": "00:00:10",
"maximumInterval": "00:15:00"
}
}

编辑2:确保您有installed bundle更多信息here

{
"version": "2.0",
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[3.3.0, 4.0.0)"
}
}

关于Azure服务总线队列: re-schedule messages while increasing deliveryCount,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76311885/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com