- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
当我启动包含一组任务和一个回调的 chord()
列表时,只有在完成所有任务组后才会调用回调,即使是不在当前和弦。
这里是更好解释的代码:
import time
from celery import Celery, group, chord
app = Celery('tasks')
app.config_from_object('celeryconfig')
@app.task(name='SHORT_TASK')
def short_task(t):
time.sleep(t)
return t
@app.task(name='FINISH_GROUP')
def finish_group(res, nb):
print("Pipe #{} finished".format(nb))
return True
@app.task
def main(total):
tasks = []
for nb in range(1, total+1):
short_tasks = [short_task.si(i) for i in [0.5, 0.75, 1]]
chord(
group(short_tasks),
finish_group.s(nb)
).apply_async()
例如,我用 5 个项目启动它:
In [5]: main.delay(5)
Out[5]: <AsyncResult: db1f97f0-ff7a-4651-b2f9-11e27a001480>
结果:
[2017-11-06 13:50:38,374: INFO/MainProcess] Received task: tasks.main[6da738b5-4eae-4de4-9ac5-1dc67d210f1d]
[2017-11-06 13:50:38,409: INFO/MainProcess] Received task: SHORT_TASK[9581f9e0-1128-4b87-ae6b-16f238b2337e]
[2017-11-06 13:50:38,411: INFO/MainProcess] Received task: SHORT_TASK[579dc498-3770-4385-a25a-06173fbe639c]
[2017-11-06 13:50:38,412: INFO/MainProcess] Received task: SHORT_TASK[bfafb943-46d8-42e3-941f-b48a9c8e0186]
[2017-11-06 13:50:38,414: INFO/MainProcess] Received task: SHORT_TASK[a1208f06-250f-48ac-b3df-45c4525fe8eb]
[2017-11-06 13:50:38,416: INFO/MainProcess] Received task: SHORT_TASK[86ee7408-9d61-4909-bce8-c42cf691e9c2]
[2017-11-06 13:50:38,416: INFO/MainProcess] Received task: SHORT_TASK[e2bb22c0-1d20-4da7-91d9-45b7ed8bfc6f]
[2017-11-06 13:50:38,419: INFO/MainProcess] Received task: SHORT_TASK[7551199b-4690-45dd-a434-3911861f0093]
[2017-11-06 13:50:38,420: INFO/MainProcess] Received task: SHORT_TASK[362d18f4-2252-4a31-ad21-4a2d192fd22e]
[2017-11-06 13:50:38,421: INFO/MainProcess] Received task: SHORT_TASK[7561c33b-7020-4feb-b054-3919e4ae31c2]
[2017-11-06 13:50:38,423: INFO/MainProcess] Received task: SHORT_TASK[3ac997f5-6d0f-43b6-ab15-a6827a26665f]
[2017-11-06 13:50:38,423: INFO/MainProcess] Received task: SHORT_TASK[8b2ebb3a-293c-4bb8-88a3-5242750a082e]
[2017-11-06 13:50:38,423: INFO/ForkPoolWorker-3] Task tasks.main[6da738b5-4eae-4de4-9ac5-1dc67d210f1d] succeeded in 0.048569423001026735s: None
[2017-11-06 13:50:38,424: INFO/MainProcess] Received task: SHORT_TASK[efd68688-ec4a-418d-83b9-a55fd6cc1541]
[2017-11-06 13:50:38,427: INFO/MainProcess] Received task: SHORT_TASK[4e82540e-f935-4288-828f-c6f66f84139a]
[2017-11-06 13:50:38,427: INFO/MainProcess] Received task: SHORT_TASK[a94e4ec4-adcb-4a0f-b184-b36650105ed5]
[2017-11-06 13:50:38,427: INFO/MainProcess] Received task: SHORT_TASK[0d4b5e24-7aaa-4eb8-8e54-70e769dfdb39]
[2017-11-06 13:50:38,918: INFO/ForkPoolWorker-2] Task SHORT_TASK[9581f9e0-1128-4b87-ae6b-16f238b2337e] succeeded in 0.5051485379808582s: 0.5
[2017-11-06 13:50:38,926: INFO/ForkPoolWorker-3] Task SHORT_TASK[a1208f06-250f-48ac-b3df-45c4525fe8eb] succeeded in 0.5012409449846018s: 0.5
[2017-11-06 13:50:39,165: INFO/ForkPoolWorker-1] Task SHORT_TASK[579dc498-3770-4385-a25a-06173fbe639c] succeeded in 0.7524393269850407s: 0.75
[2017-11-06 13:50:39,445: INFO/ForkPoolWorker-4] Task SHORT_TASK[bfafb943-46d8-42e3-941f-b48a9c8e0186] succeeded in 1.031865488999756s: 1
[2017-11-06 13:50:39,448: INFO/MainProcess] Received task: FINISH_GROUP[4506631f-f9cc-4e9e-a9e7-9a59c8f7c998]
[2017-11-06 13:50:39,668: INFO/ForkPoolWorker-1] Task SHORT_TASK[7551199b-4690-45dd-a434-3911861f0093] succeeded in 0.501304400007939s: 0.5
[2017-11-06 13:50:39,672: INFO/ForkPoolWorker-2] Task SHORT_TASK[86ee7408-9d61-4909-bce8-c42cf691e9c2] succeeded in 0.7513346789928619s: 0.75
[2017-11-06 13:50:39,932: INFO/ForkPoolWorker-3] Task SHORT_TASK[e2bb22c0-1d20-4da7-91d9-45b7ed8bfc6f] succeeded in 1.0058077470166609s: 1
[2017-11-06 13:50:39,936: INFO/MainProcess] Received task: FINISH_GROUP[f143d581-799b-45ff-9e11-edc0bb88006a]
[2017-11-06 13:50:40,175: INFO/ForkPoolWorker-2] Task SHORT_TASK[3ac997f5-6d0f-43b6-ab15-a6827a26665f] succeeded in 0.502920284983702s: 0.5
[2017-11-06 13:50:40,198: INFO/ForkPoolWorker-4] Task SHORT_TASK[362d18f4-2252-4a31-ad21-4a2d192fd22e] succeeded in 0.752579735009931s: 0.75
[2017-11-06 13:50:40,685: INFO/ForkPoolWorker-3] Task SHORT_TASK[8b2ebb3a-293c-4bb8-88a3-5242750a082e] succeeded in 0.7518302960088477s: 0.75
[2017-11-06 13:50:40,701: INFO/ForkPoolWorker-4] Task SHORT_TASK[4e82540e-f935-4288-828f-c6f66f84139a] succeeded in 0.5013290829956532s: 0.5
[2017-11-06 13:50:40,715: INFO/ForkPoolWorker-1] Task SHORT_TASK[7561c33b-7020-4feb-b054-3919e4ae31c2] succeeded in 1.0464465210097842s: 1
[2017-11-06 13:50:40,715: WARNING/ForkPoolWorker-1] Pipe #1 finished
[2017-11-06 13:50:40,716: INFO/ForkPoolWorker-1] Task FINISH_GROUP[4506631f-f9cc-4e9e-a9e7-9a59c8f7c998] succeeded in 0.000513697013957426s: True
[2017-11-06 13:50:40,716: WARNING/ForkPoolWorker-1] Pipe #2 finished
[2017-11-06 13:50:40,717: INFO/ForkPoolWorker-1] Task FINISH_GROUP[f143d581-799b-45ff-9e11-edc0bb88006a] succeeded in 0.0003622350050136447s: True
[2017-11-06 13:50:40,718: INFO/MainProcess] Received task: FINISH_GROUP[fc9be8c2-99f7-46b2-a810-47023e0a072a]
[2017-11-06 13:50:40,718: WARNING/ForkPoolWorker-1] Pipe #3 finished
[2017-11-06 13:50:40,718: INFO/ForkPoolWorker-1] Task FINISH_GROUP[fc9be8c2-99f7-46b2-a810-47023e0a072a] succeeded in 0.00038264598697423935s: True
[2017-11-06 13:50:41,215: INFO/ForkPoolWorker-2] Task SHORT_TASK[efd68688-ec4a-418d-83b9-a55fd6cc1541] succeeded in 1.0379863310081419s: 1
[2017-11-06 13:50:41,219: INFO/MainProcess] Received task: FINISH_GROUP[6a4dc66e-2232-4bad-9d85-9fbc63b8b847]
[2017-11-06 13:50:41,221: WARNING/ForkPoolWorker-2] Pipe #4 finished
[2017-11-06 13:50:41,222: INFO/ForkPoolWorker-2] Task FINISH_GROUP[6a4dc66e-2232-4bad-9d85-9fbc63b8b847] succeeded in 0.0018843600118998438s: True
[2017-11-06 13:50:41,440: INFO/ForkPoolWorker-3] Task SHORT_TASK[a94e4ec4-adcb-4a0f-b184-b36650105ed5] succeeded in 0.7531412789830938s: 0.75
[2017-11-06 13:50:41,708: INFO/ForkPoolWorker-4] Task SHORT_TASK[0d4b5e24-7aaa-4eb8-8e54-70e769dfdb39] succeeded in 1.005872479028767s: 1
[2017-11-06 13:50:41,711: INFO/MainProcess] Received task: FINISH_GROUP[388ee1c3-b80c-41af-bbfd-29b968e90aff]
[2017-11-06 13:50:41,712: WARNING/ForkPoolWorker-3] Pipe #5 finished
[2017-11-06 13:50:41,712: INFO/ForkPoolWorker-3] Task FINISH_GROUP[388ee1c3-b80c-41af-bbfd-29b968e90aff] succeeded in 0.0005500270053744316s: True
我启动了一个并发度为 4(prefork)的 Celery。
我们可以看到一开始收到了 15 个 SHORT_TASK,然后 worker 执行它,然后才调用 FINISH_GROUP 任务。
是否可以在他们相关的 SHORT_TASK 完成后启动 FINISH_GROUP 任务,而不是等待所有其他不相关的 SHORT_TASK?
也许我的 Canvas 不正确,或者是错误的 Celery 配置,我不知道。
感谢您的帮助!
最佳答案
你的测试有偏差,因为你只使用了一个 worker,time.sleep()
会阻塞那个 worker。这意味着即使并发为 4,它也不再处理任务。
Is it possible to launch the FINISH_GROUP task just after their associated SHORT_TASK have been finished, and not wait all the others non-related SHORT_TASK please ?
目前您没有等待其他short_task
完成,它们都安排在同一时间执行。由于您正在使用 sleep ,因此一旦所有 short_task
各自的和弦结束,finish_group
就会被调用。
您当前的执行方式如下:
| chord 1 | chord 2 | chord 3 |
|--------------|--------------|--------------|
| short_task 1 | | | |
| | short_task 1 | | |
| | | short_task 1 | |
| short_task 2 | | | |
| | short_task 2 | | |
| | | short_task 2 | |
| short_task 3 | | | v
| | short_task 3 | | execution order
| | | short_task 3 |
| finish_group | | |
| | finish_group | |
| | | finish_group |
如果删除 sleep ,添加更多工作人员,或使用 gevent。它应该看起来像这样:
| chord 1 | chord 2 | chord 3 |
|------------------|------------------|------------- ----|
| short_task 1,2,3 | short_task 1,2,3 | short_task 1,2,3 |
| finish_group | finish_group | finish_group |
而且您应该会看到在同一行上的任务将以不同的顺序出现在日志中(具体取决于哪个工作人员先处理)。但是 finish_group
仍然是最后一个。
请注意,使用 chord
时,不需要group
您的任务
chord(
short_tasks,
finish_group.s(nb)
)
相同的代码,但使用了 gevent:
import gevent
from celery import Celery, group, chord, chain
app = Celery('tasks', broker='redis://localhost/4', backend='redis://localhost/5')
@app.task()
def short_task(nb, i):
print('TEST: start short_task({}, {})'.format(nb, i))
gevent.sleep(1)
print('TEST: end short_task({}, {})'.format(nb, i))
return i
@app.task(name='FINISH_GROUP')
def finish_group(results, nb):
print('TEST: finish_group({}) -> {}'.format(nb, results))
@app.task
def main(total):
for nb in range(1, total+1):
short_tasks = [short_task.si(nb, i) for i in range(3)]
chord(short_tasks, finish_group.s(nb)).apply_async()
启动:
$ celery worker -A celery_test --loglevel=debug --concurrency=20 -P gevent 2>&1 | grep TEST
由于并行执行,输出将是困惑的。
[2017-11-06 16:40:08,085] TEST: start short_task(1, 0)
[2017-11-06 16:40:08,088] TEST: start short_task(1, 1)
[2017-11-06 16:40:08,091] TEST: start short_task(1, 2)
[2017-11-06 16:40:08,092] TEST: start short_task(2, 0)
[2017-11-06 16:40:08,094] TEST: start short_task(2, 1)
[2017-11-06 16:40:08,096] TEST: start short_task(2, 2)
[2017-11-06 16:40:08,100] TEST: start short_task(3, 0)
[2017-11-06 16:40:08,101] TEST: start short_task(3, 1)
[2017-11-06 16:40:08,103] TEST: start short_task(3, 2)
# ^ all short_task have been started at the same time
[2017-11-06 16:40:09,085] TEST: end short_task(1, 0)
[2017-11-06 16:40:09,089] TEST: end short_task(1, 1)
[2017-11-06 16:40:09,093] TEST: end short_task(1, 2)
[2017-11-06 16:40:09,106] TEST: end short_task(2, 0)
[2017-11-06 16:40:09,106] TEST: end short_task(2, 1)
[2017-11-06 16:40:09,107] TEST: end short_task(2, 2)
[2017-11-06 16:40:09,107] TEST: end short_task(3, 0)
[2017-11-06 16:40:09,108] TEST: end short_task(3, 1)
[2017-11-06 16:40:09,108] TEST: end short_task(3, 2)
# ^ total execution is only 1 second since 9 greenlet have slept together
[2017-11-06 16:40:09,115] TEST: finish_group(1) -> [0, 1, 2]
[2017-11-06 16:40:09,126] TEST: finish_group(2) -> [2, 1, 0]
[2017-11-06 16:40:09,128] TEST: finish_group(3) -> [0, 1, 2]
# ^ order of results are mixed depending of which greenlet finished first
关于python - celery : launch chord callback after its associated body,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47137592/
运行真机调试时,出现如下问题。 Could not launch “MY-APP-NAME” iPhone has denied the launch request. Internal launch
每次单击“运行方式”按钮时,都会收到以下消息。 “选品无法推出,近期也没有推出。” 如果我选择“运行方式”按钮附近的下拉箭头,然后选择“新配置”选项(这是我设置为使用本地主机启动的配置)......它
启动过去有效的python文件时出现上述错误。我想我知道这是怎么发生的,但不确定如何解决。我有两台计算机双启动不同的操作系统,并使用保管箱保持它们之间的同步。过去,我以这种方式将我的Eclipse工作
我尝试使用 Eclipse 构建并运行“Hello world”,但它无法运行。我不使用 Android 或其他任何纯 C++。 http://i57.tinypic.com/x51h5h.png 最
这个问题在这里已经有了答案: "Selection cannot be launched and there are no recent launches” when Eclipse for Andr
在 Kotlin 中有多种启动协程的方法。我发现了几个示例,其中 GlobalScope 和 CoroutineScope 被使用。但是后者是在启动协程时直接创建的: 使用 GlobalScope :
我正在使用 10gen 版本在 OS X 上安装 MongoDB。 但是their installation tutorial事实证明对我来说有点稀缺。 到目前为止,我已经找到了安装为 launch
我在互联网上到处寻找,并尝试了论坛所说的一切,但没有任何效果。这个错误不断出现。我试过运行我的java项目(不适用于android),即使下拉运行按钮不起作用,因为它说“不适用”。 最佳答案 Ecli
我理解 std::async 对以下参数的作用。 std::launch::async std::launch::deferred 然而,std::launch::async | 会发生什么? std
我正在尝试使用 Eclipse 打开一个解压缩的 Android 项目,但每次我尝试运行或调试时都会显示此错误:“无法启动选择并且最近没有启动”。 所以我查找并尝试从现有源创建一个新项目,但打开时出现
我删除了 React Native 创建的默认启动屏幕文件,而是创建了一个启动图像。当我在模拟器中运行应用程序时,会显示启动图像,但是当我存档项目并通过 Testflight 打开它时,它仍然显示旧的
从深层链接(具有应用特定方案的 URL)启动我的 iOS 应用时,我在日志中收到此错误: lsd[738] : LaunchServices: application launch failed -
当我使用 visual studio code 调试 nodejs 应用程序时。visual studio code 告诉我 request 'launch': cannot launch targe
我创建了 Cocoa 应用程序,它是启动代理的类型。为了分发这个,我还创建了一个包,它安装该应用程序并将 launchagnet plist 文件复制到/Libraries/LaunchAgents
大家好,我今天在开发的 Java Swing 应用程序中发现了一个很大的疑问。 我发现的问题是launch() 当我在本地尝试我开发的应用程序时,它运行时没有任何延迟,但是当我使用 jnlp 从我的
我正在尝试使用 JavaFX 设置视频在 JFrame 中播放。 但是一旦我调用 launch() 函数来设置 JavaFX 并播放视频,我就会收到以下错误 应用程序构造函数中出现异常线程“AWT-E
我试图在 Android Studio 的外部工具中设置 javah 命令,但我得到了 IllegalArgumentException: Not a valid class name: Files\
我正在尝试创建一个简单的应用程序。我创建了一个新的 AVD。我启动了它。等了一个小时,模拟器只显示ANDROID。我卡住了。有人可以帮忙吗。 一个小时的样子请不要标记重复。我尽可能多地搜索和尝试。 我
进程文件: launch or launch.exe 进程名称: Vantarakis Launchh 进程类别:存在安全风险的进程 英文描述: launch.exe is a part o
将 google chrome 更新到最新版本 89.0.4389.82 (Official Build) (64-bit) 后开始出现错误。操作系统:Windows 10 错误: Puppeteer
我是一名优秀的程序员,十分优秀!