gpt4 book ai didi

scala - 使用数据帧时如何为 Cassandra 下推限制谓词?

转载 作者:行者123 更新时间:2023-12-04 15:06:56 24 4
gpt4 key购买 nike

我有一张很大的 Cassandra table 。我只想从 Cassandra 加载 50 行。
以下代码

val ds = sparkSession.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> s"$Aggregates", "keyspace" -> s"$KeySpace"))
.load()
.where(col("aggregate_type") === "DAY")
.where(col("start_time") <= "2018-03-28")
.limit(50).collect()

以下代码从 where 推送两个谓词方法,但不限于一种。获取整个数据(100 万条记录)是真的吗?如果没有,为什么这段代码的运行时间和代码没有 limit(50)大致相同。

最佳答案

与 Spark Streaming 不同,Spark 本身正在尝试尽可能快地预加载尽可能多的数据,以便能够对其进行并行操作。所以预加载是懒惰的,但在触发时会贪婪。但是,有 cassandra-conector 特定的因素:

  • Automatic predicate pushdown有效 “where”子句。
  • 根据 this answer limit(...)未转换为 CQL LIMIT ,所以它的行为取决于在下载足够的数据后创建了多少获取作业。报价:

  • calling limit will allow Spark to skip reading some portions from the underlying DataSource. These would limit the amount of data read from Cassandra by canceling tasks from being executed.



    可能的解决方案:
  • DataFrame 限制可以通过限制 numPartitions 来部分管理和数据交换率 ( concurrent.reads and other params )。如果“在大多数情况下”对 n ~ 50 没问题,您还可以限制类似 where(dayIndex < 50 * factor * num_records) 的内容。 .
  • 有一种方法可以设置 CQL LIMIT通过 SparkPartitionLimit ,这会直接影响每个 CQL 请求 ( see more ) - 请记住,请求是每个 Spark 分区的。它在 CassandraRdd 中可用扩展类,因此您必须先转换为 RDD。

  • 代码类似于:
    filteredDataFrame.rdd.asInstanceOf[CassandraRDD].limit(n).take(n).collect()

    这将附加 LIMIT $N到每个 CQL 请求。与 DataFrame 不同的限制,如果您指定 CassandraRDD limit多次( .limit(10).limit(20) ) - 只有最后一个将被附加。另外,我用了 n而不是 n / numPartitions + 1因为它(即使 Spark 和 Cassandra 分区是一对一的)每个分区返回的结果可能会更少。结果,我不得不添加 take(n)为了切 <= numPartitions * n下至 n .

    警告 仔细检查您的 where可翻译为 CQL(使用 explain()) - 否则 LIMIT将在过滤之前应用。

    附言您也可以尝试使用 sparkSession.sql(...) 直接运行 CQL ( like here ) 并比较结果。

    关于scala - 使用数据帧时如何为 Cassandra 下推限制谓词?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49534566/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com