gpt4 book ai didi

apache-spark - 如何一次查询 delta lake 表中的所有版本以跟踪对特定 ID 所做的更改

转载 作者:行者123 更新时间:2023-12-05 02:49:53 25 4
gpt4 key购买 nike

我有一个员工表,其中包含使用 delta lake 管理的所有 employeesalary

我可以像这样使用 delta lake 支持的时间旅行功能,根据版本时间戳查询表。

SELECT *
FROM DELTA.`EMPLOYEE`
VERSION AS OF 3

但我想知道在所有版本的增量表中对员工所做的所有更改的历史记录。像这样

SELECT *
, timestamp -- From delta table
, version -- From delta table
FROM DELTA.`EMPLOYEE`
WHERE EMPLOYEE = 'George'
WITHIN ALL VERSIONS --Never exists but just for understanding

最佳答案

这是一个老问题,但今天我偶然发现了它,因为我有一些问题要解决。我认为 Delta (delta.io) 没有为此提供方法,因为 Delta 围绕时间旅行到特定时间点而不是一段时间。

但如果我必须得到这个,我想一种方法是直接读取 Parquet 文件(忽略增量日志),这将导致记录的所有过去版本/状态(将 Vacuum 等放在一边)。

现在如果要求是获取每个记录创建的确切版本(这是我的要求),请使用类似

dataframe.withColumn("input_file", input_file_name())这将显示记录来自的确切文件名。

现在查询 .json _delta_log 事务文件,它会告诉我们哪个版本添加了哪个文件,像这样

>>> details = spark.read.json('/data/gcs/delta/ingest/bigtable/_delta_log/*.json')
>>> details = details.select(col('add')['path'].alias("file_path")).withColumn("version",substring(input_file_name(),-6,1)).filter("file_path is not NULL")
>>> details.show(5,100)
+-------------------------------------------------------------------+-------+
| file_path|version|
+-------------------------------------------------------------------+-------+
|part-00000-148c98cc-0db1-495e-bb67-0ba1cc4fd45e-c000.snappy.parquet| 4|
|part-00001-2caa89b7-c990-47e0-b7b0-92430b15b141-c000.snappy.parquet| 4|
|part-00002-1f900af7-d819-48e9-a048-ad22e5c7ce65-c000.snappy.parquet| 4|
|part-00003-e043f466-861b-47f0-a1cf-4b67e75a5ed2-c000.snappy.parquet| 4|
|part-00000-93cc0747-ca0b-46ef-ada4-b3fb18e48925-c000.snappy.parquet| 0|
+-------------------------------------------------------------------+-------+
only showing top 5 rows

在 file_path 上加入这两个数据帧,您将看到记录的每个状态/版本及其创建时的增量版本。我的示例 -

parquet_table = spark.read.parquet('/data/gcs/delta/ingest/bigtable/*.parquet')

>>> parquet_table.printSchema()
root
|-- Region: string (nullable = true)
|-- Country: string (nullable = true)
|-- Item_Type: string (nullable = true)
|-- Sales_Channel: string (nullable = true)


parquet_table = parquet_table.where(col("Order_ID")==913712584).\
withColumn("input_file",substring(input_file_name(),38,1000)).\
select(["Order_ID","Region","Country","Sales_Channel","input_file"]).\
orderBy("Country")

>>> parquet_table.join(details,parquet_table.input_file == details.file_path).select("Order_ID","Region","Country","Sales_Channel","version").orderBy("version").show(100)
+---------+------------------+-------+-------------+-------+
| Order_ID| Region|Country|Sales_Channel|version|
+---------+------------------+-------+-------------+-------+
|913712584|Sub-Saharan Africa|Lesotho| Online| 0|
|913712584|Sub-Saharan Africa|Lesotho| Online| 0|
|913712584|Sub-Saharan Africa|Lesotho| Online| 0|

关于apache-spark - 如何一次查询 delta lake 表中的所有版本以跟踪对特定 ID 所做的更改,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63887197/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com