gpt4 book ai didi

performance - 在 Julia 中并行操作大型常量数据结构

转载 作者:行者123 更新时间:2023-12-01 00:52:55 24 4
gpt4 key购买 nike

我有一个大的字符串向量向量:
大约有 50,000 个字符串向量,
每个包含 2-15 个长度为 1-20 个字符的字符串。
MyScoringOperation是一个函数,它对字符串向量(数据)进行操作并返回一个包含 10100 个分数的数组(作为 Float64s)。运行MyScoringOperation大约需要0.01秒(取决于数据的长度)

function MyScoringOperation(state:State, datum::Vector{String})
...
score::Vector{Float64} #Size of score = 10000

我有什么相当于嵌套循环。
外循环通常会运行 500 次迭代
data::Vector{Vector{String}} = loaddata()
for ii in 1:500
score_total = zeros(10100)
for datum in data
score_total+=MyScoringOperation(datum)
end
end

在一台计算机上,在一个 3000(而不是 50,000)的小测试用例上,每个外循环需要 100-300 秒。

我有 3 台安装了 Julia 3.9 的强大服务器(并且可以更轻松地获得 3 台,然后在下一个规模下可以获得数百台)。

我对@parallel 有基本的经验,但是它似乎花了很多时间来复制常量(它或多或少地卡在较小的测试用例上)

看起来像:
data::Vector{Vector{String}} = loaddata()
state = init_state()
for ii in 1:500

score_total = @parallel(+) for datum in data
MyScoringOperation(state, datum)
end
state = update(state, score_total)
end

我对这个实现与@parallel 一起工作的方式的理解是:

每个 ii :
  • 分区data每个 worker 一个夹头
  • 把那个夹头发给每个 worker
  • 工作所有处理有块
  • 主程序在结果到达时对结果求和。

  • 我想删除第 2 步,
    这样就不用向每个工作人员发送一大块数据,
    我只是向每个 worker 发送一系列索引,他们从自己的 data 副本中查找。 .或者甚至更好,只给每个人自己的块,并让他们每次重用它(节省大量 RAM)。

    分析支持我对@parellel 功能的看法。
    对于类似范围的问题(数据更小),
    非并行版本运行时间为 0.09 秒,
    和并行运行
    而分析器显示几乎所有的时间都花费了 185 秒。
    Profiler 显示,其中几乎 100% 用于与网络 IO 交互。

    最佳答案

    这应该让你开始:

    function get_chunks(data::Vector, nchunks::Int)
    base_len, remainder = divrem(length(data),nchunks)
    chunk_len = fill(base_len,nchunks)
    chunk_len[1:remainder]+=1 #remained will always be less than nchunks
    function _it()
    for ii in 1:nchunks
    chunk_start = sum(chunk_len[1:ii-1])+1
    chunk_end = chunk_start + chunk_len[ii] -1
    chunk = data[chunk_start: chunk_end]
    produce(chunk)
    end
    end
    Task(_it)
    end

    function r_chunk_data(data::Vector)
    all_chuncks = get_chunks(data, nworkers()) |> collect;
    remote_chunks = [put!(RemoteRef(pid)::RemoteRef, all_chuncks[ii]) for (ii,pid) in enumerate(workers())]
    #Have to add the type annotation sas otherwise it thinks that, RemoteRef(pid) might return a RemoteValue
    end



    function fetch_reduce(red_acc::Function, rem_results::Vector{RemoteRef})
    total = nothing
    #TODO: consider strongly wrapping total in a lock, when in 0.4, so that it is garenteed safe
    @sync for rr in rem_results
    function gather(rr)
    res=fetch(rr)
    if total===nothing
    total=res
    else
    total=red_acc(total,res)
    end
    end
    @async gather(rr)
    end
    total
    end

    function prechunked_mapreduce(r_chunks::Vector{RemoteRef}, map_fun::Function, red_acc::Function)
    rem_results = map(r_chunks) do rchunk
    function do_mapred()
    @assert r_chunk.where==myid()
    @pipe r_chunk |> fetch |> map(map_fun,_) |> reduce(red_acc, _)
    end
    remotecall(r_chunk.where,do_mapred)
    end
    @pipe rem_results|> convert(Vector{RemoteRef},_) |> fetch_reduce(red_acc, _)
    end
    rchunk_data将数据分成块(由 get_chunks 方法定义)并将这些块分别发送到不同的工作人员,在那里它们存储在 RemoteRefs 中。
    RemoteRefs 是对其他进程(以及可能的计算机)上的内存的引用,即
    prechunked_map_reduce对一种 map 进行变体减少让每个 worker 首先运行 map_fun在它的每个卡盘元素上,然后使用 red_acc 减少其卡盘中的所有元素。 (减少累加器功能)。最后,每个工作人员返回那里的结果,然后通过使用 red_acc 将它们全部减少在一起来组合这些结果。这次使用 fetch_reduce这样我们就可以添加第一个完成的。
    fetch_reduce是一个非阻塞的 fetch 和 reduce 操作。我相信它没有竞争条件,尽管这可能是因为 @async 中的实现细节和 @sync .当 julia 0.4 出来时,很容易加锁以使其明显没有竞争条件。

    这段代码并不是真正经过战斗的。我不相信
    您可能还想考虑使卡盘大小可调,以便您可以看到更多数据给更快的工作人员(如果有些人有更好的网络或更快的 CPU)

    您需要将代码重新表示为 map-reduce 问题,这看起来不太难。

    测试:
    data = [float([eye(100),eye(100)])[:] for _ in 1:3000] #480Mb
    chunk_data(:data, data)
    @time prechunked_mapreduce(:data, mean, (+))

    花费约 0.03 秒,当分布在 8 个工作器上时(没有一个与启动器在同一台机器上)

    vs 只在本地运行:
    @time reduce(+,map(mean,data))

    花了 ~0.06 秒。

    关于performance - 在 Julia 中并行操作大型常量数据结构,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30047182/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com