gpt4 book ai didi

scala - 使用 spark-cassandra 连接器的 Cassandra 插入性能

转载 作者:行者123 更新时间:2023-12-04 23:44:21 25 4
gpt4 key购买 nike

我是 Spark 和 cassandra 的新手。我正在尝试使用 spark-cassandra 连接器插入 cassandra 表,如下所示:

import java.util.UUID

import org.apache.spark.{SparkContext, SparkConf}
import org.joda.time.DateTime
import com.datastax.spark.connector._

case class TestEntity(id:UUID, category:String, name:String,value:Double, createDate:DateTime, tag:Long)

object SparkConnectorContext {
val conf = new SparkConf(true).setMaster("local")
.set("spark.cassandra.connection.host", "192.168.xxx.xxx")
val sc = new SparkContext(conf)
}
object TestRepo {
def insertList(list: List[TestEntity]) = {
SparkConnectorContext.sc.parallelize(list).saveToCassandra("testKeySpace", "testColumnFamily")
}
}
object TestApp extends App {
val start = System.currentTimeMillis()
TestRepo.insertList(Utility.generateRandomData())
val end = System.currentTimeMillis()
val timeDiff = end-start
println("Difference (in millis)= "+timeDiff)
}

当我使用上述方法插入时(包含 100 个实体的列表),它需要 300-1100 milliseconds .
我尝试使用 phantom 插入相同的数据图书馆。它只需要不到 20-40 milliseconds .

谁能告诉我为什么 Spark 连接器要花这么多时间插入?我在代码中做错了什么还是不建议使用 spark-cassandra connector用于插入操作?

最佳答案

看起来您在计时中包含了并行化操作。此外,由于您的 spark worker 运行在与 Cassandra 不同的机器上,因此 saveToCassandra 操作将通过网络进行写入。

尝试将您的系统配置为在 Cassandra 节点上运行 spark 工作程序。然后在单独的步骤中创建一个 RDD,并在其上调用 count() 之类的操作以将数据加载到内存中。此外,您可能希望对 RDD 进行persist() 或cache() 以确保它保留在内存中以进行测试。

然后只计算该缓存 RDD 的 saveToCassandra。

您可能还想查看 Cassandra 连接器提供的 repartitionByCassandraReplica 方法。这将根据写入需要转到哪个 Cassandra 节点来对 RDD 中的数据进行分区。通过这种方式,您可以利用数据局部性,并且经常避免通过网络进行写入和洗牌。

关于scala - 使用 spark-cassandra 连接器的 Cassandra 插入性能,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31941563/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com