gpt4 book ai didi

azure - 使用 Databricks PySpark 解压缩大文件

转载 作者:行者123 更新时间:2023-12-02 08:11:22 27 4
gpt4 key购买 nike

我有一个场景,其中有两个 blob 容器,分别属于两个不同的 azure 存储帐户。这两个容器都安装在 Databricks 工作区中。第一个容器cnt-input有一个文件夹,其中包含大量zip文件(每天20K),每个大约5GB 尺寸。压缩比为1/10,也就是说解压后会得到多个csv文件,总大小为appx.csv。 50GBZip 文件每天都会推送到此容器。我想创建一个 pyspark 笔记本:

1- 增量处理文件;检测新传入的文件并保留已处理的文件。

2- 将大量文件作为一批(例如20K)进行解压,并将解压后的csv文件保存在第二个blob<中 容器:cnt-output

3- 该过程应该是高性能且最佳的(Databricks 运行时版本为 12.1,集群有 8 个 Standard_DS3_v2 类型的工作节点)。

最佳答案

您可以按照以下方法进行操作。

在这里,我正在加载 zip 文件,将其解压并使用以下命令将其上传到 blob 容器自动装载机概念。它增量加载数据。

import os
import zipfile



def extract_zip_file(file_data):
file_path, modificationTime,length,content = file_data
with zipfile.ZipFile(file_path.replace("dbfs:","/dbfs"), 'r') as zip_ref:
zip_ref.extractall("/dbfs"+output_mount_path+"csv_data")

input_mount_path = "/mnt/jgsblob/cnt-input/"
output_mount_path = "/mnt/jgsblob2/cnt-ouput/"

extract_zip_file 是用于提取spark读取的文件的函数readstream 如下。

query = spark.readStream.format("cloudFiles").option("cloudFiles.format", "binaryFile").option("pathGlobFilter", "*.zip")\
.load(input_mount_path+"Databricks").writeStream.option("checkpointLocation", output_mount_path+"chk_point/")\
.foreachBatch(lambda batch_df, batch_id: batch_df.foreach(extract_zip_file)).start()

enter image description here

在此代码中,我使用的格式为cloudFiles,它调用自动加载器,和 checkpointLocation ,用于存储已上传的记录。

下面是容器一中的zip文件,即输入挂载路径input_mount_path =“/mnt/jgsblob/cnt-input/”

enter image description here

输出:以下是容器 2 中提取的文件,即输出挂载路径。output_mount_path =“/mnt/jgsblob2/cnt-ouput/”

enter image description here

在这里,您可以看到 cosmetics.csvvoted-kaggle-dataset 最初是在 11:51 创建的,我上传了新的zip 文件,然后在 11:55 gold.csv 加载,即使以前的 zip 文件包含在容器一中。

确保根据您的输入数据大小使用正确的节点类型。因为我已经针对小文件进行了测试。

关于azure - 使用 Databricks PySpark 解压缩大文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76475541/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com