gpt4 book ai didi

线程间共享的 Ruby TCPServer 套接字

转载 作者:行者123 更新时间:2023-12-05 05:31:30 25 4
gpt4 key购买 nike

我目前仅使用标准库在 ruby​​ 中实现一个简单的发布/订阅系统,客户端可以在其中订阅消息主题或将消息发布到主题,这些消息将发送给所有订阅者。我只使用标准的 TCPServer 并假设所有消息都在使用 gets 读取的一行中。我正在使用 ruby​​ Queue 在线程之间进行通信并避免访问共享内存资源,除非它们是线程安全的。我已经到了一个地步,除了在线程之间共享我的 TCPSocket 客户端之外,我看不到任何其他方法可以继续,因为一个线程需要在循环中阻塞,同时等待套接字上的新数据,而其他需要等待发布的新消息并将它们写入客户端。 ruby 线程中的 TCPSocket 是否安全?如果不是,一个简单的 dupclone 调用就足够了,并且每个线程都有自己的套接字引用吗?

对于代码引用,我有以下基本实现

require 'socket'

socket = TCPServer.new(4242)

processing_queue = Queue.new
Thread.start({}) do |subscriptions|
while event = processing_queue.pop
command, *args = event
case command
when 'publish'
topic, message = args
subscriptions[topic].each do |subscription_queue|
subscription_queue.push(message)
end
when 'subscribe'
topic, subscription_queue = args
subscriptions[topic] = [] if subscriptions[topic].nil?
subscriptions[topic] << subscription_queue
end
end
end

loop do
Thread.start(socket.accept, Queue.new) do |client, queue|
writer_queue = Queue.new
Thread.start do
while response = writer_queue.pop
client.puts(response)
end
end

while request = client.gets
command, *args = request.split(' ')
case command
when 'subscribe'
topic = args[0]
Thread.start(Queue.new) do |subscription_queue|
processing_queue << ['subscribe', topic, subscription_queue]
while message = subscription_queue.pop
writer_queue << message
end
end
writer_queue << 'OK'
when 'publish'
topic = args.shift
message = args.join(' ')
processing_queue << ['publish', topic, message]
writer_queue << 'OK'
end
end
client.close
end
end

socket.close

最佳答案

因此,为了避免担心使用多线程访问共享资源的整个头痛,我创建了一个基于这个非常简单的单线程方法 example .

这个解决方案可以改进,应该真正使用非阻塞读取和写入套接字,而不是使用 putsgets,但我只是实现了一些基本的东西用于演示目的。

不过,我仍然希望听到更多关于多线程方法的信息,因为我觉得必须有一个人们已经使用过的更完整的解决方案。如果有人有建议请回答。

require 'socket'

server = TCPServer.open("0.0.0.0", 4242)
read_fds = [server]
subscriptions = Hash.new([])
while true
puts 'loop'
if ios = select(read_fds, [], [])
selected_reads = ios.first
p selected_reads
selected_reads.each do |client|
if client == server
puts 'Someone connected to server. Adding socket to read_fds.'
client, sockaddr = server.accept
read_fds << client
elsif client.eof?
puts "Client disconnected"
read_fds.delete(client)
client.close
else
# Perform a blocking-read until new-line is encountered.
# We know the client is writing, so as long as it adheres to the
# new-line protocol, we shouldn't block for long
puts "Reading..."
request = client.gets
action, topic, *args = request.split(' ')
if action == 'subscribe'
subscriptions[topic] << client
client.puts('OK')
elsif action == 'publish'
message = args.join(' ')
# Should also be writing to socket in non blocking way using select
# but again keeping simple assuming all messages to write are small
subscriptions[topic].each { |client| client.puts(message) }
client.puts('OK')
else
puts "Invalid request #{action}"
client.puts('ERROR')
end
end
end
end
end

关于线程间共享的 Ruby TCPServer 套接字,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/74351143/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com