gpt4 book ai didi

apache-spark - Apache 星火 : Using folder structures to reduce run-time of analyses

转载 作者:行者123 更新时间:2023-12-03 09:44:08 25 4
gpt4 key购买 nike

我想通过将一个巨大的 csv 文件分割为不同的分区来优化 Spark 应用程序的运行时间,具体取决于它们的特性。

例如。我有一列带有客户 ID(整数,a),一列带有日期(月 + 年,例如 01.2015,b),以及带有产品 ID 的列(整数,c)(以及更多带有产品特定数据的列,不需要用于分区)。

我想建立一个像 /customer/a/date/b/product/c 这样的文件夹结构.当用户想了解客户 X 于 2016 年 1 月售出的产品信息时,他可以加载并分析保存在 /customer/X/date/01.2016/* 中的文件。 .

是否有可能通过通配符加载此类文件夹结构?还应该可以加载特定时间范围内的所有客户或产品,例如01.2015 至 09.2015。是否可以使用通配符 /customer/*/date/*.2015/product/c ?或者这样的问题怎么解决?

我想对数据进行一次分区,然后在分析中加​​载特定文件,以减少这些作业的运行时间(忽略分区的额外工作)。

解决方案:使用 Parquet 文件

我更改了我的 Spark 应用程序以将我的数据保存到 Parquet 文件中,现在一切正常,我可以通过提供文件夹结构来预先选择数据。这是我的代码片段:

JavaRDD<Article> goodRdd = ...

SQLContext sqlContext = new SQLContext(sc);

List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("keyStore", DataTypes.IntegerType, false));
fields.add(DataTypes.createStructField("textArticle", DataTypes.StringType, false));

StructType schema = DataTypes.createStructType(fields);

JavaRDD<Row> rowRDD = goodRdd.map(new Function<Article, Row>() {
public Row call(Article article) throws Exception {
return RowFactory.create(article.getKeyStore(), article.getTextArticle());
}
});

DataFrame storeDataFrame = sqlContext.createDataFrame(rowRDD, schema);

// WRITE PARQUET FILES
storeDataFrame.write().partitionBy(fields.get(0).name()).parquet("hdfs://hdfs-master:8020/user/test/parquet/");

// READ PARQUET FILES
DataFrame read = sqlContext.read().option("basePath", "hdfs://hdfs-master:8020/user/test/parquet/").parquet("hdfs://hdfs-master:8020/user/test/parquet/keyStore=1/");

System.out.println("READ : " + read.count());

重要

不要尝试只有一列的表格!当您尝试调用 partitionBy 时,您将收到异常。方法!

最佳答案

因此,在 Spark 中,您可以按照您想要的方式保存和读取分区数据。但是,不要像创建 /customer/a/date/b/product/c 那样创建路径。 Spark 将使用此约定 /customer=a/date=b/product=c当您使用以下方式保存数据时:

df.write.partitionBy("customer", "date", "product").parquet("/my/base/path/")

当需要读入数据时,需要指定 basepath-option像这样:
sqlContext.read.option("basePath", "/my/base/path/").parquet("/my/base/path/customer=*/date=*.2015/product=*/")

以及 /my/base/path/ 之后的所有内容将被 Spark 解释为列。在此处给出的示例中,Spark 将添加三列 customer , dateproduct到数据框。请注意,您可以根据需要对任何列使用通配符。

至于在特定时间范围内读入数据,您应该知道 Spark 使用谓词下推,因此它只会将数据实际加载到符合条件的内存中(由某些过滤器转换指定)。但是如果你真的想明确指定范围,你可以生成一个路径名列表,然后将它传递给 read 函数。像这样:
val pathsInMyRange = List("/my/path/customer=*/date=01.2015/product=*", 
"/my/path/customer=*/date=02.2015/product=*",
"/my/path/customer=*/date=03.2015/product=*"...,
"/my/path/customer=*/date=09.2015/product=*")

sqlContext.read.option("basePath", "/my/base/path/").parquet(pathsInMyRange:_*)

无论如何,我希望这会有所帮助:)

关于apache-spark - Apache 星火 : Using folder structures to reduce run-time of analyses,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37807124/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com