- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
(免责声明:我对消息队列和 Azure Functions 都不熟悉)
我有一个 Azure 服务总线队列。它使用了 FIFO 技术,这正是我所需要的。我还成功设置了一项不断将消息推送到队列中的作业。我现在队列中有大约 100K 条消息,它们已按正确的顺序排列。
现在,我需要按顺序使用/处理这些消息。每条 Message 的处理时间大约为 1 秒。但当涉及到 self 处理时,我只知道使用Azure Functions中的“计时器函数”的方法。但同样,当我使用“计时器功能”时,我需要每 5 分钟运行一次,然后每次需要拉取大约 300 条消息。我的代码如下所示(我使用的是 NodeJS):
函数.json
{
"bindings": [
{
"name": "myTimer",
"type": "timerTrigger",
"direction": "in",
"schedule": "0 */5 * * * *"
}
]
}
index.js
..
..
let allMessages = [];
while (allMessages.length < 300) {
const messages = await sbReceiver.receiveMessages(300, {
maxWaitTimeInMs: 60 * 1000,
});
if (!messages.length) {
break;
}
allMessages.push(...messages);
for (let message of messages) {
await axios({
/**
* PROCESS SINGLE MESSAGE HERE
* TAKES ABOUT 1 SEC EACH
*/
})
.then(async function (response) {
await sbReceiver.completeMessage(message);
})
.catch(function (error) {
break;
});
}
}
..
..
这种方法可以很好地完成工作。但我只是不满意必须通过预定作业(计时器函数)进行处理的想法。
除了使用“计时器函数”之外,是否有一种方法(或)如何使 Azure 函数(NodeJS)始终保持自行消费/处理排队消息,同时确保消息是按顺序处理的吗?
这里的要点是:
编辑:我知道“Azure 服务总线队列触发器”,但我不认为这些触发器会等待上一个作业“完成处理”。因为我的处理涉及到另一方的API调用,只需按顺序依次完成即可。我认为队列触发器无论如何都会在新消息到达后立即被触发,这会扰乱另一端的排序。如果我在这个概念上有误,请纠正我。
预先感谢您的好意建议。
最佳答案
发布我的评论作为社区的答案。
队列到达服务总线后,您可以直接使用 Azure 服务总线队列触发器来处理和延迟消息。您可以在 Node.js 代码中使用延迟方法 deferMessage() 以及服务总线队列触发器。一旦新消息到达队列。 Azure服务总线队列触发器将触发该函数,然后您可以添加延迟消息代码块来延迟消息并处理它。
您可以引用这个Github repository中的defer.js代码示例代码
示例服务总线 js 函数:-
index.js:-
module.exports = async function(context, mySbMsg) {
context.log('JavaScript ServiceBus queue trigger function processed message', mySbMsg);
};
const { ServiceBusClient, delay } = require("@azure/service-bus");
// Load the .env file if it exists
require("dotenv").config();
// Define connection string and related Service Bus entity names here
const connectionString = "Endpoint=sb://siliconservicebus76.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xcMOcbMJAPnPM+LTl+zRq4Ix2EjHQFkrs+ASbBXJ2v4=";
const queueName = "test";
async function main() {
await sendMessages();
await receiveMessage();
}
// Shuffle and send messages
async function sendMessages() {
const sbClient = new ServiceBusClient(connectionString);
// createSender() can also be used to create a sender for a topic.
const sender = sbClient.createSender(queueName);
const data = [
{ step: 1, title: "Shop" },
{ step: 2, title: "Unpack" },
{ step: 3, title: "Prepare" },
{ step: 4, title: "Cook" },
{ step: 5, title: "Eat" },
];
const promises = new Array();
for (let index = 0; index < data.length; index++) {
const message = {
body: data[index],
subject: "RecipeStep",
contentType: "application/json",
};
// the way we shuffle the message order is to introduce a tiny random delay before each of the messages is sent
promises.push(
delay(Math.random() * 30).then(async () => {
try {
await sender.sendMessages(message);
console.log("Sent message step:", data[index].step);
} catch (err) {
console.log("Error while sending message", err);
}
})
);
}
// wait until all the send tasks are complete
await Promise.all(promises);
await sender.close();
await sbClient.close();
}
async function receiveMessage() {
const sbClient = new ServiceBusClient(connectionString);
// If receiving from a subscription, you can use the createReceiver(topicName, subscriptionName) overload
let receiver = sbClient.createReceiver(queueName);
const deferredSteps = new Map();
let lastProcessedRecipeStep = 0;
try {
const processMessage = async (brokeredMessage) => {
if (
brokeredMessage.subject === "RecipeStep" &&
brokeredMessage.contentType === "application/json"
) {
const message = brokeredMessage.body;
// now let's check whether the step we received is the step we expect at this stage of the workflow
if (message.step === lastProcessedRecipeStep + 1) {
console.log("Process received message:", message);
lastProcessedRecipeStep++;
await receiver.completeMessage(brokeredMessage);
} else {
// if this is not the step we expected, we defer the message, meaning that we leave it in the queue but take it out of
// the delivery order. We put it aside. To retrieve it later, we remeber its sequence number
const sequenceNumber = brokeredMessage.sequenceNumber;
deferredSteps.set(message.step, sequenceNumber);
console.log("Defer received message:", message);
await receiver.deferMessage(brokeredMessage);
}
} else {
// we dead-letter the message if we don't know what to do with it.
console.log(
"Unknown message received, moving it to dead-letter queue ",
brokeredMessage.body
);
await receiver.deadLetterMessage(brokeredMessage);
}
};
const processError = async (args) => {
console.log(`>>>>> Error from error source ${args.errorSource} occurred: `, args.error);
};
receiver.subscribe(
{ processMessage, processError },
{
autoCompleteMessages: false,
}
); // Disabling autoCompleteMessages so we can control when message can be completed, deferred or deadlettered
await delay(10000);
await receiver.close();
console.log("Total number of deferred messages:", deferredSteps.size);
receiver = sbClient.createReceiver(queueName);
// Now we process the deferred messages
while (deferredSteps.size > 0) {
const step = lastProcessedRecipeStep + 1;
const sequenceNumber = deferredSteps.get(step);
const [message] = await receiver.receiveDeferredMessages(sequenceNumber);
if (message) {
console.log("Process deferred message:", message.body);
await receiver.completeMessage(message);
} else {
console.log("No message found for step number ", step);
}
deferredSteps.delete(step);
lastProcessedRecipeStep++;
}
await receiver.close();
} finally {
await sbClient.close();
}
}
main().catch((err) => {
console.log("Deferral Sample - Error occurred: ", err);
process.exit(1);
});
module.exports = { main };
输出:-
关于node.js - 如何使Azure Functions按顺序连续消费/处理Azure Service Bus Queue消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76626519/
如何检查一个元素是否立即隐藏。即如何通知元素的可见性。 在我的例子中,该元素是通过 slideUp 函数隐藏的。我应该立即收到有关该元素的可见性的通知。 我想到了使用bind()方法。但它没有类似 o
if (srcbloc == NULL) { fprintf(stderr, "warning!: memrip source is null!\n"); exit(1); } if
当我在数据库的旧 View 中清理一些问题时,我遇到了这个“奇怪”的连接条件: from tblEmails [e] join tblPersonEmails [pe]
如何水平对齐多张图像,一张一张地?它们不必适合宽度屏幕:相反,我希望它们超过后者的宽度,如果这有任何意义的话。 我已经检查了很多类似问题的答案,但找不到任何可以解决我的问题的答案。 HTML:
我知道 Cassandra 中的列有 TTL。但是也可以在一行上设置 TTL 吗?在每列上设置 TTL 并不能解决我的问题,如下面的用例所示: 在某些时候,一个进程想要删除一个带有 TTL 的完整行(
我有一个 NSTextField 和 Label,其值绑定(bind)到 View Controller 中的相同 NSString 这里的问题是标签只有在我按 Tab 时才会更新。 如何使其连续,以
例如。 1."abc"; ===>abc 2."ab c"; ===>ab_c 3."ab c"; ===>ab_c 4."ab c" ===>ab_c 对于多个连续空格也是如此。 我怎样
大家好,我想获取前一天或最后一天的信息,只有当我按下按钮时,它才会显示最后一天(星期六)的所有信息,如果我再次单击按钮,它将显示最后一天的信息(星期五)如果我再次点击它(星期四)谢谢你们帮助我 编辑:
我需要从实时音频流中提取ICY元数据,并正在使用mplayer进行此操作,因为它在播放音频流时会输出元数据。我欢迎其他方式执行此操作,目标是将更新的元数据(歌曲信息)保存到文本文件中,只要歌曲(或数据
语音识别有没有解决方案 只有几个字(2 个就够了,10 个就不错了。100 个就很棒了。不需要更多) 也在移动浏览器上运行(是否可以为此使用 flash(而不是 java)?) 可以安装在您自己的服务
我有一个单词列表, list1 = ['hello', 'how', 'are', 'you?', 'i', 'am', 'fine', 'thanks.', 'great!'] 我想加入, list
我正在开发一个程序,但我不断收到“对‘dosell’的 undefined reference ”,我不太明白发生了什么。这是函数的声明: void dosell(int *cash, int *nu
我无法提出执行我要做的事情所需的查询。 我有三个这样的表: client_files ----------------------- client_id file_id ---------
我一直在寻找一个插件/脚本,当到达底部时,它会从头开始继续滚动网站,就像一个连续的循环。 示例:http://unfold.no/和 http://www.aquiesdonde.com.ar/ 我尝
这个问题在这里已经有了答案: How to prevent scanf causing a buffer overflow in C? (6 个答案) 关闭 6 年前。 我一直在使用一个非常简单的程
给定一个整数数组,找到具有相同数量的 x 和 y 的连续子序列的总数。例如 x=1 和 y=2 的数组 [1,2,1] ans = 2 表示它的两个子数组 [1,2] 和 [2,1]。检查每个连续的子
所以,我有一个所有正自然数的数组。我得到了一个阈值。我必须找出总和小于给定阈值的数字(连续)的最大计数。 For example, IP: arr = {3,1,2,1} Threshold = 5
我制作了像内置相机一样的相机应用。 我想实现像内置相机一样的连续对焦功能。(此功能我不触摸屏幕,但相机会尝试自行对焦。) 因此,将其设置为 surfaceCreated : Camera.Pa
我有这样的数据: f x A 1.1 A 2.2 A 3.3 B 3.5 B 3.7 B 3.9 B 4.1 B 4.5 A 5.1 A 5.2 C 5.4 C 5.5 C 6.1 B 6.2 B
假设我有一个包含一组数据点的表,每个数据点由一个时间戳和一个值组成。如果至少有 N 个连续记录(按时间戳排序)高于给定值 X,我将如何编写返回 true (1) 的查询,否则返回 false (0)?
我是一名优秀的程序员,十分优秀!