- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我需要在Airflow中实现等待任务。等待时间约为几个小时。
首先,TimeDeltaSensor 无法正常工作。SLEEP_MINUTES_1ST = 11
sleep_task_1 = TimeDeltaSensor(
task_id="sleep_for_11_min",
delta=timedelta(minutes=SLEEP_MINUTES_1ST),
)
对于每日日程,例如:
schedule_interval='30 06 * * *'
只需等待下一个时间表:
[2020-01-15 18:10:21,800] {time_delta_sensor.py:45} INFO - Checking if the time (2020-01-16 06:41:00+00:00) has come
这在代码中是非常明显的: https://github.com/apache/airflow/blob/master/airflow/sensors/time_delta_sensor.py#L43
(更不用说使用计划时的已知错误:无或@once)
下一次尝试是使用 TimeSensor,如下所示:
SLEEP_MINUTES_1ST = 11
sleep_task_1 = TimeSensor(
task_id="sleep_for_11_min",
provide_context=True,
target_time=(timezone.utcnow()+timedelta(minutes=SLEEP_MINUTES_1ST)).time(),
trigger_rule=TriggerRule.NONE_FAILED
)
这实际上效果很好,但在poke模式下,它需要一名工作人员来完成整个等待时间。我收到了使用重新安排模式的建议,但只需添加:
mode='reschedule',
在每次重新安排检查时生成新的安排,并且永远不会像这样完成:
[2020-01-15 15:36:42,818] {time_sensor.py:39} INFO - Checking if the time (14:47:42.707565) has come
[2020-01-15 15:36:42,981] {taskinstance.py:1054} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
....
[2020-01-15 15:38:51,306] {time_sensor.py:39} INFO - Checking if the time (14:49:51.079783) has come
[2020-01-15 15:38:51,331] {taskinstance.py:1054} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
...
[2020-01-15 15:41:00,587] {time_sensor.py:39} INFO - Checking if the time (14:52:00.202168) has come
[2020-01-15 15:41:00,614] {taskinstance.py:1054} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
.....
(请注意,Airflow 在此处的日志中混合了 UTC 和我的时区 UTC+1)
下一次尝试是为 TimeSensor 生成相对于 DAG 的execution_date 的 target_time。但几次尝试都没有成功,例如:
task_target_time = '{{ macros.datetime.fromtimestamp((execution_date + macros.timedelta(minutes=SLEEP_MINUTES_1ST).timestamp()) }}'
sleep_task_1 = TimeSensor(
task_id=task_id="sleep_for_11_min",
provide_context=True,
# target_time=(timezone.utcnow()+timedelta(minutes=SLEEP_MINUTES_1ST)).time(),
# target_time = task_target_time,
# target_time=datetime.strptime('{{ execution_date + macros.timedelta(minutes=SLEEP_MINUTES_1ST) }}','%Y-%m-%dT%H:%M:%S'),
# target_time='{{ execution_date }}'+ timedelta(minutes=SLEEP_MINUTES_1ST),
target_time = ('{{ task_instance.execution_date }}' + timedelta(minutes=SLEEP_MINUTES_1ST)).time(),
poke_interval=120,
mode='reschedule',
timeout=10*60*60,
trigger_rule=TriggerRule.NONE_FAILED
)
在注释行(target_time....)中,您只能看到我尝试过的一些组合。有些在 DAG 创建时立即失败,有些在运行过程中生成如下错误:
[2020-01-15 17:56:39,388] {time_sensor.py:39} INFO - Checking if the time ({{ macros.datetime.fromtimestamp((execution_date + macros.timedelta(minutes=SLEEP_MINUTES_1ST).timestamp()) }}) has come
[2020-01-15 17:56:39,389] {taskinstance.py:1058} ERROR - '>' not supported between instances of 'datetime.time' and 'str'
Traceback (most recent call last):
File "/data/airflow_ak/.direnv/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task
result = task_copy.execute(context=context)
File "/data/airflow_ak/.direnv/lib/python3.6/site-packages/airflow/sensors/base_sensor_operator.py", line 107, in execute
while not self.poke(context):
File "/data/airflow_ak/.direnv/lib/python3.6/site-packages/airflow/sensors/time_sensor.py", line 40, in poke
return timezone.utcnow().time() > self.target_time
TypeError: '>' not supported between instances of 'datetime.time' and 'str'
[2020-01-15 17:56:39,390] {taskinstance.py:1089} INFO - Marking task as FAILED.
我想我理解整个理论 - 包括execution_date的任务上下文在操作符创建时不可用,仅在运行时可用。 Jinja 返回应转换为时间的 Pendulum 对象,但 Jinja 是一个字符串,我在创建时没有获得 Pendulum 方法。
但是为什么创建简单的东西这么难:
sleep 1000
在 Airflow 中。
( Airflow :v1.10.6,python 3.6.8)
最佳答案
这是正在“ sleep ”的 Airflow 传感器,因为我认为 TimeDeltaSensor 应该处于 sleep 状态。
最好在“重新安排”模式下使用。
它相对于任务实例开始的当前时间进行 sleep ,例如TimeSleepSensor 运算符,默认情况下,它仅在 sleep 持续时间后“戳”一次,并且具有默认超时,该超时将在请求 sleep_duration 后立即超时,以防发生导致戳操作失败的情况。
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils import timezone
from airflow.utils.decorators import apply_defaults
from datetime import datetime, timedelta
class TimeSleepSensor(BaseSensorOperator):
"""
Waits for specified time interval relative to task instance start
:param sleep_duration: time after which the job succeeds
:type sleep_duration: datetime.timedelta
"""
@apply_defaults
def __init__(self, sleep_duration, *args, **kwargs):
super(TimeSleepSensor, self).__init__(*args, **kwargs)
self.sleep_duration = sleep_duration
self.poke_interval = kwargs.get('poke_interval',int(sleep_duration.total_seconds()))
self.timeout = kwargs.get('timeout',int(sleep_duration.total_seconds()) + 30)
def poke(self, context):
ti = context["ti"]
sensor_task_start_date = ti.start_date
target_time = sensor_task_start_date + self.sleep_duration
self.log.info("Checking if the target time ({} - check:{}) has come - time to go: {}, start: {}, initial sleep_duration: {}"
.format(target_time, (timezone.utcnow() > target_time), (target_time-timezone.utcnow()), sensor_task_start_date, self.sleep_duration)
)
return timezone.utcnow() > target_time
使用很简单:
sleep_task = TimeSleepSensor(
task_id="sleep_task",
sleep_duration=timedelta(minutes=1800),
mode='reschedule'
)
关于python - Airflow:高效地执行等待( sleep )任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59757151/
在Airflow中,我一直在使用“airflow run”和“airflow test”,但不完全理解它们有何不同。他们有什么区别? 最佳答案 我自己通读了文档,发现它是多么令人困惑。 Airflow
我使用 Airflow 已经有一段时间了,它是由一位同事创建的。最近我遇到了一些错误,这需要我更深入地了解如何修复 Airflow 中的某些问题。 我确实理解这三个进程是什么,但我只是不明白运行它们时
AIRFLOW_HOME=/path/to/my/airflow_home 我收到这个警告... >airflow trigger_dag python_dag3 /Users/alexryan/mi
有没有人报告过他们在他们的公司中让 Airflow 扩展了多少?我正在考虑实现 Airflow 来执行 5,000 多个任务,每个任务每小时运行一次,有一天可以将其扩展到 20,000 多个任务。在检
问题 :我想使用 Github 上最新版本的 Apache-Airflow 安装 apache-airflow 以及所有依赖项? 我怎样才能使用 pip 做到这一点? 在生产环境中使用它是否安全? 最
我们在 AWS ECS 上运行 Airflow,并将所有 DAG 捆绑在一个 Docker 镜像中。我们不时更新 DAGS,并部署新版本的 Docker Image。当我们这样做时,ECS 将终止正在
问题很简单。我需要限制 Airflow 网络用户仅查看和执行某些 DAG 和任务。 如果可能,我宁愿不使用 Kerberos也不是 OAuth . Multi-tenancy option 似乎是一个
我们正在使用 Airflow 2.00。我正在尝试实现一个做两件事的 DAG: 通过 API 触发报告 从源到目标下载报告。 任务 1 和任务 2 之间至少需要 2-3 小时的间隔。根据我的研究,我有
对于一项任务,有许多辅助任务 - 从文件/数据库中获取/保存属性、验证、审计。这些辅助方法并不耗时。 一个示例 DAG 流, fetch_data >> actual_processing >> va
有什么方法可以重新加载作业而不必重新启动服务器吗? 最佳答案 在airflow.cfg中,您具有以下两种配置来控制此行为: # after how much time a new DAGs shoul
我们可以通过将任务/dag 超时设置为 None 并手动触发其运行来使用 Airflow dag 来定义永无止境的作业(即具有无条件循环以消耗流数据的任务)吗?让 Airflow 监测永无止境的任务会
我是 Airflow 的新手,最近开始探索这个工具。我在 18.4 版本的 ubuntu 机器上安装了 1.10.10 版。从设置和安装的角度来看,一切正常,但是我在任何 DAG 中的任务都没有运行,
我主要看到Airflow被用于ETL / Bid数据相关的工作。我正在尝试将其用于业务工作流,其中用户操作将来会触发一组相关任务。其中某些任务可能需要根据某些其他用户操作来清除(删除)。 我认为最好的
我有一个 DAG,只要 FileSensor 检测到文件,它就会使用它,为每个文件生成任务,以 (1) 将文件移动到暂存区域,(2) 触发单独的 DAG 来处理文件。 FileSensor -> Mo
我需要手动或以编程方式执行的管道,可以使用 Airflow 吗?看起来现在每个工作流程都必须与时间表绑定(bind)。 最佳答案 只需在创建 DAG 时将 schedule_interval 设置为
所以这是一个愚蠢的想法...... 我在 Airflow 中创建了(许多)DAG...并且它有效...但是,我想以某种方式将其打包,以便我可以在不安装 Airflow 的情况下运行单个 DAG 运行;
我使用“pip install 'apache-airflow[statsd]' 安装了 airflow[statsd] 并安装了 statsd_exporter。现在我可以看到来自 Promethe
我们正在尝试将 MongoHook 和 GCSToLocalFilesystemOperator 导入到我们的 Airflow 项目中: docs for MongoHook docs for GCS
启动 Airflow 网络服务器时出现以下错误 balajee@Balajees-MacBook-Air.local:~$ Airflow 网络服务器 -p 8080 [2018-12-03 00:2
运行pip install airflow[postgres]命令后出现以下错误: > raise RuntimeError("By default one of Airflow's dependen
我是一名优秀的程序员,十分优秀!