
python - Airflow - Task instance in EMR operators

Reposted. Author: 行者123. Updated: 2023-12-01 21:57:57

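In Airflow, I need to pass the job_flow_id to one of my EMR steps. I can retrieve the job_flow_id from the operator, but when I build the steps to submit to the cluster, the task_instance value is not rendered correctly. I have the following code: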

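from datetime import timedelta

from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor


def issue_step(name, args):
    # Build a one-element list describing a single EMR step
    return [
        {
            "Name": name,
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "s3://....",
                "Args": args
            }
        }
    ]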

dag = DAG('example',
          description='My dag',
          schedule_interval='0 8 * * 6',
          dagrun_timeout=timedelta(days=2))

try:

    create_emr = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        aws_conn_id='aws_default',
        dag=dag
    )

    load_data_steps = issue_step('load', ['arg1', 'arg2'])

    load_data_steps[0]["HadoopJarStep"]["Args"].append('--cluster-id')
    load_data_steps[0]["HadoopJarStep"]["Args"].append(
        "{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}")  # the value here is not exchanged with the actual job_flow_id

    load_data = EmrAddStepsOperator(
        task_id='load_data',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",  # this is correctly exchanged with the job_flow_id - same for the others
        aws_conn_id='aws_default',
        steps=load_data_steps,
        dag=dag
    )

    check_load_data = EmrStepSensor(
        task_id='watch_load_data',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull('load_data', key='return_value')[0] }}",
        aws_conn_id='aws_default',
        dag=dag
    )

    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='remove_cluster',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        dag=dag
    )

    # create_emr is the operator defined above
    create_emr >> load_data
    load_data >> check_load_data
    check_load_data >> cluster_remover

except AirflowException as ae:
    print(ae)

The problem is that when I check EMR, instead of --cluster-id j-1234 in the load_data step I see --cluster-id "{{task_instance.xcom_pull('create_job_flow', key='return_value')}}", which causes my step to fail.

How can I get the actual value inside my step definition?

Thanks, and happy holidays

Best answer

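I found that there is a PR about this on the Airflow repository. The problem is that steps is not a templated field in EmrAddStepsOperator, so Jinja expressions inside it are never rendered. To work around this, I did the following: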

  • Created a custom operator that inherits from EmrAddStepsOperator
  • Added this operator as a plugin
  • Called the new operator in my DAG file

Here is the code for the custom operator and the plugin, in the file custom_emr_add_step_operator.py (see the tree below):

from __future__ import division, absolute_import, print_function

from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults

from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator


class CustomEmrAddStepsOperator(EmrAddStepsOperator):
    # Add 'steps' to the templated fields so Jinja expressions inside it are rendered
    template_fields = ['job_flow_id', 'steps']

    @apply_defaults
    def __init__(self, *args, **kwargs):
        super(CustomEmrAddStepsOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        # Return the parent's result (the step ids) so it is still pushed to XCom
        return super(CustomEmrAddStepsOperator, self).execute(context=context)


# Defining the plugin class
class CustomPlugin(AirflowPlugin):
    name = "custom_plugin"
    operators = [CustomEmrAddStepsOperator]
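
To see why listing 'steps' in template_fields is enough, here is a toy model in plain Python. It is only a sketch and not Airflow's implementation: the render function is a hypothetical regex-based stand-in for Jinja, and ToyOperator, ToyOperatorWithSteps, and demo are made-up names. It assumes, as Airflow does for templated fields, that rendering recurses into lists and dicts.

```python
import re

# Hypothetical stand-in for Jinja: replaces "{{ name }}" with context[name].
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(value, context):
    """Recursively render placeholders in strings, lists, and dicts."""
    if isinstance(value, str):
        return _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), value)
    if isinstance(value, list):
        return [render(item, context) for item in value]
    if isinstance(value, dict):
        return {key: render(item, context) for key, item in value.items()}
    return value


class ToyOperator(object):
    # Only attributes named here are rendered before execution.
    template_fields = ['job_flow_id']

    def __init__(self, job_flow_id, steps):
        self.job_flow_id = job_flow_id
        self.steps = steps

    def render_templates(self, context):
        for name in self.template_fields:
            setattr(self, name, render(getattr(self, name), context))


class ToyOperatorWithSteps(ToyOperator):
    # Same override as in the custom operator above.
    template_fields = ['job_flow_id', 'steps']


def demo(operator_cls):
    steps = [{"HadoopJarStep": {"Args": ["--cluster-id", "{{ cluster_id }}"]}}]
    op = operator_cls(job_flow_id="{{ cluster_id }}", steps=steps)
    op.render_templates({"cluster_id": "j-1234"})
    return op.job_flow_id, op.steps[0]["HadoopJarStep"]["Args"]
```

With ToyOperator, job_flow_id is rendered but the placeholder inside steps is left untouched, which matches the behaviour described in the question. With ToyOperatorWithSteps, both are rendered.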

In my DAG file, I import the plugin's operator this way:

from airflow.operators import CustomEmrAddStepsOperator
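
Once the operator is imported, the step list can carry the templated cluster id directly. The following is a minimal sketch rather than part of the original answer: issue_step_for_cluster and JOB_FLOW_ID_TEMPLATE are hypothetical names, and the Jar path is left elided exactly as in the question. The helper copies the argument list instead of appending to the caller's list.

```python
# Same Jinja expression the question uses to pull the cluster id from XCom.
JOB_FLOW_ID_TEMPLATE = (
    "{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}"
)


def issue_step_for_cluster(name, args, jar="s3://...."):
    """Build a one-element EMR steps list with '--cluster-id <template>' appended.

    The template string is only replaced with the real id if the operator
    that receives these steps lists 'steps' in its template_fields.
    """
    step_args = list(args) + ["--cluster-id", JOB_FLOW_ID_TEMPLATE]
    return [
        {
            "Name": name,
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {"Jar": jar, "Args": step_args},
        }
    ]


# Assumed usage inside the DAG file (requires Airflow and the plugin above):
# load_data = CustomEmrAddStepsOperator(
#     task_id='load_data',
#     job_flow_id=JOB_FLOW_ID_TEMPLATE,
#     aws_conn_id='aws_default',
#     steps=issue_step_for_cluster('load', ['arg1', 'arg2']),
#     dag=dag,
# )
```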

The structure of my project and plugins looks like this:

├── config
│   └── airflow.cfg
├── dags
│   ├── __init__.py
│   └── my_dag.py
├── plugins
│   ├── __init__.py
│   └── operators
│       ├── __init__.py
│       └── custom_emr_add_step_operator.py
└── requirements.txt

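If you use an IDE such as PyCharm, it will flag this import as an error because it cannot find the module. The error does not occur when Airflow actually runs. Also make sure that airflow.cfg points to the correct plugins folder so that Airflow can load your newly created plugin.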
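
A quick way to check that setting outside of Airflow is sketched below, using only the standard library. It assumes the plugins path is stored as plugins_folder in the [core] section of airflow.cfg and that plugin modules may sit in subfolders; plugins_folder_from_cfg and find_plugin_files are hypothetical helper names, not Airflow functions.

```python
import configparser
import os


def plugins_folder_from_cfg(cfg_text):
    """Return the plugins_folder value from airflow.cfg text, or None if unset."""
    # interpolation=None so '%' characters elsewhere in the file are left alone.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    return parser.get("core", "plugins_folder", fallback=None)


def find_plugin_files(plugins_folder):
    """List .py files (excluding __init__.py) under plugins_folder, recursively."""
    found = []
    for root, _dirs, files in os.walk(plugins_folder):
        for name in files:
            if name.endswith(".py") and name != "__init__.py":
                path = os.path.join(root, name)
                found.append(os.path.relpath(path, plugins_folder))
    return sorted(found)
```

For the tree above, reading config/airflow.cfg and passing the result to find_plugin_files should list the custom_emr_add_step_operator.py file under operators if the setting points at the right directory.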

Regarding python - Airflow - Task instance in EMR operators, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/47959481/
