
pandas - Clean way to identify runs on a PySpark DF ArrayType column


Given a PySpark DataFrame of the following form:

+----+--------+
|time|messages|
+----+--------+
| t01|    [m1]|
| t03|[m1, m2]|
| t04|    [m2]|
| t06|    [m3]|
| t07|[m3, m1]|
| t08|    [m1]|
| t11|    [m2]|
| t13|[m2, m4]|
| t15|    [m2]|
| t20|    [m4]|
| t21|      []|
| t22|[m1, m4]|
+----+--------+
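
(For reference, a minimal sketch that builds this sample DataFrame; the rows literal below is transcribed from the table above, and the variable name rows matches what the answer below refers to:)

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# sample rows transcribed from the table above; note the empty list at t21
rows = [
    ('t01', ['m1']), ('t03', ['m1', 'm2']), ('t04', ['m2']),
    ('t06', ['m3']), ('t07', ['m3', 'm1']), ('t08', ['m1']),
    ('t11', ['m2']), ('t13', ['m2', 'm4']), ('t15', ['m2']),
    ('t20', ['m4']), ('t21', []), ('t22', ['m1', 'm4']),
]
df = spark.createDataFrame(rows, ['time', 'messages'])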

I want to restructure it to compress runs of the same message (the ordering of the output doesn't really matter, but it is sorted here for clarity):
+----------+--------+-------+
|start_time|end_time|message|
+----------+--------+-------+
|       t01|     t03|     m1|
|       t07|     t08|     m1|
|       t22|     t22|     m1|
|       t03|     t04|     m2|
|       t11|     t15|     m2|
|       t06|     t07|     m3|
|       t13|     t13|     m4|
|       t20|     t20|     m4|
|       t22|     t22|     m4|
+----------+--------+-------+

(That is, treat the messages column as a sequence and, for each message, identify the start and end of its "runs".)

Is there a clean way to do this transformation in Spark? Currently, I dump this as a 6 GB TSV and process it imperatively.

I'm open to the possibility of toPandas-ing this and accumulating on the driver, if pandas has a clean way to do this aggregation.
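
For example, here is a minimal pandas sketch of one way such an aggregation might look (a "gaps and islands" style grouping over the exploded rows, with toy data transcribed from the table above):

import pandas as pd

# toy data transcribed from the table above
pdf = pd.DataFrame({
    "time": ["t01", "t03", "t04", "t06", "t07", "t08",
             "t11", "t13", "t15", "t20", "t21", "t22"],
    "messages": [["m1"], ["m1", "m2"], ["m2"], ["m3"], ["m3", "m1"], ["m1"],
                 ["m2"], ["m2", "m4"], ["m2"], ["m4"], [], ["m1", "m4"]],
})

pdf = pdf.sort_values("time").reset_index(drop=True)
pdf["pos"] = pdf.index                          # position in time order

# one row per (time, message); empty lists explode to NaN and are dropped
exploded = pdf.explode("messages").dropna(subset=["messages"])

# rows of the same message at consecutive positions share one run id
run_id = (exploded["pos"] - exploded.groupby("messages").cumcount()).rename("run_id")

runs = (exploded.groupby(["messages", run_id])
        .agg(start_time=("time", "min"), end_time=("time", "max"))
        .reset_index()
        .rename(columns={"messages": "message"})
        [["start_time", "end_time", "message"]])
print(runs.sort_values(["message", "start_time"]))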

(See my answer below for a simple baseline implementation.)

Best Answer

You can try the following approach using forward-fill (Spark 2.4+ is not required):

Step 1: do the following:

  • For each row ordered by time, find prev_messages and next_messages
  • Explode messages into individual message rows
  • For each message, if prev_messages is NULL or message is not in prev_messages, set start=time; see the SQL syntax below:
    IF(prev_messages is NULL or !array_contains(prev_messages, message),time,NULL)

    which can be simplified to:
    IF(array_contains(prev_messages, message),NULL,time)
  • Similarly, if next_messages is NULL or message is not in next_messages, set end=time

  • The code is as follows:
    from pyspark.sql import Window, functions as F

    # rows is defined in your own post
    df = spark.createDataFrame(rows, ['time', 'messages'])

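    # note: w1 below has no partitionBy column, so Spark moves all rows to a
    # single partition to evaluate this window (fine for small data, but a
    # potential bottleneck at scale)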
    w1 = Window.partitionBy().orderBy('time')

    df1 = df.withColumn('prev_messages', F.lag('messages').over(w1)) \
        .withColumn('next_messages', F.lead('messages').over(w1)) \
        .withColumn('message', F.explode('messages')) \
        .withColumn('start', F.expr("IF(array_contains(prev_messages, message),NULL,time)")) \
        .withColumn('end', F.expr("IF(array_contains(next_messages, message),NULL,time)"))

    df1.show()
    #+----+--------+-------------+-------------+-------+-----+----+
    #|time|messages|prev_messages|next_messages|message|start| end|
    #+----+--------+-------------+-------------+-------+-----+----+
    #| t01|    [m1]|         null|     [m1, m2]|     m1|  t01|null|
    #| t03|[m1, m2]|         [m1]|         [m2]|     m1| null| t03|
    #| t03|[m1, m2]|         [m1]|         [m2]|     m2|  t03|null|
    #| t04|    [m2]|     [m1, m2]|         [m3]|     m2| null| t04|
    #| t06|    [m3]|         [m2]|     [m3, m1]|     m3|  t06|null|
    #| t07|[m3, m1]|         [m3]|         [m1]|     m3| null| t07|
    #| t07|[m3, m1]|         [m3]|         [m1]|     m1|  t07|null|
    #| t08|    [m1]|     [m3, m1]|         [m2]|     m1| null| t08|
    #| t11|    [m2]|         [m1]|     [m2, m4]|     m2|  t11|null|
    #| t13|[m2, m4]|         [m2]|         [m2]|     m2| null|null|
    #| t13|[m2, m4]|         [m2]|         [m2]|     m4|  t13| t13|
    #| t15|    [m2]|     [m2, m4]|         [m4]|     m2| null| t15|
    #| t20|    [m4]|         [m2]|           []|     m4|  t20| t20|
    #| t22|[m1, m4]|           []|         null|     m1|  t22| t22|
    #| t22|[m1, m4]|           []|         null|     m4|  t22| t22|
    #+----+--------+-------------+-------------+-------+-----+----+

Step 2: Create a WindowSpec partitioned by message and forward-fill the start column.
    w2 = Window.partitionBy('message').orderBy('time')

    # for illustration purposes, use a different column name so that we can
    # compare the `start` column before and after the forward fill
    df2 = df1.withColumn('start_new', F.last('start', True).over(w2))
    df2.show()
    #+----+--------+-------------+-------------+-------+-----+----+---------+
    #|time|messages|prev_messages|next_messages|message|start| end|start_new|
    #+----+--------+-------------+-------------+-------+-----+----+---------+
    #| t01|    [m1]|         null|     [m1, m2]|     m1|  t01|null|      t01|
    #| t03|[m1, m2]|         [m1]|         [m2]|     m1| null| t03|      t01|
    #| t07|[m3, m1]|         [m3]|         [m1]|     m1|  t07|null|      t07|
    #| t08|    [m1]|     [m3, m1]|         [m2]|     m1| null| t08|      t07|
    #| t22|[m1, m4]|           []|         null|     m1|  t22| t22|      t22|
    #| t03|[m1, m2]|         [m1]|         [m2]|     m2|  t03|null|      t03|
    #| t04|    [m2]|     [m1, m2]|         [m3]|     m2| null| t04|      t03|
    #| t11|    [m2]|         [m1]|     [m2, m4]|     m2|  t11|null|      t11|
    #| t13|[m2, m4]|         [m2]|         [m2]|     m2| null|null|      t11|
    #| t15|    [m2]|     [m2, m4]|         [m4]|     m2| null| t15|      t11|
    #| t06|    [m3]|         [m2]|     [m3, m1]|     m3|  t06|null|      t06|
    #| t07|[m3, m1]|         [m3]|         [m1]|     m3| null| t07|      t06|
    #| t13|[m2, m4]|         [m2]|         [m2]|     m4|  t13| t13|      t13|
    #| t20|    [m4]|         [m2]|           []|     m4|  t20| t20|      t20|
    #| t22|[m1, m4]|           []|         null|     m4|  t22| t22|      t22|
    #+----+--------+-------------+-------------+-------+-----+----+---------+

Step 3: Drop rows where end is NULL, then select only the required columns:
    df2.selectExpr("message", "start_new as start", "end") \
        .filter("end is not NULL") \
        .orderBy("message", "start").show()
    #+-------+-----+---+
    #|message|start|end|
    #+-------+-----+---+
    #|     m1|  t01|t03|
    #|     m1|  t07|t08|
    #|     m1|  t22|t22|
    #|     m2|  t03|t04|
    #|     m2|  t11|t15|
    #|     m3|  t06|t07|
    #|     m4|  t13|t13|
    #|     m4|  t20|t20|
    #|     m4|  t22|t22|
    #+-------+-----+---+

Summarizing the above steps, we have the following:
    from pyspark.sql import Window, functions as F

    # define two Window Specs
    w1 = Window.partitionBy().orderBy('time')
    w2 = Window.partitionBy('message').orderBy('time')

    df_new = df \
        .withColumn('prev_messages', F.lag('messages').over(w1)) \
        .withColumn('next_messages', F.lead('messages').over(w1)) \
        .withColumn('message', F.explode('messages')) \
        .withColumn('start', F.expr("IF(array_contains(prev_messages, message),NULL,time)")) \
        .withColumn('end', F.expr("IF(array_contains(next_messages, message),NULL,time)")) \
        .withColumn('start', F.last('start', True).over(w2)) \
        .select("message", "start", "end") \
        .filter("end is not NULL")

    df_new.orderBy("start").show()
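
As an optional check, the result can be renamed and reordered to match the layout of the expected output shown in the question:

    df_new.selectExpr("start as start_time", "end as end_time", "message") \
        .orderBy("message", "start_time").show()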

Regarding "pandas - Clean way to identify runs on a PySpark DF ArrayType column", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/61830488/
