gpt4 book ai didi

mysql - 将大数据从 MySQL 加载到 Spark

转载 作者:行者123 更新时间:2023-11-29 02:47:30 25 4
gpt4 key购买 nike

寻找对 Spark 的理解...

我正在将大量数据从 MySQL 加载到 Spark 中,但它一直在死掉 :-(

org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:156)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)

这是我的代码

val query =
s"""
(
select
mod(act.AccountID, ${parts}) part,
p.Value name, event.EventTime eventTime, act.AccountID accountID, act.UserGoal goalID,event.ActivityID activityID, id.CountryID countryID, arr.ConsumerID consumerID
from DimIdentity as id
join FactArrival as arr on arr.IdentityID=id.IdentityID
join FactActivityEvent as event on event.ArrivalID=arr.ArrivalID
join DimAccount as act on act.AccountID=event.AccountID
join DimAccountRoleTypeMatch as role on role.AccountID=act.AccountID
join DimDateTime as d on event.DateTimeID=d.DateTimeID
join DimProperty as p on p.PropertyID=event.EventTypeID
where
id.Botness=0 and
d.DayOfYear>=${from} and d.DayOfYear<${to} and d.Year=${year} and
(role.AccountRoleTypeID=1 or role.AccountRoleTypeID=2)
) a
""".stripMargin

val events = sqlContext.read.format("jdbc").
option("url", sqlURL).
option("driver", "com.mysql.jdbc.Driver").
option("useUnicode", "true").
option("zeroDateTimeBehavior", "round").
option("continueBatchOnError", "true").
option("useSSL", "false").
option("dbtable", query).
option("user", sqlUser).
option("password", sqlPassword).
option("partitionColumn", "part").
option("lowerBound", "0").
option("upperBound", s"${parts - 1}").
option("numPartitions", s"${parts}").
load().as[Activity].toDF

请注意,我正在使用其他答案中推荐的 partitionColumn、lowerBound、upperBound、numPartitions

我尝试将分区设置为 4 到 512,但它总是死机。从文件或Mongo中读取相同数量的数据没有问题。这是 MySQL 连接器的问题吗?有解决办法吗?

请注意,我找到了一个建议我避免使用 Spark 的答案,并将查询读入 HDFS 上的一个文件,然后加载该文件

Multiple Partitions in Spark RDD

这真的是最好的方法吗?

最佳答案

这是我得到的答案...

对我来说,答案是避免 Spark 的 mysql-connection :-( 我发现很难避免分区导致的崩溃。Mysql-connection 需要手动调整分区,并且不会产生任何增加速度。编写将数据读取到大型文本文件中的非 Spark 代码以及在文本文件上调用 Spark 更容易。Spark 对于大多数数据源来说真的很好,但不是 mysql ......至少现在还不行

关于mysql - 将大数据从 MySQL 加载到 Spark,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40132635/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com