作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一张很大的 Cassandra table 。我只想从 Cassandra 加载 50 行。
以下代码
val ds = sparkSession.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> s"$Aggregates", "keyspace" -> s"$KeySpace"))
.load()
.where(col("aggregate_type") === "DAY")
.where(col("start_time") <= "2018-03-28")
.limit(50).collect()
where
推送两个谓词方法,但不限于一种。获取整个数据(100 万条记录)是真的吗?如果没有,为什么这段代码的运行时间和代码没有
limit(50)
大致相同。
最佳答案
与 Spark Streaming 不同,Spark 本身正在尝试尽可能快地预加载尽可能多的数据,以便能够对其进行并行操作。所以预加载是懒惰的,但在触发时会贪婪。但是,有 cassandra-conector 特定的因素:
limit(...)
未转换为 CQL LIMIT
,所以它的行为取决于在下载足够的数据后创建了多少获取作业。报价:calling limit will allow Spark to skip reading some portions from the underlying DataSource. These would limit the amount of data read from Cassandra by canceling tasks from being executed.
numPartitions
来部分管理和数据交换率 ( concurrent.reads
and other params )。如果“在大多数情况下”对 n ~ 50 没问题,您还可以限制类似 where(dayIndex < 50 * factor * num_records)
的内容。 . LIMIT
通过 SparkPartitionLimit
,这会直接影响每个 CQL 请求 ( see more ) - 请记住,请求是每个 Spark 分区的。它在 CassandraRdd 中可用扩展类,因此您必须先转换为 RDD。 filteredDataFrame.rdd.asInstanceOf[CassandraRDD].limit(n).take(n).collect()
LIMIT $N
到每个 CQL 请求。与
DataFrame
不同的限制,如果您指定 CassandraRDD
limit
多次(
.limit(10).limit(20)
) - 只有最后一个将被附加。另外,我用了
n
而不是
n / numPartitions + 1
因为它(即使 Spark 和 Cassandra 分区是一对一的)每个分区返回的结果可能会更少。结果,我不得不添加
take(n)
为了切
<= numPartitions * n
下至
n
.
where
可翻译为 CQL(使用
explain()
) - 否则
LIMIT
将在过滤之前应用。
sparkSession.sql(...)
直接运行 CQL (
like here ) 并比较结果。
关于scala - 使用数据帧时如何为 Cassandra 下推限制谓词?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49534566/
我是一名优秀的程序员,十分优秀!