gpt4 book ai didi

azure - 具有合并到多个表的自动加载器

转载 作者:行者123 更新时间:2023-12-03 02:11:31 25 4
gpt4 key购买 nike

我正在尝试使用以下代码在多个表上使用“合并”来实现自动装载器,如文档中所述:

def upsert_data(df, epoch_id):
deltaTable = DeltaTable.forPath(spark, target_location)\
deltaTable.alias("t").merge(df.alias("s"),\
"t.xx = s.xx and t.xx1 = s.xx1") \
.whenMatchedUpdateAll()\
.whenNotMatchedInsertAll() \
.execute()

for i in range(len(list_of_files()[0])):
schema =list_of_files()[2][i]
raw_data = list_of_files()[1][i]
checkpoint= list_of_files()[3][i]
target_location = list_of_files()[4][i]

dfSource =list_of_files(raw_data)
dfMergedSchema = dfSource.where("1=0")
dfMergedSchema.createOrReplaceGlobalTempView("test1")
dfMergedSchema.write.option("mergeSchema","true").mode("append").format("delta")\
.save(target_location)

stream = spark.readStream\
.format("cloudFiles")\
.option("cloudFiles.format", "parquet")\
.option("header", "true")\
.schema(schema)\
.load(raw_data)

stream.writeStream.format("delta")\
.outputMode("append")\
.foreachBatch(upsert_data)\
.option("dataChange", "false")\
.trigger(once=True)\
.option("checkpointLocation", checkpoint)\
.start()

我的场景:我们有一个登陆区域,其中 Parquet 文件被附加到多个文件夹中,如下所示:

Landing Zone ---|                   
|-----folder 0 ---|----parquet1
| |----parquet2
| |----parquet3
|
|
|-----folder 1 ---|----parquet1
| |----parquet2
| |----parquet3


Then I am needing Auto Loader to create the tables as shown below with the checkpoints:
Staging Zone ---|
|-----folder 0 ---|----checkpoint
| |----table
|
|
|
|-----folder 1 ---|----checkpoint
| |----table
|

我注意到,如果 Writestream 中没有 foreachBatch 选项,但使用一次触发器,则代码可以按预期对多个表进行插入,如上面所示。当我们在单个表上同时具有 foreachBatch 和 Trigger 选项而不使用 for 循环时,该代码也可以工作。但是,当我尝试为 for 循环中的多个表启用这两个选项(foreachBatch 和 Trigger Once)时,自动加载器会将所有表内容合并到一个表中。您会获得一个检查点,但暂存区域中的文件夹 0 没有表内容,而在文件夹 1 中,您会获得一个检查点,但会在文件夹 1 的表文件夹中获得构成文件夹 0 和 1 的表内容的增量文件。将两个表合并为一个表。

我还得到了 ConcurrentAppendException。

我在文档中阅读了有关 ConcurrentAppendException 的内容,我发现您要么使用分区,要么在传递到 WriteStream 的 foreachBatch 选项的 upsert_data 函数中存在脱节条件。我都尝试了,但都不起作用。

在这个 for 循环中使用 foreachBatch 和 Trigger Once 的同时,如何隔离暂存区这一场景中不同文件夹的流?这里的 foreachBatch 选项肯定缺少一些东西,因为没有它,自动加载器能够将流隔离到文件夹 0 和文件夹 1,但有了它,就不能了。

最佳答案

今天与 Databricks 解决方案架构师交谈,他提到我需要使用 ThreadPoolExecutor,它是 Auto Loader 或 Databricks 本身之外的东西,但是 Python 原生的。这将在一个辅助函数中,该函数指定与自动加载器并行处理表的流的数量。因此,可以将 Auto Loader 笔记本的单个实例用于多个表,这符合我的用例。谢谢!

关于azure - 具有合并到多个表的自动加载器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73254482/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com