- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
考虑这个示例 - 您需要从源数据库加载 table1,执行一些通用转换(例如为带时间戳的列转换时区)并将结果数据写入 Snowflake。这是一个简单的操作,可以使用 3 个 dagster 操作来实现。
现在,假设您需要使用 100 个表来做同样的事情。你会如何处理 dagster?您真的需要创建 100 个作业/图表吗?或者您可以创建一个将执行 100 次的作业吗?您能否限制同时运行这些作业的数量?
最佳答案
执行此操作有两个主要选项:
使用此设置,您的所有 ETL 都将在一个作业中发生。您将有一个初始操作,它会为您想要为其执行此过程的每个表名生成一个 DynamicOutput,并将其提供给将在每个单独的 DynamicOutput 上运行的一组操作(可能组织成一个图表)。
根据您使用的执行器,可以限制整体步骤并发(例如,默认 multiprocess_executor 支持此选项)。
from dagster import job, op, graph
import pandas as pd
@op(config_schema={"table_name": str})
def extract_table(context) -> pd.DataFrame:
table_name = context.op_config["table_name"]
# do some load...
return pd.DataFrame()
@op
def transform_table(table: pd.DataFrame) -> pd.DataFrame:
# do some transform...
return table
@op(config_schema={"table_name": str})
def load_table(context, table: pd.DataFrame):
table_name = context.op_config["table_name"]
# load to snowflake...
@job
def configurable_etl():
load_table(transform_table(extract_table()))
# this is what the configuration would look like to extract from table
# src_foo and load into table dest_foo
configurable_etl.execute_in_process(
run_config={
"ops": {
"extract_table": {"config": {"table_name": "src_foo"}},
"load_table": {"config": {"table_name": "dest_foo"}},
}
}
)
在这里,您通过为相关操作提供配置架构来创建一个可以指向源表和目标表的作业。根据这些配置选项(当您通过运行配置创建运行时提供),您的作业将在不同的源/目标表上运行。
该示例显示使用 python API 明确运行此作业,但如果您从 Dagit 运行它,您还可以在那里输入此配置的 yaml 版本。如果你想简化配置模式(如图所示,它嵌套得很漂亮),你总是可以创建一个 Config Mapping使界面更好:)
从这里,您可以通过为您的作业提供唯一标记并使用 QueuedRunCoordinator 来限制运行并发。限制该标签的最大并发运行数。
关于dagster - 是否可以使用 Dagster 创建动态作业?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69949073/
假设我在 Dagster 中有两个实体连接在一条管道上。第一个实体可能会执行一些处理并生成有效输入,以便管道的其余部分执行,或者生成不应进一步处理的无效输入。为实现此结果,我在数据满足无效条件时引发错
考虑这个示例 - 您需要从源数据库加载 table1,执行一些通用转换(例如为带时间戳的列转换时区)并将结果数据写入 Snowflake。这是一个简单的操作,可以使用 3 个 dagster 操作来实
考虑这个示例 - 您需要从源数据库加载 table1,执行一些通用转换(例如为带时间戳的列转换时区)并将结果数据写入 Snowflake。这是一个简单的操作,可以使用 3 个 dagster 操作来实
我已经开始在我们的 ML 管道中使用 Dagster,并且遇到了一些基本问题,我想知道我是否遗漏了一些微不足道的东西,或者这是否就是这样...... 假设我有一个简单的 ML 管道: Load raw
我已经开始在我们的 ML 管道中使用 Dagster,并且遇到了一些基本问题,我想知道我是否遗漏了一些微不足道的东西,或者这是否就是这样...... 假设我有一个简单的 ML 管道: Load raw
假设我使用以下实体创建了一个 Dagster 管道: 从文件执行 SQL 查询并获得结果 将结果写入表 我想对 10 个不同的表并行执行此操作。每个表都需要不同的 SQL 查询。 最好的方法是什么?
嗨,我正在尝试集成 Dagster进入正在进行的 Django 项目。我在向 Dagster 提供 Django 上下文(模型、应用程序等)方面遇到了困难。截至目前,我只是检查 dagit 是否存在于
我正在使用 dagster 0.11.3(撰写本文时最新版本) 我创建了一个如下所示的 Dagster 管道(另存为 pipeline.py): @solid def return_a(context
我正在使用 dagster 0.11.3(撰写本文时最新版本) 我创建了一个如下所示的 Dagster 管道(另存为 pipeline.py): @solid def return_a(context
我是一名优秀的程序员,十分优秀!