algorithm - Quantity redistribution logic - MapGroups with an external dataset

I am working on a complex piece of logic where I need to redistribute a quantity from one dataset to another.
In the example we have Owner and Invoice - we need to subtract the Invoice quantity from the exactly matching Owner record (for a given car at a given pcode).
The subtracted quantity then needs to be redistributed back to the other pcodes where the same car appears.
The complication is that we should avoid distributing to pcodes where the same car also appears in the Invoice table (at another pcode).
Finally, if the subtraction or the redistribution would produce a negative value, we should skip the transformation for that Invoice.
Here is an example with numbers.
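Take car A from the sample data used below: A has invoices at pcodes 666 (qtty 15) and 888 (qtty 12). Each invoice quantity is subtracted from the Owner record at the same pcode, so pcode 666 goes from 80 to 65 and pcode 888 from 100 to 88. The subtracted total of 15 + 12 = 27 is then redistributed proportionally over A's remaining pcodes that do not appear in the Invoice table for A, i.e. 444 and 222: pcode 444 becomes 50 + 27 * 50 / 70 = 69.29 and pcode 222 becomes 20 + 27 * 20 / 70 = 27.71. For car B, subtracting the invoiced 200 from the 20 owned at pcode 555 would give a negative value, so that invoice is skipped and Owner("B", "555", 20) stays unchanged.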
Below is the code version, but unfortunately it does not work as expected. More specifically, I do not know how to skip records that appear more than once in the Invoice for a given car.
In the first example (the red one), I do not know how to skip the record Owner(A, 888, 100).

package playground

import org.apache.spark.sql.SparkSession

object basic extends App {
  val spark = SparkSession
    .builder()
    .appName("Sample app")
    .master("local")
    .getOrCreate()

  import spark.implicits._

  final case class Owner(car: String, pcode: String, qtty: Double)
  final case class Invoice(car: String, pcode: String, qtty: Double)

  val sc = spark.sparkContext

  val data = Seq(
    Owner("A", "666", 80),
    Owner("B", "555", 20),
    Owner("A", "444", 50),
    Owner("A", "222", 20),
    Owner("C", "444", 20),
    Owner("C", "666", 80),
    Owner("C", "555", 120),
    Owner("A", "888", 100)
  )

  val fleet = Seq(
    Invoice("A", "666", 15),
    Invoice("C", "444", 10),
    Invoice("A", "888", 12),
    Invoice("B", "555", 200)
  )

  val owners = spark.createDataset(data)
  val invoices = spark.createDataset(fleet)

  val actual = owners
    .joinWith(invoices, owners("car") === invoices("car"), joinType = "right")
    .groupByKey(_._2)
    .flatMapGroups {
      case (invoice, group) =>
        val subOwner: Vector[Owner] = group.toVector.map(_._1)
        val householdToBeInvoiced: Vector[Owner] =
          subOwner.filter(_.pcode == invoice.pcode)
        val modifiedOwner: Vector[Owner] = if (householdToBeInvoiced.nonEmpty) {
          // negative compensation (remove the quantity from Invoice for the exact match)
          val neg: Owner = householdToBeInvoiced.head
          val calculatedNeg: Owner = neg.copy(qtty = neg.qtty - invoice.qtty)

          // positive compensation (redistribute the "removed" quantity proportionally,
          // but not to pcodes existing in the invoice for the same car)
          val pos = subOwner.filter(s => s.pcode != invoice.pcode)
          val totalQuantityOwner = pos.map(_.qtty).sum
          val calculatedPos: Vector[Owner] =
            pos.map(
              c =>
                c.copy(
                  qtty = c.qtty + invoice.qtty * c.qtty / (totalQuantityOwner - neg.qtty)
                )
            )

          calculatedPos :+ calculatedNeg
        } else {
          subOwner
        }

        modifiedOwner
    }

  // display the produced rows (output shown below)
  actual.show()
}
This code produces:
+---+-----+------------------+
|car|pcode| qtty|
+---+-----+------------------+
| A| 888|116.66666666666667|
| A| 222|23.333333333333332|
| A| 444|58.333333333333336|
| A| 666| 65.0|
| C| 555|126.66666666666667|
| C| 666| 84.44444444444444|
| C| 444| 10.0|
| B| 555| -180.0|
| A| 222| 24.8|
| A| 444| 62.0|
| A| 666| 99.2|
| A| 888| 88.0|
+---+-----+------------------+
Any support would be greatly appreciated! Thanks.

After thinking some more about this problem, I managed to improve the code, but I still cannot make it work with an iterative approach (using the previous computation to calculate the next one, e.g. taking the result of the red records to produce the blue records, and so on).
package playground

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Dataset, KeyValueGroupedDataset, SparkSession}

object basic extends App {

  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)

  val spark = SparkSession
    .builder()
    .appName("Spark Optimization Playground")
    .master("local")
    .getOrCreate()

  import spark.implicits._

  final case class Owner(car: String, pcode: String, qtty: Double)
  final case class Invoice(car: String, pcode: String, qtty: Double)

  val data = Seq(
    Owner("A", "666", 80),
    Owner("B", "555", 20),
    Owner("A", "444", 50),
    Owner("A", "222", 20),
    Owner("C", "444", 20),
    Owner("C", "666", 80),
    Owner("C", "555", 120),
    Owner("A", "888", 100)
  )

  val fleet = Seq(
    Invoice("A", "666", 15),
    Invoice("C", "444", 10),
    Invoice("A", "888", 12),
    Invoice("B", "555", 200)
  )

  val owners = spark.createDataset(data)
  val invoices = spark.createDataset(fleet)

  val secondFleets = invoices.map(identity)

  val fleetPerCar =
    invoices
      .joinWith(secondFleets, invoices("car") === secondFleets("car"), "inner")
      .groupByKey(_._1)
      .flatMapGroups {
        case (value, iter) => Iterator((value, iter.toArray))
      }

  val gb: KeyValueGroupedDataset[(Invoice, Array[(Invoice, Invoice)]),
                                 (Owner, (Invoice, Array[(Invoice, Invoice)]))] =
    owners
      .joinWith(fleetPerCar, owners("car") === fleetPerCar("_1.car"), "right")
      .groupByKey(_._2)

  val x: Dataset[Owner] =
    gb.flatMapGroups {
      case (fleet, group) =>
        val subOwner: Vector[Owner] = group.toVector.map(_._1)
        val householdToBeInvoiced: Vector[Owner] =
          subOwner.filter(_.pcode == fleet._1.pcode)
        val modifiedOwner: Vector[Owner] = if (householdToBeInvoiced.nonEmpty) {
          // negative compensation (remove the quantity from Invoice for the exact match)
          val neg: Owner = householdToBeInvoiced.head
          val calculatedNeg: Owner = neg.copy(qtty = neg.qtty - fleet._1.qtty)

          // positive compensation (redistribute the "removed" quantity proportionally,
          // but not to pcodes existing in the invoice for the same car)
          val otherPCode =
            fleet._2.filter(_._2.pcode != fleet._1.pcode).map(_._2.pcode)

          val pos = subOwner.filter(
            s => s.pcode != fleet._1.pcode && !otherPCode.contains(s.pcode)
          )
          val totalQuantityOwner = pos.map(_.qtty).sum + neg.qtty
          val calculatedPos: Vector[Owner] =
            pos.map(
              c =>
                c.copy(
                  qtty = c.qtty + fleet._1.qtty * c.qtty / (totalQuantityOwner - neg.qtty)
                )
            )
          // if pos or neg compensation produce a negative quantity, skip the computation
          val res = calculatedPos :+ calculatedNeg
          if (res.exists(_.qtty < 0)) {
            subOwner
          } else {
            res
          }
        } else {
          subOwner
        }

        modifiedOwner
    }

  x.show()
}
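To make the intended iterative behaviour concrete, here is a minimal sketch in plain Scala collections (no Spark; the function name and structure are illustrative only), which applies the invoices of a car one after the other and feeds each intermediate result into the next step:

final case class Owner(car: String, pcode: String, qtty: Double)
final case class Invoice(car: String, pcode: String, qtty: Double)

// Apply the invoices one by one; each step consumes the owners produced by the previous step.
def applyInvoices(owners: Vector[Owner], invoices: Seq[Invoice]): Vector[Owner] =
  invoices.foldLeft(owners) { (current, inv) =>
    // pcodes where this car appears in the invoice table never receive a redistribution
    val excluded = invoices.filter(_.car == inv.car).map(_.pcode).toSet
    current.find(o => o.car == inv.car && o.pcode == inv.pcode) match {
      case Some(hit) if hit.qtty >= inv.qtty =>
        val receivers = current.filter(o => o.car == inv.car && !excluded.contains(o.pcode))
        val total     = receivers.map(_.qtty).sum
        current.map {
          case o if o.car == inv.car && o.pcode == inv.pcode =>
            o.copy(qtty = o.qtty - inv.qtty)                  // negative compensation
          case o if o.car == inv.car && !excluded.contains(o.pcode) && total > 0 =>
            o.copy(qtty = o.qtty + inv.qtty * o.qtty / total) // proportional positive compensation
          case o => o
        }
      case _ => current // subtraction would go negative: skip this invoice entirely
    }
  }

For the sample data above, applying Invoice("A", "666", 15) first and then Invoice("A", "888", 12) on its result ends with 444 = 69.29 and 222 = 27.71, while Invoice("B", "555", 200) is skipped; what I am missing is how to express this sequential dependency with groupByKey/flatMapGroups.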

Best Answer

The first solution is based on Spark Datasets and SparkSQL, and it provides the expected result.
There are many ways to configure this approach, even taking performance issues into account, which could be discussed later.

import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}

object basic {

  val spark = SparkSession
    .builder()
    .appName("Sample app")
    .master("local")
    .config("spark.sql.shuffle.partitions", "200") // Change to a more reasonable default number of partitions for our data
    .getOrCreate()

  val sc = spark.sparkContext

  case class Owner(car: String, pcode: String, qtty: Double)
  case class Invoice(car: String, pcode: String, qtty: Double)

  def main(args: Array[String]): Unit = {

    val data = Seq(
      Owner("A", "666", 80),
      Owner("B", "555", 20),
      Owner("A", "444", 50),
      Owner("A", "222", 20),
      Owner("C", "444", 20),
      Owner("C", "666", 80),
      Owner("C", "555", 120),
      Owner("A", "888", 100)
    )

    val fleet = Seq(
      Invoice("A", "666", 15),
      Invoice("C", "666", 10),
      Invoice("A", "888", 12),
      Invoice("B", "555", 200)
    )

    val expected = Seq(
      Owner("A", "666", 65),
      Owner("B", "555", 20), // not redistributed because it would produce a negative value
      Owner("A", "444", 69.29),
      Owner("A", "222", 27.71),
      Owner("C", "444", 21.43),
      Owner("C", "666", 70),
      Owner("C", "555", 128.57),
      Owner("A", "888", 88)
    )

    Logger.getRootLogger.setLevel(Level.ERROR)

    try {
      import spark.implicits._

      val owners = spark.createDataset(data).as[Owner].cache()
      val invoices = spark.createDataset(fleet).as[Invoice].cache()

      owners.createOrReplaceTempView("owners")
      invoices.createOrReplaceTempView("invoices")

      /**
       * this part fetches car and pcode from owner with the subtracted quantity from invoice
       */
      val p1 = spark.sql(
        """SELECT i.car,i.pcode,
          |CASE WHEN (o.qtty - i.qtty) < 0 THEN o.qtty ELSE (o.qtty - i.qtty) END AS qtty,
          |CASE WHEN (o.qtty - i.qtty) < 0 THEN 0 ELSE i.qtty END AS to_distribute
          |FROM owners o
          |INNER JOIN invoices i ON(i.car = o.car AND i.pcode = o.pcode)
          |""".stripMargin)
        .cache()
      p1.createOrReplaceTempView("p1")

      /**
       * this part fetches all the car and pcode whose quantity we have to redistribute
       */
      val p2 = spark.sql(
        """SELECT o.car, o.pcode, o.qtty
          |FROM owners o
          |LEFT OUTER JOIN invoices i ON(i.car = o.car AND i.pcode = o.pcode)
          |WHERE i.car IS NULL
          |""".stripMargin)
        .cache()
      p2.createOrReplaceTempView("p2")

      /**
       * this part fetches the quantity to distribute
       */
      val distribute = spark.sql(
        """
          |SELECT car, SUM(to_distribute) AS to_distribute
          |FROM p1
          |GROUP BY car
          |""".stripMargin)
        .cache()
      distribute.createOrReplaceTempView("distribute")

      /**
       * this part fetches the proportion to distribute proportionally
       */
      val proportion = spark.sql(
        """
          |SELECT car, SUM(qtty) AS proportion
          |FROM p2
          |GROUP BY car
          |""".stripMargin)
        .cache()
      proportion.createOrReplaceTempView("proportion")

      /**
       * this part joins p1 and p2 with the calculated distribution
       */
      val result = spark.sql(
        """
          |SELECT p2.car, p2.pcode, ROUND(((to_distribute / proportion) * qtty) + qtty, 2) AS qtty
          |FROM p2
          |JOIN distribute d ON(p2.car = d.car)
          |JOIN proportion p ON(d.car = p.car)
          |UNION ALL
          |SELECT car, pcode, qtty
          |FROM p1
          |""".stripMargin)

      result.show(truncate = false)
      /*
      +---+-----+------+
      |car|pcode|qtty  |
      +---+-----+------+
      |A  |444  |69.29 |
      |A  |222  |27.71 |
      |C  |444  |21.43 |
      |C  |555  |128.57|
      |A  |666  |65.0  |
      |B  |555  |20.0  |
      |C  |666  |70.0  |
      |A  |888  |88.0  |
      +---+-----+------+
      */

      expected
        .toDF("car", "pcode", "qtty")
        .show(truncate = false)
      /*
      +---+-----+------+
      |car|pcode|qtty  |
      +---+-----+------+
      |A  |666  |65.0  |
      |B  |555  |20.0  |
      |A  |444  |69.29 |
      |A  |222  |27.71 |
      |C  |444  |21.43 |
      |C  |666  |70.0  |
      |C  |555  |128.57|
      |A  |888  |88.0  |
      +---+-----+------+
      */

    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}
Using the Dataset API
Another approach to this problem, with the same result, is to use Datasets and their great API, as an example:
import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel

object basic2 {

  val spark = SparkSession
    .builder()
    .appName("Sample app")
    .master("local")
    .config("spark.sql.shuffle.partitions", "200") // Change to a more reasonable default number of partitions for our data
    .getOrCreate()

  val sc = spark.sparkContext

  final case class Owner(car: String, pcode: String, o_qtty: Double)
  final case class Invoice(car: String, pcode: String, i_qtty: Double)

  def main(args: Array[String]): Unit = {

    val data = Seq(
      Owner("A", "666", 80),
      Owner("B", "555", 20),
      Owner("A", "444", 50),
      Owner("A", "222", 20),
      Owner("C", "444", 20),
      Owner("C", "666", 80),
      Owner("C", "555", 120),
      Owner("A", "888", 100)
    )

    val fleet = Seq(
      Invoice("A", "666", 15),
      Invoice("C", "666", 10),
      Invoice("A", "888", 12),
      Invoice("B", "555", 200)
    )

    val expected = Seq(
      Owner("A", "666", 65),
      Owner("B", "555", 20), // not redistributed because it would produce a negative value
      Owner("A", "444", 69.29),
      Owner("A", "222", 27.71),
      Owner("C", "444", 21.43),
      Owner("C", "666", 70),
      Owner("C", "555", 128.57),
      Owner("A", "888", 88)
    )

    Logger.getRootLogger.setLevel(Level.ERROR)

    try {
      import spark.implicits._

      val owners = spark.createDataset(data)
        .as[Owner]
        .cache()

      val invoices = spark.createDataset(fleet)
        .as[Invoice]
        .cache()

      val p1 = owners
        .join(invoices, Seq("car", "pcode"), "inner")
        .selectExpr("car", "pcode", "IF(o_qtty-i_qtty < 0,o_qtty,o_qtty - i_qtty) AS qtty", "IF(o_qtty-i_qtty < 0,0,i_qtty) AS to_distribute")
        .persist(StorageLevel.MEMORY_ONLY)

      val p2 = owners
        .join(invoices, Seq("car", "pcode"), "left_outer")
        .filter(row => row.anyNull == true)
        .drop(col("i_qtty"))
        .withColumnRenamed("o_qtty", "qtty")
        .persist(StorageLevel.MEMORY_ONLY)

      val distribute = p1
        .groupBy(col("car"))
        .agg(sum(col("to_distribute")).as("to_distribute"))
        .persist(StorageLevel.MEMORY_ONLY)

      val proportion = p2
        .groupBy(col("car"))
        .agg(sum(col("qtty")).as("proportion"))
        .persist(StorageLevel.MEMORY_ONLY)

      val result = p2
        .join(distribute, "car")
        .join(proportion, "car")
        .withColumn("qtty", round(((col("to_distribute") / col("proportion")) * col("qtty")) + col("qtty"), 2))
        .drop("to_distribute", "proportion")
        .union(p1.drop("to_distribute"))

      result.show()
      /*
      +---+-----+------+
      |car|pcode| qtty|
      +---+-----+------+
      | A| 444| 69.29|
      | A| 222| 27.71|
      | C| 444| 21.43|
      | C| 555|128.57|
      | A| 666| 65.0|
      | B| 555| 20.0|
      | C| 666| 70.0|
      | A| 888| 88.0|
      +---+-----+------+
      */

      expected
        .toDF("car", "pcode", "qtty")
        .show(truncate = false)
      /*
      +---+-----+------+
      |car|pcode|qtty  |
      +---+-----+------+
      |A  |666  |65.0  |
      |B  |555  |20.0  |
      |A  |444  |69.29 |
      |A  |222  |27.71 |
      |C  |444  |21.43 |
      |C  |666  |70.0  |
      |C  |555  |128.57|
      |A  |888  |88.0  |
      +---+-----+------+
      */

    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}
Some general considerations regarding performance and tuning.
It always depends on your particular use case, but in general, first of all, if you can filter and clean your data beforehand, you will see some improvement.
One key point of using a high-level declarative API is to isolate yourself from low-level implementation details.
Optimization is the job of the Catalyst Optimizer.
It is a sophisticated engine, and I really doubt anyone can easily improve on it without diving deep into its internals.
Default number of partitions
The property spark.sql.shuffle.partitions, set it correctly.
By default, Spark SQL uses spark.sql.shuffle.partitions partitions for aggregations and joins, i.e. 200 by default.
This often leads to a partition explosion without any impact on query performance, since these 200 tasks (one per partition) all have to start and finish before you get a result.
Think about how many partitions your query really needs. Spark can only run one concurrent task per partition of an RDD, up to the number of cores in the cluster.
So if you have a cluster with 50 cores, you want your RDDs to have at least 50 partitions.
As for choosing a "good" number of partitions, you generally want at least as many as the number of parallel executors.
You can get this computed value by calling sc.defaultParallelism, or check the number of partitions of an RDD with df.rdd.partitions.size.
Repartition: increases the number of partitions and rebalances partitions after a filter to increase parallelism: repartition(numPartitions: Int)
Coalesce: decreases the number of partitions without a shuffle, for example before writing to HDFS or external storage: coalesce(numPartitions: Int, shuffle: Boolean = false)
You can follow this link: Managing Spark Partitions with Coalesce and Repartition
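As a minimal, illustrative sketch of these two operations, assuming the owners Dataset from the first example above (the output path is made up for illustration):

// Inspect the available parallelism and the current partition count.
val parallelism = spark.sparkContext.defaultParallelism
println(s"defaultParallelism    = $parallelism")
println(s"owners.rdd partitions = ${owners.rdd.partitions.size}")

// repartition: full shuffle, increases or rebalances partitions (e.g. after a selective filter)
val rebalanced = owners.repartition(parallelism)

// coalesce: reduces the number of partitions without a shuffle, e.g. before writing out
rebalanced.coalesce(1).write.mode("overwrite").csv("/tmp/owners_out")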
Cache data to avoid re-computation: dataFrame.cache()
Analyzer, the logical query plan analyzer
The Analyzer is the logical query plan analyzer in Spark SQL that validates an unresolved logical plan and transforms it into an analyzed logical plan.
You can access the analyzed logical plan of a Dataset using explain (with the extended flag enabled):
dataframe.explain(extended = true)
For more performance options, see the documentation:
Performance Tuning
There are many possibilities for tuning a Spark process, but it always depends on your use case.
Batch or stream processing? DataFrames or plain RDDs? Hive or no Hive? Shuffled data or not? Etc.
I strongly recommend The Internals of Spark SQL by Jacek Laskowski.
Finally, you will have to run some experiments with different values and benchmark them, to see how long the process takes with a sample of your data.
val start = System.nanoTime()

// my process

val end = System.nanoTime()

val time = end - start
println(s"My App takes: $time")
Hope this helps.

Regarding "algorithm - Quantity redistribution logic - MapGroups with an external dataset", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/62708493/
