
python - Can't pickle coroutine objects when using ProcessPoolExecutor in a class


I am trying to get asyncio working with subprocesses and a concurrency limit. I already have this working in a functional style, but when I tried to implement the same logic in an OOP style, several problems appeared, mainly the Can't pickle coroutine/generator error. I tracked down some of them, but not all:

import asyncio
from concurrent.futures import ProcessPoolExecutor
from itertools import islice
from random import randint

class async_runner(object):

    def __init__(self):
        self.futures = []  # container to store current futures
        self.futures_total = []
        self.loop = asyncio.get_event_loop()  # main event_loop
        self.executor = ProcessPoolExecutor()
        self.limit = 1

    def run(self, func, *args):
        temp_loop = asyncio.new_event_loop()
        try:
            coro = func(*args)
            asyncio.set_event_loop(temp_loop)
            ret = temp_loop.run_until_complete(coro)
            return ret
        finally:
            temp_loop.close()

    def limit_futures(self, futures, limit):
        self.futures_total = iter(futures)
        self.futures = [future for future in islice(self.futures_total, 0, limit)]

        async def first_to_finish():
            while True:
                await asyncio.sleep(0)
                for f in self.futures:
                    if f.done():  # here raised TypeError: can't pickle coroutine objects
                        print(f.done())
                        self.futures.remove(f)
                        try:
                            #newf = next(self.futures_total)
                            #self.futures.append(newf)
                            print(f.done())
                        except StopIteration as e:
                            pass
                        return f.result()

        while len(self.futures) > 0:
            yield first_to_finish()

    async def run_limited(self, func, args, limit):
        self.limit = int(limit)
        self.futures_total = (self.loop.run_in_executor(self.executor, self.run, func, x) for x in range(110000, 119990))
        for ret in self.limit_futures(self.futures_total, 4):  # limitation - 4 per all processes
            await ret

    def set_execution(self, func, args, limit):
        ret = self.loop.run_until_complete(self.run_limited(func, args, limit))
        return ret

async def asy(x):
    print('enter: ', x)
    await asyncio.sleep(randint(1, 3))
    print('finishing ', x)
    return x

runner = async_runner()
ret = runner.set_execution(asy, urls, 2)
print(ret)

But this works fine:

import asyncio
from concurrent.futures import ProcessPoolExecutor
from itertools import islice
import time

async def asy(x):
    print('enter: ', x)
    await asyncio.sleep(1)
    print('finishing ', x)
    return x

def run(corofn, *args):
    loop = asyncio.new_event_loop()
    try:
        coro = corofn(*args)
        asyncio.set_event_loop(loop)
        ret = loop.run_until_complete(coro)
        #print(ret)
        return ret
    finally:
        loop.close()

def limit_futures(futures, limit):
    futures_sl = [
        c for c in islice(futures, 0, limit)
    ]
    print(len(futures_sl))

    async def first_to_finish(futures):
        while True:
            await asyncio.sleep(0)
            for f in futures_sl:
                if f.done():
                    futures_sl.remove(f)
                    try:
                        newf = next(futures)
                        futures_sl.append(newf)
                    except StopIteration as e:
                        pass
                    return f.result()

    while len(futures_sl) > 0:
        yield first_to_finish(futures)

async def main():
    loop = asyncio.get_event_loop()
    executor = ProcessPoolExecutor()
    futures = (loop.run_in_executor(executor, run, asy, x) for x in range(110000, 119990))
    '''
    CASE balls to the wall!
    await asyncio.gather(*futures)
    '''
    for ret in limit_futures(futures, 4):  # limitation - 4 per all processes
        await ret

if __name__ == '__main__':
    start = time.time()
    '''
    # CASE single
    ret = [asy(x) for x in range(510000,510040)]
    exit()
    '''
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print("Elapsed time: {:.3f} sec".format(time.time() - start))

I don't understand why the multiprocessing module only tries to pickle things at the point where the object is actually used, rather than in every case.

Best answer

The reason multiprocessing needs to pickle the async_runner instance is that self.run is a bound method, meaning it "contains" the async_runner instance.
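A minimal, hypothetical demo of that mechanism (the Holder class and job coroutine below are illustrative, not from the question): pickling a bound method forces pickle to serialize the instance it is bound to, and if that instance holds a coroutine object anywhere, serialization fails with the same error.

import pickle

async def job():
    return 1

class Holder:
    def __init__(self):
        self.coro = job()  # an un-picklable coroutine object stored on the instance

    def work(self):
        pass

h = Holder()
try:
    pickle.dumps(h.work)  # pickling the bound method pickles `h` too, including h.coro
except TypeError as e:
    print(e)  # e.g. "cannot pickle 'coroutine' object"
h.coro.close()  # suppress the "coroutine was never awaited" warning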

Since you don't actually use self inside the run method, you can make it a staticmethod to avoid this problem.
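A minimal sketch of that fix, keeping the body of run unchanged (only the decorator and the dropped self parameter are new):

class async_runner(object):
    # ... __init__ and the other methods as above ...

    @staticmethod
    def run(func, *args):
        # With no `self`, pickling this callable no longer drags the whole
        # async_runner instance (loop, executor, coroutines) into the worker.
        temp_loop = asyncio.new_event_loop()
        try:
            coro = func(*args)
            asyncio.set_event_loop(temp_loop)
            ret = temp_loop.run_until_complete(coro)
            return ret
        finally:
            temp_loop.close()

Accessed through an instance, self.run then resolves to the plain underlying function, which pickles by reference, so the existing run_in_executor(self.executor, self.run, func, x) call keeps working.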

About python - Can't pickle coroutine objects when using ProcessPoolExecutor in a class, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/47662040/
