gpt4 book ai didi

asynchronous - 从 tokio 套接字反序列化

转载 作者:行者123 更新时间:2023-11-29 07:58:37 24 4
gpt4 key购买 nike

我正在使用 tokio 来实现一个服务器,该服务器与使用 serde (bincode) 序列化的消息进行通信。没有异步和 future 我会做

extern crate tokio_io;
extern crate bincode;
extern crate serde;
extern crate bytes;
extern crate futures;
#[macro_use] extern crate serde_derive;

use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::io::{read_exact, write_all};
use bincode::{serialize, deserialize, deserialize_from, Infinite, serialized_size};
use std::io::Read;
use std::io::Cursor;
use futures::future::Future;

type Item = String; // Dummy, this is a complex struct with derived Serizalize
type Error = bincode::Error;

// This works
fn decode<R>(reader: &mut R) -> Result<Item, Error> where R: Read {
let message: Item = deserialize_from(reader, Infinite)?;
Ok(message)
}

fn main() {

let ser = serialize("Test", Infinite).unwrap();
let buf = Cursor::new(ser);

let mut reader = std::io::BufReader::new(buf);

println!("{:?}", decode(&mut reader))
}

但我需要的是一个decode 函数,它可以像 asyncronous socket 一样工作

// I need this since I get the reader from a (tokio) socket as
// let socket = TcpListener::bind(&addr, &handle).unwrap();
// let (reader, writer) = socket.split();
fn decode_async<R>(reader: R) -> Result<Item, Error> where R: AsyncRead {
// Does not work:
let message: Item = deserialize_from(reader, Infinite)?;
Ok(message)
}

我唯一的想法是在编码时手动将长度写入缓冲区,然后使用 read_exact:

// Encode with size
fn encode_async(item: &Item) -> Result<Vec<u8>, Error>{
let size = serialized_size(item);
let mut buf = serialize(&size, Infinite).unwrap();
let ser = serialize(item, Infinite).unwrap();
buf.extend(ser);
Ok(buf)
}

// Decode with size
fn decode_async<R>(reader: R) -> Box<Future<Item = Item, Error = std::io::Error>>
where R: AsyncRead + 'static {

let read = read_exact(reader, vec![0u8; 8]).and_then(|(reader, buf)| {
let size = deserialize::<u64>(&mut &buf[..]).unwrap();
Ok((reader, size as usize))
}).and_then(|(reader, size)| {
read_exact(reader, vec![0u8; size])
}).and_then(|(reader, buf)| {
let item = deserialize(&mut &buf[..]).unwrap();
Ok(item)
});

Box::new(read)
}

fn main() {

let ser = encode_async(&String::from("Test")).unwrap();
let buf = Cursor::new(ser);

let mut reader = std::io::BufReader::new(buf);
let dec = decode_async(reader).wait();

println!("{:?}", dec)
}

有没有更好的方式实现解码?

最佳答案

deserialize_from无法处理 IO 错误,尤其是那种 WouldBlock这是由异步(非阻塞)Readerer 在等待更多数据时返回的。这是受接口(interface)限制:deserialize_from 不返回 Future 或部分状态,它返回完整解码的 Result 并且不会知道如何将 Readerer 与事件循环结合起来以在不忙循环的情况下处理 WouldBlock

理论上可以实现一个async_deserialize_from,但是不能使用serde提供的接口(interface),除非你提前读取完整的数据进行解码,否则会失败目的。

您需要使用 tokio_io::io::read_to_end 读取完整数据或 tokio_io::io::read_exact (您当前使用的是什么),如果您知道“无休止”流(或后跟其他数据的流)中编码数据的大小。

关于asynchronous - 从 tokio 套接字反序列化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48558605/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com