
mysql - How to save Apache Spark schema output to a MySQL database

Reposted | Author: 行者123 | Updated: 2023-11-29 01:40:33

Can anyone tell me whether there is any way in Apache Spark to store a JavaRDD in a MySQL database? I take input from two CSV files, and after performing a join on their contents I need to save the output (the resulting JavaRDD) to a MySQL database. I was able to save the output to HDFS successfully, but I could not find anything on connecting Apache Spark to MySQL. My Spark SQL code is below; it may also serve as a reference for anyone looking for a Spark SQL example.

package attempt1;

import java.io.Serializable;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;

public class Spark_Mysql {

    // JavaBean backing the first CSV file.
    @SuppressWarnings("serial")
    public static class CompleteSample implements Serializable {
        private String ASSETNUM;
        private String ASSETTAG;
        private String CALNUM;

        public String getASSETNUM() {
            return ASSETNUM;
        }
        public void setASSETNUM(String aSSETNUM) {
            ASSETNUM = aSSETNUM;
        }
        public String getASSETTAG() {
            return ASSETTAG;
        }
        public void setASSETTAG(String aSSETTAG) {
            ASSETTAG = aSSETTAG;
        }
        public String getCALNUM() {
            return CALNUM;
        }
        public void setCALNUM(String cALNUM) {
            CALNUM = cALNUM;
        }
    }

    // JavaBean backing the second CSV file.
    @SuppressWarnings("serial")
    public static class ExtendedSample implements Serializable {
        private String ASSETNUM;
        private String CHANGEBY;
        private String CHANGEDATE;

        public String getASSETNUM() {
            return ASSETNUM;
        }
        public void setASSETNUM(String aSSETNUM) {
            ASSETNUM = aSSETNUM;
        }
        public String getCHANGEBY() {
            return CHANGEBY;
        }
        public void setCHANGEBY(String cHANGEBY) {
            CHANGEBY = cHANGEBY;
        }
        public String getCHANGEDATE() {
            return CHANGEDATE;
        }
        public void setCHANGEDATE(String cHANGEDATE) {
            CHANGEDATE = cHANGEDATE;
        }
    }

    @SuppressWarnings("serial")
    public static void main(String[] args) throws Exception {

        JavaSparkContext ctx = new JavaSparkContext("local[2]", "JavaSparkSQL");
        JavaSQLContext sqlCtx = new JavaSQLContext(ctx);

        // Parse the first CSV into CompleteSample beans.
        JavaRDD<CompleteSample> cs = ctx.textFile("C:/Users/cyg_server/Documents/bigDataExample/AssetsImportCompleteSample.csv").map(
            new Function<String, CompleteSample>() {
                public CompleteSample call(String line) throws Exception {
                    String[] parts = line.split(",");

                    CompleteSample cs = new CompleteSample();
                    cs.setASSETNUM(parts[0]);
                    cs.setASSETTAG(parts[1]);
                    cs.setCALNUM(parts[2]);

                    return cs;
                }
            });

        // Parse the second CSV into ExtendedSample beans.
        JavaRDD<ExtendedSample> es = ctx.textFile("C:/Users/cyg_server/Documents/bigDataExample/AssetsImportExtendedSample.csv").map(
            new Function<String, ExtendedSample>() {
                public ExtendedSample call(String line) throws Exception {
                    String[] parts = line.split(",");

                    ExtendedSample es = new ExtendedSample();
                    es.setASSETNUM(parts[0]);
                    es.setCHANGEBY(parts[1]);
                    es.setCHANGEDATE(parts[2]);

                    return es;
                }
            });

        // Register both RDDs as SQL tables and join them on ASSETNUM.
        JavaSchemaRDD complete = sqlCtx.applySchema(cs, CompleteSample.class);
        complete.registerAsTable("cs");

        JavaSchemaRDD extended = sqlCtx.applySchema(es, ExtendedSample.class);
        extended.registerAsTable("es");

        JavaSchemaRDD fs = sqlCtx.sql("SELECT cs.ASSETTAG, cs.CALNUM, es.CHANGEBY, es.CHANGEDATE FROM cs INNER JOIN es ON cs.ASSETNUM=es.ASSETNUM");

        JavaRDD<String> result = fs.map(new Function<Row, String>() {
            public String call(Row row) {
                return row.getString(0);
            }
        });

        result.saveAsTextFile("hdfs://path/to/hdfs/dir-name"); // instead of HDFS I need to save it to a MySQL database, but I cannot find any Spark-MySQL connection
    }
}

In the end I managed to save the result to HDFS, but now I want to save it to a MySQL database instead. Please help. Thanks.

Best Answer

There are two approaches you can use to write the results back to the database. One is to use something like DBOutputFormat and configure it; the other is to call foreachPartition on the RDD you want to save and pass in a function that opens a connection to MySQL and writes the results back.
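The foreachPartition approach can be sketched as follows. This is a minimal, hypothetical sketch using only plain JDBC from the standard library: the table name asset_join, its four columns, and the String[] row format are assumptions, not part of the original question, and you would adapt them to your own schema (the MySQL Connector/J driver must also be on the classpath at runtime).

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Iterator;

public class MysqlPartitionWriter {
    // Hypothetical target table and columns; adjust to your schema.
    public static final String INSERT_SQL =
        "INSERT INTO asset_join (ASSETTAG, CALNUM, CHANGEBY, CHANGEDATE) VALUES (?, ?, ?, ?)";

    // Opens one JDBC connection per partition and writes all of the
    // partition's rows in a single batch, rather than one connection per row.
    public static void writePartition(Iterator<String[]> rows,
                                      String jdbcUrl, String user, String password)
            throws SQLException {
        try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password);
             PreparedStatement ps = conn.prepareStatement(INSERT_SQL)) {
            while (rows.hasNext()) {
                String[] r = rows.next();
                for (int i = 0; i < r.length; i++) {
                    ps.setString(i + 1, r[i]); // JDBC parameters are 1-indexed
                }
                ps.addBatch();
            }
            ps.executeBatch(); // one round trip per partition instead of per row
        }
    }

    public static void main(String[] args) {
        System.out.println(INSERT_SQL);
    }
}
```

In the Spark job above, instead of saveAsTextFile you would map each Row to a String[] of its column values and then call foreachPartition on that RDD, passing a function whose body delegates to writePartition with your JDBC URL (e.g. "jdbc:mysql://host:3306/db") and credentials. Opening the connection inside the partition function matters: connections are not serializable, so they cannot be created on the driver and shipped to executors.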

Regarding "mysql - How to save Apache Spark schema output to a MySQL database", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/24896233/
