gpt4 book ai didi

multithreading - 如何在 Rust 中的自定义单线程迭代器上并行处理 `map(...)`?

转载 作者:行者123 更新时间:2023-11-29 07:45:38 25 4
gpt4 key购买 nike

我有一个 MyReader实现 Iterator并产生 Buffer Buffer : Send在哪里. MyReader产生很多 Buffer非常快,但我有一个 CPU 密集型工作要在每个 Buffer 上执行( .map(|buf| ...) ) 这是我的瓶颈,然后收集结果(有序)。我想将 CPU 密集型工作并行化 - 希望并行化到 N 个线程,这将使用工作窃取以在内核数量允许的情况下尽可能快地执行它们。

编辑:更准确地说。我正在研究 rdedup . MyStructChunker上面写着 io::Read (通常是 stdio),找到数据的部分( block )并产生它们。那么map()假设,对于每个 block ,计算它的 sha256 摘要,压缩,加密,保存并返回摘要作为 map(...) 的结果。 .保存数据的摘要用于构建index的数据。 map(...) 处理的 block 之间的顺序没关系,但是从每个 map(...) 返回的摘要需要按照发现 block 的相同顺序收集。实际save to file 步骤被卸载到另一个线程(writer thread)。 actual code of PR in question

我希望我可以使用 rayon 为此,但是 rayon期望一个已经可以并行化的迭代器 - 例如。一个Vec<...>或类似的东西。我找不到获得 par_iter 的方法来自 MyReader - 我的阅读器本质上是单线程的。

simple_parallel 但文档说不建议将其用于一般用途。我想确保一切正常。

我可以采用 spmc 队列实现和自定义 thread_pool ,但我希望找到经过优化和测试的现有解决方案。

还有 pipeliner 但尚不支持有序 map 。

最佳答案

一般来说,就并行化而言,保持顺序是一项非常严格的要求。

您可以尝试使用典型的扇出/扇入设置进行手工制作:

  • 一个单一的生产者,它用连续单调递增的 ID 标记输入,
  • 一个线程池,它从这个生产者那里消费,然后将结果发送给最终消费者,
  • 对结果进行缓冲和重新排序以便按顺序处理它们的消费者。

或者您可以提高抽象级别。


此处特别感兴趣:Future

Future 表示计算的结果,可能已经发生也可能还没有发生。接收到 Future 有序列表的消费者可以简单地等待每个列表,并让缓冲在队列中自然发生。

对于奖励积分,如果您使用固定大小的队列,您会自动对消费者产生背压。


因此我建议构建一些 CpuPool 的东西.

设置将是:

use std::sync::mpsc::{Receiver, Sender};

fn produce(sender: Sender<...>) {
let pool = CpuPool::new_num_cpus();

for chunk in reader {
let future = pool.spawn_fn(|| /* do work */);
sender.send(future);
}

// Dropping the sender signals there's no more work to consumer
}

fn consume(receiver: Receiver<...>) {
while let Ok(future) = receiver.recv() {
let item = future.wait().expect("Computation Error?");

/* do something with item */
}
}

fn main() {
let (sender, receiver) = std::sync::mpsc::channel();

std::thread::spawn(move || consume(receiver));

produce(sender);
}

关于multithreading - 如何在 Rust 中的自定义单线程迭代器上并行处理 `map(...)`?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42476389/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com