gpt4 book ai didi

java - 如何使用Spark处理CSV数据并输出序列化的自定义JavaObject?

转载 作者:太空宇宙 更新时间:2023-11-04 11:05:20 25 4
gpt4 key购买 nike

我有一个包含小 CSV 文件的文件夹,其中包含目录:“/catmydata/2017/1/1、/catdata/2017/1/2”,每个目录都包含一堆 CSV,其中 CSV 中的每一行都有 cat 适应。

我想使用spark分配负载并转换数据并将其粘贴到输出目录中的二进制JavaObject文件中,类似于我的源数据“/convertedcatdata/2017/1/1/adoptedcatjavaobjectbinaryfile,/convertedcatdata/2017/1/1/adoptedcatjavaobjectbinaryfile”

我将使用的 Java 对象是这样的:

public class Cat{
private Date dateOfAdoption;
private String name;
private Boolean isMale;
private Person owner;

public Cat(String name, Boolean isMale, Person owner, Date dateOfAdoption){
this.name = name;
this.isMale = isMale;
this.owner = owner;
this.dateOfAdoption = dateOfAdoption;
}
}

public class AdoptedCats{
public List<Cat> catList;

public AdoptedCats(){
}
}

我习惯使用 python Spark 将 csv 文件转换为 avro,但问题是在这种情况下我的输出文件是一个已填充的 Java 对象类。我怎样才能创建这样一个文件,其中它是一个序列化的 java 对象,其中包含 AdoptedClass 对象,该对象填充了 CSV 中列出的所有猫?

我的意思是,如果有人编写 Java 代码(非 Spark)并打开序列化的“adoptedcatjavaobjectbinaryfile”,他们就可以访问 Java 类的内容。说:

AdoptedCats adoptedCats = <the adoptedcatjavaobjectbinaryfile>
System.out.println(adoptedCats.cats.length) // this would output say, 75 if there were 75 rows in the original CSV.

我假设我必须使用Java Spark...但我不确定如何创建spark脚本,以便它可以划分并征服一月份整个月的所有cat目录,并将它们输出到各自的目录中。任何帮助或示例都会非常有帮助。

最佳答案

实现这一目标的步骤-

  1. 假设文件(我有一个包含目录的小 CSV 文件的文件夹)是 /somepath/catmydata.csv

        JavaSparkContext context  = ......;

    //Retrieve list of files from master file

    List<String> files = context.textFile("/somepath/catmydata.csv")
    .flatMap(row->Arrays.asList(row.split(",")).iterator()).collect();

    //Iterate through each file and save it as object file
    Map<String,JavaRDD<Cat>> adoptedCatsMap =new HashMap<String,JavaRDD<Cat>>();
    for (String file:files) {

    // Extract file path and attach to convert
    String output=String.format("/convertedcatdata/%s",file.substring(file.indexOf('/'),file.length()))

    JavaRDD<Cat> adoptedCatsRdd = context.textFile(file).map(s->new Cat(s));

    adoptedCatsMap.put(output,adoptedCatsRdd);
    //This will write number of files based on rdd partitions,
    // if you want only one file in a directory then save it
    // to temporary directory then use copyMerge to create one file using Hadoop fs FileUtil
    // https://hadoop.apache.org/docs/r2.7.1/api/org/apache/hadoop/fs/FileUtil.html
    //Save to physical location if required.
    adoptedCatsRdd.saveAsObjectFile(output);
    }

我希望这会有所帮助。

关于java - 如何使用Spark处理CSV数据并输出序列化的自定义JavaObject?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46535922/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com