作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在从数据库中迭代数GB的输入项。在每个输入项目上,我都在进行一些CPU密集型处理,从而产生一个或多个新的输出项目,总计数十GB。然后将输出项目存储在另一个数据库表中。
通过使用Rayon进行并行处理,我得到了很好的加速。但是,数据库API不是线程安全的。它是Send
而不是Sync
,因此必须对I/O进行序列化。
理想情况下,我只想写:
input_database
.read_items()
.par_bridge() // Start parallelism.
.flat_map_iter(|input_item| {
// produce an Iterator<Item = OutputItem>
})
.ser_bridge() // End parallelism. This function does not exist.
.for_each(|output_item| {
output_database.write_item(output_item);
});
基本上我想要
par_bridge()
的反面;在调用它的线程上运行的东西,从每个线程中读取项目,并串行产生它们。但是在Rayon的当前实现中,这似乎并不存在。我不确定这是因为从理论上讲是不可能的,还是不适合当前库的设计。
Vec
中;需要直接将其流式传输到数据库中。
最佳答案
我认为顺序无关紧要,因此您不需要对输出数据进行排序。
您可以使用 mpsc::channel
将数据从for_each
闭包传输到数据库api,例如
use std::sync::mpsc;
let (tx, rx) = mpsc::channel();
input_database
.read_items()
.par_bridge() // Start parallelism.
.flat_map_iter(|input_item| {
// produce an Iterator<Item = OutputItem>
})
.for_each(move |output_item| {
tx.send(output_item).unwrap();
});
在第二个线程中,您可以使用
rx
变量来接收数据并将其写入数据库。
关于rust - 如何将ParallelIterator转换回顺序Iterator?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66560235/
我是一名优秀的程序员,十分优秀!