
python - Making Boto3 upload calls blocking (single-threaded)


Edit: my original assumption turned out to be partially wrong. I've added a lengthy answer below, and I invite others to stress-test it and offer corrections.

I'm looking for a way to use the Boto3 S3 API in a single-threaded fashion to mimic a thread-safe key-value store. In a nutshell, I want the upload to use the calling thread rather than a new thread.

The default behavior of the .upload_file() (or .upload_fileobj()) method in Boto3, as far as I can tell, is to kick the task off to a new thread and immediately return None.

From the docs:

This is a managed transfer which will perform a multipart upload in multiple threads if necessary.



(If my understanding of this is wrong in the first place, a correction on that would also be helpful. This is in Boto3 1.9.134.)
>>> import io
>>> import boto3
>>> bucket = boto3.resource('s3').Bucket('my-bucket-name')
>>> buf = io.BytesIO(b"test")
>>> res = bucket.upload_fileobj(buf, 'testobj')
>>> res is None
True

Now, suppose buf is not a short 4-byte string but a huge text blob that takes a non-negligible amount of time to upload in full.
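(For experimenting, one way to fabricate such a blob entirely in memory; the 100 MB size is a hypothetical stand-in:)

buf = io.BytesIO(b"\x00" * 100 * 1024 ** 2)  # ~100 MB of zeros, no disk involved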

I also use this function to check whether an object with a given key exists:
import botocore.exceptions

def key_exists_in_bucket(bucket_obj, key: str) -> bool:
    try:
        bucket_obj.Object(key).load()
    except botocore.exceptions.ClientError:
        return False
    else:
        return True
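As an aside, a slightly stricter variant of this check (a sketch; the name is mine, and the error-code handling reflects the usual HEAD-request behavior of .load(), which is worth verifying for your setup) treats only a 404 as "missing" rather than swallowing every ClientError, such as a 403:

import botocore.exceptions

def key_exists_in_bucket_strict(bucket_obj, key: str) -> bool:
    try:
        bucket_obj.Object(key).load()
    except botocore.exceptions.ClientError as e:
        # .load() issues a HEAD request; a 404 means the key is absent.
        if e.response['Error']['Code'] == '404':
            return False
        raise  # 403 / throttling / etc. are real errors, not "missing"
    else:
        return True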

My intent is to not rewrite the object if one already exists under that name.

The race condition here is fairly apparent: kick off an upload asynchronously, then do a quick check with key_exists_in_bucket(), get back False because the object is still being written, and then needlessly write it again as a result.

Is there any way to ensure that bucket.upload_fileobj() is called by the current thread rather than by a new thread created within the scope of that method?

I realize this will slow things down. I'm willing to sacrifice speed in this case.
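For reference, here's the sort of thing I have in mind, a minimal sketch built on the documented TransferConfig options (whether use_threads=False really keeps everything on the calling thread is exactly what I'd like confirmed):

from boto3.s3.transfer import TransferConfig

# use_threads=False is documented to run all transfer logic in the main thread
config = TransferConfig(use_threads=False)
bucket.upload_fileobj(buf, 'testobj', Config=config)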

Best Answer

I figured that, since the answers to this question and another similar question seem to be in direct conflict, it would be best to go straight to the source with pdb.

Summary

  • boto3 does use multiple threads (10) by default
  • However, it is not asynchronous, in that it waits on (joins) these threads before returning, rather than using a "fire-and-forget" technique
  • So, in this sense, read/write thread safety is in place if you're trying to talk to an s3 bucket from multiple clients.


Details

One aspect I want to address here first is that multiple (sub)threads do NOT imply that the top-level method itself is non-blocking: if the calling thread kicks off the upload to multiple subthreads but then waits for those threads to finish and return, I would venture to say that's still a blocking call. The flip side of this is if the method call is, in asyncio-speak, a "fire and forget" call. With threading, this effectively comes down to whether x.join() is ever called.
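As a minimal illustration of that distinction (generic threading code, not boto3's):

    import threading
    import time

    def work():
        time.sleep(1)  # stand-in for an upload

    t = threading.Thread(target=work)
    t.start()  # fire-and-forget so far: the caller carries on immediately
    t.join()   # ...but this line is what makes the overall call blocking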

Here is the initial code, taken from Victor Val, to kick off the debugger:
    import io
    import pdb

    import boto3

    # From dd if=/dev/zero of=100mb.txt bs=50M count=2
    buf = io.BytesIO(open('100mb.txt', 'rb').read())
    bucket = boto3.resource('s3').Bucket('test-threads')
    pdb.run("bucket.upload_fileobj(buf, '100mb')")

These stack frames are from Boto 1.9.134.

Now, jumping into pdb:
.upload_fileobj() first calls a nested method; there isn't much to see yet.
    (Pdb) s
    --Call--
    > /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(542)bucket_upload_fileobj()
    -> def bucket_upload_fileobj(self, Fileobj, Key, ExtraArgs=None,
    (Pdb) s

    (Pdb) l
    574
    575 :type Config: boto3.s3.transfer.TransferConfig
    576 :param Config: The transfer configuration to be used when performing the
    577 upload.
    578 """
    579 -> return self.meta.client.upload_fileobj(
    580 Fileobj=Fileobj, Bucket=self.name, Key=Key, ExtraArgs=ExtraArgs,
    581 Callback=Callback, Config=Config)
    582
    583
    584

So the top-level method does return something, but it's still unclear how that something eventually becomes None.

So we step into it.

Now, .upload_fileobj() does have a config parameter, which is None by default:
    (Pdb) l 531
    526
    527 subscribers = None
    528 if Callback is not None:
    529 subscribers = [ProgressCallbackInvoker(Callback)]
    530
    531 config = Config
    532 if config is None:
    533 config = TransferConfig()
    534
    535 with create_transfer_manager(self, config) as manager:
    536 future = manager.upload(

This means that config becomes the default TransferConfig(), whose relevant options are (quoting the docs):
  • use_threads - If True, threads will be used when performing S3 transfers. If False, no threads will be used in performing transfers: all logic will be run in the main thread.
  • max_concurrency - The maximum number of threads that will be making requests to perform a transfer. If use_threads is set to False, the value provided is ignored as the transfer will only ever use the main thread.

And sure enough, here they are:
    (Pdb) unt 534
    > /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(535)upload_fileobj()
    -> with create_transfer_manager(self, config) as manager:
    (Pdb) config
    <boto3.s3.transfer.TransferConfig object at 0x7f1790dc0cc0>
    (Pdb) config.use_threads
    True
    (Pdb) config.max_concurrency
    10

Now we move down a level in the call stack to use a TransferManager (a context manager). At this point, max_concurrency has been put to use as the similarly named max_request_concurrency argument:
    # https://github.com/boto/s3transfer/blob/2aead638c8385d8ae0b1756b2de17e8fad45fffa/s3transfer/manager.py#L223

    # The executor responsible for making S3 API transfer requests
    self._request_executor = BoundedExecutor(
        max_size=self._config.max_request_queue_size,
        max_num_threads=self._config.max_request_concurrency,
        tag_semaphores={
            IN_MEMORY_UPLOAD_TAG: TaskSemaphore(
                self._config.max_in_memory_upload_chunks),
            IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore(
                self._config.max_in_memory_download_chunks)
        },
        executor_cls=executor_cls
    )

At least as of this boto3 version, that class comes from the separate library s3transfer.
    (Pdb) n
    > /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(536)upload_fileobj()
    -> future = manager.upload(
    (Pdb) manager
    <s3transfer.manager.TransferManager object at 0x7f178db437f0>
    (Pdb) manager._config
    <boto3.s3.transfer.TransferConfig object at 0x7f1790dc0cc0>
    (Pdb) manager._config.use_threads
    True
    (Pdb) manager._config.max_concurrency
    10

Next, let's step into manager.upload(). Here's the full body of that method:
    (Pdb) l 290, 303
    290 -> if extra_args is None:
    291 extra_args = {}
    292 if subscribers is None:
    293 subscribers = []
    294 self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
    295 call_args = CallArgs(
    296 fileobj=fileobj, bucket=bucket, key=key, extra_args=extra_args,
    297 subscribers=subscribers
    298 )
    299 extra_main_kwargs = {}
    300 if self._bandwidth_limiter:
    301 extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
    302 return self._submit_transfer(
    303 call_args, UploadSubmissionTask, extra_main_kwargs)

    (Pdb) unt 301
    > /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(302)upload()
    -> return self._submit_transfer(
    (Pdb) extra_main_kwargs
    {}

    (Pdb) UploadSubmissionTask
    <class 's3transfer.upload.UploadSubmissionTask'>
    (Pdb) call_args
    <s3transfer.utils.CallArgs object at 0x7f178db5a5f8>

    (Pdb) l 300, 5
    300 if self._bandwidth_limiter:
    301 extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
    302 -> return self._submit_transfer(
    303 call_args, UploadSubmissionTask, extra_main_kwargs)
    304
    305 def download(self, bucket, key, fileobj, extra_args=None,

Ah, lovely. So we need to go down at least one more level to see the actual underlying upload.
    (Pdb) s
    > /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(303)upload()
    -> call_args, UploadSubmissionTask, extra_main_kwargs)
    (Pdb) s
    --Call--
    > /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(438)_submit_transfer()
    -> def _submit_transfer(self, call_args, submission_task_cls,
    (Pdb) s
    > /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(440)_submit_transfer()
    -> if not extra_main_kwargs:

    (Pdb) l 440, 10
    440 -> if not extra_main_kwargs:
    441 extra_main_kwargs = {}
    442
    443 # Create a TransferFuture to return back to the user
    444 transfer_future, components = self._get_future_with_components(
    445 call_args)
    446
    447 # Add any provided done callbacks to the created transfer future
    448 # to be invoked on the transfer future being complete.
    449 for callback in get_callbacks(transfer_future, 'done'):
    450 components['coordinator'].add_done_callback(callback)

Okay, so now we have a TransferFuture, defined in s3transfer/futures.py. There's no definitive proof yet that threads have been kicked off, but it sure sounds like it once futures get involved.
    (Pdb) l
    444 transfer_future, components = self._get_future_with_components(
    445 call_args)
    446
    447 # Add any provided done callbacks to the created transfer future
    448 # to be invoked on the transfer future being complete.
    449 -> for callback in get_callbacks(transfer_future, 'done'):
    450 components['coordinator'].add_done_callback(callback)
    451
    452 # Get the main kwargs needed to instantiate the submission task
    453 main_kwargs = self._get_submission_task_main_kwargs(
    454 transfer_future, extra_main_kwargs)
    (Pdb) transfer_future
    <s3transfer.futures.TransferFuture object at 0x7f178db5a780>

At first glance, the final line below, from the TransferCoordinator class, stands out:
    class TransferCoordinator(object):
        """A helper class for managing TransferFuture"""

        def __init__(self, transfer_id=None):
            self.transfer_id = transfer_id
            self._status = 'not-started'
            self._result = None
            self._exception = None
            self._associated_futures = set()
            self._failure_cleanups = []
            self._done_callbacks = []
            self._done_event = threading.Event()  # < ------ !!!!!!

You'll typically see threading.Event being used for one thread to signal an event's status while other threads wait for that event to happen.

TransferCoordinator is what gets used by TransferFuture.result().
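A bare-bones sketch of that signaling pattern (generic threading code, not s3transfer's):

    import threading

    done = threading.Event()

    def worker():
        # ... do the actual work, e.g. the transfer ...
        done.set()  # signal completion to anyone waiting

    threading.Thread(target=worker).start()
    done.wait()  # blocks the calling thread until worker calls .set()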

Alright, circling back from the above, we're now at s3transfer.futures.BoundedExecutor and its max_num_threads attribute:
    class BoundedExecutor(object):
        EXECUTOR_CLS = futures.ThreadPoolExecutor
        # ...
        def __init__(self, max_size, max_num_threads, tag_semaphores=None,
                     executor_cls=None):
            self._max_num_threads = max_num_threads
            if executor_cls is None:
                executor_cls = self.EXECUTOR_CLS
            self._executor = executor_cls(max_workers=self._max_num_threads)

This is basically equivalent to:
    from concurrent import futures

    _executor = futures.ThreadPoolExecutor(max_workers=10)

But one question still remains: is this "fire and forget," or does the call actually wait for the threads to finish before returning?

It appears to be the latter: .result() calls self._done_event.wait(MAXINT).
    # https://github.com/boto/s3transfer/blob/2aead638c8385d8ae0b1756b2de17e8fad45fffa/s3transfer/futures.py#L249

    def result(self):
        self._done_event.wait(MAXINT)

        # Once done waiting, raise an exception if present or return the
        # final result.
        if self._exception:
            raise self._exception
        return self._result
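The same blocking behavior can be seen in miniature with a plain ThreadPoolExecutor (an analogy to, not an excerpt of, s3transfer):

    from concurrent import futures
    import time

    with futures.ThreadPoolExecutor(max_workers=10) as executor:
        fut = executor.submit(time.sleep, 2)  # the work runs on a pool thread...
        fut.result()                          # ...but this call blocks until it finishes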

Finally, re-running Victor Val's test, this seems to confirm the above:
    >>> import boto3
    >>> import time
    >>> import io
    >>>
    >>> buf = io.BytesIO(open('100mb.txt', 'rb').read())
    >>>
    >>> bucket = boto3.resource('s3').Bucket('test-threads')
    >>> start = time.time()
    >>> print("starting to upload...")
    starting to upload...
    >>> bucket.upload_fileobj(buf, '100mb')
    >>> print("finished uploading")
    finished uploading
    >>> end = time.time()
    >>> print("time: {}".format(end-start))
    time: 2.6030001640319824

(Execution time will probably be shorter when this example is run on a network-optimized instance. But 2.5 seconds is still a noticeably large chunk of time, and not at all indicative of threads being kicked off and not waited on.)
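If that holds, it also defuses the race condition from the question, at least for a single writer: the check-then-write sequence below can't overlap with its own upload, because the upload call doesn't return until the transfer is complete. (It offers no protection against other, independent writers.) A sketch reusing the question's helper:

    if not key_exists_in_bucket(bucket, 'testobj'):
        # Safe from self-interference: this blocks until the upload completes.
        bucket.upload_fileobj(buf, 'testobj')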

Lastly, here's an example of a Callback for .upload_fileobj(). It follows along with an example from the docs.

First, a little helper for finding a buffer's size efficiently:
    def get_bufsize(buf, chunk=1024) -> int:
        start = buf.tell()
        try:
            size = 0
            while True:
                out = buf.read(chunk)
                if out:
                    size += len(out)  # count bytes actually read, not the full chunk
                else:
                    break
            return size
        finally:
            buf.seek(start)  # restore the caller's position in the buffer
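A quick sanity check of the helper (the byte count here is illustrative):

    >>> import io
    >>> get_bufsize(io.BytesIO(b"\x00" * 2500))
    2500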

The class itself:
    import sys
    import threading
    import time

    class ProgressPercentage(object):
        def __init__(self, filename, buf):
            self._filename = filename
            self._size = float(get_bufsize(buf))
            self._seen_so_far = 0
            self._lock = threading.Lock()
            self.start = None

        def __call__(self, bytes_amount):
            with self._lock:
                if not self.start:
                    self.start = time.monotonic()
                self._seen_so_far += bytes_amount
                percentage = (self._seen_so_far / self._size) * 100
                sys.stdout.write(
                    "\r%s %s of %s (%.2f%% done, %.2fs elapsed)\n" % (
                        self._filename, self._seen_so_far, self._size,
                        percentage, time.monotonic() - self.start))
                # Use sys.stdout.flush() to update on one line
                # sys.stdout.flush()

Example:
    In [19]: import io 
    ...:
    ...: from boto3.session import Session
    ...:
    ...: s3 = Session().resource("s3")
    ...: bucket = s3.Bucket("test-threads")
    ...: buf = io.BytesIO(open('100mb.txt', 'rb').read())
    ...:
    ...: bucket.upload_fileobj(buf, 'mykey', Callback=ProgressPercentage("mykey", buf))
    mykey 262144 of 104857600.0 (0.25% done, 0.00s elapsed)
    mykey 524288 of 104857600.0 (0.50% done, 0.00s elapsed)
    mykey 786432 of 104857600.0 (0.75% done, 0.01s elapsed)
    mykey 1048576 of 104857600.0 (1.00% done, 0.01s elapsed)
    mykey 1310720 of 104857600.0 (1.25% done, 0.01s elapsed)
    mykey 1572864 of 104857600.0 (1.50% done, 0.02s elapsed)

The original question, "python - Making Boto3 upload calls blocking (single-threaded)", on Stack Overflow: https://stackoverflow.com/questions/55799556/
