
Ruby: synchronizing the output of a pool of forks


I am trying to create a generic method for iterating over an Enumerable using multiple processors. I spawn a fixed number of workers with fork and feed them data, reusing workers as they become idle. However, I would like to keep the input and output order in sync: if job 1 and job 2 start at the same time and job 2 finishes before job 1, the results come out in the wrong order. I would like to buffer the output on the fly somehow to keep the output order synchronized, but I cannot see how to do it.

#!/usr/bin/env ruby

require 'pp'

DEBUG = false
CPUS = 2

module Enumerable
  # Fork each (feach) creates a fork pool with a specified number of processes
  # to iterate over the Enumerable object processing the specified block.
  # Calling feach with :processes => 0 disables forking for debugging purposes.
  # It is possible to disable synchronized output with :synchronize => false
  # which will save some overhead.
  #
  # @example - process 10 elements using 4 processes:
  #
  #   (0 ... 10).feach(:processes => 4) { |i| puts i; sleep 1 }
  def feach(options = {}, &block)
    $stderr.puts "Parent pid: #{Process.pid}" if DEBUG

    procs = options[:processes]   || 0
    sync  = options[:synchronize] || true

    if procs > 0
      workers = spawn_workers(procs, &block)
      threads = []

      self.each_with_index do |elem, index|
        $stderr.puts "elem: #{elem}   index: #{index}" if DEBUG

        threads << Thread.new do
          worker = workers[index % procs]
          worker.process(elem)
        end

        if threads.size == procs
          threads.each { |thread| thread.join }
          threads = []
        end
      end

      threads.each { |thread| thread.join }
      workers.each { |worker| worker.terminate }
    else
      self.each do |elem|
        block.call(elem)
      end
    end
  end

  def spawn_workers(procs, &block)
    workers = []

    procs.times do
      child_read, parent_write = IO.pipe
      parent_read, child_write = IO.pipe

      pid = Process.fork do
        begin
          parent_write.close
          parent_read.close

          call(child_read, child_write, &block)
        ensure
          child_read.close
          child_write.close
        end
      end

      child_read.close
      child_write.close

      $stderr.puts "Spawning worker with pid: #{pid}" if DEBUG

      workers << Worker.new(parent_read, parent_write, pid)
    end

    workers
  end

  def call(child_read, child_write, &block)
    while not child_read.eof?
      elem = Marshal.load(child_read)
      $stderr.puts "   call with Process.pid: #{Process.pid}" if DEBUG
      result = block.call(elem)
      Marshal.dump(result, child_write)
    end
  end

  class Worker
    attr_reader :parent_read, :parent_write, :pid

    def initialize(parent_read, parent_write, pid)
      @parent_read  = parent_read
      @parent_write = parent_write
      @pid          = pid
    end

    def process(elem)
      Marshal.dump(elem, @parent_write)
      $stderr.puts "   process with worker pid: #{@pid} and parent pid: #{Process.pid}" if DEBUG
      Marshal.load(@parent_read)
    end

    def terminate
      $stderr.puts "Terminating worker with pid: #{@pid}" if DEBUG
      Process.wait(@pid, Process::WNOHANG)
      @parent_read.close
      @parent_write.close
    end
  end
end

def fib(n) n < 2 ? n : fib(n-1)+fib(n-2); end # Lousy Fibonacci calculator <- heavy job

(0 ... 10).feach(processes: CPUS) { |i| puts "#{i}: #{fib(35)}" }

Best answer

There is no way to synchronize the output unless you force all the child processes to send their output back to the parent and let the parent sort the results, or you enforce some kind of I/O locking between the processes.
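
As a minimal sketch of the first option (children return results to the parent, the parent emits them in input order), the parent can buffer each result under its input index and flush the buffer whenever the next expected index has arrived. Plain threads stand in for the worker processes here, and the names results, next_index and mutex are illustrative, not part of the question's code:

results    = {}             # index => result, filled in completion order
next_index = 0              # next index that may be printed
mutex      = Mutex.new

threads = (0...10).map do |i|
  Thread.new do
    sleep rand(0.0..0.3)             # simulate jobs finishing out of order
    mutex.synchronize do
      results[i] = i * i             # stash the result under its input index
      while results.key?(next_index) # flush everything contiguous so far
        puts "#{next_index}: #{results.delete(next_index)}"
        next_index += 1
      end
    end
  end
end

threads.each(&:join)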

Without knowing your long-term goal it is hard to suggest a solution. In general each process has to do a substantial amount of work before fork gives you any noticeable speedup, and there is no simple way to return the results to the main program.
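
That said, the round trip the question's code already relies on for getting a result out of a forked child is Marshal over a pipe. A stripped-down sketch of just that mechanism, with the sum standing in for a heavy job:

reader, writer = IO.pipe

pid = Process.fork do
  reader.close
  result = (1..1_000_000).reduce(:+)  # stand-in for the heavy computation
  Marshal.dump(result, writer)        # serialize the result to the parent
  writer.close
end

writer.close
puts Marshal.load(reader)             # parent reads the child's result
reader.close
Process.wait(pid)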

Native threads (pthreads on Linux) might be better suited to what you are trying to do, but not every version of Ruby supports threads at that level. See:

Does ruby have real multithreading?
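
If the per-element work can live in threads instead of forks (for example it is I/O-bound, or you are on an implementation without a global VM lock such as JRuby), keeping the output in input order becomes trivial: spawn one thread per element and read their values in order. A minimal sketch reusing the question's toy fib workload; note that on MRI, CPU-bound Ruby code in threads does not actually run in parallel:

def fib(n) n < 2 ? n : fib(n - 1) + fib(n - 2); end

# Thread#value joins the thread, so reading the values in input order
# also prints the results in input order.
threads = (0...10).map { |i| Thread.new { [i, fib(25)] } }
threads.each { |t| i, result = t.value; puts "#{i}: #{result}" }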

The original question, "Ruby: synchronizing the output of a pool of forks", is on Stack Overflow: https://stackoverflow.com/questions/22698901/
