- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试在多列上聚合一个数据框。我知道聚合所需的一切都在分区内——也就是说,不需要洗牌,因为聚合的所有数据都在分区内。
服用 example ,如果我有类似的东西
val sales=sc.parallelize(List(
("West", "Apple", 2.0, 10),
("West", "Apple", 3.0, 15),
("West", "Orange", 5.0, 15),
("South", "Orange", 3.0, 9),
("South", "Orange", 6.0, 18),
("East", "Milk", 5.0, 5))).repartition(2)
val tdf = sales.map{ case (store, prod, amt, units) => ((store, prod), (amt, amt, amt, units)) }.
reduceByKey((x, y) => (x._1 + y._1, math.min(x._2, y._2), math.max(x._3, y._3), x._4 + y._4))
println(tdf.toDebugString)
(2) ShuffledRDD[12] at reduceByKey at Test.scala:59 []
+-(2) MapPartitionsRDD[11] at map at Test.scala:58 []
| MapPartitionsRDD[10] at repartition at Test.scala:57 []
| CoalescedRDD[9] at repartition at Test.scala:57 []
| ShuffledRDD[8] at repartition at Test.scala:57 []
+-(1) MapPartitionsRDD[7] at repartition at Test.scala:57 []
| ParallelCollectionRDD[6] at parallelize at Test.scala:51 []
select store, prod, sum(amt), avg(units) from sales group by partition_id, store, prod
最佳答案
实现这一目标的唯一方法是使用 mapPartitions 并在迭代分区时使用自定义代码对您的值进行分组和计算。
正如您提到的,数据已经按分组键(store、prod)进行了排序,我们可以以流水线方式有效地计算您的聚合:
(1) 定义辅助类:
:paste
case class MyRec(store: String, prod: String, amt: Double, units: Int)
case class MyResult(store: String, prod: String, total_amt: Double, min_amt: Double, max_amt: Double, total_units: Int)
object MyResult {
def apply(rec: MyRec): MyResult = new MyResult(rec.store, rec.prod, rec.amt, rec.amt, rec.amt, rec.units)
def aggregate(result: MyResult, rec: MyRec) = {
new MyResult(result.store,
result.prod,
result.total_amt + rec.amt,
math.min(result.min_amt, rec.amt),
math.max(result.max_amt, rec.amt),
result.total_units + rec.units
)
}
}
:paste
def pipelinedAggregator(iter: Iterator[MyRec]): Iterator[Seq[MyResult]] = {
var prev: MyResult = null
var res: Seq[MyResult] = Nil
for (crt <- iter) yield {
if (prev == null) {
prev = MyResult(crt)
}
else if (prev.prod != crt.prod || prev.store != crt.store) {
res = Seq(prev)
prev = MyResult(crt)
}
else {
prev = MyResult.aggregate(prev, crt)
}
if (!iter.hasNext) {
res = res ++ Seq(prev)
}
res
}
:paste
val sales = sc.parallelize(
List(MyRec("West", "Apple", 2.0, 10),
MyRec("West", "Apple", 3.0, 15),
MyRec("West", "Orange", 5.0, 15),
MyRec("South", "Orange", 3.0, 9),
MyRec("South", "Orange", 6.0, 18),
MyRec("East", "Milk", 5.0, 5),
MyRec("West", "Apple", 7.0, 11)), 2).toDS
sales.mapPartitions(iter => Iterator(iter.toList)).show(false)
val result = sales
.mapPartitions(recIter => pipelinedAggregator(recIter))
.flatMap(identity)
result.show
result.explain
+-------------------------------------------------------------------------------------+
|value |
+-------------------------------------------------------------------------------------+
|[[West,Apple,2.0,10], [West,Apple,3.0,15], [West,Orange,5.0,15]] |
|[[South,Orange,3.0,9], [South,Orange,6.0,18], [East,Milk,5.0,5], [West,Apple,7.0,11]]|
+-------------------------------------------------------------------------------------+
+-----+------+---------+-------+-------+-----------+
|store| prod|total_amt|min_amt|max_amt|total_units|
+-----+------+---------+-------+-------+-----------+
| West| Apple| 5.0| 2.0| 3.0| 25|
| West|Orange| 5.0| 5.0| 5.0| 15|
|South|Orange| 9.0| 3.0| 6.0| 27|
| East| Milk| 5.0| 5.0| 5.0| 5|
| West| Apple| 7.0| 7.0| 7.0| 11|
+-----+------+---------+-------+-------+-----------+
== Physical Plan ==
*SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, $line14.$read$$iw$$iw$MyResult, true]).store, true) AS store#31, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, $line14.$read$$iw$$iw$MyResult, true]).prod, true) AS prod#32, assertnotnull(input[0, $line14.$read$$iw$$iw$MyResult, true]).total_amt AS total_amt#33, assertnotnull(input[0, $line14.$read$$iw$$iw$MyResult, true]).min_amt AS min_amt#34, assertnotnull(input[0, $line14.$read$$iw$$iw$MyResult, true]).max_amt AS max_amt#35, assertnotnull(input[0, $line14.$read$$iw$$iw$MyResult, true]).total_units AS total_units#36]
+- MapPartitions <function1>, obj#30: $line14.$read$$iw$$iw$MyResult
+- MapPartitions <function1>, obj#20: scala.collection.Seq
+- Scan ExternalRDDScan[obj#4]
sales: org.apache.spark.sql.Dataset[MyRec] = [store: string, prod: string ... 2 more fields]
result: org.apache.spark.sql.Dataset[MyResult] = [store: string, prod: string ... 4 more fields]
关于apache-spark - 在分区内的多列上进行 Spark 聚合,无需洗牌,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46690575/
这是一个面试问题。请给一些提示: 使用 vector 实现一个方法,洗一副牌。 public class Card { private int value; Card(int v) {
我非常是 C++ 的新手,我还没有完全理解基本概念,但我的教授希望我们编写一个算法来洗牌和展示一副纸牌和一副牌需要表示为二维数组。 但是我在模拟一副纸牌时遇到了麻烦! #include #inclu
我想用 php 创建随机桥牌的集合。我认为我可以将一副有序的纸牌编码为下面的字符串 $deal(考虑到大写和小写时,我喜欢它有 52 个字母)。我发现了 php 函数 str_shuffle。所以我想
我想随机重新排序矩阵 A 的行以生成另一个新矩阵。如何在 R 中做到这一点? 最佳答案 使用 sample()以(伪)随机顺序生成行索引并使用 [ 重新排列矩阵. ## create a matrix
我计划在 android-Java 上开发一个简单的纸牌游戏,在我的应用程序中,我洗牌,所以我想知道将我的牌组存储在数组或堆栈中的最佳方法是什么..?堆栈的唯一问题是我不知道如何对其进行洗牌。 最佳答
我正在尝试创建一种方法,使用 Arraylist 随机打乱基元数组。我想知道 .get(); 方法是否是在我的 Arraylist 上使用的正确方法,在 for 循环中的普通数组上它只是 array[
我是 C++ 的新手,但是,我正在尝试创建一些盐和胡椒噪音。它几乎完成了,只是在这之间我想洗牌一个整数数组,无论我做什么,无论我使用什么洗牌功能,我总是得到令人讨厌的“从这里需要”,它没有告诉我任何事
我被要求编写一个程序(主要是一个方法)来洗牌。我编写了以下程序: public class Deck { //////////////////////////////////////// // Dat
我已经看到其他与此相关的话题,但似乎没有一个能回答我的问题。我尝试在 vector 上使用 shuffle() 函数,但我一直收到错误消息: deck_of_cards.cpp:34:5: error
是否与sort 一致导致更随机的数组或者是 sort这里只是浪费? use List::Util qw(shuffle); @random1 = shuffle sort keys %vocables
首先,这个问题是从 this 中摘下来的问题。我这样做是因为我认为这部分比较长问题的子部分更大。如有冒犯,请见谅。 假设您有一个生成随机性的算法。现在你如何测试它?或者更直接地说 - 假设您有一个洗牌
我正在制作一个配对游戏,其中有 8 个 ImageView 和 4 个不同的图像。我想知道页面何时加载是否可以交换 ImageView 的位置? 让每张图片的顺序每次都变? 最佳答案 试试这个:
我正在制作一款纸牌游戏,需要在游戏开始前洗牌。通过在将数组插入数据库之前对其进行混洗,可以毫无问题地完成此操作。然而,在游戏开始后,有些情况下玩家需要洗牌。我想到的唯一方法是重新洗牌后重新插入套牌,但
我正在运行一个网站,其中用户导航子目录的唯一方式是通过随机页面(类似于维基百科的随机页面功能)。我已经实现了调用随机页面的代码并且它工作正常,但我想尽量减少在 onclick 执行后再次调用同一页面的
我想用一个条件打乱一个 2d Numpy 数组。例如,仅随机播放非零值。 import numpy as np a = np.arange(9).reshape((3,3)) a[2,2] = 0 #
我将如何获取 RLMArray 并对其进行洗牌,以便 RLMArray 中的当前项目是随机的。 我已经查看了 RLMArray 的文档,但是我没有看到对其进行洗牌的好方法。 最佳答案 你最好的选择可能
在我的 Qt c++ 应用程序中,我有一个包含一组 QString 值的 QStringList!我想随机播放(任意更改 QStringList 中 QString 的位置)。 perl中是否有任何默
我知道它是如何工作的,但我在排序和配对方面仍然有问题,这样我才能确定获胜者。 将它们配对(配对是具有相同值(value)的卡片。)例如,红心 A 和黑桃 A 组成一对。然后我数那些对。拥有最高对子的手
我有一组这样的 div: 我需要在点击其中一个红色 div 时随机播放它们,但点击的 div 应该始终与黄色 div 交换。 fiddle here $('.box-red').click(funct
// deck of cards // below are initializations #include #include #include using namespace std; int
我是一名优秀的程序员,十分优秀!