gpt4 book ai didi

airflow - 使用 Apache Airflow 检查 Azure Datalake 上是否存在文件的最佳方法是什么?

转载 作者:行者123 更新时间:2023-12-02 18:12:15 25 4
gpt4 key购买 nike

我有一个 DAG,用于检查文件是否已上传到特定目录中的 Azure DataLake。如果是这样,则允许其他 DAG 运行。

我考虑过使用 FileSensor,但我认为 fsconnid 参数不足以针对 DataLake 进行身份验证

最佳答案

Azure provider 中没有 AzureDataLakeSensor但您可以轻松地实现一个,因为 AzureDataLakeHookcheck_for_file函数,因此所需要做的就是用实现 poke() 的 Sensor 类包装该函数BaseSensorOperator 的函数。通过这样做,您可以使用 Microsoft Azure Data Lake Connection直接。

我没有测试它,但这应该有效:

from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook
from airflow.sensors.base import BaseSensorOperator

class MyAzureDataLakeSensor(BaseSensorOperator):
"""
Sense for files in Azure Data Lake

:param path: The Azure Data Lake path to find the objects. Supports glob
strings (templated)
:param azure_data_lake_conn_id: The Azure Data Lake conn
"""

template_fields: Sequence[str] = ('path',)
ui_color = '#901dd2'

def __init__(
self, *, path: str, azure_data_lake_conn_id: str = 'azure_data_lake_default', **kwargs
) -> None:
super().__init__(**kwargs)
self.path = path
self.azure_data_lake_conn_id = azure_data_lake_conn_id

def poke(self, context: "Context") -> bool:
hook = AzureDataLakeHook(azure_data_lake_conn_id=self.azure_data_lake_conn_id)
self.log.info('Poking for file in path: %s', self.path)
try:
hook.check_for_file(file_path=self.path)
return True
except FileNotFoundError:
pass
return False

使用示例:

MyAzureDataLakeSensor(
task_id='adls_sense',
path='folder/file.csv',
azure_data_lake_conn_id='azure_data_lake_default',
mode='reschedule'
)

关于airflow - 使用 Apache Airflow 检查 Azure Datalake 上是否存在文件的最佳方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72138993/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com