
concurrency - Rust futures-cpupool: inconsistent behavior explanations

Reposted. Author: 行者123. Updated: 2023-11-29 08:08:41

I am seeing strange behavior when using CPU pools:

#[macro_use]
extern crate lazy_static;
extern crate tokio_core;
extern crate futures;
extern crate futures_cpupool;

use std::time::Duration;

use futures_cpupool::{CpuPool, Builder, CpuFuture};
use futures::Stream;
use futures::{Future, future, lazy};
use futures::sync::mpsc;
use futures::Sink;

lazy_static! {
    static ref CPU_POOL: CpuPool = {
        Builder::new()
            .pool_size(10)
            .after_start(|| {
                println!("Pool started one thread");
            })
            .before_stop(|| {
                println!("Pool stopped one thread");
            })
            .create()
    };
}

struct Producer {}

impl Producer {
    fn search_names(&self) -> Box<Stream<Item = String, Error = String> + Send> {
        let (mut tx, rx) = mpsc::channel::<Result<String, String>>(1);

        println!("Creating producer thread...");
        let producer_cpu: CpuFuture<(), ()> = CPU_POOL.spawn(lazy(move || {
            println!(" -- Begin to produce names");
            for i in 0..10 {
                match tx.send(Ok("name".to_string())).wait() {
                    Ok(t) => {
                        println!(" -- sent the name");
                        tx = t
                    }
                    Err(err) => {
                        println!(" -- Error occurred sending name! {:?}", err);
                        break;
                    }
                }
                std::thread::sleep(Duration::from_secs(1));
            }
            future::ok::<(), ()>(())
        })
        .then(|result| {
            match result {
                Ok(data) => println!("Producer finished with data: {:?}", data),
                Err(err) => println!("Producer finished with error: {:?}", err),
            }
            future::ok::<(), ()>(())
        }));

        rx.then(|r| r.unwrap()).boxed()
    }
}

fn main() {
    let producer = Producer {};

    let names = CPU_POOL.spawn(producer.search_names()
        .map(|name| {
            println!("name = {:?}", name);
            name
        })
        .collect()
        .then(|result| {
            match result {
                Ok(data) => println!("Finished to read producer {:?}", data),
                Err(err) => println!("Error reading stream of producer! {:?}", err),
            }
            future::ok::<(), ()>(())
        }));

    names.wait();
}

Here is the corresponding Cargo.toml:

[package]
name = "example"
version = "0.1.0"

[dependencies]
lazy_static = "^0.1.*"

tokio-core = "^0.1"
futures = "^0.1"
futures-cpupool = "^0.1"

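I am running Rust nightly (1.16.0-nightly (df8debf6d 2017-01-25)).

I expect this program to generate 10 Strings, print them via println, and exit. However, most of the time the program generates no Strings and still exits normally; at other times the Strings are generated correctly.

Here is the output for the first case: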

Creating producer thread...
Pool started one thread
Finished to read producer []
Pool started one thread
Pool started one thread
Pool started one thread
Pool started one thread

And here is the output when the Strings are generated:

Pool started one thread
Pool started one thread
Pool started one thread
Pool started one thread
Creating producer thread...
-- Begin to produce names
-- sent the name
name = "name"
Pool started one thread
-- sent the name
name = "name"
Producer finished with data: ()
Finished to read producer ["name", "name"]

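My feeling is that, in the first case, the producer thread is for some reason not scheduled on the thread pool. I must be missing something, but I don't know what.

Best answer

The cause of the problem is the early drop of the producer future.

In the method search_names, the CpuFuture that produces the values is dropped when search_names returns. When it is dropped, the CpuFuture is cancelled, so the production of values is skipped. The difference in behavior certainly comes from a race between the drop of the future and its execution.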
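The cancel-on-drop race described above can be modelled with the standard library alone. The following is only a rough sketch, not futures-cpupool itself: `CancelOnDrop`, `spawn_producer`, `collect_keeping_handle` and `collect_dropping_handle` are hypothetical names introduced for illustration, and a plain thread with an atomic flag stands in for the pool and its CpuFuture.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver};
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for CpuFuture: dropping it requests cancellation.
struct CancelOnDrop {
    cancelled: Arc<AtomicBool>,
}

impl Drop for CancelOnDrop {
    fn drop(&mut self) {
        self.cancelled.store(true, Ordering::SeqCst);
    }
}

/// Starts a worker that sends `count` names unless it is cancelled first.
fn spawn_producer(count: usize) -> (CancelOnDrop, Receiver<String>) {
    let (tx, rx) = mpsc::channel();
    let cancelled = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&cancelled);
    thread::spawn(move || {
        for _ in 0..count {
            // The worker checks for cancellation before producing each value.
            if flag.load(Ordering::SeqCst) {
                break;
            }
            if tx.send("name".to_string()).is_err() {
                break;
            }
        }
        // `tx` is dropped here, which ends the receiver's iteration.
    });
    (CancelOnDrop { cancelled }, rx)
}

/// Mirrors the fix: the handle outlives the consumption of the stream.
fn collect_keeping_handle(count: usize) -> Vec<String> {
    let (handle, rx) = spawn_producer(count);
    let names: Vec<String> = rx.iter().collect();
    drop(handle);
    names
}

/// Mirrors the bug: the handle is dropped before the stream is consumed,
/// so the worker races against the cancellation request.
fn collect_dropping_handle(count: usize) -> Vec<String> {
    let (handle, rx) = spawn_producer(count);
    drop(handle);
    rx.iter().collect()
}

fn main() {
    println!("kept handle:    {} names", collect_keeping_handle(10).len());
    println!("dropped handle: {} names", collect_dropping_handle(10).len());
}
```

With the handle kept alive, all 10 names arrive. With the handle dropped early, the count depends on whether the worker observes the flag before or after it starts producing, which is the same kind of run-to-run variation seen in the two outputs of the question.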

One solution is to keep a reference to the producer future for the whole lifetime of the application, like this:

#[macro_use]
extern crate lazy_static;
extern crate tokio_core;
extern crate futures;
extern crate futures_cpupool;

use std::time::Duration;

use futures_cpupool::{CpuPool, Builder, CpuFuture};
use futures::Stream;
use futures::{Future, future, lazy};
use futures::sync::mpsc;
use futures::Sink;

lazy_static! {
    static ref CPU_POOL: CpuPool = {
        Builder::new()
            .pool_size(5)
            .after_start(|| {
                println!("Pool started one thread");
            })
            .before_stop(|| {
                println!("Pool stopped one thread");
            })
            .create()
    };
}

struct Producer {}

impl Producer {
    // Return the producer's CpuFuture together with the stream, so the caller
    // decides how long the producer stays alive.
    fn search_names(&self) -> (CpuFuture<(), ()>, Box<Stream<Item = String, Error = String> + Send>) {
        let (mut tx, rx) = mpsc::channel::<Result<String, String>>(1);

        println!("Creating producer thread...");
        let producer_cpu: CpuFuture<(), ()> = CPU_POOL.spawn(
            lazy(move || {
                println!(" -- Begin to produce names");
                for i in 0..2 {
                    match tx.send(Ok("name".to_string())).wait() {
                        Ok(t) => {
                            println!(" -- sent the name");
                            tx = t
                        },
                        Err(err) => {
                            println!(" -- Error occurred sending name! {:?}", err);
                            break
                        },
                    }
                    std::thread::sleep(Duration::from_secs(1));
                }
                future::ok::<(), ()>(())
            }).then(|result| {
                match result {
                    Ok(data) => println!("Producer finished with data: {:?}", data),
                    Err(err) => println!("Producer finished with error: {:?}", err),
                }
                future::ok::<(), ()>(())
            })
        );

        (
            producer_cpu,
            rx.then(|r| r.unwrap()).boxed()
        )
    }
}

fn main() {
    let producer = Producer {};

    // `future` is held until the end of main, so the producer is not cancelled
    // while the stream is being consumed.
    let (future, stream) = producer.search_names();
    let names = CPU_POOL.spawn(
        stream
            .map(|name| {
                println!("name = {:?}", name);
                name
            })
            .collect()
            .then(|result| {
                match result {
                    Ok(data) => println!("Finished to read producer {:?}", data),
                    Err(err) => println!("Error reading stream of producer! {:?}", err)
                }
                future::ok::<(), ()>(())
            })
    );

    names.wait();
}

Regarding "concurrency - Rust futures-cpupool: inconsistent behavior explanations", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/41950097/
