gpt4 book ai didi

ruby - 如何使这个 AMQP 单消息订阅者稳定?

转载 作者:数据小太阳 更新时间:2023-10-29 08:05:49 24 4
gpt4 key购买 nike

作为大型应用程序的一部分,我必须为跨多个工作人员的传出请求设置一些基本的速率限制。这背后的想法相当简单:通过发布带有“立即”标志的“ token ”消息,如果没有人在等待,则该消息会自动丢弃。通过让工作人员仅在发送传出请求之前订阅 token 队列, token 不会“保存起来”,并且每个 token 只能使用一次。我认为这相当优雅。

不幸的是,添加和删除订阅者并不完全稳定。我在 https://gist.github.com/1263921/ebdafa067ca09514183d3fc5d6e43c7094fc2733 设置了一个完整的示例.代码如下:

require 'bundler'
Bundler.setup

require 'amqp'

puts "single-message consumer listening to rapid producer"

QUEUE_NAME = 'test.rapid-queue-unsubscription'
PRODUCE_RATE = 1.0/10
CONSUME_RATE = 1.0/9

def start_producer
exchange = AMQP::Exchange.new(AMQP::Channel.new, :direct, "")

n = 0
EM::PeriodicTimer.new(PRODUCE_RATE) do
message = "msg #{n}"
exchange.publish(message,
:immediate => true, # IMPORTANT, messages are dropped if nobody listening now
:routing_key => QUEUE_NAME)
puts "> PUT #{message}"
n += 1
end
end

def start_consumer

EM::PeriodicTimer.new(CONSUME_RATE) do

started = Time.now
AMQP::Channel.new do |channel_consumer|
channel_consumer.prefetch(1)
tick_queue = channel_consumer.queue(QUEUE_NAME)

consumer = AMQP::Consumer.new(channel_consumer, tick_queue, nil, exclusive = false, no_ack = true)
consumer.on_delivery do |_, message|

took = Time.now - started
puts "< GET #{message} [waited #{took.round(2)}s][#{(1.0/took).round(2)} reqs/sec]"

consumer.cancel
channel_consumer.close
end
consumer.consume
end
end
end

EM.run do
EM.set_quantum(50)

start_producer
start_consumer
end

运行该示例几分钟最终会出现以下两个错误之一:

  1. amq-client-0.8.3/lib/amq/client/async/consumer.rb:246:in `block in
    <class:Consumer>': undefined method `handle_delivery' for
    nil:NilClass (NoMethodError)

  2. amq-client-0.8.3/lib/amq/client/async/adapter.rb:244:in
    `send_frame': Trying to send frame through a closed connection.
    Frame is #<AMQ::Protocol::MethodFrame:0x007fa6d29a35f0
    @payload="\x00<\x00(\x00\x00\x00\x1Ftest.rapid-queue-unsubscription\x02",
    @channel=1> (AMQ::Client::ConnectionClosedError)

第一个错误是由于订阅者已被删除,但仍向其发送了一条消息,并且 amq-client图书馆从没想过会发生这种情况。第二个错误来自发布者,它突然关闭了连接。

要使它始终按预期工作,我缺少什么?

使用的版本:

  • OS X 10.7.1
  • ruby 1.9.2p312(2011-08-11 修订版 32926)[x86_64-darwin11.1.0]
  • RabbitMQ 2.6.1

gem 文件:

source 'http://rubygems.org'

gem 'amqp'

Gemfile.lock:

GEM
remote: http://rubygems.org/
specs:
amq-client (0.8.3)
amq-protocol (>= 0.8.0)
eventmachine
amq-protocol (0.8.1)
amqp (0.8.0)
amq-client (~> 0.8.3)
amq-protocol (~> 0.8.0)
eventmachine
eventmachine (0.12.10)

PLATFORMS
ruby

DEPENDENCIES
amqp
eventmachine

最佳答案

来自#rabbitmq channel (amqp 作者 antares_):只需使用一个 channel ,它就可以正常工作。略有变化,但稳定的版本:

require 'bundler'
Bundler.setup

require 'amqp'

puts "single-message consumer listening to rapid producer"

QUEUE_NAME = 'test.rapid-queue-unsubscription'
PRODUCE_RATE = 1.0/10
CONSUME_RATE = 1.0/9

def start_producer channel
exchange = AMQP::Exchange.new(channel, :direct, "")

n = 0
EM::PeriodicTimer.new(PRODUCE_RATE) do
message = "msg #{n}"
exchange.publish(message,
:immediate => true, # IMPORTANT, messages are dropped if nobody listening now
:routing_key => QUEUE_NAME)
puts "> PUT #{message}"
n += 1
end
end

def start_consumer channel
EM::PeriodicTimer.new(CONSUME_RATE) do

started = Time.now
tick_queue = channel.queue(QUEUE_NAME)

consumer = AMQP::Consumer.new(channel, tick_queue, nil, exclusive = false, no_ack = true)
consumer.on_delivery do |_, message|

took = Time.now - started
puts "< GET #{message} [waited #{took.round(2)}s][#{(1.0/took).round(2)} reqs/sec]"

consumer.cancel do
puts "< GET #{message} (CANCEL DONE)"
end
end
consumer.consume
end
end

EM.run do
EM.set_quantum(50)

AMQP::Channel.new do |channel|
start_producer channel
end

AMQP::Channel.new do |channel|
channel.prefetch(1)
start_consumer channel
end

end

关于ruby - 如何使这个 AMQP 单消息订阅者稳定?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7658819/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com