gpt4 book ai didi

ruby-on-rails - 我可以在带有 AMQP 的 Rails 3 中使用请求/回复 - RPC 模式吗?

转载 作者:行者123 更新时间:2023-12-04 05:45:59 25 4
gpt4 key购买 nike

原因与this discussion中类似,我正在试验用消息代替 REST 来实现从一个 Rails 3 应用程序到另一个应用程序的同步 RPC 调用。这两个应用程序都在精简版上运行。

“服务器”应用程序有一个 config/initializers/amqp.rb基于 rubyamqp.info documentation 中的请求/回复模式的文件:

require "amqp"

EventMachine.next_tick do
connection = AMQP.connect ENV['CLOUDAMQP_URL'] || 'amqp://guest:guest@localhost'
channel = AMQP::Channel.new(connection)

requests_queue = channel.queue("amqpgem.examples.services.time", :exclusive => true, :auto_delete => true)
requests_queue.subscribe(:ack => true) do |metadata, payload|
puts "[requests] Got a request #{metadata.message_id}. Sending a reply..."
channel.default_exchange.publish(Time.now.to_s,
:routing_key => metadata.reply_to,
:correlation_id => metadata.message_id,
:mandatory => true)
metadata.ack
end

Signal.trap("INT") { connection.close { EventMachine.stop } }
end

在“客户端”应用程序中,我想在 View 中呈现对“服务器”的同步调用的结果。我意识到这有点超出像 amqp gem 这样的固有异步库的舒适区,但我想知道是否有办法让它工作。这是我的客户 config/initializers/amqp.rb :
require 'amqp'

EventMachine.next_tick do
AMQP.connection = AMQP.connect 'amqp://guest:guest@localhost'
Signal.trap("INT") { AMQP.connection.close { EventMachine.stop } }
end

这是 Controller :
require "amqp"

class WelcomeController < ApplicationController
def index
puts "[request] Sending a request..."

WelcomeController.channel.default_exchange.publish("get.time",
:routing_key => "amqpgem.examples.services.time",
:message_id => Kernel.rand(10101010).to_s,
:reply_to => WelcomeController.replies_queue.name)

WelcomeController.replies_queue.subscribe do |metadata, payload|
puts "[response] Response for #{metadata.correlation_id}: #{payload.inspect}"
@message = payload.inspect
end
end

def self.channel
@channel ||= AMQP::Channel.new(AMQP.connection)
end

def self.replies_queue
@replies_queue ||= channel.queue("reply", :exclusive => true, :auto_delete => true)
end
end

当我在不同的端口上启动两个应用程序并访问 welcome#index 时看法。 @message在 View 中为零,因为结果尚未返回。结果在 View 渲染后几毫秒到达并显示在控制台上:
$ thin start
>> Using rack adapter
>> Thin web server (v1.5.0 codename Knife)
>> Maximum connections set to 1024
>> Listening on 0.0.0.0:3000, CTRL+C to stop
[request] Sending a request...
[response] Response for 3877031: "2012-11-27 22:04:28 -0600"

毫不奇怪: subscribe显然不适用于同步调用。令人惊讶的是,我在 AMQP gem 源代码或任何在线文档中都找不到同步替代方案。是否有替代 subscribe 的替代方案?这会给我我想要的 RPC 行为吗?鉴于系统的其他部分我想使用合法的异步调用, bunny gem似乎不是这项工作的正确工具。我应该再看看吗?

编辑回应 Sam Stokes

感谢 Sam 提供抛出 :async/async.callback 的指针。我以前从未见过这种技术,而这正是我最初试图通过这个实验学习的东西。 send_response.finish在 Rails 3 中消失了,但我能够让他的示例至少为一个请求工作,但稍作改动:
render :text => @message
rendered_response = response.prepare!

后续请求失败 !! Unexpected error while processing request: deadlock; recursive locking .这可能是 Sam 在关于让 ActionController 允许并发请求的评论中得到的内容,但引用的要点仅适用于 Rails 2。添加 config.allow_concurrency = true在 development.rb 中消除了 Rails 3 中的这个错误,但导致 This queue already has default consumer.来自 AMQP。

我觉得这头牦牛剃得够多了。 ;-)

虽然有趣,但对于简单的 RPC 来说,这显然是矫枉过正。像这样的东西 Sinatra streaming example似乎是客户端与回复交互的更合适的用例。柔情也有 a blog post关于即将在 Rails 4 中流式传输事件的方法,该方法可以与 AMQP 一起使用。

正如 Sam 在他对 HTTP 替代方案的讨论中指出的那样,REST/HTTP 对于我的系统中涉及两个 Rails 应用程序的 RPC 部分非常有意义。系统的其他部分涉及更经典的异步事件发布到 Clojure 应用程序。对于这些,Rails 应用程序只需要以即发即弃的方式发布事件,因此 AMQP 可以在没有回复队列的情况下使用我的原始代码在那里正常工作。

最佳答案

您可以获得所需的行为 - 让客户端发出一个简单的 HTTP 请求,您的 Web 应用程序会对其进行异步响应 - 但您需要更多技巧。您需要使用 Thin 对异步响应的支持:

require "amqp"

class WelcomeController < ApplicationController
def index
puts "[request] Sending a request..."

WelcomeController.channel.default_exchange.publish("get.time",
:routing_key => "amqpgem.examples.services.time",
:message_id => Kernel.rand(10101010).to_s,
:reply_to => WelcomeController.replies_queue.name)

WelcomeController.replies_queue.subscribe do |metadata, payload|
puts "[response] Response for #{metadata.correlation_id}: #{payload.inspect}"
@message = payload.inspect

# Trigger Rails response rendering now we have the message.
# Tested in Rails 2.3; may or may not work in Rails 3.x.
rendered_response = send_response.finish

# Pass the response to Thin and make it complete the request.
# env['async.callback'] expects a Rack-style response triple:
# [status, headers, body]
request.env['async.callback'].call(rendered_response)
end

# This unwinds the call stack, skipping the normal Rails response
# rendering, all the way back up to Thin, which catches it and
# interprets as "I'll give you the response later by calling
# env['async.callback']".
throw :async
end

def self.channel
@channel ||= AMQP::Channel.new(AMQP.connection)
end

def self.replies_queue
@replies_queue ||= channel.queue("reply", :exclusive => true, :auto_delete => true)
end
end

就客户端而言,结果与您的 Web 应用程序在返回响应之前在同步调用上的阻塞无法区分;但是现在您的 Web 应用程序可以同时处理许多此类请求。

警告!

Async Rails 是一种高级技术;你需要知道你在做什么。 Rails 的某些部分不喜欢突然拆除它们的调用堆栈。 throw将绕过任何不知道捕获并重新抛出它的 Rack 中间件( here is a rather old partial solution )。 ActiveSupport 的开发模式类重新加载将在 throw 之后重新加载您的应用程序的类。 ,而无需等待响应,如果您的回调引用了一个已经重新加载的类,这可能会导致非常困惑的破坏。您还需要 ask ActionController nicely允许并发请求。

请求/响应

您还需要匹配请求和响应。就目前而言,如果请求 1 到达,然后请求 2 在请求 1 获得响应之前到达,则未定义哪个请求将接收响应 1(队列上的消息在订阅队列的消费者之间循环分发)。

您可以通过检查correlation_id(您必须显式设置,顺便说一句-RabbitMQ 不会为您执行此操作!)并重新排队消息(如果它不是您正在等待的响应)来完成此操作。我的方法是创建一个持久的 Publisher 对象,该对象将跟踪打开的请求,监听所有响应,并查找适当的回调以根据相关性 ID 进行调用。

替代方案:只使用 HTTP

您真的在这里解决了两个不同(而且很棘手!)的问题:说服 Rails/thin 异步处理请求,以及在 AMQP 的发布-订阅模型之上实现请求-响应语义。鉴于您说这是用于两个 Rails 应用程序之间的调用,为什么不直接使用 HTTP,它已经具有您需要的请求-响应语义?这样你只需要解决第一个问题。如果您使用非阻塞 HTTP 客户端库,例如 em-http-request,您仍然可以获得并发请求处理。 .

关于ruby-on-rails - 我可以在带有 AMQP 的 Rails 3 中使用请求/回复 - RPC 模式吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13597808/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com