- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我陷入了相对复杂的 celery 链配置,试图实现以下目标。假设有如下一系列任务:
chain1 = chain(
DownloadFile.s("http://someserver/file.gz"), # downloads file, returns temp file name
UnpackFile.s(), # unpacks the gzip comp'd file, returns temp file name
ParseFile.s(), # parses file, returns list URLs to download
)
现在我想并行下载每个 URL,所以我所做的是:
urls = chain1.get()
download_tasks = map(lambda x: DownloadFile.s(x), urls)
res1 = celery.group(download_tasks)()
res1_data = res1.get()
最后,我想获取从 DownloadFile
返回的每个下载文件(从 ParseFile
返回临时文件名)并通过另一个任务链并行运行它(例如,它将是 group
的 chain
):
chains = []
for tmpfile in res:
chains.append(celery.chain(
foo.s(tmpfile),
bar.s(),
baz.s()
))
res2 = celery.group(*chains)()
res2_data = res2.get()
如果我在正常的 Python 进程(而不是另一个 celery 任务)中运行该方法,该方法效果很好,因为我可以等待 chain1
的结果。 ,然后为每个下载的文件构建下载任务组和新链。
但是,现在我想将所有这些东西包装到另一个 Celery 任务中,方法是将其包装在另一个 @app.task
中。装饰函数,结果发现您无法调用(或者确实不应该从任务内部调用 .get()
来等待另一个任务完成),并且我未能找到“移植”此工作流来运行的解决方案在一个任务里面。我尝试添加 res1
进入chain1
链,但 celery 提示 <GroupResult: ..... > is not JSON serializable
。
有人可以建议一种让它发挥作用的方法吗?谢谢!
最佳答案
确实,在任务中调用 .get()
是不好的。 Celery 的目标是并行执行异步任务,因此您不应该等待结果。
解决问题的一种方法是存储第一次处理的 url 结果(在文件或数据库中)。
我写了一个简短的示例,说明通过将结果写入文件可以做什么。我选择了 json
转储。
假设您的 main
中有一个 url
列表。首先,您使用 chain
的 group
启动异步处理所有这些 url。所有这些任务都将处理 url 并将要下载的 url 列表存储在位于指定 tmp 目录中的文件中。
然后,您还启动 check_dir
任务,该任务将检查目录中是否已写入文件,在本例中,处理每个文件并删除 tmp 目录中的相应文件。
使用我选择的参数,此任务每 30 秒自动重试一次,并且永远不会结束(我假设您有一个重复的作业要执行),因此您可以更改此设置,但它是为了让您了解如何管理。
我将其作为 main
运行,但如果需要,也可以将其包装到另一个 celery 任务中。
app_module.py
from __future__ import absolute_import
from celery import Celery
app = Celery('app')
app.config_from_object("settings")
if __name__ == '__main__':
app.start()
任务.py
from celery import group, chain
from app_module import app
import json
import glob
import os
__all__ = ('download_file',
'unpack_file',
'parse_file',
'foo',
'bar',
'process_downloaded_file',
'check_dir',)
path = "./data/tmp_dir/"
@app.task
def download_file(filename):
return filename
@app.task
def unpack_file(filename):
return "unzipped_" + filename
@app.task
def parse_file(filename):
# Fake parse task storing results in a temp directory
# results are stored in a json and contains the list of urls
with open(path + filename, "wb") as f:
d = {"files" : [filename+"_" + str(i) for i in range(0,5)]}
json.dump(d, f)
return True
@app.task
def foo(filename):
return "foo_" + filename
@app.task
def bar(filename):
return "bar_" + filename
@app.task
def process_downloaded_file(filename):
#process one file in the temp directory and at the end delete the file so it
# is not processed several times
with open(filename, "rb") as f:
d = json.load(f)
g = group(chain(download_file.s(f), foo.s(), bar.s()) for f in d["files"]).apply_async()
os.remove(filename)
return True
@app.task(bind=True)
def check_dir(self, tmp_dir, sleep=30):
#this task checks the tmp directory. If files have been written it processes
#every file in the directory. The task autoretries each *sleep* seconds
for f in glob.glob(tmp_dir + "*"):
process_downloaded_file.delay(f)
self.retry(args=(tmp_dir, sleep), countdown=sleep)
main.py
from celery import group, chain
from tasks import *
path = "./data/tmp_dir/"
urls = ["file1", "file2"]
group(chain(download_file.s(f), unpack_file.s(), parse_file.s()) for f in urls).apply_async()
check_dir.delay(path)
控制台输出:
[2017-02-14 18:10:41,630: INFO/MainProcess] Received task: arthur.tasks.download_file[65cb06c6-b8b6-4108-af36-84103037e4a2]
[2017-02-14 18:10:41,632: INFO/MainProcess] Received task: arthur.tasks.download_file[d069e046-4153-4320-8f9d-a22adeeb2827]
[2017-02-14 18:10:41,637: INFO/MainProcess] Received task: arthur.tasks.check_dir[19a1984f-c2ed-4de6-82d6-b5ad5a6bacc5]
[2017-02-14 18:10:41,666: INFO/MainProcess] Received task: arthur.tasks.unpack_file[e9eab102-8ae0-4000-b384-5cfa0e01e805]
[2017-02-14 18:10:41,674: INFO/MainProcess] Task arthur.tasks.download_file[65cb06c6-b8b6-4108-af36-84103037e4a2] succeeded in 0.0389260330703s: u'file1'
[2017-02-14 18:10:41,682: INFO/MainProcess] Received task: arthur.tasks.unpack_file[47b13b21-57e2-44be-82dd-f8e0e1adff2e]
[2017-02-14 18:10:41,689: INFO/MainProcess] Task arthur.tasks.download_file[d069e046-4153-4320-8f9d-a22adeeb2827] succeeded in 0.0534016339807s: u'file2'
[2017-02-14 18:10:41,691: INFO/MainProcess] Received task: arthur.tasks.parse_file[bd3fa287-9cf0-4802-88ca-2593c27af4f7]
[2017-02-14 18:10:41,696: INFO/MainProcess] Task arthur.tasks.unpack_file[47b13b21-57e2-44be-82dd-f8e0e1adff2e] succeeded in 0.00816849502735s: u'unzipped_file2'
[2017-02-14 18:10:41,704: INFO/MainProcess] Received task: arthur.tasks.process_downloaded_file[1b72f409-f5b5-480a-b651-616dc02b2207]
[2017-02-14 18:10:41,706: INFO/MainProcess] Task arthur.tasks.parse_file[bd3fa287-9cf0-4802-88ca-2593c27af4f7] succeeded in 0.00894999306183s: True
[2017-02-14 18:10:41,708: INFO/MainProcess] Task arthur.tasks.check_dir[19a1984f-c2ed-4de6-82d6-b5ad5a6bacc5] retry: Retry in 30s
[2017-02-14 18:10:41,709: INFO/MainProcess] Received task: arthur.tasks.process_downloaded_file[1d8d340f-61f7-4ef3-a90e-913a3bfb5478]
[2017-02-14 18:10:41,713: INFO/MainProcess] Task arthur.tasks.unpack_file[e9eab102-8ae0-4000-b384-5cfa0e01e805] succeeded in 0.044072615914s: u'unzipped_file1'
[2017-02-14 18:10:41,714: INFO/MainProcess] Received task: arthur.tasks.check_dir[19a1984f-c2ed-4de6-82d6-b5ad5a6bacc5] eta:[2017-02-14 17:11:11.692241+00:00]
[2017-02-14 18:10:41,717: INFO/MainProcess] Received task: arthur.tasks.parse_file[e839826a-dfa5-4df0-a716-9c21371c297f]
[2017-02-14 18:10:41,720: INFO/MainProcess] Received task: arthur.tasks.download_file[743153f9-9c92-430e-84f5-7d99a269c104]
[2017-02-14 18:10:41,724: INFO/MainProcess] Task arthur.tasks.process_downloaded_file[1b72f409-f5b5-480a-b651-616dc02b2207] succeeded in 0.0153999190079s: True
[2017-02-14 18:10:41,725: INFO/MainProcess] Task arthur.tasks.parse_file[e839826a-dfa5-4df0-a716-9c21371c297f] succeeded in 0.00395095907152s: True
[2017-02-14 18:10:41,726: INFO/MainProcess] Task arthur.tasks.download_file[743153f9-9c92-430e-84f5-7d99a269c104] succeeded in 0.00449692492839s: u'unzipped_file1_0'
[2017-02-14 18:10:41,727: INFO/MainProcess] Received task: arthur.tasks.download_file[a29470d7-85a0-4a91-a410-2e51cff81cea]
[2017-02-14 18:10:41,728: INFO/MainProcess] Task arthur.tasks.process_downloaded_file[1d8d340f-61f7-4ef3-a90e-913a3bfb5478] succeeded in 0.0129376259865s: True
[2017-02-14 18:10:41,729: INFO/MainProcess] Received task: arthur.tasks.download_file[44a1cc48-52a4-4548-a862-48d402dd92f1]
[2017-02-14 18:10:41,731: INFO/MainProcess] Received task: arthur.tasks.download_file[cedf91b2-6e3f-48c3-880e-b80a1c38efed]
[2017-02-14 18:10:41,733: INFO/MainProcess] Task arthur.tasks.download_file[a29470d7-85a0-4a91-a410-2e51cff81cea] succeeded in 0.003385586082s: u'unzipped_file1_1'
[2017-02-14 18:10:41,734: INFO/MainProcess] Task arthur.tasks.download_file[44a1cc48-52a4-4548-a862-48d402dd92f1] succeeded in 0.00395720102824s: u'unzipped_file1_2'
[2017-02-14 18:10:41,735: INFO/MainProcess] Received task: arthur.tasks.download_file[d93a7260-43dc-4e77-b5ff-ce0e3bc426ce]
[2017-02-14 18:10:41,739: INFO/MainProcess] Task arthur.tasks.download_file[d93a7260-43dc-4e77-b5ff-ce0e3bc426ce] succeeded in 0.00272180500906s: u'unzipped_file1_4'
[2017-02-14 18:10:41,740: INFO/MainProcess] Task arthur.tasks.download_file[cedf91b2-6e3f-48c3-880e-b80a1c38efed] succeeded in 0.00340146606322s: u'unzipped_file1_3'
[2017-02-14 18:10:41,740: INFO/MainProcess] Received task: arthur.tasks.download_file[f1b19d02-a97d-4e32-afde-e39d46d45bad]
[2017-02-14 18:10:41,742: INFO/MainProcess] Received task: arthur.tasks.download_file[4a0bce55-8662-42a6-a19d-3ff33496d7e0]
[2017-02-14 18:10:41,745: INFO/MainProcess] Received task: arthur.tasks.download_file[a759d6a1-a558-46ba-8ee1-2cb28cbe0655]
[2017-02-14 18:10:41,747: INFO/MainProcess] Task arthur.tasks.download_file[f1b19d02-a97d-4e32-afde-e39d46d45bad] succeeded in 0.00358341098763s: u'unzipped_file2_0'
[2017-02-14 18:10:41,748: INFO/MainProcess] Task arthur.tasks.download_file[4a0bce55-8662-42a6-a19d-3ff33496d7e0] succeeded in 0.0044348789379s: u'unzipped_file2_1'
[2017-02-14 18:10:41,749: INFO/MainProcess] Received task: arthur.tasks.foo[e3250c36-92e9-4f53-afef-fe95b035e0dd]
[2017-02-14 18:10:41,752: INFO/MainProcess] Received task: arthur.tasks.download_file[3e9db0d1-31c5-4703-8e9d-c2b9f4237d8d]
[2017-02-14 18:10:41,754: INFO/MainProcess] Task arthur.tasks.download_file[a759d6a1-a558-46ba-8ee1-2cb28cbe0655] succeeded in 0.00349929102231s: u'unzipped_file2_2'
[2017-02-14 18:10:41,755: INFO/MainProcess] Task arthur.tasks.foo[e3250c36-92e9-4f53-afef-fe95b035e0dd] succeeded in 0.00417044304777s: u'foo_unzipped_file1_0'
[2017-02-14 18:10:41,755: INFO/MainProcess] Received task: arthur.tasks.download_file[dcda209f-f4be-4697-84c1-e55a8502a45c]
[2017-02-14 18:10:41,757: INFO/MainProcess] Received task: arthur.tasks.foo[3e9db173-7200-4c46-aade-72be5553b0cf]
[2017-02-14 18:10:41,760: INFO/MainProcess] Task arthur.tasks.download_file[3e9db0d1-31c5-4703-8e9d-c2b9f4237d8d] succeeded in 0.00325334002264s: u'unzipped_file2_3'
[2017-02-14 18:10:41,760: INFO/MainProcess] Task arthur.tasks.download_file[dcda209f-f4be-4697-84c1-e55a8502a45c] succeeded in 0.00384710694198s: u'unzipped_file2_4'
[2017-02-14 18:10:41,761: INFO/MainProcess] Received task: arthur.tasks.foo[d936cddc-027c-4640-8a0b-26a7d9723ccc]
[2017-02-14 18:10:41,764: INFO/MainProcess] Received task: arthur.tasks.foo[11ae4aef-1af9-43a0-94b8-7b95575cd1bc]
[2017-02-14 18:10:41,765: INFO/MainProcess] Task arthur.tasks.foo[3e9db173-7200-4c46-aade-72be5553b0cf] succeeded in 0.00316555600148s: u'foo_unzipped_file1_1'
[2017-02-14 18:10:41,766: INFO/MainProcess] Task arthur.tasks.foo[d936cddc-027c-4640-8a0b-26a7d9723ccc] succeeded in 0.00383736204822s: u'foo_unzipped_file1_2'
[2017-02-14 18:10:41,767: INFO/MainProcess] Received task: arthur.tasks.foo[9e60adad-57e2-4a6e-874d-c687df189714]
[2017-02-14 18:10:41,769: INFO/MainProcess] Received task: arthur.tasks.foo[8b1eebb8-abb0-4223-872c-e9687031380c]
[2017-02-14 18:10:41,771: INFO/MainProcess] Task arthur.tasks.foo[11ae4aef-1af9-43a0-94b8-7b95575cd1bc] succeeded in 0.00347809505183s: u'foo_unzipped_file1_3'
[2017-02-14 18:10:41,772: INFO/MainProcess] Task arthur.tasks.foo[9e60adad-57e2-4a6e-874d-c687df189714] succeeded in 0.00403305899817s: u'foo_unzipped_file1_4'
[2017-02-14 18:10:41,773: INFO/MainProcess] Received task: arthur.tasks.foo[f9c137d7-4087-4519-919d-62bba457747f]
[2017-02-14 18:10:41,775: INFO/MainProcess] Received task: arthur.tasks.foo[2a43d460-aceb-465e-8be5-678cb930a60e]
[2017-02-14 18:10:41,777: INFO/MainProcess] Task arthur.tasks.foo[8b1eebb8-abb0-4223-872c-e9687031380c] succeeded in 0.00311726506334s: u'foo_unzipped_file2_0'
[2017-02-14 18:10:41,778: INFO/MainProcess] Task arthur.tasks.foo[f9c137d7-4087-4519-919d-62bba457747f] succeeded in 0.00378636294045s: u'foo_unzipped_file2_1'
[2017-02-14 18:10:41,778: INFO/MainProcess] Received task: arthur.tasks.bar[770d4cd4-527c-4efe-975f-daf337934c78]
[2017-02-14 18:10:41,780: INFO/MainProcess] Received task: arthur.tasks.foo[c09677f9-183e-43ef-889c-c8b7cab2bd23]
[2017-02-14 18:10:41,783: INFO/MainProcess] Task arthur.tasks.foo[2a43d460-aceb-465e-8be5-678cb930a60e] succeeded in 0.00324743904639s: u'foo_unzipped_file2_2'
[2017-02-14 18:10:41,783: INFO/MainProcess] Task arthur.tasks.bar[770d4cd4-527c-4efe-975f-daf337934c78] succeeded in 0.00382692192215s: u'bar_foo_unzipped_file1_0'
[2017-02-14 18:10:41,784: INFO/MainProcess] Received task: arthur.tasks.foo[1a6294da-8cae-4bf1-9d56-be5972254e07]
[2017-02-14 18:10:41,787: INFO/MainProcess] Received task: arthur.tasks.bar[bd15721f-3bea-4c64-a0c4-59c5c8730171]
[2017-02-14 18:10:41,788: INFO/MainProcess] Task arthur.tasks.foo[c09677f9-183e-43ef-889c-c8b7cab2bd23] succeeded in 0.00343648903072s: u'foo_unzipped_file2_4'
[2017-02-14 18:10:41,789: INFO/MainProcess] Task arthur.tasks.foo[1a6294da-8cae-4bf1-9d56-be5972254e07] succeeded in 0.00413183600176s: u'foo_unzipped_file2_3'
[2017-02-14 18:10:41,790: INFO/MainProcess] Received task: arthur.tasks.bar[29a982bd-2a72-49e7-bc56-0f2a4b2ba947]
[2017-02-14 18:10:41,792: INFO/MainProcess] Received task: arthur.tasks.bar[5944c49d-428d-4237-8777-edec76b36512]
[2017-02-14 18:10:41,794: INFO/MainProcess] Task arthur.tasks.bar[bd15721f-3bea-4c64-a0c4-59c5c8730171] succeeded in 0.0031840458978s: u'bar_foo_unzipped_file1_2'
[2017-02-14 18:10:41,795: INFO/MainProcess] Task arthur.tasks.bar[29a982bd-2a72-49e7-bc56-0f2a4b2ba947] succeeded in 0.00374374503735s: u'bar_foo_unzipped_file1_1'
[2017-02-14 18:10:41,796: INFO/MainProcess] Received task: arthur.tasks.bar[12272aad-f6e6-432e-945a-363a678ba2a8]
[2017-02-14 18:10:41,798: INFO/MainProcess] Task arthur.tasks.bar[5944c49d-428d-4237-8777-edec76b36512] succeeded in 0.00241802399978s: u'bar_foo_unzipped_file1_4'
[2017-02-14 18:10:41,798: INFO/MainProcess] Received task: arthur.tasks.bar[493cc5cc-797b-40f3-87a7-1394af1ae45d]
[2017-02-14 18:10:41,801: INFO/MainProcess] Received task: arthur.tasks.bar[e2925c2b-426d-4076-8a8c-c67c56a2ab8e]
[2017-02-14 18:10:41,803: INFO/MainProcess] Task arthur.tasks.bar[12272aad-f6e6-432e-945a-363a678ba2a8] succeeded in 0.00308170204517s: u'bar_foo_unzipped_file1_3'
[2017-02-14 18:10:41,804: INFO/MainProcess] Task arthur.tasks.bar[493cc5cc-797b-40f3-87a7-1394af1ae45d] succeeded in 0.00375492009334s: u'bar_foo_unzipped_file2_0'
[2017-02-14 18:10:41,804: INFO/MainProcess] Received task: arthur.tasks.bar[c4b4e9de-4ce7-476f-b275-278db3d8099f]
[2017-02-14 18:10:41,807: INFO/MainProcess] Received task: arthur.tasks.bar[b0cdb87c-292f-4f14-975c-c7bd4679373d]
[2017-02-14 18:10:41,808: INFO/MainProcess] Task arthur.tasks.bar[c4b4e9de-4ce7-476f-b275-278db3d8099f] succeeded in 0.00304232595954s: u'bar_foo_unzipped_file2_2'
[2017-02-14 18:10:41,809: INFO/MainProcess] Task arthur.tasks.bar[e2925c2b-426d-4076-8a8c-c67c56a2ab8e] succeeded in 0.00377448496874s: u'bar_foo_unzipped_file2_1'
[2017-02-14 18:10:41,810: INFO/MainProcess] Received task: arthur.tasks.bar[3570e196-7c41-43b1-b7ef-68b2d31f28a2]
[2017-02-14 18:10:41,813: INFO/MainProcess] Task arthur.tasks.bar[b0cdb87c-292f-4f14-975c-c7bd4679373d] succeeded in 0.00181642104872s: u'bar_foo_unzipped_file2_4'
[2017-02-14 18:10:41,813: INFO/MainProcess] Task arthur.tasks.bar[3570e196-7c41-43b1-b7ef-68b2d31f28a2] succeeded in 0.00239081599284s: u'bar_foo_unzipped_file2_3'
关于python - 在 Celery 链中使用分组结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42206618/
您好,我正在处理 BIRT 报告。我有一个查询,我必须对父级的重复数据进行分组,但子级也不能分组! 在我的查询中: item 是父项,item_ledger_entry 是子项。我有来自 item.N
我正在使用 GA API。 这是针对 MCF 目标报告(底部)的标准目标完成指标表(顶部) 看一下这个: 总数加起来 (12,238),但看看按 channel 分组的分割有多么不同!我以为这些会很接
我正在开发一个流量计数器,我想获得 IP 和重复计数,但是如何? 就像是 :select ip, count(ip) from Redirect 返回 : null total ip count 重定
我尝试编写一个正则表达式来匹配条件表达式,例如: a!=2 1+2=2+a 我尝试提取运算符。我当前的正则表达式是“.+([!=<>]+).+” 但问题是匹配器总是尝试匹配组中可能的最短字符串
在 MS Transact SQL 中,假设我有一个这样的表(订单): Order Date Order Total Customer # 09/30/2008 8
我想按 m.ID 分组,并对每个 m.id 求和 (pm.amount_construction* prod.anzahl) 实际上我有以下结果: Meterial_id | amount_const
我想根据多列中的值对值进行分组。这是一个例子: 我想得到输出: {{-30,-50,20},{-20,30,60},{-30,NULL or other value, 20}} 我设法到达: SELE
我正在尝试找出运行此查询的最佳方式。我基本上需要返回在我们的系统中只下了一个订单的客户的“登录”字段列表(登录字段基本上是客户 ID/ key )。 我们系统的一些背景...... 客户在同一日期下的
给定以下mysql结果集: id code name importance '1234', 'ID-CS-B', 'Chocolate Sauce'
大家好,我的数据框中有以下列: LC_REF 1 DT 16 2C 2 DT 16 2C 3 DT 16 2C 1 DT 16 3C 6 DT 16 3C 3
我有这样的 mongoDB 集合 { "_id" : "EkKTRrpH4FY9AuRLj", "stage" : 10, }, { "_id" : "EkKTRrpH4FY9
假设我有一组数据对,其中 index 0 是值,index 1 是类型: input = [ ('11013331', 'KAT'), ('9085267',
java中用stream进行去重,排序,分组 一、distinct 1. 八大基本数据类型 List collect = ListUtil.of(1, 2, 3, 1, 2).stream().fil
基本上,我从 TABLE_A 中的这个开始 France - 100 France - 200 France - 300 Mexico - 50 Mexico - 50 Mexico - 56 Pol
我希望这个正则表达式 ([A-Z]+)$ 将选择此示例中的最后一次出现: AB.012.00.022ABC-1 AB.013.00.022AB-1 AB.014.00.022ABAB-1 但我没有匹配
我创建了一个数据透视表,但数据没有组合在一起。 任何人都可以帮助我获得所需的格式吗? 我为获取数据透视表而编写的查询: DECLARE @cols AS NVARCHAR(MAX), -- f
我想按时间段(月,周,日,小时,...)选择计数和分组。例如,我想选择行数并将它们按 24 小时分组。 我的表创建如下。日期是时间戳。 CREATE TABLE MSG ( MSG_ID dec
在 SQL Server 2005 中,我有一个包含如下数据的表: WTN------------Date 555-111-1212 2009-01-01 555-111-1212 2009-
题 假设我有 k 个标量列,如果它们沿着每列彼此在一定距离内,我想对它们进行分组。 假设简单 k 是 2 并且它们是我唯一的列。 pd.DataFrame(list(zip(sorted(choice
问题 在以下数据框中 df : import random import pandas as pd random.seed(999) sz = 50 qty = {'one': 1, 'two': 2
我是一名优秀的程序员,十分优秀!