gpt4 book ai didi

Rust future -- 将两个 future 结合在一起

转载 作者:行者123 更新时间:2023-11-29 08:35:21 26 4
gpt4 key购买 nike

我有这个代码

use futures::Map;
use futures::sink::SendAll;
use futures::sink::SinkFromErr;
use futures::stream::Forward;
use futures::sync::mpsc::Receiver;
use futures::sync::mpsc::Sender;
use futures::{Future, Stream, Sink};
use std::boxed::FnBox;
use tokio_core::reactor::Core;
use websocket::async::futures::stream::SplitSink;
use websocket::async::futures::stream::SplitStream;
use websocket::ClientBuilder;
use websocket;

pub fn main(recv: Receiver<String>, send: Sender<websocket::OwnedMessage>) -> Box<FnBox() -> () + Send> {
Box::new(move || {
let mut core = Core::new().unwrap();
let handle = core.handle();

let f = ClientBuilder::new("wss://...")
.unwrap()
.async_connect(None, &handle)
.from_err::<Error>()
.map(|(duplex, _)| duplex.split())
.and_then(|(sink, stream): (SplitSink<_>, SplitStream<_>)| {

let writer: Map<SendAll<SinkFromErr<SplitSink<_>, _>, _>, _> =
sink
.sink_from_err()
.send_all(recv.map(websocket::OwnedMessage::Text).map_err(Error::Receiver))
.map(|_| ());

// Trying to uncomment these lines:
// let reader =
// stream
// .forward(send);
//
// reader.join(writer)

// Comment this out:
writer
});

core.run(f).expect("Unable to run");
})
}

quick_error! {
#[derive(Debug)]
pub enum Error {
WebSocket(err: websocket::WebSocketError) {
from()
description("websocket error")
display("WebSocket error: {}", err)
cause(err)
}
Receiver(err: ()) {
description("receiver error")
display("Receiver error")
}
}
}

为了清楚起见,我添加了一些类型注释。这个版本编译,但我想要另一个 future ,从流 (stream) 读取并写入 send。我不能让它编译,我输入的错误是完全无法理解的。所以我的问题是:

  1. 如何编译 forward() 调用? (尝试启用注释掉的代码)
  2. 您是如何想出编译的代码的?以我的经验,由于类型太复杂且类型错误难以理解,因此无法编写和理解大量 futures 代码。

最佳答案

我需要两次 map_err 调用,一次用于映射来自 stream 的 websocket 错误,一次用于映射来自 send 的发送者错误:

pub fn main(recv: Receiver<String>, send: Sender<websocket::OwnedMessage>) -> Box<FnBox() -> () + Send> {
Box::new(move || {
let mut core = Core::new().unwrap();
let handle = core.handle();

let f = ClientBuilder::new("wss://...")
.unwrap()
.async_connect(None, &handle)
.from_err::<Error>()
.map(|(duplex, _)| duplex.split())
.and_then(|(sink, stream): (SplitSink<_>, SplitStream<_>)| {

let writer: Map<SendAll<SinkFromErr<SplitSink<_>, _>, _>, _> =
sink
.sink_from_err()
.send_all(recv.map(websocket::OwnedMessage::Text).map_err(Error::Receiver))
.map(|_| ());

let reader =
stream
.map_err(Error::WebSocket)
.forward(send.sink_map_err(Error::Sender));

reader.join(writer)
});

core.run(f).expect("Unable to run");
})
}

关于Rust future -- 将两个 future 结合在一起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46569399/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com