gpt4 book ai didi

snowflake-cloud-data-platform - 从外部源查询 dbt

转载 作者:行者123 更新时间:2023-12-04 16:26:23 26 4
gpt4 key购买 nike

我有以下问题:

  • 我有一个 AWS S3 管道,每天都会输出一个 json.gz 文件。
  • 我想用 dbt 把那个文件放到雪花里(没有雪管使用 atm)

  • 我已经通过创建存储集成设法做到了这一点,并且我已经使用我的角色(用于运行 dbt)手动创建了一个架构并在该架构上分配使用情况。到现在为止还挺好。
    然后我读到了这个:
    https://github.com/fishtown-analytics/dbt-external-tables
    问题是,这是正常运行的唯一方法,我必须更改我的 dbt profiles.yml,将默认架构设置为 S3_MIXPANEL,默认数据库为 RAW_DEV,使用 --target 'ingest_dev' 参数运行不同的目标和角色.
    我一直认为应该有一个更复杂的解决方案,我可以在其中创建模式和查询元数据并使用类似 {{ source() }} 这样的东西,这样我就可以以某种方式指出我的文档这是一个外部源。我认为这个 dbt-external-tables 并没有很好地解释我的情况?
    请任何人都可以帮助我并分享如何在不每次更改默认架构宏和 dbtprofiles.yml 的情况下正确地从外部阶段创建架构和查询?
    我已成功运行以下代码:
    {{
    config(
    materialized ='incremental',
    schema = generate_schema_name('S3_MIXPANEL')
    )
    }}

    SELECT
    metadata$filename as file_name,
    to_date(SUBSTR(metadata$filename,16,10),'yyyy/mm/dd') as event_date,
    $1 as payload,
    CONVERT_TIMEZONE('Europe/London',TO_TIMESTAMP_tz($1:properties:mp_processing_time_ms::int / 1000)) as event_timestamp_converted,
    CONVERT_TIMEZONE('Europe/London', current_timestamp) as ingested_at

    from

    @my_s3_stage


    {% if is_incremental() %}
    -- this filter will only be applied on an incremental run
    WHERE event_date>(
    SELECT
    max(event_date)
    FROM
    {{ this }}
    )
    {% endif %}

    {{ row_limit() }}
    编辑 22-06-20:
    我已经在我的模型中添加了 src_mixpanel.yml 文件并运行了 dbt 命令,但是我还必须指定 data_types,所以我也添加了它们,然后我显然也必须在我的宏中添加“宏”(顺便说一句,也许是愚蠢的问题,但我真的不知道如何安装您的软件包,所以我手动将您的所有宏添加到我的中)。
    现在,当我运行此代码时:
    dbt run-operation stage_external_sources
    version: 2

    sources:

    - name: s3_mixpanel
    database: RAW_DEV
    tables:
    - name: events
    external:
    location: '@my_s3_stage'
    auto_refresh: false # depends on your S3 setup
    partitions:
    - name: event_date
    expression: to_date(SUBSTR(metadata$filename,16,10),'yyyy/mm/dd')
    data_type: date
    - name: file_name
    expression: metadata$filename
    data_type: string
    columns:
    - name: properties
    data_type: variant
    我收到一个错误:

    Encountered an error while running operation: Compilation Error in macro stage_external_sources (macros/stage_external_sources.sql)
    'dict object' has no attribute 'sources'

    最佳答案

    作为dbt-external-tables的维护者包,我将分享其固执的观点。该包认为您应该首先将所有外部源(S3 文件)作为外部表或使用雪管进行暂存,在一个过程中尽可能少地包含混淆逻辑。然后,您可以在 dbt 模型中选择它们作为源,以及所有必需的业务逻辑。
    如果我的理解是正确的,您将按如下方式在一个名为(例如)models/staging/mixpanel/src_mixpanel.yml 的文件中暂存您的 mixpanel 数据:

    version: 2

    sources:

    - name: s3_mixpanel
    database: raw_dev
    tables:
    - name: events
    external:
    location: '@my_s3_stage'
    file_format: "( type = json )" # or a named file format
    auto_refresh: false # depends on your S3 setup
    partitions:
    - name: event_date
    expression: to_date(SUBSTR(metadata$filename,16,10),'yyyy/mm/dd')
    columns:
    - name: properties
    data_type: variant
    你可以从包中运行这个宏来创建外部表——如果你没有 auto_refresh,在创建之后,更新它的分区元数据。启用(见雪花 docs):
    dbt run-operation stage_external_sources
    然后,您可以在增量模型中从此源中进行选择,就像上面的模型一样。现在, event_date是这个外部表上的一个分区列,因此对其进行过滤 应该启用 Snowflake 修剪文件(尽管这在历史上对于动态的、子查询派生的过滤器是不一致的)。
    {{
    config(
    materialized ='incremental'
    )
    }}

    SELECT
    metadata$filename as file_name,
    event_date,
    value as payload,
    properties:mp_processing_time_ms::int / 1000 as event_timestamp_converted,
    CONVERT_TIMEZONE('Europe/London', current_timestamp) as modeled_at

    from {{ source('s3_mixpanel', 'events' }}


    {% if is_incremental() %}
    -- this filter will only be applied on an incremental run
    WHERE event_date >(
    SELECT
    max(event_date)
    FROM
    {{ this }}
    )
    {% endif %}

    {{ row_limit() }}

    关于snowflake-cloud-data-platform - 从外部源查询 dbt,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62992304/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com