
python - Which is the best way to run multiple tasks in parallel in Python


I have a function:

import time

def all_40k():
    for _ in range(400000):
        print('validate')
        print('parsing')
        print('inserting')

if __name__ == '__main__':
    start_time = time.time()
    all_40k()
    print(f'used time:{time.time()-start_time}')

The output is:

used time:9.545064210891724

Since the same work is repeated 400,000 times, I'd like to run 4 functions in parallel, each handling 100,000 iterations; ideally that would be 4x faster.

So I first tried multithreading:

import threading
import time

def first_10k():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')

def second_10k():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')

def third_10k():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')

def forth_10k():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')

if __name__ == '__main__':
    thread1 = threading.Thread(target=first_10k)
    thread2 = threading.Thread(target=second_10k)
    thread3 = threading.Thread(target=third_10k)
    thread4 = threading.Thread(target=forth_10k)

    start_time = time.time()
    thread1.start()
    thread2.start()
    thread3.start()
    thread4.start()
    thread1.join()
    thread2.join()
    thread3.join()
    thread4.join()
    print(f'used time:{time.time()-start_time}')

To my surprise, the output was:

used time:23.058093309402466

Then I tried asyncio:

import time
import asyncio

async def test_1():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')


async def test_2():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')


async def test_3():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')


async def test_4():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')


async def multiple_tasks():
    input_coroutines = [test_1(), test_2(), test_3(), test_4()]
    res = await asyncio.gather(*input_coroutines, return_exceptions=True)
    return res

if __name__ == '__main__':
    start_time = time.time()
    res1, res2, res3, res4 = asyncio.get_event_loop().run_until_complete(multiple_tasks())
    print(f'used time:{time.time()-start_time}')

The output is:

used time:9.295843601226807

Finally I tried ProcessPoolExecutor:

import time
from concurrent.futures import ProcessPoolExecutor

def data_handler(urls):
    for i in range(urls[0], urls[1]):
        print('validate')
        print('parsing')
        print('inserting')

def run():
    urls = [(1, 100000), (100001, 200000), (200001, 300000), (300001, 400000)]
    with ProcessPoolExecutor() as executor:
        executor.map(data_handler, urls)

if __name__ == '__main__':
    start_time = time.time()
    run()
    stop_time = time.time()
    print('used time %s' % (stop_time - start_time))

The output is:

used time 12.726619243621826

So how can I actually speed this up? I think I'm on the wrong track. Can anyone help? Best regards!

Best Answer

OK, so here is what you noticed:

No parallelism        9.545064210891724
asyncio               9.295843601226807
multithreading       23.058093309402466
ProcessPoolExecutor  12.726619243621826

First off, asyncio doesn't actually use threads, and, as you might guess, it only gains performance when there is some I/O involved. Asyncio alternates between tasks in a loop, switching whenever one of them hits an await. If nothing awaits, it just runs the tasks one at a time and never switches at all.
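You can see this directly with a minimal sketch (the busy helper and the 10M-element sum are just illustrative stand-ins for CPU work):

import asyncio
import time

async def busy(name):
    # Pure CPU work with no await inside: the event loop has no
    # chance to switch away from this coroutine until it returns.
    sum(range(10_000_000))
    print(f'{name} done')

async def main():
    start = time.time()
    # Despite gather(), the two coroutines run back to back, so the
    # total time is roughly the sum of both, not the max.
    await asyncio.gather(busy('a'), busy('b'))
    print(f'elapsed: {time.time() - start:.2f}s')

asyncio.run(main())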

With threads, only one thread at a time can control the Python interpreter, because of the Global Interpreter Lock. What you end up with here is a pile of contention from all the threads trying to work at once, and that context switching slows your application down. As with asyncio, you only really get a speedup if the threads can do other work while one of them waits on I/O.
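To illustrate the flip side, here is a minimal sketch of where threads do pay off, using time.sleep as a stand-in for a blocking I/O call (fake_io is a made-up helper):

import threading
import time

def fake_io():
    # time.sleep releases the GIL while blocked, just like a real
    # network or disk wait would.
    time.sleep(1)

start = time.time()
threads = [threading.Thread(target=fake_io) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# The four 1-second waits overlap and finish in about 1 second, not 4:
# the GIL only serializes Python bytecode, not blocked waits.
print(f'elapsed: {time.time() - start:.2f}s')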

OK, so surely the multiprocessing case should run faster, right? Well, each process does get its own interpreter lock, but the hold-up here is in your print statements. Every process is blocked trying to push its output down the same console pipe. Let me show you with an example.

Say we have a method we want to run 4 times, once serially and once in parallel:

import time
from concurrent.futures import ProcessPoolExecutor

def run(thread):
    print(f"Starting thread: {thread}")
    for i in range(500000):
        print('foobar')
    print(f"Finished thread: {thread}")


def run_singlethreaded():
    start_time = time.time()

    for thread in ["local"] * 4:
        run(thread)

    stop_time = time.time()
    return stop_time - start_time


def run_multiprocessing():
    start_time = time.time()

    with ProcessPoolExecutor(max_workers=4) as ex:
        ex.map(run, ["mp0", "mp1", "mp2", "mp3"])

    stop_time = time.time()
    return stop_time - start_time


if __name__ == '__main__':
    singlethreaded_time = run_singlethreaded()
    multiprocessing_time = run_multiprocessing()
    print(f"Finished singlethreaded in: {singlethreaded_time}")
    print(f"Finished multiprocessing in: {multiprocessing_time}")

If we run this and print the times, you may be surprised to see:

Finished singlethreaded in:  10.513998746871948
Finished multiprocessing in: 12.252000570297241

Now, if we change the loop body to something simpler that doesn't create an I/O bottleneck:

def run(thread):
    print(f"Starting thread: {thread}")
    for i in range(100000000):
        pass
    print(f"Finished thread: {thread}")

You get the parallel speedup you'd expect:

Finished singlethreaded in:  9.816999435424805
Finished multiprocessing in: 2.503000020980835

The important point here is that before parallelism can help you, you need to understand where you are resource-constrained. For I/O-bound applications, threading or asyncio may help. For CPU-bound applications, multiprocessing can be useful. And in other cases (the print statements here), neither will really help, because the bottleneck lives in a system outside your application. Hope this helps!
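Following that logic, one speculative mitigation for the original snippet (not from the answer itself) is simply to issue fewer writes: build the output in memory and hand the console a single large write instead of 1.2 million tiny ones.

import sys
import time

start_time = time.time()
# Accumulate everything in memory, then write once. This trades some
# memory (~10 MB here) for far fewer calls into the console pipe.
lines = []
for _ in range(400000):
    lines.append('validate\nparsing\ninserting\n')
sys.stdout.write(''.join(lines))
print(f'used time:{time.time() - start_time}', file=sys.stderr)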

On python - Which is the best way to run multiple tasks in parallel in Python, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/67411039/
