gpt4 book ai didi

java - Spark 不推送过滤器(PushedFilters 数组为空)

转载 作者:行者123 更新时间:2023-12-04 00:23:03 28 4
gpt4 key购买 nike

简介

我注意到我们项目中的推送过滤器都不起作用。它解释了为什么执行时间会受到影响,因为它读取了数百万次读取,而本应将其减少到几千次。为了调试这个问题,我编写了一个小测试来读取 CSV 文件、过滤内容(PushDown Filter)并返回结果。

它不适用于 CSV,因此我尝试读取 Parquet 文件。它们都不起作用。

数据

people.csv 文件具有以下结构:

first_name,last_name,city  // header
FirstName1,LastName1,Bern // 1st row
FirstName2,LastName2,Sion // 2nd row
FirstName3,LastName3,Bulle // 3rd row

注意:parquet 文件具有相同的结构

读取 CSV 文件

为了重现这个问题,我编写了一个读取 csv 文件的最小代码,并且应该只返回过滤后的数据。

读取csv文件并打印实物图:

Dataset<Row> ds = sparkSession.read().option("header", "true").csv(BASE_PATH+"people.csv");
ds.where(col("city").equalTo("Bern")).show();
ds.explain(true);

物理计划:

+----------+---------+----+
|first_name|last_name|city|
+----------+---------+----+
|FirstName1|LastName1|Bern|
+----------+---------+----+

== Parsed Logical Plan == Relation[first_name#10,last_name#11,city#12] csv

== Analyzed Logical Plan == first_name: string, last_name: string, city: string Relation[first_name#10,last_name#11,city#12] csv

== Optimized Logical Plan == Relation[first_name#10,last_name#11,city#12] csv

== Physical Plan == *(1) FileScan csv [first_name#10,last_name#11,city#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:people.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct

我已经用 parquet 文件进行了测试,不幸的是结果是一样的。

我们可以注意到的是:

  • PushedFilters 为空,我希望过滤器包含谓词。
  • 返回的结果仍然是正确的。

我的问题是:为什么这个 PushedFilters 是空的?

注意:

  • Spark 版本:2.4.3
  • 文件系统:ext4(和集群上的 HDFS,都没有工作)

最佳答案

您正在对第一个数据集调用解释,即只有读取的数据集。试试类似的东西(对不起,我只有 Scala 环境可用):

val ds: DataFrame = spark.read.option("header", "true").csv("input.csv")
val f = ds.filter(col("city").equalTo("Bern"))

f.explain(true)

f.show()

另外,使用类型化数据集 API 时要小心,因为 this .不过不应该是你的情况。

关于java - Spark 不推送过滤器(PushedFilters 数组为空),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59139431/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com