gpt4 book ai didi

node.js - 将内存中的文件下载并上传到 Google Drive

转载 作者:行者123 更新时间:2023-12-04 15:08:53 33 4
gpt4 key购买 nike

目标
使用 Google Drive APIs Resumable URL 将文件下载并上传到纯内存中的 Google Drive。
挑战/问题

I want to buffer the file as its being downloaded to memory (not filesystem) and subsequently upload to Google Drive.Google Drive API requires chunks to be a minimum length of 256 * 1024, (262144 bytes).


The process should pass a chunk from the buffer to be uploaded. If the chunk errors, that buffer chunk is retried up to 3 times. If the chunk succeeds, that chunk from the buffer should be cleared, and the process should continue until complete.


背景工作/研究 (引用以下)
我研究和测试过的大多数文章、示例和软件包都对流、管道和分块提供了一些见解,但请使用 filesystem作为可读流的起点。
我尝试了不同的方法,比如 passthroughhighWaterMark和第三方库,例如 request , gaxios , 和 got内置流/管道支持,但在过程的上传端无济于事。
意思是,我不知道如何构建 pipingchunking机制,是否带 bufferpipeline正确地流向上传过程直到完成,并以有效的方式处理进度和完成事件。
问题
  • 使用下面的代码,我如何适本地缓冲文件和 PUT到谷歌提供的正确 URL 的 Content-LengthContent-Range头,同时有足够的缓冲区空间来处理 3 次重试?
  • 在处理背压或缓冲方面,正在利用 .cork() .uncork()管理缓冲流的有效方法?
  • 有没有办法使用 TransformhighWaterMark 一起流和 pipeline有效地管理缓冲区?例如...
  • pipeline(
    downloadStream,
    transformStream,
    uploadStream,
    (err) => {
    if (err) {
    reject(err)
    } else {
    resolve(true)
    }
    }
    )
    下面是一个可视化模型和我想要完成的代码:
    可视化示例
    [====================]
    File Length (20 MB)

    [========== ]
    Download (10 MB)

    [====== ]
    Buffer (e.g. 6 MB, size 12 MB)

    [===]
    Upload Chunk (3 MB) => Error? Retry from Buffer (max 3 times)
    => Success? Empty Buffer => Continue =>
    [===]
    Upload next Chunk (3 MB)
    代码
    /* 
    Assume resumable_drive_url was already obtained from Google API
    with the proper access token, which already contains the
    Content-Type and Content-Length in the session.
    */

    transfer(download_url, resumable_drive_url, file_type, file_length) {

    return new Promise((resolve, reject) => {

    let timeout = setTimeout(() => {
    reject(new Error("Transfer timed out."))
    }, 80000)


    // Question #1: Should the passthrough stream
    // and .on events be declared here?

    const passthrough = new stream.PassThrough({
    highWaterMark: 256 * 1024
    })

    passthrough.on("error", (error) => {
    console.error(`Upload failed: ${error.message}`)
    reject(error.message)
    })

    passthrough.on("end", () => {
    clearTimeout(timeout)
    resolve(true)
    })


    // Download file
    axios({
    method: 'get',
    url: download_url,
    responseType: 'stream',
    maxRedirects: 1
    }).then(result => {

    // QUESTION #2: How do we buffer the file from here
    // via axios.put to the resumable_url with the correct
    // header information Content-Range and Content-Length?

    // CURIOSITY #1: Do we pipe from here
    // to a passthrough stream that maintains a minimum buffer size?

    result.data.pipe(passthrough)
    }
    ).catch(error => {
    reject(error)
    })


    })
    }
    引用文献
  • Chunked Upload Class -(体面的分块机制但臃肿;似乎有一种更有效的流管道方法)
  • Google Drive API v3 - Upload via Resumable URL with Multiple Requests
  • resumableUpload.js -(概念上正确,但使用文件系统)
  • Google-Drive-Uploader -(概念上正确,但使用文件系统和自定义 StreamFactory)
  • Resumable upload in Drive Rest API V3 - (体面但似乎臃肿和过时)
  • 最佳答案

    我相信你的目标和现状如下。

  • 您想下载数据并将下载的数据使用 Axios 和 Node.js 上传到 Google Drive。
  • 对于上传数据,您希望通过从流中检索数据,使用具有多个块的可恢复上传进行上传。
  • 您的访问 token 可用于将数据上传到 Google Drive。
  • 您已经知道要上传的数据的数据大小和 mimeType。

  • retrofit 要点:
  • 在这种情况下,为了实现多块的可续传,我想提出以下流程。
  • 从 URL 下载数据。
  • 为可恢复上传创建 session 。
  • 从流中检索下载的数据并将其转换为缓冲区。
  • 为此,我使用了 stream.Transform .
  • 在这种情况下,我停止流并将数据上传到 Google Drive。我想不出在不停止流的情况下可以实现的方法。
  • 我认为这部分可能是您的问题 2 和 3 的答案。

  • 当缓冲区大小与声明的块大小相同时,将缓冲区上传到 Google Drive。
  • 我认为这部分可能是您问题 3 的答案。

  • 当上传发生错误时,再次上传相同的缓冲区。在此示例脚本中,运行了 3 次重试。 3 次重试后,发生错误。
  • 我认为这部分可能是您问题 1 的答案。



  • 当上述流程反射(reflect)到您的脚本时,它变成如下。
    修改后的脚本:
    请在函数中设置变量 main() .
    const axios = require("axios");
    const stream = require("stream");

    function transfer(
    download_url,
    resumable_drive_url,
    file_type,
    file_length,
    accessToken,
    filename,
    chunkSize
    ) {
    return new Promise((resolve, reject) => {
    axios({
    method: "get",
    url: download_url,
    responseType: "stream",
    maxRedirects: 1,
    })
    .then((result) => {
    const streamTrans = new stream.Transform({
    transform: function (chunk, _, callback) {
    callback(null, chunk);
    },
    });

    // 1. Retrieve session for resumable upload.
    axios({
    method: "POST",
    url: resumable_drive_url,
    headers: {
    Authorization: `Bearer ${accessToken}`,
    "Content-Type": "application/json",
    },
    data: JSON.stringify({
    name: filename,
    mimeType: file_type,
    }),
    })
    .then(({ headers: { location } }) => {
    // 2. Upload the file.
    let startByte = 0;
    result.data.pipe(streamTrans);
    let bufs = [];
    streamTrans.on("data", async (chunk) => {
    bufs.push(chunk);
    const temp = Buffer.concat(bufs);
    if (temp.length >= chunkSize) {
    const dataChunk = temp.slice(0, chunkSize);
    const left = temp.slice(chunkSize);
    streamTrans.pause();
    let upcount = 0;
    const upload = function () {
    console.log(
    `Progress: from ${startByte} to ${
    startByte + dataChunk.length - 1
    } for ${file_length}`
    );
    axios({
    method: "PUT",
    url: location,
    headers: {
    "Content-Range": `bytes ${startByte}-${
    startByte + dataChunk.length - 1
    }/${file_length}`,
    },
    data: dataChunk,
    })
    .then(({ data }) => resolve(data))
    .catch((err) => {
    if (err.response.status == 308) {
    startByte += dataChunk.length;
    streamTrans.resume();
    return;
    }
    if (upcount == 3) {
    reject(err);
    }
    upcount++;
    console.log("Retry");
    upload();
    return;
    });
    };
    upload();
    bufs = [left];
    }
    });
    streamTrans.on("end", () => {
    const dataChunk = Buffer.concat(bufs);
    if (dataChunk.length > 0) {
    // 3. Upload last chunk.
    let upcount = 0;
    const upload = function () {
    console.log(
    `Progress(last): from ${startByte} to ${
    startByte + dataChunk.length - 1
    } for ${file_length}`
    );
    axios({
    method: "PUT",
    url: location,
    headers: {
    "Content-Range": `bytes ${startByte}-${
    startByte + dataChunk.length - 1
    }/${file_length}`,
    },
    data: dataChunk,
    })
    .then(({ data }) => resolve(data))
    .catch((err) => {
    if (upcount == 3) {
    reject(err);
    }
    upcount++;
    upload();
    return;
    });
    };
    upload();
    }
    });
    streamTrans.on("error", (err) => reject(err));
    })
    .catch((err) => reject(err));
    })
    .catch((error) => {
    reject(error);
    });
    });
    }

    function main() {
    const download_url = "###";
    const resumable_drive_url = "https://www.googleapis.com/upload/drive/v3/files?uploadType=resumable";
    const file_type = "###"; // Please set the mimeType of the downloaded data.
    const file_length = 12345; // Please set the data size of the downloaded data.
    const accessToken = "###"; // Please set the access token.
    const filename = "sample filename"; // Please set the filename on Google Drive.
    const chunkSize = 10485760; // This is used as the chunk size for the resumable upload. This is 10 MB as a sample. In this case, please set the multiples of 256 KB (256 x 1024 bytes).

    transfer(
    download_url,
    resumable_drive_url,
    file_type,
    file_length,
    accessToken,
    filename,
    chunkSize
    )
    .then((res) => console.log(res))
    .catch((err) => console.log(err));
    }

    main();
    结果:
    当上述脚本针对 23558108 的文件大小运行时(这是一个示例数据),在控制台中得到以下结果..
    Progress: from 0 to 10485759 for 23558108
    Progress: from 10485760 to 20971519 for 23558108
    Progress(last): from 20971520 to 23558107 for 23558108
    {
    kind: 'drive#file',
    id: '###',
    name: 'sample filename',
    mimeType: '###'
    }
    笔记:
  • 当你想使用single chunk实现可续传上传时,可以在here查看示例脚本。 .

  • 引用:
  • Class: stream.Transform
  • Perform a resumable upload
  • 关于node.js - 将内存中的文件下载并上传到 Google Drive,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65570556/

    33 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com