- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
指定多个任务时,task_ids 如何工作?
在这个特定的代码示例中,我希望从元组 (5555,22222) 中的两个任务中检索 load_cycle_id_2,但结果却是 (None, 22222)。
这是为什么呢?
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
args = {
'owner': 'airflow',
'start_date': datetime.now(),
'provide_context': True
}
demo_dag = DAG(dag_id='first', start_date=datetime.now(), schedule_interval='@once',default_args=args)
def push_load_id(**kwargs):
kwargs['ti'].xcom_push(key='load_cycle_id_2',value=22222)
kwargs['ti'].xcom_push(key='load_cycle_id_3',value=44444)
def another_push_load_id(**kwargs):
kwargs['ti'].xcom_push(key='load_cycle_id_2',value=5555)
kwargs['ti'].xcom_push(key='anotherload_cycle_id_3',value=6666)
def pull_load_id(**kwargs):
ti = kwargs['ti'].xcom_pull(key='load_cycle_id_2', task_ids=['another_push_load_id','push_load_id'])
print(ti)
push_operator = PythonOperator(task_id='push_load_id', python_callable=push_load_id, dag=demo_dag)
pull_operator = PythonOperator(task_id='pull_load_id', python_callable=pull_load_id, dag=demo_dag)
push_operator >> pull_operator
最佳答案
您的 dags 仅运行 push_load_id
和 pull_load_id
函数。您没有创建使用 another_push_load_id
函数的运算符。
代码的结尾应如下所示:
push_operator = PythonOperator(task_id='push_load_id', python_callable=push_load_id, dag=demo_dag)
another_push_operator = PythonOperator(task_id='push_load_id', python_callable= another_push_load_id, dag=demo_dag)
pull_operator = PythonOperator(task_id='pull_load_id', python_callable=pull_load_id, dag=demo_dag)
push_operator >> another_push_operator >> pull_operator
关于python - Airflow、XCom 和多个 task_id,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53896957/
所以我认为由于下面的代码我遗漏了一些东西,我已经包含了错误消息,我认为任务表没有 task_id,但是当我运行迁移以将 task_id 添加到任务时,它仍然给了我这个错误。 路线 resourc
SELECT a.*, p.*, t.*, r.*, n.* FROM prop_assigns AS a LEFT JOIN properties AS p ON (p.property_id =
如何从 celery 中的 tasks.py 获取任务 ID from __future__ import absolute_import from pig_engine.celery import
假设有以下情况: [c1, c2, c3] >> child_task 其中所有 c1、c2、c3 和 child_task 都是运算符并且具有 task_id 分别等于 id1、id2、id3 和
指定多个任务时,task_ids 如何工作? 在这个特定的代码示例中,我希望从元组 (5555,22222) 中的两个任务中检索 load_cycle_id_2,但结果却是 (None, 22222)
现在我从 async_result 得到一个 task_id 并且必须保存它以便稍后取回。如果我知道 task_id 是由什么组成的,那就更好了,这样我就可以重新计算它而不是从数据库中提取。例如:使用
我找不到任何用我自己的 task_id 设置 task_id 的例子 类似的东西... def testview1(request): for i in xrange(0,1000):
这可能是一个愚蠢的问题,但它让我从 Ruby 背景中难过。 当我尝试打印时,我有一个看起来像这样的对象。 print celery.AsyncResult.task_id >>> 我原以为这里会打印
我的目标是从 django celery 和弦调用中检索所有 task_ids ,以便我可以在以后需要时撤销任务。但是,我无法找出检索任务 ID 的正确方法。我将和弦执行为: c = chord((l
我启动了很多任务,但其中一些任务还没有完成(763 个任务),处于 PENDING 状态,但系统没有处理任何事情...给 celery task_id 重试这个任务是可能的吗? 最佳答案 你不能。您只
在项目中,我尝试轮询一个长时间运行的任务的 task.state 并更新其运行状态。它在开发中有效,但是当我将项目移动到生产服务器上时它就不起作用了。即使我可以看到任务在 flower 上开始,我也一
如果我之前不知道执行了哪个任务,如何提取任务的结果?这是设置:给定以下来源('tasks.py'): from celery import Celery app = Celery('tasks', b
我将 celery.result.AsyncResult 中的 task_id 存储在数据库中,并将其与任务影响的项目相关联。这允许我执行查询以检索与特定项目相关的所有任务的 task_id。 那么在
我可以在外部(使用 http 请求?)将与 dag_id 和 run_id 关联的特定 task_id 标记为成功/失败。 我的任务是在外部系统上长时间运行的任务,我不希望我的任务轮询系统以查找状态.
我有一个端点,它接受一个 ID task/:task_id/ .但是当我尝试访问端点中的 id 时,我收到此错误。 TypeError: get() got multiple values for a
所以,即使是空白的 Airflow 安装,我也有问题。 只要我试着跑 airflow test tutorial print_date 2015-06-01 我收到一个引发的异常,上面写着 Pendi
我正在尝试将 celery 任务中的数据输出到单独的窗口中。我是 JavaScript 和 AJAX 的新手,这就是我当前的问题所在。执行 View 后,将启动 celery 任务并呈现下一个 htm
假设我有一个 easteregg.py 文件: from airflow import DAG from dateutil import parser from datetime import tim
我是一名优秀的程序员,十分优秀!