
python - Airflow: efficiently implementing a wait (sleep) task

Reposted. Author: 行者123. Updated: 2023-12-01 06:34:03

I need to implement a waiting task in Airflow. The wait is on the order of a few hours.

First, TimeDeltaSensor does not work as expected:

from datetime import timedelta
from airflow.sensors.time_delta_sensor import TimeDeltaSensor

SLEEP_MINUTES_1ST = 11
sleep_task_1 = TimeDeltaSensor(
    task_id="sleep_for_11_min",
    delta=timedelta(minutes=SLEEP_MINUTES_1ST),
)

With a daily schedule such as:

schedule_interval='30 06 * * *'

it simply waits for the next scheduled run:

[2020-01-15 18:10:21,800] {time_delta_sensor.py:45} INFO - Checking if the time (2020-01-16 06:41:00+00:00) has come

This is evident from the code: https://github.com/apache/airflow/blob/master/airflow/sensors/time_delta_sensor.py#L43

(not to mention the known bug when the schedule is None or @once)
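Here is a rough model of what the linked line does. In Airflow 1.10, TimeDeltaSensor.poke() computes its target as dag.following_schedule(execution_date) + delta. The delta is therefore added to the end of the schedule interval, not to the moment the task starts.

The sketch below reproduces the target time from the log above without Airflow. `following_daily_schedule` is a hypothetical stand-in for `dag.following_schedule`, and it only handles a fixed daily time such as '30 06 * * *'.

```python
from datetime import datetime, time, timedelta, timezone


def following_daily_schedule(execution_date: datetime, at: time) -> datetime:
    """Hypothetical stand-in for dag.following_schedule() with a daily cron
    such as '30 06 * * *': the next run time strictly after execution_date."""
    candidate = execution_date.replace(
        hour=at.hour, minute=at.minute, second=0, microsecond=0
    )
    if candidate <= execution_date:
        candidate += timedelta(days=1)
    return candidate


def time_delta_sensor_target(execution_date: datetime, at: time, delta: timedelta) -> datetime:
    # TimeDeltaSensor waits until the *next* schedule plus delta,
    # not until (task start + delta).
    return following_daily_schedule(execution_date, at) + delta


if __name__ == "__main__":
    run = datetime(2020, 1, 15, 6, 30, tzinfo=timezone.utc)
    print(time_delta_sensor_target(run, time(6, 30), timedelta(minutes=11)))
    # 2020-01-16 06:41:00+00:00
```

This explains the log line: an 11-minute "sleep" on a daily DAG turns into a wait of almost a full day.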

The next attempt was to use TimeSensor, like this:

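from datetime import timedelta
from airflow.sensors.time_sensor import TimeSensor
from airflow.utils import timezone
from airflow.utils.trigger_rule import TriggerRule

SLEEP_MINUTES_1ST = 11
sleep_task_1 = TimeSensor(
    task_id="sleep_for_11_min",
    provide_context=True,
    # Evaluated when the DAG file is parsed, not when the task starts
    target_time=(timezone.utcnow() + timedelta(minutes=SLEEP_MINUTES_1ST)).time(),
    trigger_rule=TriggerRule.NONE_FAILED,
)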

This actually works well, but in poke mode it occupies a worker for the entire wait. I was advised to use reschedule mode, but simply adding:

mode='reschedule',

produces a new target time on every rescheduled check, so the sensor never completes:

[2020-01-15 15:36:42,818] {time_sensor.py:39} INFO - Checking if the time (14:47:42.707565) has come
[2020-01-15 15:36:42,981] {taskinstance.py:1054} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
....
[2020-01-15 15:38:51,306] {time_sensor.py:39} INFO - Checking if the time (14:49:51.079783) has come
[2020-01-15 15:38:51,331] {taskinstance.py:1054} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
...
[2020-01-15 15:41:00,587] {time_sensor.py:39} INFO - Checking if the time (14:52:00.202168) has come
[2020-01-15 15:41:00,614] {taskinstance.py:1054} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
.....

(Note that Airflow mixes UTC and my own time zone, UTC+1, in these logs.)
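The behaviour in these logs can be modeled in a few lines of plain Python. The sketch rests on one assumption, which the logs suggest: in reschedule mode the DAG file is parsed again each time the task is picked up. If so, the module-level expression `timezone.utcnow() + timedelta(...)` is re-evaluated on every check, and the deadline always sits 11 minutes in the future. In poke mode the file is loaded once, so the deadline stays fixed.

`parse_dag_file`, `time_sensor_poke` and `simulate` are hypothetical names used only for illustration.

```python
from datetime import datetime, time, timedelta

SLEEP = timedelta(minutes=11)


def parse_dag_file(now: datetime) -> time:
    # Models the module-level code: target_time=(utcnow() + 11 min).time()
    return (now + SLEEP).time()


def time_sensor_poke(now: datetime, target_time: time) -> bool:
    # Same comparison as TimeSensor.poke(): utcnow().time() > target_time
    return now.time() > target_time


def simulate(start: datetime, poke_interval: timedelta, pokes: int, reparse_each_poke: bool) -> list:
    """Return the poke results; reparse_each_poke=True models reschedule mode."""
    target = parse_dag_file(start)
    results = []
    now = start
    for _ in range(pokes):
        if reparse_each_poke:
            target = parse_dag_file(now)  # the deadline moves with the clock
        results.append(time_sensor_poke(now, target))
        now += poke_interval
    return results


if __name__ == "__main__":
    start = datetime(2020, 1, 15, 14, 36, 42)
    step = timedelta(minutes=2)
    print(simulate(start, step, 8, reparse_each_poke=False))
    # [False, False, False, False, False, False, True, True]
    print(simulate(start, step, 8, reparse_each_poke=True))
    # [False, False, False, False, False, False, False, False]
```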

The next attempt was to compute the TimeSensor's target_time relative to the DAG's execution_date. None of several attempts worked, for example:

task_target_time = '{{ macros.datetime.fromtimestamp((execution_date + macros.timedelta(minutes=SLEEP_MINUTES_1ST).timestamp()) }}'
sleep_task_1 = TimeSensor(
    task_id="sleep_for_11_min",
    provide_context=True,
    # target_time=(timezone.utcnow()+timedelta(minutes=SLEEP_MINUTES_1ST)).time(),
    # target_time = task_target_time,
    # target_time=datetime.strptime('{{ execution_date + macros.timedelta(minutes=SLEEP_MINUTES_1ST) }}','%Y-%m-%dT%H:%M:%S'),
    # target_time='{{ execution_date }}'+ timedelta(minutes=SLEEP_MINUTES_1ST),
    # str + timedelta: raises TypeError while the DAG file is parsed
    target_time=('{{ task_instance.execution_date }}' + timedelta(minutes=SLEEP_MINUTES_1ST)).time(),
    poke_interval=120,
    mode='reschedule',
    timeout=10*60*60,
    trigger_rule=TriggerRule.NONE_FAILED
)

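The commented-out target_time lines show only some of the combinations I tried. Some failed immediately when the DAG was created; others failed at run time with errors like this: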

[2020-01-15 17:56:39,388] {time_sensor.py:39} INFO - Checking if the time ({{ macros.datetime.fromtimestamp((execution_date + macros.timedelta(minutes=SLEEP_MINUTES_1ST).timestamp()) }}) has come
[2020-01-15 17:56:39,389] {taskinstance.py:1058} ERROR - '>' not supported between instances of 'datetime.time' and 'str'
Traceback (most recent call last):
  File "/data/airflow_ak/.direnv/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/data/airflow_ak/.direnv/lib/python3.6/site-packages/airflow/sensors/base_sensor_operator.py", line 107, in execute
    while not self.poke(context):
  File "/data/airflow_ak/.direnv/lib/python3.6/site-packages/airflow/sensors/time_sensor.py", line 40, in poke
    return timezone.utcnow().time() > self.target_time
TypeError: '>' not supported between instances of 'datetime.time' and 'str'
[2020-01-15 17:56:39,390] {taskinstance.py:1089} INFO - Marking task as FAILED.

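I think I understand the theory: the task context, including execution_date, is not available when the operator is created, only at run time. Jinja would yield a Pendulum object that then needs converting to a time, but at creation time the Jinja template is just a string, so no Pendulum methods are available.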
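Both failure modes come down to plain Python type errors and can be reproduced without Airflow:

- Concatenating the template string with a timedelta fails as soon as the DAG file is parsed.
- In the run-time log, the template reached poke() unrendered. This is consistent with target_time not being one of TimeSensor's template fields in 1.10.6, so Airflow never runs Jinja on it.

The helper names below are hypothetical.

```python
from datetime import datetime, timedelta, timezone

TEMPLATE = "{{ task_instance.execution_date }}"


def fails_at_parse_time() -> str:
    """Mirrors target_time=('{{ ... }}' + timedelta(...)).time():
    the template is still a plain str when the DAG file is parsed."""
    try:
        (TEMPLATE + timedelta(minutes=11)).time()
    except TypeError as exc:
        return str(exc)
    return ""


def fails_at_poke_time(target_time) -> str:
    """Mirrors TimeSensor.poke(): utcnow().time() > self.target_time,
    where target_time reached the sensor as an unrendered string."""
    try:
        datetime.now(timezone.utc).time() > target_time
    except TypeError as exc:
        return str(exc)
    return ""


if __name__ == "__main__":
    print(fails_at_parse_time())  # wording differs between Python versions
    print(fails_at_poke_time(TEMPLATE))
    # '>' not supported between instances of 'datetime.time' and 'str'
```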

But why is it so hard to create something as simple as:

sleep 1000

in Airflow?

(Airflow: v1.10.6, Python 3.6.8)

Best answer

Here is an Airflow sensor that "sleeps" the way I think TimeDeltaSensor should.

It is best used in "reschedule" mode.

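The TimeSleepSensor operator sleeps relative to the time at which the task instance started. By default its poke_interval equals the sleep duration, so after the initial check it pokes only once more, when the sleep duration has elapsed. Its default timeout expires 30 seconds after the requested sleep_duration, in case something goes wrong and the poke fails.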

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils import timezone
from airflow.utils.decorators import apply_defaults
from datetime import datetime, timedelta

class TimeSleepSensor(BaseSensorOperator):
    """
    Waits for specified time interval relative to task instance start

    :param sleep_duration: time after which the job succeeds
    :type sleep_duration: datetime.timedelta
    """

    @apply_defaults
    def __init__(self, sleep_duration, *args, **kwargs):
        super(TimeSleepSensor, self).__init__(*args, **kwargs)
        self.sleep_duration = sleep_duration
        # Defaults: poke again only after the full sleep, time out 30 s later
        self.poke_interval = kwargs.get('poke_interval', int(sleep_duration.total_seconds()))
        self.timeout = kwargs.get('timeout', int(sleep_duration.total_seconds()) + 30)

    def poke(self, context):
        ti = context["ti"]

        # Anchored to the task instance's start date, not to DAG-parse time
        sensor_task_start_date = ti.start_date
        target_time = sensor_task_start_date + self.sleep_duration

        self.log.info("Checking if the target time ({} - check:{}) has come - time to go: {}, start: {}, initial sleep_duration: {}"
                      .format(target_time, (timezone.utcnow() > target_time), (target_time - timezone.utcnow()), sensor_task_start_date, self.sleep_duration)
        )

        return timezone.utcnow() > target_time

Usage is simple:

sleep_task = TimeSleepSensor(
    task_id="sleep_task",
    sleep_duration=timedelta(minutes=1800),
    mode='reschedule'
)

For more on "python - Airflow: efficiently implementing a wait (sleep) task", see the similar question on Stack Overflow: https://stackoverflow.com/questions/59757151/
