- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
使用 Python 3 的 concurrent.futures
模块进行并行工作相当容易,如下所示。
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
future_to = {executor.submit(do_work, input, 60): input for input in dictionary}
for future in concurrent.futures.as_completed(future_to):
data = future.result()
在队列中插入和检索项目也非常方便。
q = queue.Queue()
for task in tasks:
q.put(task)
while not q.empty():
q.get()
我有一个脚本在后台运行以监听更新。现在,理论上假设,当这些更新到达时,我会将它们排队并使用 ThreadPoolExecutor
同时处理它们。
现在,所有这些组件都单独工作,并且有意义,但我如何才能将它们一起使用呢?我不知道是否可以实时从队列中提供 ThreadPoolExecutor
工作,除非要处理的数据是预先确定的?
简而言之,我只想每秒接收 4 条消息的更新,将它们插入队列,并让我的 concurrent.futures 处理它们。如果我不这样做,那么我就会陷入一种缓慢的顺序方法。
让我们以canonical example in the Python为例文档如下:
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
URLS
列表是固定的。是否可以实时提供此列表并让工作人员在他们过来时处理它,也许出于管理目的来自队列?我对我的方法是否实际上可行感到有点困惑?
最佳答案
example来自 Python 文档,扩展为从队列中获取其工作。需要注意的一个变化是,此代码使用 concurrent.futures.wait
而不是 concurrent.futures.as_completed
以允许在等待其他工作完成时开始新工作.
import concurrent.futures
import urllib.request
import time
import queue
q = queue.Queue()
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
def feed_the_workers(spacing):
""" Simulate outside actors sending in work to do, request each url twice """
for url in URLS + URLS:
time.sleep(spacing)
q.put(url)
return "DONE FEEDING"
def load_url(url, timeout):
""" Retrieve a single page and report the URL and contents """
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# start a future for a thread which sends work in through the queue
future_to_url = {
executor.submit(feed_the_workers, 0.25): 'FEEDER DONE'}
while future_to_url:
# check for status of the futures which are currently working
done, not_done = concurrent.futures.wait(
future_to_url, timeout=0.25,
return_when=concurrent.futures.FIRST_COMPLETED)
# if there is incoming work, start a new future
while not q.empty():
# fetch a url from the queue
url = q.get()
# Start the load operation and mark the future with its URL
future_to_url[executor.submit(load_url, url, 60)] = url
# process any completed futures
for future in done:
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
if url == 'FEEDER DONE':
print(data)
else:
print('%r page is %d bytes' % (url, len(data)))
# remove the now completed future
del future_to_url[future]
获取每个 url
两次的输出:
'http://www.foxnews.com/' page is 67574 bytes
'http://www.cnn.com/' page is 136975 bytes
'http://www.bbc.co.uk/' page is 193780 bytes
'http://some-made-up-domain.com/' page is 896 bytes
'http://www.foxnews.com/' page is 67574 bytes
'http://www.cnn.com/' page is 136975 bytes
DONE FEEDING
'http://www.bbc.co.uk/' page is 193605 bytes
'http://some-made-up-domain.com/' page is 896 bytes
'http://europe.wsj.com/' page is 874649 bytes
'http://europe.wsj.com/' page is 874649 bytes
关于python - 我将如何在实时场景中使用 concurrent.futures 和队列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41648103/
websocket的用途/场景 先总结:高即时性服务,比如聊天室的群聊,server顺序收到了张三,李四的消息,立即就推送给王五,不能让王五等半天。 Ajax也可以一秒一刷,让王五去问张三说话没,如果
前端的工作过程里,本地开发、提供测试环境,总得有个用着顺手的服务器软件,这个场景里nginx很流行。 介绍两个好用的配置项:rewrite try_files @xxxx rewrite 比较
我有一个场景的两个不同角度的 2 个视频文件,我想重建场景的 3D 估计。它类似于 3D 传感器的作用(例如 Kinect、PrimeSense)。我正在寻找一个库,甚至是一个完善的机器视觉算法,以便
我已阅读RebaseProject页面并尝试了一个不平凡的例子(不是对一个完整的分支进行 rebase )。这与 rebase D 的情况类似我场景B。 这是rebase之前的情况: default
有没有办法将我的场景保存在 JavaFx 应用程序中单独的 Java 文件中?我尝试过这样的事情: public class MyApp extends Application { pri
我有这样的场景:用户想要查看大量有关自己的信息。例如:年龄、姓名、地位、收入、工作、爱好、 child 的名字、妻子的名字、酋长的名字、祖父/祖母的名字。大约 50 个变量。他可以选择任何变量来显示信
我希望有人能帮助我解决这个问题:我有一个包含条目的表。我想执行查询并根据模式获取得分最高的记录。模式将是:如果我的话按原样出现,那么该条目的分数将是最高的。如果该单词出现在句子中,则该条目的分数将低于
我正在尝试在我的应用程序委托(delegate)方法中实现一些逻辑。了解当前正在运行哪种场景将非常有帮助。 [[CCDirector sharedDirector] runningScene] 返回当
好的,这是一个有趣的。我有 2 个表:tbl_notes、tbl_notes_categories 简单地说,tbl_notes 有一个 categoryid,我将 2 个表与该 ID 相关联。所以,
我有一个使用并行运行的 Specflow、selenium、NUnit 的测试解决方案在 AssemblyInfo 中添加了这个:[程序集:Parallelizable(ParallelScope.F
我正在尝试弄清楚如何在 SpriteKit 中添加更多场景。如果我在 GameViewController 中使用 SpriteKit 生成的行 if let scene = GameScene.un
目录 1、业务背景 2、场景分析 3、流程设计 1、业务流程 2、导入流程
我是 Unity 的新手,所以修复起来可能非常简单。我使用了一个 3D Google SketchUp 模型,我想让玩家环顾模型。 super 简单。 我添加了 3D 平面,添加了相机并更新了设置以支
我需要标记要跳过的某些测试。但是,有些测试是参数化的,我只需要能够跳过某些场景。 我使用 py.test -m "hermes_only" 调用测试或 py.test -m "not hermes_o
我已经开始使用 SpecFlow 并想知道是否可以在规范之间重用场景 基本上我的想法是这样的(我可能从根本上是错误的:)) 我编写了一项功能来验证导航。 功能:导航 I should be able
在编写验证输入表单上的信息的 BDD 场景时,您将如何列出规则。 选项是: 1) 每个规则一个场景 2)场景大纲,每个领域和规则的例子 我们如何说某些不在特定字符集中的无效内容,例如: 鉴于我输入了一
我们如何使用 StoryQ 来测试预期出现异常的场景? 最佳答案 就实际代码而言,在测试代码的 .Then 部分,您需要创建一个 Action 或 Func 来确定正在测试的内容,然后在代码的 .Th
完成快速初学者努力通过点击按钮向场景添加节点。 我知道我可以使用点击手势来获取点击坐标并执行点击测试,然后在点击的 3D 空间中放置一个对象。但是,我想在设备屏幕的中央显示一个球体或十字准线,当点击屏
如何在表格中传递空格? Background: Given the following books |Author |(here several spaces)
我正在尝试从 Eric Haines' Standard Procedural Database (SPD) 渲染“mount”场景,但折射部分就是不想配合。我已经尝试了所有我能想到的方法来修复它。
我是一名优秀的程序员,十分优秀!