gpt4 book ai didi

python - Celery:异步检索上一个任务的部分结果

转载 作者:行者123 更新时间:2023-12-01 04:04:39 25 4
gpt4 key购买 nike

我想链接我的 ETL 工作流程,由此 Load 任务可以异步传输来自 ExtractTransform 任务的部分结果,而无需等待 ExtractTransform 完成。 celery 可以吗?

我正在考虑的两种方法:

方法 1

创建一个 ETLTask,其中 LoadTask(以某种方式)不断从 ETLTask 获取部分结果并将其出列(本质上分离生产者和消费者)。我无法从 AsyncResult 看出如果可能的话。听起来我只是想走独立的生产者和消费者的路线,我不知道如何在 Celery 中做到这一点。

class ExtractTransformTask(Task):

def long_running_extract_transform(self):
pass

def run(self):
return self.long_running_extract_transform()

class LoadTask(Task):

def long_running_load(self):
pass

def run(self, results):
self.long_running_load(results)

class ETLTask(Task):

def run(self):
et_result = ExtractTransformTask.delay()
# while et_result PENDING or SUCCESS
# dequeue current results and load with LoadTask instance

方法 2

分块提取源数据并创建多个加载任务。

最佳答案

使用方法 2 的解决方案。

class ExtractTransformMixin(object):

def long_running_extract_transform(self, chunkify=False):
pass

class LoadTask(Task):

def long_running_load(self):
pass

def run(self, results):
self.long_running_load(results)

class ETLTask(ExtractTransformMixin, Task):

def run(self):
load_results = ResultSet([])
for chunk in long_running_extract_transform(chunkify=True):
load_results.add(LoadTask().delay(chunk))
return load_results

关于python - Celery:异步检索上一个任务的部分结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35846342/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com