gpt4 book ai didi

python - 如何从 Airflow 传感器中提取 xcom 值?

转载 作者:行者123 更新时间:2023-12-04 00:50:16 29 4
gpt4 key购买 nike

主要问题:如果不存在,我正在尝试创建一个 BigQuery 表。
联系方式:使用 BigQueryTableSensor 检查表是否存在,并根据返回值,使用 BigQueryCreateEmptyTableOperator 创建或不创建新表。
问题:我无法使用 xcom 获取 BigQueryTableSensor 传感器的返回值。众所周知,poke 方法需要返回一个 bool 值。
这就是我创建任务的方式:

check_if_table_exists = BigQueryTableSensor(
task_id='check_if_table_exists',
project_id='my_project',
dataset_id='my_dataset',
table_id='my_table',
bigquery_conn_id='bigquery_default',
timeout=120,
do_xcom_push=True,
)

# Output: INFO - Success criteria met. Exiting.

get_results = BashOperator(
task_id='get_results',
bash_command="echo {{ ti.xcom_pull(task_ids='check_if_table_exists') }}"
)

# Output: INFO - Running command: echo None
查看 Airflow 界面,我检查了 BigQueryTableSensor 没有推送任何内容:(
enter image description here
问题:
  • 有没有办法获得我的传感器的返回值?
  • 有没有更好的方法来解决我的主要问题?也许使用 BigQueryOperator 和像“CREATE TABLE IF NOT EXISTS”这样的 sql 查询。
  • 最佳答案

    是的,这是可能的,我让它像这样工作:

    class MyCustomSensor(BaseSensorOperator):

    @apply_defaults
    def __init__(self,
    *args,
    **kwargs):
    super(MyCustomSensor, self).__init__(*args, **kwargs)

    def poke(self, context):
    application_id = context['ti'].xcom_pull(key='application_id')
    print("We found " + application_id)
    return True
    这是一个完整的 DAG 示例:
    import os
    import sys
    from datetime import datetime
    from airflow import DAG, settings
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.python_operator import PythonOperator
    from airflow.sensors.base_sensor_operator import BaseSensorOperator
    from airflow.utils.decorators import apply_defaults


    dag = DAG('my_dag_name',
    description='DAG ',
    schedule_interval=None,
    start_date=datetime(2021, 1, 7),
    tags=["samples"],
    catchup=False)

    class MyCustomSensor(BaseSensorOperator):

    @apply_defaults
    def __init__(self,
    *args,
    **kwargs):
    super(MyCustomSensor, self).__init__(*args, **kwargs)

    def poke(self, context):
    application_id = context['ti'].xcom_pull(key='application_id')
    print("We found " + application_id)
    return True


    def launch_spark_job(**kwargs):
    application_id = "application_1613995447156_11473"
    kwargs['ti'].xcom_push(key='application_id', value=application_id)


    launch_spark_job_op = PythonOperator(task_id='test_python',
    python_callable=launch_spark_job,
    provide_context=True,
    dag=dag)

    wait_spark_job_sens = MyCustomSensor(task_id='wait_spark_job',
    dag=dag,
    mode="reschedule")

    launch_spark_job_op >> wait_spark_job_sens

    关于python - 如何从 Airflow 传感器中提取 xcom 值?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67186944/

    29 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com