gpt4 book ai didi

multithreading - 如何并行排序?

转载 作者:行者123 更新时间:2023-12-03 23:45:10 24 4
gpt4 key购买 nike

我想对大量的东西进行排序。

Julia 的标准库排序是单线程的。
如何利用我的多核机器更快地分类?

最佳答案

这是使用(实验类型) 的解决方案Base.Threads 线程模块。

使用 pmap 的解决方案(等)分布式并行性将是相似的。虽然我认为进程间通信开销会伤害你。

这个想法是按 block 排序(每个线程一个),因此每个线程都可以完全独立,只需要处理它的 block 。

然后来合并这些预先排序的 block 。

这是合并排序列表的一个众所周知的问题。另见其他questions在那。

并且不要忘记通过设置环境变量 来设置自己的多线程。 JULIA_NUM_THREADS 在你开始前。

这是我的代码:

using Base.Threads

function blockranges(nblocks, total_len)
rem = total_len % nblocks
main_len = div(total_len, nblocks)

starts=Int[1]
ends=Int[]
for ii in 1:nblocks
len = main_len
if rem>0
len+=1
rem-=1
end
push!(ends, starts[end]+len-1)
push!(starts, ends[end] + 1)
end
@assert ends[end] == total_len
starts[1:end-1], ends
end

function threadedsort!(data::Vector)
starts, ends = blockranges(nthreads(), length(data))

# Sort each block
@threads for (ss, ee) in collect(zip(starts, ends))
@inbounds sort!(@view data[ss:ee])
end


# Go through each sorted block taking out the smallest item and putting it in the new array
# This code could maybe be optimised. see https://stackoverflow.com/a/22057372/179081
ret = similar(data) # main bit of allocation right here. avoiding it seems expensive.
# Need to not overwrite data we haven't read yet
@inbounds for ii in eachindex(ret)
minblock_id = 1
ret[ii]=data[starts[1]]
@inbounds for blockid in 2:endof(starts) # findmin allocates a lot for some reason, so do the find by hand. (maybe use findmin! ?)
ele = data[starts[blockid]]
if ret[ii] > ele
ret[ii] = ele
minblock_id = blockid
end
end
starts[minblock_id]+=1 # move the start point forward
if starts[minblock_id] > ends[minblock_id]
deleteat!(starts, minblock_id)
deleteat!(ends, minblock_id)
end
end
data.=ret # copy back into orignal as we said we would do it inplace
return data
end

我做了一些基准测试:
using Plots
function evaluate_timing(range)
sizes = Int[]
threadsort_times = Float64[]
sort_times = Float64[]
for sz in 2.^collect(range)
data_orig = rand(Int, sz)
push!(sizes, sz)

data = copy(data_orig)
push!(sort_times, @elapsed sort!(data))

data = copy(data_orig)
push!(threadsort_times, @elapsed threadedsort!(data))

@show (sz, sort_times[end], threadsort_times[end])
end
return sizes, threadsort_times, sort_times
end

sizes, threadsort_times, sort_times = evaluate_timing(0:28)
plot(sizes, [threadsort_times sort_times]; title="Sorting Time", ylabel="time(s)", xlabel="number of elements", label=["threadsort!" "sort!"])
plot(sizes, [threadsort_times sort_times]; title="Sorting Time", ylabel="time(s)", xlabel="number of elements", label=["threadsort!" "sort!"], xscale=:log10, yscale=:log10)

我的结果: 使用 8 个线程。

plot normal scale
plot loglog scale

我发现交叉点非常低,略高于 1024。
请注意,可以忽略最初花费的长时间——即为首次运行而 JIT 编译的代码。

奇怪的是,这些结果在使用 BenchmarkTools 时不会重现。
基准工具会停止计算初始时间。
但是当我在上面的基准代码中使用正常的时序代码时,它们会非常一致地重现。
我想它正在做一些杀死多线程的事情

非常感谢@xiaodai 指出了我分析代码中的一个错误

关于multithreading - 如何并行排序?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47235390/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com