I am working on a complex piece of logic where I need to redistribute a quantity from one dataset to another.
In the example we have Owner and Invoice - we need to subtract the quantity in Invoice from the exactly matching Owner row (at the given postcode for the given car).
The subtracted quantity then needs to be redistributed back to the other postcodes where the same car appears.
The complication is that we should avoid distributing to a postcode where the same car also appears in the Invoice table under another pcode.
Finally, if the subtraction or the redistribution produces a negative value, we should skip that transformation for the given Invoice.
Here is an example with numbers:
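The original post illustrated this with a picture; since it is not reproduced here, below is a minimal plain-Scala sketch of the intended arithmetic for car A only, reconstructed from the expected figures quoted in the accepted answer further down (the object and value names are purely illustrative):

object RedistributionSketch extends App {
  // Owner rows for car A (pcode -> qtty) and Invoice rows for car A, taken from the data below
  val owners   = Map("666" -> 80.0, "444" -> 50.0, "222" -> 20.0, "888" -> 100.0)
  val invoices = Map("666" -> 15.0, "888" -> 12.0)

  // 1. subtract the invoiced quantity at the exact (car, pcode) match
  val afterSubtraction = owners.map { case (pcode, q) =>
    pcode -> (q - invoices.getOrElse(pcode, 0.0))
  }

  // 2. redistribute the removed total proportionally, but only to pcodes
  //    that do NOT also appear in the invoices for this car
  val toDistribute = invoices.values.sum                                   // 15 + 12 = 27
  val targets      = afterSubtraction.filter { case (p, _) => !invoices.contains(p) }
  val base         = targets.values.sum                                    // 50 + 20 = 70

  val result = afterSubtraction.map { case (pcode, q) =>
    if (invoices.contains(pcode)) pcode -> q
    else pcode -> (q + toDistribute * q / base)
  }

  result.foreach { case (p, q) => println(f"A $p $q%.2f") }
  // 666 -> 65.00, 888 -> 88.00, 444 -> 69.29, 222 -> 27.71
}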
Below is a code version, but unfortunately it does not work as expected. More specifically, I do not know how to skip records that occur more than once in Invoice for a given car.
In the first example (marked in red in the original picture), I do not know how to skip the record Owner(A, 888, 100).
package playground
import org.apache.spark.sql.SparkSession
object basic extends App {
val spark = SparkSession
.builder()
.appName("Sample app")
.master("local")
.getOrCreate()
import spark.implicits._
final case class Owner(car: String, pcode: String, qtty: Double)
final case class Invoice(car: String, pcode: String, qtty: Double)
val sc = spark.sparkContext
val data = Seq(
Owner("A", "666", 80),
Owner("B", "555", 20),
Owner("A", "444", 50),
Owner("A", "222", 20),
Owner("C", "444", 20),
Owner("C", "666", 80),
Owner("C", "555", 120),
Owner("A", "888", 100)
)
val fleet = Seq(
Invoice("A", "666", 15),
Invoice("C", "444", 10),
Invoice("A", "888", 12),
Invoice("B", "555", 200)
)
val owners = spark.createDataset(data)
val invoices = spark.createDataset(fleet)
val actual = owners
.joinWith(invoices, owners("Car") === invoices("Car"), joinType = "right")
.groupByKey(_._2)
.flatMapGroups {
case (invoice, group) =>
val subOwner: Vector[Owner] = group.toVector.map(_._1)
val householdToBeInvoiced: Vector[Owner] =
subOwner.filter(_.pcode == invoice.pcode)
val modifiedOwner: Vector[Owner] = if (householdToBeInvoiced.nonEmpty) {
// negative compensation (remove the quantity from Invoice for the exact match)
val neg: Owner = householdToBeInvoiced.head
val calculatedNeg: Owner = neg.copy(qtty = neg.qtty - invoice.qtty)
// positive compensation (redistribute the "removed" quantity proportionally but not for pcode existing in
// invoice for the same car
val pos = subOwner.filter(s => s.pcode != invoice.pcode)
val totalQuantityOwner = pos.map(_.qtty).sum
val calculatedPos: Vector[Owner] =
pos.map(
c =>
c.copy(
qtty = c.qtty + invoice.qtty * c.qtty / (totalQuantityOwner - neg.qtty)
)
)
(calculatedPos :+ calculatedNeg)
} else {
subOwner
}
modifiedOwner
    }
  actual.show()
}
This code produces:
+---+-----+------------------+
|car|pcode| qtty|
+---+-----+------------------+
| A| 888|116.66666666666667|
| A| 222|23.333333333333332|
| A| 444|58.333333333333336|
| A| 666| 65.0|
| C| 555|126.66666666666667|
| C| 666| 84.44444444444444|
| C| 444| 10.0|
| B| 555| -180.0|
| A| 222| 24.8|
| A| 444| 62.0|
| A| 666| 99.2|
| A| 888| 88.0|
+---+-----+------------------+
Any help would be much appreciated! Thanks.
For reference, here is a second version of my attempt:
package playground
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Dataset, KeyValueGroupedDataset, SparkSession}
object basic extends App {
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val spark = SparkSession
.builder()
.appName("Spark Optimization Playground")
.master("local")
.getOrCreate()
import spark.implicits._
final case class Owner(car: String, pcode: String, qtty: Double)
final case class Invoice(car: String, pcode: String, qtty: Double)
val data = Seq(
Owner("A", "666", 80),
Owner("B", "555", 20),
Owner("A", "444", 50),
Owner("A", "222", 20),
Owner("C", "444", 20),
Owner("C", "666", 80),
Owner("C", "555", 120),
Owner("A", "888", 100)
)
val fleet = Seq(
Invoice("A", "666", 15),
Invoice("C", "444", 10),
Invoice("A", "888", 12),
Invoice("B", "555", 200)
)
val owners = spark.createDataset(data)
val invoices = spark.createDataset(fleet)
val secondFleets = invoices.map(identity)
val fleetPerCar =
invoices
.joinWith(secondFleets, invoices("car") === secondFleets("car"), "inner")
.groupByKey(_._1)
.flatMapGroups {
case (value, iter) ⇒ Iterator((value, iter.toArray))
}
val gb
: KeyValueGroupedDataset[(Invoice, Array[(Invoice, Invoice)]),
(Owner, (Invoice, Array[(Invoice, Invoice)]))] =
owners
.joinWith(fleetPerCar, owners("car") === fleetPerCar("_1.car"), "right")
.groupByKey(_._2)
val x: Dataset[Owner] =
gb.flatMapGroups {
case (fleet, group) =>
val subOwner: Vector[Owner] = group.toVector.map(_._1)
val householdToBeInvoiced: Vector[Owner] =
subOwner.filter(_.pcode == fleet._1.pcode)
val modifiedOwner: Vector[Owner] = if (householdToBeInvoiced.nonEmpty) {
// negative compensation (remove the quantity from Invoice for the exact match)
val neg: Owner = householdToBeInvoiced.head
val calculatedNeg: Owner = neg.copy(qtty = neg.qtty - fleet._1.qtty)
// positive compensation (redistribute the "removed" quantity proportionally but not for pcode existing in
// invoice for the same car
val otherPCode =
fleet._2.filter(_._2.pcode != fleet._1.pcode).map(_._2.pcode)
val pos = subOwner.filter(
s => s.pcode != fleet._1.pcode && !otherPCode.contains(s.pcode)
)
val totalQuantityOwner = pos.map(_.qtty).sum + neg.qtty
val calculatedPos: Vector[Owner] =
pos.map(
c =>
c.copy(
qtty = c.qtty + fleet._1.qtty * c.qtty / (totalQuantityOwner - neg.qtty)
)
)
// if pos or neg compensation produce negative quantity, skip the computation
val res = (calculatedPos :+ calculatedNeg)
if (res.exists(_.qtty < 0)) {
subOwner
} else {
res
}
} else {
subOwner
}
modifiedOwner
}
x.show()
}
Best Answer
The first solution is based on Spark Datasets and SparkSQL and provides the expected result.
There are many ways to configure this approach, including with performance in mind; some notes on that follow further below.
import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}
object basic {
val spark = SparkSession
.builder()
.appName("Sample app")
.master("local")
.config("spark.sql.shuffle.partitions","200") //Change to a more reasonable default number of partitions for our data
.getOrCreate()
val sc = spark.sparkContext
case class Owner(car: String, pcode: String, qtty: Double)
case class Invoice(car: String, pcode: String, qtty: Double)
def main(args: Array[String]): Unit = {
val data = Seq(
Owner("A", "666", 80),
Owner("B", "555", 20),
Owner("A", "444", 50),
Owner("A", "222", 20),
Owner("C", "444", 20),
Owner("C", "666", 80),
Owner("C", "555", 120),
Owner("A", "888", 100)
)
val fleet = Seq(
Invoice("A", "666", 15),
Invoice("C", "666", 10),
Invoice("A", "888", 12),
Invoice("B", "555", 200)
)
val expected = Seq(
Owner("A", "666", 65),
Owner("B", "555", 20), // not redistributed because produce a negative value
Owner("A", "444", 69.29),
Owner("A", "222", 27.71),
Owner("C", "444", 21.43),
Owner("C", "666", 70),
Owner("C", "555", 128.57),
Owner("A", "888", 88)
)
Logger.getRootLogger.setLevel(Level.ERROR)
try {
import spark.implicits._
val owners = spark.createDataset(data).as[Owner].cache()
val invoices = spark.createDataset(fleet).as[Invoice].cache()
owners.createOrReplaceTempView("owners")
invoices.createOrReplaceTempView("invoices")
/**
* this part fetch car and pcode from owner with the substracted quantity from invoice
*/
val p1 = spark.sql(
"""SELECT i.car,i.pcode,
|CASE WHEN (o.qtty - i.qtty) < 0 THEN o.qtty ELSE (o.qtty - i.qtty) END AS qtty,
|CASE WHEN (o.qtty - i.qtty) < 0 THEN 0 ELSE i.qtty END AS to_distribute
|FROM owners o
|INNER JOIN invoices i ON(i.car = o.car AND i.pcode = o.pcode)
|""".stripMargin)
.cache()
p1.createOrReplaceTempView("p1")
/**
* this part fetch all the car and pcode that we have to redistribute their quantity
*/
val p2 = spark.sql(
"""SELECT o.car, o.pcode, o.qtty
|FROM owners o
|LEFT OUTER JOIN invoices i ON(i.car = o.car AND i.pcode = o.pcode)
|WHERE i.car IS NULL
|""".stripMargin)
.cache()
p2.createOrReplaceTempView("p2")
/**
* this part fetch the quantity to distribute
*/
val distribute = spark.sql(
"""
|SELECT car, SUM(to_distribute) AS to_distribute
|FROM p1
|GROUP BY car
|""".stripMargin)
.cache()
distribute.createOrReplaceTempView("distribute")
/**
* this part fetch the proportion to distribute proportionally
*/
val proportion = spark.sql(
"""
|SELECT car, SUM(qtty) AS proportion
|FROM p2
|GROUP BY car
|""".stripMargin)
.cache()
proportion.createOrReplaceTempView("proportion")
/**
* this part join p1 and p2 with the distribution calculated
*/
val result = spark.sql(
"""
|SELECT p2.car, p2.pcode, ROUND(((to_distribute / proportion) * qtty) + qtty, 2) AS qtty
|FROM p2
|JOIN distribute d ON(p2.car = d.car)
|JOIN proportion p ON(d.car = p.car)
|UNION ALL
|SELECT car, pcode, qtty
|FROM p1
|""".stripMargin)
result.show(truncate = false)
/*
+---+-----+------+
|car|pcode|qtty |
+---+-----+------+
|A |444 |69.29 |
|A |222 |27.71 |
|C |444 |21.43 |
|C |555 |128.57|
|A |666 |65.0 |
|B |555 |20.0 |
|C |666 |70.0 |
|A |888 |88.0 |
+---+-----+------+
*/
expected
.toDF("car","pcode","qtty")
.show(truncate = false)
/*
+---+-----+------+
|car|pcode|qtty |
+---+-----+------+
|A |666 |65.0 |
|B |555 |20.0 |
|A |444 |69.29 |
|A |222 |27.71 |
|C |444 |21.43 |
|C |666 |70.0 |
|C |555 |128.57|
|A |888 |88.0 |
+---+-----+------+
*/
} finally {
sc.stop()
println("SparkContext stopped")
spark.stop()
println("SparkSession stopped")
}
}
}
Using the Datasets API
The same can be done with Datasets and their great API, as an example:
import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel
object basic2 {
val spark = SparkSession
.builder()
.appName("Sample app")
.master("local")
.config("spark.sql.shuffle.partitions","200") //Change to a more reasonable default number of partitions for our data
.getOrCreate()
val sc = spark.sparkContext
final case class Owner(car: String, pcode: String, o_qtty: Double)
final case class Invoice(car: String, pcode: String, i_qtty: Double)
def main(args: Array[String]): Unit = {
val data = Seq(
Owner("A", "666", 80),
Owner("B", "555", 20),
Owner("A", "444", 50),
Owner("A", "222", 20),
Owner("C", "444", 20),
Owner("C", "666", 80),
Owner("C", "555", 120),
Owner("A", "888", 100)
)
val fleet = Seq(
Invoice("A", "666", 15),
Invoice("C", "666", 10),
Invoice("A", "888", 12),
Invoice("B", "555", 200)
)
val expected = Seq(
Owner("A", "666", 65),
Owner("B", "555", 20), // not redistributed because produce a negative value
Owner("A", "444", 69.29),
Owner("A", "222", 27.71),
Owner("C", "444", 21.43),
Owner("C", "666", 70),
Owner("C", "555", 128.57),
Owner("A", "888", 88)
)
Logger.getRootLogger.setLevel(Level.ERROR)
try {
import spark.implicits._
val owners = spark.createDataset(data)
.as[Owner]
.cache()
val invoices = spark.createDataset(fleet)
.as[Invoice]
.cache()
val p1 = owners
.join(invoices,Seq("car","pcode"),"inner")
.selectExpr("car","pcode","IF(o_qtty-i_qtty < 0,o_qtty,o_qtty - i_qtty) AS qtty","IF(o_qtty-i_qtty < 0,0,i_qtty) AS to_distribute")
.persist(StorageLevel.MEMORY_ONLY)
val p2 = owners
.join(invoices,Seq("car","pcode"),"left_outer")
.filter(row => row.anyNull == true)
.drop(col("i_qtty"))
.withColumnRenamed("o_qtty","qtty")
.persist(StorageLevel.MEMORY_ONLY)
val distribute = p1
.groupBy(col("car"))
.agg(sum(col("to_distribute")).as("to_distribute"))
.persist(StorageLevel.MEMORY_ONLY)
val proportion = p2
.groupBy(col("car"))
.agg(sum(col("qtty")).as("proportion"))
.persist(StorageLevel.MEMORY_ONLY)
val result = p2
.join(distribute, "car")
.join(proportion, "car")
.withColumn("qtty",round( ((col("to_distribute") / col("proportion")) * col("qtty")) + col("qtty"), 2 ))
.drop("to_distribute","proportion")
.union(p1.drop("to_distribute"))
result.show()
/*
+---+-----+------+
|car|pcode| qtty|
+---+-----+------+
| A| 444| 69.29|
| A| 222| 27.71|
| C| 444| 21.43|
| C| 555|128.57|
| A| 666| 65.0|
| B| 555| 20.0|
| C| 666| 70.0|
| A| 888| 88.0|
+---+-----+------+
*/
expected
.toDF("car","pcode","qtty")
.show(truncate = false)
/*
+---+-----+------+
|car|pcode|qtty |
+---+-----+------+
|A |666 |65.0 |
|B |555 |20.0 |
|A |444 |69.29 |
|A |222 |27.71 |
|C |444 |21.43 |
|C |666 |70.0 |
|C |555 |128.57|
|A |888 |88.0 |
+---+-----+------+
*/
} finally {
sc.stop()
println("SparkContext stopped")
spark.stop()
println("SparkSession stopped")
}
}
}
Some general notes on performance and tuning.
Set spark.sql.shuffle.partitions appropriately: Spark SQL uses spark.sql.shuffle.partitions as the number of partitions produced by shuffles (200 by default). Keep in mind that Spark can only run one concurrent task per partition of an RDD, up to the number of cores in the cluster.
You can check the cluster's default parallelism with sc.defaultParallelism and the number of partitions backing a DataFrame with df.rdd.partitions.size.
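As a quick, hedged illustration (assuming the SparkSession named spark from the examples above):

// Sketch: inspecting parallelism and partition counts
println(spark.sparkContext.defaultParallelism)            // parallelism offered by the master / cluster
println(spark.conf.get("spark.sql.shuffle.partitions"))   // partitions produced by Spark SQL shuffles
val df = spark.range(0, 1000)
println(df.rdd.partitions.size)                           // partitions backing this particular Dataset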
repartition: increase the number of partitions, for example to rebalance partitions and raise parallelism after a filter.
repartition(numPartitions: Int)
coalesce: decrease the number of partitions without a shuffle, for example before writing out to HDFS/external storage.
coalesce(numPartitions: Int, shuffle: Boolean = false)
You can follow this link:
Managing Spark Partitions with Coalesce and Repartition
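A short, hedged sketch of both calls (reusing the df Dataset from the sketch above; the output path is only a placeholder):

import org.apache.spark.sql.functions.col

val filtered   = df.filter(col("id") % 10 === 0)  // filtering can leave many sparsely filled partitions
val rebalanced = filtered.repartition(8)          // full shuffle: rebalance into 8 partitions
val compacted  = rebalanced.coalesce(1)           // no shuffle: collapse partitions before writing out
compacted.write.mode("overwrite").parquet("/tmp/owners-sketch")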
Cache DataFrames that are reused:
dataFrame.cache()
Analyzer — inspect the logical query plan:
dataframe.explain(extended = true)
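For example, on the result Dataset built in the solutions above:

// Sketch: print the parsed, analysed, optimised and physical plans of the final result
result.explain(extended = true)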
For more performance options, see the documentation. You can also time each step of the job:
val start = System.nanoTime()
// my process
val end = System.nanoTime()
val time = end - start // elapsed time in nanoseconds
println(s"My App takes: $time")
Hope this helps.
Regarding "algorithm - quantity redistribution logic - MapGroups with an external dataset", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/62708493/