- xml - AJAX/Jquery XML 解析
- 具有多重继承的 XML 模式
- .net - 枚举序列化 Json 与 XML
- XML 简单类型、简单内容、复杂类型、复杂内容
鉴于我有一个工作人员订阅了两个队列“低”和“高”,如果高优先级队列为空,我希望该工作人员仅处理来自低优先级队列的消息。
我试图通过定义两个 channel 并将预取设置为更高优先级队列上的更高值来实现此目的,如此处所建议:http://dougbarth.github.io/2011/07/01/approximating-priority-with-rabbitmq.html
这是我的 worker 代码:
require "rubygems"
require "amqp"
EventMachine.run do
connection = AMQP.connect(:host => '127.0.0.1')
channel_low = AMQP::Channel.new(connection)
channel_high = AMQP::Channel.new(connection)
# Attempting to set the prefetch higher on the high priority queue
channel_low.prefetch(10)
channel_high.prefetch(20)
low_queue = channel_low.queue("low", :auto_delete => false)
high_queue = channel_high.queue("high", :auto_delete => false)
low_queue.subscribe do |payload|
puts "#{payload}"
slow_task
end
high_queue.subscribe do |payload|
puts "#{payload}"
slow_task
end
def slow_task
# Do some slow work
sleep(1)
end
end
当我针对它运行此客户端时,我没有看到首先处理的高优先级消息:
require "rubygems"
require "amqp"
EventMachine.run do
connection = AMQP.connect(:host => '127.0.0.1')
channel = AMQP::Channel.new(connection)
low_queue = channel.queue("low")
high_queue = channel.queue("high")
exchange = channel.direct("")
10.times do |i|
message = "LOW #{i}"
puts "sending: #{message}"
exchange.publish message, :routing_key => low_queue.name
end
# EventMachine.add_periodic_timer(0.0001) do
10.times do |i|
message = "HIGH #{i}"
puts "sending: #{message}"
exchange.publish message, :routing_key => high_queue.name
end
end
输出:
Client >>>
sending: LOW 0
sending: LOW 1
sending: LOW 2
sending: LOW 3
sending: LOW 4
sending: LOW 5
sending: LOW 6
sending: LOW 7
sending: LOW 8
sending: LOW 9
sending: HIGH 0
sending: HIGH 1
sending: HIGH 2
sending: HIGH 3
sending: HIGH 4
sending: HIGH 5
sending: HIGH 6
sending: HIGH 7
sending: HIGH 8
sending: HIGH 9
Server >>>
HIGH 0
HIGH 1
LOW 0
LOW 1
LOW 2
HIGH 2
LOW 3
LOW 4
LOW 5
LOW 6
LOW 7
HIGH 3
LOW 8
LOW 9
HIGH 4
HIGH 5
HIGH 6
HIGH 7
HIGH 8
HIGH 9
最佳答案
正如 Michael 所说,您当前的方法存在一些问题:
为了实现优先级概念,您需要将接收网络数据与确认它分开。在我们的应用程序(我写过关于它的博客文章)中,我们使用后台线程和优先级队列来重新排序传入的工作。这在每个工作人员中引入了一个小的消息缓冲区。其中一些消息可能是低优先级的消息,在没有更高优先级的消息可供处理之前不会处理这些消息。
这里有一个稍微修改过的工作代码,它使用一个工作线程和一个优先级队列来获得所需的结果。
require "rubygems"
require "amqp"
require "pqueue"
EventMachine.run do
connection = AMQP.connect(:host => '127.0.0.1')
channel_low = AMQP::Channel.new(connection)
channel_high = AMQP::Channel.new(connection)
# Attempting to set the prefetch higher on the high priority queue
channel_low.prefetch(10)
channel_high.prefetch(20)
low_queue = channel_low.queue("low", :auto_delete => false)
high_queue = channel_high.queue("high", :auto_delete => false)
# Our priority queue for buffering messages in the worker's memory
to_process = PQueue.new {|a,b| a[0] > b[0] }
# The pqueue gem isn't thread safe
mutex = Mutex.new
# Background thread for working blocking operation. We can spin up more of
# these to increase concurrency.
Thread.new do
loop do
_, header, payload = mutex.synchronize { to_process.pop }
if payload
puts "#{payload}"
slow_task
# We need to call ack on the EM thread.
EM.next_tick { header.ack }
else
sleep(0.1)
end
end
end
low_queue.subscribe(:ack => true) do |header, payload|
mutex.synchronize { to_process << [0, header, payload] }
end
high_queue.subscribe(:ack => true) do |header, payload|
mutex.synchronize { to_process << [10, header, payload] }
end
def slow_task
# Do some slow work
sleep(1)
end
end
如果需要提高并发性,可以生成多个后台线程。
关于ruby - 在 RabbitMQ 的 ruby AMQP 中,高优先级队列高于低优先级队列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16938226/
在 Spring AMQP 项目中, 如果 messageProperties 没有 messageId,它们总是创建 messageId。 像这样.. if (this.createMessageI
我不确定我对 errorHandler 和 returnExceptions 的理解是否正确。 但这是我的目标:我从 App_A 发送了一条消息,使用 @RabbitListener 在 App_B
就目前而言,这个问题不适合我们的问答形式。我们希望答案得到事实、引用资料或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the
我们希望将大型机放在总线上。我相信是AS400。为此,我想让 CICS 大型机向代理发送 AMQP 消息。有几十个AMQP客户端,包括JMS客户端。我对大型机上可能发生的事情了解不够,无法判断是否可以
我们希望将大型机放在公共(public)汽车上。我相信它是AS400。为此,我想让 CICS 大型机向代理发送 AMQP 消息。有几十个 AMQP 客户端,包括 JMS 客户端。我不太了解大型机上的可
我即将实现一个基于 PHP 的系统,该系统使用 RabbitMQ。我可以看出那里有 2 个成熟的库:PECL AMQP和 php-amqp . 我将同时为客户端和工作人员使用 PHP。 有人对这两个库
我正在使用 RabbitMQ 和 ruby-amqp与 Rails。当 Controller 收到消息时,我执行以下操作: def create AMQP.start("amqp://localh
我有两个困惑。 1.如果消息监听器抛出RuntimeException,SimpleMessageListenrContainer会停止吗?2.如果SimpleMessageListenerConta
我们在绑定(bind)到 Rabbit MQ 实例的 Java 应用程序的日志中遇到以下异常。 这是必须要注意的事情,表示Spring AMQP的实现中存在问题,还是可以忽略的事情?在后一种情况下,此
场景:微服务从 RabbitMQ 队列中获取消息,将其转换为对象,然后微服务对外部服务进行 REST 调用。 它将处理成千上万条这样的消息,如果我们知道外部 Rest 服务已关闭,有没有办法告诉我的消
我有一个使用带有 webflux 的 Boot 2.0 的应用程序,并且有一个端点返回 ServerSentEvent 的 Flux。这些事件是通过利用 spring-amqp 从 RabbitMQ
我正在尝试使用 streadway/amqp 连接到 RabbitMQ 总线Go 的驱动程序。我正在处理重新连接例程,为此,我有一个 rabbitMQConsume 函数调用 rabbitMQConn
我正在尝试通过 SSL 连接到 RabbitMQ。我遵循了此处链接的 RabbitMQ SSL 文档 https://www.rabbitmq.com/ssl.html根据 RabbitMQ SSL
这些 amqp 客户端库之间有什么区别?哪一个是最推荐的?主要区别是什么? 最佳答案 我会推荐 amqp.node和 bramqp通过 node-amqp。 node-amqp 有很多错误并且维护不善
我得到的错误: 2019-12-09 06:39:33.189 ERROR 107132 --- [http-nio-8082-exec-5] o.a.c.c.C.[.[.[/].[dispatche
所以我已经让 MQTT -> MQTT 和 AMQP -> AMQP 工作;不过,MQTT -> AMQP 的翻译似乎在某处不起作用。这是我的测试,如果我的“监听器”也在使用 paho 的 MQTT
我得到的错误: 2019-12-09 06:39:33.189 ERROR 107132 --- [http-nio-8082-exec-5] o.a.c.c.C.[.[.[/].[dispatche
几天前我问了同样的问题:Unable to "Peek" messages from an Azure Service Bus Queue using AMQP and Node 。我再次问同样的问题
我有一个当前与 RabbitMQ 集成的组件。我想将 RabbitMQ 替换为 Azure Event Hub,因为我们现在位于云中。 AMQP 0.9.1 与 AMQP 1.0 兼容吗?交换会无缝进
我有一个当前与 RabbitMQ 集成的组件。我想将 RabbitMQ 替换为 Azure Event Hub,因为我们现在位于云中。 AMQP 0.9.1 与 AMQP 1.0 兼容吗?交换会无缝进
我是一名优秀的程序员,十分优秀!