gpt4 book ai didi

async-await - 使用 future 从套接字列表中选择

转载 作者:行者123 更新时间:2023-11-29 07:46:59 25 4
gpt4 key购买 nike

我正在使用 futures-preview = "0.3.0-alpha.16"runtime = "0.3.0 在 nightly Rust 1.38 中尝试尚未稳定的 async-await 语法-alpha.6"。感觉真的很酷,但是文档(还)很少,我被卡住了。

超越basic examples我想创建一个应用程序:

  1. 接受给定端口上的 TCP 连接;
  2. 将从任何连接接收到的所有数据广播到所有事件连接。

现有的文档和示例让我走到了这一步:

#![feature(async_await)]
#![feature(async_closure)]

use futures::{
prelude::*,
select,
future::select_all,
io::{ReadHalf, WriteHalf, Read},
};

use runtime::net::{TcpListener, TcpStream};

use std::io;

async fn read_stream(mut reader: ReadHalf<TcpStream>) -> (ReadHalf<TcpStream>, io::Result<Box<[u8]>>) {
let mut buffer: Vec<u8> = vec![0; 1024];
match reader.read(&mut buffer).await {
Ok(len) => {
buffer.truncate(len);
(reader, Ok(buffer.into_boxed_slice()))
},
Err(err) => (reader, Err(err)),
}
}

#[runtime::main]
async fn main() -> std::io::Result<()> {
let mut listener = TcpListener::bind("127.0.0.1:8080")?;
println!("Listening on {}", listener.local_addr()?);

let mut incoming = listener.incoming().fuse();
let mut writers: Vec<WriteHalf<TcpStream>> = vec![];
let mut reads = vec![];

loop {
select! {
maybe_stream = incoming.select_next_some() => {
let (mut reader, writer) = maybe_stream?.split();
writers.push(writer);
reads.push(read_stream(reader).fuse());
},
maybe_read = select_all(reads.iter()) => {
match maybe_read {
(reader, Ok(data)) => {
for writer in writers {
writer.write_all(data).await.ok(); // Ignore errors here
}
reads.push(read_stream(reader).fuse());
},
(reader, Err(err)) => {
let reader_addr = reader.peer_addr().unwrap();
writers.retain(|writer| writer.peer_addr().unwrap() != reader_addr);
},
}
}
}
}
}

这失败了:

error: recursion limit reached while expanding the macro `$crate::dispatch`
--> src/main.rs:36:9
|
36 | / select! {
37 | | maybe_stream = incoming.select_next_some() => {
38 | | let (mut reader, writer) = maybe_stream?.split();
39 | | writers.push(writer);
... |
55 | | }
56 | | }
| |_________^
|
= help: consider adding a `#![recursion_limit="128"]` attribute to your crate
= note: this error originates in a macro outside of the current crate (in Nightly builds, run with -Z external-macro-backtrace for more info)

这很令人困惑。也许我以错误的方式使用了 select_all()?任何帮助使其工作的帮助都将不胜感激!

为了完整性,我的 Cargo.toml:

[package]
name = "async-test"
version = "0.1.0"
authors = ["xxx"]
edition = "2018"

[dependencies]
runtime = "0.3.0-alpha.6"
futures-preview = { version = "=0.3.0-alpha.16", features = ["async-await", "nightly"] }

最佳答案

以防有人跟踪,我终于把它破解了。此代码有效:

#![feature(async_await)]
#![feature(async_closure)]
#![recursion_limit="128"]

use futures::{
prelude::*,
select,
stream,
io::ReadHalf,
channel::{
oneshot,
mpsc::{unbounded, UnboundedSender},
}
};

use runtime::net::{TcpListener, TcpStream};

use std::{
io,
net::SocketAddr,
collections::HashMap,
};

async fn read_stream(
addr: SocketAddr,
drop: oneshot::Receiver<()>,
mut reader: ReadHalf<TcpStream>,
sender: UnboundedSender<(SocketAddr, io::Result<Box<[u8]>>)>
) {
let mut drop = drop.fuse();
loop {
let mut buffer: Vec<u8> = vec![0; 1024];
select! {
result = reader.read(&mut buffer).fuse() => {
match result {
Ok(len) => {
buffer.truncate(len);
sender.unbounded_send((addr, Ok(buffer.into_boxed_slice())))
.expect("Channel error");
if len == 0 {
return;
}
},
Err(err) => {
sender.unbounded_send((addr, Err(err))).expect("Channel error");
return;
}
}
},
_ = drop => {
return;
},
}
}
}

enum Event {
Connection(io::Result<TcpStream>),
Message(SocketAddr, io::Result<Box<[u8]>>),
}

#[runtime::main]
async fn main() -> std::io::Result<()> {
let mut listener = TcpListener::bind("127.0.0.1:8080")?;
eprintln!("Listening on {}", listener.local_addr()?);

let mut writers = HashMap::new();
let (sender, receiver) = unbounded();

let connections = listener.incoming().map(|maybe_stream| Event::Connection(maybe_stream));
let messages = receiver.map(|(addr, maybe_message)| Event::Message(addr, maybe_message));
let mut events = stream::select(connections, messages);

loop {
match events.next().await {
Some(Event::Connection(Ok(stream))) => {
let addr = stream.peer_addr().unwrap();
eprintln!("New connection from {}", addr);

let (reader, writer) = stream.split();
let (drop_sender, drop_receiver) = oneshot::channel();

writers.insert(addr, (writer, drop_sender));
runtime::spawn(read_stream(addr, drop_receiver, reader, sender.clone()));
},
Some(Event::Message(addr, Ok(message))) => {
if message.len() == 0 {
eprintln!("Connection closed by client: {}", addr);
writers.remove(&addr);
continue;
}
eprintln!("Received {} bytes from {}", message.len(), addr);
if &*message == b"quit\n" {
eprintln!("Dropping client {}", addr);
writers.remove(&addr);
continue;
}
for (&other_addr, (writer, _)) in &mut writers {
if addr != other_addr {
writer.write_all(&message).await.ok(); // Ignore errors
}
}
},
Some(Event::Message(addr, Err(err))) => {
eprintln!("Error reading from {}: {}", addr, err);
writers.remove(&addr);
},
_ => panic!("Event error"),
}
}
}

我使用一个 channel 并为每个客户端生成一个阅读任务。必须特别注意确保读者与作者分道扬镳:这就是使用 oneshot future 的原因。当 oneshot::Sender 被删除时,oneshot::Receiver future 解析为取消状态,这是一种通知机制,让读取任务知道是时候停止了。为了证明它有效,我们在收到“退出”消息后立即删除客户端。

可悲的是,关于 runtime::spawn 调用中未使用的 JoinHandle 有一个(看似无用的)警告,我真的不知道如何消除它.

关于async-await - 使用 future 从套接字列表中选择,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56930108/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com