
azure - How do I append records to a Delta table inside foreachBatch?

Reposted. Author: 行者123. Updated: 2023-12-03 05:12:07

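I am using foreachBatch to write streaming data to multiple targets, and it works fine for the first micro-batch. When it tries to run the second micro-batch, it fails with the following error: "StreamingQueryException: Query [id = 0d8e45ff-4f3a-42c0-964d-6f41c93df801, runId = 186a22bf-c75e-482b-bd4b-19b039a9aa38] terminated with exception: abfss://[email protected]/primary/directory1 already exists"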

Below is the foreachBatch snippet I am using.

df_new = <<<some streaming dataset>>>

val appId = "1dbcd4f2-eeb7-11ed-a05b-0242ac120003"

df_new.writeStream.format("delta")
  .option("mergeSchema", "true").outputMode("append")
  .option("checkpointLocation", "abfss://[email protected]/checkpoint/chkdir")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.persist()
    val fc_final = batchDF.filter(col("msg_type") === "FC")
      .drop(columnlist_fc: _*)
    fc_final.write
      .option("txnVersion", batchId).option("txnAppId", appId)
      .save("abfss://[email protected]/primary/directory1")

    val hb_final = batchDF.filter(col("msg_type") =!= "FC")
      .drop(columnlist_hb: _*)

    hb_final.write.partitionBy("occurrence_month")
      .option("txnVersion", batchId).option("txnAppId", appId)
      .save("abfss://[email protected]/primary/directory2")

    batchDF.unpersist()
    ()

  }.start().awaitTermination()

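What am I missing here? Why can't it append the data files to the Delta directory even though I specified outputMode("append") on the stream? Any help is much appreciated.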

Best answer

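The problem is that the .write calls inside .foreachBatch do not specify a save mode. Inside foreachBatch each micro-batch is an ordinary batch DataFrame, so the outputMode("append") set on the stream does not apply to these writes. For batch writes the default is SaveMode.ErrorIfExists, which throws an error if data already exists at the target. That is why the first micro-batch succeeds and the second one fails. To append data, change the mode to SaveMode.Append: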

fc_final.write
  .mode("append")
  .option("txnVersion", batchId).option("txnAppId", appId)
  .save("abfss://[email protected]/primary/directory1")
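
The same change is needed on the second write (hb_final) as well. As a rough sketch only, the whole batch body could be pulled into a helper like the one below. The name writeBatch and its parameters (fcPath, hbPath, dropFc, dropHb) are hypothetical and stand in for the paths and column lists from the question. The explicit .format("delta") is also an assumption: the question omits it, which works where Delta is the session's default format, but being explicit avoids relying on that.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions.col

// Hypothetical helper (not from the original post): appends one micro-batch
// to two Delta paths. All paths and column lists are supplied by the caller.
def writeBatch(
    batchDF: DataFrame,
    batchId: Long,
    appId: String,
    fcPath: String,
    hbPath: String,
    dropFc: Seq[String] = Seq.empty,
    dropHb: Seq[String] = Seq.empty): Unit = {
  // Cache the batch because it is read twice below.
  batchDF.persist()
  try {
    // Rows with msg_type == "FC" go to the first target.
    batchDF.filter(col("msg_type") === "FC")
      .drop(dropFc: _*)
      .write
      .format("delta")               // assumption: made explicit here
      .mode(SaveMode.Append)         // the fix: append instead of ErrorIfExists
      .option("txnVersion", batchId) // idempotent-write options from the question
      .option("txnAppId", appId)
      .save(fcPath)

    // Everything else goes to the second target, partitioned by month.
    batchDF.filter(col("msg_type") =!= "FC")
      .drop(dropHb: _*)
      .write
      .format("delta")
      .mode(SaveMode.Append)
      .partitionBy("occurrence_month")
      .option("txnVersion", batchId)
      .option("txnAppId", appId)
      .save(hbPath)
  } finally {
    // Release the cache even if one of the writes throws.
    batchDF.unpersist()
  }
}
```

Inside .foreachBatch the body then reduces to a single call such as writeBatch(batchDF, batchId, appId, fcPath, hbPath, columnlist_fc, columnlist_hb). Wrapping the writes in try/finally is a design choice of this sketch, so that a failed write does not leave the batch cached.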

This question, "azure - How do I append records to a Delta table inside foreachBatch?", comes from a similar question found on Stack Overflow: https://stackoverflow.com/questions/76214262/
