gpt4 book ai didi

带有 aioboto3 的 Python 3 asyncio 似乎是连续的

转载 作者:行者123 更新时间:2023-12-04 12:58:06 25 4
gpt4 key购买 nike

我正在将一个简单的 python 3 脚本移植到 AWS Lambda。
该脚本很简单:它从十几个 S3 对象中收集信息并返回结果。
使用的脚本 multiprocessing.Pool并行收集所有文件。虽然 multiprocessing不能在 AWS Lambda 环境中使用,因为 /dev/shm不见了。
所以我想而不是写一个肮脏的multiprocessing.Process/multiprocessing.Queue更换,我会尝试 asyncio反而。
我正在使用最新版本的 aioboto3 (8.0.5) 在 Python 3.8 上。
我的问题是,我似乎无法在文件的原始顺序下载和多路复用下载的 asyncio 事件循环之间获得任何改进。
这是我的代码的两个版本。

import sys
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

import boto3
import aioboto3

BUCKET = 'some-bucket'
KEYS = [
'some/key/1',
[...]
'some/key/10',
]

async def download_aio():
"""Concurrent download of all objects from S3"""
async with aioboto3.client('s3') as s3:
objects = [s3.get_object(Bucket=BUCKET, Key=k) for k in KEYS]
objects = await asyncio.gather(*objects)
buffers = await asyncio.gather(*[o['Body'].read() for o in objects])

def download():
"""Sequentially download all objects from S3"""
s3 = boto3.client('s3')
for key in KEYS:
object = s3.get_object(Bucket=BUCKET, Key=key)
object['Body'].read()

def run_sequential():
download()

def run_concurrent():
loop = asyncio.get_event_loop()
#loop.set_default_executor(ProcessPoolExecutor(10))
#loop.set_default_executor(ThreadPoolExecutor(10))
loop.run_until_complete(download_aio())
两者的时间 run_sequential()run_concurrent()非常相似(十几个 10MB 文件约 3 秒)。
我确信并发版本不是,原因有很多:
  • 我尝试切换到 Process/ThreadPoolExecutor ,而我在函数运行期间产生的进程/线程,尽管它们什么都不做
  • 顺序和并发的时间非常接近,虽然我的网口肯定没有饱和,CPU也没有绑定(bind)
  • 并发版本所用的时间随文件数量线性增加。

  • 我确定缺少某些东西,但我无法理解是什么。
    有任何想法吗?

    最佳答案

    在花了几个小时试图了解如何使用之后 aioboto3正确,我决定只切换到我的备份解决方案。
    我最终推出了自己的天真版本 multiprocessing.Pool在 AWS lambda 环境中使用。
    如果将来有人偶然发现这个线程,那就是这里。它远非完美,但很容易更换 multiprocessing.Pool对于我的简单案例。

    from multiprocessing import Process, Pipe
    from multiprocessing.connection import wait


    class Pool:
    """Naive implementation of a process pool with mp.Pool API.

    This is useful since multiprocessing.Pool uses a Queue in /dev/shm, which
    is not mounted in an AWS Lambda environment.
    """

    def __init__(self, process_count=1):
    assert process_count >= 1
    self.process_count = process_count

    @staticmethod
    def wrap_pipe(pipe, index, func):
    def wrapper(args):
    try:
    result = func(args)
    except Exception as exc: # pylint: disable=broad-except
    result = exc
    pipe.send((index, result))
    return wrapper

    def __enter__(self):
    return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
    pass

    def map(self, function, arguments):
    pending = list(enumerate(arguments))
    running = []
    finished = [None] * len(pending)
    while pending or running:
    # Fill the running queue with new jobs
    while len(running) < self.process_count:
    if not pending:
    break
    index, args = pending.pop(0)
    pipe_parent, pipe_child = Pipe(False)
    process = Process(
    target=Pool.wrap_pipe(pipe_child, index, function),
    args=(args, ))
    process.start()
    running.append((index, process, pipe_parent))
    # Wait for jobs to finish
    for pipe in wait(list(map(lambda t: t[2], running))):
    index, result = pipe.recv()
    # Remove the finished job from the running list
    running = list(filter(lambda x: x[0] != index, running))
    # Add the result to the finished list
    finished[index] = result

    return finished

    关于带有 aioboto3 的 Python 3 asyncio 似乎是连续的,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63628262/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com