gpt4 book ai didi

asynchronous - 如何在 Actix-web 中的 WebSocket 处理程序中正确调用异步函数

转载 作者:行者123 更新时间:2023-12-03 11:25:50 25 4
gpt4 key购买 nike

我在这方面取得了一些进展,使用了 into_actor().spawn(),但在异步块内访问 ctx 变量时遇到了困难。
我将首先显示 Web 套接字处理程序的编译片段,然后是处理程序的失败片段,然后是完整的代码示例以供引用。
工作片段:
请重点关注 match 分支 Ok(ws::Message::Text(text))

/// Handler for `ws::Message`
///
/// Dispatches every incoming WebSocket frame: ping/pong frames refresh the
/// heartbeat timestamp `hb`, a text frame is handed to
/// `processrunner::run_process`, binary frames are sent back to the client,
/// and a close frame (or anything unmatched, including errors) stops the actor.
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWebSocket {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
// process websocket messages
println!("WS: {:?}", msg);
match msg {
Ok(ws::Message::Ping(msg)) => {
// record the heartbeat, then answer with a pong carrying the same payload
self.hb = Instant::now();
ctx.pong(&msg);
}
Ok(ws::Message::Pong(_)) => {
// a pong only refreshes the heartbeat timestamp
self.hb = Instant::now();
}
Ok(ws::Message::Text(text)) => {
// `async move` takes ownership of `text`; nothing borrowed from this
// call is captured, so the future can be spawned below.
let future = async move {
let reader = processrunner::run_process(text).await;
// NOTE(review): both unwraps below panic inside the spawned future
// when the process cannot be started or its output cannot be read.
let mut reader = reader.ok().unwrap();
while let Some(line) = reader.next_line().await.unwrap() {
// Sending the line to the client is disabled here; enabling it is
// what the question is about.
// ctx.text(line);
println!("line = {}", line);
}
};

// turn the plain future into an actor future and run it on this actor's context
future.into_actor(self).spawn(ctx);
}
// binary frames are sent back unchanged
Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
Ok(ws::Message::Close(reason)) => {
// acknowledge the close with the same reason, then stop the actor
ctx.close(reason);
ctx.stop();
}
// every other frame type and every protocol error stops the actor
_ => ctx.stop(),
}
}
}
下面是不起作用的代码片段(取消了 ctx 那一行的注释)。
/// Handler for `ws::Message`
///
/// Same dispatch as the working snippet, except that the text branch calls
/// `ctx.text(line)` inside the async block. This variant does not compile
/// (error E0495).
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWebSocket {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
// process websocket messages
println!("WS: {:?}", msg);
match msg {
Ok(ws::Message::Ping(msg)) => {
// record the heartbeat, then answer with a pong carrying the same payload
self.hb = Instant::now();
ctx.pong(&msg);
}
Ok(ws::Message::Pong(_)) => {
// a pong only refreshes the heartbeat timestamp
self.hb = Instant::now();
}
Ok(ws::Message::Text(text)) => {
// The async block now captures `ctx`, a `&mut` borrow that cannot
// outlive this call of `handle`.
let future = async move {
let reader = processrunner::run_process(text).await;
let mut reader = reader.ok().unwrap();
while let Some(line) = reader.next_line().await.unwrap() {
// Offending line: using the borrowed `ctx` here ties the future to the
// lifetime of the method body.
ctx.text(line);
println!("line = {}", line);
}
};

// `spawn` requires the wrapped future to be valid for the `'static`
// lifetime, which conflicts with the captured borrow (E0495).
future.into_actor(self).spawn(ctx);
}
// binary frames are sent back unchanged
Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
Ok(ws::Message::Close(reason)) => {
// acknowledge the close with the same reason, then stop the actor
ctx.close(reason);
ctx.stop();
}
// every other frame type and every protocol error stops the actor
_ => ctx.stop(),
}
}
}
完整的代码片段拆分为两个文件。
主文件 main.rs
//! Simple echo websocket server.
//! Open `http://localhost:8080/ws/index.html` in browser
//! or [python console client](https://github.com/actix/examples/blob/master/websocket/websocket-client.py)
//! could be used for testing.
mod processrunner;
use std::time::{Duration, Instant};

use actix::prelude::*;
use actix_files as fs;
use actix_web::{middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_web_actors::ws;

/// How often heartbeat pings are sent
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// How long before lack of client response causes a timeout
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);

/// Performs the WebSocket handshake for an incoming request and starts a
/// `MyWebSocket` actor to serve the connection.
///
/// Both the request and the handshake result are printed to stdout before the
/// result is returned to the framework.
async fn ws_index(r: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
    println!("{:?}", r);

    let handshake = ws::start(MyWebSocket::new(), &r, stream);
    println!("{:?}", handshake);

    handshake
}

/// Actor that serves a single WebSocket connection.
///
/// A WebSocket connection is long-running, so it is easier to handle with an
/// actor.
struct MyWebSocket {
/// Instant of the most recent ping or pong received from the client.
/// The client must be heard from at least once per `CLIENT_TIMEOUT`
/// (10 seconds), otherwise the connection is dropped.
hb: Instant,
}

/// Makes `MyWebSocket` an actor that runs inside a WebSocket context.
impl Actor for MyWebSocket {
type Context = ws::WebsocketContext<Self>;

/// Called when the actor starts; schedules the heartbeat task via `hb`.
fn started(&mut self, ctx: &mut Self::Context) {
self.hb(ctx);
}
}

/// Handler for `ws::Message`
///
/// Dispatches every incoming WebSocket frame: ping/pong frames refresh the
/// heartbeat timestamp `hb`, a text frame is handed to
/// `processrunner::run_process`, binary frames are sent back to the client,
/// and a close frame (or anything unmatched, including errors) stops the actor.
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWebSocket {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
// process websocket messages
println!("WS: {:?}", msg);
match msg {
Ok(ws::Message::Ping(msg)) => {
// record the heartbeat, then answer with a pong carrying the same payload
self.hb = Instant::now();
ctx.pong(&msg);
}
Ok(ws::Message::Pong(_)) => {
// a pong only refreshes the heartbeat timestamp
self.hb = Instant::now();
}
Ok(ws::Message::Text(text)) => {
// `async move` takes ownership of `text`; nothing borrowed from this
// call is captured, so the future can be spawned below.
let future = async move {
let reader = processrunner::run_process(text).await;
// NOTE(review): both unwraps below panic inside the spawned future
// when the process cannot be started or its output cannot be read.
let mut reader = reader.ok().unwrap();
while let Some(line) = reader.next_line().await.unwrap() {
// Sending the line to the client is disabled here; enabling it is
// what the question is about.
// ctx.text(line);
println!("line = {}", line);
}
};

// turn the plain future into an actor future and run it on this actor's context
future.into_actor(self).spawn(ctx);
}
// binary frames are sent back unchanged
Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
Ok(ws::Message::Close(reason)) => {
// acknowledge the close with the same reason, then stop the actor
ctx.close(reason);
ctx.stop();
}
// every other frame type and every protocol error stops the actor
_ => ctx.stop(),
}
}
}

impl MyWebSocket {
    /// Creates an actor whose heartbeat timestamp starts at the current instant.
    fn new() -> Self {
        let started_at = Instant::now();
        Self { hb: started_at }
    }

    /// Schedules the heartbeat task on the actor's context.
    ///
    /// The task runs once per `HEARTBEAT_INTERVAL`. On each run it checks how
    /// long ago the last client heartbeat was recorded: if that exceeds
    /// `CLIENT_TIMEOUT`, the failure is logged and the actor is stopped
    /// without sending a ping; otherwise an empty ping is sent to the client.
    fn hb(&self, ctx: &mut <Self as Actor>::Context) {
        ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
            let silent_for = act.hb.elapsed();

            if silent_for > CLIENT_TIMEOUT {
                // The client went quiet for too long: report it and shut the actor down.
                println!("Websocket Client heartbeat failed, disconnecting!");
                ctx.stop();
            } else {
                ctx.ping(b"");
            }
        });
    }
}

/// Entry point: sets up logging and serves the application on 127.0.0.1:8080.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    std::env::set_var("RUST_LOG", "actix_server=info,actix_web=info");
    env_logger::init();

    // Application factory: request logger, then the websocket route, then
    // static files with `index.html` as the index file.
    let app_factory = || {
        App::new()
            .wrap(middleware::Logger::default())
            .service(web::resource("/ws/").route(web::get().to(ws_index)))
            .service(fs::Files::new("/", "static/").index_file("index.html"))
    };

    // Bind first so an address error is returned before the server runs.
    let server = HttpServer::new(app_factory).bind("127.0.0.1:8080")?;
    server.run().await
}

processrunner.rs
extern crate tokio;
use tokio::io::*;
use tokio::process::Command;

use std::process::Stdio;

//#[tokio::main]
/// Spawns `text` as a child process with piped stdout and returns an
/// asynchronous line reader over that stdout.
///
/// The child itself is moved into a background Tokio task so that it can make
/// progress while the caller reads its output; that task prints the exit
/// status once the child finishes.
///
/// # Errors
///
/// Returns an error when the command cannot be spawned or when the child has
/// no stdout handle, instead of panicking.
pub async fn run_process(
    text: String,
) -> std::result::Result<
    tokio::io::Lines<BufReader<tokio::process::ChildStdout>>,
    Box<dyn std::error::Error>,
> {
    let mut cmd = Command::new(text);
    cmd.stdout(Stdio::piped());

    // `text` is caller-supplied, so a failed spawn is an expected runtime
    // condition: hand it to the caller through the declared `Result`.
    let mut child = cmd.spawn()?;

    let stdout = child
        .stdout
        .take()
        .ok_or("child did not have a handle to stdout")?;

    let lines = BufReader::new(stdout).lines();

    // Ensure the child process is spawned in the runtime so it can
    // make progress on its own while we await for any output.
    tokio::spawn(async move {
        match child.await {
            Ok(status) => println!("child status was: {}", status),
            // Log instead of panicking inside the detached task.
            Err(err) => eprintln!("child process encountered an error: {}", err),
        }
    });
    Ok(lines)
}
错误:
error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
--> src/main.rs:57:41
|
57 | let future = async move {
| _________________________________________^
58 | | let reader = processrunner::run_process(text).await;
59 | | let mut reader = reader.ok().unwrap();
60 | | while let Some(line) = reader.next_line().await.unwrap() {
... |
63 | | }
64 | | };
| |_________________^
|
note: first, the lifetime cannot outlive the anonymous lifetime #2 defined on the method body at 45:5...
--> src/main.rs:45:5
|
45 | / fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
46 | | // process websocket messages
47 | | println!("WS: {:?}", msg);
48 | | match msg {
... |
74 | | }
75 | | }
| |_____^
note: ...so that the types are compatible
--> src/main.rs:57:41
|
57 | let future = async move {
| _________________________________________^
58 | | let reader = processrunner::run_process(text).await;
59 | | let mut reader = reader.ok().unwrap();
60 | | while let Some(line) = reader.next_line().await.unwrap() {
... |
63 | | }
64 | | };
| |_________________^
= note: expected `&mut actix_web_actors::ws::WebsocketContext<MyWebSocket>`
found `&mut actix_web_actors::ws::WebsocketContext<MyWebSocket>`
= note: but, the lifetime must be valid for the static lifetime...
note: ...so that the type `actix::fut::FutureWrap<impl std::future::Future, MyWebSocket>` will meet its required lifetime bounds
--> src/main.rs:66:41
|
66 | future.into_actor(self).spawn(ctx);
| ^^^^^

error: aborting due to previous error

For more information about this error, try `rustc --explain E0495`.
cargo
[package]
name = "removed"
version = "0.1.0"
authors = ["removed"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "0.2", features = ["full"] }
actix = "0.10"
actix-codec = "0.3"
actix-web = "3"
actix-web-actors = "3"
actix-files = "0.3"
awc = "2"
env_logger = "0.7"
futures = "0.3.1"
bytes = "0.5.3"

最佳答案

以下是基本思路。您可能还需要在某些地方做些调整,但这段代码可以正常工作。

use actix::prelude::*;
use tokio::process::Command;
use actix_web::{ web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_web_actors::ws;
use tokio::io::{ AsyncBufReadExt};
use actix::AsyncContext;
use tokio::stream::{ StreamExt};

use tokio::io::{BufReader};

use std::process::Stdio;

/// Actor message that asks `MyWs` to run the wrapped string as a command.
/// The handler answers with `Result<(), ()>`.
#[derive(Message)]
#[rtype(result = "Result<(), ()>")]
struct CommandRunner(String);


/// WebSocket actor that serves one connection; it carries no state.
struct MyWs;

/// Makes `MyWs` an actor that runs inside a WebSocket context.
impl Actor for MyWs {
type Context = ws::WebsocketContext<Self>;
}

/// One line read from the stdout of a command started by the
/// `CommandRunner` handler.
#[derive(Debug)]
struct Line(String);

impl StreamHandler<Result<Line, ws::ProtocolError>> for MyWs {
fn handle(
&mut self,
msg: Result<Line, ws::ProtocolError>,
ctx: &mut Self::Context,
) {
match msg {
Ok(line) => ctx.text(line.0),
_ => () //Handle errors
}
}
}

/// Handles frames arriving from the WebSocket client.
///
/// A text frame is queued on the actor as a `CommandRunner` message, a ping
/// is answered with a pong carrying the same payload, and a binary frame is
/// sent back unchanged. Every other frame and every error is ignored.
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        match msg {
            Ok(ws::Message::Text(text)) => ctx.notify(CommandRunner(text.to_string())),
            Ok(ws::Message::Ping(payload)) => ctx.pong(&payload),
            Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
            _ => {}
        }
    }
}

impl Handler<CommandRunner> for MyWs {
type Result = Result<(), ()>;
fn handle(&mut self, msg: CommandRunner, ctx: &mut Self::Context) -> Self::Result {
let mut cmd = Command::new(msg.0);

// Specify that we want the command's standard output piped back to us.
// By default, standard input/output/error will be inherited from the
// current process (for example, this means that standard input will
// come from the keyboard and standard output/error will go directly to
// the terminal if this process is invoked from the command line).
cmd.stdout(Stdio::piped());

let mut child = cmd.spawn()
.expect("failed to spawn command");

let stdout = child.stdout.take()
.expect("child did not have a handle to stdout");

let reader = BufReader::new(stdout).lines();

// Ensure the child process is spawned in the runtime so it can
// make progress on its own while we await for any output.
let fut = async move {
let status = child.await
.expect("child process encountered an error");

println!("child status was: {}", status);
};
let fut = actix::fut::wrap_future::<_, Self>(fut);
ctx.spawn(fut);
ctx.add_stream(reader.map(|l| Ok(Line(l.expect("Not a line")))));
Ok(())
}
}

/// Performs the WebSocket handshake for the request and starts a `MyWs` actor
/// for the connection; the handshake result is printed to stdout.
async fn index(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
    let handshake = ws::start(MyWs {}, &req, stream);
    println!("{:?}", handshake);

    handshake
}

/// Entry point: serves the `/ws/` WebSocket route on 127.0.0.1:8080.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let app_factory = || App::new().route("/ws/", web::get().to(index));

    // Bind first so an address error is returned before the server runs.
    let server = HttpServer::new(app_factory).bind("127.0.0.1:8080")?;
    server.run().await
}
运行 ls 时的输出如下所示。
Example

关于asynchronous - 如何在 Actix-web 中的 WebSocket 处理程序中正确调用异步函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64434912/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com