- xml - AJAX/Jquery XML 解析
- 具有多重继承的 XML 模式
- .net - 枚举序列化 Json 与 XML
- XML 简单类型、简单内容、复杂类型、复杂内容
作为大型应用程序的一部分,我必须为跨多个工作人员的传出请求设置一些基本的速率限制。这背后的想法相当简单:通过发布带有“立即”标志的“ token ”消息,如果没有人在等待,则该消息会自动丢弃。通过让工作人员仅在发送传出请求之前订阅 token 队列, token 不会“保存起来”,并且每个 token 只能使用一次。我认为这相当优雅。
不幸的是,添加和删除订阅者并不完全稳定。我在 https://gist.github.com/1263921/ebdafa067ca09514183d3fc5d6e43c7094fc2733 设置了一个完整的示例.代码如下:
require 'bundler'
Bundler.setup
require 'amqp'
puts "single-message consumer listening to rapid producer"
QUEUE_NAME = 'test.rapid-queue-unsubscription'
PRODUCE_RATE = 1.0/10
CONSUME_RATE = 1.0/9
def start_producer
exchange = AMQP::Exchange.new(AMQP::Channel.new, :direct, "")
n = 0
EM::PeriodicTimer.new(PRODUCE_RATE) do
message = "msg #{n}"
exchange.publish(message,
:immediate => true, # IMPORTANT, messages are dropped if nobody listening now
:routing_key => QUEUE_NAME)
puts "> PUT #{message}"
n += 1
end
end
def start_consumer
EM::PeriodicTimer.new(CONSUME_RATE) do
started = Time.now
AMQP::Channel.new do |channel_consumer|
channel_consumer.prefetch(1)
tick_queue = channel_consumer.queue(QUEUE_NAME)
consumer = AMQP::Consumer.new(channel_consumer, tick_queue, nil, exclusive = false, no_ack = true)
consumer.on_delivery do |_, message|
took = Time.now - started
puts "< GET #{message} [waited #{took.round(2)}s][#{(1.0/took).round(2)} reqs/sec]"
consumer.cancel
channel_consumer.close
end
consumer.consume
end
end
end
EM.run do
EM.set_quantum(50)
start_producer
start_consumer
end
运行该示例几分钟最终会出现以下两个错误之一:
amq-client-0.8.3/lib/amq/client/async/consumer.rb:246:in `block in
<class:Consumer>': undefined method `handle_delivery' for
nil:NilClass (NoMethodError)
amq-client-0.8.3/lib/amq/client/async/adapter.rb:244:in
`send_frame': Trying to send frame through a closed connection.
Frame is #<AMQ::Protocol::MethodFrame:0x007fa6d29a35f0
@payload="\x00<\x00(\x00\x00\x00\x1Ftest.rapid-queue-unsubscription\x02",
@channel=1> (AMQ::Client::ConnectionClosedError)
第一个错误是由于订阅者已被删除,但仍向其发送了一条消息,并且 amq-client
图书馆从没想过会发生这种情况。第二个错误来自发布者,它突然关闭了连接。
要使它始终按预期工作,我缺少什么?
使用的版本:
gem 文件:
source 'http://rubygems.org'
gem 'amqp'
Gemfile.lock:
GEM
remote: http://rubygems.org/
specs:
amq-client (0.8.3)
amq-protocol (>= 0.8.0)
eventmachine
amq-protocol (0.8.1)
amqp (0.8.0)
amq-client (~> 0.8.3)
amq-protocol (~> 0.8.0)
eventmachine
eventmachine (0.12.10)
PLATFORMS
ruby
DEPENDENCIES
amqp
eventmachine
最佳答案
来自#rabbitmq channel (amqp 作者 antares_):只需使用一个 channel ,它就可以正常工作。略有变化,但稳定的版本:
require 'bundler'
Bundler.setup
require 'amqp'
puts "single-message consumer listening to rapid producer"
QUEUE_NAME = 'test.rapid-queue-unsubscription'
PRODUCE_RATE = 1.0/10
CONSUME_RATE = 1.0/9
def start_producer channel
exchange = AMQP::Exchange.new(channel, :direct, "")
n = 0
EM::PeriodicTimer.new(PRODUCE_RATE) do
message = "msg #{n}"
exchange.publish(message,
:immediate => true, # IMPORTANT, messages are dropped if nobody listening now
:routing_key => QUEUE_NAME)
puts "> PUT #{message}"
n += 1
end
end
def start_consumer channel
EM::PeriodicTimer.new(CONSUME_RATE) do
started = Time.now
tick_queue = channel.queue(QUEUE_NAME)
consumer = AMQP::Consumer.new(channel, tick_queue, nil, exclusive = false, no_ack = true)
consumer.on_delivery do |_, message|
took = Time.now - started
puts "< GET #{message} [waited #{took.round(2)}s][#{(1.0/took).round(2)} reqs/sec]"
consumer.cancel do
puts "< GET #{message} (CANCEL DONE)"
end
end
consumer.consume
end
end
EM.run do
EM.set_quantum(50)
AMQP::Channel.new do |channel|
start_producer channel
end
AMQP::Channel.new do |channel|
channel.prefetch(1)
start_consumer channel
end
end
关于ruby - 如何使这个 AMQP 单消息订阅者稳定?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7658819/
在 Spring AMQP 项目中, 如果 messageProperties 没有 messageId,它们总是创建 messageId。 像这样.. if (this.createMessageI
我不确定我对 errorHandler 和 returnExceptions 的理解是否正确。 但这是我的目标:我从 App_A 发送了一条消息,使用 @RabbitListener 在 App_B
就目前而言,这个问题不适合我们的问答形式。我们希望答案得到事实、引用资料或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the
我们希望将大型机放在总线上。我相信是AS400。为此,我想让 CICS 大型机向代理发送 AMQP 消息。有几十个AMQP客户端,包括JMS客户端。我对大型机上可能发生的事情了解不够,无法判断是否可以
我们希望将大型机放在公共(public)汽车上。我相信它是AS400。为此,我想让 CICS 大型机向代理发送 AMQP 消息。有几十个 AMQP 客户端,包括 JMS 客户端。我不太了解大型机上的可
我即将实现一个基于 PHP 的系统,该系统使用 RabbitMQ。我可以看出那里有 2 个成熟的库:PECL AMQP和 php-amqp . 我将同时为客户端和工作人员使用 PHP。 有人对这两个库
我正在使用 RabbitMQ 和 ruby-amqp与 Rails。当 Controller 收到消息时,我执行以下操作: def create AMQP.start("amqp://localh
我有两个困惑。 1.如果消息监听器抛出RuntimeException,SimpleMessageListenrContainer会停止吗?2.如果SimpleMessageListenerConta
我们在绑定(bind)到 Rabbit MQ 实例的 Java 应用程序的日志中遇到以下异常。 这是必须要注意的事情,表示Spring AMQP的实现中存在问题,还是可以忽略的事情?在后一种情况下,此
场景:微服务从 RabbitMQ 队列中获取消息,将其转换为对象,然后微服务对外部服务进行 REST 调用。 它将处理成千上万条这样的消息,如果我们知道外部 Rest 服务已关闭,有没有办法告诉我的消
我有一个使用带有 webflux 的 Boot 2.0 的应用程序,并且有一个端点返回 ServerSentEvent 的 Flux。这些事件是通过利用 spring-amqp 从 RabbitMQ
我正在尝试使用 streadway/amqp 连接到 RabbitMQ 总线Go 的驱动程序。我正在处理重新连接例程,为此,我有一个 rabbitMQConsume 函数调用 rabbitMQConn
我正在尝试通过 SSL 连接到 RabbitMQ。我遵循了此处链接的 RabbitMQ SSL 文档 https://www.rabbitmq.com/ssl.html根据 RabbitMQ SSL
这些 amqp 客户端库之间有什么区别?哪一个是最推荐的?主要区别是什么? 最佳答案 我会推荐 amqp.node和 bramqp通过 node-amqp。 node-amqp 有很多错误并且维护不善
我得到的错误: 2019-12-09 06:39:33.189 ERROR 107132 --- [http-nio-8082-exec-5] o.a.c.c.C.[.[.[/].[dispatche
所以我已经让 MQTT -> MQTT 和 AMQP -> AMQP 工作;不过,MQTT -> AMQP 的翻译似乎在某处不起作用。这是我的测试,如果我的“监听器”也在使用 paho 的 MQTT
我得到的错误: 2019-12-09 06:39:33.189 ERROR 107132 --- [http-nio-8082-exec-5] o.a.c.c.C.[.[.[/].[dispatche
几天前我问了同样的问题:Unable to "Peek" messages from an Azure Service Bus Queue using AMQP and Node 。我再次问同样的问题
我有一个当前与 RabbitMQ 集成的组件。我想将 RabbitMQ 替换为 Azure Event Hub,因为我们现在位于云中。 AMQP 0.9.1 与 AMQP 1.0 兼容吗?交换会无缝进
我有一个当前与 RabbitMQ 集成的组件。我想将 RabbitMQ 替换为 Azure Event Hub,因为我们现在位于云中。 AMQP 0.9.1 与 AMQP 1.0 兼容吗?交换会无缝进
我是一名优秀的程序员,十分优秀!