- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
这是 Catalyst 特定的问题
在应用我的规则之前,请参阅下面的 queryExecution.optimizedPlan。
01 Project [x#9, p#10, q#11, if (isnull(q#11)) null else UDF(q#11) AS udfB_10#28, if (isnull(p#10)) null else UDF(p#10) AS udfA_99#93]
02 +- InMemoryRelation [x#9, p#10, q#11], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
03 : +- *SerializeFromObject [assertnotnull(input[0, eic.R0, true], top level non-flat input object).x AS x#9, unwrapoption(IntegerType, assertnotnull(input[0, eic.R0, true], top level non-flat input object).p) AS p#10, unwrapoption(IntegerType, assertnotnull(input[0, eic.R0, true], top level non-flat input object).q) AS q#11]
04 : +- *MapElements <function1>, obj#8: eic.R0
05 : +- *DeserializeToObject newInstance(class java.lang.Long), obj#7: java.lang.Long
05 : +- *Range (0, 3, step=1, splits=Some(2))
在第 01 行中,我需要这样交换 udfA 和 udfB 的位置:
01 Project [x#9, p#10, q#11, if (isnull(p#10)) null else UDF(p#10) AS udfA_99#93, if (isnull(q#11)) null else UDF(q#11) AS udfB_10#28]
当我尝试通过 Catalyst 优化更改 SparkSQL 中投影操作中的属性顺序时,查询结果被修改为无效值。也许我没有做所有需要做的事情。我只是更改 fields 参数中 NamedExpression 对象的顺序:
object ReorderColumnsOnProjectOptimizationRule extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case Project(fields: Seq[NamedExpression], child) =>
if (checkCondition(fields)) Project(newFieldsObject(fields), child) else Project(fields, child)
case _ => plan
}
private def newFieldsObject(fields: Seq[NamedExpression]): Seq[NamedExpression] = {
// compare UDFs computation cost and return the new NamedExpression list
. . .
}
private def checkCondition(fields: Seq[NamedExpression]): Boolean = {
// compare UDFs computation cost and return Boolean for decision off change order on field list.
. . .
}
. . .
}
注意:我在 extraOptimizations
SparkSQL 对象上添加我的规则:
spark.experimental.extraOptimizations = Seq(ReorderColumnsOnProjectOptimizationRule)
任何建议都会有很大帮助。
编辑 1
顺便说一下,我在 Databricks 上创建了一个笔记本用于测试目的。 See this link for more detail
对第 60 行进行注释,调用优化并发生错误。
. . .
58 // Do UDF with less cost before, so I need change the fields order
59 myPriorityList.size == 2 && myPriorityList(0) > myPriorityList(1)
60 false
61 }
我错过了什么?
编辑2
考虑以下来自编译器优化的代码,它几乎是类似的:
if ( really_slow_test(with,plenty,of,parameters)
&& slower_test(with,some,parameters)
&& fast_test // with no parameters
)
{
...then code...
}
此代码首先计算一个昂贵的函数,然后在成功后继续计算表达式的其余部分。但即使第一个测试失败并且评估是快捷的,也会有显着的性能损失,因为总是评估胖的 real_slow_test(...) 。在保持程序正确性的同时,可以按如下方式重新排列表达式:
if ( fast_test
&& slower_test(with,some,parameters)
&& (really_slow_test(with,plenty,of,parameters))
{
...then code...
}
我的目标是首先运行最快的 UDF
最佳答案
如stefanobaghino说分析器的模式在分析后被缓存,优化器不应该改变它。
如果您使用 Spark 2.2,您可以利用 SPARK-18127并在分析器中应用规则。
如果您运行这个虚拟应用程序
package panos.bletsos
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.SparkSessionExtensions
case class ReorderColumnsOnProjectOptimizationRule(spark: SparkSession) extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
case p: Project => {
val fields = p.projectList
if (checkConditions(fields, p.child)) {
val modifiedFieldsObject = optimizePlan(fields, p.child, plan)
val projectUpdated = p.copy(modifiedFieldsObject, p.child)
projectUpdated
} else {
p
}
}
}
private def checkConditions(fields: Seq[NamedExpression], child: LogicalPlan): Boolean = {
// compare UDFs computation cost and return Boolean
val needsOptimization = listHaveTwoUDFsEnabledForOptimization(fields)
if (needsOptimization) println(fields.mkString(" | "))
needsOptimization
}
private def listHaveTwoUDFsEnabledForOptimization(fields: Seq[NamedExpression]): Boolean = {
// a simple priority order based on UDF name suffix
val myPriorityList = fields.map((e) => {
if (e.name.toString().startsWith("udf")) {
Integer.parseInt(e.name.toString().split("_")(1))
} else {
0
}
}).filter(e => e > 0)
// Do UDF with less cost before, so I need change the fields order
myPriorityList.size == 2 && myPriorityList(0) > myPriorityList(1)
}
private def optimizePlan(fields: Seq[NamedExpression],
child: LogicalPlan,
plan: LogicalPlan): Seq[NamedExpression] = {
// change order on field list. Return LogicalPlan modified
val myListWithUDF = fields.filter((e) => e.name.toString().startsWith("udf"))
if (myListWithUDF.size != 2) {
throw new UnsupportedOperationException(
s"The size of UDF list have ${myListWithUDF.size} elements.")
}
val myModifiedList: Seq[NamedExpression] = Seq(myListWithUDF(1), myListWithUDF(0))
val myListWithoutUDF = fields.filter((e) => !e.name.toString().startsWith("udf"))
val modifiedFielsObject = getFieldsReordered(myListWithoutUDF, myModifiedList)
val msg = "•••• optimizePlan called : " + fields.size + " columns on Project.\n" +
"•••• fields: " + fields.mkString(" | ") + "\n" +
"•••• UDFs to reorder:\n" + myListWithUDF.mkString(" | ") + "\n" +
"•••• field list Without UDF: " + myListWithoutUDF.mkString(" | ") + "\n" +
"•••• modifiedFielsObject: " + modifiedFielsObject.mkString(" | ") + "\n"
modifiedFielsObject
}
private def getFieldsReordered(fieldsWithoutUDFs: Seq[NamedExpression],
fieldsWithUDFs: Seq[NamedExpression]): Seq[NamedExpression] = {
fieldsWithoutUDFs.union(fieldsWithUDFs)
}
}
case class R0(x: Int,
p: Option[Int] = Some((new scala.util.Random).nextInt(999)),
q: Option[Int] = Some((new scala.util.Random).nextInt(999))
)
object App {
def main(args : Array[String]) {
type ExtensionsBuilder = SparkSessionExtensions => Unit
// inject the rule here
val f: ExtensionsBuilder = { e =>
e.injectResolutionRule(ReorderColumnsOnProjectOptimizationRule)
}
val spark = SparkSession
.builder()
.withExtensions(f)
.getOrCreate()
def createDsR0(spark: SparkSession): Dataset[R0] = {
import spark.implicits._
val ds = spark.range(3)
val xdsR0 = ds.map((i) => {
R0(i.intValue() + 1)
})
// IMPORTANT: The cache here is mandatory
xdsR0.cache()
}
val dsR0 = createDsR0(spark)
val udfA_99 = (p: Int) => Math.cos(p * p) // higher cost Function
val udfB_10 = (q: Int) => q + 1 // lower cost Function
println("*** I' going to register my UDF ***")
spark.udf.register("myUdfA", udfA_99)
spark.udf.register("myUdfB", udfB_10)
val dsR1 = {
val ret1DS = dsR0.selectExpr("x", "p", "q", "myUdfA(p) as udfA_99")
val result = ret1DS.cache()
dsR0.show()
result.show()
result
}
val dsR2 = {
val ret2DS = dsR1.selectExpr("x", "p", "q", "udfA_99", "myUdfB(p) as udfB_10")
val result = ret2DS.cache()
dsR0.show()
dsR1.show()
result.show()
result
}
}
}
它将打印
+---+---+---+-------+-------------------+
| x| p| q|udfB_10| udfA_99|
+---+---+---+-------+-------------------+
| 1|392|746| 393|-0.7508388993643841|
| 2|778|582| 779| 0.9310990915956336|
| 3|661| 34| 662| 0.6523545972748773|
+---+---+---+-------+-------------------+
关于scala - 如何更改 Apache SparkSQL `Project` 运算符中的属性顺序?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48612353/
Hive 外部表指向 S3 上的文件,ddl 包括按 eod 子句分区。一个文件夹下有 5 个子文件夹,每个子文件夹下面都有一个文件,用于不同的 partition_date。即 eod=201806
SparkSQL / DataFrames HBase-Spark连接器(在HBase-Spark模块中)利用Spark-1.2.0中引入的DataSource API (SPARK-3247),弥
我将 RDD[myClass] 转换为数据帧,然后将其注册为 SQL表 my_rdd.toDF().registerTempTable("my_rdd") 该表是可调用的,可以使用以下命令进行演示 %
我在这看到 DataBricks post ,SparkSql 中支持窗口函数,特别是我正在尝试使用 lag() 窗口函数。 我有几行信用卡交易,我已经对它们进行了排序,现在我想遍历这些行,并为每一行
我正在为 hive 使用远程 mysql 元存储。当我运行 hive 客户端时,它运行完美。但是当我尝试通过 spark-shell 或 spark-submit 使用 spark-sql 时,我无法
我有一个 Spark 作业,它正在将数据从 CSV 文件加载到 MySQL 数据库中。 一切正常,但最近我注意到 Spark 在插入阶段打开了许多连接(300 多个连接)。感觉就像每个插入语句都打开一
这段代码来自 Spark Programming Guide , # The result of loading a parquet file is also a DataFrame. parquet
我有一个 Scala spark DataFrame: df.select($"row_id", $"array_of_data").show +----------+----------------
我设计了以下函数来处理任何数字类型的数组: def array_sum[T](item:Traversable[T])(implicit n:Numeric[T]) = item.sum // Reg
我通过 df.saveAsTable 创建了一个持久表 当我运行以下查询时,我会收到这些结果 spark.sql("""SELECT * FROM mytable """).show() 我可以查看
我想通过 sparksql 删除一个配置单元表。 在安装了 hadoop 2.6、hive 2.0、spark 1.6 和 spark 2.0 的集群中。我在两个版本的 pyspark shell 和
我有一个要求,我需要计算 SparkSQL 中 Hive 表的重复行数。 from pyspark import SparkContext, SparkConf from pyspark.sql im
我有一个连接到 Postgres 数据库的 SparkSQL 的非常简单的设置,我正在尝试从一个表中获取一个 DataFrame,该 DataFrame 具有 X 个分区(假设为 2)。代码如下: M
有什么方法可以在 sparksql 中实现存储过程或函数等 sql 功能? 我知道 hbase 中的 hpl sql 和协处理器。但是想知道 spark 中是否有类似的东西。 最佳答案 您可以考虑使用
我正在使用 cloudera vm 10.0,spark 版本为 1.6。 登录 pyspark 控制台后,我正在尝试以下语句从配置单元中获取数据 sqlContext.sql("select * f
我想用 Spark SQL 2.0 执行以下查询 SELECT a.id as id, (SELECT SUM(b.points) FROM tableB b WHERE b.id = a.i
我正在使用 spark sql 对我的数据集运行查询。查询的结果很小,但仍然是分区的。 我想合并生成的 DataFrame 并按列对行进行排序。我试过 DataFrame result = spark
任何人都直接在 HBase 表上使用 SparkSQL,就像在 Hive 表上使用 SparkSQL。 我是spark新手。请指导我如何连接hbase和spark。如何查询hbase表。 最佳答案 A
我正在尝试从 SparkSQL 表(S3 中的 Parquet )中有效地选择单个分区。但是,我看到 Spark 打开表中所有 Parquet 文件的证据,而不仅仅是那些通过过滤器的文件。对于具有大量
我尝试使用 SparkSQL (v.1.3.0) 访问 PostgreSQL 数据库。在这个数据库中,我有一个表 CREATE TABLE test ( id bigint, values dou
我是一名优秀的程序员,十分优秀!