gpt4 book ai didi

python - 使用 AIOfile 读取异步文件

转载 作者:行者123 更新时间:2023-12-05 02:06:44 25 4
gpt4 key购买 nike

我正在尝试使用 asyncio 读取多个文件 (CSV),但我不想在这样做时阻塞主事件循环。

所以我检查了 AIOfile,它似乎保证读取不会阻塞。虽然这可能是真的,但以下代码片段需要花费大量时间才能完成,它与此处的示例基本相同 https://github.com/mosquito/aiofile#read-file-line-by-line

import asyncio
from aiofile import AIOFile, LineReader
from pathlib import Path
import time

counter = 0

async def main():
path = 'test_data'
global counter
data_dir = Path(path)
files_in_basepath = (entry for entry in data_dir.iterdir() if entry.is_file())
list_of_files = [(path + '/' + file.name, file) for file in files_in_basepath]
for file in list_of_files:
line_count = 0
async with AIOFile(file[0]) as afp:
await afp.fsync()
async for line in LineReader(afp):
#print(line)
values = ''
line_values = line.split(',')
for item in line_values:
values = values + item + ' '
# print(values)
line_count += 1
print(f'Processed {line_count} lines in file {file[1].name}.')
counter += 1

start_time = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
duration = time.time() - start_time
print(f"Processed {counter} data files in {duration} seconds")

这给出了糟糕的性能,100 个文件需要:

在 196.8809883594513 秒内处理了 100 个数据文件

与顺序处理那些文件相比,这真是不可思议......

在 0.9933180809020996 秒内处理了 100 个数据文件

所以我想知道这里发生了什么,而且我在几个地方看到了在执行程序中运行 IO 操作的建议,这样事件循环就不会被阻塞。

顺便提一下,我有一些其他代码可以在线程上运行它并且执行起来几乎与顺序执行一样好:

import concurrent.futures
import csv
import threading
import time
from pathlib import Path

c_lock = threading.Lock()
counter = 0

def read_data_file(files):
# Get the info from second item from tuple
info = files[1].stat()
global c_lock
global counter
c_lock.acquire()
print(info.st_mtime)
print(f'File name is {files[1].name} with size {round(info.st_size / float(1 << 10), 2)} KB')
with open(files[0]) as csv_file:
csv_reader = csv.reader(csv_file, delimiter=',')
line_count = 0
for row in csv_reader:
# Just assume we do something very interesting with these values...
values = ''
for item in row:
values = values + item + ' '
#print(values)
line_count += 1
print(f'Processed {line_count} lines in file {files[1].name}.')
counter += 1
c_lock.release()

def read_data_files(path):
# List all files in data folder
data_dir = Path(path)
files_in_basepath = (entry for entry in data_dir.iterdir() if entry.is_file())
list_of_files = []
for file in files_in_basepath:
list_of_files.append((path + '/' + file.name, file))
with concurrent.futures.ThreadPoolExecutor(max_workers=12) as executor:
executor.map(read_data_file, list_of_files)


if __name__ == "__main__":
data_files = 'test_data'
start_time = time.time()
read_data_files(data_files)
duration = time.time() - start_time
print(f"Processed {counter} data files in {duration} seconds")

这给出了以下内容:

在 1.0079402923583984 秒内处理了 100 个数据文件

想知道我是否对 asyncio 做错了什么,或者我应该完全跳过它......我只是在尝试什么是处理所有这些文件的最有效方法,顺序的,线程的(包括 asyncio)或多处理)

最佳答案

您的多线程代码用一个巨大的锁锁定了所有 read_data_file,迫使它按顺序执行,导致线程版本的性能并不比顺序版本好。

由于代码没有使用 asyncio.gather 或类似的并行化,asyncio 版本也按顺序运行。至于为什么它比常规顺序版本慢 200 倍,这可能是问 aiofiles 开发人员的一个好问题。我怀疑每个行读取操作都单独交给了一个内部线程,由于在这样一个热循环中需要大量的簿记,所以速度变慢了。

总结:

  • 如果您的瓶颈是 IO 速度,您可能通过使用多线程有所收获,只要您注意不要因不必要的锁定而使事情顺序进行。 (GIL 不会有问题,因为它会围绕 IO 操作自动释放。)

  • 如果您的瓶颈是 CPU 速度,您可能想要研究多处理,因为 GIL 导致多线程无济于事。例如,在读取 CSV 文件时,解析文件内容并将其转换为数字所花费的时间可能比从磁盘读取文件所花费的时间相形见绌,尤其是当文件由操作系统缓存在内存中时。

  • asyncio 和 aiofiles 很可能不会帮助您提高处理 CSV 文件的速度。 aiofiles 在集成读取可能“卡住”的文件时最有用(例如,因为它们可能正在从不再存在的网络驱动器中读取)。在当前的实现中,它对于读取需要高吞吐量的文件没有用。

TL;DR 尝试正确使用线程来提高速度,如果这不起作用,则使用多处理。

关于python - 使用 AIOfile 读取异步文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62501080/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com