gpt4 book ai didi

scala - 如何在 Spark 中读取 cassandra 分区时获得良好的性能?

转载 作者:行者123 更新时间:2023-12-04 19:03:42 25 4
gpt4 key购买 nike

我正在使用 cassandra-connector 从 cassandra 分区读取数据以产生 Spark 。我尝试了以下读取分区的解决方案。我尝试通过尽可能多地创建 rdds 来并行化任务,但解决方案一和解决方案二都具有相同的性能。

在解决方案一中,我可以立即看到 spark UI 中的各个阶段。我试图在解决方案二中避免一个 for 循环。

在解决方案二中,阶段出现在相当长的时间之后。此外,随着用户 ID 数量的增加,阶段在解决方案二的 spark UI 中出现之前的时间显着增加。

Version
spark - 1.1
Dse - 4.6
cassandra-connector -1.1

Setup
3 - Nodes with spark cassandra
Each node has 1 core dedicated to this task.
512MB ram for the executor memory.

我的 cassandra 表架构,
 CREATE TABLE   test (
user text,
userid bigint,
period timestamp,
ip text,
data blob,
PRIMARY KEY((user,userid,period),ip)
);

第一个解决方案:
 val users = List("u1","u2","u3")
val period = List("2000-05-01","2000-05-01")
val partitions = users.flatMap(x => period.map(y => (x,y))))
val userids = 1 to 10
for (userid <- userids){
val rdds = partitions.map(x => sc.cassandraTable("test_keyspace","table1")
.select("data")
.where("user=?", x._1)
.where("period=?",x._2)
.where("userid=?,userid)
)
val combinedRdd = sc.union(rdds)
val result = combinedRdd.map(getDataFromColumns)
.coalesce(4)
.reduceByKey((x,y) => x+y)
.collect()
result.foreach(prinltn)
}

第二种解决方案:
 val users = List("u1","u2","u3")
val period = List("2000-05-01","2000-05-01")
val userids = 1 to 10
val partitions = users.flatMap(x => period.flatMap(
y => userids.map(z => (x,y,z))))

val rdds = partitions.map(x => sc.cassandraTable("test_keyspace","table1")
.select("data")
.where("user=?", x._1)
.where("period=?",x._2)
.where("userid=?,x._3)
)
val combinedRdd = sc.union(rdds)
val result = combinedRdd.map(getDataFromColumns)
.coalesce(4)
.reduceByKey((x,y) => x+y)
.collect()
result.foreach(prinltn)

为什么解决方案二不比解决方案一快?

我的理解是,由于所有分区都是一次性查询的,并且数据分布在节点之间,因此速度应该更快。
如果我错了,请纠正我。

最佳答案

首先,您应该研究 joinWithCassandraTable,它应该是您正在做的事情的一个更简单的 api(假设您有足够的分区来让它值得)。这个 api 使用分区键的 RDD,并将它们从 C* 中提取出来并分发它们。
这会让你做类似的事情

sc.parallelize(partitions).joinWithCassandraTable("keyspace","table")
如果你愿意,你也可以做一个 repartitionByCassandraReplica但这很可能对非常小的请求没有好处。您必须对数据进行基准测试才能确定。
如果您只想执行原始驱动程序命令,您可以执行以下操作
val cc = CassandraConnector(sc.getConf)
partitions.mapPartitions{ it =>
cc.withSessionDo{ session =>
session.execute( Some query )
}
}
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#performing-efficient-joins-with-cassandra-tables-since-12
代码示例演练:
现在让我们快速浏览一下您的代码示例
首先,我们只检索 60 个 C* 分区。对于这个用例,与从 C* 检索分区所需的时间相比,我们很可能会被设置和取消任务所需的时间所支配。
在这两种解决方案中,由于 Spark 所做的惰性求值,您基本上都在做同样的事情。驱动程序创建一个图,该图首先创建 60 个 RDD,每个 RDD 带有一条从 C* 检索单个分区的惰性指令。 (每个分区 1 个 RDD 是不好的,RDD 旨在存储大量数据,因此最终会产生相当多的开销)。尽管这 60 个 RDD 是用不同的模式制作的,但这真的无关紧要,因为在您调用 collect 之前,它们的实际评估不会发生。驱动程序继续设置新的 RDD 和转换。
在我们点击 collect 之前,绝对不会从 C* 中检索数据,并且由于我们使用基本相同的依赖图为您在上面发布的两种解决方案点击了 collect,因此在这两种情况下都会发生完全(或非常相似)的事情。所有 60 个 RDD 都将按照依赖关系图的指定进行解析。这将并行发生,但同样会非常耗费开销。
为了避免这种情况,请查看我上面使用单个 RDD 提取所有信息的示例。

关于scala - 如何在 Spark 中读取 cassandra 分区时获得良好的性能?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30686612/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com