apache-spark - Spark pivot without aggregation

https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html explains nicely how a pivot works in Spark.
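
For context, a minimal sketch of the aggregated pivot that the post describes, written against the countryKPI frame defined further down; the sum aggregation is only illustrative:

import org.apache.spark.sql.functions.sum

// A standard Spark pivot: each cell is the result of an aggregation,
// here the sum of "value" per (country_id3, indicator_id) pair.
countryKPI
  .groupBy("country_id3")
  .pivot("indicator_id")
  .agg(sum("value"))
  .show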

In my Python code I use pandas to pivot without aggregation, followed by a reset_index and a join:

countryToMerge = pd.pivot_table(data=dfCountries, index=['A'], columns=['B'])
countryToMerge.index.name = 'ISO'
df.merge(countryToMerge['value'].reset_index(), on='ISO', how='inner')

How does this work in Spark?

I tried grouping and joining manually, e.g.:
val grouped = countryKPI.groupBy("A").pivot("B")
df.join(grouped, df.col("ISO") === grouped.col("ISO")).show

But this does not work. How does reset_index map to Spark / how can it be implemented in a Spark-native way?
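
For reference, a likely reason the snippet above fails: groupBy(...).pivot(...) returns a RelationalGroupedDataset rather than a DataFrame, so there is nothing to join until an aggregation is applied. A minimal sketch, reusing the column names from the example below:

// pivot() without an aggregation only yields a RelationalGroupedDataset
val grouped = countryKPI.groupBy("country_id3").pivot("indicator_id")
// an aggregation (import org.apache.spark.sql.functions.first) turns it into a joinable DataFrame
val pivoted = grouped.agg(first("value"))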

EDIT

A minimal example of the Python code:
import pandas as pd
from datetime import datetime, timedelta
import numpy as np
dates = pd.DataFrame([(datetime(2016, 1, 1) + timedelta(i)).strftime('%Y-%m-%d') for i in range(10)], columns=["dates"])
isos = pd.DataFrame(["ABC", "POL", "ABC", "POL","ABC", "POL","ABC", "POL","ABC", "POL"], columns=['ISO'])
dates['ISO'] = isos.ISO
dates['ISO'] = dates['ISO'].astype("category")
countryKPI = pd.DataFrame({'country_id3': ['ABC', 'POL', 'ABC', 'POL'],
                           'indicator_id': ['a', 'a', 'b', 'b'],
                           'value': [7, 8, 9, 7]})
countryToMerge = pd.pivot_table(data=countryKPI, index=['country_id3'], columns=['indicator_id'])
countryToMerge.index.name = 'ISO'
print(dates.merge(countryToMerge['value'].reset_index(), on='ISO', how='inner'))

        dates  ISO  a  b
0  2016-01-01  ABC  7  9
1  2016-01-03  ABC  7  9
2  2016-01-05  ABC  7  9
3  2016-01-07  ABC  7  9
4  2016-01-09  ABC  7  9
5  2016-01-02  POL  8  7
6  2016-01-04  POL  8  7
7  2016-01-06  POL  8  7
8  2016-01-08  POL  8  7
9  2016-01-10  POL  8  7

Follow-up in Scala/Spark:
val dates = Seq(("2016-01-01", "ABC"),
  ("2016-01-02", "ABC"),
  ("2016-01-03", "POL"),
  ("2016-01-04", "ABC"),
  ("2016-01-05", "POL"),
  ("2016-01-06", "ABC"),
  ("2016-01-07", "POL"),
  ("2016-01-08", "ABC"),
  ("2016-01-09", "POL"),
  ("2016-01-10", "ABC")
).toDF("dates", "ISO")
  .withColumn("dates", 'dates.cast("Date"))

dates.show
dates.printSchema

val countryKPI = Seq(("ABC", "a", 7),
  ("ABC", "b", 8),
  ("POL", "a", 9),
  ("POL", "b", 7)
).toDF("country_id3", "indicator_id", "value")

countryKPI.show
countryKPI.printSchema

val grouped = countryKPI.groupBy("country_id3").pivot("indicator_id")

Best answer

There is no good way to pivot without aggregating in Spark; it basically assumes you would just use the OneHotEncoder for that functionality, but that lacks the human readability of a straightforward pivot. The best way I have found to do it is:

import org.apache.spark.sql.functions._  // needed for first, col and monotonically_increasing_id below

val pivot = countryKPI
  .groupBy("country_id3", "value")
  .pivot("indicator_id", Seq("a", "b"))
  .agg(first(col("indicator_id")))

pivot.show
+-----------+-----+----+----+
|country_id3|value|   a|   b|
+-----------+-----+----+----+
|        ABC|    8|null|   b|
|        POL|    9|   a|null|
|        POL|    7|null|   b|
|        ABC|    7|   a|null|
+-----------+-----+----+----+

However, if (country_id3, value) is not distinct in the dataset, you end up collapsing rows and potentially taking a somewhat meaningless first() value from your pivot column.
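
To make that caveat concrete, here is a small sketch; the extra ("ABC", "c", 7) row is hypothetical and only added to create a duplicate (country_id3, value) pair:

val withDupe = countryKPI.union(Seq(("ABC", "c", 7)).toDF("country_id3", "indicator_id", "value"))
withDupe
  .groupBy("country_id3", "value")
  .pivot("indicator_id", Seq("a", "b", "c"))
  .agg(first(col("indicator_id")))
  .show
// The source rows ("ABC", "a", 7) and ("ABC", "c", 7) collapse into a single
// ("ABC", 7) row, so the original row-level structure is lost.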

Another option is to add an id column to the dataset, group by that new id, pivot the desired column, and then join back onto the original dataset. Here is an example:
val countryWithId = countryKPI.withColumn("id", monotonically_increasing_id)
val pivotted = countryWithId
  .groupBy("id")
  .pivot("indicator_id")
  .agg(first(col("indicator_id")))

val pivot2 = countryWithId.join(pivotted, Seq("id")).drop("id") //.drop("indicator_id")

pivot2.show
+-----------+------------+-----+----+----+
|country_id3|indicator_id|value|   a|   b|
+-----------+------------+-----+----+----+
|        ABC|           a|    7|   a|null|
|        ABC|           b|    8|null|   b|
|        POL|           a|    9|   a|null|
|        POL|           b|    7|null|   b|
+-----------+------------+-----+----+----+

In this case you still have the original pivot column, but you can .drop() it as well if you prefer.
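
Tying this back to the original question, a sketch of how the pivoted KPIs could be joined onto the dates frame from above; this assumes each (country_id3, indicator_id) pair occurs only once, so that first("value") is deterministic:

val kpiWide = countryKPI
  .groupBy("country_id3")
  .pivot("indicator_id", Seq("a", "b"))
  .agg(first("value"))  // safe only while (country_id3, indicator_id) is unique
val result = dates
  .join(kpiWide, dates("ISO") === kpiWide("country_id3"), "inner")
  .drop("country_id3")
result.show  // dates, ISO, a, b -- the same layout the pandas merge produced

Here the grouping key plays the role of the reset index in the pandas version.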

Regarding apache-spark - Spark pivot without aggregation, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/40752819/
