gpt4 book ai didi

java - Spark sql 连接 mongo-spark 和 Spark-redshift 连接器的性能问题

转载 作者:行者123 更新时间:2023-12-02 02:56:59 24 4
gpt4 key购买 nike

我们正在使用 Apache-spark mongo-spark library (用于与 MongoDB 连接)和 spark-redshift library (用于连接 Amazon Redshift DWH)。我们的工作表现非常糟糕。

因此,我希望得到一些帮助,以了解我们的程序是否做错了什么,或者这是我们所使用的基础设施所期望的结果。

我们正在 4 个 AWS EC2 节点上使用 MESOS 资源管理器运行作业,每个节点的配置如下:

RAM: 16GB, CPU cores: 4, SSD: 200GB

我们在 Redshift 集群中有 3 个表:

TABLE_NAME  SCHEMA                                    NUMBER_OF_ROWS
table1 (table1Id, table2FkId, table3FkId, ...) 50M
table2 (table2Id, phonenumber, email,...) 700M
table3 (table3Id, ...) 2K

在 MongoDB 中,我们有一个包含 3500 万个文档的集合,示例文档如下(所有电子邮件和电话号码在这里都是唯一的,没有重复):

{
"_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c",
"idKeys": {
"email": [
"a@gmail.com",
"b@gmail.com"
],
"phonenumber": [
"1111111111",
"2222222222"
]
},
"flag": false,
...
...
...
}

我们正在使用spark-mongo连接器将其过滤和扁平化(请参阅mongo-spark聚合管道末尾的代码)为以下格式(因为我们需要加入来自Redshift和Mongo的数据,其中电子邮件或电话号码与另一个匹配可用选项是 Spark SQL 中的 array_contains() ,这有点慢):

  {"_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c", "email": "a@gmail.com", "phonenumber": null},
{"_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c","email": "b@gmail.com","phonenumber": null},
{"_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c","email": null,"phonenumber": "1111111111"},
{"_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c","email": null,"phonenumber": "22222222222"}

Spark计算步骤(请引用下面的代码以更好地理解这些步骤):

  1. 首先,我们使用 Spark-redshift 连接器将 3 个 Redshift 表中的所有数据分别加载到 table1Dataset、table2Dataset、table3Dataset 中。
  2. 使用 SparkSQL 连接这 3 个表并创建新的数据集 redshiftJoinedDataset。 (该操作独立完成6小时)
  3. 使用 mongo-spark 连接器将 MongoDB 数据加载到 mongoDataset 中。
  4. 加入 mongoDataset 和 redshiftJoinedDataset。 (这里是瓶颈,因为我们需要将来自 redshift 的超过 5000 万行与来自 mongodb 的超过 1 亿条扁平行连接起来)
    注意:- mongo-spark 似乎也有一些 internal issue with its aggregation pipeline execution这可能会使其变得非常慢。
  5. 然后我们对 FinalId 上的数据进行一些聚合和分组

这是上述步骤的代码:

import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;

import java.util.Arrays;

public class SparkMongoRedshiftTest {

private static SparkSession sparkSession;
private static SparkContext sparkContext;
private static SQLContext sqlContext;

public static void main(String[] args) {

sparkSession = SparkSession.builder().appName("redshift-spark-test").getOrCreate();
sparkContext = sparkSession.sparkContext();
sqlContext = new SQLContext(sparkContext);


Dataset table1Dataset = executeRedshiftQuery("(SELECT table1Id,table2FkId,table3FkId FROM table1)");
table1Dataset.createOrReplaceTempView("table1Dataset");

Dataset table2Dataset = executeRedshiftQuery("(SELECT table2Id,phonenumber,email FROM table2)");
table2Dataset.createOrReplaceTempView("table2Dataset");

Dataset table3Dataset = executeRedshiftQuery("(SELECT table3Id FROM table3");
table3Dataset.createOrReplaceTempView("table3Dataset");


Dataset redshiftJoinedDataset = sqlContext.sql(" SELECT a.*,b.*,c.*" +
" FROM table1Dataset a " +
" LEFT JOIN table2Dataset b ON a.table2FkId = b.table2Id" +
" LEFT JOIN table3Dataset c ON a.table3FkId = c.table3Id");
redshiftJoinedDataset.createOrReplaceTempView("redshiftJoinedDataset");

JavaMongoRDD<Document> userIdentityRDD = MongoSpark.load(getJavaSparkContext());
Dataset mongoDataset = userIdentityRDD.withPipeline(
Arrays.asList(
Document.parse("{$match: {flag: false}}"),
Document.parse("{ $unwind: { path: \"$idKeys.email\" } }"),
Document.parse("{$group: {_id: \"$_id\",emailArr: {$push: {email: \"$idKeys.email\",phonenumber: {$ifNull: [\"$description\", null]}}},\"idKeys\": {$first: \"$idKeys\"}}}"),
Document.parse("{$unwind: \"$idKeys.phonenumber\"}"),
Document.parse("{$group: {_id: \"$_id\",phoneArr: {$push: {phonenumber: \"$idKeys.phonenumber\",email: {$ifNull: [\"$description\", null]}}},\"emailArr\": {$first: \"$emailArr\"}}}"),
Document.parse("{$project: {_id: 1,value: {$setUnion: [\"$emailArr\", \"$phoneArr\"]}}}"),
Document.parse("{$unwind: \"$value\"}"),
Document.parse("{$project: {email: \"$value.email\",phonenumber: \"$value.phonenumber\"}}")
)).toDF();
mongoDataset.createOrReplaceTempView("mongoDataset");

Dataset joinRedshiftAndMongoDataset = sqlContext.sql(" SELECT a.* , b._id AS finalId " +
" FROM redshiftJoinedData AS a INNER JOIN mongoDataset AS b " +
" ON b.email = a.email OR b.phonenumber = a.phonenumber");

//aggregating joinRedshiftAndMongoDataset
//then storing to mysql
}

private static Dataset executeRedshiftQuery(String query) {
return sqlContext.read()
.format("com.databricks.spark.redshift")
.option("url", "jdbc://...")
.option("query", query)
.option("aws_iam_role", "...")
.option("tempdir", "s3a://...")
.load();
}

public static JavaSparkContext getJavaSparkContext() {
sparkContext.conf().set("spark.mongodb.input.uri", "");
sparkContext.conf().set("spark.sql.crossJoin.enabled", "true");
return new JavaSparkContext(sparkContext);
}
}

在上述基础设施上完成这项工作的时间估计超过 2 个月。

因此,定量地总结连​​接:

RedshiftDataWithMongoDataJoin => (RedshiftDataJoin)                INNER_JOIN (MongoData)
=> (50M LEFT_JOIN 700M LEFT_JOIN 2K) INNER_JOIN (~100M)
=> (50M) INNER_JOIN (~100M)

对此的任何帮助将不胜感激。

最佳答案

经过大量调查后,我们发现表 2 中 90% 的数据的电子邮件或电话号码为空,而我错过了处理查询中空值的联接。

这就是性能瓶颈的主要问题。

解决此问题后,作业现在可以在 2 小时内运行。

因此,spark-redshift 或 mongo-spark 没有任何问题,它们的性能非常好:)

关于java - Spark sql 连接 mongo-spark 和 Spark-redshift 连接器的性能问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42929885/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com