gpt4 book ai didi

javascript - RabbitMQ。循环发布消息

转载 作者:太空宇宙 更新时间:2023-11-04 01:19:56 25 4
gpt4 key购买 nike

我正在使用 amqplib js 库。我有发布者(它循环发布消息)和很少的工作人员。如果我发布 1000000 条消息,我会看到我的消息如何首先发送到兔子,之后我收到 ack,并且只有在该消息开始在工作人员中使用之后。

据我了解,当我发送消息时,兔子无法向发布者发送ack。我对吗?我该如何解决这个问题?

我有主文件:

let amqp = require('amqplib');

const EXCHANGE = 'simple_exchange',
EXCHANGE_TYPE = 'direct',
QUEUE = 'simple_queue',
ROUTING_KEY = 'simple_routing_key';

const defaultPublishCount = 10000;

let runInit = async() => {
let connection = await amqp.connect('amqp://localhost');
let channel = await connection.createChannel();

let commonOptions = {
durable: false
};

await channel.assertExchange(EXCHANGE, EXCHANGE_TYPE, commonOptions);
await channel.assertQueue(QUEUE, commonOptions);
await channel.bindQueue(QUEUE, EXCHANGE, ROUTING_KEY, commonOptions);

return channel;
};

let runPublisher = async(count, userChannel) => {
const channel = userChannel || await runInit();

let d1 = (new Date).toISOString().slice(11, 23).replace('T', ' ');
const publishCount = count || defaultPublishCount;

let index = 1;
while (index <= publishCount) {
let msg = {
id: index,
time: (new Date).toISOString().slice(11, 23).replace('T', ' ')
};
channel.publish(EXCHANGE, ROUTING_KEY, new Buffer(JSON.stringify(msg)), {
noAck: true
});
console.log(`Message sent: ${JSON.stringify(msg)}`);
index++;
}
let d2 = (new Date).toISOString().slice(11, 23).replace('T', ' ');


console.log(`\nPublish started at: ${d1}`);
console.log(`Publish finished at: ${d2}\n\n`);
};

let runWorker = async(userChannel) => {
const channel = userChannel || await runInit();

console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", QUEUE);
channel.consume(QUEUE, (msg) => {
console.log(` [x] Received: '${msg.content}' at ${(new Date).toISOString().slice(11, 23).replace('T', ' ')}`);
channel.ack(msg);
});
};

module.exports = {
runInit,
runPublisher,
runWorker
};

简单发布者:

let { runPublisher } = require('./amqp_core.js');

let count = (process.argv.slice(2, 3)[0]) * 1;
runPublisher(count);

和简单的 worker :

let { runWorker } = require('./amqp_core.js');

runWorker();

工作结果: 10 000 messages 1 000 000 messages我不知道这是否重要,但我说。我用this rabbit cluster ,并在其中我启用了一项策略:

我还认为, channel 中存在一些问题,我又添加了一个测试:

(async() => {
let core = require('./amqp_core.js');

let channel = await core.runInit();

await core.runWorker(channel);

core.runPublisher(25, channel);
})();

但结果是一样的: img2

最佳答案

我不知道我是否正确,但我解决了这个问题)

enter image description here之前我为消费者和发布者使用了一个连接和一个 channel 。但不是,我为每条新发布消息打开新 channel 。

let channel = await connection.createChannel();
await channel.publish(EXCHANGE, ROUTING_KEY, new Buffer(JSON.stringify(msg)), {noAck: true});
await channel.close();

正如我在 Rabbit Management 中看到的,即使我打开了 5 个发布商和 1 个工作人员, channel 选项卡中也只有一个 channel 。

完整代码:

let amqp = require('amqplib');

const EXCHANGE = 'simple_exchange',
EXCHANGE_TYPE = 'direct',
QUEUE = 'simple_queue',
ROUTING_KEY = 'simple_routing_key';

const defaultPublishCount = 10000;

let runInit = async() => {
let connection = await amqp.connect('amqp://localhost');
let channel = await connection.createChannel();

let commonOptions = {
durable: false
};

await channel.assertExchange(EXCHANGE, EXCHANGE_TYPE, commonOptions);
await channel.assertQueue(QUEUE, commonOptions);
await channel.bindQueue(QUEUE, EXCHANGE, ROUTING_KEY, commonOptions);
await channel.close();

return connection;
};

let runPublisher = async(count) => {
const connection = await runInit();

let d1 = (new Date).toISOString().slice(11, 23).replace('T', ' ');
const publishCount = count || defaultPublishCount;

let index = 1;
while (index <= publishCount) {
let msg = {
id: index,
time: (new Date).toISOString().slice(11, 23).replace('T', ' ')
};
let channel = await connection.createChannel();
await channel.publish(EXCHANGE, ROUTING_KEY, new Buffer(JSON.stringify(msg)), {
noAck: true
});
await channel.close();
console.log(`Message sent: ${JSON.stringify(msg)}`);
index++;
}
let d2 = (new Date).toISOString().slice(11, 23).replace('T', ' ');


console.log(`\nPublish started at: ${d1}`);
console.log(`Publish finished at: ${d2}\n\n`);
};

let runWorker = async() => {
const connection = await runInit();
let channel = await connection.createChannel();

console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", QUEUE);
channel.consume(QUEUE, (msg) => {
console.log(` [x] Received: '${msg.content}' at ${(new Date).toISOString().slice(11, 23).replace('T', ' ')}`);
channel.ack(msg);
});
};

module.exports = {
runInit,
runPublisher,
runWorker
};

现在,我想听听您对此的看法

关于javascript - RabbitMQ。循环发布消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59679780/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com