- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
使用气流将 CSV 文件流式传输到 kafka 主题的最佳方法是什么?
为气流编写自定义运算符?
最佳答案
可能最好使用 PythonOperator
逐行处理文件。我有一个用例,我轮询和 SFTP 服务器获取文件,当我找到一些文件时,我逐行处理它们,将结果写为 JSON。我会做一些事情,比如将日期解析为 YYYY-MM-DD 格式等。这样的事情可能对你有用:
def csv_file_to_kafka(**context):
f = '/path/to/downloaded/csv_file.csv'
csvfile = open(f, 'r')
reader = csv.DictReader(csvfile)
for row in reader:
"""
Send the row to Kafka
"""
return
csv_file_to_kafka = PythonOperator(
task_id='csv_file_to_kafka',
python_callable=csv_file_to_kafka,
dag=dag
)
现在如何下载文件完全取决于您。在我的例子中,我使用 SSHHook
和 GoogleCloudStorageHook
从 SFTP 服务器获取文件,然后将文件名传递给解析和清理 csv 文件的任务。我通过从 SFTP 中提取文件并将它们放入 Google Cloud Storage 来执行此操作:
"""
HOOKS: Connections to external systems
"""
def sftp_connection():
"""
Returns an SFTP connection created using the SSHHook
"""
ssh_hook = SSHHook(ssh_conn_id='sftp_connection')
ssh_client = ssh_hook.get_conn()
return ssh_client.open_sftp()
def gcs_connection():
"""
Returns an GCP connection created using the GoogleCloudStorageHook
"""
return GoogleCloudStorageHook(google_cloud_storage_conn_id='my_gcs_connection')
"""
PYTHON CALLABLES: Called by PythonOperators
"""
def get_files(**context):
"""
Looks at all files on the FTP server and returns a list files.
"""
sftp_client = sftp_connection()
all_files = sftp_client.listdir('/path/to/files/')
files = []
for f in all_files:
files.append(f)
return files
def save_files(**context):
"""
Looks to see if a file already exists in GCS. If not, the file is downloaed
from SFTP server and uploaded to GCS. A list of
"""
files = context['task_instance'].xcom_pull(task_ids='get_files')
sftp_client = sftp_connection()
gcs = gcs_connection()
new_files = []
new_outcomes_files = []
new_si_files = []
new_files = process_sftp_files(files, gcs, sftp_client)
return new_files
def csv_file_to_kafka(**context):
"""
Untested sample parse csv files and send to kafka
"""
files = context['task_instance'].xcom_pull(task_ids='save_files')
for f in new_files:
csvfile = open(f, 'r')
reader = csv.DictReader(csvfile)
for row in reader:
"""
Send the row to Kafka
"""
return
get_files = PythonOperator(
task_id='get_files',
python_callable=get_files,
dag=dag
)
save_files = PythonOperator(
task_id='save_files',
python_callable=save_files,
dag=dag
)
csv_file_to_kafka = PythonOperator(
task_id='csv_file_to_kafka',
python_callable=csv_file_to_kafka,
dag=dag
)
我知道我可以在一个大的 python 可调用文件中完成这一切,这就是我现在重构代码以便在可调用文件中的方式。所以它轮询 SFTP 服务器,提取最新文件,并根据我的规则在一个 python 函数中解析它们。我听说使用 XCom 并不理想,Airflow 任务不应该相互通信太多,据说。
根据您的用例,您甚至可能想要探索类似 Apache Nifi 的内容,实际上我现在也在研究这个问题。
关于python - 使用气流将文件流式传输到kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46778171/
我刚刚更新了 Ruby,现在我在尝试启动 compass 时遇到以下错误: Encoding::CompatibilityError on line ["28"] of /usr/local/Cell
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 我们不允许提问寻求书籍、工具、软件库等的推荐。您可以编辑问题,以便用事实和引用来回答。 关闭 6 年前。
关闭。这个问题需要debugging details .它目前不接受答案。 编辑问题以包含 desired behavior, a specific problem or error, and th
按照目前的情况,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the
我正在尝试在我的 iOS 应用程序中开发可折叠/ Accordion 式的功能。这将是您可以在网站上找到的典型 FAQ 类型功能。我想点击标题,然后显示详细信息。 因为这是帮助部分,只有几个项目,我认
我正在尝试设计一个基于 REST 的 Web 服务来与我正在开发的农场动物管理系统进行交互。 为了详细说明问题,我收藏了动物 属于一个农场。每只动物都有自己的信息——例如姓名、身份证号、品种年龄等。因
我有 3 种不同的表单,其中复选框数量不同,每个部分基本上代表一个表单,因此当用户选择该部分中的复选框时,它会显示他们在该部分的总金额中 checkout 了多少 HTML
我有一份 32 页的 PDF 版家谱。与其将家谱全部放在一个非常大的 PDF 页面上(这是我想要的),不如将其格式化为一组 8 个单独的美国信纸大小的页面应该在整个宽度上缝合; 4 行这样就完成了树。
指SASS implementation for Java? : 在 Maven 目标编译包中自动编译 compass-style.org 样式表的最佳方法是什么? 我不想发送太多的自编译库,也不想通
鉴于以下 XAML... 我正在寻找一种绑定(bind) ComboBox、Button 和 Command 的方法,以便当 ComboBox 的值更改时,在 Command 上调用 CanExe
在玩具应用程序中,我有一个显示所有帖子标题的“帖子”模板。当您单击每个标题时,我不想直接进入“显示” View ,而是直接内联展开该帖子的其余内容。 我考虑过让 postRoute 重用 postsR
我需要一些使用 Twitter Bootstrap 或其他响应式框架的自定义 Swagger-UI 实现。需要在我的移动设备上使用这样的 UI 测试我的 API,但 swagger-ui 不能很好地扩
我正在做一个项目,我真的在尝试编写面向对象的 JavaScript 代码。我刚刚开始阅读Douglas Crockford's JavaScript: The Good Parts我很快开始意识到用
在 C# 中,我通过执行以下操作来加密文本数据(请注意我正在以 block ( block )的形式加密数据): public string EncryptData(string pu
我正在构建一个社交网站,该网站将向全世界公开 REST API (WCF WebAPI),以便任何开发人员都能够为该网站创建客户端应用程序、将其与其他服务集成等。 我想为 API 实现 Faceboo
我是一名优秀的程序员,十分优秀!