- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
这个问题在这里已经有了答案:
What is the best approach to encapsulate blocking I/O in future-rs?
(1 个回答)
1年前关闭。
不幸的是,我对 Rust 并发开发的理解存在明显的失误。这个问题源于数周反复努力解决一个看似“微不足道”的问题。
问题域
开发一个名为 twistrs 的 Rust 库,它是一个域名排列和枚举库。该库的目的和目标是提供一个根域(例如 google.com
)并生成该域的排列(例如 guugle.com
)并丰富该排列(例如它解析为 123.123.123.123
)。
它的目标之一是比它的 Python counterpart 执行得更快。最值得注意的是,网络调用,例如但不限于 DNS 查找。
目前的设计方案
该库背后的想法(除了作为学习场所之外)是开发一个非常简单的安全库,可以实现以满足各种需求。您(作为客户端)可以选择在内部直接与 permutation 或 enrichment 模块交互,或者使用库提供的异步/并发实现。
请注意,内部没有共享状态。这可能是非常低效的,但暂时有点没有意义,因为它可以防止很多问题。
当前问题
在内部,DNS 查找是同步完成的,并且本质上是阻塞的。我无法将其转换为并发代码。我能得到的最接近的是使用 tokio mpsc channel ,并执行 spawn 单个 tokio 任务:
use twistrs::enrich::{Result, DomainMetadata};
use twistrs::permutate::Domain;
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let domain = Domain::new("google.com").unwrap();
let _permutations = domain.all().unwrap().collect::<Vec<String>>();
let (mut tx, mut rx) = mpsc::channel(1000);
tokio::spawn(async move {
for (i, v) in _permutations.into_iter().enumerate() {
let domain_metadata = DomainMetadata::new(v.clone());
let dns_resolution = domain_metadata.dns_resolvable();
if let Err(_) = tx.send((i, dns_resolution)).await {
println!("receiver dropped");
return;
}
}
});
while let Some(i) = rx.recv().await {
println!("got: {:?}", i);
}
}
也就是说,精明的读者会立即注意到这会阻塞,并有效地以任何方式同步运行 DNS 查找。
move
上完成了
tx
(并且
tx
不是 impl
Copy
),因此无法尝试在 for 循环中生成 Tokio 任务:
for (i, v) in _permutations.into_iter().enumerate() {
tokio::spawn(async move {
let domain_metadata = DomainMetadata::new(v.clone());
let dns_resolution = domain_metadata.dns_resolvable();
if let Err(_) = tx.send((i, dns_resolution)).await {
println!("receiver dropped");
return;
}
});
}
删除
await
当然不会导致任何事情发生,因为需要轮询生成的任务。我如何有效地将所有这些同步调用包装到异步任务中,这些任务可以独立运行并最终汇聚到一个集合中?
examples/twistrs-cli/main.rs
。
最佳答案
编辑:我误解了你的问题,没有意识到 DNS 解析本身不是异步的。以下方法实际上不适用于同步代码,并且只会由于阻塞代码而导致执行程序停止,但如果您切换到异步解析方法,我会保留它。我建议使用 tokio 的异步 lookup_host()
如果这符合您的需求。
异步执行器旨在处理大量并行任务,因此您可以尝试为每个请求生成一个新任务,使用 Semaphore
一次创建运行任务数量的上限。其代码可能如下所示:
let (mut tx, mut rx) = mpsc::channel(1000);
let semaphore = Arc::new(Semaphore::new(1000)); // allow up to 1000 tasks to run at once
for (i, v) in _permutations.into_iter().enumerate() {
let domain_metadata = DomainMetadata::new(v.clone());
let mut tx = tx.clone(); // every task will have its own copy of the sender
let permit = semaphore.acquire_owned().await; // wait until we have a permit
let dns_resolution = domain_metadata.dns_resolvable();
tokio::spawn(async move {
if let Err(_) = tx.send((i, dns_resolution)).await {
println!("receiver dropped");
return;
}
drop(permit); // explicitly release the permit, to make sure it was moved into this task
}); // note: task spawn results and handle dropped here
}
while let Some(i) = rx.recv().await {
println!("got: {:?}", i);
}
如果额外任务的开销被证明太显着,您可以尝试将这些任务组合成一个 future ,使用像
FuturesUnordered
这样的工具。来自
futures
箱。这使您可以获取任意大的 future 列表,并在单个任务中重复轮询它们。
关于rust - 你如何用 Tokio 简单地包装同步网络 I/O?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63548625/
我是一名优秀的程序员,十分优秀!