gpt4 book ai didi

scala - 当条件与列一起使用时具有动态的数据框

转载 作者:行者123 更新时间:2023-12-05 03:05:49 24 4
gpt4 key购买 nike

我有一个 Scala 代码,用于在我的硬编码代码中使用 withcolumn 函数计算一个新列,看起来像这样

 combinedinputdf
.withColumn("AMGClassRule", when(col("CoreSectorLevel1Code") === "Derivatives" && col("CoreSectorLevel2Code") === "Caps" && col("FAS157Flavor") === "SPRD", "1")
//.when(col("CoreSectorLevel1Code") === "Derivatives" && col("CoreSectorLevel2Code") === "Caps" && col("FAS157Flavor") === "SPRD", "2")
.when(col("CoreSectorLevel1Code") === "Derivatives" && col("FAS157Flavor") === "TRSY" && col("DerivativeType") === "TROR", "3")
.when(col("InDefaultInd") === "Y" , "4")

这按预期工作。

但是我想在基于table或者csv执行的时候动态添加或者修改when条件

第二个表看起来像这样 Tables with rules

因此在运行时我可以将此表读入数据框或 Map 并遍历表的规则并为我的输出分配一个值

我该如何动态地执行此操作?

最佳答案

它可以通过以某种抽象的方式接近 when 条件的创建来动态实现。

为了使其更具可读性,让我们创建一个专用的包装器对象:

case class WhenCondition(valuesToCheck: Seq[(String, Any)], valueIfMatches: Any)

valuesToCheck 字段包含用于生成表达式的所有适用条件的元组序列。每个元组的第一个元素是列的名称,第二个是要匹配的值。例如:("CoreSectorLevel1Code", "Derivatives")

valueIfMatches 对应于传递给 when 的第二个参数:示例中的“1”、“3”或“4”。

我们需要一个从源表中读取条件并返回一系列 WhenCondition 实例的函数:

private def readConditions(args: Any*): Seq[WhenCondition] = {
??? // impl depends on how you read the source data
}

// for the example of the question, this function should return:
val conditions = Seq(
WhenCondition(Seq(("CoreSectorLevel1Code", "Derivatives"), ("CoreSectorLevel2Code", "Caps"), ("FAS157Flavor", "SPRD")), "1"),
WhenCondition(Seq(("CoreSectorLevel1Code", "Derivatives"), ("FAS157Flavor", "TRSY"), ("DerivativeType", "TROR")), "3"),
WhenCondition(Seq(("InDefaultInd", "Y")), "4")
)

更新

我们还需要一个生成 when 调用链接的函数:

private def chainWhen(chained: Column, remaining: Seq[(Column, Any)]): Column =
remaining match {
case Nil => chained
case head :: tail => chainWhen(chained.when(head._1, head._2), tail)
}

现在我们创建一个 dynamicWhen 函数来转换 WhenCondition 的序列:

private def dynamicWhen(parsedConditions: Seq[WhenCondition]): Option[Column] = {
// first, we transform a WhenCondition object into a tuple of args (Column, Any) for the target "when" function
val conditions = parsedConditions.map(whenCondition => {
val condition = whenCondition.valuesToCheck
.map(cond => col(cond._1) === cond._2)
.reduce(_ && _)
(condition, whenCondition.valueIfMatches)
})
// if there weren't any input conditions, we return None, otherwise we chain the transformation and wrap it into Some
conditions match {
case Nil => None
case head :: tail => Some(chainWhen(when(head._1, head._2), tail))
}
}

而原来的硬编码调用可以替换为

// produce the optional chained dynamic "whens".
val whenConditions = dynamicWhen(conditions)

// map it in a DataFrame with a new column or keep the original one, if there were no "whens".
val result = whenConditions.map(cond => df.withColumn("AMGClassRule", cond))
.getOrElse(df)

最后,使用虚假数据进行简短测试:

val df = Seq(
("Derivatives", "Caps", "SPRD", "x", "N"),
("Derivatives", "Caps", "SPRD", "TROR", "Y"),
("Derivatives", "Caps", "TRSY", "x", "Y"),
("Derivatives", "Caps", "TRSY", "TROR", "N"),
("Derivatives", "Caps", "zzzz", "x", "N")
).toDF("CoreSectorLevel1Code", "CoreSectorLevel2Code", "FAS157Flavor", "DerivativeType", "InDefaultInd")

val result = ... // transformations above
result.show(false)

+--------------------+--------------------+------------+--------------+------------+------------+
|CoreSectorLevel1Code|CoreSectorLevel2Code|FAS157Flavor|DerivativeType|InDefaultInd|AMGClassRule|
+--------------------+--------------------+------------+--------------+------------+------------+
|Derivatives |Caps |SPRD |x |N |1 |
|Derivatives |Caps |SPRD |TROR |Y |1 |
|Derivatives |Caps |TRSY |x |Y |4 |
|Derivatives |Caps |TRSY |TROR |N |3 |
|Derivatives |Caps |zzzz |x |N |null |
+--------------------+--------------------+------------+--------------+------------+------------+

更新结束

更新 2

从 DataFrame 读取条件的示例。

假设我们将以下条件描述存储在 DataFrame 中:

val rulesDf = Seq(
("Derivatives", "%", "%", "16"),
("Derivatives", "Fx Options", "%", "17"),
("Derivatives", "Futures", "%", "48")
).toDF("CoreSectorLevel1Code", "CoreSectorLevel2Code", "FAS157Flavor", "rule")

rulesDf.show(false)

+--------------------+--------------------+------------+----+
|CoreSectorLevel1Code|CoreSectorLevel2Code|FAS157Flavor|rule|
+--------------------+--------------------+------------+----+
|Derivatives |% |% |16 |
|Derivatives |Fx Options |% |17 |
|Derivatives |Futures |% |48 |
+--------------------+--------------------+------------+----+

我们可以使用以下方法读取并将它们转换为 WhenCondition 包装器:

private def readConditions(): Seq[WhenCondition] = {
val ruleColumnName = "rule"
val ruleColumnIndex = rulesDf.schema.fieldIndex(ruleColumnName)
val conditionColumns = rulesDf.schema.fieldNames.filter(_ != ruleColumnName).toSeq
rulesDf.rdd.map(row => {
val valuesToCheck = conditionColumns.map(colName => (colName, row.get(row.fieldIndex(colName))))
val rule = row(ruleColumnIndex)
WhenCondition(valuesToCheck, rule)
}).collect().toSeq
}

readConditions().foreach(println)

// outputs:
WhenCondition(mutable.ArrayBuffer((CoreSectorLevel1Code,Derivatives), (CoreSectorLevel2Code,%), (FAS157Flavor,%)),16)
WhenCondition(mutable.ArrayBuffer((CoreSectorLevel1Code,Derivatives), (CoreSectorLevel2Code,Fx Options), (FAS157Flavor,%)),17)
WhenCondition(mutable.ArrayBuffer((CoreSectorLevel1Code,Derivatives), (CoreSectorLevel2Code,Futures), (FAS157Flavor,%)),48)

更新结束 2

关于scala - 当条件与列一起使用时具有动态的数据框,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50279920/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com