- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我在使用 cpu pools 时遇到了奇怪的行为:
#[macro_use]
extern crate lazy_static;
extern crate tokio_core;
extern crate futures;
extern crate futures_cpupool;
use std::time::Duration;
use futures_cpupool::{CpuPool, Builder, CpuFuture};
use futures::Stream;
use futures::{Future, future, lazy};
use futures::sync::mpsc;
use futures::Sink;
lazy_static! {
static ref CPU_POOL: CpuPool = {
Builder::new()
.pool_size(10)
.after_start(|| {
println!("Pool started one thread");
})
.before_stop(|| {
println!("Pool stopped one thread");
})
.create()
};
}
struct Producer {}
impl Producer {
fn search_names(&self) -> Box<Stream<Item = String, Error = String> + Send> {
let (mut tx, rx) = mpsc::channel::<Result<String, String>>(1);
println!("Creating producer thread...");
let producer_cpu: CpuFuture<(), ()> = CPU_POOL.spawn(lazy(move || {
println!(" -- Begin to produce names");
for i in 0..10 {
match tx.send(Ok("name".to_string())).wait() {
Ok(t) => {
println!(" -- sent the name");
tx = t
}
Err(err) => {
println!(" -- Error occured sending name! {:?}", err);
break;
}
}
std::thread::sleep(Duration::from_secs(1));
}
future::ok::<(), ()>(())
})
.then(|result| {
match result {
Ok(data) => println!("Producer finished with data: {:?}", data),
Err(err) => println!("Producer finished with error: {:?}", err),
}
future::ok::<(), ()>(())
}));
rx.then(|r| r.unwrap()).boxed()
}
}
fn main() {
let producer = Producer {};
let names = CPU_POOL.spawn(producer.search_names()
.map(|name| {
println!("name = {:?}", name);
name
})
.collect()
.then(|result| {
match result {
Ok(data) => println!("Finished to read producer {:?}", data),
Err(err) => println!("Error reading stream of producer! {:?}", err),
}
future::ok::<(), ()>(())
}));
names.wait();
}
这里是对应的Cargo.toml
[package]
name = "example"
version = "0.1.0"
[dependencies]
lazy_static = "^0.1.*"
tokio-core = "^0.1"
futures = "^0.1"
futures-cpupool = "^0.1"
我每晚都在 Rust 上运行(1.16.0-nightly (df8debf6d 2017-01-25)
)
我希望这个程序生成 10 个 String
,通过 println
输出并退出。但是,大多数时候,程序不会生成 String
并正常退出,其他时候会正确生成 String
。
这是第一种情况的输出:
Creating producer thread...
Pool started one thread
Finished to read producer []
Pool started one thread
Pool started one thread
Pool started one thread
Pool started one thread
以及 String
生成时的输出
Pool started one thread
Pool started one thread
Pool started one thread
Pool started one thread
Creating producer thread...
-- Begin to produce names
-- sent the name
name = "name"
Pool started one thread
-- sent the name
name = "name"
Producer finished with data: ()
Finished to read producer ["name", "name"]
我的感觉是,对于第一种情况,生产者线程出于某种原因没有被安排到线程池中。我一定是错过了什么,但我不知道是什么。
最佳答案
问题的原因是生产者 future 的早期下跌。
在方法 search_names
上,产生值的 CpuFuture
在 search_names
返回时被丢弃。删除时,CpuFuture
将被取消,从而跳过值的生成。行为上的差异肯定来自 future 的下降和执行之间的竞争。
一个解决方案是像这样在整个应用程序中引用生产者的 future :
#[macro_use]
extern crate lazy_static;
extern crate tokio_core;
extern crate futures;
extern crate futures_cpupool;
use std::time::Duration;
use futures_cpupool::{CpuPool, Builder, CpuFuture};
use futures::Stream;
use futures::{Future, future, lazy};
use futures::sync::mpsc;
use futures::Sink;
lazy_static! {
static ref CPU_POOL: CpuPool = {
Builder::new()
.pool_size(5)
.after_start(|| {
println!("Pool started one thread");
})
.before_stop(|| {
println!("Pool stopped one thread");
})
.create()
};
}
struct Producer {}
impl Producer {
fn search_names(&self) -> (CpuFuture<(), ()>, Box<Stream<Item = String, Error = String> + Send>) {
let (mut tx, rx) = mpsc::channel::<Result<String, String>>(1);
println!("Creating producer thread...");
let producer_cpu: CpuFuture<(), ()> = CPU_POOL.spawn(
lazy(move || {
println!(" -- Begin to produce names");
for i in 0..2 {
match tx.send(Ok("name".to_string())).wait() {
Ok(t) => {
println!(" -- sent the name");
tx = t
},
Err(err) => {
println!(" -- Error occured sending name! {:?}", err);
break
},
}
std::thread::sleep(Duration::from_secs(1));
}
future::ok::<(), ()>(())
}).then(|result| {
match result {
Ok(data) => println!("Producer finished with data: {:?}", data),
Err(err) => println!("Producer finished with error: {:?}", err),
}
future::ok::<(), ()>(())
})
);
(
producer_cpu,
rx.then(|r| r.unwrap()).boxed()
)
}
}
fn main() {
let producer = Producer {};
let (future, stream) = producer.search_names();
let names = CPU_POOL.spawn(
stream
.map(|name| {
println!("name = {:?}", name);
name
})
.collect()
.then(|result| {
match result {
Ok(data) => println!("Finished to read producer {:?}", data),
Err(err) => println!("Error reading stream of producer! {:?}", err)
}
future::ok::<(), ()>(())
})
);
names.wait();
}
关于concurrency - rust future -cpupool : inconsistent behavior explanations,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41950097/
我正在通过这个示例https://www.rusoto.org/futures.html学习Rust和Rusoto 而且我发现许多代码已经过时了。所以我改变了这样的代码: use rusoto_cor
这是一个理论问题。我有一个服务可以调用来完成工作,但该服务可能无法完成所有工作,因此我需要调用第二个服务来完成它。 我想知道是否有办法在没有 Await.result 的情况下做类似的事情map 函数
这个问题是关于如何阅读 Rust 文档并提高我对 Rust 的理解,从而了解如何解决这个特定的编译器错误。 我读过 tokio docs并试验了许多 examples .在编写自己的代码时,我经常遇到
我有一个使用分页的 HTTP api,我想将它包装到一个通用的 Rust 流中,以便所有端点都可以使用相同的接口(interface),这样我就可以使用 Stream 附带的特征函数特征。 我收到了这
我正在查看 AKKA 的 Java Futures API,我看到了很多处理同一类型的多个 future 的方法,但我没有看到任何处理不同类型的 future 的方法。我猜我让事情变得更加复杂了。 无
环境:Akka 2.1,scala 版本 2.10.M6,JDK 1.7,u5 现在是我的问题: 我有: future1 = Futures.future(new Callable>(){...});
我有一些代码可以将请求提交给另一个线程,该线程可能会也可能不会将该请求提交给另一个线程。这会产生 Future> 的返回类型.是否有一些非令人发指的方法可以立即将其变成 Future等待整个 futu
如果我有以下代码: Future a = new Future(() { print('a'); return 1; }); Future b = new Future.error('Error!')
我一直试图简化我在 Scala 中做 future 的方式。我有一次收到了 Future[Option[Future[Option[Boolean]]但我在下面进一步简化了它。有没有更好的方法来简化这
Scala 中从 Future[Option[Future[Int]]] 转换的最干净的方法是什么?至 Future[Option[Int]] ?甚至有可能吗? 最佳答案 有两个嵌套Future s
使用下面的示例,future2 如何在 future1 完成后使用 future1 的结果(不阻塞 future3 从被提交)? from concurrent.futures import Proc
这两个类代表了并发编程的优秀抽象,因此它们不支持相同的 API 有点令人不安。 具体根据docs : asyncio.Future is almost compatible with concurre
我正在尝试使用 wasm_bindgen 实现 API 类使用异步调用。 #![allow(non_snake_case)] use std::future::Future; use serde::{
这个问题在这里已经有了答案: Futures / Success race (3 个回答) 去年关闭。 所有的 future 最终可能会成功(有些可能会失败),但我们希望第一个成功。并希望将这一结果表
我在练习asyncio在编写多线程代码多年之后。 注意到一些我觉得很奇怪的东西。都在 asyncio在 concurrent有一个Future目的。 from asyncio import Futur
如何将Future[Option[Future[Option[X]]]]转换为Future[Option[X]]? 如果它是 TraversableOnce 而不是 Option 我会使用 Futur
我正在尝试同时发送 HTTP 请求。为此,我使用 concurrent.futures 这是简单的代码: import requests from concurrent import futures
我们在 vertx 中使用 Futures 的例子如下: Future fetchVehicle = getUserBookedVehicle(routingContext, client);
下面的函数,取自 here : fn connection_for( &self, pool_key: PoolKey, ) -> impl Future>, ClientError>
我正在围绕Java库编写一个小的Scala包装器。 Java库有一个对象QueryExecutor,它公开了2种方法: execute(query):结果 asyncExecute(query):Li
我是一名优秀的程序员,十分优秀!