gpt4 book ai didi

rust - 为什么我的 Future 实现在被轮询一次并且 NotReady 后被阻止?

转载 作者:行者123 更新时间:2023-11-29 08:35:29 26 4
gpt4 key购买 nike

我实现了 future 并向它发出请求,但它阻止了我的 curl 并且日志显示 poll 只被调用了一次。

我有没有做错什么?

use failure::{format_err, Error};
use futures::{future, Async};
use hyper::rt::Future;
use hyper::service::{service_fn, service_fn_ok};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use log::{debug, error, info};
use std::{
sync::{Arc, Mutex},
task::Waker,
thread,
};

pub struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
}

struct SharedState {
completed: bool,
resp: String,
}

impl Future for TimerFuture {
type Item = Response<Body>;
type Error = hyper::Error;
fn poll(&mut self) -> futures::Poll<Response<Body>, hyper::Error> {
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.completed {
return Ok(Async::Ready(Response::new(Body::from(
shared_state.resp.clone(),
))));
} else {
return Ok(Async::NotReady);
}
}
}

impl TimerFuture {
pub fn new(instance: String) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState {
completed: false,
resp: String::new(),
}));
let thread_shared_state = shared_state.clone();
thread::spawn(move || {
let res = match request_health(instance) {
Ok(status) => status.clone(),
Err(err) => {
error!("{:?}", err);
format!("{}", err)
}
};
let mut shared_state = thread_shared_state.lock().unwrap();
shared_state.completed = true;
shared_state.resp = res;
});

TimerFuture { shared_state }
}
}

fn request_health(instance_name: String) -> Result<String, Error> {
std::thread::sleep(std::time::Duration::from_secs(1));
Ok("health".to_string())
}

type BoxFut = Box<dyn Future<Item = Response<Body>, Error = hyper::Error> + Send>;
fn serve_health(req: Request<Body>) -> BoxFut {
let mut response = Response::new(Body::empty());
let path = req.uri().path().to_owned();
match (req.method(), path) {
(&Method::GET, path) => {
return Box::new(TimerFuture::new(path.clone()));
}
_ => *response.status_mut() = StatusCode::NOT_FOUND,
}
Box::new(future::ok(response))
}

fn main() {
let endpoint_addr = "0.0.0.0:8080";
match std::thread::spawn(move || {
let addr = endpoint_addr.parse().unwrap();
info!("Server is running on {}", addr);
hyper::rt::run(
Server::bind(&addr)
.serve(move || service_fn(serve_health))
.map_err(|e| eprintln!("server error: {}", e)),
);
})
.join()
{
Ok(e) => e,
Err(e) => println!("{:?}", e),
}
}

编译运行这段代码后,一个8080端口的服务器就运行起来了。使用 curl 调用服务器,它会阻塞:

curl 127.0.0.1:8080/my-health-scope

最佳答案

Did I implement anything wrong?

是的,你没有阅读和关注the documentation for the method you are implementing (强调我的):

When a future is not ready yet, the Async::NotReady value will be returned. In this situation the future will also register interest of the current task in the value being produced. This is done by calling task::park to retrieve a handle to the current Task. When the future is then ready to make progress (e.g. it should be polled again) the unpark method is called on the Task.

作为 minimal, reproducible example ,让我们使用这个:

use futures::{future::Future, Async};
use std::{
mem,
sync::{Arc, Mutex},
thread,
time::Duration,
};

pub struct Timer {
data: Arc<Mutex<String>>,
}

impl Timer {
pub fn new(instance: String) -> Self {
let data = Arc::new(Mutex::new(String::new()));

thread::spawn({
let data = data.clone();
move || {
thread::sleep(Duration::from_secs(1));
*data.lock().unwrap() = instance;
}
});

Timer { data }
}
}

impl Future for Timer {
type Item = String;
type Error = ();

fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
let mut data = self.data.lock().unwrap();

eprintln!("poll was called");

if data.is_empty() {
Ok(Async::NotReady)
} else {
let data = mem::replace(&mut *data, String::new());
Ok(Async::Ready(data))
}
}
}

fn main() {
let v = Timer::new("Some text".into()).wait();
println!("{:?}", v);
}

它只打印一次“poll was called”。

你可以在Future::poll的实现中调用task::current(以前是task::park),保存结果值,然后使用 Task::notify 的值(以前是 Task::unpark),只要 future 可能再次被轮询:

use futures::{
future::Future,
task::{self, Task},
Async,
};
use std::{
mem,
sync::{Arc, Mutex},
thread,
time::Duration,
};

pub struct Timer {
data: Arc<Mutex<(String, Option<Task>)>>,
}

impl Timer {
pub fn new(instance: String) -> Self {
let data = Arc::new(Mutex::new((String::new(), None)));
let me = Timer { data };

thread::spawn({
let data = me.data.clone();
move || {
thread::sleep(Duration::from_secs(1));
let mut data = data.lock().unwrap();

data.0 = instance;
if let Some(task) = data.1.take() {
task.notify();
}
}
});

me
}
}

impl Future for Timer {
type Item = String;
type Error = ();

fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
let mut data = self.data.lock().unwrap();

eprintln!("poll was called");

if data.0.is_empty() {
let v = task::current();
data.1 = Some(v);
Ok(Async::NotReady)
} else {
let data = mem::replace(&mut data.0, String::new());
Ok(Async::Ready(data))
}
}
}

fn main() {
let v = Timer::new("Some text".into()).wait();
println!("{:?}", v);
}

另见:

关于rust - 为什么我的 Future 实现在被轮询一次并且 NotReady 后被阻止?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58377995/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com