- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
假设有以下情况:
[c1, c2, c3] >> child_task
其中所有 c1
、c2
、c3
和 child_task
都是运算符并且具有 task_id
分别等于 id1
、id2
、id3
和 child_id
。
任务 child_task
也是一个 PythonOperator
,具有 provide_context=True
和 python_callable=dummy_func
def dummy_func(**context):
#...
是否有可能在 dummy_func
中检索所有 parent 的 ID(可能通过使用上下文以某种方式浏览 dag)?
本例中的预期结果将是列表 ['id1', 'id2', 'id3']
。
最佳答案
upstream_task_ids
和downstream_task_ids
properties of BaseOperator
仅用于此目的。
from typing import List
..
parent_task_ids: List[str] = my_task.upstream_task_ids
child_task_ids: List[str] = my_task_downstream_task_ids
但是请注意,使用此属性
,您只能获得任务的直接(上游/下游)邻居。为了获得所有祖先或后代 任务
,您可以快速编造出古老的图论方法,例如这个 BFS
-类似实现
from typing import List, Set
from queue import Queue
from airflow.models import BaseOperator
def get_ancestor_tasks(my_task: BaseOperator) -> List[BaseOperator]:
ancestor_task_ids: Set[str] = set()
tasks_queue: Queue = Queue()
# determine parent tasks to begin BFS
for task in my_task.upstream_list:
tasks_queue.put(item=task)
# perform BFS
while not tasks_queue.empty():
task: BaseOperator = tasks_queue.get()
ancestor_task_ids.add(element=task.task_id)
for _task in task.upstream_list:
tasks_queue.put(item=_task)
# Convert task_ids to actual tasks
ancestor_tasks: List[BaseOperator] = [task for task in my_task.dag.tasks if task.task_id in ancestor_task_ids]
return ancestor_tasks
以上代码段未经测试,但我相信您可以从中获得灵感
引用资料
关于airflow - Apache Airflow - 获取所有父 task_id,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54728513/
所以我认为由于下面的代码我遗漏了一些东西,我已经包含了错误消息,我认为任务表没有 task_id,但是当我运行迁移以将 task_id 添加到任务时,它仍然给了我这个错误。 路线 resourc
SELECT a.*, p.*, t.*, r.*, n.* FROM prop_assigns AS a LEFT JOIN properties AS p ON (p.property_id =
如何从 celery 中的 tasks.py 获取任务 ID from __future__ import absolute_import from pig_engine.celery import
假设有以下情况: [c1, c2, c3] >> child_task 其中所有 c1、c2、c3 和 child_task 都是运算符并且具有 task_id 分别等于 id1、id2、id3 和
指定多个任务时,task_ids 如何工作? 在这个特定的代码示例中,我希望从元组 (5555,22222) 中的两个任务中检索 load_cycle_id_2,但结果却是 (None, 22222)
现在我从 async_result 得到一个 task_id 并且必须保存它以便稍后取回。如果我知道 task_id 是由什么组成的,那就更好了,这样我就可以重新计算它而不是从数据库中提取。例如:使用
我找不到任何用我自己的 task_id 设置 task_id 的例子 类似的东西... def testview1(request): for i in xrange(0,1000):
这可能是一个愚蠢的问题,但它让我从 Ruby 背景中难过。 当我尝试打印时,我有一个看起来像这样的对象。 print celery.AsyncResult.task_id >>> 我原以为这里会打印
我的目标是从 django celery 和弦调用中检索所有 task_ids ,以便我可以在以后需要时撤销任务。但是,我无法找出检索任务 ID 的正确方法。我将和弦执行为: c = chord((l
我启动了很多任务,但其中一些任务还没有完成(763 个任务),处于 PENDING 状态,但系统没有处理任何事情...给 celery task_id 重试这个任务是可能的吗? 最佳答案 你不能。您只
在项目中,我尝试轮询一个长时间运行的任务的 task.state 并更新其运行状态。它在开发中有效,但是当我将项目移动到生产服务器上时它就不起作用了。即使我可以看到任务在 flower 上开始,我也一
如果我之前不知道执行了哪个任务,如何提取任务的结果?这是设置:给定以下来源('tasks.py'): from celery import Celery app = Celery('tasks', b
我将 celery.result.AsyncResult 中的 task_id 存储在数据库中,并将其与任务影响的项目相关联。这允许我执行查询以检索与特定项目相关的所有任务的 task_id。 那么在
我可以在外部(使用 http 请求?)将与 dag_id 和 run_id 关联的特定 task_id 标记为成功/失败。 我的任务是在外部系统上长时间运行的任务,我不希望我的任务轮询系统以查找状态.
我有一个端点,它接受一个 ID task/:task_id/ .但是当我尝试访问端点中的 id 时,我收到此错误。 TypeError: get() got multiple values for a
所以,即使是空白的 Airflow 安装,我也有问题。 只要我试着跑 airflow test tutorial print_date 2015-06-01 我收到一个引发的异常,上面写着 Pendi
我正在尝试将 celery 任务中的数据输出到单独的窗口中。我是 JavaScript 和 AJAX 的新手,这就是我当前的问题所在。执行 View 后,将启动 celery 任务并呈现下一个 htm
假设我有一个 easteregg.py 文件: from airflow import DAG from dateutil import parser from datetime import tim
我是一名优秀的程序员,十分优秀!