- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
在我的应用程序中,我比较了两个不同的数据集(即来自 Hive 的源表和来自 RDBMS 的目标表)的重复和不匹配,它适用于较小的数据集但是当我尝试比较超过 1GB 的数据时(单独的源)它挂起并抛出 TIMEOUT ERROR
,我尝试了 .config("spark.network.timeout", "600s")
即使在增加网络超时后它抛出 java. lang.OutOfMemoryError:超出 GC 开销限制
。
val spark = SparkSession.builder().master("local")
.appName("spark remote")
.config("javax.jdo.option.ConnectionURL", "jdbc:mysql://192.168.175.160:3306/metastore?useSSL=false")
.config("javax.jdo.option.ConnectionUserName", "hiveroot")
.config("javax.jdo.option.ConnectionPassword", "hivepassword")
.config("hive.exec.scratchdir", "/tmp/hive/${user.name}")
.config("hive.metastore.uris", "thrift://192.168.175.160:9083")
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
import spark.sql
val source = spark.sql("SELECT * from sample.source").rdd.map(_.mkString(","))
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()
val sparkdestination = SparkSession.builder().master("local").appName("Database")
.config("spark.network.timeout", "600s")
.getOrCreate()
val jdbcUsername = "root"
val jdbcPassword = "root"
val url = "jdbc:mysql://192.168.175.35:3306/sample?useSSL=false"
val connectionProperties = new java.util.Properties()
connectionProperties.put("user", jdbcUsername)
connectionProperties.put("password", jdbcPassword)
val queryDestination = "(select * from destination) as dest"
val destination = sparkdestination.read.jdbc(url, queryDestination, connectionProperties).rdd.map(_.mkString(","))
我也尝试过 destination.persist(StorageLevel.MEMORY_AND_DISK_SER)
(MEMORY_AND_DISK,DISK_ONLY) 方法,但没有成功。
编辑:这是原始错误堆栈:
17/07/11 12:49:43 INFO DAGScheduler: Submitting 22 missing tasks from ShuffleMapStage 1 (MapPartitionsRDD[13] at map at stack.scala:76)
17/07/11 12:49:43 INFO TaskSchedulerImpl: Adding task set 1.0 with 22 tasks
17/07/11 12:49:43 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/07/11 12:51:38 INFO JDBCRDD: closed connection
17/07/11 12:51:38 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.OutOfMemoryError: GC overhead limit exceeded
at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2210)
at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1989)
at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3410)
at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:470)
at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3112)
at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2341)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2736)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2490)
at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1966)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:301)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
17/07/11 12:51:38 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.OutOfMemoryError: GC overhead limit exceeded
at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2210)
at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1989)
17/07/11 12:49:43 INFO DAGScheduler: Submitting 22 missing tasks from ShuffleMapStage 1 (MapPartitionsRDD[13] at map at stack.scala:76)
17/07/11 12:49:43 INFO TaskSchedulerImpl: Adding task set 1.0 with 22 tasks
17/07/11 12:49:43 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/07/11 12:51:38 INFO JDBCRDD: closed connection
17/07/11 12:51:38 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.OutOfMemoryError: GC overhead limit exceeded
at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2210)
at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1989)
at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3410)
at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:470)
at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3112)
at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2341)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2736)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2490)
at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1966)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:301)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
17/07/11 12:51:38 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.OutOfMemoryError: GC overhead limit exceeded
at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2210)
at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1989)
编辑 2:
我尝试使用:
val options = Map(
"url" -> "jdbc:mysql://192.168.175.35:3306/sample?useSSL=false",
"dbtable" -> queryDestination,
"user" -> "root",
"password" -> "root")
val destination = sparkdestination.read.options(options).jdbc(options("url"), options("dbtable"), "0", 1, 5, 4, new java.util.Properties()).rdd.map(_.mkString(","))
我用少量数据检查了它的工作情况,但在使用大型数据集时它抛出了下面的错误
错误
17/07/11 14:12:46 INFO DAGScheduler: looking for newly runnable stages
17/07/11 14:12:46 INFO DAGScheduler: running: Set(ShuffleMapStage 1)
17/07/11 14:12:46 INFO DAGScheduler: waiting: Set(ResultStage 2)
17/07/11 14:12:46 INFO DAGScheduler: failed: Set()
17/07/11 14:12:50 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.175.160:39913 in memory (size: 19.9 KB, free: 353.4 MB)
17/07/11 14:14:47 WARN ServerConnector:
17/07/11 14:15:32 WARN QueuedThreadPool:
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.String.substring(String.java:1969)
17/07/11 14:15:32 ERROR Utils: uncaught error in thread Spark Context Cleaner, stopping SparkContext
java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:179)
17/07/11 14:15:32 WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(driver, [Lscala.Tuple2;@1e855db,BlockManagerId (driver, 192.168.175.160, 39913, None))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
17/07/11 14:15:32 ERROR Utils: throw uncaught fatal error in thread Spark Context Cleaner
java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:179)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1245)
17/07/11 14:15:32 WARN QueuedThreadPool: Unexpected thread death: org.spark_project.jetty.util.thread.QueuedThreadPool$3@710104 in SparkUI{STARTED,8<=8<=200,i=5,q=0}
17/07/11 14:15:32 INFO JDBCRDD: closed connection
17/07/11 14:15:32 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 22)
java.lang.OutOfMemoryError: GC overhead limit exceeded
17/07/11 14:15:32 INFO SparkUI: Stopped Spark web UI at http://192.168.175.160:4040
17/07/11 14:15:32 INFO DAGScheduler: Job 0 failed: collect at stack.scala:93, took 294.365864 s
Exception in thread "main" org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:808)
17/07/11 14:15:32 INFO DAGScheduler: ShuffleMapStage 1 (map at stack.scala:85) failed in 294.165 s due to Stage cancelled because SparkContext was shut down
17/07/11 14:15:32 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@cfb906)
17/07/11 14:15:32 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(0,1499762732342,JobFailed(org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down))
17/07/11 14:15:32 ERROR SparkUncaughtExceptionHandler: [Container in shutdown] Uncaught exception in thread Thread[Executor task launch worker-1,5,main]
java.lang.OutOfMemoryError: GC overhead limit exceeded
17/07/11 14:15:32 INFO DiskBlockManager: Shutdown hook called
17/07/11 14:15:32 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/07/11 14:15:32 INFO ShutdownHookManager: Shutdown hook called
17/07/11 14:15:32 INFO MemoryStore: MemoryStore cleared
17/07/11 14:15:32 INFO BlockManager: BlockManager stopped
17/07/11 14:15:32 INFO BlockManagerMaster: BlockManagerMaster stopped
17/07/11 14:15:32 INFO ShutdownHookManager: Deleting directory /tmp/spark-0b2ea8bd-95c0-45e4-a1cc-bd62b3899b24
17/07/11 14:15:32 INFO ShutdownHookManager: Deleting directory /tmp/spark-0b2ea8bd-95c0-45e4-a1cc-bd62b3899b24/userFiles-194d73ba-fcfa-4616-ae17-78b0bba6b465
17/07/11 14:15:32 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
Spark 配置
虽然处于开发模式,但我正在使用 2g 内存和 1 个内核执行。我是 spark 的新手,很抱歉提出如此幼稚的问题。
谢谢!
最佳答案
首先,您正在启动两个非常无用的 SparkSession
,您只是在拆分 资源。所以不要那样做!
其次,这就是问题所在。关于 Apache Spark 的并行性和 jdbc
源存在误解(别担心,这是一个陷阱!)。
这主要是由于缺少文档。 (我最后一次检查过)
回到问题。实际发生的是以下行:
val destination = spark.read.jdbc(url, queryDestination, connectionProperties).rdd.map(_.mkString(","))
是它将读取委托(delegate)给单个工作人员。
所以主要是,如果您有足够的内存并且您成功读取了该数据。整个destination
数据将在一个分区 中。而一个分区就意味着麻烦!又名可能:
java.lang.OutOfMemoryError: GC overhead limit exceeded
所以发生的事情是,被选择用来获取数据的单个执行器不堪重负,它的 JVM 崩溃了。
现在让我们解决这个问题:
(免责声明:以下代码摘自 spark-gotchas ,我是其作者之一。)
所以让我们创建一些示例数据并将它们保存在我们的数据库中:
val options = Map(
"url" -> "jdbc:postgresql://127.0.0.1:5432/spark",
"dbtable" -> "data",
"driver" -> "org.postgresql.Driver",
"user" -> "spark",
"password" -> "spark"
)
val newData = spark.range(1000000)
.select($"id", lit(""), lit(true), current_timestamp())
.toDF("id", "name", "valid", "ts")
newData.write.format("jdbc").options(options).mode("append").save
Apache Spark 提供了两种用于通过 JDBC 加载分布式数据的方法。第一个使用整数列对数据进行分区:
val dfPartitionedWithRanges = spark.read.options(options)
.jdbc(options("url"), options("dbtable"), "id", 1, 5, 4, new java.util.Properties())
dfPartitionedWithRanges.rdd.partitions.size
// Int = 4
dfPartitionedWithRanges.rdd.glom.collect
// Array[Array[org.apache.spark.sql.Row]] = Array(
// Array([1,foo,true,2012-01-01 00:03:00.0]),
// Array([2,foo,false,2013-04-02 10:10:00.0]),
// Array([3,bar,true,2015-11-02 22:00:00.0]),
// Array([4,bar,false,2010-11-02 22:00:00.0]))
Partition column and bounds can provided using options as well:
val optionsWithBounds = options ++ Map(
"partitionColumn" -> "id",
"lowerBound" -> "1",
"upperBound" -> "5",
"numPartitions" -> "4"
)
spark.read.options(optionsWithBounds).format("jdbc").load
也可以使用选项提供分区列和边界:
val optionsWithBounds = options ++ Map(
"partitionColumn" -> "id",
"lowerBound" -> "1",
"upperBound" -> "5",
"numPartitions" -> "4"
)
spark.read.options(optionsWithBounds).format("jdbc").load
另一种选择是使用一系列谓词,但我不会在这里谈论它。
您可以阅读有关 Spark SQL 和 JDBC 源代码的更多信息 here以及其他一些问题。
希望对您有所帮助。
关于apache-spark - 读取大量数据集时 Spark 2.1 挂起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45027091/
我遇到以下问题。我想读取一个包含数百万行和数百列的大型 csv。我想向下转换列的数据类型。我的方法是读取 csv,然后使用 pd.to_numeric() 对其进行向下转换。我不知道列数及其类型。在读
目前,我从 SQL server (2008) 数据库获取数据。 cyurrent的方法是使用DataTable,然后将其传递并使用。 if (parameters != null)
我有以下问题。我有一个巨大的 csv 文件,想用多处理加载它。对于一个包含 500000 行和 130 列不同数据类型的示例文件,Pandas 需要 19 秒。我试过 dask 因为我想多处理阅读。但
是否有关于用于序列化各种 MFC 数据结构的二进制格式的明确文档?我已经能够在十六进制编辑器中查看我自己的一些类,并使用 Java 的 ByteBuffer 类读取它们(使用自动字节顺序转换等)。 但
我正在使用 Selenium 进行测试,我们用 HTML 文件编写测试用例,并用它们制作测试套件,我们的要求是编写足够健壮的测试用例,以根据测试环境改变自身。 为此,我不希望在 HTML 脚本本身中包
我需要一个 JavaScript 代码来读取存储为 .txt 文件的字典(或者也可以保存为任何其他类型的文件。它也可以在线获得)并将其内容存储在一个变量中。我不能找到一种让 JavaScript 像
我正在尝试遍历包含 SSH 登录和其他日志的日志文本文件。 程序正在返回 SSH 登录的总数。 我的解决方案确实有效,但似乎有点慢(在 200mo 文件上大约需要 3.5 秒)。我想知道是否有任何方法
我正在将大量数据从一个电子表格复制到工作簿中的其他 160 个电子表格。目前,Excel (2013) 遇到错误,因为它没有足够的资源来完成操作。 我的目标是将工作表 4 中 V13:XI1150 范
我正在尝试读取一个有 1147 行的文本文件。下面的代码仅读取第 1050-1147 行。我的目标是读取整个文件并提取位于不同行的特定值以在脚本中使用。一个示例是包含“BlockList: 2”的行中
我正在为游戏编写解释器。用户将其移动输入解释器,程序执行该移动。 现在我想为每个决定实现一个时间限制。玩家不应该能够思考超过 30 秒来写一个移动并按下回车。 call_with_time_limit
以this file例如,我正在尝试读取 data.frame 中的数据。来自 the doc (pdf 文件,表 1),它遵循一些 fortran 约定。我尝试了以下但收效甚微: dir 0' 将
我正在使用 R 阅读 Outlook 附件。我的引用在这里:Download attachment from an outlook email using R 这是我的电子邮件的截图: 这每天都会发送
我不会从表格中读取行来将主题放在列表中 php脚本 $url_obj='http://'.$host.':8069/xmlrpc/object'; $sock=new xmlrpc_client($u
我有一个这样的 csv 文件: id,name,value 1,peter,5 2,peter\,paul,3 我如何读取此文件并告诉 R "\," 不表示新列,仅表示 ","。 我必须添加该文件
我正在尝试读取 ~/Library/Preferences/com.apple.mail.plist (在 Snow Leopard 上)以获取电子邮件地址和其他信息以进入“关于”对话框。我使用以下代
This question already has answers here: How do I use floating-point division in bash? (19个回答) 5个月前关闭
本练习的目标是读取输入文件并将其存储到表中,然后验证输入中的某些字段并输出任何错误记录。我需要读取并存储每个策略组,以便表中一次仅存储 5 条记录,而不是整个文件。 所以我需要读取一个包含 5 条记录
据我了解,LWT 插入始终以 SERIAL 一致性级别完成。如果为 true,这是否意味着读取作为 LWT 插入的行可以安全地以 ANY 的一致性级别读取? 换句话说,我假设 LWT 插入是完全一致的
我看到很多很多通过java脚本读取cookie的函数,但我只想在变量中使用它一次,我是JS新手。 这是我的代码 var TheNumber = (Math.random() + '') * 10000
我正在使用 asp.net 和 C#。我在服务器上部署了一个应用程序[已发布],现在我想查看该网站的代码,据我所知,我可以阅读程序集来查看代码。 请告诉我如何实现它。 提前致谢。 最佳答案 您可以使用
我是一名优秀的程序员,十分优秀!