gpt4 book ai didi

python - 如何从python复制pyspark/hadoop中的文件

转载 作者:太空宇宙 更新时间:2023-11-03 21:18:13 25 4
gpt4 key购买 nike

我正在使用 pyspark 将数据框保存为 parquet 文件或 csv 文件:

def write_df_as_parquet_file(df, path, mode="overwrite"):
df = df.repartition(1) # join partitions to produce 1 parquet file
dfw = df.write.format("parquet").mode(mode)
dfw.save(path)

def write_df_as_csv_file(df, path, mode="overwrite", header=True):
df = df.repartition(1) # join partitions to produce 1 csv file
header = "true" if header else "false"
dfw = df.write.format("csv").option("header", header).mode(mode)
dfw.save(path)

但这会将 parquet/csv 文件保存在名为 path 的文件夹中,其中保存了一些我们不需要的其他文件,如下所示:

4 files are created in path, but we only care about the PARQUET file

图片:https://ibb.co/9c1D8RL

基本上,我想创建一些函数,使用上述方法将文件保存到某个位置,然后将 CSV 或 PARQUET 文件移动到新位置。喜欢:

def write_df_as_parquet_file(df, path, mode="overwrite"):
# save df in one file inside tmp_folder
df = df.repartition(1) # join partitions to produce 1 parquet file
dfw = df.write.format("parquet").mode(mode)
tmp_folder = path + "TEMP"
dfw.save(tmp_folder)

# move parquet file from tmp_folder to path
copy_file(tmp_folder + "*.parquet", path)
remove_folder(tmp_folder)

我怎样才能做到这一点?如何实现 copy_fileremove_folder?我在 scala 中看到了一些解决方案,它们使用 Hadoop api 来实现此目的,但我无法在 python 中实现此功能。我想我需要使用sparkContext,但我还在学习Hadoop,还没有找到方法。

最佳答案

您可以使用 Python 的 HDFS 库之一连接到您的 HDFS 实例,然后执行所需的任何操作。

来自 hdfs3 文档( https://hdfs3.readthedocs.io/en/latest/quickstart.html ):

from hdfs3 import HDFileSystem
hdfs = HDFileSystem(host=<host>, port=<port>)
hdfs.mv(tmp_folder + "*.parquet", path)

将以上内容包装在一个函数中,就可以开始了。

注意:我刚刚使用 hdfs3 作为示例。您还可以使用 hdfsCLI。

关于python - 如何从python复制pyspark/hadoop中的文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54527136/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com