gpt4 book ai didi

python - 使用气流将文件流式传输到kafka

转载 作者:太空狗 更新时间:2023-10-29 21:41:41 25 4
gpt4 key购买 nike

使用气流将 CSV 文件流式传输到 kafka 主题的最佳方法是什么?

为气流编写自定义运算符?

最佳答案

可能最好使用 PythonOperator 逐行处理文件。我有一个用例,我轮询和 SFTP 服务器获取文件,当我找到一些文件时,我逐行处理它们,将结果写为 JSON。我会做一些事情,比如将日期解析为 YYYY-MM-DD 格式等。这样的事情可能对你有用:

def csv_file_to_kafka(**context):

f = '/path/to/downloaded/csv_file.csv'
csvfile = open(f, 'r')
reader = csv.DictReader(csvfile)

for row in reader:
"""
Send the row to Kafka
"""
return

csv_file_to_kafka = PythonOperator(
task_id='csv_file_to_kafka',
python_callable=csv_file_to_kafka,
dag=dag
)

现在如何下载文件完全取决于您。在我的例子中,我使用 SSHHookGoogleCloudStorageHook 从 SFTP 服务器获取文件,然后将文件名传递给解析和清理 csv 文件的任务。我通过从 SFTP 中提取文件并将它们放入 Google Cloud Storage 来执行此操作:

"""
HOOKS: Connections to external systems
"""
def sftp_connection():
"""
Returns an SFTP connection created using the SSHHook
"""
ssh_hook = SSHHook(ssh_conn_id='sftp_connection')
ssh_client = ssh_hook.get_conn()
return ssh_client.open_sftp()
def gcs_connection():
"""
Returns an GCP connection created using the GoogleCloudStorageHook
"""
return GoogleCloudStorageHook(google_cloud_storage_conn_id='my_gcs_connection')

"""
PYTHON CALLABLES: Called by PythonOperators
"""
def get_files(**context):
"""
Looks at all files on the FTP server and returns a list files.
"""
sftp_client = sftp_connection()
all_files = sftp_client.listdir('/path/to/files/')
files = []

for f in all_files:
files.append(f)

return files

def save_files(**context):
"""
Looks to see if a file already exists in GCS. If not, the file is downloaed
from SFTP server and uploaded to GCS. A list of
"""
files = context['task_instance'].xcom_pull(task_ids='get_files')

sftp_client = sftp_connection()
gcs = gcs_connection()
new_files = []
new_outcomes_files = []
new_si_files = []

new_files = process_sftp_files(files, gcs, sftp_client)

return new_files

def csv_file_to_kafka(**context):
"""
Untested sample parse csv files and send to kafka
"""
files = context['task_instance'].xcom_pull(task_ids='save_files')
for f in new_files:
csvfile = open(f, 'r')
reader = csv.DictReader(csvfile)

for row in reader:
"""
Send the row to Kafka
"""
return

get_files = PythonOperator(
task_id='get_files',
python_callable=get_files,
dag=dag
)
save_files = PythonOperator(
task_id='save_files',
python_callable=save_files,
dag=dag
)
csv_file_to_kafka = PythonOperator(
task_id='csv_file_to_kafka',
python_callable=csv_file_to_kafka,
dag=dag
)

我知道我可以在一个大的 python 可调用文件中完成这一切,这就是我现在重构代码以便在可调用文件中的方式。所以它轮询 SFTP 服务器,提取最新文件,并根据我的规则在一个 python 函数中解析它们。我听说使用 XCom 并不理想,Airflow 任务不应该相互通信太多,据说。

根据您的用例,您甚至可能想要探索类似 Apache Nifi 的内容,实际上我现在也在研究这个问题。

关于python - 使用气流将文件流式传输到kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46778171/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com