gpt4 book ai didi

sql - Pyspark 分组和结构化数据

转载 作者:行者123 更新时间:2023-12-04 08:46:54 24 4
gpt4 key购买 nike

我在 spark 2.4.5 中有以下数据:

data = [
('1234', '203957', '2010', 'London', 'CHEM'),
('1234', '203957', '2010', 'London', 'BIOL'),
('1234', '288400', '2012', 'Berlin', 'MATH'),
('1234', '288400', '2012', 'Berlin', 'CHEM'),
]
d = spark.createDataFrame(data, ['auid', 'eid', 'year', 'city', 'subject'])
d.show()

+----+------+----+------+-------+
|auid| eid|year| city|subject|
+----+------+----+------+-------+
|1234|203957|2010|London| CHEM|
|1234|203957|2010|London| BIOL|
|1234|288400|2012|Berlin| MATH|
|1234|288400|2012|Berlin| CHEM|
+----+------+----+------+-------+
我需要从中获取按 auid 分组的 df ,按城市的时间顺序排列,即 London, Berlin[[Berlin, 2010], [London, 2012]]在另一列中,加上我需要按主题的降频列排序: [CHEM,2], [BIOL, 1], [MATH, 1] .或者它可能就像 [CHEM, BIOL, MATH] .
我试过这个:
d.groupBy('auid').agg(func.collect_set(func.struct('city', 'year')).alias('city_set')).show(10, False)
这导致了这个:
+----+--------------------------------+
|auid|city_set |
+----+--------------------------------+
|1234|[[Berlin, 2012], [London, 2010]]|
+----+--------------------------------+
我被困在这里,需要帮助。 (希望得到对 city_set 中的值进行排序的提示)

最佳答案

您可以在 struct('year', 'city') 上聚合 collect_list ,对数组进行排序,然后使用 transform函数来调整字段的顺序。与主题类似,创建一个包含两个字段的结构数组:cntsubject , 对结构数组进行排序/降序,然后仅检索 subject field :

df_new = d.groupBy('auid').agg(
func.sort_array(func.collect_set(func.struct('year', 'city'))).alias('city_set'),
func.collect_list('subject').alias('subjects')
).withColumn('city_set', func.expr("transform(city_set, x -> (x.city as city, x.year as year))")) \
.withColumn('subjects', func.expr("""
sort_array(
transform(array_distinct(subjects), x -> (size(filter(subjects, y -> y=x)) as cnt, x as subject)),
False
).subject
"""))

df_new.show(truncate=False)
+----+--------------------------------+------------------+
|auid|city_set |subjects |
+----+--------------------------------+------------------+
|1234|[[London, 2010], [Berlin, 2012]]|[CHEM, MATH, BIOL]|
+----+--------------------------------+------------------+
编辑:有几种方法可以删除 city_set 中重复的城市条目。大批:
  • 使用Window函数调整year到每个城市的分钟(年),然后重复上述过程。
    d = d.withColumn('year', func.min('year').over(Window.partitionBy('auid','city')))
  • 使用 aggregatecity_set 中删除重复项的函数大批:
    df_new = d.groupBy('auid').agg(
    func.sort_array(func.collect_set(func.struct('year', 'city'))).alias('city_set')
    ).withColumn("city_set", func.expr("""
    aggregate(
    /* expr: take slice of city_set array from the 2nd element to the last */
    slice(city_set,2,size(city_set)-1),
    /* start: initialize `acc` as an array with a single entry city_set[0].city */
    array(city_set[0].city),
    /* merge: iterate through `expr`, if x.city exists in `acc`, keep as-is
    * , otherwise add an entry to `acc` using concat function */
    (acc,x) -> IF(array_contains(acc,x.city), acc, concat(acc, array(x.city)))
    )
    """))

  • 注:不过,使用 Spark 3.0+ 会容易得多:
    df_new = d.groupBy('auid').agg(func.expr("array_sort(collect_set((city,year)), (l,r) -> int(l.year-r.year)) as city_set"))

    关于sql - Pyspark 分组和结构化数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64270309/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com