
python - Pyspark - Difference between 2 dataframes - identify inserts, updates and deletes


I have two DataFrames, df1 (old) and df2 (new). I am trying to compare df2 against df1 and find the newly added rows, the deleted rows, the updated rows, and the names of the columns that were updated.

Here is the code I wrote:

from pyspark.sql.functions import col, array, when, array_remove, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

data1 = [("James","rob","Smith","36636","M",3000),
         ("Michael","Rose","jim","40288","M",4000),
         ("Robert","dunkin","Williams","42114","M",4000),
         ("Maria","Anne","Jones","39192","F",4000),
         ("Jen","Mary","Brown","60563","F",-1)
        ]

data2 = [("James","rob","Smith","36636","M",3000),
         ("Robert","dunkin","Williams","42114","M",2000),
         ("Maria","Anne","Jones","72712","F",3000),
         ("Yesh","Reddy","Brown","75234","M",3000),
         ("Jen","Mary","Brown","60563","F",-1)
        ]

schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True)
])

df1 = spark.createDataFrame(data=data1, schema=schema)
df2 = spark.createDataFrame(data=data2, schema=schema)

# For each non-key column, emit its name when the value differs, else ""
conditions_ = [when(df1[c] != df2[c], lit(c)).otherwise("") for c in df1.columns if c not in ['firstname', 'middlename', 'lastname']]

select_expr = [
    col("firstname"), col("middlename"), col("lastname"),
    *[df2[c] for c in df2.columns if c not in ['firstname', 'middlename', 'lastname']],
    array_remove(array(*conditions_), "").alias("updated_columns")
]

df1.join(df2, ["firstname", "middlename", "lastname"], "inner").select(*select_expr).show()

Here is the result I get:

+---------+----------+--------+-----+------+------+---------------+
|firstname|middlename|lastname|   id|gender|salary|updated_columns|
+---------+----------+--------+-----+------+------+---------------+
|    James|       rob|   Smith|36636|     M|  3000|             []|
|   Robert|    dunkin|Williams|42114|     M|  2000|       [salary]|
|    Maria|      Anne|   Jones|72712|     F|  3000|   [id, salary]|
|      Jen|      Mary|   Brown|60563|     F|    -1|             []|
+---------+----------+--------+-----+------+------+---------------+

Here is the output I am expecting:

+---------+----------+--------+-----+------+------+---------------+---------+
|firstname|middlename|lastname|   id|gender|salary|updated_columns|   status|
+---------+----------+--------+-----+------+------+---------------+---------+
|    James|       rob|   Smith|36636|     M|  3000|             []|unchanged|
|   Robert|    dunkin|Williams|42114|     M|  2000|       [salary]|  updated|
|  Michael|      Rose|     jim|40288|     M|  4000|             []|  deleted|
|    Maria|      Anne|   Jones|72712|     F|  3000|   [id, salary]|  updated|
|     Yesh|     Reddy|   Brown|75234|     M|  3000|             []|    added|
|      Jen|      Mary|   Brown|60563|     F|    -1|             []|unchanged|
+---------+----------+--------+-----+------+------+---------------+---------+

I know I can find the added and deleted rows separately using left anti joins, as sketched below. However, I am looking for a way to extend the existing join to get the output above.
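For reference, a minimal sketch of that left anti join approach, assuming the same df1, df2, and key columns as above (the names keys, added_rows, and deleted_rows are just for illustration):

keys = ["firstname", "middlename", "lastname"]

# Rows whose key exists in df2 but not in df1 -> added
added_rows = df2.join(df1, keys, "left_anti")

# Rows whose key exists in df1 but not in df2 -> deleted
deleted_rows = df1.join(df2, keys, "left_anti")

This yields two separate DataFrames rather than the single combined result with a status column that the question asks for.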

Best answer

An outer join will help in your case. I have modified the code you provided to do this, and have also included the status column.

Minimal working example:

from pyspark.sql.functions import col, array, when, array_remove, lit, size, coalesce
from pyspark.sql.types import *

data1 = [("James","rob","Smith","36636","M",3000),
         ("Michael","Rose","jim","40288","M",4000),
         ("Robert","dunkin","Williams","42114","M",4000),
         ("Maria","Anne","Jones","39192","F",4000),
         ("Jen","Mary","Brown","60563","F",-1)
        ]

data2 = [("James","rob","Smith","36636","M",3000),
         ("Robert","dunkin","Williams","42114","M",2000),
         ("Maria","Anne","Jones","72712","F",3000),
         ("Yesh","Reddy","Brown","75234","M",3000),
         ("Jen","Mary","Brown","60563","F",-1)
        ]

schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True)
])

df1 = spark.createDataFrame(data=data1, schema=schema)
df2 = spark.createDataFrame(data=data2, schema=schema)

# Column name when the value changed between df1 and df2, else ""
conditions_ = [when(df1[c] != df2[c], lit(c)).otherwise("") for c in df1.columns if c not in ['firstname', 'middlename', 'lastname']]

Logic for the status column, plus a modified select_expr that coalesces values from df2 and df1, prioritizing df2 so that the latest data is kept:

status = (
    when(df1["id"].isNull(), lit("added"))            # key only in df2
    .when(df2["id"].isNull(), lit("deleted"))         # key only in df1
    .when(size(array_remove(array(*conditions_), "")) > 0, lit("updated"))
    .otherwise("unchanged")
)

select_expr = [
    col("firstname"), col("middlename"), col("lastname"),
    *[coalesce(df2[c], df1[c]).alias(c) for c in df2.columns if c not in ['firstname', 'middlename', 'lastname']],
    array_remove(array(*conditions_), "").alias("updated_columns"),
    status.alias("status"),
]

Finally, apply the outer join:

df1.join(df2, ["firstname", "middlename", "lastname"], "outer").select(*select_expr).show()

Output:

+---------+----------+--------+-----+------+------+---------------+---------+
|firstname|middlename|lastname|   id|gender|salary|updated_columns|   status|
+---------+----------+--------+-----+------+------+---------------+---------+
|    James|       rob|   Smith|36636|     M|  3000|             []|unchanged|
|      Jen|      Mary|   Brown|60563|     F|    -1|             []|unchanged|
|    Maria|      Anne|   Jones|72712|     F|  3000|   [id, salary]|  updated|
|  Michael|      Rose|     jim|40288|     M|  4000|             []|  deleted|
|   Robert|    dunkin|Williams|42114|     M|  2000|       [salary]|  updated|
|     Yesh|     Reddy|   Brown|75234|     M|  3000|             []|    added|
+---------+----------+--------+-----+------+------+---------------+---------+
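One caveat worth noting: the comparison df1[c] != df2[c] in conditions_ evaluates to null when either side is null, and otherwise("") then kicks in, so a change from a value to null (or from null to a value) will not appear in updated_columns. If nulls can occur in the compared columns, a null-safe variant using Column.eqNullSafe is a possible refinement. A sketch under that assumption (note that with the outer join this will also list every column for added and deleted rows, since the missing side is entirely null, although the status column stays correct because it checks added/deleted first):

# Null-safe variant of conditions_: flags a column as changed
# even when exactly one side is null
conditions_ = [
    when(~df1[c].eqNullSafe(df2[c]), lit(c)).otherwise("")
    for c in df1.columns
    if c not in ['firstname', 'middlename', 'lastname']
]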

Regarding python - Pyspark - Difference between 2 dataframes - identify inserts, updates and deletes, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/69886683/
