airflow - How to use MySqlOperator with xcom in Airflow?

Reposted. Author: 行者123. Updated: 2023-12-03 00:13:57

I read How to use airflow xcoms with MySqlOperator. Although it has a similar title, it doesn't really solve my problem.

I have the following code:

import logging

from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.python_operator import BranchPythonOperator

# dag, MyCon, LAST_IMPORTED_ORDER_ID, import_orders and skip_operation are defined elsewhere in the DAG file


def branch_func_is_new_records(**kwargs):
    ti = kwargs['ti']
    xcom = ti.xcom_pull(task_ids='query_get_max_order_id')
    string_to_print = 'Value in xcom is: {}'.format(xcom)
    logging.info(string_to_print)
    if int(xcom) > int(LAST_IMPORTED_ORDER_ID):
        return 'import_orders'
    else:
        return 'skip_operation'

query_get_max_order_id = 'SELECT COALESCE(max(orders_id),0) FROM warehouse.orders where orders_id>1 limit 10'
get_max_order_id = MySqlOperator(
    task_id='query_get_max_order_id',
    sql=query_get_max_order_id,
    mysql_conn_id=MyCon,
    xcom_push=True,
    dag=dag)

branch_op_is_new_records = BranchPythonOperator(
    task_id='branch_operation_is_new_records',
    provide_context=True,
    python_callable=branch_func_is_new_records,
    dag=dag)

get_max_order_id >> branch_op_is_new_records >> import_orders
branch_op_is_new_records >> skip_operation

MySqlOperator returns a number, and BranchPythonOperator uses that number to choose the next task. The value returned by MySqlOperator is guaranteed to be greater than 0.
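As a minimal sketch (not code from the question), the branching rule can be written as a plain Python function and tested without Airflow. The name choose_branch is hypothetical. It also unwraps a row such as (1234,), since that is the shape a hook's get_first() returns, and it fails with a clear message instead of calling int(None) when nothing was pulled from XCom.

```python
# Hypothetical helper: decide which downstream task to run from the value
# pulled out of XCom. The returned task ids mirror the DAG in the question.


def choose_branch(xcom_value, last_imported_order_id):
    """Return 'import_orders' if the pulled max order id is newer, else 'skip_operation'."""
    if xcom_value is None:
        # Nothing was pushed to XCom (e.g. the upstream operator returned None).
        raise ValueError("no value in XCom for 'query_get_max_order_id'")
    # A DB-API row arrives as a tuple (or a list after JSON serialization): take column 0.
    if isinstance(xcom_value, (tuple, list)):
        xcom_value = xcom_value[0]
    if int(xcom_value) > int(last_imported_order_id):
        return 'import_orders'
    return 'skip_operation'


print(choose_branch((1234,), 1000))  # import_orders
print(choose_branch("900", 1000))    # skip_operation
```

Inside the DAG, the python_callable would simply call choose_branch(ti.xcom_pull(task_ids='query_get_max_order_id'), LAST_IMPORTED_ORDER_ID).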

My problem is that MySqlOperator doesn't push anything to XCom: when I open XCom in the UI, I see nothing. BranchPythonOperator therefore reads nothing, and my code fails.
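The symptom can be reproduced with a toy model of XCom. This is not Airflow's code; it assumes the Airflow 1.10 behaviour where a non-None value returned from an operator's execute() is stored under the key 'return_value', and where xcom_pull returns None for a missing entry. ToyXComStore and run_task are hypothetical names.

```python
# Toy model (not Airflow's implementation) of how a task's return value reaches XCom.
XCOM_RETURN_KEY = 'return_value'


class ToyXComStore:
    def __init__(self):
        self._data = {}

    def push(self, task_id, key, value):
        self._data[(task_id, key)] = value

    def pull(self, task_id, key=XCOM_RETURN_KEY):
        # Like xcom_pull for a single task id: a missing entry yields None, not an error.
        return self._data.get((task_id, key))


def run_task(store, task_id, execute):
    result = execute()
    if result is not None:  # only non-None results are pushed
        store.push(task_id, XCOM_RETURN_KEY, result)


store = ToyXComStore()
run_task(store, 'plain_mysql', lambda: None)         # like MySqlOperator.execute(), which returns nothing
run_task(store, 'returning_mysql', lambda: (1234,))  # like an operator that returns get_first()
print(store.pull('plain_mysql'))      # None
print(store.pull('returning_mysql'))  # (1234,)
```

Because the stock MySqlOperator.execute() returns None, nothing is stored, so int(xcom) in the branch callable becomes int(None), which raises a TypeError.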

Why doesn't XCom work here?

Best answer

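The MySQL operator currently (Airflow 1.10.0 at the time of writing) does not support returning anything in XCom, so the fix for now is to write a small operator yourself. You can do this directly in your DAG file (untested, so there may be silly mistakes):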

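import logging

from airflow.hooks.mysql_hook import MySqlHook
from airflow.operators.mysql_operator import MySqlOperator as BaseMySqlOperator
from airflow.operators.python_operator import BranchPythonOperator


class ReturningMySqlOperator(BaseMySqlOperator):
    """MySqlOperator whose execute() returns the first result row, so it is pushed to XCom."""

    def execute(self, context):
        self.log.info('Executing: %s', self.sql)
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
                         schema=self.database)
        # get_first() returns the first row as a tuple, e.g. (1234,)
        return hook.get_first(
            self.sql,
            parameters=self.parameters)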


def branch_func_is_new_records(**kwargs):
    ti = kwargs['ti']
    xcom = ti.xcom_pull(task_ids='query_get_max_order_id')
    string_to_print = 'Value in xcom is: {}'.format(xcom)
    logging.info(string_to_print)
    # The pulled value is the first row returned by get_first(), so compare its first column
    if int(xcom[0]) > int(LAST_IMPORTED_ORDER_ID):
        return 'import_orders'
    else:
        return 'skip_operation'

query_get_max_order_id = 'SELECT COALESCE(max(orders_id),0) FROM warehouse.orders where orders_id>1 limit 10'
get_max_order_id = ReturningMySqlOperator(
    task_id='query_get_max_order_id',
    sql=query_get_max_order_id,
    mysql_conn_id=MyCon,
    # xcom_push=True,  # not needed: the value returned by execute() is pushed to XCom automatically
    dag=dag)

branch_op_is_new_records = BranchPythonOperator(
    task_id='branch_operation_is_new_records',
    provide_context=True,
    python_callable=branch_func_is_new_records,
    dag=dag)

get_max_order_id >> branch_op_is_new_records >> import_orders
branch_op_is_new_records >> skip_operation
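To see what ReturningMySqlOperator actually pushes, this sketch uses the standard-library sqlite3 module as a stand-in for MySQL. The get_first helper here is hypothetical; it only mimics what DbApiHook.get_first() does (execute the query and return cursor.fetchone()). The in-memory orders table and its rows are made up for illustration.

```python
import sqlite3


def get_first(conn, sql, parameters=()):
    """Run a query and return only the first row, as a tuple (or None if there are no rows)."""
    cur = conn.cursor()
    try:
        cur.execute(sql, parameters)
        return cur.fetchone()
    finally:
        cur.close()


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE orders (orders_id INTEGER)')
conn.executemany('INSERT INTO orders VALUES (?)', [(1,), (5,), (42,)])

row = get_first(conn, 'SELECT COALESCE(MAX(orders_id), 0) FROM orders WHERE orders_id > 1')
print(row)     # (42,)
print(row[0])  # 42, the value to compare against LAST_IMPORTED_ORDER_ID

empty = get_first(conn, 'SELECT COALESCE(MAX(orders_id), 0) FROM orders WHERE orders_id > 100')
print(empty)   # (0,)
```

The aggregate query always yields exactly one row, so the pulled XCom value is a one-element tuple (or a list, if XCom values are JSON-serialized), and the branch callable has to read its first element.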

Regarding "airflow - How to use MySqlOperator with xcom in Airflow?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/52645905/
