gpt4 book ai didi

scala - Spark SQL + Cassandra : bad performance

转载 作者:行者123 更新时间:2023-12-03 16:43:32 25 4
gpt4 key购买 nike

我刚开始使用 Spark SQL + Cassandra,可能遗漏了一些重要的东西,但一个简单的查询需要大约 45 秒。我正在使用 cassanda-spark-connector库,并运行也托管 Spark 的本地 Web 服务器。所以我的设置大致是这样的:

在 sbt:

    "org.apache.spark" %% "spark-core" % "1.4.1" excludeAll(ExclusionRule(organization = "org.slf4j")),
"org.apache.spark" %% "spark-sql" % "1.4.1" excludeAll(ExclusionRule(organization = "org.slf4j")),
"com.datastax.spark" %% "spark-cassandra-connector" % "1.4.0-M3" excludeAll(ExclusionRule(organization = "org.slf4j"))

在代码中,我有一个托管 SparkContext 的单例和 CassandraSQLContetx .然后从 servlet 调用它。下面是单例代码的样子:
object SparkModel {

val conf =
new SparkConf()
.setAppName("core")
.setMaster("local")
.set("spark.cassandra.connection.host", "127.0.0.1")

val sc = new SparkContext(conf)
val sqlC = new CassandraSQLContext(sc)
sqlC.setKeyspace("core")

val df: DataFrame = sqlC.cassandraSql(
"SELECT email, target_entity_id, target_entity_type " +
"FROM tracking_events " +
"LEFT JOIN customers " +
"WHERE entity_type = 'User' AND entity_id = customer_id")
}

在这里我如何使用它:
get("/spark") {
SparkModel.df.collect().map(r => TrackingEvent(r.getString(0), r.getString(1), r.getString(2))).toList
}

Cassandra、Spark 和 Web 应用程序运行在我的 Macbook Pro 虚拟机中的同一台主机上,具有不错的规范。 Cassandra 查询本身需要 10-20 毫秒。

当我第一次调用这个端点时,返回结果需要 70-80 秒。后续查询需要大约 45 秒。后续操作的日志是这样的:
12:48:50 INFO  org.apache.spark.SparkContext - Starting job: collect at V1Servlet.scala:1146
12:48:50 INFO o.a.spark.scheduler.DAGScheduler - Got job 1 (collect at V1Servlet.scala:1146) with 1 output partitions (allowLocal=false)
12:48:50 INFO o.a.spark.scheduler.DAGScheduler - Final stage: ResultStage 1(collect at V1Servlet.scala:1146)
12:48:50 INFO o.a.spark.scheduler.DAGScheduler - Parents of final stage: List()
12:48:50 INFO o.a.spark.scheduler.DAGScheduler - Missing parents: List()
12:48:50 INFO o.a.spark.scheduler.DAGScheduler - Submitting ResultStage 1 (MapPartitionsRDD[29] at collect at V1Servlet.scala:1146), which has no missing parents
12:48:50 INFO org.apache.spark.storage.MemoryStore - ensureFreeSpace(18696) called with curMem=26661, maxMem=825564856
12:48:50 INFO org.apache.spark.storage.MemoryStore - Block broadcast_1 stored as values in memory (estimated size 18.3 KB, free 787.3 MB)
12:48:50 INFO org.apache.spark.storage.MemoryStore - ensureFreeSpace(8345) called with curMem=45357, maxMem=825564856
12:48:50 INFO org.apache.spark.storage.MemoryStore - Block broadcast_1_piece0 stored as bytes in memory (estimated size 8.1 KB, free 787.3 MB)
12:48:50 INFO o.a.spark.storage.BlockManagerInfo - Added broadcast_1_piece0 in memory on localhost:56289 (size: 8.1 KB, free: 787.3 MB)
12:48:50 INFO org.apache.spark.SparkContext - Created broadcast 1 from broadcast at DAGScheduler.scala:874
12:48:50 INFO o.a.spark.scheduler.DAGScheduler - Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[29] at collect at V1Servlet.scala:1146)
12:48:50 INFO o.a.s.scheduler.TaskSchedulerImpl - Adding task set 1.0 with 1 tasks
12:48:50 INFO o.a.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 1.0 (TID 1, localhost, NODE_LOCAL, 59413 bytes)
12:48:50 INFO org.apache.spark.executor.Executor - Running task 0.0 in stage 1.0 (TID 1)
12:48:50 INFO com.datastax.driver.core.Cluster - New Cassandra host localhost/127.0.0.1:9042 added
12:48:50 INFO c.d.s.c.cql.CassandraConnector - Connected to Cassandra cluster: Super Cluster
12:49:11 INFO o.a.spark.storage.BlockManagerInfo - Removed broadcast_0_piece0 on localhost:56289 in memory (size: 8.0 KB, free: 787.3 MB)
12:49:35 INFO org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0 (TID 1). 6124 bytes result sent to driver
12:49:35 INFO o.a.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 1.0 (TID 1) in 45199 ms on localhost (1/1)
12:49:35 INFO o.a.s.scheduler.TaskSchedulerImpl - Removed TaskSet 1.0, whose tasks have all completed, from pool
12:49:35 INFO o.a.spark.scheduler.DAGScheduler - ResultStage 1 (collect at V1Servlet.scala:1146) finished in 45.199 s

从日志中可以看出,最长的停顿在这 3 行之间(21 + 24 秒):
12:48:50 INFO  c.d.s.c.cql.CassandraConnector - Connected to Cassandra cluster: Super Cluster
12:49:11 INFO o.a.spark.storage.BlockManagerInfo - Removed broadcast_0_piece0 on localhost:56289 in memory (size: 8.0 KB, free: 787.3 MB)
12:49:35 INFO org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0 (TID 1). 6124 bytes result sent to driver

显然,我做错了什么。那是什么?我该如何改进?

编辑:重要的补充:表的大小很小( tracking_events 为 ~200 个条目, customers 为 ~20 个条目),因此将它们整体读入内存不应该花费任何大量时间。它是本地 Cassandra 安装,不涉及集群,不涉及网络。

最佳答案

  "SELECT email, target_entity_id, target_entity_type " +
"FROM tracking_events " +
"LEFT JOIN customers " +
"WHERE entity_type = 'User' AND entity_id = customer_id")

此查询将从 tracking_events 和 customers 表中读取所有数据。我会将性能与仅在两个表上执行 SELECT COUNT(*) 进行比较。如果它显着不同,那么可能存在问题,但我的猜测是这只是将两个表完全读入内存所需的时间。

有几个旋钮可以调整读取的完成方式,并且由于默认值面向更大的数据集,您可能需要更改这些。
spark.cassandra.input.split.size_in_mb  approx amount of data to be fetched into a Spark partition  64 MB
spark.cassandra.input.fetch.size_in_rows number of CQL rows fetched per driver request 1000

我会确保您生成的任务与您拥有的内核(至少)一样多,以便您可以利用所有资源。为此,请缩小 input.split.size

提取大小控制执行程序核心一次分页的行数,因此在某些用例中增加它可以提高速度。

关于scala - Spark SQL + Cassandra : bad performance,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32051648/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com