
Airflow - How to push XCom from the ECS operator?

Reposted. Author: 行者123. Updated: 2023-12-04 10:53:03

In my Airflow DAG I have an ecs_operator task followed by a Python operator task. I want to use Airflow's XCom feature to push a message from the ECS task to the Python task. I tried the option do_xcom_push=True, with no result. The sample DAG is below.

dag = DAG(
    dag_name, default_args=default_args, schedule_interval=None)

start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)

ecs_operator_args = {
    'launch_type': 'FARGATE',
    'task_definition': 'task-def:2',
    'cluster': 'cluster-name',
    'region_name': 'region',
    'network_configuration': {
        'awsvpcConfiguration': {}
    }
}

ecs_task = ECSOperator(
    task_id='x_com_test',
    do_xcom_push=True,
    params={'my_param': 'Parameter-1'},
    dag=dag,
    **ecs_operator_args)


def pull_function(**kwargs):
    ti = kwargs['ti']
    msg = ti.xcom_pull(task_ids='x_com_test', key='the_message')
    print("received message: '%s'" % msg)


pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_function,
    provide_context=True,
    dag=dag)

start >> ecs_task >> pull_task >> end

Best answer

You need to set up a CloudWatch log group for the container.
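For the operator to find the container's output, the task definition has to ship logs to CloudWatch through the awslogs log driver. The following is a minimal sketch of that fragment, not the answerer's configuration: the region `us-east-1` and the container name `my-container` are hypothetical placeholders, and both helper functions are illustrative names. One detail worth checking: ECS names each log stream prefix/container-name/task-id, so the awslogs_stream_prefix passed to the operator usually needs to include the container name.

```python
def awslogs_log_configuration(group, region, stream_prefix):
    """Build the logConfiguration fragment of an ECS container definition."""
    return {
        "logDriver": "awslogs",
        "options": {
            "awslogs-group": group,
            "awslogs-region": region,
            "awslogs-stream-prefix": stream_prefix,
        },
    }


def operator_stream_prefix(stream_prefix, container_name):
    """Prefix to hand to the operator: ECS inserts the container name
    between the configured prefix and the task id."""
    return "{}/{}".format(stream_prefix, container_name)


# Hypothetical values for illustration only.
log_config = awslogs_log_configuration(
    "/aws/ecs/myLogGroup", "us-east-1", "myStreamPrefix")
prefix_for_operator = operator_stream_prefix("myStreamPrefix", "my-container")
print(log_config)
print(prefix_for_operator)
```

The dictionary goes under the container definition's logConfiguration key when the task definition is registered; the second value is what the operator's awslogs_stream_prefix argument would be set to under these assumptions.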
The ECSOperator needs to be extended to support pushing to XCom:

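from collections import deque

from airflow import DAG
from airflow.contrib.operators.ecs_operator import ECSOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.decorators import apply_defaults


class MyECSOperator(ECSOperator):
    @apply_defaults
    def __init__(self, xcom_push=False, **kwargs):
        super().__init__(**kwargs)
        # Stored under a different name so it does not shadow
        # BaseOperator.xcom_push().
        self.xcom_push_flag = xcom_push

    def execute(self, context):
        super().execute(context)
        # The value returned by execute() is stored in XCom under
        # the key 'return_value'.
        if self.xcom_push_flag:
            return self._last_log_event()

    def _last_log_event(self):
        if self.awslogs_group and self.awslogs_stream_prefix:
            # The ECS task id is the last segment of the task ARN.
            task_id = self.arn.split("/")[-1]
            stream_name = "{}/{}".format(self.awslogs_stream_prefix, task_id)
            events = self.get_logs_hook().get_log_events(self.awslogs_group, stream_name)
            # Keep only the final event of the stream.
            last_event = deque(events, maxlen=1).pop()
            return last_event["message"]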


dag = DAG(
    dag_name, default_args=default_args, schedule_interval=None)

start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)

ecs_operator_args = {
    'launch_type': 'FARGATE',
    'task_definition': 'task-def:2',
    'cluster': 'cluster-name',
    'region_name': 'region',
    'awslogs_group': '/aws/ecs/myLogGroup',
    'awslogs_stream_prefix': 'myStreamPrefix',
    'network_configuration': {
        'awsvpcConfiguration': {}
    }
}

ecs_task = MyECSOperator(
    task_id='x_com_test',
    xcom_push=True,
    params={'my_param': 'Parameter-1'},
    dag=dag,
    **ecs_operator_args)


def pull_function(**kwargs):
    ti = kwargs['ti']
    msg = ti.xcom_pull(task_ids='x_com_test', key='return_value')
    print("received message: '%s'" % msg)


pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_function,
    provide_context=True,
    dag=dag)

start >> ecs_task >> pull_task >> end
Before it finishes executing, ecs_task fetches the last event from the log group and pushes it to XCom.
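Because only the final log line reaches XCom, one workable convention is to have the container print a small JSON document as its last line and decode it in the downstream task. The sketch below is an assumption layered on top of the answer, not part of it: `last_message` and `parse_payload` are hypothetical helper names, and the sample events are made up. It also handles an empty stream, which would otherwise raise IndexError when popping from an empty deque.

```python
import json
from collections import deque


def last_message(events):
    """Return the 'message' of the final log event, or None if there are none.

    `events` may be any iterable (including a generator) of dicts that
    carry a 'message' key.
    """
    tail = deque(events, maxlen=1)
    return tail[0]["message"] if tail else None


def parse_payload(message):
    """Decode a JSON payload from a log line.

    Returns None when the message is missing or is not valid JSON.
    """
    if message is None:
        return None
    try:
        return json.loads(message)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return None


# Made-up events standing in for what the CloudWatch logs hook would yield.
sample_events = iter([
    {"timestamp": 1, "message": "starting job"},
    {"timestamp": 2, "message": '{"rows_written": 3}'},
])
payload = parse_payload(last_message(sample_events))
print(payload)
```

Inside pull_function, parse_payload could be applied to the value returned by ti.xcom_pull. XCom values are stored in the Airflow metadata database, so the final log line should stay small.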

Regarding "Airflow - How to push XCom from the ECS operator?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/59372523/
