gpt4 book ai didi

python - 我将如何在实时场景中使用 concurrent.futures 和队列?

转载 作者:太空狗 更新时间:2023-10-29 20:49:05 24 4
gpt4 key购买 nike

使用 Python 3 的 concurrent.futures 模块进行并行工作相当容易,如下所示。

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
future_to = {executor.submit(do_work, input, 60): input for input in dictionary}
for future in concurrent.futures.as_completed(future_to):
data = future.result()

在队列中插入和检索项目也非常方便。

q = queue.Queue()
for task in tasks:
q.put(task)
while not q.empty():
q.get()

我有一个脚本在后台运行以监听更新。现在,理论上假设,当这些更新到达时,我会将它们排队并使用 ThreadPoolExecutor 同时处理它们。

现在,所有这些组件都单独工作,并且有意义,但我如何才能将它们一起使用呢?我不知道是否可以实时从队列中提供 ThreadPoolExecutor 工作,除非要处理的数据是预先确定的?

简而言之,我只想每秒接收 4 条消息的更新,将它们插入队列,并让我的 concurrent.futures 处理它们。如果我不这样做,那么我就会陷入一种缓慢的顺序方法。

让我们以canonical example in the Python为例文档如下:

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))

URLS 列表是固定的。是否可以实时提供此列表并让工作人员在他们过来时处理它,也许出于管理目的来自队列?我对我的方法是否实际上可行感到有点困惑?

最佳答案

example来自 Python 文档,扩展为从队列中获取其工作。需要注意的一个变化是,此代码使用 concurrent.futures.wait 而不是 concurrent.futures.as_completed 以允许在等待其他工作完成时开始新工作.

import concurrent.futures
import urllib.request
import time
import queue

q = queue.Queue()

URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']

def feed_the_workers(spacing):
""" Simulate outside actors sending in work to do, request each url twice """
for url in URLS + URLS:
time.sleep(spacing)
q.put(url)
return "DONE FEEDING"

def load_url(url, timeout):
""" Retrieve a single page and report the URL and contents """
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:

# start a future for a thread which sends work in through the queue
future_to_url = {
executor.submit(feed_the_workers, 0.25): 'FEEDER DONE'}

while future_to_url:
# check for status of the futures which are currently working
done, not_done = concurrent.futures.wait(
future_to_url, timeout=0.25,
return_when=concurrent.futures.FIRST_COMPLETED)

# if there is incoming work, start a new future
while not q.empty():

# fetch a url from the queue
url = q.get()

# Start the load operation and mark the future with its URL
future_to_url[executor.submit(load_url, url, 60)] = url

# process any completed futures
for future in done:
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
if url == 'FEEDER DONE':
print(data)
else:
print('%r page is %d bytes' % (url, len(data)))

# remove the now completed future
del future_to_url[future]

获取每个 url 两次的输出:

'http://www.foxnews.com/' page is 67574 bytes
'http://www.cnn.com/' page is 136975 bytes
'http://www.bbc.co.uk/' page is 193780 bytes
'http://some-made-up-domain.com/' page is 896 bytes
'http://www.foxnews.com/' page is 67574 bytes
'http://www.cnn.com/' page is 136975 bytes
DONE FEEDING
'http://www.bbc.co.uk/' page is 193605 bytes
'http://some-made-up-domain.com/' page is 896 bytes
'http://europe.wsj.com/' page is 874649 bytes
'http://europe.wsj.com/' page is 874649 bytes

关于python - 我将如何在实时场景中使用 concurrent.futures 和队列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41648103/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com