gpt4 book ai didi

Azure数据工厂: Output Copied File and Folder Information from Copy Activity

转载 作者:行者123 更新时间:2023-12-05 07:27:22 25 4
gpt4 key购买 nike

我正在使用 Azure 数据工厂中的自托管集成运行时将数据从本地源(普通文件系统)复制到 Azure Blob 存储目标。传输后,我想通过附加在 Databricks 集群上运行的笔记本来自动处理文件。管道工作正常,但我的问题涉及复制事件的输出。

有没有办法获取每次运行时传输的文件和文件夹的信息?我会将此信息作为参数传递给笔记本。

查看文档,似乎只有汇总信息可用:

https://learn.microsoft.com/en-us/azure/data-factory/copy-activity-overview

如果您传输大量文件,这是有意义的。如果不可能,我想另一种方法是将复制过程留给其本身,并根据存储帐户事件创建另一个管道?或者也许可以将每次运行的新文件和文件夹信息存储在固定的文本文件中,也传输它,并在笔记本中读取它?

最佳答案

如果您想获取从数据工厂读取的文件或目录的信息,可以使用获取元数据事件来完成,请参阅以下内容 answer举个例子。

检测笔记本中新文件的另一种方法是对文件源使用结构化流。这非常有效,您只需在复制事件之后调用笔记本事件即可。

为此,您定义一个流输入数据帧:

streamingInputDF = (
spark
.readStream
.schema(pqtSchema)
.parquet(inputPath)
)

inputPath 指向 Blob 存储中的输入目录。支持的文件格式有 text、csv、json、orc、parquet,因此这取决于您的具体场景。

重要的是,在目标上您使用触发一次选项,因此笔记本不需要永久运行,例如。例如:

streamingOutputDF \
.repartition(1) \
.writeStream \
.format("parquet") \
.partitionBy('Id') \
.option("checkpointLocation", adlpath + "spark/checkpointlocation/data/trusted/sensorreadingsdelta") \
.option("path", targetPath + "delta") \
.trigger(once=True) \
.start()

另一种方法可能是使用 Azure 队列存储 (AQS),请参阅以下内容 documentation .

关于Azure数据工厂: Output Copied File and Folder Information from Copy Activity,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53985412/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com