- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我刚开始阅读有关 Prefect 的文章(并且有一点使用 Airflow 的经验)。
我的目标是设置一个每天在 Prefect 中运行的任务,并将数据收集到一个文件夹中(我想这肯定是 Prefect 可以帮助我做的)。
我的目标也是填充历史数据(就像我及时运行这项工作一样)。
在 Airflow 中有 start_date 的概念,当它在过去设置时,DAG 将从该日期开始运行并在每个时间间隔填充。
例如,如果我有一个任务需要一个日期并返回该日期的数据,例如:
# Pseudo code
def get_values_from_somewhere(date: datetime) -> dict:
return fetched_values_in_json(date)
在 Prefect 中是否有一种本地方式可以做到这一点?
schedule
成为:
from datetime import datetime, timedelta
from prefect.schedules import Schedule
schedule = Schedule(clocks=[IntervalClock(interval=timedelta(hours=24), start_date=datetime(2019, 1, 1))])
然后我做
flow.run()
我只是得到:
INFO:prefect.My-Task:Waiting for next scheduled run at 2020-09-24T00:00:00+00:00
我期望的是自
start_date
以来运行我已提供,然后暂停,直到到达当前时间并等待下一个时间表。
最佳答案
Prefect 不会对您的 Flow 或其任务如何依赖时间做出任何隐含的假设,因此执行回填取决于您的 Flow 的结构。时间显式影响流通常有两种方式:
Parameter
或 DateTimeParameter
prefect.context
(其中包括许多与时间相关的字段,描述为 here )import pendulum
import prefect
@prefect.task
def do_something_time_specific():
"""
This task uses the timestamp provided to the custom `backfill_time`
context key; if that does not exist, it falls back on the built-in
`scheduled_start_time` context key.
"""
current_time = prefect.context.get("backfill_time") or prefect.context.get("scheduled_start_time")
if isinstance(current_time, str):
current_time = pendulum.parse(current_time)
# performs some action dealing with this timestamp
flow = Flow("backfill", tasks=[do_something_time_specific])
## using Core
flow.run() # will use current timestamp
flow.run(context={"backfill_time": "1986-01-02"}) # will use old timestamp
## using an API
prefect.Client().create_flow_run(flow_id="FLOWID",
context={"backfill_time": "1986-01-02"}) # will use old timestamp
使用参数
import pendulum
import prefect
current_time = prefect.Parameter("current_time", default=None)
@prefect.task
def do_something_time_specific(current_time):
"""
This task uses the timestamp provided to the task explicitly.
"""
current_time = current_time or pendulum.now("utc") # uses "now" if not provided
if isinstance(current_time, str):
current_time = pendulum.parse(current_time)
# performs some action dealing with this timestamp
with Flow("backfill") as flow:
do_something_time_specific(current_time)
## using Core
flow.run() # will use current timestamp
flow.run(current_time="1986-01-02") # will use old timestamp
## using an API
prefect.Client().create_flow_run(flow_id="FLOWID",
parameters={"current_time": "1986-01-02"}) # will use old timestamp
较新的参数类,例如
DateTimeParameter
提供更好的打字保证,但希望这能证明这个想法。
flow.run(run_on_schedule=False)
在 Core 中为具有计划的流创建临时运行。
关于etl - 有没有办法为 Prefect 中的新 Flow 回填历史数据(一次)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64029629/
我正在编写一个 ETL(在带有 mongodb 后端的 python 中)并且想知道:应该将 ETL 什么样的标准函数和工具称为 ETL? 此 ETL 将尽可能通用,采用可编写脚本和模块化的方法。大多
我正在构建一个从 informatica cdc 获取数据的暂存区。现在,例如,假设我正在复制两个表以进行增量加载。每次加载后,我必须从暂存表中删除已处理的数据。我连接这两个表来填充我的目标维度。问题
我们目前使用 Datastage ETL 来 - 每天从 15 个表(3 个不同的架构)导出一个 CSV/文本文件,其中包含数据。 我想知道是否有一种更简单的方法可以在不使用 ETL 的情况下完成此操
我有两个 CSV 文件: 首先包含以下格式的 ~ 500M 记录 id,name 10000023432,Tom User 13943423235,Blah Person 第二个包含 ~ 1.5B 以
在这篇文章中,我不是在问任何教程,如何做某事,在这篇文章中,我请求您的帮助,如果有人可以用简单的话解释我,什么是 DWH(数据仓库)以及什么是 ETL。 当然,我在 google 和 youtube
在这篇文章中,我不是在问任何教程,如何做某事,在这篇文章中,我请求您的帮助,如果有人可以用简单的话解释我,什么是 DWH(数据仓库)以及什么是 ETL。 当然,我在 google 和 youtube
我有多个日志文件 1.csv、2.csv 和 3.csv 由日志报告生成。我想读取这些文件并使用 Scriptella 同时解析它们。 最佳答案 Scriptella 不提供开箱即用的并行作业执行。相
我正在构建一个 ETL,它将通过一个变量在不同的源上运行。 我如何执行我的工作(rake 任务) Kiba.run(Kiba.parse(IO.read(etl_file),etl_file)) 并为
我有 3 个表,一个用于顶点 A,一个用于顶点 B,第三个用于从 B 到 A 的边。如何将此图导入 OrientDB? 目前,教程只说如何导入两个csv文件,一个用于顶点A,另一个用于顶点B和从A连接
将 Apache NIFI 用于 ETL 过程的效果如何,源为 HDFS,目标为 Oracle DB。 Apache NIFI 与 Pentaho、Datastage 等其他 ETL 工具相比有哪些局
我最近才开始使用 Rhino-Etl对于非常简单的 ETL 过程,并取得了巨大的成功。我现在有一个稍微复杂的场景要解决,但我没有发现 ConventionInputCommandOperation 以
我正忙于处理 ETL 管道,但对于这个特定问题,我需要获取一个数据表,并将每一列变成一个集合 - 即一个唯一数组。 我正在努力思考如何在 Kiba 框架内实现这一目标。 这是我要实现的目标的本质: 来
我读过 Presto 用于临时查询,而 Hive/spark 更适用于 ETL 场景。在 ETL 中不使用 Presto 的原因似乎是因为 Presto 查询可能会失败并且没有中间查询容错。 然而,看
我有 2 个 csv 文件。 人.csv ID,PetID,Jumps 1,101,Yes 2,102,No 3,103,Yes 宠物.csv ID,Name 101,Dog 102,Cat 103,
我正在寻找 ETL 工具,在谷歌上发现了很多关于 Pentaho Kettle 的信息。 我还需要一个数据分析器在 Star Schema 上运行,以便业务用户可以玩转并生成任何类型的报告或矩阵。 P
我目前正在评估 Talend ETL(用于数据集成的 Talend Open Studio)。 我想知道如何/是否可以将 ETL 作业公开为 Web 服务。 我知道我可以将作业导出为 Web 服务并通
我是 Actor 建模领域的新手,我爱上了这个想法。但是,是否存在某种模式来以安全的方式处理一批仅用于大容量存储的消息? 恐怕如果我阅读了预期 500 条的 400 条消息并将它们放入列表中,如果系统
我在 Heroku 服务器上收到此错误,但它在本地完全正常工作。 这是从表单获取 CSV 文件并尝试存储在资源文件夹中的 Controller 。 @PostMapping(value = "/imp
我们正在重组我们的整个数据库。也就是说,数据库结构发生了翻天覆地的变化。一些表字段将被转换为表行;会有很多验证;一些表被分解成多个表,而另一些则合并在一起。基本上我们正在将遗留数据库更改为 3NF。
我正在尝试创建一个可以通过 HTTP 下载多个文件的作业。这些文件的列表位于 MySQL 表中。我通过以下步骤创建了一个主要作业:开始、设置变量、FILELIST(我创建的转换)、下载(我创建的作业)
我是一名优秀的程序员,十分优秀!