- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我已经在 .Net 上使用 RabbitMQ 有一段时间了,我对它没有太大的问题。现在我正在使用 node.js 迁移到 rabbit.js,但我对它不是很熟悉。 rabbit.js 的文档有限。我只知道基本的 PUSH/PULL 或 PUB/SUB。现在我想做 REQ/REP,但我不知道该怎么做。请任何人分享一些片段。
非常感谢您的回复。
最好的,
最佳答案
这可能比您要求的要多,但我有一个使用 node-amqp 进行 RPC 的代码片段(尽管它很长)而不是使用 rabbit.js 的 REQ/RES。我所做的与您在 RabbitMQ tutorial 中找到的类似关于RPC
目前消息中的内容应该是一个对象(哈希),它将被 amqp 模块转换为 json。
AmqpRpc 类在初始化时采用 amqp 连接,然后只需调用 makeRequest 并在回调中等待响应即可。响应具有function(err, response) 的形式,其中err 可能是超时错误
很抱歉,这不完全符合您的要求,但可能已经足够接近了。我还将代码作为要点发布在 github 上:https://gist.github.com/2720846
编辑:更改示例以支持多个未完成的请求。
amqprpc.js
var amqp = require('amqp')
, crypto = require('crypto')
var TIMEOUT=2000; //time to wait for response in ms
var CONTENT_TYPE='application/json';
exports = module.exports = AmqpRpc;
function AmqpRpc(connection){
var self = this;
this.connection = typeof(connection) != 'undefined' ? connection : amqp.createConnection();
this.requests = {}; //hash to store request in wait for response
this.response_queue = false; //plaseholder for the future queue
}
AmqpRpc.prototype.makeRequest = function(queue_name, content, callback){
var self = this;
//generate a unique correlation id for this call
var correlationId = crypto.randomBytes(16).toString('hex');
//create a timeout for what should happen if we don't get a response
var tId = setTimeout(function(corr_id){
//if this ever gets called we didn't get a response in a
//timely fashion
callback(new Error("timeout " + corr_id));
//delete the entry from hash
delete self.requests[corr_id];
}, TIMEOUT, correlationId);
//create a request entry to store in a hash
var entry = {
callback:callback,
timeout: tId //the id for the timeout so we can clear it
};
//put the entry in the hash so we can match the response later
self.requests[correlationId]=entry;
//make sure we have a response queue
self.setupResponseQueue(function(){
//put the request on a queue
self.connection.publish(queue_name, content, {
correlationId:correlationId,
contentType:CONTENT_TYPE,
replyTo:self.response_queue});
});
}
AmqpRpc.prototype.setupResponseQueue = function(next){
//don't mess around if we have a queue
if(this.response_queue) return next();
var self = this;
//create the queue
self.connection.queue('', {exclusive:true}, function(q){
//store the name
self.response_queue = q.name;
//subscribe to messages
q.subscribe(function(message, headers, deliveryInfo, m){
//get the correlationId
var correlationId = m.correlationId;
//is it a response to a pending request
if(correlationId in self.requests){
//retreive the request entry
var entry = self.requests[correlationId];
//make sure we don't timeout by clearing it
clearTimeout(entry.timeout);
//delete the entry from hash
delete self.requests[correlationId];
//callback, no err
entry.callback(null, message);
}
});
return next();
});
}
下面是一个关于如何使用它的小例子。保存两个代码部分并运行...
node client.js
如果您没有服务器提供回复,请求将超时。
client.js
//exmaple on how to use amqprpc
var amqp = require('amqp');
var connection = amqp.createConnection({host:'127.0.0.1'});
var rpc = new (require('./amqprpc'))(connection);
connection.on("ready", function(){
console.log("ready");
var outstanding=0; //counter of outstanding requests
//do a number of requests
for(var i=1; i<=10 ;i+=1){
//we are about to make a request, increase counter
outstanding += 1;
rpc.makeRequest('msg_queue', {foo:'bar', index:outstanding}, function response(err, response){
if(err)
console.error(err);
else
console.log("response", response);
//reduce for each timeout or response
outstanding-=1;
isAllDone();
});
}
function isAllDone() {
//if no more outstanding then close connection
if(outstanding === 0){
connection.end();
}
}
});
我什至会放入一个示例服务器以备不时之需
server.js
//super simple rpc server example
var amqp = require('amqp')
, util = require('util');
var cnn = amqp.createConnection({host:'127.0.0.1'});
cnn.on('ready', function(){
console.log("listening on msg_queue");
cnn.queue('msg_queue', function(q){
q.subscribe(function(message, headers, deliveryInfo, m){
util.log(util.format( deliveryInfo.routingKey, message));
//return index sent
cnn.publish(m.replyTo, {response:"OK", index:message.index}, {
contentType:'application/json',
contentEncoding:'utf-8',
correlationId:m.correlationId
});
});
});
});
关于node.js - 如何在 Rabbit.js 上创建 REP/REQ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10524613/
我应该如何继续将 std::chrono::minutes::rep 类型值转换为小时表示。 #include #include using namespace std; using namesp
关闭。这个问题需要更多focused .它目前不接受答案。 想改善这个问题吗?更新问题,使其仅关注一个问题 editing this post . 4年前关闭。 Improve this questi
我有一个带有3个kafka节点和3个zk节点的kakfa集群。 生产者在AWS机器上尝试将数据推送到我的Intranet服务器上运行的kafka集群上。 使用以下命令从控制台创建主题(JOB_AWS_
我有关于序列和 each 的快速问题: vect1 <- c(4, 5, 10, 3, 1) 我想用这个向量复制每个,这样第一个数字被复制 4,第二个 5,第三个 10,第四个 3 和第五个等于 1。
当我在控制台中键入泛型的函数名称时,我希望看到对 UseMethod 的调用。例如,the documentation for determinant 将其称为泛型,当我将其输入控制台时得到以下输出:
我正在尝试理解 SIMD 和向量指令的概念。如果我理解正确的话: 向量指令是对一维数据数组(=向量)进行操作的指令,而不是对单个数据项进行操作的标量指令。 SIMD指令实际上是单指令多数据指令,看起来
引用英特尔® 64 和 IA-32 架构优化引用手册,第 2.4.6 节“REP 字符串增强”: The performance characteristics of using REP string
我正在尝试编写一个应用程序,允许用户启动长时间运行的计算进程,该进程将从使用 ØMQ 的 Web 服务器接收命令。我使用标准的请求-回复架构:服务器有一个连接到工作进程 REP 套接字的 REQ 套接
我希望使用多线程通过 Python 和 ZeroMQ 实现 REQ-REP 模式。 使用 Python,我可以在新客户端连接到服务器时创建一个新线程。该线程将处理与该特定客户端的所有通信,直到套接字关
我想创建一个列表,它是向量的 8 倍 c(2,6) ,即 8 个向量的列表。 错误:object = as.list(rep(c(2,6),8))结果是 16 个单个数字的列表:2 6 2 6 2 6
我需要将一个向量分解成一系列 x 并重复,我不太确定这个术语是什么。它是 rep 的倒数功能。所以一个向量 [1,2,2,2,2,1,1,1,1,1,2,2] -> [1x1, 4x2, 5x1, 2
x=1:20 [1] 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 rep(x,2) [1] 1 2 3 4 5 6 7 8 9 10 11 1
假设我做了一个长 REP INSB在普通优先级线程中从用户模式读取 PCI 设备寄存器。在它执行期间,以下哪些可以发生,哪些不能发生: 中断(其他内核) 中断(同核) PCI 访问(其他内核) PCI
怎么才能使用说明rep stosb执行速度比这段代码快? Clear: mov byte [edi],AL ; Write the value in AL to memory
我需要从端口读取一些 16 位值并将它们保存到缓冲区。我正在使用的教程建议使用 REP INSW 指令,但我不知道如何使用它,甚至不知道它是如何工作的...... 这条指令相当于两条IN指令吗? 最佳
什么是重用/发布等效原则以及为什么它很重要? 最佳答案 重用/发布等效原则 (REP) 说: The unit of reuse is the unit of release. Effective r
给定一个向量,例如 > x [1] 1 1 2 1 1 1 5 1 1 1 5 7 1 1 1 1 1 1 1 1 1 我想复制元素n次——但是——我希望旧元素被复制覆盖。使用基本的 rep 函数给
我需要从端口读取一些 16 位值并将它们保存到缓冲区。我正在使用的教程建议使用 REP INSW 指令,但我不知道如何使用它,甚至不知道它是如何工作的...... 这条指令相当于两条IN指令吗? 最佳
我在 Visual Studio 2008 上测试一些代码并注意到 security_cookie。我能理解它的意思,但我不明白这个指令的目的是什么。 rep ret /* REP to av
我想知道是否有更简单的方法来制作列表,例如 10 '4'、20 '6' 和 30 '3' 然后用函数 'rep 手写 (example <- c(4,4,4,4,...)) '.我知道我可以将某个序列
我是一名优秀的程序员,十分优秀!