gpt4 book ai didi

redis - 如何在异步 tokio 运行时中将 future::join_all 与多路复用的 redis 一起使用

转载 作者:行者123 更新时间:2023-12-03 11:26:28 25 4
gpt4 key购买 nike

我正在尝试使用 Rust redis client在异步复用模式下,用tokio作为异步运行时,以及要加入的动态数量的 future 。

我在一定数量的 future 上使用 future::join3 取得了成功,但我想多路复用更多命令(编译时不必知 Prop 体大小,但即使那将是一个改进)。

这是使用 future::join3 时的工作示例;该示例正确打印好的(一些("PONG")) 好的(一些("PONG")) 好的(一些("PONG"))

Cargo.toml

[package]
name = "redis_sample"
version = "0.1.0"
authors = ["---"]
edition = "2018"


[dependencies]
redis = { version = "0.17.0", features = ["aio", "tokio-comp", "tokio-rt-core"] }
tokio = { version = "0.2.23", features = ["full"] }
futures = "0.3.8"

src/main.rs

use futures::future;
use redis::RedisResult;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let redis_client = redis::Client::open("redis://127.0.0.1:6379")?;
let mut redis_connection = redis_client.get_multiplexed_tokio_connection().await?;

let results: (RedisResult<Option<String>>, RedisResult<Option<String>>, RedisResult<Option<String>>) = future::join3(
redis::cmd("PING").query_async(&mut redis_connection.clone()),
redis::cmd("PING").query_async(&mut redis_connection.clone()),
redis::cmd("PING").query_async(&mut redis_connection),
).await;

println!("{:?} {:?} {:?}", results.0, results.1, results.2);

Ok(())
}

现在我想做同样的事情,但是使用 n 命令(比如说 10,但理想情况下我想将其调整为生产性能)。这是我所能得到的,但我无法克服借用规则;我尝试在 Vec 中存储一些中介(redis Cmd 或 future 本身)以延长它们的生命,但这还有其他问题(有多个 mut 引用)。

Cargo.toml 是一样的;这是 main.rs

use futures::{future, Future};
use std::pin::Pin;
use redis::RedisResult;

const BATCH_SIZE: usize = 10;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let redis_client = redis::Client::open("redis://127.0.0.1:6379")?;
let redis_connection = redis_client.get_multiplexed_tokio_connection().await?;

let mut commands: Vec<Pin<Box<dyn Future<Output = RedisResult<Option<String>>>>>> = vec![];
for _ in 0..BATCH_SIZE {
commands.push(Box::pin(redis::cmd("PING").query_async(& mut redis_connection.clone())));
}
let results = future::join_all(commands).await;

println!("{:?}", results);

Ok(())
}

我收到了两个编译器警告(创建了一个在使用中被释放的临时文件),我不知道如何继续使用这段代码。我不是 100% 迷上了使用 Pin,但没有它我什至无法存储 future 。

完整的编译器输出:

   Compiling redis_sample v0.1.0 (/Users/gyfis/Documents/programming/rust/redis_sample)
error[E0716]: temporary value dropped while borrowed
--> redis_sample/src/main.rs:14:32
|
14 | commands.push(Box::pin(redis::cmd("PING").query_async(& mut redis_connection.clone())));
| ^^^^^^^^^^^^^^^^^^ - temporary value is freed at the end of this statement
| |
| creates a temporary which is freed while still in use
...
21 | }
| - borrow might be used here, when `commands` is dropped and runs the `Drop` code for type `std::vec::Vec`
|
= note: consider using a `let` binding to create a longer lived value

error[E0716]: temporary value dropped while borrowed
--> redis_sample/src/main.rs:14:69
|
14 | commands.push(Box::pin(redis::cmd("PING").query_async(& mut redis_connection.clone())));
| ^^^^^^^^^^^^^^^^^^^^^^^^ - temporary value is freed at the end of this statement
| |
| creates a temporary which is freed while still in use
...
21 | }
| - borrow might be used here, when `commands` is dropped and runs the `Drop` code for type `std::vec::Vec`
|
= note: consider using a `let` binding to create a longer lived value

error: aborting due to 2 previous errors

For more information about this error, try `rustc --explain E0716`.
error: could not compile `redis_sample`.

感谢任何帮助!

最佳答案

这应该可行,我只是延长了 redis_connection 的生命周期。

use futures::{future, Future};
use std::pin::Pin;
use redis::RedisResult;

const BATCH_SIZE: usize = 10;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let redis_client = redis::Client::open("redis://127.0.0.1:6379")?;
let redis_connection = redis_client.get_multiplexed_tokio_connection().await?;

let mut commands: Vec<Pin<Box<dyn Future<Output = RedisResult<Option<String>>>>>> = vec![];
for _ in 0..BATCH_SIZE {
let mut redis_connection = redis_connection.clone();
commands.push(Box::pin(async move {
redis::cmd("PING").query_async(&mut redis_connection).await
}));
}
let results = future::join_all(commands).await;

println!("{:?}", results);

Ok(())
}

因为你在一个函数体内,你甚至不需要装箱 futures,类型推断可以完成所有工作:

use futures::future;
use redis::RedisResult;

const BATCH_SIZE: usize = 10;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let redis_client = redis::Client::open("redis://127.0.0.1:6379")?;
let redis_connection = redis_client.get_multiplexed_tokio_connection().await?;

let mut commands = vec![];
for _ in 0..BATCH_SIZE {
let mut redis_connection = redis_connection.clone();
commands.push(async move {
redis::cmd("PING").query_async::<_, Option<String>>(&mut redis_connection).await
});
}
let results = future::join_all(commands).await;

println!("{:?}", results);

Ok(())
}

关于redis - 如何在异步 tokio 运行时中将 future::join_all 与多路复用的 redis 一起使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65380313/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com