
python - Apache Airflow xcom_pull from dynamically named tasks

Reposted | Author: 太空宇宙 | Updated: 2023-11-04 02:14:14

I have successfully created dynamic tasks in a DAG (Bash and Docker Operators), but I'm having trouble passing those dynamically created task names to xcom_pull to fetch their data.

for i in range(0, max_tasks):
    task_scp_queue = BashOperator(task_id="scp_queue_task_{}".format(i), bash_command="""python foo""", retries=3, dag=dag, pool="scp_queue_pool", queue="foo", provide_context=True, xcom_push=True)  # Pull the manifest ID from the previous task via xcom

    task_process_queue = DockerOperator(task_id="process_task_{}".format(i), command="""python foo --queue-name={{ task_instance.xcom_pull(task_ids=scp_queue_task_{}) }}""".format(i), retries=3, dag=dag, pool="process_pool", api_version="auto", image="foo", queue="foo", execution_timeout=timedelta(minutes=5))
    task_manifest = DockerOperator(api_version="auto", task_id="manifest_task_{}".format(i), image="foo", retries=3, dag=dag, command=""" python --manifestid={{ task_instance.xcom_pull(task_ids=scp_queue_task_{}) }}""".format(i), pool="manfiest_pool", queue="d_parser")

    task_psql_queue.set_downstream(task_scp_queue)
    task_process_queue.set_upstream(task_scp_queue)
    task_manifest.set_upstream(task_process_queue)

As you can see, I tried simply using a Python format string inside the Jinja template to pass the i variable into it, but that doesn't work.

I've also tried using "task.task_id", and building a new string containing just the task_id, but neither works.

Edit:

The command now looks like this:

command="""python foo \ 
--queue-name="{{
task_instance.xcom_pull(task_ids='scp_queue_task_{}') }}"
""".format(i)

and my Airflow debug log looks like:

Using Master Queue: process_{ 
task_instance.xcom_pull(task_ids='scp_queue_task_31') }

So the string value is being filled in, but the xcom_pull is never executed.
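The log output above is the classic symptom of str.format consuming Jinja's braces: format treats "{{" as an escaped literal "{", so a single pair of doubled braces collapses and Airflow never sees a valid template. A minimal illustration of the escaping rule (plain Python, no Airflow needed; the index 31 is taken from the log above):

```python
i = 31

# One level of doubling: .format collapses "{{" to "{", breaking the template.
broken = "process_{{ task_instance.xcom_pull(task_ids='scp_queue_task_{}') }}".format(i)
print(broken)
# -> process_{ task_instance.xcom_pull(task_ids='scp_queue_task_31') }
# Single braces are not a Jinja expression, so Airflow renders them literally.

# Two levels of doubling: "{{{{" survives .format as "{{".
fixed = "process_{{{{ task_instance.xcom_pull(task_ids='scp_queue_task_{}') }}}}".format(i)
print(fixed)
# -> process_{{ task_instance.xcom_pull(task_ids='scp_queue_task_31') }}
# The Jinja braces survive, so Airflow can evaluate the xcom_pull at runtime.
```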

Best Answer

I'm puzzled as to why this doesn't work. The error logs you're getting would be helpful.

In short, what you've done looks fine. If max_tasks=2, you get:

task_psql_queue.taskid --> scp_queue_task_0 >> process_task_0 >> manifest_task_0
\-> scp_queue_task_1 >> process_task_1 >> manifest_task_1

I suspect you don't need the execution timeout; it's really short. Because you have very long lines and your keyword arguments are in a random order, I'll reformat what you wrote:

for i in range(0, max_tasks):
    task_scp_queue = BashOperator(
        task_id="scp_queue_task_{}".format(i),
        dag=dag,
        retries=3,  # you could make it a default arg on the dag
        pool="scp_queue_pool",
        queue="foo",  # you really want both queue and pool? When debugging remove them.
        bash_command="python foo",  # Maybe you snipped a multiline command
        provide_context=True,  # BashOp doesn't have this argument
        xcom_push=True,  # PUSH the manifest ID FOR the NEXT task via xcom
    )

    task_process_queue = DockerOperator(
        task_id="process_task_{}".format(i),
        dag=dag,
        retries=3,
        pool="process_pool",
        queue="foo",
        execution_timeout=timedelta(minutes=5),
        api_version="auto",
        image="foo",
        command="python foo --queue-name="
                "{{{{ task_instance.xcom_pull(task_ids=scp_queue_task_{}) }}}}".format(i),
    )

    task_manifest = DockerOperator(
        task_id="manifest_task_{}".format(i),
        retries=3,
        dag=dag,
        pool="manfiest_pool",
        queue="d_parser",
        api_version="auto",
        image="foo",
        command="python --manifestid="
                "{{{{ task_instance.xcom_pull(task_ids=scp_queue_task_{}) }}}}".format(i),
    )

    task_psql_queue >> task_scp_queue >> task_process_queue >> task_manifest
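The last line collapses the original set_downstream/set_upstream calls into Airflow's bitshift chaining syntax; both forms declare the same dependency edges. A quick equivalence sketch with stand-in objects (not real Airflow operators, just a toy class to show the mechanics):

```python
# Toy task class mimicking how set_downstream/set_upstream and the
# >> operator record the same dependency edges.
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # task_ids that run after this one

    def set_downstream(self, other):
        self.downstream.append(other.task_id)

    def set_upstream(self, other):
        other.set_downstream(self)

    def __rshift__(self, other):
        self.set_downstream(other)
        return other  # returning `other` is what allows a >> b >> c chains


a, b, c = Task("a"), Task("b"), Task("c")
a >> b >> c                 # bitshift form

x, y, z = Task("x"), Task("y"), Task("z")
x.set_downstream(y)         # explicit form, same resulting edges
z.set_upstream(y)

print(a.downstream, b.downstream)  # -> ['b'] ['c']
print(x.downstream, y.downstream)  # -> ['y'] ['z']
```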

Oh, now I see it: you aren't passing task_ids as a string. Try:

        command="python foo --queue-name="
                "{{{{ task_instance.xcom_pull(task_ids='scp_queue_task_{}') }}}}".format(i),
… … …
        command="python --manifestid="
                "{{{{ task_instance.xcom_pull(task_ids='scp_queue_task_{}') }}}}".format(i),
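With the task id quoted as a string literal, the .format call at DAG-parse time now produces a template that Airflow's Jinja engine can evaluate at task runtime. A quick sanity check of what the corrected line renders to (plain Python; i=0 chosen arbitrarily):

```python
i = 0

# Same string construction as the corrected command= argument above.
command = ("python foo --queue-name="
           "{{{{ task_instance.xcom_pull(task_ids='scp_queue_task_{}') }}}}".format(i))
print(command)
# -> python foo --queue-name={{ task_instance.xcom_pull(task_ids='scp_queue_task_0') }}
# Valid Jinja with a quoted task_id, ready for Airflow to render.
```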

For python - Apache Airflow xcom_pull from dynamically named tasks, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/52996205/
