gpt4 book ai didi

python - Pyspark 将多个 csv 文件读入数据框(或 RDD?)

转载 作者:太空宇宙 更新时间:2023-11-03 12:45:03 26 4
gpt4 key购买 nike

我有一个 Spark 2.0.2 集群,我通过 Jupyter Notebook 通过 Pyspark 访问它。我有多个管道分隔的 txt 文件(加载到 HDFS。但也可在本地目录中使用),我需要使用 spark-csv 将其加载到三个单独的数据帧中,具体取决于文件的名称。

我看到了我可以采用的三种方法 - 或者我可以使用 python 以某种方式遍历 HDFS 目录(还没有想出如何做到这一点,加载每个文件然后进行合并。

我也知道 spark 中存在一些通配符功能(请参阅 here)——我可能可以利用

最后,我可以使用 pandas 从磁盘加载 vanilla csv 文件作为 pandas 数据帧,然后创建一个 spark 数据帧。这里的缺点是这些文件很大,在单个节点上加载到内存中可能需要 ~8gb。 (这就是为什么首先要迁移到集群的原因)。

这是我目前的代码和这两种方法的一些伪代码:

import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
import pandas as pd

sc = pyspark.SparkContext(appName = 'claims_analysis', master='spark://someIP:7077')

spark = SparkSession(sc)

#METHOD 1 - iterate over HDFS directory
for currFile in os.listdir(HDFS:///someDir//):
if #filename contains 'claim':
#create or unionAll to merge claim_df
if #filename contains 'pharm':
#create or unionAll to merge pharm_df
if #filename contains 'service':
#create or unionAll to merge service_df

#Method 2 - some kind of wildcard functionality
claim_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<claim>.csv')
pharm_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<pharm>.csv')
service_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<service>.csv')


#METHOD 3 - load to a pandas df and then convert to spark df
for currFile in os.listdir(HDFS:///someDir//)
pd_df = pd.read_csv(currFile, sep = '|')
df = spark.createDataFrame(pd_df)
if #filename contains 'claim':
#create or unionAll to merge claim_df
if #filename contains 'pharm':
#create or unionAll to merge pharm_df
if #filename contains 'service':
#create or unionAll to merge service_df

有谁知道如何实现方法 1 或 2?我一直无法弄清楚这些。此外,令我感到惊讶的是,没有更好的方法将 csv 文件加载到 pyspark 数据帧中 - 使用第三方包来处理看起来应该是 native 功能的东西让我感到困惑(我只是错过了标准用例吗用于将 csv 文件加载到数据帧中?)最终,我将把一个合并的单个数据帧写回到 HDFS(使用 .write.parquet() ),这样我就可以清除内存并使用 MLlib 进行一些分析。如果我强调的方法不是最佳实践,我将不胜感激插入正确的方向!

最佳答案

方法一:

在 python 中,您不能直接引用 HDFS 位置。你需要借助另一个库,比如 pydoop。在 scala 和 java 中,你有 API。即使使用 pydoop,您也会一个接一个地阅读文件。一个一个读取文件,不使用spark提供的并行读取选项是不好的。

方法 2:

您应该能够用逗号分隔或通配符指向多个文件。这样 spark 负责读取文件并将它们分发到分区中。但是,如果您对每个数据框使用 union 选项,那么当您动态读取每个文件时,就会出现一种边缘情况。当您有很多文件时,列表在驱动程序级别会变得非常庞大,并可能导致内存问题。主要原因是,读取过程仍在驱动程序级别发生。

这个选项更好。 spark 会读取所有与正则表达式相关的文件并将它们转换为分区。你得到一个 RDD 用于所有通配符匹配,从那里你不需要担心单个 rdd 的联合

示例代码片段:

distFile = sc.textFile("/hdfs/path/to/folder/fixed_file_name_*.csv")

方法 3:

除非你有一些使用 pandas 特性的 python 遗留应用程序,否则我更喜欢使用 spark 提供的 API

关于python - Pyspark 将多个 csv 文件读入数据框(或 RDD?),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41129787/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com