
palantir-foundry - Palantir Foundry incremental tests are hard to iterate on, how do I discover errors faster?


I have a pipeline in my Foundry instance that is set up to use incremental computation, but for some reason it isn't doing what I expect. Namely, I want to read the previous output of my transform and take the maximum value of the date column, then read the input filtered to only the data immediately following that maximum date.
For some reason it isn't running the way I expect, and stepping through the code over the build / analyze / modify-code loop is very frustrating.
My code looks like this:

from pyspark.sql import functions as F, types as T, DataFrame
from transforms.api import transform, Input, Output, incremental
from datetime import date, timedelta


JUMP_DAYS = 1
START_DATE = date(year=2021, month=10, day=1)
OUTPUT_SCHEMA = T.StructType([
    T.StructField("date", T.DateType()),
    T.StructField("value", T.IntegerType())
])


@incremental(semantic_version=1)
@transform(
    my_input=Input("/path/to/my/input"),
    my_output=Output("/path/to/my/output")
)
def only_write_one_day(my_input, my_output):
    """Filter the input to only rows that are a day after the last written output and process them"""

    # Get the previous output and full current input
    previous_output_df = my_output.dataframe("previous", OUTPUT_SCHEMA)
    current_input_df = my_input.dataframe("current")

    # Get the next date of interest from the previous output
    previous_max_date_rows = previous_output_df.groupBy().agg(
        F.max(F.col("date")).alias("max_date")
    ).collect()  # noqa
    # PERFORMANCE NOTE: It is acceptable to collect the max value here, trading an expensive
    # cross-join-and-filter operation for a new query plan.

    if len(previous_max_date_rows) == 0:
        # We are running for the first time or re-snapshotting. There's no previous date. Use fallback.
        previous_max_date = START_DATE
    else:
        # We have a previous max date, use it.
        previous_max_date = previous_max_date_rows[0][0]

    delta = timedelta(days=JUMP_DAYS)
    next_date = previous_max_date + delta

    # Filter the input to only the next date
    filtered_input = current_input_df.filter(F.col("date") == F.lit(date))

    # Do any other processing...

    output_df = filtered_input

    # Persist
    my_output.set_mode("modify")
    my_output.write_dataframe(output_df)

Best Answer

In incremental transforms it is hard to isolate the conditions under which your code breaks, so it is generally best to:

  • Keep your compute function doing nothing besides fetching the appropriate views of your inputs/outputs and handing those DataFrames off to inner methods
  • Modularize each piece of your logic so that it is testable
  • Write tests for each piece to verify that each manipulation of a specific DataFrame does what you expect.

In your code example, breaking the execution up into a set of testable methods will make it much easier to test and to spot the problem. The new methods would look like this:
from pyspark.sql import functions as F, types as T, DataFrame
from transforms.api import transform, Input, Output, incremental
from datetime import date, timedelta


JUMP_DAYS = 1
START_DATE = date(year=2021, month=10, day=1)
OUTPUT_SCHEMA = T.StructType([
    T.StructField("date", T.DateType()),
    T.StructField("value", T.IntegerType())
])


def get_previous_max_date(previous_output_df) -> date:
    """Given the previous output, get the maximum date written to it"""
    previous_max_date_rows = previous_output_df.groupBy().agg(
        F.max(F.col("date")).alias("max_date")
    ).collect()  # noqa
    # PERFORMANCE NOTE: It is acceptable to collect the max value here, trading an expensive
    # cross-join-and-filter operation for a new query plan.

    if len(previous_max_date_rows) == 0 or previous_max_date_rows[0][0] is None:
        # We are running for the first time or re-snapshotting. There's no previous date,
        # so use the fallback. (A global aggregate over an empty DataFrame yields a single
        # row with a null max, so the None check is the branch that fires on a first run.)
        return START_DATE
    # We have a previous max date, use it.
    return previous_max_date_rows[0][0]


def get_next_date(previous_output_df) -> date:
    """Given the previous output, compute the max date + 1 day"""
    previous_max_date = get_previous_max_date(previous_output_df)
    delta = timedelta(days=JUMP_DAYS)
    next_date = previous_max_date + delta
    return next_date


def filter_input_to_date(current_input_df: DataFrame, date_filter: date) -> DataFrame:
    """Given the entire input, filter to only rows that have the next date"""
    # Compare against the passed-in date_filter. (The original code compared against
    # the datetime.date class itself; a unit test over this method catches that quickly.)
    return current_input_df.filter(F.col("date") == F.lit(date_filter))


def process_with_dfs(current_input_df, previous_output_df) -> DataFrame:
    """With the constructed DataFrames, do our work"""
    # Get the next date of interest from the previous output
    next_date = get_next_date(previous_output_df)

    # Filter the input to only the next date
    filtered_input = filter_input_to_date(current_input_df, next_date)

    # Do any other processing...

    return filtered_input


@incremental(semantic_version=1)
@transform(
    my_input=Input("/path/to/my/input"),
    my_output=Output("/path/to/my/output")
)
def only_write_one_day(my_input, my_output):
    """Filter the input to only rows that are a day after the last written output and process them"""

    # Get the previous output and full current input
    previous_output_df = my_output.dataframe("previous", OUTPUT_SCHEMA)
    current_input_df = my_input.dataframe("current")

    # Do the processing
    output_df = process_with_dfs(current_input_df, previous_output_df)

    # Persist
    my_output.set_mode("modify")
    my_output.write_dataframe(output_df)

You can now set up individual unit tests. Assuming your code lives at transforms-python/src/myproject/datasets/output.py, follow the methodology here to set everything up correctly.
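Inside Foundry's Code Repositories the test runner provides the spark_session fixture used below. If you want to run the same tests with plain pytest outside Foundry, a minimal conftest.py along these lines can stand in for it (this file is my own sketch, not part of the Foundry setup):

import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark_session():
    # Hypothetical local stand-in for the spark_session fixture that
    # Foundry's test infrastructure normally provides.
    spark = (
        SparkSession.builder
        .master("local[1]")
        .appName("incremental-transform-tests")
        .getOrCreate()
    )
    yield spark
    spark.stop()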
My test file then looks like this:
from pyspark.sql import functions as F, types as T
from myproject.datasets.output import (
    only_write_one_day,
    process_with_dfs,
    filter_input_to_date,
    get_next_date,
    get_previous_max_date,
    OUTPUT_SCHEMA,
    JUMP_DAYS,
    START_DATE
)
import pytest
from datetime import date


@pytest.fixture
def empty_output_df(spark_session):
    data = []
    return spark_session.createDataFrame(data, OUTPUT_SCHEMA)


@pytest.fixture
def single_write_output_df(spark_session):
    data = [{
        "date": date(year=2021, month=10, day=1),
        "value": 1
    }]
    return spark_session.createDataFrame(data, OUTPUT_SCHEMA)


@pytest.fixture
def double_write_output_df(spark_session):
    data = [
        {
            "date": date(year=2021, month=10, day=1),
            "value": 1
        },
        {
            "date": date(year=2021, month=10, day=2),
            "value": 2
        }
    ]
    return spark_session.createDataFrame(data, OUTPUT_SCHEMA)


@pytest.fixture
def normal_input_df(spark_session):
    data = [
        {
            "date": date(year=2021, month=10, day=1),
            "value": 1
        },
        {
            "date": date(year=2021, month=10, day=2),
            "value": 2
        }
    ]
    return spark_session.createDataFrame(data, OUTPUT_SCHEMA)


# ======= FIRST RUN CASE

def test_first_run_process_with_dfs(normal_input_df, empty_output_df):
    assert True


def test_first_run_filter_input_to_date(normal_input_df, empty_output_df):
    assert True


def test_first_run_get_next_date(normal_input_df, empty_output_df):
    assert True


def test_first_run_get_previous_max_date(normal_input_df, empty_output_df):
    assert True


# ======= NORMAL CASE

def test_normal_run_process_with_dfs(normal_input_df, single_write_output_df):
    assert True


def test_normal_run_filter_input_to_date(normal_input_df, single_write_output_df):
    assert True


def test_normal_run_get_next_date(normal_input_df, single_write_output_df):
    assert True


def test_normal_run_get_previous_max_date(normal_input_df, single_write_output_df):
    assert True
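
The assert True bodies above are placeholders to fill in as you pin down what each piece should do. As a sketch of what two filled-in cases might look like (the expected values follow from the fixtures above and assume the F.lit(date_filter) fix shown earlier):

def test_first_run_get_previous_max_date(normal_input_df, empty_output_df):
    # With no previous output, the START_DATE fallback should be returned.
    assert get_previous_max_date(empty_output_df) == START_DATE


def test_normal_run_filter_input_to_date(normal_input_df, single_write_output_df):
    # Filtering to 2021-10-02 should keep exactly the one matching row;
    # a test like this immediately exposes a bad filter expression.
    rows = filter_input_to_date(normal_input_df, date(year=2021, month=10, day=2)).collect()
    assert len(rows) == 1
    assert rows[0]["value"] == 2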

Notably, this is why you can enable features like the McCabe complexity checker and unit-test coverage in Foundry: so that you can break your code apart into smaller, more durable pieces like this.
Following a design pattern like this gives you more durable code that is more trustworthy in incremental transforms.
If you adopt this style of transform, you will also be able to iterate much faster on perfecting your logic by running the individual test you care about with the "Test" feature of Code Repositories. Open the test file and click the green "Test" button next to the specific case you are interested in; this lets you write your logic far faster than clicking build every time and trying to line up the input conditions the way you want.
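
If you would rather kick off a single test from code or a console instead of the UI button, pytest's node-id selection does the same thing; a sketch, with a hypothetical path for the test file above:

import pytest

# Run exactly one test case by its node id; equivalent to the green
# "Test" button next to that case. The path is hypothetical.
pytest.main([
    "transforms-python/src/test/test_output.py::test_normal_run_get_previous_max_date",
    "-q",
])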

Regarding "palantir-foundry - Palantir Foundry incremental tests are hard to iterate on, how do I discover errors faster?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/69531556/
