gpt4 book ai didi

node.js - 如何在 Rabbit.js 上创建 REP/REQ

转载 作者:搜寻专家 更新时间:2023-11-01 00:10:02 25 4
gpt4 key购买 nike

我已经在 .Net 上使用 RabbitMQ 有一段时间了,我对它没有太大的问题。现在我正在使用 node.js 迁移到 rabbit.js,但我对它不是很熟悉。 rabbit.js 的文档有限。我只知道基本的 PUSH/PULL 或 PUB/SUB。现在我想做 REQ/REP,但我不知道该怎么做。请任何人分享一些片段。

非常感谢您的回复。

最好的,

最佳答案

这可能比您要求的要多,但我有一个使用 node-amqp 进行 RPC 的代码片段(尽管它很长)而不是使用 rabbit.js 的 REQ/RES。我所做的与您在 RabbitMQ tutorial 中找到的类似关于RPC

目前消息中的内容应该是一个对象(哈希),它将被 amqp 模块转换为 json。

AmqpRpc 类在初始化时采用 amqp 连接,然后只需调用 makeRequest 并在回调中等待响应即可。响应具有function(err, response) 的形式,其中err 可能是超时错误

很抱歉,这不完全符合您的要求,但可能已经足够接近了。我还将代码作为要点发布在 github 上:https://gist.github.com/2720846

编辑:更改示例以支持多个未完成的请求。

amqprpc.js

var amqp = require('amqp')
, crypto = require('crypto')

var TIMEOUT=2000; //time to wait for response in ms
var CONTENT_TYPE='application/json';

exports = module.exports = AmqpRpc;

function AmqpRpc(connection){
var self = this;
this.connection = typeof(connection) != 'undefined' ? connection : amqp.createConnection();
this.requests = {}; //hash to store request in wait for response
this.response_queue = false; //plaseholder for the future queue
}

AmqpRpc.prototype.makeRequest = function(queue_name, content, callback){
var self = this;
//generate a unique correlation id for this call
var correlationId = crypto.randomBytes(16).toString('hex');

//create a timeout for what should happen if we don't get a response
var tId = setTimeout(function(corr_id){
//if this ever gets called we didn't get a response in a
//timely fashion
callback(new Error("timeout " + corr_id));
//delete the entry from hash
delete self.requests[corr_id];
}, TIMEOUT, correlationId);

//create a request entry to store in a hash
var entry = {
callback:callback,
timeout: tId //the id for the timeout so we can clear it
};

//put the entry in the hash so we can match the response later
self.requests[correlationId]=entry;

//make sure we have a response queue
self.setupResponseQueue(function(){
//put the request on a queue
self.connection.publish(queue_name, content, {
correlationId:correlationId,
contentType:CONTENT_TYPE,
replyTo:self.response_queue});
});
}


AmqpRpc.prototype.setupResponseQueue = function(next){
//don't mess around if we have a queue
if(this.response_queue) return next();

var self = this;
//create the queue
self.connection.queue('', {exclusive:true}, function(q){
//store the name
self.response_queue = q.name;
//subscribe to messages
q.subscribe(function(message, headers, deliveryInfo, m){
//get the correlationId
var correlationId = m.correlationId;
//is it a response to a pending request
if(correlationId in self.requests){
//retreive the request entry
var entry = self.requests[correlationId];
//make sure we don't timeout by clearing it
clearTimeout(entry.timeout);
//delete the entry from hash
delete self.requests[correlationId];
//callback, no err
entry.callback(null, message);
}
});
return next();
});
}

下面是一个关于如何使用它的小例子。保存两个代码部分并运行...

node client.js

如果您没有服务器提供回复,请求将超时。

client.js

//exmaple on how to use amqprpc
var amqp = require('amqp');
var connection = amqp.createConnection({host:'127.0.0.1'});

var rpc = new (require('./amqprpc'))(connection);

connection.on("ready", function(){
console.log("ready");
var outstanding=0; //counter of outstanding requests

//do a number of requests
for(var i=1; i<=10 ;i+=1){
//we are about to make a request, increase counter
outstanding += 1;
rpc.makeRequest('msg_queue', {foo:'bar', index:outstanding}, function response(err, response){
if(err)
console.error(err);
else
console.log("response", response);
//reduce for each timeout or response
outstanding-=1;
isAllDone();
});
}

function isAllDone() {
//if no more outstanding then close connection
if(outstanding === 0){
connection.end();
}
}

});

我什至会放入一个示例服务器以备不时之需

server.js

//super simple rpc server example
var amqp = require('amqp')
, util = require('util');

var cnn = amqp.createConnection({host:'127.0.0.1'});

cnn.on('ready', function(){
console.log("listening on msg_queue");
cnn.queue('msg_queue', function(q){
q.subscribe(function(message, headers, deliveryInfo, m){
util.log(util.format( deliveryInfo.routingKey, message));
//return index sent
cnn.publish(m.replyTo, {response:"OK", index:message.index}, {
contentType:'application/json',
contentEncoding:'utf-8',
correlationId:m.correlationId
});
});
});
});

关于node.js - 如何在 Rabbit.js 上创建 REP/REQ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10524613/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com