gpt4 book ai didi

node.js - 如何使Azure Functions按顺序连续消费/处理Azure Service Bus Queue消息?

转载 作者:行者123 更新时间:2023-12-03 06:12:15 26 4
gpt4 key购买 nike

(免责声明:我对消息队列和 Azure Functions 都不熟悉)

我有一个 Azure 服务总线队列。它使用了 FIFO 技术,这正是我所需要的。我还成功设置了一项不断将消息推送到队列中的作业。我现在队列中有大约 100K 条消息,它们已按正确的顺序排列

现在,我需要按顺序使用/处理这些消息。每条 Message 的处理时间大约为 1 秒。但当涉及到 self 处理时,我只知道使用Azure Functions中的“计时器函数”的方法。但同样,当我使用“计时器功能”时,我需要每 5 分钟运行一次,然后每次需要拉取大约 300 条消息。我的代码如下所示(我使用的是 NodeJS):

函数.json

{
"bindings": [
{
"name": "myTimer",
"type": "timerTrigger",
"direction": "in",
"schedule": "0 */5 * * * *"
}
]
}

index.js

..
..
let allMessages = [];
while (allMessages.length < 300) {
const messages = await sbReceiver.receiveMessages(300, {
maxWaitTimeInMs: 60 * 1000,
});

if (!messages.length) {
break;
}

allMessages.push(...messages);

for (let message of messages) {

await axios({
/**
* PROCESS SINGLE MESSAGE HERE
* TAKES ABOUT 1 SEC EACH
*/
})
.then(async function (response) {
await sbReceiver.completeMessage(message);
})
.catch(function (error) {
break;
});

}
}
..
..

这种方法可以很好地完成工作。但我只是不满意必须通过预定作业(计时器函数)进行处理的想法。

问题

除了使用“计时器函数”之外,是否有一种方法(或)如何使 Azure 函数(NodeJS)始终保持自行消费/处理排队消息,同时确保消息是按顺序处理的吗?

这里的要点是:

  1. 继续消费/处理消息,最好不使用计时器
  2. 按顺序使用/处理消息

编辑:我知道“Azure 服务总线队列触发器”,但我不认为这些触发器会等待上一个作业“完成处理”。因为我的处理涉及到另一方的API调用,只需按顺序依次完成即可。我认为队列触发器无论如何都会在新消息到达后立即被触发,这会扰乱另一端的排序。如果我在这个概念上有误,请纠正我。

预先感谢您的好意建议。

最佳答案

发布我的评论作为社区的答案。

队列到达服务总线后,您可以直接使用 Azure 服务总线队列触发器来处理和延迟消息。您可以在 Node.js 代码中使用延迟方法 deferMessage() 以及服务总线队列触发器。一旦新消息到达队列。 Azure服务总线队列触发器将触发该函数,然后您可以添加延迟消息代码块来延迟消息并处理它。

您可以引用这个Github repository中的defer.js代码示例代码

示例服务总线 js 函数:-

index.js:-


module.exports = async function(context, mySbMsg) {
context.log('JavaScript ServiceBus queue trigger function processed message', mySbMsg);
};

const { ServiceBusClient, delay } = require("@azure/service-bus");

// Load the .env file if it exists
require("dotenv").config();

// Define connection string and related Service Bus entity names here
const connectionString = "Endpoint=sb://siliconservicebus76.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xcMOcbMJAPnPM+LTl+zRq4Ix2EjHQFkrs+ASbBXJ2v4=";
const queueName = "test";

async function main() {
await sendMessages();
await receiveMessage();
}

// Shuffle and send messages
async function sendMessages() {
const sbClient = new ServiceBusClient(connectionString);
// createSender() can also be used to create a sender for a topic.
const sender = sbClient.createSender(queueName);

const data = [
{ step: 1, title: "Shop" },
{ step: 2, title: "Unpack" },
{ step: 3, title: "Prepare" },
{ step: 4, title: "Cook" },
{ step: 5, title: "Eat" },
];
const promises = new Array();
for (let index = 0; index < data.length; index++) {
const message = {
body: data[index],
subject: "RecipeStep",
contentType: "application/json",
};
// the way we shuffle the message order is to introduce a tiny random delay before each of the messages is sent
promises.push(
delay(Math.random() * 30).then(async () => {
try {
await sender.sendMessages(message);
console.log("Sent message step:", data[index].step);
} catch (err) {
console.log("Error while sending message", err);
}
})
);
}
// wait until all the send tasks are complete
await Promise.all(promises);
await sender.close();
await sbClient.close();
}

async function receiveMessage() {
const sbClient = new ServiceBusClient(connectionString);

// If receiving from a subscription, you can use the createReceiver(topicName, subscriptionName) overload
let receiver = sbClient.createReceiver(queueName);

const deferredSteps = new Map();
let lastProcessedRecipeStep = 0;
try {
const processMessage = async (brokeredMessage) => {
if (
brokeredMessage.subject === "RecipeStep" &&
brokeredMessage.contentType === "application/json"
) {
const message = brokeredMessage.body;
// now let's check whether the step we received is the step we expect at this stage of the workflow
if (message.step === lastProcessedRecipeStep + 1) {
console.log("Process received message:", message);
lastProcessedRecipeStep++;
await receiver.completeMessage(brokeredMessage);
} else {
// if this is not the step we expected, we defer the message, meaning that we leave it in the queue but take it out of
// the delivery order. We put it aside. To retrieve it later, we remeber its sequence number
const sequenceNumber = brokeredMessage.sequenceNumber;
deferredSteps.set(message.step, sequenceNumber);
console.log("Defer received message:", message);
await receiver.deferMessage(brokeredMessage);
}
} else {
// we dead-letter the message if we don't know what to do with it.
console.log(
"Unknown message received, moving it to dead-letter queue ",
brokeredMessage.body
);
await receiver.deadLetterMessage(brokeredMessage);
}
};
const processError = async (args) => {
console.log(`>>>>> Error from error source ${args.errorSource} occurred: `, args.error);
};

receiver.subscribe(
{ processMessage, processError },
{
autoCompleteMessages: false,
}
); // Disabling autoCompleteMessages so we can control when message can be completed, deferred or deadlettered
await delay(10000);
await receiver.close();
console.log("Total number of deferred messages:", deferredSteps.size);

receiver = sbClient.createReceiver(queueName);
// Now we process the deferred messages
while (deferredSteps.size > 0) {
const step = lastProcessedRecipeStep + 1;
const sequenceNumber = deferredSteps.get(step);
const [message] = await receiver.receiveDeferredMessages(sequenceNumber);
if (message) {
console.log("Process deferred message:", message.body);
await receiver.completeMessage(message);
} else {
console.log("No message found for step number ", step);
}
deferredSteps.delete(step);
lastProcessedRecipeStep++;
}
await receiver.close();
} finally {
await sbClient.close();
}
}

main().catch((err) => {
console.log("Deferral Sample - Error occurred: ", err);
process.exit(1);
});

module.exports = { main };

输出:-

enter image description here

关于node.js - 如何使Azure Functions按顺序连续消费/处理Azure Service Bus Queue消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76626519/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com