gpt4 book ai didi

python - 多重处理。池和速率限制

转载 作者:太空宇宙 更新时间:2023-11-03 17:24:55 27 4
gpt4 key购买 nike

我正在发出一些 API 请求,这些请求限制为每秒 20 个。为了得到答案,等待时间约为 0.5 秒,我想使用 multiprocessing.Pool.map 并使用这个装饰器 rate-limiting所以我的代码看起来像

def fun(vec):
#do stuff

def RateLimited(maxPerSecond):
minInterval = 1.0 / float(maxPerSecond)
def decorate(func):
lastTimeCalled = [0.0]
def rateLimitedFunction(*args,**kargs):
elapsed = time.clock() - lastTimeCalled[0]
leftToWait = minInterval - elapsed
if leftToWait>0:
time.sleep(leftToWait)
ret = func(*args,**kargs)
lastTimeCalled[0] = time.clock()
return ret
return rateLimitedFunction
return decorate

@RateLimited(20)
def multi(vec):
p = Pool(5)
return p.map(f, vec)

我有 4 个核心,这个程序运行良好,并且与循环版本相比,时间有所改进。此外,当 Pool 参数为 4,5,6 时,它可以工作,并且 Pool(6) 的时间更短,但是当我使用 7+ 时,我收到错误(我猜每秒连接数太多)。

然后,如果我的函数更复杂并且可以执行 1-5 个请求,则装饰器将无法按预期工作。在这种情况下我还能用什么?

更新

对于任何想要使用 Pool 的人,请记住关闭它,否则您将使用所有 RAM

def multi(vec):
p = Pool(5)
res=p.map(f, vec)
p.close()
return res

更新2

我发现像这样的东西WebRequestManager可以做到这一点。问题是它不适用于多处理。包含 19-20 个进程的池,因为时间存储在运行请求时需要调用的类中。

最佳答案

你上面的缩进不一致,这使得回答这个问题变得更加困难,但我会尝试一下。

看来您的速率限制是错误的;如果 f 应该受到限制,则需要限制对 f 的调用,而不是对 multi 的调用。在分派(dispatch)到的事情中执行此操作是行不通的,因为 fork 的工作程序将各自独立地进行限制( fork 的进程将独立跟踪自上次调用以来的时间)。

最简单的方法是限制 Pool 从中提取结果的迭代器产生结果的速度。例如:

import collections
import time
def rate_limited_iterator(iterable, limit_per_second):
# Initially, we can run immediately limit times
runats = collections.deque([time.monotonic()] * limit_per_second)
for x in iterable:
runat, now = runats.popleft(), time.monotonic()
if now < runat:
time.sleep(runat - now)
runats.append(time.monotonic() + 1)
yield x

def multi(vec):
with Pool(5) as p:
return list(p.imap(f, rate_limited_iterator(vec, 20)))

关于python - 多重处理。池和速率限制,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32700733/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com