作者热门文章
- xml - AJAX/Jquery XML 解析
- 具有多重继承的 XML 模式
- .net - 枚举序列化 Json 与 XML
- XML 简单类型、简单内容、复杂类型、复杂内容
如果在线程中,我无法让队列订阅 block 执行。
例子来自 rubybunny/exchanges正如预期的那样有效。但是,如果与线程中的消费者部分相适应,则订阅者 block 似乎不会执行。
我尝试了几种简单的变体,包括设置共享变量标志,但都没有成功。
我错过了什么?
代码#!/usr/bin/env ruby
require "bunny"
quit = false
consumer = Thread.new do
puts "consumer start"
cnx = Bunny.new
cnx.start
cn = cnx.create_channel
ex = cn.topic("weathr", :auto_delete => true)
q = cn.queue("", :exclusive => true).bind(ex, :routing_key => "americas.north.#")
q.subscribe do |delivery_info, properties, payload|
puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
end
loop {
sleep 1
break if quit
}
cnx.close
puts "consumer done"
end
connection = Bunny.new
connection.start
connection = connection.create_channel
exchange = connection.topic("weathr", :auto_delete => true)
exchange.publish("San Diego update", :routing_key => "americas.north.us.ca.sandiego").
publish("Berkeley update", :routing_key => "americas.north.us.ca.berkeley").
publish("San Francisco update", :routing_key => "americas.north.us.ca.sanfrancisco").
publish("New York update", :routing_key => "americas.north.us.ny.newyork").
publish("São Paolo update", :routing_key => "americas.south.brazil.saopaolo").
publish("Hong Kong update", :routing_key => "asia.southeast.hk.hongkong").
publish("Kyoto update", :routing_key => "asia.southeast.japan.kyoto").
publish("Shanghai update", :routing_key => "asia.southeast.prc.shanghai").
publish("Rome update", :routing_key => "europe.italy.roma").
publish("Paris update", :routing_key => "europe.france.paris")
sleep 5
connection.close
quit = true
consumer.join
实际产量
consumer start
consumer done
预期产出
consumer start
An update for North America: San Diego update, routing key is americas.north.us.ca.sandiego
An update for North America: Berkeley update, routing key is americas.north.us.ca.berkeley
An update for North America: San Francisco update, routing key is americas.north.us.ca.sanfrancisco
An update for North America: New York update, routing key is americas.north.us.ny.newyork
consumer done
最佳答案
线程的订阅 block 没有执行,因为队列根本没有收到任何消息。详细来说,在这种情况下,队列最终是在消息发布后创建的。
这可以通过将消息切换为 :mandatory => true
并使用 Bunny::Exchange#on_return
来可视化:
#!/usr/bin/env ruby
require "bunny"
quit = false
connection = Bunny.new
connection.start
consumer = Thread.new do
puts "consumer start"
cn = connection.create_channel
ex = cn.topic("weathr", :auto_delete => true)
q = cn.queue("", :exclusive => true).bind(ex, :routing_key => "americas.north.#")
q.subscribe do |delivery_info, properties, payload|
puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
end
sleep 1 while !quit
cn.close
puts "consumer done"
end
channel = connection.create_channel
exchange = channel.topic("weathr", :auto_delete => true)
exchange.on_return do |basic_return, properties, payload|
puts "#{payload} was returned! reply_code = #{basic_return.reply_code}, reply_text = #{basic_return.reply_text}"
end
exchange.publish("San Diego update", :mandatory => true, :routing_key => "americas.north.us.ca.sandiego").
publish("Berkeley update", :mandatory => true, :routing_key => "americas.north.us.ca.berkeley").
publish("San Francisco update", :mandatory => true, :routing_key => "americas.north.us.ca.sanfrancisco").
publish("New York update", :mandatory => true, :routing_key => "americas.north.us.ny.newyork").
publish("São Paolo update", :mandatory => true, :routing_key => "americas.south.brazil.saopaolo").
publish("Hong Kong update", :mandatory => true, :routing_key => "asia.southeast.hk.hongkong").
publish("Kyoto update", :mandatory => true, :routing_key => "asia.southeast.japan.kyoto").
publish("Shanghai update", :mandatory => true, :routing_key => "asia.southeast.prc.shanghai").
publish("Rome update", :mandatory => true, :routing_key => "europe.italy.roma").
publish("Paris update", :mandatory => true, :routing_key => "europe.france.paris")
channel.close
sleep 5
quit = true
consumer.join
connection.close
输出
consumer start
San Diego update was returned! reply_code = 312, reply_text = NO_ROUTE
Berkeley update was returned! reply_code = 312, reply_text = NO_ROUTE
San Francisco update was returned! reply_code = 312, reply_text = NO_ROUTE
New York update was returned! reply_code = 312, reply_text = NO_ROUTE
São Paolo update was returned! reply_code = 312, reply_text = NO_ROUTE
Hong Kong update was returned! reply_code = 312, reply_text = NO_ROUTE
Kyoto update was returned! reply_code = 312, reply_text = NO_ROUTE
Shanghai update was returned! reply_code = 312, reply_text = NO_ROUTE
Rome update was returned! reply_code = 312, reply_text = NO_ROUTE
Paris update was returned! reply_code = 312, reply_text = NO_ROUTE
consumer done
正如我们所见,所有消息最终都以 NO_ROUTE
返回。
在消息发布之前强制队列(和路由)存在的简单解决方案:
#!/usr/bin/env ruby
require "bunny"
quit = false
consumer_queued = false
connection = Bunny.new
connection.start
consumer = Thread.new do
puts "consumer start"
cn = connection.create_channel
ex = cn.topic("weathr", :auto_delete => true)
q = cn.queue("", :exclusive => true).bind(ex, :routing_key => "americas.north.#")
consumer_queued = true
q.subscribe do |delivery_info, properties, payload|
puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
$stdout.flush
end
sleep 1 while !quit
cn.close
puts "consumer done"
end
# ensure queue is ready
sleep 0.125 while !consumer_queued
channel = connection.create_channel
exchange = channel.topic("weathr", :auto_delete => true)
exchange.on_return do |basic_return, properties, payload|
puts "#{payload} was returned! reply_code = #{basic_return.reply_code}, reply_text = #{basic_return.reply_text}"
$stdout.flush
end
exchange.publish("San Diego update", :mandatory => true, :routing_key => "americas.north.us.ca.sandiego").
publish("Berkeley update", :mandatory => true, :routing_key => "americas.north.us.ca.berkeley").
publish("San Francisco update", :mandatory => true, :routing_key => "americas.north.us.ca.sanfrancisco").
publish("New York update", :mandatory => true, :routing_key => "americas.north.us.ny.newyork").
publish("São Paolo update", :mandatory => true, :routing_key => "americas.south.brazil.saopaolo").
publish("Hong Kong update", :mandatory => true, :routing_key => "asia.southeast.hk.hongkong").
publish("Kyoto update", :mandatory => true, :routing_key => "asia.southeast.japan.kyoto").
publish("Shanghai update", :mandatory => true, :routing_key => "asia.southeast.prc.shanghai").
publish("Rome update", :mandatory => true, :routing_key => "europe.italy.roma").
publish("Paris update", :mandatory => true, :routing_key => "europe.france.paris")
channel.close
sleep 5
quit = true
consumer.join
connection.close
输出(带返回通知)
consumer start
An update for North America: San Diego update, routing key is americas.north.us.ca.sandiego
São Paolo update was returned! reply_code = 312, reply_text = NO_ROUTE
An update for North America: Berkeley update, routing key is americas.north.us.ca.berkeley
Hong Kong update was returned! reply_code = 312, reply_text = NO_ROUTE
An update for North America: San Francisco update, routing key is americas.north.us.ca.sanfrancisco
Kyoto update was returned! reply_code = 312, reply_text = NO_ROUTE
An update for North America: New York update, routing key is americas.north.us.ny.newyork
Shanghai update was returned! reply_code = 312, reply_text = NO_ROUTE
Rome update was returned! reply_code = 312, reply_text = NO_ROUTE
Paris update was returned! reply_code = 312, reply_text = NO_ROUTE
consumer done
接收到预期的消息并返回其余消息。
关于ruby - RabbitMQ/兔子 : subscribe block not called if within a thread,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48674793/
我是一名优秀的程序员,十分优秀!