gpt4 book ai didi

ruby - 在 RabbitMQ 的 ruby​​ AMQP 中,高优先级队列高于低优先级队列?

转载 作者:数据小太阳 更新时间:2023-10-29 08:49:29 24 4
gpt4 key购买 nike

鉴于我有一个工作人员订阅了两个队列“低”和“高”,如果高优先级队列为空,我希望该工作人员仅处理来自低优先级队列的消息。

我试图通过定义两个 channel 并将预取设置为更高优先级队列上的更高值来实现此目的,如此处所建议:http://dougbarth.github.io/2011/07/01/approximating-priority-with-rabbitmq.html

这是我的 worker 代码:

require "rubygems"
require "amqp"

EventMachine.run do
connection = AMQP.connect(:host => '127.0.0.1')
channel_low = AMQP::Channel.new(connection)
channel_high = AMQP::Channel.new(connection)

# Attempting to set the prefetch higher on the high priority queue
channel_low.prefetch(10)
channel_high.prefetch(20)

low_queue = channel_low.queue("low", :auto_delete => false)
high_queue = channel_high.queue("high", :auto_delete => false)

low_queue.subscribe do |payload|
puts "#{payload}"
slow_task
end

high_queue.subscribe do |payload|
puts "#{payload}"
slow_task
end

def slow_task
# Do some slow work
sleep(1)
end
end

当我针对它运行此客户端时,我没有看到首先处理的高优先级消息:

require "rubygems"
require "amqp"

EventMachine.run do
connection = AMQP.connect(:host => '127.0.0.1')
channel = AMQP::Channel.new(connection)

low_queue = channel.queue("low")
high_queue = channel.queue("high")
exchange = channel.direct("")

10.times do |i|
message = "LOW #{i}"
puts "sending: #{message}"
exchange.publish message, :routing_key => low_queue.name
end

# EventMachine.add_periodic_timer(0.0001) do
10.times do |i|
message = "HIGH #{i}"
puts "sending: #{message}"
exchange.publish message, :routing_key => high_queue.name
end

end

输出:

Client >>>
sending: LOW 0
sending: LOW 1
sending: LOW 2
sending: LOW 3
sending: LOW 4
sending: LOW 5
sending: LOW 6
sending: LOW 7
sending: LOW 8
sending: LOW 9
sending: HIGH 0
sending: HIGH 1
sending: HIGH 2
sending: HIGH 3
sending: HIGH 4
sending: HIGH 5
sending: HIGH 6
sending: HIGH 7
sending: HIGH 8
sending: HIGH 9

Server >>>

HIGH 0
HIGH 1
LOW 0
LOW 1
LOW 2
HIGH 2
LOW 3
LOW 4
LOW 5
LOW 6
LOW 7
HIGH 3
LOW 8
LOW 9
HIGH 4
HIGH 5
HIGH 6
HIGH 7
HIGH 8
HIGH 9

最佳答案

正如 Michael 所说,您当前的方法存在一些问题:

  • 不启用显式 acks 意味着 RabbitMQ 在发送消息时考虑消息已送达,而不是在您处理消息时才考虑送达
  • 您的消息非常小,可以通过网络快速传送
  • 当 EventMachine 读取网络数据时调用您的订阅 block ,从套接字读取每个完整的数据帧一次
  • 最后,阻塞 react 器线程(使用 sleep )将阻止 EM 将 ack 发送到套接字,因此无法实现正确的行为。

为了实现优先级概念,您需要将接收网络数据与确认它分开。在我们的应用程序(我写过关于它的博客文章)中,我们使用后台线程和优先级队列来重新排序传入的工作。这在每个工作人员中引入了一个小的消息缓冲区。其中一些消息可能是低优先级的消息,在没有更高优先级的消息可供处理之前不会处理这些消息。

这里有一个稍微修改过的工作代码,它使用一个工作线程和一个优先级队列来获得所需的结果。

require "rubygems"
require "amqp"
require "pqueue"

EventMachine.run do
connection = AMQP.connect(:host => '127.0.0.1')
channel_low = AMQP::Channel.new(connection)
channel_high = AMQP::Channel.new(connection)

# Attempting to set the prefetch higher on the high priority queue
channel_low.prefetch(10)
channel_high.prefetch(20)

low_queue = channel_low.queue("low", :auto_delete => false)
high_queue = channel_high.queue("high", :auto_delete => false)

# Our priority queue for buffering messages in the worker's memory
to_process = PQueue.new {|a,b| a[0] > b[0] }

# The pqueue gem isn't thread safe
mutex = Mutex.new

# Background thread for working blocking operation. We can spin up more of
# these to increase concurrency.
Thread.new do
loop do
_, header, payload = mutex.synchronize { to_process.pop }

if payload
puts "#{payload}"
slow_task
# We need to call ack on the EM thread.
EM.next_tick { header.ack }
else
sleep(0.1)
end
end
end

low_queue.subscribe(:ack => true) do |header, payload|
mutex.synchronize { to_process << [0, header, payload] }
end

high_queue.subscribe(:ack => true) do |header, payload|
mutex.synchronize { to_process << [10, header, payload] }
end

def slow_task
# Do some slow work
sleep(1)
end
end

如果需要提高并发性,可以生成多个后台线程。

关于ruby - 在 RabbitMQ 的 ruby​​ AMQP 中,高优先级队列高于低优先级队列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16938226/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com