gpt4 book ai didi

Ruby 线程和 Websocket

转载 作者:太空宇宙 更新时间:2023-11-03 16:21:13 24 4
gpt4 key购买 nike

这已经困扰我一段时间了——这似乎是完全可行的事情,但我被卡住了。

我有一个小的 Ruby 程序,它只是充当中间人。它运行很长时间(几分钟),阻塞操作(通过 FFI 接口(interface)),但应该通过 DDP 连接通过回调将它从该操作获得的定期更新发送到主 Meteor 应用程序。

该程序的两个组件均独立运行。通过我自己的系统,以及 metybur gem ,我能够与 Meteor 应用程序通信。而且,如果我只是使用 puts 从 FFI 接口(interface)的回调中输出数据,我也能完美地获得这些数据。 (除此之外,由于另一个原因我不能完全确定,如果 FFI/阻塞操作在 Thread.new block 中,它会默默地失败。)

然而,出于某种原因,当我尝试将数据发送到 Meteor 应用程序时,没有任何反应。 ws.send(在 EventMachine 上)返回 true,但实际上从未被调用,即使我将它放在它自己的 Thread.new block 中也是如此。

部分我怀疑(虽然不知道如何测试)连接丢失是因为 Ruby 应用程序无法在阻塞期间处理 ping/pong keepalive 请求。

我已经尝试从 EventMachine 中使用 EM.spawn 来阻止进程,我已经尝试在它自己的线程中启动 EventMachine,但似乎没有任何效果。

想知道是否有类似这样的最佳实践,即使在 CPU 密集型阻塞操作期间也能够保持应用程序的 EventMachine 部分响应?

最佳答案

已编辑

经过我们在评论中的讨论,我决定审查我发布的代码并编写一个小的 DDP 封装利用 Iodine's Websocket 客户端(我更喜欢它,因为我是作者)。

我不得不承认,我在思考这个问题时真的很开心。附件是使用 Iodine 的 Meteor 连接器的简化代码。这是非常基本的,仅包括:断开连接时更新连接、完成握手和应答乒乓球。

要将此代码与第一个答案中启动的 FFI 工作流概念一起使用,请使用:

# create the connection to the Meteor server
# and setup the callback for incoming messages:
meteor_ddp = IodineDDP.new('ws://chat.n-k.de/websocket') do |message|
Iodine.debug "got message #{message}, it's a Hash"
end

# next, create a dedicated thread for the FFI,
# it will run until the FFI had finished
# or the application exits.
Thread.new do
# initialize FFI interface
data = StringIO.new "initialize FFI interface - StringIO will be our data for now"
# imagine it takes time
sleep 1
# Meteor will respond to these with error messages
(meteor_ddp << data.read(3)) && sleep(0.2) until data.eof?
sleep 1
Iodine.signal_exit
end

# it seems Meteor sends pings and since we already answer them in the
# class object, it should be enough...
# but we can do it too, if we want to:
Iodine.run_every(5) { meteor_ddp << {msg: :ping}.to_json }

对于 Meteor DDP 连接类,这可能是这样实现的:

require 'iodine/client'

class IodineDDP
attr_reader :session
attr_reader :server_id
def initialize url, &block
@url = url
@ddp_initialized = false
@session = nil
@server_id = nil
@block = block
@closed = false
connect_websocket
end

def << message
Iodine.debug "Writing message #{message}"
ensure_connection
@ws << message
end
alias :write :<<

def close
@closed = true
@ws.on_close { nil }
@ws.close
end

protected

def on_message data
# make sure the DDP handshake is complete
return handshake data unless @ddp_initialized
data = JSON.parse(data)
Iodine.debug "Got message: #{data}"
return write({msg: 'pong', id: data['id'] }.to_json) if data['msg'] == 'ping'
return true if data['msg'] == 'pong'
@block.call data
end
def on_close
@ddp_initialized = false
connect_websocket
end

def ensure_connection
return true unless @ws.closed? || !@ddp_initialized
raise 'This DDP instance was shutdown using `close`, it will not be renewed' if @closed
raise 'DDP disconnected - not enough threads to ensure reconnection' if (@ws.closed? || !@ddp_initialized) && Iodine.threads == 1
timeout = Iodine.time + 3
sleep 0.2 until @ddp_initialized && Iodine.time <= timeout
raise 'DDP disconnected - reconnection timed-out.' if @ws.closed? || !@ddp_initialized
end

def connect_websocket
@___on_message_proc ||= method(:on_message)
@___on_close_proc ||= method(:on_close)
@ws = ::Iodine::Http::WebsocketClient.connect(@url, on_message: @___on_message_proc, on_open: @___on_open_proc, on_close: @___on_close_proc)
# inform
Iodine.debug "initiating a new DDP connection to #{@url}"
# start the DDP handshake
handshake
end
def handshake last_message = nil
raise 'Handshake failed because the websocket was closed or missing' if @ws.nil? || @ws.closed?
unless last_message # this is the first message sent
Iodine.debug "Meteor DDP handshake initiated."
msg = {msg: "connect", version: "1", support: ["1"]}
msg[:session] = @session if @session
return(@ws << msg.to_json)
end
message = JSON.parse(last_message)
raise "Meteor DDP connection error, requires version #{message['version']}: #{last_message}" if message['msg'] == 'failed'
if message['msg'] == 'connected'
# inform
Iodine.debug "Meteor DDP handshake complete."
@session = message['session']
return @ddp_initialized = true
else
return @server_id = message['server_id'] if message['server_id']
Iodine.error "Invalid handshake data - closing connection."
close
end
end
end

# we need at least two threads for the IodineDDP#ensure_connection
Iodine.threads = 3

# # if we are inside a larger application, call:
# Iodine.force_start!

# # if we are on irb:
exit
# no need to write anything if this is the whole of the script

关于Ruby 线程和 Websocket,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33207545/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com