gpt4 book ai didi

javascript - 如何使用lib amqp.node中的consume方法获取所有消息?

转载 作者:行者123 更新时间:2023-12-01 02:45:02 25 4
gpt4 key购买 nike

大家好。
你能帮助我解决 node.js 中的异步问题吗?

问题:

我正在使用amqplib moduleRabbitMQ 一起工作这里有方法 consume,它从 RabbitMQ 提供消息,但是该方法首先返回有关他启动的 promise ,在这个 promise 启动后,他调用回调来获取数据 来自 RabbitMQ,我不知道如何捕获所有消息何时发送到我的 Node js 应用程序。

更多解释,这里是我的代码,在评论的末尾代码我写了我想要的内容:

/**
* Here my test code
*
* requirng amqp.node lib
*/
let amqp = require('amqplib')
, configConnection = { /* ..config options */ }
, queue = 'users'
, exchange = 'users.exchange'
, type = 'fanout'

/**
* declare annonymous function as async and immediately called it
*/
(async () => {
/**
* declare connection and channel with using async/await construction
* who support version node.js >= 8.5.0
*/
let conn = await amqp.connect(configConnection)
let channel = await conn.createChannel()
await channel.assertExchange(exchange, type)
let response = await channel.assertQueue(queue)
/**
* response: { queue: 'users', messageCount: 10, consumerCount: 0 }
*/
response = await channel.bindQueue(response.queue, exchange, '')
response = await channel.consume(response.queue, logMessage, {noAck: false})
/**
* {noAck: false} false for not expect an acknowledgement
*/
console.log('reading for query finish')

function logMessage(msg) {
console.log("[*] recieved: '%s'", msg.content.toString())
}
})()
/**
* output will show:
* reading for query finish
* [*] recieved: 'message content'
* [*] recieved: 'message content'
* [*] recieved: 'message content'
* ...
*
* But i'm need show message 'reading for query finish' after when
* all consumes will executed
*
* Ask: How i can do this?
*/

最佳答案

我找到了问题的答案 here .

使用中的答案:EventEmitter && Promise

魔法(对我来说)就在这里:
await new Promise(resolve => eventEmitter.once('consumeDone', resolve))

所以结束的代码是:

/**
* Here my test code
*
* requirng amqp.node lib
*/
let amqp = require('amqplib')
, EventEmitter = require('events')
, eventEmitter = new EventEmitter()
, timeout = 10000
, configConnection = { /* ..config options */ }
, queue = 'users'
, exchange = 'users.exchange'
, type = 'fanout'

/**
* declare annonymous function as async and immediately called it
*/
(async () => {
/**
* declare connection and channel with using async/await construction
* who support version node.js >= 8.5.0
*/
let conn = await amqp.connect(configConnection)
let channel = await conn.createChannel()
await channel.assertExchange(exchange, type)
let response = await channel.assertQueue(queue)
/**
* response: { queue: 'users', messageCount: 10, consumerCount: 0 }
*/
let messageCount = response.messageCount
response = await channel.bindQueue(response.queue, exchange, '')
response = await channel.consume(response.queue, logMessage(messageCount), {noAck: false})
/**
* {noAck: false} false for not expect an acknowledgement
*/

/**
* declare timeout if we have problems with emit event in consume
* we waiting when event will be emit once 'consumeDone' and promise gain resolve
* so we can go to the next step
*/
setTimeout(() => eventEmitter.emit('consumeDone'), timeout)
await new Promise(resolve => eventEmitter.once('consumeDone', resolve))
console.log('reading for query finish')

function logMessage(messageCount) {
return msg => {
console.log("[*] recieved: '%s'", msg.content.toString())
if (messageCount == msg.fields.deliveryTag) {
eventEmitter.emit('consumeDone')
}
}

}
})()

关于javascript - 如何使用lib amqp.node中的consume方法获取所有消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47303096/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com