I'm trying to upload a file to AWS S3 from Rust. I'm using rusoto_s3, rusoto's S3 client, and I managed to get the multipart upload code working when the parts are sent from a single thread. That isn't what I want, though: I want to upload big files and be able to send the parts from multiple threads, so I did some googling and came across rayon.
For reference, multipart upload works like this: the file is split into parts, each uploaded part comes back with an ETag, and once every part is uploaded you complete the upload by sending the ETags together with their part numbers.

#[tokio::test]
async fn if_multipart_then_upload_multiparts_dicom() {
    let now = Instant::now();
    dotenv().ok();
    let local_filename = "./files/test_big.DCM";
    let destination_filename = "24_time_test.dcm";
    let mut file = std::fs::File::open(local_filename).unwrap();
    const CHUNK_SIZE: usize = 7_000_000;
    let mut buffer = Vec::with_capacity(CHUNK_SIZE);
    let client = super::get_client().await;
    let create_multipart_request = CreateMultipartUploadRequest {
        bucket: client.bucket_name.to_owned(),
        key: destination_filename.to_owned(),
        ..Default::default()
    };
    // Start the multipart upload and note the upload_id generated
    let response = client
        .s3
        .create_multipart_upload(create_multipart_request)
        .await
        .expect("Couldn't create multipart upload");
    let upload_id = response.upload_id.unwrap();
    // Create upload parts
    let create_upload_part = |body: Vec<u8>, part_number: i64| -> UploadPartRequest {
        UploadPartRequest {
            body: Some(body.into()),
            bucket: client.bucket_name.to_owned(),
            key: destination_filename.to_owned(),
            upload_id: upload_id.to_owned(),
            part_number: part_number,
            ..Default::default()
        }
    };
    let completed_parts = Arc::new(Mutex::new(vec![]));
    rayon::scope(|scope| {
        let mut part_number = 1;
        loop {
            let maximum_bytes_to_read = CHUNK_SIZE - buffer.len();
            println!("maximum_bytes_to_read: {}", maximum_bytes_to_read);
            file.by_ref()
                .take(maximum_bytes_to_read as u64)
                .read_to_end(&mut buffer)
                .unwrap();
            println!("length: {}", buffer.len());
            println!("part_number: {}", part_number);
            if buffer.len() == 0 {
                // The file has ended.
                break;
            }
            let next_buffer = Vec::with_capacity(CHUNK_SIZE);
            let data_to_send = buffer;
            let completed_parts_cloned = completed_parts.clone();
            scope.spawn(move |_| {
                let part = create_upload_part(data_to_send.to_vec(), part_number);
                {
                    let part_number = part.part_number;
                    let client = executor::block_on(super::get_client());
                    let response = executor::block_on(client.s3.upload_part(part));
                    completed_parts_cloned.lock().unwrap().push(CompletedPart {
                        e_tag: response
                            .expect("Couldn't complete multipart upload")
                            .e_tag
                            .clone(),
                        part_number: Some(part_number),
                    });
                }
            });
            buffer = next_buffer;
            part_number = part_number + 1;
        }
    });
    let completed_upload = CompletedMultipartUpload {
        parts: Some(completed_parts.lock().unwrap().to_vec()),
    };
    let complete_req = CompleteMultipartUploadRequest {
        bucket: client.bucket_name.to_owned(),
        key: destination_filename.to_owned(),
        upload_id: upload_id.to_owned(),
        multipart_upload: Some(completed_upload),
        ..Default::default()
    };
    client
        .s3
        .complete_multipart_upload(complete_req)
        .await
        .expect("Couldn't complete multipart upload");
    println!(
        "time taken: {}, with chunk:: {}",
        now.elapsed().as_secs(),
        CHUNK_SIZE
    );
}
Here is a sample of the output and the error I get:
maximum_bytes_to_read: 7000000
length: 7000000
part_number: 1
maximum_bytes_to_read: 7000000
length: 7000000
part_number: 2
maximum_bytes_to_read: 7000000
thread '<unnamed>' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', C:\Users\DNDT\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\blocking\pool.rs:85:33
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread '<unnamed>' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', C:\Users\DNDT\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\blocking\pool.rs:85:33
length: 7000000
I googled this error, but I don't have a clear picture of what it actually means:

"there is no reactor running, must be called from the context of a Tokio runtime"
This is what I found: apparently one of the crates pulls in a tokio version that is incompatible with the tokio version I have. These are my dependencies:
tokio = { version = "1", features = ["full"] }
tokio-compat-02 = "0.1.2"
rusoto_s3 = "0.46.0"
rusoto_core = "0.46.0"
rusoto_credential = "0.46.0"
rayon = "1.5.0"
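To make sure I understand the error itself, here is a minimal sketch (my own reduction, assuming tokio 1.x with the "full" feature and futures 0.3, both already in my dependencies) that reproduces the same panic without any S3 code. futures::executor::block_on just runs the future on the current thread; it never enters a Tokio runtime, so anything inside it that needs the Tokio reactor (timers, network I/O) panics:

use std::time::Duration;

fn main() {
    std::thread::spawn(|| {
        // This thread never enters a Tokio runtime, so polling a Tokio timer here
        // panics with "there is no reactor running, must be called from the
        // context of a Tokio 1.x runtime".
        futures::executor::block_on(async {
            tokio::time::sleep(Duration::from_millis(10)).await;
        });
    })
    .join()
    .unwrap();
}

The rayon worker threads in my test are in exactly that situation: they are plain OS threads that never entered a Tokio runtime, and client.s3.upload_part(part) needs the reactor.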
I think the main issue is really that I'm trying to run async code inside rayon threads. I tried switching the async code to blocking code with executor::block_on. I also spent some time trying to make the compiler happy: multiple threads all want to write to let completed_parts = Arc::new(Mutex::new(vec![]));, so I did some cloning there to satisfy the compiler.

These are my imports:
extern crate dotenv;
extern crate tokio;
use bytes::Bytes;
use dotenv::dotenv;
use futures::executor;
use futures::*;
use rusoto_core::credential::{EnvironmentProvider, ProvideAwsCredentials};
use rusoto_s3::util::{PreSignedRequest, PreSignedRequestOption};
use rusoto_s3::PutObjectRequest;
use rusoto_s3::StreamingBody;
use rusoto_s3::{
    CompleteMultipartUploadRequest, CompletedMultipartUpload, CompletedPart,
    CreateMultipartUploadRequest, UploadPartRequest, S3,
};
use std::io::Read;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::Instant;
use tokio::fs;
I'm new to Rust, so there are a lot of moving parts to get right here!
Best answer
Thanks to the discussion with @Jmb, I got rid of the threads and turned the spawned work into tokio tasks, as follows:
Create a vector to hold the futures so we can wait on them later:
let mut multiple_parts_futures = Vec::new();
Spawn the async tasks:
loop { // loop over the file chunks
    ...
    let send_part_task_future = tokio::task::spawn(async move {
        // Upload part
        ...
    }
Then, later on, wait for all the futures:
let _results = futures::future::join_all(multiple_parts_futures).await;
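Since these are tokio::task::spawn handles, join_all yields a Vec<Result<(), JoinError>>. In the full code below I just discard it (let _results = ...), but a small variant (not part of the original answer) could check each result so a panicked upload task doesn't go unnoticed:

let results = futures::future::join_all(multiple_parts_futures).await;
for result in results {
    result.expect("an upload task panicked or was cancelled");
}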
It's worth mentioning that the completed parts need to be sorted, since the tasks finish in arbitrary order and S3 expects the parts listed in ascending part-number order:
let mut completed_parts_vector = completed_parts.lock().unwrap().to_vec();
completed_parts_vector.sort_by_key(|part| part.part_number);
The whole code is:
#[tokio::test]
async fn if_multipart_then_upload_multiparts_dicom() {
    let now = Instant::now();
    dotenv().ok();
    let local_filename = "./files/test_big.DCM";
    let destination_filename = generate_unique_name();
    let destination_filename_clone = destination_filename.clone();
    let mut file = std::fs::File::open(local_filename).unwrap();
    const CHUNK_SIZE: usize = 6_000_000;
    let mut buffer = Vec::with_capacity(CHUNK_SIZE);
    let client = super::get_client().await;
    let create_multipart_request = CreateMultipartUploadRequest {
        bucket: client.bucket_name.to_owned(),
        key: destination_filename.to_owned(),
        ..Default::default()
    };
    // Start the multipart upload and note the upload_id generated
    let response = client
        .s3
        .create_multipart_upload(create_multipart_request)
        .await
        .expect("Couldn't create multipart upload");
    let upload_id = response.upload_id.unwrap();
    let upload_id_clone = upload_id.clone();
    // Create upload parts
    let create_upload_part = move |body: Vec<u8>, part_number: i64| -> UploadPartRequest {
        UploadPartRequest {
            body: Some(body.into()),
            bucket: client.bucket_name.to_owned(),
            key: destination_filename_clone.to_owned(),
            upload_id: upload_id_clone.to_owned(),
            part_number: part_number,
            ..Default::default()
        }
    };
    let create_upload_part_arc = Arc::new(create_upload_part);
    let completed_parts = Arc::new(Mutex::new(vec![]));
    let mut part_number = 1;
    let mut multiple_parts_futures = Vec::new();
    loop {
        let maximum_bytes_to_read = CHUNK_SIZE - buffer.len();
        println!("maximum_bytes_to_read: {}", maximum_bytes_to_read);
        file.by_ref()
            .take(maximum_bytes_to_read as u64)
            .read_to_end(&mut buffer)
            .unwrap();
        println!("length: {}", buffer.len());
        println!("part_number: {}", part_number);
        if buffer.len() == 0 {
            // The file has ended.
            break;
        }
        let next_buffer = Vec::with_capacity(CHUNK_SIZE);
        let data_to_send = buffer;
        let completed_parts_cloned = completed_parts.clone();
        let create_upload_part_arc_cloned = create_upload_part_arc.clone();
        let send_part_task_future = tokio::task::spawn(async move {
            let part = create_upload_part_arc_cloned(data_to_send.to_vec(), part_number);
            {
                let part_number = part.part_number;
                let client = super::get_client().await;
                let response = client.s3.upload_part(part).await;
                completed_parts_cloned.lock().unwrap().push(CompletedPart {
                    e_tag: response
                        .expect("Couldn't complete multipart upload")
                        .e_tag
                        .clone(),
                    part_number: Some(part_number),
                });
            }
        });
        multiple_parts_futures.push(send_part_task_future);
        buffer = next_buffer;
        part_number = part_number + 1;
    }
    let client = super::get_client().await;
    println!("waiting for futures");
    let _results = futures::future::join_all(multiple_parts_futures).await;
    let mut completed_parts_vector = completed_parts.lock().unwrap().to_vec();
    completed_parts_vector.sort_by_key(|part| part.part_number);
    println!("futures done");
    let completed_upload = CompletedMultipartUpload {
        parts: Some(completed_parts_vector),
    };
    let complete_req = CompleteMultipartUploadRequest {
        bucket: client.bucket_name.to_owned(),
        key: destination_filename.to_owned(),
        upload_id: upload_id.to_owned(),
        multipart_upload: Some(completed_upload),
        ..Default::default()
    };
    client
        .s3
        .complete_multipart_upload(complete_req)
        .await
        .expect("Couldn't complete multipart upload");
    println!(
        "time taken: {}, with chunk:: {}",
        now.elapsed().as_secs(),
        CHUNK_SIZE
    );
}
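One possible refinement, not part of the answer above: tokio::task::spawn is called once per chunk with no upper bound, so a very large file can end up with many 6 MB buffers in flight at the same time. If that becomes a problem, the futures crate's buffer_unordered can cap the number of concurrent part uploads. A self-contained sketch with dummy parts standing in for the real UploadPartRequest calls (it only needs tokio with the "full" feature and futures 0.3):

use futures::stream::{self, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Pretend part numbers 1..=20 each carry a chunk; upload at most 4 at a time.
    let uploaded: Vec<i64> = stream::iter(1..=20_i64)
        .map(|part_number| async move {
            // ... build the UploadPartRequest and call client.s3.upload_part(...) here ...
            tokio::time::sleep(Duration::from_millis(50)).await; // stand-in for the network call
            part_number
        })
        .buffer_unordered(4) // no more than 4 parts uploading at once
        .collect()
        .await;
    println!("uploaded {} parts", uploaded.len());
}

The completion order is still arbitrary, so the sort_by_key on part_number stays necessary either way.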
About "multithreading - Rust AWS multipart upload using rusoto, multithreaded (rayon), panicked at 'there is no reactor running…'", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/66558012/