gpt4 book ai didi

rust - 如何将ParallelIterator转换回顺序Iterator?

转载 作者:行者123 更新时间:2023-12-03 11:41:26 25 4
gpt4 key购买 nike

我正在从数据库中迭代数GB的输入项。在每个输入项目上,我都在进行一些CPU密集型处理,从而产生一个或多个新的输出项目,总计数十GB。然后将输出项目存储在另一个数据库表中。
通过使用Rayon进行并行处理,我得到了很好的加速。但是,数据库API不是线程安全的。它是Send而不是Sync,因此必须对I/O进行序列化。
理想情况下,我只想写:

input_database
.read_items()
.par_bridge() // Start parallelism.
.flat_map_iter(|input_item| {
// produce an Iterator<Item = OutputItem>
})
.ser_bridge() // End parallelism. This function does not exist.
.for_each(|output_item| {
output_database.write_item(output_item);
});
基本上我想要 par_bridge()的反面;在调用它的线程上运行的东西,从每个线程中读取项目,并串行产生它们。但是在Rayon的当前实现中,这似乎并不存在。我不确定这是因为从理论上讲是不可能的,还是不适合当前库的设计。
输出太大,无法首先将其全部收集到 Vec中;需要直接将其流式传输到数据库中。
顺便说一句,我还没有嫁给人造丝。如果还有另一个更合适的 crate ,我很乐意进行切换。

最佳答案

我认为顺序无关紧要,因此您不需要对输出数据进行排序。
您可以使用 mpsc::channel 将数据从for_each闭包传输到数据库api,例如

use std::sync::mpsc;

let (tx, rx) = mpsc::channel();

input_database
.read_items()
.par_bridge() // Start parallelism.
.flat_map_iter(|input_item| {
// produce an Iterator<Item = OutputItem>
})
.for_each(move |output_item| {
tx.send(output_item).unwrap();
});
在第二个线程中,您可以使用 rx变量来接收数据并将其写入数据库。

关于rust - 如何将ParallelIterator转换回顺序Iterator?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66560235/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com