gpt4 book ai didi

ruby-on-rails - 是否有 Ruby 消息总线 gem?

转载 作者:数据小太阳 更新时间:2023-10-29 08:05:26 26 4
gpt4 key购买 nike

我目前正在构建一个架构,其中包含一个 Rails 进程和多个工作进程,这些进程需要被告知某些事件(例如对象的创建)。

Rails
| API Worker
+----------o------o--------o------ - - -
Some other
daemon

我想做以下事情

class Article
after_creation do
MessageBus.send type: "article-created", id: self.id
end
end

虽然进程(API、Worker、Daemons,...)只是订阅消息总线,并且当消息进来时调用一个 block 。

MessageBus.subscribe do |msg|
if msg['type'] == 'article-created'
# if this is my websocket daemon, I would push the article to the browser
# if this is my indexing daemon, I would reindex the full-text search
# if this is ... you get the deal.
end
end

目前我正在使用本地 unix 域套接字,我在其中使用 UNIXSocket 推送 JSON,并使用 EventMachine.start_unix_domain_server 获取它。但这只允许双向通信。我也考虑过使用 resque,但这更像是一个消息队列,而我需要一个总线。而且依赖于redis。我很确定一定有一个 gem,它在 ruby​​ 中实现了一些消息总线,但是谷歌搜索没有得到任何结果

最佳答案

最后,我使用 Eventmachine Channels 快速破解了自己的解决方案。

这是我的服务器。基本上,客户端连接到 /tmp/messagebus.sock 并发送数据。插入套接字的所有内容都会发送到所有其他客户端。

require 'rubygems'
require 'eventmachine'

module Messagebus
class Server
attr_accessor :connections
attr_accessor :channel

def initialize
@connections = []
@channel = EventMachine::Channel.new
end

def start
@signature = EventMachine.start_unix_domain_server '/tmp/messagebus.sock', Connection do |conn|
conn.server = self
end
end

def stop
EventMachine.stop_server(@signature)

unless wait_for_connections_and_stop
EventMachine.add_periodic_timer(1) { wait_for_connections_and_stop }
end
end

def wait_for_connections_and_stop
if @connections.empty?
EventMachine.stop
true
else
puts "Waiting for #{@connections.size} connection(s) to finish ..."
false
end
end
end

class Connection < EventMachine::Connection
attr_accessor :server

def post_init
log "Connection opened"
end

def server=(server)
@server = server

@subscription = server.channel.subscribe do |data|
self.log "Sending #{data}"
self.send_data data
end
end

def receive_data(data)
log "Received #{data}"
server.channel.push data
end

def unbind
server.channel.unsubscribe @subscription
server.connections.delete(self)
log "Connection closed"
end

def log(msg)
puts "[#{self.object_id}] #{msg}"
end
end
end

EventMachine::run {
s = Messagebus::Server.new
s.start
puts "New server listening"
}

关于ruby-on-rails - 是否有 Ruby 消息总线 gem?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10247961/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com