- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
使用气流将 CSV 文件流式传输到 kafka 主题的最佳方法是什么?
为气流编写自定义运算符?
最佳答案
可能最好使用 PythonOperator
逐行处理文件。我有一个用例,我轮询和 SFTP 服务器获取文件,当我找到一些文件时,我逐行处理它们,将结果写为 JSON。我会做一些事情,比如将日期解析为 YYYY-MM-DD 格式等。这样的事情可能对你有用:
def csv_file_to_kafka(**context):
f = '/path/to/downloaded/csv_file.csv'
csvfile = open(f, 'r')
reader = csv.DictReader(csvfile)
for row in reader:
"""
Send the row to Kafka
"""
return
csv_file_to_kafka = PythonOperator(
task_id='csv_file_to_kafka',
python_callable=csv_file_to_kafka,
dag=dag
)
现在如何下载文件完全取决于您。在我的例子中,我使用 SSHHook
和 GoogleCloudStorageHook
从 SFTP 服务器获取文件,然后将文件名传递给解析和清理 csv 文件的任务。我通过从 SFTP 中提取文件并将它们放入 Google Cloud Storage 来执行此操作:
"""
HOOKS: Connections to external systems
"""
def sftp_connection():
"""
Returns an SFTP connection created using the SSHHook
"""
ssh_hook = SSHHook(ssh_conn_id='sftp_connection')
ssh_client = ssh_hook.get_conn()
return ssh_client.open_sftp()
def gcs_connection():
"""
Returns an GCP connection created using the GoogleCloudStorageHook
"""
return GoogleCloudStorageHook(google_cloud_storage_conn_id='my_gcs_connection')
"""
PYTHON CALLABLES: Called by PythonOperators
"""
def get_files(**context):
"""
Looks at all files on the FTP server and returns a list files.
"""
sftp_client = sftp_connection()
all_files = sftp_client.listdir('/path/to/files/')
files = []
for f in all_files:
files.append(f)
return files
def save_files(**context):
"""
Looks to see if a file already exists in GCS. If not, the file is downloaed
from SFTP server and uploaded to GCS. A list of
"""
files = context['task_instance'].xcom_pull(task_ids='get_files')
sftp_client = sftp_connection()
gcs = gcs_connection()
new_files = []
new_outcomes_files = []
new_si_files = []
new_files = process_sftp_files(files, gcs, sftp_client)
return new_files
def csv_file_to_kafka(**context):
"""
Untested sample parse csv files and send to kafka
"""
files = context['task_instance'].xcom_pull(task_ids='save_files')
for f in new_files:
csvfile = open(f, 'r')
reader = csv.DictReader(csvfile)
for row in reader:
"""
Send the row to Kafka
"""
return
get_files = PythonOperator(
task_id='get_files',
python_callable=get_files,
dag=dag
)
save_files = PythonOperator(
task_id='save_files',
python_callable=save_files,
dag=dag
)
csv_file_to_kafka = PythonOperator(
task_id='csv_file_to_kafka',
python_callable=csv_file_to_kafka,
dag=dag
)
我知道我可以在一个大的 python 可调用文件中完成这一切,这就是我现在重构代码以便在可调用文件中的方式。所以它轮询 SFTP 服务器,提取最新文件,并根据我的规则在一个 python 函数中解析它们。我听说使用 XCom 并不理想,Airflow 任务不应该相互通信太多,据说。
根据您的用例,您甚至可能想要探索类似 Apache Nifi 的内容,实际上我现在也在研究这个问题。
关于python - 使用气流将文件流式传输到kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46778171/
根据the stable/airflow docs,您可以: mount a file to /requirements.txt to get additional Python modules fo
我在 google-cloud-composer 上创建并运行了 dags环境 ( dlkpipelinesv1 : composer-1.13.0-airflow-1.10.12)。我可以手动触发这
我按照文档安装了 Apache-airflow。 https://airflow.apache.org/docs/stable/start.html 当我执行airflow initdb时,每次都会出
我是一名优秀的程序员,十分优秀!