gpt4 book ai didi

python - PySpark 解压文件 : Which is a good approach for unzipping files and storing the csv files into a Delta Table?

转载 作者:行者123 更新时间:2023-12-03 21:00:08 24 4
gpt4 key购买 nike

我在 Amazon s3 中存储了 zip 文件,然后我有一个 Python 列表为 ["s3://mybucket/file1.zip", ..., "s3://mybucket/fileN.zip"] ,我需要使用 Spark Cluster 解压缩所有这些文件,并将所有 CSV 文件存储到增量格式表中。我想知道比我目前的方法更快的处理方法:

1) 我有一个 用于在我的 Python 列表中进行迭代。

2) 我正在使用 Python Boto3 从 s3 获取 zip 文件 s3.bucket.Object(file)
3)我正在使用下一个代码解压缩文件

import io
import boto3
import shutil
import zipfile
for file in ["s3://mybucket/file1.zip", ..., "s3://mybucket/fileN.zip"]:
obj = s3.bucket.Object(file)
with io.BytesIO(obj.get()["Body"].read()) as tf:
tf.seek(0)
with zipfile.ZipFile(tf, mode='r') as zipf:
for subfile in zipf.namelist():
zipf.extract(subfile, outputZip)
dbutils.fs.cp("file:///databricks/driver/{0}".format(outputZip), "dbfs:" + outputZip, True)
shutil.rmtree(outputZip)
dbutils.fs.rm("dbfs:" + outputZip, True)

4)我的文件在驱动程序节点中解压缩,然后执行程序无法访问这些文件(我没有找到方法)所以我使用 dbutils.fs.cp() 将所有这些 csv 文件移动到 DBFS |

5)我使用 Pyspark Dataframe 从 DBFS 读取所有 csv 文件,并将其写入 Delta 表
df = self.spark.read.option("header", "true").csv("dbfs:" + file) 
df.write.format("delta").save(path)

6) 我从 DBFS 和驱动程序节点中删除数据

因此,我目前的目标是在比我之前的过程更短的时间内将 S3 中的 zip 文件摄取到 Delta 表中。我想我可以将其中一些过程并行化为 1) 步骤,我想避免复制到 DBFS 的步骤,因为我不需要在那里保存数据,我还需要在每次摄取后删除 CSV 文件Delta 表以避免驱动程序节点磁盘中的内存错误。有什么建议吗?

最佳答案

好吧,多种可能的解决方案可能是:

  • 您可以使用 df=spark.read.csv("s3://mybucket") 一起读取所有文件(如果架构允许)并使用 df.write.format("delta").save(path)
    将数据帧写为增量
  • 您可以单独读取数据帧中的每个文件并直接附加到现有的增量表(即使它是空的),而无需将其存储在 DBFS 中。更多详情:https://docs.databricks.com/delta/delta-batch.html#append-using-dataframes
  • 您可以在数据框中单独读取每个文件并将其合并到现有的主数据框中。最后,您可以将主数据帧写为增量表。

  • 选项 3 类似于:
        import io
    import boto3
    import shutil
    import zipfile
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("name").getOrCreate()

    schema = StructType([
    \\ YOUR DATA SCHMEA
    ])

    df = spark.createDataFrame([], schema)

    for file in ["s3://mybucket/file1.zip", ..., "s3://mybucket/fileN.zip"]:
    obj = s3.bucket.Object(file)
    with io.BytesIO(obj.get()["Body"].read()) as tf:
    tf.seek(0)
    with zipfile.ZipFile(tf, mode='r') as zipf:
    for subfile in zipf.namelist():
    zipf.extract(subfile, outputZip)
    tempdf = spark.read.option("header", "true").csv(outputZip)
    df = df.union(tempdf)

    df.write.format("delta").save(path)

    关于python - PySpark 解压文件 : Which is a good approach for unzipping files and storing the csv files into a Delta Table?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58628626/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com