- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我是 Airflow 的新手。
我遇到过一个场景,父 DAG 需要将一些动态数字(比如 n
)传递给子 DAG。
SubDAG 将使用此数字动态创建 n
并行任务。
Airflow 文档未涵盖实现此目的的方法。所以我探索了几种方法:
我曾尝试作为 xcom 值传递,但出于某种原因,SubDAG 未解析为传递的值。
父 Dag 文件
def load_dag(**kwargs):
number_of_runs = json.dumps(kwargs['dag_run'].conf['number_of_runs'])
dag_data = json.dumps({
"number_of_runs": number_of_runs
})
return dag_data
# ------------------ Tasks ------------------------------
load_config = PythonOperator(
task_id='load_config',
provide_context=True,
python_callable=load_dag,
dag=dag)
t1 = SubDagOperator(
task_id=CHILD_DAG_NAME,
subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, default_args, "'{{ ti.xcom_pull(task_ids='load_config') }}'" ),
default_args=default_args,
dag=dag,
)
子目录文件
def sub_dag(parent_dag_name, child_dag_name, args, num_of_runs):
dag_subdag = DAG(
dag_id='%s.%s' % (parent_dag_name, child_dag_name),
default_args=args,
schedule_interval=None)
variabe_names = {}
for i in range(num_of_runs):
variabe_names['task' + str(i + 1)] = DummyOperator(
task_id='dummy_task',
dag=dag_subdag,
)
return dag_subdag
我还尝试将 number_of_runs
作为全局变量传递,但没有成功。
我们还尝试将此值写入数据文件。但是子 DAG 抛出 File doesn't exist error
。这可能是因为我们正在动态生成此文件。
有人可以帮我解决这个问题吗?
最佳答案
我已经使用选项 3 完成了它。关键是如果文件不存在,则返回一个没有任务的有效 dag。因此 load_config 将生成一个文件,其中包含您的任务数量或更多信息(如果需要)。你的 subdag 工厂看起来像:
def subdag(...):
sdag = DAG('%s.%s' % (parent, child), default_args=args, schedule_interval=timedelta(hours=1))
file_path = "/path/to/generated/file"
if os.path.exists(file_path):
data_file = open(file_path)
list_tasks = data_file.readlines()
for task in list_tasks:
DummyOperator(
task_id='task_'+task,
default_args=args,
dag=sdag,
)
return sdag
在 dag 生成时,您将看到一个没有任务的子标签。 dag执行时,load_config完成后,可以看到动态生成的subdag
关于python - Airflow : Passing a dynamic value to Sub DAG operator,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44365716/
我正在尝试使用 flot 绘制 SQL 数据库中的数据图表,这是使用 php 收集的,然后使用 json 编码的。 目前看起来像: [{"month":"February","data":482},
我有一个来自 php 行的 json 结果,类似于 ["value"]["value"] 我尝试使用内爆函数,但得到的结果是“value”“value” |id_kategori|created_at
脚本 1 将记录 two 但浏览器仍会将 select 元素呈现为 One。该表单还将提交值 one。 脚本 2 将记录、呈现和提交 两个。我希望它们是同义词并做同样的事情。请解释它们为何不同,以及我
我的python字典结构是这样的: ips[host][ip] 每行 ips[host][ip] 看起来像这样: [host, ip, network, mask, broadcast, mac, g
在 C# 中 我正在关注的一本书对设置和获取属性提出了这样的建议: double pri_test; public double Test { get { return pri_test; }
您可能熟悉 enum 位掩码方案,例如: enum Flags { FLAG1 = 0x1, FLAG2 = 0x2, FLAG3 = 0x4, FLAG4 = 0x8
在一些地方我看到了(String)value。在一些地方value.toString() 这两者有什么区别,在什么情况下我需要使用哪一个。 new Long(value) 和 (Long)value
有没有什么时候 var result = !value ? null : value[0]; 不会等同于 var result = value ? value[0] : null; 最佳答案 在此处将
我正在使用扫描仪检测设备。目前,我的条形码的值为 2345345 A1。因此,当我扫描到记事本或文本编辑器时,输出将类似于 2345345 A1,这是正确的条形码值。 问题是: 当我第一次将条形码扫描
我正在读取 C# 中的资源文件并将其转换为 JSON 字符串格式。现在我想将该 JSON 字符串的值转换为键。 例子, [ { "key": "CreateAccount", "text":
我有以下问题: 我有一个数据框,最多可能有 600 万行左右。此数据框中的一列包含某些 ID。 ID NaN NaN D1 D1 D1 NaN D1 D1 NaN NaN NaN NaN D2 NaN
import java.util.*; import java.lang.*; class Main { public static void main (String[] args) thr
我目前正在开发我的应用程序,使其设计基于 Holo 主题。在全局范围内我想做的是工作,但我对文件夹 values、values-v11 和 values-v14. 所以我知道: values 的目标是
我遇到了一个非常奇怪的问题。 我的公司为我们的各种 Assets 使用集中式用户注册网络服务。我们一般通过HttpURLConnection使用请求方法GET向Web服务发送请求,通过qs设置参数。这
查询: UPDATE nominees SET votes = ( SELECT votes FROM nominees WHERE ID =1 ) +1 错误: You can't specify
如果我运行一段代码: obj = {}; obj['number'] = 1; obj['expressionS'] = 'Sin(0.5 * c1)'; obj['c
我正在为我的应用创建一个带有 Twitter 帐户的登录页面。当我构建我的项目时会发生上述错误。 values/strings.xml @dimen/abc_text_size_medium
我在搜索引擎中使用以下 View : CREATE VIEW msr_joined_view AS SELECT table1.id AS msr_id, table1.msr_number, tab
为什么验证会返回此错误。如何解决? ul#navigation li#navigation-3 a.current Value Error : background-position Too
我有一个数据名如下 import pandas as pd d = { 'Name' : ['James', 'John', 'Peter', 'Thomas', 'Jacob', 'Andr
我是一名优秀的程序员,十分优秀!