- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我的 Foundry 实例中有一个使用增量计算的管道设置,但由于某种原因没有达到我的预期。即,我想读取我的转换的先前输出并获取日期的最大值,然后仅在此最大日期之后立即读取数据的输入。
出于某种原因,它没有按照我的预期运行,而且在构建/分析/修改代码过程中逐步执行代码非常令人沮丧。
我的代码如下所示:
from pypsark.sql import functions as F, types as T, DataFrame
from transforms.api import transform, Input, Output, incremental
from datetime import date, timedelta
JUMP_DAYS = 1
START_DATE = date(year=2021, month=10, day=1)
OUTPUT_SCHEMA = T.StructType([
T.StructField("date", T.DateType()),
T.StructField("value", T.IntegerType())
])
@incremental(semantic_version=1)
@transform(
my_input=Input("/path/to/my/input"),
my_output=Output("/path/to/my/output")
)
def only_write_one_day(my_input, my_output):
"""Filter the input to only rows that are a day after the last written output and process them"""
# Get the previous output and full current input
previous_output_df = my_output.dataframe("previous", output_schema)
current_input_df = my_input.dataframe("current")
# Get the next date of interest from the previous output
previous_max_date_rows = previous_output_df.groupBy().agg(
F.max(F.col("date")).alias("max_date")
).collect() # noqa
# PERFORMANCE NOTE: It is acceptable to collect the max value here to avoid cross-join-filter expensive
# operation in favor of making a new query plan.
if len(previous_max_date_rows) == 0:
# We are running for the first time or re-snapshotting. There's no previous date. Use fallback.
previous_max_date = START_DATE
else:
# We have a previous max date, use it.
previous_max_date = previous_max_date_rows[0][0]
delta = timedelta(days=JUMP_DAYS)
next_date = previous_max_date + delta
# Filter the input to only the next date
filtered_input = current_input_df.filter(F.col("date") == F.lit(date))
# Do any other processing...
output_df = filtered_input
# Persist
my_output.set_mode("modify")
my_output.write_dataframe(output_df)
最佳答案
在增量转换中,很难隔离破坏代码的条件。因此,通常最好:
from pypsark.sql import functions as F, types as T, DataFrame
from transforms.api import transform, Input, Output, incremental
from datetime import date, timedelta
JUMP_DAYS = 1
START_DATE = date(year=2021, month=10, day=1)
OUTPUT_SCHEMA = T.StructType([
T.StructField("date", T.DateType()),
T.StructField("value", T.IntegerType())
])
def get_previous_max_date(previous_output_df) -> date:
"""Given the previous output, get the maximum date written to it"""
previous_max_date_rows = previous_output_df.groupBy().agg(
F.max(F.col("date")).alias("max_date")
).collect() # noqa
# PERFORMANCE NOTE: It is acceptable to collect the max value here to avoid cross-join-filter expensive
# operation in favor of making a new query plan.
if len(previous_max_date_rows) == 0:
# We are running for the first time or re-snapshotting. There's no previous date. Use fallback.
previous_max_date = START_DATE
else:
# We have a previous max date, use it.
previous_max_date = previous_max_date_rows[0][0]
return previous_max_date
def get_next_date(previous_output_df) -> date:
"""Given the previous output, compute the max date + 1 day"""
previous_max_date = get_previous_max_date(previous_output_df)
delta = timedelta(days=JUMP_DAYS)
next_date = previous_max_date + delta
return next_date
def filter_input_to_date(current_input_df: DataFrame, date_filter: date) -> DataFrame:
"""Given the entire intput, filter to only rows that have the next date"""
return current_input_df.filter(F.col("date") == F.lit(date))
def process_with_dfs(current_input_df, previous_output_df) -> DataFrame:
"""With the constructed DataFrames, do our work"""
# Get the next date of interest from the previous output
next_date = get_next_date(previous_output_df)
# Filter the input to only the next date
filtered_input = filter_input_to_date(current_input_df, next_date)
# Do any other processing...
return filtered_input
@incremental(semantic_version=1)
@transform(
my_input=Input("/path/to/my/input"),
my_output=Output("/path/to/my/output")
)
def only_write_one_day(my_input, my_output):
"""Filter the input to only rows that are a day after the last written output and process them"""
# Get the previous output and full current input
previous_output_df = my_output.dataframe("previous", output_schema)
current_input_df = my_input.dataframe("current")
# Do the processing
output_df = process_with_dfs(current_input_df, previous_output_df)
# Persist
my_output.set_mode("modify")
my_output.write_dataframe(output_df)
您现在可以设置单独的单元测试,假设您的代码位于
transforms-python/src/myproject/datasets/output.py
,遵循方法论
here正确设置一切。
from pyspark.sql import functions as F, types as T
from myproject.datasets import (
only_write_one_day,
process_with_dfs,
filter_input_to_date,
get_next_date,
get_previous_max_date,
OUTPUT_SCHEMA,
JUMP_DAYS,
START_DATE
)
import pytest
from datetime import date
@pytest.fixture
def empty_output_df(spark_session):
data = []
return spark_session.createDataFrame(data, OUTPUT_SCHEMA)
@pytest.fixture
def single_write_output_df(spark_session):
data = [{
"date": date(year=2021, month=10, day=1),
"value": 1
}]
return spark_session.createDataFrame(data, OUTPUT_SCHEMA)
@pytest.fixture
def double_write_output_df(spark_session):
data = [
{
"date": date(year=2021, month=10, day=1),
"value": 1
},
{
"date": date(year=2021, month=10, day=2),
"value": 2
}
]
return spark_session.createDataFrame(data, OUTPUT_SCHEMA)
@pytest.fixture
def normal_input_df(spark_session):
data = [
{
"date": date(year=2021, month=10, day=1),
"value": 1
},
{
"date": date(year=2021, month=10, day=2),
"value": 2
}
]
return spark_session.createDataFrame(data, OUTPUT_SCHEMA)
# ======= FIRST RUN CASE
def test_first_run_process_with_dfs(normal_input_df, empty_output_df):
assert True
def test_first_run_filter_input_to_date(normal_input_df, empty_output_df):
assert True
def test_first_run_get_next_date(normal_input_df, empty_output_df):
assert True
def test_first_run_get_previous_max_date(normal_input_df, empty_output_df):
assert True
# ======= NORMAL CASE
def test_normal_run_process_with_dfs(normal_input_df, single_write_output_df):
assert True
def test_normal_run_filter_input_to_date(normal_input_df, single_write_output_df):
assert True
def test_normal_run_get_next_date(normal_input_df, single_write_output_df):
assert True
def test_normal_run_get_previous_max_date(normal_input_df, single_write_output_df):
assert True
值得注意的是,这就是为什么您可以在 Foundry 中启用 McCabe 复杂性检查器和单元测试覆盖功能等功能的原因,这样您就可以将代码分解成更小、更耐用的部分,例如这样。
关于palantir-foundry - Palantir Foundry 增量测试很难迭代,我如何更快地发现错误?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69531556/
这个问题已经有答案了: What is x after "x = x++"? (18 个回答) 已关闭 6 年前。 public static void main(String[] args)
我目前正在使用 jquery 循环插件。我有 3 个不同的幻灯片,它们彼此相邻并同时循环播放。我想做的是先关闭第一张幻灯片,然后是第二张幻灯片,然后是第三张幻灯片。无论如何,我可以通过增量或超时来做到
这个问题在这里已经有了答案: 关闭 11 年前。 Possible Duplicate: ++someVariable Vs. someVariable++ in Javascript 我知道您可以
我一直在查看 SVN 手册,但无法找到“svn log”和“svn st”的“--incremental”选项的简单用法示例或解释。 我正在编写一个开源 SVN GUI 前端,因此我需要一些有关此标志
我有这种矩阵。 非常抱歉,我没有可重现的示例。 表 1: [,1][,2][,3][,4][,5][,6][,7][,8][,9][,10] [1,] 3 NA NA NA
我在hdfs中有一个 Parquet 文件作为我的数据的初始加载。接下来的所有拼花地板只是这些数据集每天都会更改为初始负载(按时间顺序)。这是我的三角洲。 我想读取全部或部分 Parquet 文件,以
我目前有这样的功能,可以将任何输入数字四舍五入到最接近的模糊整数值: $(function(){ $('#my_value').blur(function() { $(this).va
已关闭。此问题不符合Stack Overflow guidelines 。目前不接受答案。 要求提供代码的问题必须表现出对所解决问题的最低限度的了解。包括尝试的解决方案、为什么它们不起作用以及预期结果
我对 SQL 还很陌生,我想知道我是否可以使用它来自动解决我数据库中的一个复杂问题。 也就是说,我每天都在跟踪条目。因此,我们关注的列是: YYYY MM DD XXX YYYY 是年,MM 是月,D
我正在开发一个非常简单的数据库,但我不知道数据透视表是否是一个很好的解决方案。如果我使用数据透视表,我需要添加无用的表只是为了增量。 让我们从头开始。 在用户注册期间,会创建一个新表 GROUP。在G
在 MySQL 中你可以做这样的事情 SELECT @n := @n + 1 n, first_name, last_name FROM table1, (SELECT
如果我正在使用一个类,我知道如何重载运算符 += class temp { public: int i; temp(){ i = 10; } int operator+=(in
我有两个文件:file1、file2。我想从 file2 中获取 file1 中不存在的行。 我读过 post这告诉我使用 grep 的 -v 标志来执行此操作(我阅读了 grep 的手册页,但仍然不
我看了很多类似的题,功能很简单,用于API的嵌套for循环,每分钟可以调用5次。所以我将一年数据的范围设置为 75。你们能帮我解决这个问题吗?提前致谢! 第一部分正在运行,输入列表中的邮政编码。 fo
所以我想计算每日返回/增量的一些时间序列数据,其中每日增量 = value_at_time(T)/value_at_time(T-1) import pandas as pd df=pd.DataFr
请帮我解决这个问题。该表达式之后的步骤是: //Expression offSpring1[m1++] = temp1; //Steps: 1.- increment m1 2.- assign te
我正在开发一个解决方案,在该解决方案中,我通过 webapp 不同类型的实体(例如中央数据库上的用户、组、部门信息)和 ldap 进行身份验证。但是最终用户将与来自远程位置(他的办公室、节点)的数据交
我有以下 Python 数据结构: data1 = [{'name': u'String 1'}, {'name': u'String 2'}] data2 = [{'name': u'String
如果 AtomicInteger 会发生什么?达到 Integer.MAX_VALUE 并递增? 值会回到零吗? 最佳答案 由于integer overflow,它会环绕, 到 Integer.MIN
我是 C 的初学者,我正在尝试在 While 循环中进行 0.00001 增量。例如,double t = 0.00001 并且我希望循环每次以 0.00001 的增量运行,第二次是 0.00002
我是一名优秀的程序员,十分优秀!