- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试编写一个管道,当 postgres db 被带到文件夹时,它应该使用 csv 的内容进行更新。我编写了一个 dag,它创建表并在从 Web UI 触发时推送 csv 内容。这是代码:
from datetime import datetime
from airflow import DAG
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import PythonOperator
import psycopg2
with DAG('Write_data_to_PG', description='This DAG is for writing data to postgres.',
schedule_interval='*/5 * * * *',
start_date=datetime(2018, 11, 1), catchup=False) as dag:
create_table = PostgresOperator(
task_id='create_table',
sql="""CREATE TABLE users(
id integer PRIMARY KEY,
email text,
name text,
address text
)
""",
)
def my_func():
print('Pushing data in database.')
conn = psycopg2.connect("host=localhost dbname=testdb user=testuser")
print(conn)
cur = conn.cursor()
print(cur)
with open('test.csv', 'r') as f:
next(f) # Skip the header row.
cur.copy_from(f, 'users', sep=',')
conn.commit()
print(conn)
print('DONE!!!!!!!!!!!.')
python_task = PythonOperator(task_id='python_task', python_callable=my_func)
create_table >> python_task
当将 csv 手动粘贴/带到文件夹时,我无法弄清楚如何触发任务。
最佳答案
原来 Airflow 有一个特殊的模块来满足这种需求。我使用 Airflow 本身提供的 FileSensor 解决了这个问题。
根据文档:
FileSensor Waits for a file or folder to land in a filesystem.If the path given is a directory then this sensor will only return true ifany files exist inside it (either directly, or within a subdirectory)
from datetime import datetime
from airflow import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import PythonOperator
import psycopg2
with DAG('Write_data_to_PG', description='This DAG is for writing data to postgres.', schedule_interval='*/5 * * * *',
start_date=datetime(2018, 11, 1), catchup=False) as dag:
create_table = PostgresOperator(
task_id='create_table',
sql="""CREATE TABLE users(
id integer PRIMARY KEY,
email text,
name text,
address text
)
""",
)
def my_func():
print('Creating table in database.')
conn = psycopg2.connect("host=localhost dbname=testdb user=testuser")
print(conn)
cur = conn.cursor()
print(cur)
with open('test.csv', 'r') as f:
next(f) # Skip the header row.
cur.copy_from(f, 'users', sep=',')
conn.commit()
print(conn)
print('DONE!!!!!!!!!!!.')
file_sensing_task = FileSensor(task_id='sense_the_csv',
filepath='test.csv',
fs_conn_id='my_file_system',
poke_interval=10)
python_task = PythonOperator(task_id='populate_data', python_callable=my_func)
create_table >> file_sensing_task >> python_task
关于python - 根据文件系统更改触发 Airflow dag,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65019365/
我按如下方式创建我的 Airflow DAG: dag = DAG(...) 但在多个教程和类(class)中,我看到他们像这样使用 with ... as 子句: with DAG(...) as
我对 DAG、Airflow 和 Python 语法有点陌生(我从 Java 学习编码),但我有一个 DAG,其中包含大约 10 个相互独立的任务,而我有另一个 DAG,只有在所有 10 个任务都运行
下面是 Airflow DAG 代码。当 Airflow 在本地托管和在云 Composer 上托管时,它都能完美运行。但是,DAG 本身在 Composer UI 中不可单击。我发现了一个类似的问题
我有兴趣在使用 https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#passing-parameters-when-t
我有一个 DAG(有向无环图),其顶点具有黑色或白色两种颜色中的任何一种。我需要将尽可能多的黑色顶点与图形应保持非循环的约束合并在一起。因此最终的 DAG 应该有最小值。的黑色顶点。这个问题的最佳算法
我正在尝试根据用户输入在 Airflow 中生成动态工作流。我知道可以根据文件和数据库中的数据选择它,但在所有这些情况下,工作流不会直接依赖于用户输入,如果多个用户使用相同的 dag,那么在这种情况下
我正在尝试拥有一个主 dag,它将根据我的需要创建更多 dags。我在 airflow.cfg 的 dags_folder 中有以下 python 文件。此代码在数据库中创建主 dag。该主 dag
我根据教程在 dags 文件夹中放置了一个 dag 文件,稍作修改,但它没有显示在 GUI 中或运行 airflow dags list 时。 最佳答案 回答我自己的问题:通过直接运行来检查 pyth
我根据教程在 dags 文件夹中放置了一个 dag 文件,稍作修改,但它没有显示在 GUI 中或运行 airflow dags list 时。 最佳答案 回答我自己的问题:通过直接运行来检查 pyth
有调用主 dag 中不同 dags 的任务列表。我正在使用 TriggerDagrunoperator 来完成此操作。但面临一些问题。 TriggerDagrunoperator 不会等待外部 dag
我设置了 Airflow 并运行一些 DAG,计划每天一次“0 0 * * *”。 我想检查下一次安排运行特定 dag 的时间,但我看不到我可以在管理员中的什么地方执行此操作。 最佳答案 如果你想使用
我通过包管理器在我的计算机上安装了 llc 程序(当然我已经安装了 LLVM,6.0.0 版本)。另外,我从源代码构建了它。我想要的是查看由 llvm 生成的 DAG。但是,不幸的是,我在 llc-d
我在 spark 中有一个操作,应该对数据框中的几列执行。通常,有 2 种可能性来指定此类操作 硬编码 handleBias("bar", df) .join(handleBias("baz",
Airflow 似乎跳过了我添加到/usr/local/airflow/dags 的 dags。 当我跑 airflow list_dags 输出显示 [2017-08-06 17:03:47,220
非常喜欢 Airflow 工作流调度程序,但在运行一个简单的 DAG 时遇到错误:“{jobs.py:538} 错误 - DAG 运行因 DAG 陷入僵局:TEST_SCHEDULER_DAG”。 这
我使用“pip install 'apache-airflow[statsd]' 安装了 airflow[statsd] 并安装了 statsd_exporter。现在我可以看到来自 Promethe
我想查找特定执行日期的特定 dag 的所有 dag 运行。 当我阅读文档时,有这个功能:dag_runs = DagRun.find(dag_id=self.dag_name, execution_d
我有一个 python DAG Parent Job和 DAG Child Job . Child Job中的任务应该在成功完成 Parent Job 时触发每天运行的任务。如何添加外部作业触发器?
我有一个由 TriggerDagRunOperator 触发的 DAG。它似乎运行良好,除非我尝试从 Airflow GUI 中“标记失败”或“标记成功”。当我这样做时,它总是尝试将更改应用到所有以前
Airflow 正在将所有 dags 加载到数据库中,但不会触发它们。 日志文件显示以下错误 [2020-01-05 02:55:06,226] {{dagbag.py:436}} [2020-0
我是一名优秀的程序员,十分优秀!