gpt4 book ai didi

apache-spark - 如何更快地计算我的 Foundry 'latest version' 数据集?

转载 作者:行者123 更新时间:2023-12-04 08:43:26 25 4
gpt4 key购买 nike

我有一个数据集,用于摄取对我的数据行的最新编辑,但它只摄取最近编辑过的版本。 (即它是在 update_ts 时间戳列上递增的)。
原表:

| primary_key | update_ts |
|-------------|-----------|
| key_1 | 0 |
| key_2 | 0 |
| key_3 | 0 |
表更新时:
| primary_key | update_ts |
|-------------|-----------|
| key_1 | 0 |
| key_2 | 0 |
| key_3 | 0 |
| key_1 | 1 |
| key_2 | 1 |
| key_1 | 2 |
摄取后,我需要计算所有先前更新的“最新版本”,同时还要考虑任何新的编辑。
这意味着我每次都会进行增量摄取并运行 SNAPSHOT 输出。这对于我的构建来说非常慢,因为我注意到每次我想为我的数据计算最新版本时,我都必须查看所有输出行。
交易 n=1(快照):
| primary_key | update_ts |
|-------------|-----------|
| key_1 | 0 |
| key_2 | 0 |
| key_3 | 0 |
交易 n=2(追加):
| primary_key | update_ts |
|-------------|-----------|
| key_1 | 1 |
| key_2 | 1 |
我怎样才能使这个“最新版本”的计算速度更快?

最佳答案

这是将受益于 bucketing 的常见模式.
其要点是:根据您的 primary_key 将您的输出 SNAPSHOT 写入存储桶中。列,其中完全跳过了对大得多的输出进行改组的昂贵步骤。
这意味着您只需将新数据交换到已经包含您之前历史记录的存储桶。
让我们从初始状态开始,我们在先前计算的“最新”版本上运行,该版本是一个慢速 SNAPSHOT:

- output: raw_dataset
input: external_jdbc_system
hive_partitioning: none
bucketing: none
transactions:
- SNAPSHOT
- APPEND
- APPEND
- output: clean_dataset
input: raw_dataset
hive_partitioning: none
bucketing: none
transactions:
- SNAPSHOT
- SNAPSHOT
- SNAPSHOT
如果我们写出 clean_datasetprimary_key 上使用分桶列到单独计算的桶数中以适应我们预期的数据规模,我们需要以下代码:
from transforms.api import transform, Input, Output
import pyspark.sql.functions as F
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window


@transform(
my_output=Output("/datasets/clean_dataset"),
my_input=Input("/datasets/raw_dataset")
)
def my_compute_function(my_input, my_output):

BUCKET_COUNT = 600
PRIMARY_KEY = "primary_key"
ORDER_COL = "update_ts"

updated_keys = my_input.dataframe("added")
last_written = my_output.dataframe("current")

updated_keys.repartition(BUCKET_COUNT, PRIMARY_KEY)

value_cols = [x for x in last_written.columns if x != PRIMARY_KEY]

updated_keys = updated_keys.select(
PRIMARY_KEY,
*[F.col(x).alias("updated_keys_" + x) for x in value_cols]
)

last_written = last_written.select(
PRIMARY_KEY,
*[F.col(x).alias("last_written_" + x) for x in value_cols]
)

all_rows = updated_keys.join(last_written, PRIMARY_KEY, "fullouter")

latest_df = all_rows.select(
PRIMARY_KEY,
*[F.coalesce(
F.col("updated_keys_" + x),
F.col("last_written_" + x)
).alias(x) for x in value_cols]
)

my_output.set_mode("replace")

return my_output.write_dataframe(
latest_df,
bucket_cols=PRIMARY_KEY,
bucket_count=BUCKET_COUNT,
sort_by=ORDER_COL
)
当它运行时,您会在查询计划中注意到输出的项目步骤不再包括交换,这意味着它不会对数据进行混洗。您现在将看到的唯一交换是在输入上,它需要以与格式化输出完全相同的方式分发更改(这是一个非常快的操作)。
然后将此交换保存到 fullouter 中。加入步骤,然后加入将利用这一点并非常快速地运行 600 个任务。最后,我们通过在与以前相同的列上显式地分桶到相同数量的桶中来维护输出的格式。
注意:使用这种方法,您在每个存储桶中的文件大小会随着时间的推移而增长,并且没有考虑增加存储桶数量以保持大小合适的需要。您最终将使用此技术达到阈值,即文件大小超过 128MB 并且您不再有效执行(修复方法是提高 BUCKET_COUNT 值)。
您的输出现在将如下所示:
- output: raw_dataset
input: external_jdbc_system
hive_partitioning: none
bucketing: none
transactions:
- SNAPSHOT
- APPEND
- APPEND
- output: clean_dataset
input: raw_dataset
hive_partitioning: none
bucketing: BUCKET_COUNT by PRIMARY_KEY
transactions:
- SNAPSHOT
- SNAPSHOT
- SNAPSHOT

关于apache-spark - 如何更快地计算我的 Foundry 'latest version' 数据集?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64448980/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com