gpt4 book ai didi

csv - Spark : spark-csv takes too long

转载 作者:行者123 更新时间:2023-12-02 03:22:36 24 4
gpt4 key购买 nike

我正在尝试使用 Databricks spark-csv 从 EMR Spark 集群上 S3 上的 CSV 源创建一个 DataFrame包和 flights dataset :

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('s3n://h2o-airlines-unpacked/allyears.csv')

df.first()

这不会在 4 个 m3.xlarge 的集群上终止。我正在寻找从 PySpark 中 S3 上的 CSV 文件创建 DataFrame 的建议。或者,我尝试将文件放在 HDFS 上并从 HFDS 读取,但这也不会终止。该文件不是太大 (12 GB)。

最佳答案

要读取一个只有 12GB 的性能良好的 csv 文件,您可以将它复制到您的所有工作人员和驱动程序机器上,然后在“,”上手动拆分。这可能无法解析任何 RFC4180 csv,但它解析了我拥有的内容。

  • 在申请集群时,为每个工作人员的工作人员磁盘空间添加至少 12GB 的额外空间。
  • 使用至少具有 12GB RAM 的机器类型,例如 c3.2xlarge。如果您不打算让集群闲置并且能够负担得起更大的费用,那就扩大规模。更大的机器意味着更少的磁盘文件复制开始。我经常在现货市场上看到 c3.8xlarge 低于 0.50 美元/小时。

将文件复制到您的每个工作人员,在每个工作人员的同一目录中。这应该是物理连接的驱动器,即每台机器上的不同物理驱动器。

确保驱动程序机器上也有相同的文件和目录。

raw = sc.textFile("/data.csv")

print "Counted %d lines in /data.csv" % raw.count()

raw_fields = raw.first()
# this regular expression is for quoted fields. i.e. "23","38","blue",...
matchre = r'^"(.*)"$'
pmatchre = re.compile(matchre)

def uncsv_line(line):
return [pmatchre.match(s).group(1) for s in line.split(',')]

fields = uncsv_line(raw_fields)

def raw_to_dict(raw_line):
return dict(zip(fields, uncsv_line(raw_line)))

parsedData = (raw
.map(raw_to_dict)
.cache()
)

print "Counted %d parsed lines" % parsedData.count()

parsedData 将是字典的 RDD,其中字典的键是第一行的 CSV 字段名称,值是当前行的 CSV 值。如果您的 CSV 数据中没有标题行,这可能不适合您,但应该清楚的是,您可以覆盖读取此处第一行的代码并手动设置字段。

请注意,这对于创建数据框或注册 spark SQL 表不是立即有用的。但是其他的就OK了,如果需要转储到spark SQL中,还可以进一步抽取转化成更好的格式。

我在一个 7GB 的文件上使用它没有任何问题,除了我已经删除了一些过滤逻辑来检测有效数据,这些逻辑有一个副作用,即从解析的数据中删除 header 。您可能需要重新实现一些过滤。

关于csv - Spark : spark-csv takes too long,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32265649/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com