
python-3.x - Parallel download with the Google Storage Python API


Downloading a large number of files in parallel to the local machine with gsutil is trivial, by adding the -m flag:

gsutil -m cp gs://my-bucket/blob_prefix* .

In Python, I can only download one file at a time:
client = storage.Client()
bucket = client.get_bucket(gs_bucket_name)
blobs = [blob for blob in bucket.list_blobs(prefix=blob_prefix)]
for blob in blobs:
    blob.download_to_filename(filename)

Ideally I would like to download the data directly into memory (similar to blob.download_as_string()), preferably into a generator. The order doesn't really matter.

Does this functionality exist in the python api?
If not, what would be the best way to do it?
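For reference, a minimal sketch of one possible approach (not part of the original question): download the blobs concurrently with a standard-library ThreadPoolExecutor and yield their contents from a generator. The bucket name, prefix and worker count below are placeholders, and the blobs are assumed to fit in memory.

from concurrent.futures import ThreadPoolExecutor

from google.cloud import storage


def download_blobs_to_memory(bucket_name, prefix, max_workers=10):
    """Yield (blob_name, content_bytes) pairs, downloading up to max_workers blobs at a time."""
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    blobs = list(bucket.list_blobs(prefix=prefix))
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map returns results in input order; order is stated not to matter here
        contents = executor.map(lambda blob: blob.download_as_string(), blobs)
        for blob, content in zip(blobs, contents):
            yield blob.name, content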

Edit

I have implemented this hack:
def fetch_data_from_storage(fetch_pattern):
    """Download blobs to local first, then load them into Generator."""
    tmp_save_dir = os.path.join("/tmp", "tmp_gs_download")
    if os.path.isdir(tmp_save_dir):
        shutil.rmtree(tmp_save_dir)  # empty tmp dir first
    os.makedirs(tmp_save_dir)  # create tmp dir

    download_command = ["gsutil", "-m", "cp", "gs://{}/{}".format(bucket.name, fetch_pattern), tmp_save_dir]
    resp = subprocess.call(download_command)

    for file in os.listdir(tmp_save_dir):
        with open(os.path.join(tmp_save_dir, file), 'r') as f_data:
            content = json.load(f_data)
            yield content

Please advise if this has already been implemented somewhere in a better way.

Best answer

OK, here is my multiprocessing, multithreaded solution. Here is how it works:

  • 1) Use subprocess and gsutil ls -l pattern to get the list of blob names and their file sizes. The pattern is the one entered in __main__ (a small parsing example appears just before the full listing below).
  • 2) Batches of blob names are created based on a maximum batch size. The default is 1 MB, so a single large file simply forms a batch of 1.
  • 3) Each batch is sent to a separate process. Default processes = cpu_count - 2.
  • 4) Each batch in each process is multithreaded (default max_threads = 10); one group of threads has to finish before the next group starts.
  • 5) Each thread downloads a single blob and combines it with its metadata.
  • 6) The results are passed back up through shared resources and memory allocation.

  • Why I wrote this:
  • I also needed the metadata, which gsutil discards (critical)
  • I wanted some retry control in case parts fail (minor)

  • Speed comparison for ~2500 small (<50kb) files (=55MB total):
  • One file at a time (including metadata): 25m13s
  • gsutil -m cp (no metadata): 0m35s
  • The code below (including metadata): 1m43s
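To make step 1 concrete, here is a small, hypothetical example of the kind of line `gsutil ls -l` prints and how the regex used in the listing below extracts the size and blob name (the size, date and name are invented):

import re

# Hypothetical `gsutil ls -l` output line; the layout (size, timestamp, gs:// URL)
# is what create_blob_batches_by_pattern below expects.
sample_line = "     51234  2018-07-24T17:24:54Z  gs://my-bucket/blob_prefix_001.json"
regexp = r"(\d+) +\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ +gs:\/\/\S+?\/(\S+)"
match = re.search(regexp, sample_line)
print(match.group(1), match.group(2))  # -> 51234 blob_prefix_001.json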

from typing import Iterable, Generator
import logging
import json
import datetime
import re
import subprocess
import multiprocessing
import threading

from google.cloud import storage

logging.basicConfig(level='INFO')
logger = logging.getLogger(__name__)


class StorageDownloader:

    def __init__(self, bucket_name):
        self.bucket_name = bucket_name
        self.bucket = storage.Client().bucket(bucket_name)

    def create_blob_batches_by_pattern(self, fetch_pattern, max_batch_size=1e6):
        """Fetch all blob names according to the pattern and the blob size.

        :param fetch_pattern: The gsutil matching pattern for files we want to download.
          A gsutil pattern is used instead of a blob prefix because it is more powerful.
        :type fetch_pattern: str
        :param max_batch_size: Maximum size per batch in bytes. Default = 1 MB = 1e6 bytes.
        :type max_batch_size: float or int
        :return: Generator of batches of blob names.
        :rtype: Generator of list
        """
        download_command = ["gsutil", "ls", "-l", "gs://{}/{}".format(self.bucket.name, fetch_pattern)]
        logger.info("Gsutil list command: {}".format(download_command))
        blob_details_raw = subprocess.check_output(download_command).decode()
        regexp = r"(\d+) +\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ +gs:\/\/\S+?\/(\S+)"
        # re.finditer returns a generator so we don't duplicate memory need in case the string is quite large
        cum_batch_size = 0
        batch = []
        batch_nr = 1
        for reg_match in re.finditer(regexp, blob_details_raw):
            blob_name = reg_match.group(2)
            byte_size = int(reg_match.group(1))

            batch.append(blob_name)
            cum_batch_size += byte_size

            if cum_batch_size > max_batch_size:
                yield batch
                batch = []
                cum_batch_size = 0
                batch_nr += 1

        logger.info("Created {} batches with roughly max batch size = {} bytes".format(batch_nr, int(max_batch_size)))
        if batch:
            yield batch  # if we still have a batch left, then it must also be yielded

    @staticmethod
    def download_batch_into_memory(batch, bucket, include_metadata=True, max_threads=10):
        """Given a batch of storage filenames, download them into memory.

        Downloading the files in a batch is multithreaded.

        :param batch: A list of gs:// filenames to download.
        :type batch: list of str
        :param bucket: The google storage bucket.
        :type bucket: google.cloud.storage.bucket.Bucket
        :param include_metadata: True to include metadata.
        :type include_metadata: bool
        :param max_threads: Number of threads to use for downloading a batch. Don't increase this over 10.
        :type max_threads: int
        :return: Complete blob contents and metadata.
        :rtype: dict
        """
        def download_blob(blob_name, state):
            """Standalone function so that we can multithread this."""
            blob = bucket.blob(blob_name=blob_name)
            content = json.loads(blob.download_as_string())
            if include_metadata:
                blob.reload()
                metadata = blob.metadata
                if metadata:
                    state[blob_name] = {**content, **metadata}
                    return
            state[blob_name] = content

        batch_data = {bn: {} for bn in batch}
        threads = []
        active_thread_count = 0
        for blobname in batch:
            thread = threading.Thread(target=download_blob, kwargs={"blob_name": blobname, "state": batch_data})
            threads.append(thread)
            thread.start()
            active_thread_count += 1
            if active_thread_count == max_threads:
                # finish up threads in groups of size max_threads. A better implementation would be a queue
                # from which the threads can feed, but this is good enough if the blob sizes are roughly the same.
                for thread in threads:
                    thread.join()
                threads = []
                active_thread_count = 0

        # wait for the last of the threads to be finished
        for thread in threads:
            thread.join()
        return batch_data

    def multiprocess_batches(self, batches, max_processes=None):
        """Spawn a parallel process for downloading and processing each batch.

        :param batches: An iterable of batches, probably a Generator.
        :type batches: Iterable
        :param max_processes: Maximum number of processes to spawn. None for cpu_count - 2.
        :type max_processes: int or None
        :return: The response from all the processes.
        :rtype: dict
        """
        if max_processes is None:
            max_processes = multiprocessing.cpu_count() - 2
        logger.info("Using {} processes to process batches".format(max_processes))

        def single_proc(mp_batch, mp_bucket, batchresults):
            """Standalone function so that we can multiprocess this."""
            proc_res = self.download_batch_into_memory(mp_batch, mp_bucket)
            batchresults.update(proc_res)

        # results are collected in a Manager dict shared between the processes
        batch_results = multiprocessing.Manager().dict()

        jobs = []
        for batch in batches:
            logger.info("Processing batch with {} elements".format(len(batch)))
            # the client is not thread safe, so we need to recreate the client for each process.
            bucket = storage.Client().get_bucket(self.bucket_name)
            proc = multiprocessing.Process(
                target=single_proc,
                kwargs={"mp_batch": batch, "mp_bucket": bucket, "batchresults": batch_results}
            )
            jobs.append(proc)
            proc.start()

        for job in jobs:
            job.join()

        logger.info("finished downloading {} blobs".format(len(batch_results)))
        return batch_results

    def bulk_download_as_dict(self, fetch_pattern):
        """Download blobs from google storage into a dict in memory.

        :param fetch_pattern: A gsutil storage pattern.
        :type fetch_pattern: str
        :return: A dict with k,v pairs = {blobname: blob_data}
        :rtype: dict
        """
        start = datetime.datetime.now()
        filename_batches = self.create_blob_batches_by_pattern(fetch_pattern)
        downloaded_data = self.multiprocess_batches(filename_batches)
        logger.info("time taken to download = {}".format(datetime.datetime.now() - start))
        return downloaded_data


if __name__ == '__main__':
    stor = StorageDownloader("mybucket")
    data = stor.bulk_download_as_dict("some_prefix*")

This could still be optimized quite a bit (for example by queueing the threads instead of waiting for a group to finish), but it is good enough for me for now.
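As one possible version of that optimization (a sketch, not part of the original answer): a concurrent.futures.ThreadPoolExecutor keeps max_threads workers fed from a shared queue of blob names instead of joining the threads in fixed-size groups. Metadata handling is omitted here for brevity.

import json
from concurrent.futures import ThreadPoolExecutor, as_completed


def download_batch_into_memory_pooled(batch, bucket, max_threads=10):
    """Sketch: download a batch of blobs with a worker pool instead of fixed thread groups."""
    def download_blob(blob_name):
        blob = bucket.blob(blob_name=blob_name)
        return blob_name, json.loads(blob.download_as_string())

    batch_data = {}
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = [executor.submit(download_blob, name) for name in batch]
        for future in as_completed(futures):
            blob_name, content = future.result()
            batch_data[blob_name] = content
    return batch_data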

Regarding python-3.x - Parallel download with the Google Storage Python API, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/51508478/
