gpt4 book ai didi

sql-server - 如何使用 Nifi/HDF 从 MS SQL 读取增量记录

转载 作者:可可西里 更新时间:2023-11-01 15:25:33 27 4
gpt4 key购买 nike

我在 MS SQL 中有几个表,这些表每秒更新一次,查询或多或少看起来像这样

SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID 
WHERE table2.UpdateTime >= ${lastUpdateTime} AND table2.G_ID > ${lastID}

假设选择内连接查询结果为 5 条记录,如下所示。

如果查询是第一次运行 ${lastUpdateTime}${lastG_ID}设置为 0,它将返回 5 条以下的记录。处理记录后,查询将存储 max(G_ID)即 5 和 max(UpdateTime)etl_stat 中的 1512010479表。

 G_ID       UpdateTime   ID            Name             T_NAME 
-------------------------------------------------------------------
1 1512010470 12591225 DUMMY_DATA DUMMY_ID
2 1512096873 12591538 DUMMY_DATA DUMMY_ID
3 1512096875 12591539 DUMMY_DATA DUMMY_ID
4 1512010477 12591226 DUMMY_DATA DUMMY_ID
5 1512010479 12591227 DUMMY_DATA DUMMY_ID

如果表再添加5条新记录,如下所示:

 G_ID       UpdateTime   ID            Name             T_NAME 
-------------------------------------------------------------------
1 1512010470 12591225 DUMMY_DATA DUMMY_ID
2 1512096873 12591538 DUMMY_DATA DUMMY_ID
3 1512096875 12591539 DUMMY_DATA DUMMY_ID
4 1512010477 12591226 DUMMY_DATA DUMMY_ID
5 1512010479 12591227 DUMMY_DATA DUMMY_ID
6 1512010480 12591230 DUMMY_DATA DUMMY_ID
7 1512010485 12591231 DUMMY_DATA DUMMY_ID
8 1512010490 12591232 DUMMY_DATA DUMMY_ID
9 1512010493 12591233 DUMMY_DATA DUMMY_ID
10 1512010500 12591234 DUMMY_DATA DUMMY_ID

查询将首先读取 max(G_ID)max(UpdateTime)来自 etl_stat table并将按如下方式构建查询 SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID WHERE table2.UpdateTime >= 1512010479 AND table2.G_ID > 5 , 因此查询仅返回 5 个增量记录,如下所示。

G_ID        UpdateTime   ID            Name             T_NAME 
-------------------------------------------------------------------
6 1512010480 12591230 DUMMY_DATA DUMMY_ID
7 1512010485 12591231 DUMMY_DATA DUMMY_ID
8 1512010490 12591232 DUMMY_DATA DUMMY_ID
9 1512010493 12591233 DUMMY_DATA DUMMY_ID
10 1512010500 12591234 DUMMY_DATA DUMMY_ID

所以每次运行查询时,它应该首先读取 max(G_ID)max(UpdateTime)来自 etl_stat表格和框架选择内部连接查询如上所示,并获取增量更改。

使用 SPARK SQL 的架构

我已经按如下方式实现了上述用例:

1) Spark JDBC读取phoenix表得到max(G_ID)max(UpdateTime)来自 etl_stat表。

2) Spark JDBC 像这样构建选择内部连接查询 SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID WHERE table2.UpdateTime >= 1512010479 AND table2.G_ID > 5

3) Spark JDBC 运行第 2 步内连接查询,从 MS SQL 服务器读取增量消息处理记录并插入到 HBase。

4) 成功插入HBase后,Spark更新etl_stat表与最新G_ID即 10 和 UpdateTime即 1512010500。

5) 此作业已按 cron 计划每 1 分钟运行一次。

成为使用 NIFI 的架构

我想把这个用例移到 Nifi 上,我想用 NiFi 从 MS SQL DB 中读取记录并将这条记录发送到 Kafka。

发布到Kafka成功后,NiFi会在数据库中保存G_ID和UpdateTime。

一旦消息到达 Kafka,Spark Streaming 将从 Kafka 读取消息并使用现有业务逻辑保存到 HBase。

在每次运行时,Nifi 处理器都应该使用 max(G_ID) 构建选择内部连接查询和 max(UpdateTime)以获取增量记录并发布到 Kafka。

我是 Nifi/HDF 的新手。我需要您的帮助和指导才能使用 Nifi/HDF 实现这一点。如果您对此用例有更好的解决方案/架构,请提出建议。

抱歉发了这么长的帖子。

最佳答案

您所描述的是 JDBC Kafka Connect connector 的内容开箱即用。设置您的配置文件,加载它,然后开始。完毕。 Kafka Connect 是 Apache Kafka 的一部分。无需额外的工具和技术。

您可能还需要考虑适当的更改数据捕获 (CDC)。对于专有 RDBMS(Oracle、DB2、MS SQL 等),您可以使用 GoldenGate、Attunity、DBVisit 等商业工具。对于开源 RDBMS(例如 MySQL、PostgreSQL),您应该查看开源 Debezium工具。所有这些 CDC 工具都直接与 Kafka 集成。

关于sql-server - 如何使用 Nifi/HDF 从 MS SQL 读取增量记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47591418/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com