gpt4 book ai didi

palantir-foundry - 如何处理代码存储库中的大文件?

转载 作者:行者123 更新时间:2023-12-04 12:34:58 40 4
gpt4 key购买 nike

我有一个每天提供一个大 .txt 文件 (50-75GB) 的数据馈送。该文件包含几个不同的模式,其中每一行对应一个模式。我想将其拆分为每个模式的分区数据集,我该如何有效地做到这一点?

最佳答案

您需要解决的最大问题是恢复架构的迭代速度,这对于这种规模的文件来说可能具有挑战性。
此处最好的策略是获取一个示例“名义”文件,其中包含要恢复为一行的每个模式,并将其添加为存储库中的文件。当您将此文件添加到您的存储库(与您的转换逻辑一起)时,您将能够将其推送到数据帧中,就像您在数据集中使用原始文件一样,以进行快速测试迭代。
首先,确保您指定 txt文件作为包内容的一部分,这样你的测试就会发现它们(这在 Read a file from a Python repository 下的文档中有介绍):

You can read other files from your repository into the transform context. This might be useful in setting parameters for your transform code to reference.

To start, In your python repository edit setup.py:

setup(
name=os.environ['PKG_NAME'],
# ...
package_data={
'': ['*.txt']
}
)

我正在使用包含以下内容的 txt 文件:
my_column, my_other_column
some_string,some_other_string
some_thing,some_other_thing,some_final_thing
此文本文件位于我的存储库中的以下路径: transforms-python/src/myproject/datasets/raw.txt将文本文件配置为随逻辑一起提供后,并且将文件本身包含在存储库中后,您就可以包含以下代码。这段代码有几个重要的功能:
  • 它使原始文件解析逻辑与将文件读入 Spark DataFrame 的阶段完全分开。这样,DataFrame 的构建方式就可以留给测试基础设施或运行时,具体取决于您运行的位置。
  • 保持逻辑分离可以确保您想要进行的实际逐行解析是它自己的可测试函数,而不是让它完全存在于您的 my_compute_function 中。
  • 此代码使用 Spark-native spark_session.read.text方法,这将比逐行解析原始 txt 文件快几个数量级。这将确保并行化的 DataFrame 是您操作的对象,而不是在您的执行程序(或更糟的是,您的驱动程序)中逐行处理的单个文件。

  • from transforms.api import transform, Input, Output
    from pkg_resources import resource_filename


    def raw_parsing_logic(raw_df):
    return raw_df


    @transform(
    my_output=Output("/txt_tests/parsed_files"),
    my_input=Input("/txt_tests/dataset_of_files"),
    )
    def my_compute_function(my_input, my_output, ctx):
    all_files_df = None
    for file_status in my_input.filesystem().ls('**/**'):
    raw_df = ctx.spark_session.read.text(my_input.filesystem().hadoop_path + "/" + file_status.path)
    parsed_df = raw_parsing_logic(raw_df)
    all_files_df = parsed_df if all_files_df is None else all_files_df.unionByName(parsed_df)
    my_output.write_dataframe(all_files_df)


    def test_my_compute_function(spark_session):
    file_path = resource_filename(__name__, "raw.txt")
    raw_df = raw_parsing_logic(
    spark_session.read.text(file_path)
    )
    assert raw_df.count() > 0
    raw_columns_set = set(raw_df.columns)
    expected_columns_set = {"value"}
    assert len(raw_columns_set.intersection(expected_columns_set)) == 1

    一旦您启动并运行此代码,您的 test_my_compute_function方法将非常快速地迭代,以便您可以完善您的模式恢复逻辑。这将使在最后构建数据集变得更加容易,但不会产生完整构建的任何开销。

    关于palantir-foundry - 如何处理代码存储库中的大文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65706708/

    40 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com