gpt4 book ai didi

python - 将pyspark偏移滞后动态值检索到其他数据帧

转载 作者:行者123 更新时间:2023-12-02 22:00:53 24 4
gpt4 key购买 nike

我正在使用pyspark 2.1。以下是我的输入数据框。我被困在从不同数据框中获取动态偏移值,请帮忙

df1 =

类别值

1 3

2 2

4 5

df2

类别年月周数lag_attribute运行

1 0 0 0 0 2

1 2019年1 1 1 0

1 2019年1 2 2 0

1 2019年1 3 3 0

1 2019年1 4 4 1

1 2019年1 5 5 2

1 2019年1 6 6 3

1 2019年1 7 7 4

1 2019年1 8 8 5

1 2019年1 9 9 6

2 0 0 0 9 0

2 2018 1 1 2 0

2 2018 1 2 3 2

2 2018年1 3 4 3

2 2018年1 3 5 4

如上例所示,df1是我的查找表,它具有偏移值,其中1偏移值为3,类别2偏移值为2。

在df2中,runs是我的输出列,因此对于df1中的每个类别值,如果滞后值为3,则应从dataframe2 [df2]考虑lag_attrbute并向下滞后3个值,因此您可以看到lag_attribute的每3个值在重复

我尝试在下面的编码不起作用。请帮忙

df1=df1.registerTempTable("df1")
df2=df2.registerTempTable("df2")
sqlCtx.sql("select st.category,st.Year,st.Month,st.weekyear,st.lag_attribute,LAG(st.lag_attribute,df1.value, 0) OVER (PARTITION BY st.cagtegory ORDER BY st.Year,st.Month,st.weekyear) as return_test from df1 st,df2 lkp where df1.category=df2.category")

请帮助我克服这个障碍

最佳答案

lag接受一个列对象和一个整数(python整数),如函数签名所示:

Signature: psf.lag(col, count=1, default=None)
count的值不能是pyspark IntegerType(列对象)。但是,有一些解决方法,让我们从示例数据开始:

df1 = spark.createDataFrame([[1, 3],[2, 2],[4, 5]], ["category", "value"])
df2 = spark.createDataFrame([[1, 0, 0, 0, 0, 2],[1, 2019, 1, 1, 1, 0],[1, 2019, 1, 2, 2, 0],[1, 2019, 1, 3, 3, 0],
[1, 2019, 1, 4, 4, 1],[1, 2019, 1, 5, 5, 2],[1, 2019, 1, 6, 6, 3],[1, 2019, 1, 7, 7, 4],
[1, 2019, 1, 8, 8, 5],[1, 2019, 1, 9, 9, 6],[2, 0, 0, 0, 9, 0],[2, 2018, 1, 1, 2, 0],
[2, 2018, 1, 2, 3, 2],[2, 2018, 1, 3, 4, 3],[2, 2018, 1, 3, 5, 4]],
["category", "year", "month", "weeknumber", "lag_attribute", "runs"])
  • 如果df1而不是太大(意味着少量的categories,并且每个category中可能包含很多值),该怎么办,就是将df1转换为列表并创建if-elif-elif ...条件根据其值:

    list1 = df1.collect()
    sc.broadcast(list1)

    import pyspark.sql.functions as psf
    from pyspark.sql import Window
    w = Window.partitionBy("category").orderBy("year", "month", "weeknumber")
    cond = eval('psf' + ''.join(['.when(df2.category == ' + str(c) + ', psf.lag("lag_attribute", ' + str(l) + ', 0).over(w))' for c, l in list1]))

    注意:这是如果cl是整数,如果它们是字符串,则:

    cond = eval('psf' + ''.join(['.when(df2.category == "' + str(c) + '", psf.lag("lag_attribute", "' + str(l) + '", 0).over(w))' for c, l in list1]))

    现在我们可以应用条件:

    df2.select("*", cond.alias("return_test")).show()

    +--------+----+-----+----------+-------------+----+-----------+
    |category|year|month|weeknumber|lag_attribute|runs|return_test|
    +--------+----+-----+----------+-------------+----+-----------+
    | 1| 0| 0| 0| 0| 2| 0|
    | 1|2019| 1| 1| 1| 0| 0|
    | 1|2019| 1| 2| 2| 0| 0|
    | 1|2019| 1| 3| 3| 0| 0|
    | 1|2019| 1| 4| 4| 1| 1|
    | 1|2019| 1| 5| 5| 2| 2|
    | 1|2019| 1| 6| 6| 3| 3|
    | 1|2019| 1| 7| 7| 4| 4|
    | 1|2019| 1| 8| 8| 5| 5|
    | 1|2019| 1| 9| 9| 6| 6|
    | 2| 0| 0| 0| 9| 0| 0|
    | 2|2018| 1| 1| 2| 0| 0|
    | 2|2018| 1| 2| 3| 2| 9|
    | 2|2018| 1| 3| 4| 3| 2|
    | 2|2018| 1| 3| 5| 4| 3|
    +--------+----+-----+----------+-------------+----+-----------+
  • 如果df1,则可以在已建立的df2列上自行加入lag:

    首先,我们将使用联接将valuesdf1带到df2:

    df = df2.join(df1, "category")

    如果df1不太大,则应使用broadcast对其进行编码:

    import pyspark.sql.functions as psf
    df = df2.join(psf.broadcast(df1), "category")

    现在,我们将枚举每个partition中的行,并构建一个lag列:

    from pyspark.sql import Window
    w = Window.partitionBy("category").orderBy("year", "month", "weeknumber")
    left = df.withColumn('rn', psf.row_number().over(w))
    right = left.select((left.rn + left.value).alias("rn"), left.lag_attribute.alias("return_test"))

    left.join(right, ["category", "rn"], "left")\
    .na.fill(0)\
    .sort("category", "rn").show()

    +--------+---+----+-----+----------+-------------+----+-----+-----------+
    |category| rn|year|month|weeknumber|lag_attribute|runs|value|return_test|
    +--------+---+----+-----+----------+-------------+----+-----+-----------+
    | 1| 1| 0| 0| 0| 0| 2| 3| 0|
    | 1| 2|2019| 1| 1| 1| 0| 3| 0|
    | 1| 3|2019| 1| 2| 2| 0| 3| 0|
    | 1| 4|2019| 1| 3| 3| 0| 3| 0|
    | 1| 5|2019| 1| 4| 4| 1| 3| 1|
    | 1| 6|2019| 1| 5| 5| 2| 3| 2|
    | 1| 7|2019| 1| 6| 6| 3| 3| 3|
    | 1| 8|2019| 1| 7| 7| 4| 3| 4|
    | 1| 9|2019| 1| 8| 8| 5| 3| 5|
    | 1| 10|2019| 1| 9| 9| 6| 3| 6|
    | 2| 1| 0| 0| 0| 9| 0| 2| 0|
    | 2| 2|2018| 1| 1| 2| 0| 2| 0|
    | 2| 3|2018| 1| 2| 3| 2| 2| 9|
    | 2| 4|2018| 1| 3| 4| 3| 2| 2|
    | 2| 5|2018| 1| 3| 5| 4| 2| 3|
    +--------+---+----+-----+----------+-------------+----+-----+-----------+

  • 注意:您的 runs滞后值存在问题,对于 catagory=2,它仅滞后 1而不是 2。另外,某些行在数据框中的顺序相同(例如,示例数据框 df2中的最后两行具有相同的 category, year, month and weeknumber),因为涉及改组,因此每次运行代码时可能会得到不同的结果。

    关于python - 将pyspark偏移滞后动态值检索到其他数据帧,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47272640/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com