- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在试验 tokio-tungstenite crate 基于 URL 创建聊天室。例如,我有一个连接到 ws://localhost:8080/abcd
的客户端。我的理解是我必须使用 tokio_tungstenite::accept_hdr_async
函数来访问 header 以获取 /abcd
路径,但我在使用它时遇到了问题。 copy_headers_callback
的第二个参数应该是什么?
我的代码基于 this example :
use std::{
collections::HashMap,
env,
io::Error as IoError,
net::SocketAddr,
sync::{Arc, Mutex},
marker::Unpin,
};
use futures_channel::mpsc::{unbounded, UnboundedSender};
use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt};
use tokio::net::{TcpListener, TcpStream};
use tungstenite::{
protocol::Message,
handshake::server::{Request},
};
type Sender = UnboundedSender<Message>;
type PeerMap = Arc<Mutex<HashMap<SocketAddr, Sender>>>;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
struct BroadcastJsonStruct {
message: String,
sender_addr: SocketAddr,
}
async fn handle_connection(peer_map: PeerMap, raw_stream: TcpStream, client_addr: SocketAddr) {
println!("Incoming TCP connection from: {}, raw stream: {}", client_addr, raw_stream.local_addr().unwrap());
let copy_headers_callback = |request: &Request| {
for (name, value) in request.headers().iter() {
println!("Name: {}, value: ", name.to_string())
// headers.insert(name.to_string(), value.clone());
}
Ok(None)
};
//accept a new asynchronous WebSocket connection
let ws_stream = tokio_tungstenite::accept_hdr_async(
raw_stream,
copy_headers_callback,
)
.await
.expect("Error during the websocket handshake occurred");
println!("WebSocket connection established: {}", client_addr);
// Insert the write part of this peer to the peer map.
let (sender, receiver) = unbounded();
peer_map.lock().unwrap().insert(client_addr, sender);
//set up the incoming and outgoing
let (outgoing, incoming) = ws_stream.split();
let broadcast_incoming = incoming.try_for_each(|msg| {
println!("Received a message from {}: {}", client_addr, msg.to_text().unwrap());
let peers = peer_map.lock().unwrap();
//make a new struct to be serialized
let broadcast_data = BroadcastJsonStruct {
message: msg.to_text().unwrap().to_owned(),
sender_addr: client_addr.to_owned(),
};
let new_msg = Message::Text(
serde_json::to_string(&broadcast_data).expect("problem serializing broadcast_data")
);
println!("New message {}", new_msg.to_text().unwrap());
// We want to broadcast the message to everyone except ourselves.
//filter returns addresses that aren't our current address
let broadcast_recipients =
peers.iter().filter(|(peer_addr, _)| peer_addr != &&client_addr).map(|(_, ws_sink)| ws_sink);
//send the message to all the recipients
for recp in broadcast_recipients {
recp.unbounded_send(new_msg.clone()).unwrap();
}
future::ok(())
});
let receive_from_others = receiver.map(Ok).forward(outgoing);
pin_mut!(broadcast_incoming, receive_from_others);
future::select(broadcast_incoming, receive_from_others).await;
println!("{} disconnected", &client_addr);
peer_map.lock().unwrap().remove(&client_addr);
}
#[tokio::main]
async fn main() -> Result<(), IoError> {
//see if there is a server address specified in the command line argument, else use default address
let server_addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string());
let state = PeerMap::new(Mutex::new(HashMap::new()));
// Create the event loop and TCP listener we'll accept connections on.
let try_socket = TcpListener::bind(&server_addr).await; //create the server on the address
let listener = try_socket.expect("Failed to bind");
println!("Listening on: {}", server_addr);
// Let's spawn the handling of each connection in a separate task.
while let Ok((stream, client_addr)) = listener.accept().await {
tokio::spawn(handle_connection(state.clone(), stream, client_addr));
}
Ok(())
}
我的错误:
error[E0593]: closure is expected to take 2 arguments, but it takes 1 argument
--> src/main.rs:45:9
|
34 | let copy_headers_callback = |request: &Request| {
| ------------------- takes 1 argument
...
45 | copy_headers_callback,
| ^^^^^^^^^^^^^^^^^^^^^ expected closure that takes 2 arguments
|
::: /Users/harryli/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-tungstenite-0.12.0/src/lib.rs:151:8
|
151 | C: Callback + Unpin,
| -------- required by this bound in `accept_hdr_async`
|
= note: required because of the requirements on the impl of `Callback` for `[closure@src/main.rs:34:33: 40:10]`
error: aborting due to previous error; 1 warning emitted
不确定我的方法是否合理。我是 Rust 的新手,所以我很感激任何见解!
最佳答案
我最终得到了像这样打印 header 名称和值的回调!
我没有在标题中找到路径/abcd
。我的解决方法是将 window.location.pathname
作为协议(protocol)传递,然后在服务器中访问该协议(protocol)。
下面的代码显示了一个服务器的工作示例,该服务器仅使用相同的协议(protocol)向客户端广播消息。
JS
const socket = new WebSocket("ws://localhost:8080", window.location.pathname.replace(/\//ig, "-")
Rust 回调:
let mut protocol = HeaderValue::from_static("");
let copy_headers_callback = |request: &Request, mut response: Response| -> Result<Response, ErrorResponse> {
for (name, value) in request.headers().iter() {
println!("Name: {}, value: {}", name.to_string(), value.to_str().expect("expected a value"));
}
//access the protocol in the request, then set it in the response
protocol = request.headers().get(SEC_WEBSOCKET_PROTOCOL).expect("the client should specify a protocol").to_owned(); //save the protocol to use outside the closure
let response_protocol = request.headers().get(SEC_WEBSOCKET_PROTOCOL).expect("the client should specify a protocol").to_owned();
response.headers_mut().insert(SEC_WEBSOCKET_PROTOCOL, response_protocol);
Ok(response)
};
//accept a new asynchronous WebSocket connection
let ws_stream = tokio_tungstenite::accept_hdr_async(
raw_stream,
copy_headers_callback,
)
.await
.expect("Error during the websocket handshake occurred");
完整的 Rust 代码:
use std::{
collections::HashMap,
env,
io::Error as IoError,
net::SocketAddr,
sync::{Arc, Mutex},
};
use futures_channel::mpsc::{unbounded, UnboundedSender};
use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt};
use tokio::net::{TcpListener, TcpStream};
use tungstenite::{
protocol::Message,
handshake::server::{Request, Response, ErrorResponse},
};
use http::header::{
HeaderValue,
SEC_WEBSOCKET_PROTOCOL,
};
type Sender = UnboundedSender<Message>;
struct PeerStruct {
protocol: HeaderValue,
sender: Sender,
}
type PeerMap = Arc<Mutex<HashMap<SocketAddr, PeerStruct>>>;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
struct BroadcastJsonStruct {
message: String,
sender_addr: SocketAddr,
}
async fn handle_connection(peer_map: PeerMap, raw_stream: TcpStream, client_addr: SocketAddr) {
println!("Incoming TCP connection from: {}, raw stream: {}", client_addr, raw_stream.local_addr().unwrap());
let mut protocol = HeaderValue::from_static("");
let copy_headers_callback = |request: &Request, mut response: Response| -> Result<Response, ErrorResponse> {
for (name, value) in request.headers().iter() {
println!("Name: {}, value: {}", name.to_string(), value.to_str().expect("expected a value"));
}
//access the protocol in the request, then set it in the response
protocol = request.headers().get(SEC_WEBSOCKET_PROTOCOL).expect("the client should specify a protocol").to_owned(); //save the protocol to use outside the closure
let response_protocol = request.headers().get(SEC_WEBSOCKET_PROTOCOL).expect("the client should specify a protocol").to_owned();
response.headers_mut().insert(SEC_WEBSOCKET_PROTOCOL, response_protocol);
Ok(response)
};
//accept a new asynchronous WebSocket connection
let ws_stream = tokio_tungstenite::accept_hdr_async(
raw_stream,
copy_headers_callback,
)
.await
.expect("Error during the websocket handshake occurred");
println!("WebSocket connection established: {}", client_addr);
// Insert the write part of this peer to the peer map.
let (sender, receiver) = unbounded();
peer_map.lock().unwrap().insert(client_addr, PeerStruct {
protocol: protocol.to_owned(),
sender: sender,
});
//set up the incoming and outgoing
let (outgoing, incoming) = ws_stream.split();
//this function broadcasts messages to all other connected clients using the same protocol
let broadcast_incoming = incoming.try_for_each(|msg| {
println!("Received a message from {}: {}", client_addr, msg.to_text().unwrap());
let peers = peer_map.lock().unwrap();
//make a new struct to be serialized
let broadcast_data = BroadcastJsonStruct {
message: msg.to_text().unwrap().to_owned(),
sender_addr: client_addr.to_owned(),
};
let new_msg = Message::Text(
serde_json::to_string(&broadcast_data).expect("problem serializing broadcast_data")
);
println!("New message {}", new_msg.to_text().unwrap());
//filter addresses that aren't the message sender's address AND are using the same protocol
let broadcast_recipients = peers.iter().filter(
|(peer_addr, _)|
peer_addr != &&client_addr
&& peers.get(peer_addr).expect("peer_addr should be a key in the HashMap").protocol.to_str().expect("expected a string")==protocol.to_str().expect("expected a string")
).map(|(_, ws_sink)| ws_sink);
//send the message to all the recipients
for recp in broadcast_recipients {
recp.sender.unbounded_send(new_msg.clone()).unwrap();
}
future::ok(())
});
let receive_from_others = receiver.map(Ok).forward(outgoing);
pin_mut!(broadcast_incoming, receive_from_others);
future::select(broadcast_incoming, receive_from_others).await;
println!("{} disconnected", &client_addr);
peer_map.lock().unwrap().remove(&client_addr);
}
#[tokio::main]
async fn main() -> Result<(), IoError> {
//see if there is a server address specified in the command line argument, else use default address
let server_addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string());
let state = PeerMap::new(Mutex::new(HashMap::new()));
// Create the event loop and TCP listener we'll accept connections on.
let try_socket = TcpListener::bind(&server_addr).await; //create the server on the address
let listener = try_socket.expect("Failed to bind");
println!("Listening on: {}", server_addr);
// Let's spawn the handling of each connection in a separate task.
while let Ok((stream, client_addr)) = listener.accept().await {
tokio::spawn(handle_connection(state.clone(), stream, client_addr));
}
Ok(())
}
cargo .toml
[package]
name = "server"
version = "0.1.0"
authors = ["harryli0088 <harryli0088@gmail.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio-tungstenite = "*"
tokio = { version = "0.3", features = ["full"] }
futures-channel = "0.3"
futures-util = "0.3.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
http = "0.2.2"
[dependencies.tungstenite]
version = "0.11.1"
default-features = false
关于rust - 使用 tokio-tungstenite 时如何获取 header ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65376514/
编辑备注 由于 Rust(版本:1.42)仍然没有稳定的 ABI ,推荐使用extern (目前相当于extern "C"(将来可能会改变))否则,可能需要重新编译库。 This article解释如
词法分析器/解析器文件位于 here非常大,我不确定它是否适合只检索 Rust 函数列表。也许我自己编写/使用另一个库是更好的选择? 最终目标是创建一种执行管理器。为了上下文化,它将能够读取包装在函数
我试图在 Rust 中展平 Enum 的向量,但我遇到了一些问题: enum Foo { A(i32), B(i32, i32), } fn main() { let vf =
我正在 64 位模式下运行的 Raspberry Pi 3 上使用 Rust 进行裸机编程。我已经实现了一个自旋锁,如下所示: use core::{sync::atomic::{AtomicBool
我无法理解以下示例是如何从 this code 中提炼出来的, 编译: trait A: B {} trait B {} impl B for T where T: A {} struct Foo;
在我写了一些代码和阅读了一些文章之后,我对 Rust 中的移动语义有点困惑,我认为值移动后,它应该被释放,内存应该是无效的。所以我尝试写一些代码来作证。 第一个例子 #[derive(Debug)]
https://doc.rust-lang.org/reference/types/closure.html#capture-modes struct SetVec { set: HashSe
考虑 const-generic 数据结构的经典示例:方矩阵。 struct Matrix { inner: [[T; N]; N] } 我想返回一个结构体,其 const 参数是动态定义的:
以下代码无法编译,因为 x在移动之后使用(因为 x 具有类型 &mut u8 ,它没有实现 Copy 特性) fn main() { let mut a: u8 = 1; let x:
我在玩 Rust,发现了下面的例子: fn main() { let mut x = [3, 4, 5].to_vec(); x; println!("{:?}", x); }
假设一个 Rust 2018 宏定义了一个 async里面的功能。它将使用的语法与 Rust 2015 不兼容。因此,如果您使用 2015 版编译您的 crate,那么宏中的扩展代码不会与它冲突吗?
假设我有一些 Foo 的自定义集合s: struct Bar {} struct Foo { bar: Bar } struct SubList { contents: Vec, }
代码如下: fn inner(x:&'a i32, _y:&'b i32) -> &'b i32 { x } fn main() { let a = 1; { let b
在lifetime_things的定义中,'b的生命周期比'a长,但实际上当我调用这个函数时,x1比y1长,但是这样可以编译成功: //here you could see 'b:'a means
我正在尝试检索 FLTK-RS Widget 周围的 Arc Mutex 包装器的内部值: pub struct ArcWidget(Arc>); impl ArcWidget{ pub
如下代码所示,我想封装一个定时函数,返回一个闭包的结果和执行时间。 use tap::prelude::Pipe; use std::time::{Instant, Duration}; pub fn
我想实现自己的通用容器,这是我正在使用的特征的片段: pub trait MyVec where Self: Default + Clone + IntoIterator, Self:
所需代码: 注释掉的块可以编译并工作,但是我想从嵌套的匹配样式转变为更简洁的函数链 async fn ws_req_resp(msg: String, conn: PgConn) -> Result>
我正在尝试编写一些代码,该代码将生成具有随机值的随机结构。对于结构,我具有以下特征和帮助程序宏: use rand::{thread_rng, Rng}; use std::fmt; pub trai
我有一个带有函数成员的结构: struct Foo { fun: Box, } type FooI = Foo; 这不起作用: error[E0106]: missing lifetime s
我是一名优秀的程序员,十分优秀!