- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我陷入了相对复杂的 celery 链配置,试图实现以下目标。假设有如下一系列任务:
chain1 = chain(
DownloadFile.s("http://someserver/file.gz"), # downloads file, returns temp file name
UnpackFile.s(), # unpacks the gzip comp'd file, returns temp file name
ParseFile.s(), # parses file, returns list URLs to download
)
现在我想并行下载每个 URL,所以我所做的是:
urls = chain1.get()
download_tasks = map(lambda x: DownloadFile.s(x), urls)
res1 = celery.group(download_tasks)()
res1_data = res1.get()
最后,我想获取从 DownloadFile
返回的每个下载文件(从 ParseFile
返回临时文件名)并通过另一个任务链并行运行它(例如,它将是 group
的 chain
):
chains = []
for tmpfile in res:
chains.append(celery.chain(
foo.s(tmpfile),
bar.s(),
baz.s()
))
res2 = celery.group(*chains)()
res2_data = res2.get()
如果我在正常的 Python 进程(而不是另一个 celery 任务)中运行该方法,该方法效果很好,因为我可以等待 chain1
的结果。 ,然后为每个下载的文件构建下载任务组和新链。
但是,现在我想将所有这些东西包装到另一个 Celery 任务中,方法是将其包装在另一个 @app.task
中。装饰函数,结果发现您无法调用(或者确实不应该从任务内部调用 .get()
来等待另一个任务完成),并且我未能找到“移植”此工作流来运行的解决方案在一个任务里面。我尝试添加 res1
进入chain1
链,但 celery 提示 <GroupResult: ..... > is not JSON serializable
。
有人可以建议一种让它发挥作用的方法吗?谢谢!
最佳答案
确实,在任务中调用 .get()
是不好的。 Celery 的目标是并行执行异步任务,因此您不应该等待结果。
解决问题的一种方法是存储第一次处理的 url 结果(在文件或数据库中)。
我写了一个简短的示例,说明通过将结果写入文件可以做什么。我选择了 json
转储。
假设您的 main
中有一个 url
列表。首先,您使用 chain
的 group
启动异步处理所有这些 url。所有这些任务都将处理 url 并将要下载的 url 列表存储在位于指定 tmp 目录中的文件中。
然后,您还启动 check_dir
任务,该任务将检查目录中是否已写入文件,在本例中,处理每个文件并删除 tmp 目录中的相应文件。
使用我选择的参数,此任务每 30 秒自动重试一次,并且永远不会结束(我假设您有一个重复的作业要执行),因此您可以更改此设置,但它是为了让您了解如何管理。
我将其作为 main
运行,但如果需要,也可以将其包装到另一个 celery 任务中。
app_module.py
from __future__ import absolute_import
from celery import Celery
app = Celery('app')
app.config_from_object("settings")
if __name__ == '__main__':
app.start()
任务.py
from celery import group, chain
from app_module import app
import json
import glob
import os
__all__ = ('download_file',
'unpack_file',
'parse_file',
'foo',
'bar',
'process_downloaded_file',
'check_dir',)
path = "./data/tmp_dir/"
@app.task
def download_file(filename):
return filename
@app.task
def unpack_file(filename):
return "unzipped_" + filename
@app.task
def parse_file(filename):
# Fake parse task storing results in a temp directory
# results are stored in a json and contains the list of urls
with open(path + filename, "wb") as f:
d = {"files" : [filename+"_" + str(i) for i in range(0,5)]}
json.dump(d, f)
return True
@app.task
def foo(filename):
return "foo_" + filename
@app.task
def bar(filename):
return "bar_" + filename
@app.task
def process_downloaded_file(filename):
#process one file in the temp directory and at the end delete the file so it
# is not processed several times
with open(filename, "rb") as f:
d = json.load(f)
g = group(chain(download_file.s(f), foo.s(), bar.s()) for f in d["files"]).apply_async()
os.remove(filename)
return True
@app.task(bind=True)
def check_dir(self, tmp_dir, sleep=30):
#this task checks the tmp directory. If files have been written it processes
#every file in the directory. The task autoretries each *sleep* seconds
for f in glob.glob(tmp_dir + "*"):
process_downloaded_file.delay(f)
self.retry(args=(tmp_dir, sleep), countdown=sleep)
main.py
from celery import group, chain
from tasks import *
path = "./data/tmp_dir/"
urls = ["file1", "file2"]
group(chain(download_file.s(f), unpack_file.s(), parse_file.s()) for f in urls).apply_async()
check_dir.delay(path)
控制台输出:
[2017-02-14 18:10:41,630: INFO/MainProcess] Received task: arthur.tasks.download_file[65cb06c6-b8b6-4108-af36-84103037e4a2]
[2017-02-14 18:10:41,632: INFO/MainProcess] Received task: arthur.tasks.download_file[d069e046-4153-4320-8f9d-a22adeeb2827]
[2017-02-14 18:10:41,637: INFO/MainProcess] Received task: arthur.tasks.check_dir[19a1984f-c2ed-4de6-82d6-b5ad5a6bacc5]
[2017-02-14 18:10:41,666: INFO/MainProcess] Received task: arthur.tasks.unpack_file[e9eab102-8ae0-4000-b384-5cfa0e01e805]
[2017-02-14 18:10:41,674: INFO/MainProcess] Task arthur.tasks.download_file[65cb06c6-b8b6-4108-af36-84103037e4a2] succeeded in 0.0389260330703s: u'file1'
[2017-02-14 18:10:41,682: INFO/MainProcess] Received task: arthur.tasks.unpack_file[47b13b21-57e2-44be-82dd-f8e0e1adff2e]
[2017-02-14 18:10:41,689: INFO/MainProcess] Task arthur.tasks.download_file[d069e046-4153-4320-8f9d-a22adeeb2827] succeeded in 0.0534016339807s: u'file2'
[2017-02-14 18:10:41,691: INFO/MainProcess] Received task: arthur.tasks.parse_file[bd3fa287-9cf0-4802-88ca-2593c27af4f7]
[2017-02-14 18:10:41,696: INFO/MainProcess] Task arthur.tasks.unpack_file[47b13b21-57e2-44be-82dd-f8e0e1adff2e] succeeded in 0.00816849502735s: u'unzipped_file2'
[2017-02-14 18:10:41,704: INFO/MainProcess] Received task: arthur.tasks.process_downloaded_file[1b72f409-f5b5-480a-b651-616dc02b2207]
[2017-02-14 18:10:41,706: INFO/MainProcess] Task arthur.tasks.parse_file[bd3fa287-9cf0-4802-88ca-2593c27af4f7] succeeded in 0.00894999306183s: True
[2017-02-14 18:10:41,708: INFO/MainProcess] Task arthur.tasks.check_dir[19a1984f-c2ed-4de6-82d6-b5ad5a6bacc5] retry: Retry in 30s
[2017-02-14 18:10:41,709: INFO/MainProcess] Received task: arthur.tasks.process_downloaded_file[1d8d340f-61f7-4ef3-a90e-913a3bfb5478]
[2017-02-14 18:10:41,713: INFO/MainProcess] Task arthur.tasks.unpack_file[e9eab102-8ae0-4000-b384-5cfa0e01e805] succeeded in 0.044072615914s: u'unzipped_file1'
[2017-02-14 18:10:41,714: INFO/MainProcess] Received task: arthur.tasks.check_dir[19a1984f-c2ed-4de6-82d6-b5ad5a6bacc5] eta:[2017-02-14 17:11:11.692241+00:00]
[2017-02-14 18:10:41,717: INFO/MainProcess] Received task: arthur.tasks.parse_file[e839826a-dfa5-4df0-a716-9c21371c297f]
[2017-02-14 18:10:41,720: INFO/MainProcess] Received task: arthur.tasks.download_file[743153f9-9c92-430e-84f5-7d99a269c104]
[2017-02-14 18:10:41,724: INFO/MainProcess] Task arthur.tasks.process_downloaded_file[1b72f409-f5b5-480a-b651-616dc02b2207] succeeded in 0.0153999190079s: True
[2017-02-14 18:10:41,725: INFO/MainProcess] Task arthur.tasks.parse_file[e839826a-dfa5-4df0-a716-9c21371c297f] succeeded in 0.00395095907152s: True
[2017-02-14 18:10:41,726: INFO/MainProcess] Task arthur.tasks.download_file[743153f9-9c92-430e-84f5-7d99a269c104] succeeded in 0.00449692492839s: u'unzipped_file1_0'
[2017-02-14 18:10:41,727: INFO/MainProcess] Received task: arthur.tasks.download_file[a29470d7-85a0-4a91-a410-2e51cff81cea]
[2017-02-14 18:10:41,728: INFO/MainProcess] Task arthur.tasks.process_downloaded_file[1d8d340f-61f7-4ef3-a90e-913a3bfb5478] succeeded in 0.0129376259865s: True
[2017-02-14 18:10:41,729: INFO/MainProcess] Received task: arthur.tasks.download_file[44a1cc48-52a4-4548-a862-48d402dd92f1]
[2017-02-14 18:10:41,731: INFO/MainProcess] Received task: arthur.tasks.download_file[cedf91b2-6e3f-48c3-880e-b80a1c38efed]
[2017-02-14 18:10:41,733: INFO/MainProcess] Task arthur.tasks.download_file[a29470d7-85a0-4a91-a410-2e51cff81cea] succeeded in 0.003385586082s: u'unzipped_file1_1'
[2017-02-14 18:10:41,734: INFO/MainProcess] Task arthur.tasks.download_file[44a1cc48-52a4-4548-a862-48d402dd92f1] succeeded in 0.00395720102824s: u'unzipped_file1_2'
[2017-02-14 18:10:41,735: INFO/MainProcess] Received task: arthur.tasks.download_file[d93a7260-43dc-4e77-b5ff-ce0e3bc426ce]
[2017-02-14 18:10:41,739: INFO/MainProcess] Task arthur.tasks.download_file[d93a7260-43dc-4e77-b5ff-ce0e3bc426ce] succeeded in 0.00272180500906s: u'unzipped_file1_4'
[2017-02-14 18:10:41,740: INFO/MainProcess] Task arthur.tasks.download_file[cedf91b2-6e3f-48c3-880e-b80a1c38efed] succeeded in 0.00340146606322s: u'unzipped_file1_3'
[2017-02-14 18:10:41,740: INFO/MainProcess] Received task: arthur.tasks.download_file[f1b19d02-a97d-4e32-afde-e39d46d45bad]
[2017-02-14 18:10:41,742: INFO/MainProcess] Received task: arthur.tasks.download_file[4a0bce55-8662-42a6-a19d-3ff33496d7e0]
[2017-02-14 18:10:41,745: INFO/MainProcess] Received task: arthur.tasks.download_file[a759d6a1-a558-46ba-8ee1-2cb28cbe0655]
[2017-02-14 18:10:41,747: INFO/MainProcess] Task arthur.tasks.download_file[f1b19d02-a97d-4e32-afde-e39d46d45bad] succeeded in 0.00358341098763s: u'unzipped_file2_0'
[2017-02-14 18:10:41,748: INFO/MainProcess] Task arthur.tasks.download_file[4a0bce55-8662-42a6-a19d-3ff33496d7e0] succeeded in 0.0044348789379s: u'unzipped_file2_1'
[2017-02-14 18:10:41,749: INFO/MainProcess] Received task: arthur.tasks.foo[e3250c36-92e9-4f53-afef-fe95b035e0dd]
[2017-02-14 18:10:41,752: INFO/MainProcess] Received task: arthur.tasks.download_file[3e9db0d1-31c5-4703-8e9d-c2b9f4237d8d]
[2017-02-14 18:10:41,754: INFO/MainProcess] Task arthur.tasks.download_file[a759d6a1-a558-46ba-8ee1-2cb28cbe0655] succeeded in 0.00349929102231s: u'unzipped_file2_2'
[2017-02-14 18:10:41,755: INFO/MainProcess] Task arthur.tasks.foo[e3250c36-92e9-4f53-afef-fe95b035e0dd] succeeded in 0.00417044304777s: u'foo_unzipped_file1_0'
[2017-02-14 18:10:41,755: INFO/MainProcess] Received task: arthur.tasks.download_file[dcda209f-f4be-4697-84c1-e55a8502a45c]
[2017-02-14 18:10:41,757: INFO/MainProcess] Received task: arthur.tasks.foo[3e9db173-7200-4c46-aade-72be5553b0cf]
[2017-02-14 18:10:41,760: INFO/MainProcess] Task arthur.tasks.download_file[3e9db0d1-31c5-4703-8e9d-c2b9f4237d8d] succeeded in 0.00325334002264s: u'unzipped_file2_3'
[2017-02-14 18:10:41,760: INFO/MainProcess] Task arthur.tasks.download_file[dcda209f-f4be-4697-84c1-e55a8502a45c] succeeded in 0.00384710694198s: u'unzipped_file2_4'
[2017-02-14 18:10:41,761: INFO/MainProcess] Received task: arthur.tasks.foo[d936cddc-027c-4640-8a0b-26a7d9723ccc]
[2017-02-14 18:10:41,764: INFO/MainProcess] Received task: arthur.tasks.foo[11ae4aef-1af9-43a0-94b8-7b95575cd1bc]
[2017-02-14 18:10:41,765: INFO/MainProcess] Task arthur.tasks.foo[3e9db173-7200-4c46-aade-72be5553b0cf] succeeded in 0.00316555600148s: u'foo_unzipped_file1_1'
[2017-02-14 18:10:41,766: INFO/MainProcess] Task arthur.tasks.foo[d936cddc-027c-4640-8a0b-26a7d9723ccc] succeeded in 0.00383736204822s: u'foo_unzipped_file1_2'
[2017-02-14 18:10:41,767: INFO/MainProcess] Received task: arthur.tasks.foo[9e60adad-57e2-4a6e-874d-c687df189714]
[2017-02-14 18:10:41,769: INFO/MainProcess] Received task: arthur.tasks.foo[8b1eebb8-abb0-4223-872c-e9687031380c]
[2017-02-14 18:10:41,771: INFO/MainProcess] Task arthur.tasks.foo[11ae4aef-1af9-43a0-94b8-7b95575cd1bc] succeeded in 0.00347809505183s: u'foo_unzipped_file1_3'
[2017-02-14 18:10:41,772: INFO/MainProcess] Task arthur.tasks.foo[9e60adad-57e2-4a6e-874d-c687df189714] succeeded in 0.00403305899817s: u'foo_unzipped_file1_4'
[2017-02-14 18:10:41,773: INFO/MainProcess] Received task: arthur.tasks.foo[f9c137d7-4087-4519-919d-62bba457747f]
[2017-02-14 18:10:41,775: INFO/MainProcess] Received task: arthur.tasks.foo[2a43d460-aceb-465e-8be5-678cb930a60e]
[2017-02-14 18:10:41,777: INFO/MainProcess] Task arthur.tasks.foo[8b1eebb8-abb0-4223-872c-e9687031380c] succeeded in 0.00311726506334s: u'foo_unzipped_file2_0'
[2017-02-14 18:10:41,778: INFO/MainProcess] Task arthur.tasks.foo[f9c137d7-4087-4519-919d-62bba457747f] succeeded in 0.00378636294045s: u'foo_unzipped_file2_1'
[2017-02-14 18:10:41,778: INFO/MainProcess] Received task: arthur.tasks.bar[770d4cd4-527c-4efe-975f-daf337934c78]
[2017-02-14 18:10:41,780: INFO/MainProcess] Received task: arthur.tasks.foo[c09677f9-183e-43ef-889c-c8b7cab2bd23]
[2017-02-14 18:10:41,783: INFO/MainProcess] Task arthur.tasks.foo[2a43d460-aceb-465e-8be5-678cb930a60e] succeeded in 0.00324743904639s: u'foo_unzipped_file2_2'
[2017-02-14 18:10:41,783: INFO/MainProcess] Task arthur.tasks.bar[770d4cd4-527c-4efe-975f-daf337934c78] succeeded in 0.00382692192215s: u'bar_foo_unzipped_file1_0'
[2017-02-14 18:10:41,784: INFO/MainProcess] Received task: arthur.tasks.foo[1a6294da-8cae-4bf1-9d56-be5972254e07]
[2017-02-14 18:10:41,787: INFO/MainProcess] Received task: arthur.tasks.bar[bd15721f-3bea-4c64-a0c4-59c5c8730171]
[2017-02-14 18:10:41,788: INFO/MainProcess] Task arthur.tasks.foo[c09677f9-183e-43ef-889c-c8b7cab2bd23] succeeded in 0.00343648903072s: u'foo_unzipped_file2_4'
[2017-02-14 18:10:41,789: INFO/MainProcess] Task arthur.tasks.foo[1a6294da-8cae-4bf1-9d56-be5972254e07] succeeded in 0.00413183600176s: u'foo_unzipped_file2_3'
[2017-02-14 18:10:41,790: INFO/MainProcess] Received task: arthur.tasks.bar[29a982bd-2a72-49e7-bc56-0f2a4b2ba947]
[2017-02-14 18:10:41,792: INFO/MainProcess] Received task: arthur.tasks.bar[5944c49d-428d-4237-8777-edec76b36512]
[2017-02-14 18:10:41,794: INFO/MainProcess] Task arthur.tasks.bar[bd15721f-3bea-4c64-a0c4-59c5c8730171] succeeded in 0.0031840458978s: u'bar_foo_unzipped_file1_2'
[2017-02-14 18:10:41,795: INFO/MainProcess] Task arthur.tasks.bar[29a982bd-2a72-49e7-bc56-0f2a4b2ba947] succeeded in 0.00374374503735s: u'bar_foo_unzipped_file1_1'
[2017-02-14 18:10:41,796: INFO/MainProcess] Received task: arthur.tasks.bar[12272aad-f6e6-432e-945a-363a678ba2a8]
[2017-02-14 18:10:41,798: INFO/MainProcess] Task arthur.tasks.bar[5944c49d-428d-4237-8777-edec76b36512] succeeded in 0.00241802399978s: u'bar_foo_unzipped_file1_4'
[2017-02-14 18:10:41,798: INFO/MainProcess] Received task: arthur.tasks.bar[493cc5cc-797b-40f3-87a7-1394af1ae45d]
[2017-02-14 18:10:41,801: INFO/MainProcess] Received task: arthur.tasks.bar[e2925c2b-426d-4076-8a8c-c67c56a2ab8e]
[2017-02-14 18:10:41,803: INFO/MainProcess] Task arthur.tasks.bar[12272aad-f6e6-432e-945a-363a678ba2a8] succeeded in 0.00308170204517s: u'bar_foo_unzipped_file1_3'
[2017-02-14 18:10:41,804: INFO/MainProcess] Task arthur.tasks.bar[493cc5cc-797b-40f3-87a7-1394af1ae45d] succeeded in 0.00375492009334s: u'bar_foo_unzipped_file2_0'
[2017-02-14 18:10:41,804: INFO/MainProcess] Received task: arthur.tasks.bar[c4b4e9de-4ce7-476f-b275-278db3d8099f]
[2017-02-14 18:10:41,807: INFO/MainProcess] Received task: arthur.tasks.bar[b0cdb87c-292f-4f14-975c-c7bd4679373d]
[2017-02-14 18:10:41,808: INFO/MainProcess] Task arthur.tasks.bar[c4b4e9de-4ce7-476f-b275-278db3d8099f] succeeded in 0.00304232595954s: u'bar_foo_unzipped_file2_2'
[2017-02-14 18:10:41,809: INFO/MainProcess] Task arthur.tasks.bar[e2925c2b-426d-4076-8a8c-c67c56a2ab8e] succeeded in 0.00377448496874s: u'bar_foo_unzipped_file2_1'
[2017-02-14 18:10:41,810: INFO/MainProcess] Received task: arthur.tasks.bar[3570e196-7c41-43b1-b7ef-68b2d31f28a2]
[2017-02-14 18:10:41,813: INFO/MainProcess] Task arthur.tasks.bar[b0cdb87c-292f-4f14-975c-c7bd4679373d] succeeded in 0.00181642104872s: u'bar_foo_unzipped_file2_4'
[2017-02-14 18:10:41,813: INFO/MainProcess] Task arthur.tasks.bar[3570e196-7c41-43b1-b7ef-68b2d31f28a2] succeeded in 0.00239081599284s: u'bar_foo_unzipped_file2_3'
关于python - 在 Celery 链中使用分组结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42206618/
我在网上搜索但没有找到任何合适的文章解释如何使用 javascript 使用 WCF 服务,尤其是 WebScriptEndpoint。 任何人都可以对此给出任何指导吗? 谢谢 最佳答案 这是一篇关于
我正在编写一个将运行 Linux 命令的 C 程序,例如: cat/etc/passwd | grep 列表 |剪切-c 1-5 我没有任何结果 *这里 parent 等待第一个 child (chi
所以我正在尝试处理文件上传,然后将该文件作为二进制文件存储到数据库中。在我存储它之后,我尝试在给定的 URL 上提供文件。我似乎找不到适合这里的方法。我需要使用数据库,因为我使用 Google 应用引
我正在尝试制作一个宏,将下面的公式添加到单元格中,然后将其拖到整个列中并在 H 列中复制相同的公式 我想在 F 和 H 列中输入公式的数据 Range("F1").formula = "=IF(ISE
问题类似于this one ,但我想使用 OperatorPrecedenceParser 解析带有函数应用程序的表达式在 FParsec . 这是我的 AST: type Expression =
我想通过使用 sequelize 和 node.js 将这个查询更改为代码取决于在哪里 select COUNT(gender) as genderCount from customers where
我正在使用GNU bash,版本5.0.3(1)-发行版(x86_64-pc-linux-gnu),我想知道为什么简单的赋值语句会出现语法错误: #/bin/bash var1=/tmp
这里,为什么我的代码在 IE 中不起作用。我的代码适用于所有浏览器。没有问题。但是当我在 IE 上运行我的项目时,它发现错误。 而且我的 jquery 类和 insertadjacentHTMl 也不
我正在尝试更改标签的innerHTML。我无权访问该表单,因此无法编辑 HTML。标签具有的唯一标识符是“for”属性。 这是输入和标签的结构:
我有一个页面,我可以在其中返回用户帖子,可以使用一些 jquery 代码对这些帖子进行即时评论,在发布新评论后,我在帖子下插入新评论以及删除 按钮。问题是 Delete 按钮在新插入的元素上不起作用,
我有一个大约有 20 列的“管道分隔”文件。我只想使用 sha1sum 散列第一列,它是一个数字,如帐号,并按原样返回其余列。 使用 awk 或 sed 执行此操作的最佳方法是什么? Accounti
我需要将以下内容插入到我的表中...我的用户表有五列 id、用户名、密码、名称、条目。 (我还没有提交任何东西到条目中,我稍后会使用 php 来做)但由于某种原因我不断收到这个错误:#1054 - U
所以我试图有一个输入字段,我可以在其中输入任何字符,但然后将输入的值小写,删除任何非字母数字字符,留下“。”而不是空格。 例如,如果我输入: 地球的 70% 是水,-!*#$^^ & 30% 土地 输
我正在尝试做一些我认为非常简单的事情,但出于某种原因我没有得到想要的结果?我是 javascript 的新手,但对 java 有经验,所以我相信我没有使用某种正确的规则。 这是一个获取输入值、检查选择
我想使用 angularjs 从 mysql 数据库加载数据。 这就是应用程序的工作原理;用户登录,他们的用户名存储在 cookie 中。该用户名显示在主页上 我想获取这个值并通过 angularjs
我正在使用 autoLayout,我想在 UITableViewCell 上放置一个 UIlabel,它应该始终位于单元格的右侧和右侧的中心。 这就是我想要实现的目标 所以在这里你可以看到我正在谈论的
我需要与 MySql 等效的 elasticsearch 查询。我的 sql 查询: SELECT DISTINCT t.product_id AS id FROM tbl_sup_price t
我正在实现代码以使用 JSON。 func setup() { if let flickrURL = NSURL(string: "https://api.flickr.com/
我尝试使用for循环声明变量,然后测试cols和rols是否相同。如果是,它将运行递归函数。但是,我在 javascript 中执行 do 时遇到问题。有人可以帮忙吗? 现在,在比较 col.1 和
我举了一个我正在处理的问题的简短示例。 HTML代码: 1 2 3 CSS 代码: .BB a:hover{ color: #000; } .BB > li:after {
我是一名优秀的程序员,十分优秀!