gpt4 book ai didi

etl - 有没有办法为 Prefect 中的新 Flow 回填历史数据(一次)?

转载 作者:行者123 更新时间:2023-12-04 00:54:09 25 4
gpt4 key购买 nike

我刚开始阅读有关 Prefect 的文章(并且有一点使用 Airflow 的经验)。
我的目标是设置一个每天在 Prefect 中运行的任务,并将数据收集到一个文件夹中(我想这肯定是 Prefect 可以帮助我做的)。
我的目标也是填充历史数据(就像我及时运行这项工作一样)。
在 Airflow 中有 start_date 的概念,当它在过去设置时,DAG 将从该日期开始运行并在每个时间间隔填充。
例如,如果我有一个任务需要一个日期并返回该日期的数据,例如:

# Pseudo code
def get_values_from_somewhere(date: datetime) -> dict:
return fetched_values_in_json(date)
在 Prefect 中是否有一种本地方式可以做到这一点?
我在这里或文档中的任何地方都找不到这个答案,尽管提到了回填 here .
任何帮助/指导都将非常有用。
我试过的:
当我设置 schedule成为:
from datetime import datetime, timedelta

from prefect.schedules import Schedule

schedule = Schedule(clocks=[IntervalClock(interval=timedelta(hours=24), start_date=datetime(2019, 1, 1))])
然后我做 flow.run()我只是得到:
INFO:prefect.My-Task:Waiting for next scheduled run at 2020-09-24T00:00:00+00:00
我期望的是自 start_date 以来运行我已提供,然后暂停,直到到达当前时间并等待下一个时间表。

最佳答案

Prefect 不会对您的 Flow 或其任务如何依赖时间做出任何隐含的假设,因此执行回填取决于您的 Flow 的结构。时间显式影响流通常有两种方式:

  • 通过 ParameterDateTimeParameter
  • 通过 prefect.context (其中包括许多与时间相关的字段,描述为 here )

  • 鉴于此,可以通过创建任意数量的临时调度流运行并覆盖适当的上下文键或默认参数值来执行回填。 (请注意,可以为任何流创建临时运行,无论该流是否有计划。)
    为了更精确,这里有两个示例触发单个回填运行(为了容纳更多运行,循环适当的值并为每个值创建一个运行):
    使用上下文
    import pendulum
    import prefect


    @prefect.task
    def do_something_time_specific():
    """
    This task uses the timestamp provided to the custom `backfill_time`
    context key; if that does not exist, it falls back on the built-in
    `scheduled_start_time` context key.
    """

    current_time = prefect.context.get("backfill_time") or prefect.context.get("scheduled_start_time")
    if isinstance(current_time, str):
    current_time = pendulum.parse(current_time)
    # performs some action dealing with this timestamp


    flow = Flow("backfill", tasks=[do_something_time_specific])

    ## using Core
    flow.run() # will use current timestamp
    flow.run(context={"backfill_time": "1986-01-02"}) # will use old timestamp

    ## using an API
    prefect.Client().create_flow_run(flow_id="FLOWID",
    context={"backfill_time": "1986-01-02"}) # will use old timestamp
    使用参数
    import pendulum
    import prefect


    current_time = prefect.Parameter("current_time", default=None)

    @prefect.task
    def do_something_time_specific(current_time):
    """
    This task uses the timestamp provided to the task explicitly.
    """
    current_time = current_time or pendulum.now("utc") # uses "now" if not provided
    if isinstance(current_time, str):
    current_time = pendulum.parse(current_time)

    # performs some action dealing with this timestamp


    with Flow("backfill") as flow:
    do_something_time_specific(current_time)


    ## using Core
    flow.run() # will use current timestamp
    flow.run(current_time="1986-01-02") # will use old timestamp

    ## using an API
    prefect.Client().create_flow_run(flow_id="FLOWID",
    parameters={"current_time": "1986-01-02"}) # will use old timestamp
    较新的参数类,例如 DateTimeParameter提供更好的打字保证,但希望这能证明这个想法。
    编辑 :为了完整性,请注意,可以通过运行 flow.run(run_on_schedule=False) 在 Core 中为具有计划的流创建临时运行。

    关于etl - 有没有办法为 Prefect 中的新 Flow 回填历史数据(一次)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64029629/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com