python - PySpark window functions (lead, lag) in Synapse Workspace

Scenario:

  • A ticket has a StartTime and an EndTime. Whenever a StartTime is followed by a matching EndTime, create a new dataframe with those pairs, as shown in the desired output below.

The PySpark dataset looks like this:

# Base schema for testing purposes
# Dataset

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Create a user-defined custom schema using StructType
schema = StructType([
    StructField('CaseNumber', StringType(), True),
    StructField('StartTime', StringType(), True),
    StructField('EndTime', StringType(), True)
])

data = [
{"CaseNumber": 'Ticket1', "StartTime": '1/22/19 10:00', "EndTime": ''},
{"CaseNumber": 'Ticket1', "StartTime": '', "EndTime": '1/23/19 11:00'},
{"CaseNumber": 'Ticket1', "StartTime": '1/25/19 7:00', "EndTime": ''},
{"CaseNumber": 'Ticket1', "StartTime": '1/27/19 3:00', "EndTime": ''},
{"CaseNumber": 'Ticket2', "StartTime": '1/29/19 10:00', "EndTime": ''},
{"CaseNumber": 'Ticket2', "StartTime": '', "EndTime": '2/23/19 2:00'},
{"CaseNumber": 'Ticket2', "StartTime": '3/25/19 7:00', "EndTime": ''},
{"CaseNumber": 'Ticket2', "StartTime": '', "EndTime": '3/27/19 8:00'},
{"CaseNumber": 'Ticket2', "StartTime": '', "EndTime": '3/27/19 10:00'},
{"CaseNumber": 'Ticket3', "StartTime": '4/25/19 1:00', "EndTime": ''}
]

from pyspark.sql import SparkSession

# Create a PySpark SparkSession
spark = SparkSession.builder \
    .master('local[1]') \
    .appName('SparkByExamples.com') \
    .getOrCreate()

# Creation of a dummy dataframe:
df1 = spark.createDataFrame(data,schema=schema)

df1.show()

The dataset created:

+----------+-------------+-------------+
|CaseNumber| StartTime| EndTime|
+----------+-------------+-------------+
| Ticket1|1/22/19 10:00| NaN|
| Ticket1| NaN|1/23/19 11:00|
| Ticket1| 1/25/19 7:00| NaN|
| Ticket1| 1/27/19 3:00| NaN|
| Ticket2|1/29/19 10:00| NaN|
| Ticket2| NaN| 2/23/19 2:00|
| Ticket2| 3/25/19 7:00| NaN|
| Ticket2| NaN| 3/27/19 8:00|
| Ticket2| NaN|3/27/19 10:00|
| Ticket3| 4/25/19 1:00| NaN|
+----------+-------------+-------------+

The desired output should be:

+----------+-------------+-------------+
|CaseNumber| StartTime| EndTime|
+----------+-------------+-------------+
| Ticket1|1/22/19 10:00|1/23/19 11:00|
| Ticket2|1/29/19 10:00| 2/23/19 2:00|
| Ticket2| 3/25/19 7:00| 3/27/19 8:00|
+----------+-------------+-------------+

Applying the lead function to see whether an EndTime exists for the ticket:

from pyspark.sql.window import Window
import pyspark.sql.functions as psf

# Note: ordering by the partition column gives a non-deterministic row order within each ticket
windowSpec = Window.partitionBy("CaseNumber").orderBy("CaseNumber")
df = df1.withColumn("lead", psf.lead("EndTime", 1).over(windowSpec))
df.show()

pysparkdf = df.toPandas()

import pandas as pd
tickets = pysparkdf.groupby('CaseNumber')

def isLeadnull(e):
    # Caution: this is also True for NaN and empty strings, since NaN != None and '' != None
    return e['lead'] != None

my_list = []
for i, ticket in tickets:
    for j, e in ticket.iterrows():
        if isLeadnull(e):
            my_list.append({'CaseNumber': e['CaseNumber'], 'Start': e['StartTime'], 'EndTime': e['lead']})
        else:
            print(e['lead'], 'Do nothing as condition not met')

The output after this function is:

[{'CaseNumber': 'Ticket1',
'Start': '1/22/19 10:00',
'EndTime': '1/23/19 11:00'},
{'CaseNumber': 'Ticket1', 'Start': 'NaN', 'EndTime': 'NaN'},
{'CaseNumber': 'Ticket1', 'Start': '1/25/19 7:00', 'EndTime': 'NaN'},
{'CaseNumber': 'Ticket2',
'Start': '1/29/19 10:00',
'EndTime': '2/23/19 2:00'},
{'CaseNumber': 'Ticket2', 'Start': 'NaN', 'EndTime': 'NaN'},
{'CaseNumber': 'Ticket2', 'Start': '3/25/19 7:00', 'EndTime': '3/27/19 8:00'},
{'CaseNumber': 'Ticket2', 'Start': 'NaN', 'EndTime': '3/27/19 10:00'}]
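
The unwanted NaN rows slip through because e['lead'] != None is also true for NaN and empty strings. A minimal sketch of a stricter check (the helper name has_lead_end_time is hypothetical, and it assumes pandas is imported as pd) would only keep rows that have both a StartTime and a following EndTime:

import pandas as pd

def has_lead_end_time(e):
    # Treat NaN, None and empty strings all as "missing"
    start_present = pd.notna(e['StartTime']) and e['StartTime'] != ''
    lead_present = pd.notna(e['lead']) and e['lead'] != ''
    return start_present and lead_present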

Best Answer

This is a kind of gaps-and-islands problem. You can identify the "islands" by building a group column with a conditional cumulative sum, then group by CaseNumber + group and aggregate the maximum StartTime and minimum EndTime of each group:

from pyspark.sql import functions as F, Window

# First, convert the strings to timestamps and replace empty strings with nulls
df1 = df1.withColumn("StartTime", F.to_timestamp("StartTime", "M/dd/yy H:mm")) \
    .withColumn("EndTime", F.to_timestamp("EndTime", "M/dd/yy H:mm")) \
    .replace("", None)

# Order each ticket's rows chronologically, whether the row carries a StartTime or an EndTime
w = Window.partitionBy("CaseNumber").orderBy(F.coalesce("StartTime", "EndTime"))

# Conditional cumulative sum: each non-null StartTime starts a new group ("island")
df2 = df1.withColumn("group", F.sum(F.when(F.col("StartTime").isNotNull(), 1)).over(w)) \
    .groupBy("CaseNumber", "group") \
    .agg(F.max("StartTime").alias("StartTime"), F.min("EndTime").alias("EndTime")) \
    .filter(F.col("EndTime").isNotNull()) \
    .drop("group")

df2.show()
#+----------+-------------------+-------------------+
#|CaseNumber| StartTime| EndTime|
#+----------+-------------------+-------------------+
#| Ticket1|2019-01-22 10:00:00|2019-01-23 11:00:00|
#| Ticket2|2019-01-29 10:00:00|2019-02-23 02:00:00|
#| Ticket2|2019-03-25 07:00:00|2019-03-27 08:00:00|
#+----------+-------------------+-------------------+
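
The result columns are now proper timestamps. If the output is needed back in the question's original M/dd/yy H:mm string format, a small follow-up sketch (the name df3 is just for illustration) could render them with date_format:

from pyspark.sql import functions as F

# Optional: render the timestamps in the question's original string format
df3 = df2.withColumn("StartTime", F.date_format("StartTime", "M/dd/yy H:mm")) \
    .withColumn("EndTime", F.date_format("EndTime", "M/dd/yy H:mm"))

df3.show()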

To understand the logic, you can display the intermediate group column step by step, before the grouping:

df1.withColumn("group", F.sum(F.when(F.col("StartTime").isNotNull(), 1)).over(w)).show()

#+----------+-------------------+-------------------+-----+
#|CaseNumber| StartTime| EndTime|group|
#+----------+-------------------+-------------------+-----+
#| Ticket1|2019-01-22 10:00:00| null| 1|
#| Ticket1| null|2019-01-23 11:00:00| 1|
#| Ticket1|2019-01-25 07:00:00| null| 2|
#| Ticket1|2019-01-27 03:00:00| null| 3|
#| Ticket2|2019-01-29 10:00:00| null| 1|
#| Ticket2| null|2019-02-23 02:00:00| 1|
#| Ticket2|2019-03-25 07:00:00| null| 2|
#| Ticket2| null|2019-03-27 08:00:00| 2|
#| Ticket2| null|2019-03-27 10:00:00| 2|
#| Ticket3|2019-04-25 01:00:00| null| 1|
#+----------+-------------------+-------------------+-----+
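
Since the question specifically asks about lead/lag, here is a minimal alternative sketch that stays entirely in PySpark and skips the pandas round-trip. It assumes the same preprocessing as above (StartTime and EndTime already converted to timestamps, with nulls for missing values); the names w, NextEnd and df_lead are just for illustration. The idea is to order each ticket chronologically, take the lead of EndTime, and keep only the rows where a StartTime is immediately followed by an EndTime:

from pyspark.sql import functions as F, Window

# Assumes df1 already holds timestamps with nulls for missing values, as in the answer above
w = Window.partitionBy("CaseNumber").orderBy(F.coalesce("StartTime", "EndTime"))

df_lead = df1.withColumn("NextEnd", F.lead("EndTime", 1).over(w)) \
    .filter(F.col("StartTime").isNotNull() & F.col("NextEnd").isNotNull()) \
    .select("CaseNumber", "StartTime", F.col("NextEnd").alias("EndTime"))

df_lead.show()

Note that this relies on the matching EndTime row coming immediately after its StartTime row; the gaps-and-islands approach above pairs a StartTime with the earliest EndTime before the next StartTime, which is more general.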

Regarding python - PySpark window functions (lead, lag) in Synapse Workspace, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/70819127/
