gpt4 book ai didi

sql - DBMS 级别的管道和过滤器 : Splitting the MERGE output stream

转载 作者:行者123 更新时间:2023-12-01 21:59:47 25 4
gpt4 key购买 nike

设想

我们有一个非常标准的数据导入过程,我们在其中加载staging表,然后 MERGE它变成了 target table 。

新要求(绿色)涉及捕获导入数据的子集
成一个单独的 queue表进行完全不相关的处理。

Scenario schema

“挑战”

(1) 子集由一组记录组成:那些
新插入target仅表。

(2) 子集是一些插入列的投影,但也
至少一列仅存在于源中(staging table )。

(3) MERGE语句已经使用了 OUTPUT..INTO条款
严格记录$action s 由 MERGE 拍摄,这样我们就可以PIVOT结果和COUNT插入次数、更新次数和
用于统计目的的删除。我们真的不喜欢缓冲
像这样对整个数据集进行操作,并且更喜欢聚合
即时的总和。不用说,我们不想添加更多数据到
OUTPUT table 。

(4) 我们不想做 MERGE 的匹配工作
无论出于何种原因,即使是部分原因,第二次执行。这target表真的很大,我们不能索引所有的东西,而且
操作通常非常昂贵(几分钟,而不是几秒钟)。

(5) 我们不考虑对 MERGE 的任何输出进行往返。到
客户端只是为了让客户端可以将其路由到 queue经过
立即寄回。数据必须留在服务器上。

(6) 我们希望避免在临时存储中缓冲整个数据集
之间stagingqueue .

最好的方法是什么?

失败

(a) 仅将插入的记录入队的要求阻止了我们
从定位到 queue表直接在 OUTPUT..INTO的条款MERGE ,因为它不允许任何 WHERE条款。我们可以使用一些CASE标记不需要的记录以供后续删除的技巧
来自 queue没有处理,但这似乎很疯狂。

(b) 因为有些列是为 queue 准备的不要出现在target表,我们不能简单地在目标上添加插入触发器
表加载 queue . “数据流拆分”必须更快发生。

(c) 因为我们已经使用了 OUTPUT..INTO MERGE中的条款, 我们
无法添加第二个 OUTPUT子句和嵌套 MERGEINSERT..SELECT加载队列。这是一种耻辱,因为它
感觉像是对有用的东西的完全任意限制
否则很好; SELECT仅过滤具有$action我们想要 ( INSERT ) 和 INSERT他们在 queue在一个
陈述。因此,DBMS 理论上可以避免缓冲整个
数据集并简单地将其流式传输到 queue . (注:我们没有追求
并且很可能它实际上并没有以这种方式优化计划。)

情况

我们觉得我们已经用尽了我们的选择,但决定转向 hive 思维
为了确定。我们能想到的只有:

(S1) 创建 VIEWtarget还包含可为空的表
用于 queue 的数据的列只有,并拥有SELECT语句将它们定义为 NULL .然后,设置 INSTEAD OF填充 target 的触发器表和queue适本地。最后,连接 MERGE以定位 View 。这个
有效,但我们不是这个结构的粉丝——它绝对是
看起来很棘手。

(S2) 放弃,使用临时表缓冲整个数据集
另一个 MERGE..OUTPUT .后 MERGE , 立即复制数据
(再次!)从临时表到 queue .

最佳答案

我的理解是,主要障碍是 OUTPUT 的限制。 SQL Server 中的子句。它允许一个 OUTPUT INTO table和/或一个 OUTPUT将结果集返回给调用者。

你想保存MERGE的结果以两种不同的方式声明:

  • MERGE 影响的所有行用于收集统计信息
  • 仅插入行 queue


  • 简单变体

    我会使用你的 S2 解决方案。至少开始。它易于理解和维护,并且应该非常高效,因为最耗费资源的操作( MERGETarget 本身只会执行一次)。下面有第二个变体,比较它们在真实数据上的表现会很有趣。

    所以:
  • 使用 OUTPUT INTO @TempTableMERGE
  • 要么INSERT来自 @TempTable 的所有行进入 Stats或在插入前聚合。如果您只需要汇总统计数据,则可以汇总该批次的结果并将其合并到最终的 Stats 中。而不是复制所有行。
  • INSERT进入 Queue只有来自 @TempTable 的“插入”行.

  • 我将从@i-one 的答案中获取样本数据。

    架构
    -- I'll return to commented lines later

    CREATE TABLE [dbo].[TestTarget](
    -- [ID] [int] IDENTITY(1,1) NOT NULL,
    [foo] [varchar](10) NULL,
    [bar] [varchar](10) NULL
    );

    CREATE TABLE [dbo].[TestStaging](
    [foo] [varchar](10) NULL,
    [bar] [varchar](10) NULL,
    [baz] [varchar](10) NULL
    );

    CREATE TABLE [dbo].[TestStats](
    [MergeAction] [nvarchar](10) NOT NULL
    );

    CREATE TABLE [dbo].[TestQueue](
    -- [TargetID] [int] NOT NULL,
    [foo] [varchar](10) NULL,
    [baz] [varchar](10) NULL
    );

    样本数据
    TRUNCATE TABLE [dbo].[TestTarget];
    TRUNCATE TABLE [dbo].[TestStaging];
    TRUNCATE TABLE [dbo].[TestStats];
    TRUNCATE TABLE [dbo].[TestQueue];

    INSERT INTO [dbo].[TestStaging]
    ([foo]
    ,[bar]
    ,[baz])
    VALUES
    ('A', 'AA', 'AAA'),
    ('B', 'BB', 'BBB'),
    ('C', 'CC', 'CCC');

    INSERT INTO [dbo].[TestTarget]
    ([foo]
    ,[bar])
    VALUES
    ('A', 'A_'),
    ('B', 'B?');

    合并
    DECLARE @TempTable TABLE (
    MergeAction nvarchar(10) NOT NULL,
    foo varchar(10) NULL,
    baz varchar(10) NULL);

    MERGE INTO TestTarget AS Dst
    USING TestStaging AS Src
    ON Dst.foo = Src.foo
    WHEN MATCHED THEN
    UPDATE SET
    Dst.bar = Src.bar
    WHEN NOT MATCHED BY TARGET THEN
    INSERT (foo, bar)
    VALUES (Src.foo, Src.bar)
    OUTPUT $action AS MergeAction, inserted.foo, Src.baz
    INTO @TempTable(MergeAction, foo, baz)
    ;

    INSERT INTO [dbo].[TestStats] (MergeAction)
    SELECT T.MergeAction
    FROM @TempTable AS T;

    INSERT INTO [dbo].[TestQueue]
    ([foo]
    ,[baz])
    SELECT
    T.foo
    ,T.baz
    FROM @TempTable AS T
    WHERE T.MergeAction = 'INSERT'
    ;

    SELECT * FROM [dbo].[TestTarget];
    SELECT * FROM [dbo].[TestStats];
    SELECT * FROM [dbo].[TestQueue];

    结果
    TestTarget
    +-----+-----+
    | foo | bar |
    +-----+-----+
    | A | AA |
    | B | BB |
    | C | CC |
    +-----+-----+

    TestStats
    +-------------+
    | MergeAction |
    +-------------+
    | INSERT |
    | UPDATE |
    | UPDATE |
    +-------------+

    TestQueue
    +-----+-----+
    | foo | baz |
    +-----+-----+
    | C | CCC |
    +-----+-----+

    第二种变体

    在 SQL Server 2014 Express 上测试。
    OUTPUT子句可以将其结果集发送到表和调用者。所以, OUTPUT INTO可以进 Stats如果我们直接包装 MERGE语句转换成存储过程,然后我们可以使用 INSERT ... EXECQueue .

    如果您检查执行计划,您会看到 INSERT ... EXEC无论如何都会在幕后创建一个临时表(另见 The Hidden Costs of INSERT EXEC by
    Adam Machanic),所以我希望当您显式创建临时表时,整体性能将类似于第一个变体。

    还有一个问题要解决: Queue表应该只有“插入”行,而不是所有受影响的行。要实现这一点,您可以在 Queue 上使用触发器。表以丢弃“插入”以外的行。另一种可能性是使用 IGNORE_DUP_KEY = ON 定义唯一索引。并以这样一种方式准备数据,即“非插入”行将违反唯一索引并且不会插入到表中。

    所以,我将添加一个 ID IDENTITY列到 Target表,我会添加一个 TargetID列到 Queue table 。 (在上面的脚本中取消注释它们)。
    另外,我将为 Queue 添加一个索引。 table :
    CREATE UNIQUE NONCLUSTERED INDEX [IX_TargetID] ON [dbo].[TestQueue]
    (
    [TargetID] ASC
    ) WITH (
    PAD_INDEX = OFF,
    STATISTICS_NORECOMPUTE = OFF,
    SORT_IN_TEMPDB = OFF,
    IGNORE_DUP_KEY = ON,
    DROP_EXISTING = OFF,
    ONLINE = OFF,
    ALLOW_ROW_LOCKS = ON,
    ALLOW_PAGE_LOCKS = ON)

    重要的部分是 UNIQUEIGNORE_DUP_KEY = ON .

    这是 MERGE 的存储过程:
    CREATE PROCEDURE [dbo].[TestMerge]
    AS
    BEGIN
    SET NOCOUNT ON;
    SET XACT_ABORT ON;

    MERGE INTO dbo.TestTarget AS Dst
    USING dbo.TestStaging AS Src
    ON Dst.foo = Src.foo
    WHEN MATCHED THEN
    UPDATE SET
    Dst.bar = Src.bar
    WHEN NOT MATCHED BY TARGET THEN
    INSERT (foo, bar)
    VALUES (Src.foo, Src.bar)
    OUTPUT $action INTO dbo.TestStats(MergeAction)
    OUTPUT CASE WHEN $action = 'INSERT' THEN inserted.ID ELSE 0 END AS TargetID,
    inserted.foo,
    Src.baz
    ;

    END

    用法
    TRUNCATE TABLE [dbo].[TestTarget];
    TRUNCATE TABLE [dbo].[TestStaging];
    TRUNCATE TABLE [dbo].[TestStats];
    TRUNCATE TABLE [dbo].[TestQueue];

    -- Make sure that `Queue` has one special row with TargetID=0 in advance.
    INSERT INTO [dbo].[TestQueue]
    ([TargetID]
    ,[foo]
    ,[baz])
    VALUES
    (0
    ,NULL
    ,NULL);

    INSERT INTO [dbo].[TestStaging]
    ([foo]
    ,[bar]
    ,[baz])
    VALUES
    ('A', 'AA', 'AAA'),
    ('B', 'BB', 'BBB'),
    ('C', 'CC', 'CCC');

    INSERT INTO [dbo].[TestTarget]
    ([foo]
    ,[bar])
    VALUES
    ('A', 'A_'),
    ('B', 'B?');

    INSERT INTO [dbo].[TestQueue]
    EXEC [dbo].[TestMerge];

    SELECT * FROM [dbo].[TestTarget];
    SELECT * FROM [dbo].[TestStats];
    SELECT * FROM [dbo].[TestQueue];

    结果
    TestTarget
    +----+-----+-----+
    | ID | foo | bar |
    +----+-----+-----+
    | 1 | A | AA |
    | 2 | B | BB |
    | 3 | C | CC |
    +----+-----+-----+

    TestStats
    +-------------+
    | MergeAction |
    +-------------+
    | INSERT |
    | UPDATE |
    | UPDATE |
    +-------------+

    TestQueue
    +----------+------+------+
    | TargetID | foo | baz |
    +----------+------+------+
    | 0 | NULL | NULL |
    | 3 | C | CCC |
    +----------+------+------+
    INSERT ... EXEC期间会有额外提示:
    Duplicate key was ignored.

    如果 MERGE更新了一些行。当唯一索引在 INSERT 期间丢弃某些行时发送此警告消息。由于 IGNORE_DUP_KEY = ON .

    A warning message will occur when duplicate key values are inserted into a unique index. Only the rows violating the uniqueness constraint will fail.

    关于sql - DBMS 级别的管道和过滤器 : Splitting the MERGE output stream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34533204/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com