gpt4 book ai didi

javascript - 在nodejs上延迟消息rabbitmq

转载 作者:行者123 更新时间:2023-12-03 02:10:12 25 4
gpt4 key购买 nike

我正在尝试为rabbitmq 添加一些带有延迟消息的功能。实际上我需要两周后收到这条消息。据我所知,我们不需要任何插件。此外,当此消息调用时,我应该如何重新安排新的 x 延迟交换器在 2 周内再次调用。我应该在哪里添加这个 x 延迟消息。

配置

"messageQueue": {
"connectionString": "amqp://guest:guest@localhost:5672?heartbeat=5",
"queueName": "history",
"exchange": {
"type": "headers",
"prefix": "history."
},
"reconnectTimeout": 5000
},

服务:

import amqplib from 'amqplib'
import config from 'config'

import logger from './logger'

const {reconnectTimeout, connectionString, exchange: {prefix, type: exchangeType}, queueName} = config.messageQueue

const onConsume = (expectedMessages, channel, onMessage) => async message => {
const {fields: {exchange}, properties: {correlationId, replyTo}, content} = message

logger.silly(`consumed message from ${exchange}`)

const messageTypeName = exchange.substring(exchange.startsWith(prefix) ? prefix.length : 0)

const messageType = expectedMessages[messageTypeName]

if (!messageType) {
logger.warn(`Unexpected message of type ${messageTypeName} received. The service only accepts messages of types `, Object.keys(expectedMessages))

return
}

const deserializedMessage = messageType.decode(content)

const object = deserializedMessage.toJSON()

const result = await onMessage(messageTypeName, object)

if (correlationId && replyTo) {
const {type, response} = result

const encoded = type.encode(response).finish()

channel.publish('', replyTo, encoded, {correlationId})
}
}

const startService = async (expectedMessages, onMessage) => {

const restoreOnFailure = e => {
logger.warn('connection with message bus lost due to error', e)
logger.info(`reconnecting in ${reconnectTimeout} milliseconds`)

setTimeout(() => startService(expectedMessages, onMessage), reconnectTimeout)
}

const exchanges = Object.keys(expectedMessages).map(m => `${prefix}${m}`)

try {
const connection = await amqplib.connect(connectionString)

connection.on('error', restoreOnFailure)

const channel = await connection.createChannel()

const handleConsume = onConsume(expectedMessages, channel, onMessage)

const queue = await channel.assertQueue(queueName)

exchanges.forEach(exchange => {
channel.assertExchange(exchange, exchangeType, {durable: true})

channel.bindQueue(queue.queue, exchange, '')
})

logger.debug(`start listening messages from ${exchanges.join(', ')}`)

channel.consume(queue.queue, handleConsume, {noAck: true})
}
catch (e) {
logger.warn('error while subscribing for messages message', e)

restoreOnFailure(e)
}
}

export default startService

最佳答案

RabbitMQ 有一个 plug-in for scheduling messages 。您可以使用它,但要遵守我在下面解释的重要设计注意事项。

使用步骤

您必须首先安装它:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

然后,您必须设置延迟交换:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);

最后,您可以设置x-delay参数(其中延迟以毫秒为单位)。

byte[] messageBodyBytes = "delayed payload".getBytes();
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
props.headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);

两周等于 (7*24*60*60*1000 = 604,800,000) 毫秒。

重要警告正如我在this answer中解释的那样,要求消息代理做这件事确实很糟糕。

重要的是要记住,在处理消息队列时,它们在系统中执行非常特定的功能:在处理器忙于处理较早的消息时保存消息。 预期功能正常的消息队列将在合理的时间内传递消息。基本上,基本的期望是一旦消息到达队列的头部,队列上的下一个拉取将产生消息 - 无延迟

延迟是具有队列的系统处理消息方式的结果。事实上,Little's Law对此提供了一些有趣的见解。如果您要在那里设置任意延迟,那么您实际上不需要从一开始就使用消息队列 - 您的所有工作都是预先安排的。

因此,在需要延迟的系统中(例如,加入/等待并行操作完成),您应该考虑其他方法。通常,可查询数据库在这种特定情况下是有意义的。如果您发现自己将消息保留在队列中一段预设的时间,那么您实际上是在将消息队列用作数据库 - 这是它未设计提供的功能。这不仅存在风险,而且很可能会损害消息代理的性能。

关于javascript - 在nodejs上延迟消息rabbitmq,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49611402/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com