- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我读了这个How to use airflow xcoms with MySqlOperator虽然它有一个类似的标题,但它并没有真正解决我的问题。
我有以下代码:
def branch_func_is_new_records(**kwargs):
ti = kwargs['ti']
xcom = ti.xcom_pull(task_ids='query_get_max_order_id')
string_to_print = 'Value in xcom is: {}'.format(xcom)
logging.info(string_to_print)
if int(xcom) > int(LAST_IMPORTED_ORDER_ID)
return 'import_orders'
else:
return 'skip_operation'
query_get_max_order_id = 'SELECT COALESCE(max(orders_id),0) FROM warehouse.orders where orders_id>1 limit 10'
get_max_order_id = MySqlOperator(
task_id='query_get_max_order_id',
sql= query_get_max_order_id,
mysql_conn_id=MyCon,
xcom_push=True,
dag=dag)
branch_op_is_new_records = BranchPythonOperator(
task_id='branch_operation_is_new_records',
provide_context=True,
python_callable=branch_func_is_new_records,
dag=dag)
get_max_order_id >> branch_op_is_new_records >> import_orders
branch_op_is_new_records >> skip_operation
MySqlOperator
根据 BranchPythonOperator
选择下一个任务的编号返回一个编号。保证 MySqlOperator
返回的值大于 0
。
我的问题是 MySqlOperator
没有将任何内容推送到 XCOM
当我进入 XCOM
时,在 UI 上我什么也看不到。 BranchPythonOperator
显然没有读取任何内容,因此我的代码失败了。
为什么XCOM
在这里不起作用?
最佳答案
MySQL 运算符当前(撰写本文时为airflow 1.10.0)不支持在 XCom 中返回任何内容,因此目前的修复方法是自己编写一个小型运算符。您可以直接在 DAG 文件中执行此操作(未经测试,因此可能会出现愚蠢的错误):
from airflow.operators.mysql_operator import MySqlOperator as BaseMySqlOperator
from airflow.hooks.mysql_hook import MySqlHook
class ReturningMySqlOperator(BaseMySqlOperator):
def execute(self, context):
self.log.info('Executing: %s', self.sql)
hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
schema=self.database)
return hook.get_first(
self.sql,
parameters=self.parameters)
def branch_func_is_new_records(**kwargs):
ti = kwargs['ti']
xcom = ti.xcom_pull(task_ids='query_get_max_order_id')
string_to_print = 'Value in xcom is: {}'.format(xcom)
logging.info(string_to_print)
if str(xcom) == 'NewRecords':
return 'import_orders'
else:
return 'skip_operation'
query_get_max_order_id = 'SELECT COALESCE(max(orders_id),0) FROM warehouse.orders where orders_id>1 limit 10'
get_max_order_id = ReturningMySqlOperator(
task_id='query_get_max_order_id',
sql= query_get_max_order_id,
mysql_conn_id=MyCon,
# xcom_push=True,
dag=dag)
branch_op_is_new_records = BranchPythonOperator(
task_id='branch_operation_is_new_records',
provide_context=True,
python_callable=branch_func_is_new_records,
dag=dag)
get_max_order_id >> branch_op_is_new_records >> import_orders
branch_op_is_new_records >> skip_operation
关于airflow - 如何在 Airflow 中将 MySqlOperator 与 xcom 一起使用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52645905/
自过去 6 个月以来,我一直在使用 Airlfow。我很高兴在 Airflow 中定义工作流程。 我有以下场景,我无法获得 xcom 值(以黄色突出显示)。 请在以下示例代码中找到代码: 工作流程 d
DockerOperator 有一个参数 xcom_push,设置后会将 Docker 容器的输出推送到 Xcom: t1 = DockerOperator(task_id='run-hello-wo
我有任务失败时的松弛警报,但我也希望有恢复消息。 当任务最初失败时,它会在其 on_failure_callback 中执行 xcom_push。我在此处保存的内容可在下一次 DAG 运行中使用: c
在我的 Airflow dag 中,我有一个 ecs_operator 任务,然后是 python 运算符(operator)任务。我想使用 Airflow 的 xcom 功能将一些消息从 ECS 任
我有一个 Airflow 管道,我需要从 pubsub 订阅中获取文件名,然后将该文件导入到云 sql 实例中。我使用 CloudSqlInstanceImportOperator 导入 CSV 文件
我在 TaskGroup 中有两个任务需要提取 xcom 值以提供 job_flow_id 和 step_id。这是代码: with TaskGroup('execute_my_steps') a
我在 TaskGroup 中有两个任务需要提取 xcom 值以提供 job_flow_id 和 step_id。这是代码: with TaskGroup('execute_my_steps') a
我已经在 DAG(Bash 和 Docker Operators)中成功创建了动态任务,但是我很难将这些动态创建的任务传递给 xcom_pull 以获取数据。 for i in range(0, ma
指定多个任务时,task_ids 如何工作? 在这个特定的代码示例中,我希望从元组 (5555,22222) 中的两个任务中检索 load_cycle_id_2,但结果却是 (None, 22222)
我正在尝试从 XCOM 变量生成一组动态任务。在 XCOM 中,我正在存储一个列表,我想使用列表中的每个元素来动态创建下游任务。 我的用例是我有一个上游运算符(operator)检查文件的 sftp
我正在尝试设置动态序列 etl 作业,它将使用 XCOM 从运行的第一个任务中获取数据。这是当前代码: from airflow import DAG from airflow.operators.b
创建了一个图像包含 /airflow/xcom/return.json在所有子目录上使用 chmod +x由于日志显示找不到文件或目录(已尝试 chmod +x) strtpodbefore = Ku
我创建了一个 xcom,我想将结果作为 PostgresOperator 参数获取。我试过了 my_task = PostgresOperator( task_id=‘my_task',
我正在尝试获取 XCOM使用 API 的特定 DAG 的值? 找不到任何方法。 有什么想法吗?! 最佳答案 Airflow 可能从版本 2 开始就公开了 API。 Airflow API xcom 值
主要问题:如果不存在,我正在尝试创建一个 BigQuery 表。 联系方式:使用 BigQueryTableSensor 检查表是否存在,并根据返回值,使用 BigQueryCreateEmptyTa
现在,我使用这样的变量创建了多个任务,它工作正常。 with DAG(....) as dag: body = Variable.get("config_table", deserialize
我需要引用 BashOperator 返回的变量。在我的 task_archive_s3_file 中,我需要从 get_s3_file 获取文件名。该任务只是将 {{ ti.xcom_pull(ta
现在,我使用这样的变量创建了多个任务,它工作正常。 with DAG(....) as dag: body = Variable.get("config_table", deserialize
我广泛搜索了 Airflow 博客和文档来调试我遇到的问题。 我想要解决的问题 检查 ftp 服务器上是否存在特定文件 如果存在,请将其上传到云端 如果不存在,请向客户发送电子邮件,报告未找到文件 我
我尝试通过 airflow cli test 命令测试 2 个任务` 第一个任务运行,自动将最后一个控制台推送到 xcom,我按预期在 Airflow GUI 中看到了值 some value 当我通
我是一名优秀的程序员,十分优秀!