- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我们正在使用 Apache-spark mongo-spark library (用于与 MongoDB 连接)和 spark-redshift library (用于连接 Amazon Redshift DWH)。我们的工作表现非常糟糕。
因此,我希望得到一些帮助,以了解我们的程序是否做错了什么,或者这是我们所使用的基础设施所期望的结果。
我们正在 4 个 AWS EC2 节点上使用 MESOS 资源管理器运行作业,每个节点的配置如下:
RAM: 16GB, CPU cores: 4, SSD: 200GB
我们在 Redshift 集群中有 3 个表:
TABLE_NAME SCHEMA NUMBER_OF_ROWS
table1 (table1Id, table2FkId, table3FkId, ...) 50M
table2 (table2Id, phonenumber, email,...) 700M
table3 (table3Id, ...) 2K
在 MongoDB 中,我们有一个包含 3500 万个文档的集合,示例文档如下(所有电子邮件和电话号码在这里都是唯一的,没有重复):
{
"_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c",
"idKeys": {
"email": [
"a@gmail.com",
"b@gmail.com"
],
"phonenumber": [
"1111111111",
"2222222222"
]
},
"flag": false,
...
...
...
}
我们正在使用spark-mongo连接器将其过滤和扁平化(请参阅mongo-spark聚合管道末尾的代码)为以下格式(因为我们需要加入来自Redshift和Mongo的数据,其中电子邮件或电话号码与另一个匹配可用选项是 Spark SQL 中的 array_contains() ,这有点慢):
{"_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c", "email": "a@gmail.com", "phonenumber": null},
{"_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c","email": "b@gmail.com","phonenumber": null},
{"_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c","email": null,"phonenumber": "1111111111"},
{"_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c","email": null,"phonenumber": "22222222222"}
Spark计算步骤(请引用下面的代码以更好地理解这些步骤):
这是上述步骤的代码:
import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;
import java.util.Arrays;
public class SparkMongoRedshiftTest {
private static SparkSession sparkSession;
private static SparkContext sparkContext;
private static SQLContext sqlContext;
public static void main(String[] args) {
sparkSession = SparkSession.builder().appName("redshift-spark-test").getOrCreate();
sparkContext = sparkSession.sparkContext();
sqlContext = new SQLContext(sparkContext);
Dataset table1Dataset = executeRedshiftQuery("(SELECT table1Id,table2FkId,table3FkId FROM table1)");
table1Dataset.createOrReplaceTempView("table1Dataset");
Dataset table2Dataset = executeRedshiftQuery("(SELECT table2Id,phonenumber,email FROM table2)");
table2Dataset.createOrReplaceTempView("table2Dataset");
Dataset table3Dataset = executeRedshiftQuery("(SELECT table3Id FROM table3");
table3Dataset.createOrReplaceTempView("table3Dataset");
Dataset redshiftJoinedDataset = sqlContext.sql(" SELECT a.*,b.*,c.*" +
" FROM table1Dataset a " +
" LEFT JOIN table2Dataset b ON a.table2FkId = b.table2Id" +
" LEFT JOIN table3Dataset c ON a.table3FkId = c.table3Id");
redshiftJoinedDataset.createOrReplaceTempView("redshiftJoinedDataset");
JavaMongoRDD<Document> userIdentityRDD = MongoSpark.load(getJavaSparkContext());
Dataset mongoDataset = userIdentityRDD.withPipeline(
Arrays.asList(
Document.parse("{$match: {flag: false}}"),
Document.parse("{ $unwind: { path: \"$idKeys.email\" } }"),
Document.parse("{$group: {_id: \"$_id\",emailArr: {$push: {email: \"$idKeys.email\",phonenumber: {$ifNull: [\"$description\", null]}}},\"idKeys\": {$first: \"$idKeys\"}}}"),
Document.parse("{$unwind: \"$idKeys.phonenumber\"}"),
Document.parse("{$group: {_id: \"$_id\",phoneArr: {$push: {phonenumber: \"$idKeys.phonenumber\",email: {$ifNull: [\"$description\", null]}}},\"emailArr\": {$first: \"$emailArr\"}}}"),
Document.parse("{$project: {_id: 1,value: {$setUnion: [\"$emailArr\", \"$phoneArr\"]}}}"),
Document.parse("{$unwind: \"$value\"}"),
Document.parse("{$project: {email: \"$value.email\",phonenumber: \"$value.phonenumber\"}}")
)).toDF();
mongoDataset.createOrReplaceTempView("mongoDataset");
Dataset joinRedshiftAndMongoDataset = sqlContext.sql(" SELECT a.* , b._id AS finalId " +
" FROM redshiftJoinedData AS a INNER JOIN mongoDataset AS b " +
" ON b.email = a.email OR b.phonenumber = a.phonenumber");
//aggregating joinRedshiftAndMongoDataset
//then storing to mysql
}
private static Dataset executeRedshiftQuery(String query) {
return sqlContext.read()
.format("com.databricks.spark.redshift")
.option("url", "jdbc://...")
.option("query", query)
.option("aws_iam_role", "...")
.option("tempdir", "s3a://...")
.load();
}
public static JavaSparkContext getJavaSparkContext() {
sparkContext.conf().set("spark.mongodb.input.uri", "");
sparkContext.conf().set("spark.sql.crossJoin.enabled", "true");
return new JavaSparkContext(sparkContext);
}
}
在上述基础设施上完成这项工作的时间估计超过 2 个月。
因此,定量地总结连接:
RedshiftDataWithMongoDataJoin => (RedshiftDataJoin) INNER_JOIN (MongoData)
=> (50M LEFT_JOIN 700M LEFT_JOIN 2K) INNER_JOIN (~100M)
=> (50M) INNER_JOIN (~100M)
对此的任何帮助将不胜感激。
最佳答案
经过大量调查后,我们发现表 2 中 90% 的数据的电子邮件或电话号码为空,而我错过了处理查询中空值的联接。
这就是性能瓶颈的主要问题。
解决此问题后,作业现在可以在 2 小时内运行。
因此,spark-redshift 或 mongo-spark 没有任何问题,它们的性能非常好:)
关于java - Spark sql 连接 mongo-spark 和 Spark-redshift 连接器的性能问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42929885/
我知道这个问题可能已经被问过,但我检查了所有这些,我认为我的情况有所不同(请友善)。所以我有两个数据集,第一个是测试数据集,第二个是我保存在数据框中的预测(预测值,这就是没有数据列的原因)。我想合并两
在 .loc 方法的帮助下,我根据同一数据框中另一列中的值来识别 Panda 数据框中某一列中的值。 下面给出了代码片段供您引用: var1 = output_df['Player'].loc[out
当我在 Windows 中使用 WinSCP 通过 Ubuntu 连接到 VMware 时,它提示: The server rejected SFTP connection, but it lis
我正在开发一个使用 xml web 服务的 android 应用程序。在 wi-fi 网络中连接时工作正常,但在 3G 网络中连接时失败(未找到 http 404)。 这不仅仅发生在设备中。为了进行测
我有一个XIB包含我的控件的文件,加载到 Interface Builder(Snow Leopard 上的 Xcode 4.0.2)中。 文件的所有者被设置为 someClassController
我在本地计算机上管理 MySQL 数据库,并通过运行以下程序通过 C 连接到它: #include #include #include int main(int argc, char** arg
我不知道为什么每次有人访问我网站上的页面时,都会打开一个与数据库的新连接。最终我到达了大约 300 并收到错误并且页面不再加载。我认为它应该工作的方式是,我将 maxIdle 设置为 30,这意味着
希望清理 NMEA GPS 中的 .txt 文件。我当前的代码如下。 deletes = ['$GPGGA', '$GPGSA', '$GPGSV', '$PSRF156', ] searchquer
我有一个 URL、一个用户名和一个密码。我想在 C# .Net WinForms 中建立 VPN 连接。 你能告诉我从哪里开始吗?任何第三方 API? 代码示例将受到高度赞赏... 最佳答案 您可以像
有没有更好的方法将字符串 vector 转换为字符 vector ,字符串之间的终止符为零。 因此,如果我有一个包含以下字符串的 vector "test","my","string",那么我想接收一
我正在编写一个库,它不断检查 android 设备的连接,并在设备连接、断开连接或互联网连接变慢时给出回调。 https://github.com/muddassir235/connection_ch
我的操作系统:Centos 7 + CLOUDLINUX 7.7当我尝试从服务器登录Mysql时 [root@server3 ~]# Mysql -u root -h localhost -P 330
我收到错误:Puma 发现此错误:无法打开到本地主机的 TCP 连接:9200(连接被拒绝 - 连接(2)用于“本地主机”端口 9200)(Faraday::ConnectionFailed)在我的
请给我一些解决以下错误的方法。 这是一个聊天应用....代码和错误如下:: conversations_controller.rb def create if Conversation.bet
我想将两个单元格中的数据连接到一个单元格中。我还想只组合那些具有相同 ID 的单元格。 任务 ID 名称 4355.2 参与者 4355.2 领袖 4462.1 在线 4462.1 快速 4597.1
我经常需要连接 TSQL 中的字段... 使用“+”运算符时 TSQL 强制您处理的两个问题是 Data Type Precedence和 NULL 值。 使用数据类型优先级,问题是转换错误。 1)
有没有在 iPad 或 iPhone 应用程序中使用 Facebook 连接。 这个想法是登录这个应用程序,然后能够看到我的哪些 facebook 用户也在使用该应用程序及其功能。 最佳答案 是的。
我在连接或打印字符串时遇到了一个奇怪的问题。我有一个 char * ,可以将其设置为字符串文字的几个值之一。 char *myStrLiteral = NULL; ... if(blah) myS
对于以下数据 - let $x := "Yahooooo !!!! Select one number - " let $y := 1 2 3 4 5 6 7 我想得到
我正在看 UDEMY for perl 的培训视频,但是视频不清晰,看起来有错误。 培训展示了如何使用以下示例连接 2 个字符串: #!usr/bin/perl print $str = "Hi";
我是一名优秀的程序员,十分优秀!