multithreading - Rust AWS multipart upload using rusoto, multithreaded with rayon, panics with 'there is no reactor running …'

I'm trying to upload a file to AWS in Rust, using the rusoto_s3 S3 client. I managed to get the multipart upload code working when the parts are sent from a single thread, but that is not what I want: I want to upload big files and be able to send the parts from multiple threads. To do that I did some googling and came across rayon.
For reference, a multipart upload works as follows (a minimal sketch of these three calls is shown right after the list):

  • Initiate the multipart upload -> AWS returns an upload ID
  • Send the individual parts using this ID, passing the file chunk and a part number -> AWS returns an ETag
  • Once all parts have been sent, send a complete-upload request containing all the completed parts, as an array of ETags and part numbers.
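
For illustration, here is a minimal single-part sketch of that flow against the rusoto_s3 API (the helper function, the bucket/key parameters and the one-chunk body are placeholders of mine, not taken from the question; the real code below chunks the file and uploads the parts in parallel):

    use rusoto_s3::{
        CompleteMultipartUploadRequest, CompletedMultipartUpload, CompletedPart,
        CreateMultipartUploadRequest, S3, S3Client, UploadPartRequest,
    };

    // Hypothetical helper, single part only: initiate -> upload one part -> complete.
    async fn upload_single_part(s3: &S3Client, bucket: &str, key: &str, chunk: Vec<u8>) {
        // 1) Initiate the multipart upload -> AWS returns an upload_id.
        let created = s3
            .create_multipart_upload(CreateMultipartUploadRequest {
                bucket: bucket.to_owned(),
                key: key.to_owned(),
                ..Default::default()
            })
            .await
            .expect("create_multipart_upload failed");
        let upload_id = created.upload_id.expect("missing upload_id");

        // 2) Send each part with the upload_id and a part number -> AWS returns an ETag.
        let uploaded = s3
            .upload_part(UploadPartRequest {
                body: Some(chunk.into()),
                bucket: bucket.to_owned(),
                key: key.to_owned(),
                upload_id: upload_id.clone(),
                part_number: 1,
                ..Default::default()
            })
            .await
            .expect("upload_part failed");

        // 3) Complete the upload with every (ETag, part number) pair.
        s3.complete_multipart_upload(CompleteMultipartUploadRequest {
            bucket: bucket.to_owned(),
            key: key.to_owned(),
            upload_id,
            multipart_upload: Some(CompletedMultipartUpload {
                parts: Some(vec![CompletedPart {
                    e_tag: uploaded.e_tag,
                    part_number: Some(1),
                }]),
            }),
            ..Default::default()
        })
        .await
        .expect("complete_multipart_upload failed");
    }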

I'm new to Rust, coming from a C++ and Java background. Here is my code:
    #[tokio::test]
    async fn if_multipart_then_upload_multiparts_dicom() {
        let now = Instant::now();
        dotenv().ok();
        let local_filename = "./files/test_big.DCM";
        let destination_filename = "24_time_test.dcm";

        let mut file = std::fs::File::open(local_filename).unwrap();
        const CHUNK_SIZE: usize = 7_000_000;
        let mut buffer = Vec::with_capacity(CHUNK_SIZE);

        let client = super::get_client().await;
        let create_multipart_request = CreateMultipartUploadRequest {
            bucket: client.bucket_name.to_owned(),
            key: destination_filename.to_owned(),
            ..Default::default()
        };

        // Start the multipart upload and note the upload_id generated
        let response = client
            .s3
            .create_multipart_upload(create_multipart_request)
            .await
            .expect("Couldn't create multipart upload");
        let upload_id = response.upload_id.unwrap();

        // Create upload parts
        let create_upload_part = |body: Vec<u8>, part_number: i64| -> UploadPartRequest {
            UploadPartRequest {
                body: Some(body.into()),
                bucket: client.bucket_name.to_owned(),
                key: destination_filename.to_owned(),
                upload_id: upload_id.to_owned(),
                part_number: part_number,
                ..Default::default()
            }
        };

        let completed_parts = Arc::new(Mutex::new(vec![]));

        rayon::scope(|scope| {
            let mut part_number = 1;
            loop {
                let maximum_bytes_to_read = CHUNK_SIZE - buffer.len();
                println!("maximum_bytes_to_read: {}", maximum_bytes_to_read);
                file.by_ref()
                    .take(maximum_bytes_to_read as u64)
                    .read_to_end(&mut buffer)
                    .unwrap();

                println!("length: {}", buffer.len());
                println!("part_number: {}", part_number);
                if buffer.len() == 0 {
                    // The file has ended.
                    break;
                }

                let next_buffer = Vec::with_capacity(CHUNK_SIZE);
                let data_to_send = buffer;
                let completed_parts_cloned = completed_parts.clone();
                scope.spawn(move |_| {
                    let part = create_upload_part(data_to_send.to_vec(), part_number);
                    {
                        let part_number = part.part_number;
                        let client = executor::block_on(super::get_client());
                        let response = executor::block_on(client.s3.upload_part(part));

                        completed_parts_cloned.lock().unwrap().push(CompletedPart {
                            e_tag: response
                                .expect("Couldn't complete multipart upload")
                                .e_tag
                                .clone(),
                            part_number: Some(part_number),
                        });
                    }
                });

                buffer = next_buffer;
                part_number = part_number + 1;
            }
        });

        let completed_upload = CompletedMultipartUpload {
            parts: Some(completed_parts.lock().unwrap().to_vec()),
        };

        let complete_req = CompleteMultipartUploadRequest {
            bucket: client.bucket_name.to_owned(),
            key: destination_filename.to_owned(),
            upload_id: upload_id.to_owned(),
            multipart_upload: Some(completed_upload),
            ..Default::default()
        };
        client
            .s3
            .complete_multipart_upload(complete_req)
            .await
            .expect("Couldn't complete multipart upload");
        println!(
            "time taken: {}, with chunk:: {}",
            now.elapsed().as_secs(),
            CHUNK_SIZE
        );
    }
Here is a sample of the output and the error I get:
    maximum_bytes_to_read: 7000000
    length: 7000000
    part_number: 1
    maximum_bytes_to_read: 7000000
    length: 7000000
    part_number: 2
    maximum_bytes_to_read: 7000000
    thread '<unnamed>' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', C:\Users\DNDT\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\blocking\pool.rs:85:33
    note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
    thread '<unnamed>' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', C:\Users\DNDT\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\blocking\pool.rs:85:33
    length: 7000000
I googled this error, but I don't have a clear understanding of what it actually is:

    there is no reactor running, must be called from the context of Tokio runtime

Here is what I found:
    another question with the same error
    and another question
This seems to be some compatibility issue, because rusoto_s3 might be using a tokio version that is incompatible with the tokio version I have.
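
For what it's worth, this panic is not specific to rusoto or to a tokio version mismatch: it fires whenever a future that needs Tokio's reactor runs on a thread that has no Tokio runtime. A minimal reproduction sketch (the sleep call and the plain std::thread are my own illustration, not code from the question):

    use std::{thread, time::Duration};

    #[tokio::main]
    async fn main() {
        thread::spawn(|| {
            // This spawned OS thread has no Tokio runtime, and futures::executor::block_on
            // does not provide one, so the Tokio-backed sleep cannot find a reactor and the
            // thread panics with the same "there is no reactor running" error.
            futures::executor::block_on(tokio::time::sleep(Duration::from_secs(1)));
        })
        .join()
        .unwrap();
    }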
Here are some of the relevant dependencies:
    tokio = { version = "1", features = ["full"] }
    tokio-compat-02 = "0.1.2"
    rusoto_s3 = "0.46.0"
    rusoto_core = "0.46.0"
    rusoto_credential = "0.46.0"
    rayon = "1.5.0"
I think the main problem is really trying to run rayon code inside async code. I tried changing the async code into blocking code using executor::block_on. I also spent some time trying to make the compiler happy: I have multiple threads that all want to write to let completed_parts = Arc::new(Mutex::new(vec![]));, so I did some cloning there to satisfy the compiler.
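
As a side note, if you do want to keep rayon, one way to make the blocking calls work (a sketch under my own assumptions, not what the accepted answer below does) is to capture a handle to the Tokio runtime before entering the rayon scope and call Handle::block_on on the worker threads; this needs a multi-threaded runtime, e.g. #[tokio::test(flavor = "multi_thread")]:

    // Sketch only: capture the runtime handle on the async test thread...
    let handle = tokio::runtime::Handle::current();

    rayon::scope(|scope| {
        let handle = handle.clone(); // Handle is cheap to clone, one per spawned job
        scope.spawn(move |_| {
            // ...and use it instead of futures::executor::block_on, so the future
            // runs on the Tokio runtime and rusoto finds its reactor.
            handle.block_on(async {
                // e.g. client.s3.upload_part(part).await
            });
        });
    });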
Also, in case it matters, here are the crates I used:
    extern crate dotenv;
    extern crate tokio;
    use bytes::Bytes;
    use dotenv::dotenv;
    use futures::executor;
    use futures::*;
    use rusoto_core::credential::{EnvironmentProvider, ProvideAwsCredentials};
    use rusoto_s3::util::{PreSignedRequest, PreSignedRequestOption};
    use rusoto_s3::PutObjectRequest;
    use rusoto_s3::StreamingBody;
    use rusoto_s3::{
        CompleteMultipartUploadRequest, CompletedMultipartUpload, CompletedPart,
        CreateMultipartUploadRequest, UploadPartRequest, S3,
    };

    use std::io::Read;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;
    use std::time::Instant;
    use tokio::fs;
New to Rust, so there are a lot of moving pieces to get this right!

Best Answer

Thanks to the discussion with @Jmb, I got rid of the threads and turned the spawned work into tokio tasks, as follows.
Create a vector to hold the futures so that we can await them:

    let mut multiple_parts_futures = Vec::new();
Spawn the async tasks:
    loop { // loop over the file chunks
        ...
        let send_part_task_future = tokio::task::spawn(async move {
            // Upload part
            ...
        });
    }
Then, later, await all the futures:
    let _results = futures::future::join_all(multiple_parts_futures).await;
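
One small caveat with the line above (my addition, not part of the accepted answer): join_all over JoinHandles yields a Vec of Result, so a panicking upload task would go unnoticed if the results are discarded. Checking them is cheap:

    let results = futures::future::join_all(multiple_parts_futures).await;
    for result in results {
        // Each entry is Err if the corresponding spawned task panicked or was cancelled.
        result.expect("an upload-part task failed");
    }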
It's worth mentioning that the completed parts need to be sorted:
    let mut completed_parts_vector = completed_parts.lock().unwrap().to_vec();
    completed_parts_vector.sort_by_key(|part| part.part_number);
The whole code is:
    #[tokio::test]
    async fn if_multipart_then_upload_multiparts_dicom() {
        let now = Instant::now();
        dotenv().ok();
        let local_filename = "./files/test_big.DCM";
        let destination_filename = generate_unique_name();
        let destination_filename_clone = destination_filename.clone();
        let mut file = std::fs::File::open(local_filename).unwrap();
        const CHUNK_SIZE: usize = 6_000_000;
        let mut buffer = Vec::with_capacity(CHUNK_SIZE);

        let client = super::get_client().await;
        let create_multipart_request = CreateMultipartUploadRequest {
            bucket: client.bucket_name.to_owned(),
            key: destination_filename.to_owned(),
            ..Default::default()
        };

        // Start the multipart upload and note the upload_id generated
        let response = client
            .s3
            .create_multipart_upload(create_multipart_request)
            .await
            .expect("Couldn't create multipart upload");
        let upload_id = response.upload_id.unwrap();

        let upload_id_clone = upload_id.clone();
        // Create upload parts
        let create_upload_part = move |body: Vec<u8>, part_number: i64| -> UploadPartRequest {
            UploadPartRequest {
                body: Some(body.into()),
                bucket: client.bucket_name.to_owned(),
                key: destination_filename_clone.to_owned(),
                upload_id: upload_id_clone.to_owned(),
                part_number: part_number,
                ..Default::default()
            }
        };

        let create_upload_part_arc = Arc::new(create_upload_part);
        let completed_parts = Arc::new(Mutex::new(vec![]));

        let mut part_number = 1;

        let mut multiple_parts_futures = Vec::new();
        loop {
            let maximum_bytes_to_read = CHUNK_SIZE - buffer.len();
            println!("maximum_bytes_to_read: {}", maximum_bytes_to_read);
            file.by_ref()
                .take(maximum_bytes_to_read as u64)
                .read_to_end(&mut buffer)
                .unwrap();
            println!("length: {}", buffer.len());
            println!("part_number: {}", part_number);
            if buffer.len() == 0 {
                // The file has ended.
                break;
            }
            let next_buffer = Vec::with_capacity(CHUNK_SIZE);
            let data_to_send = buffer;
            let completed_parts_cloned = completed_parts.clone();
            let create_upload_part_arc_cloned = create_upload_part_arc.clone();
            let send_part_task_future = tokio::task::spawn(async move {
                let part = create_upload_part_arc_cloned(data_to_send.to_vec(), part_number);
                {
                    let part_number = part.part_number;
                    let client = super::get_client().await;
                    let response = client.s3.upload_part(part).await;
                    completed_parts_cloned.lock().unwrap().push(CompletedPart {
                        e_tag: response
                            .expect("Couldn't complete multipart upload")
                            .e_tag
                            .clone(),
                        part_number: Some(part_number),
                    });
                }
            });
            multiple_parts_futures.push(send_part_task_future);
            buffer = next_buffer;
            part_number = part_number + 1;
        }
        let client = super::get_client().await;
        println!("waiting for futures");
        let _results = futures::future::join_all(multiple_parts_futures).await;

        let mut completed_parts_vector = completed_parts.lock().unwrap().to_vec();
        completed_parts_vector.sort_by_key(|part| part.part_number);
        println!("futures done");
        let completed_upload = CompletedMultipartUpload {
            parts: Some(completed_parts_vector),
        };

        let complete_req = CompleteMultipartUploadRequest {
            bucket: client.bucket_name.to_owned(),
            key: destination_filename.to_owned(),
            upload_id: upload_id.to_owned(),
            multipart_upload: Some(completed_upload),
            ..Default::default()
        };

        client
            .s3
            .complete_multipart_upload(complete_req)
            .await
            .expect("Couldn't complete multipart upload");
        println!(
            "time taken: {}, with chunk:: {}",
            now.elapsed().as_secs(),
            CHUNK_SIZE
        );
    }

Regarding multithreading - Rust AWS multipart upload using rusoto, multithreaded with rayon, panicking with 'there is no reactor running …', we found a similar question on Stack Overflow: https://stackoverflow.com/questions/66558012/
