- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我在 MS SQL 中有几个表,这些表每秒更新一次,查询或多或少看起来像这样
SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID
WHERE table2.UpdateTime >= ${lastUpdateTime} AND table2.G_ID > ${lastID}
假设选择内连接查询结果为 5 条记录,如下所示。
如果查询是第一次运行 ${lastUpdateTime}
和 ${lastG_ID}
设置为 0,它将返回 5 条以下的记录。处理记录后,查询将存储 max(G_ID)
即 5 和 max(UpdateTime)
即 etl_stat
中的 1512010479表。
G_ID UpdateTime ID Name T_NAME
-------------------------------------------------------------------
1 1512010470 12591225 DUMMY_DATA DUMMY_ID
2 1512096873 12591538 DUMMY_DATA DUMMY_ID
3 1512096875 12591539 DUMMY_DATA DUMMY_ID
4 1512010477 12591226 DUMMY_DATA DUMMY_ID
5 1512010479 12591227 DUMMY_DATA DUMMY_ID
如果表再添加5条新记录,如下所示:
G_ID UpdateTime ID Name T_NAME
-------------------------------------------------------------------
1 1512010470 12591225 DUMMY_DATA DUMMY_ID
2 1512096873 12591538 DUMMY_DATA DUMMY_ID
3 1512096875 12591539 DUMMY_DATA DUMMY_ID
4 1512010477 12591226 DUMMY_DATA DUMMY_ID
5 1512010479 12591227 DUMMY_DATA DUMMY_ID
6 1512010480 12591230 DUMMY_DATA DUMMY_ID
7 1512010485 12591231 DUMMY_DATA DUMMY_ID
8 1512010490 12591232 DUMMY_DATA DUMMY_ID
9 1512010493 12591233 DUMMY_DATA DUMMY_ID
10 1512010500 12591234 DUMMY_DATA DUMMY_ID
查询将首先读取 max(G_ID)
和 max(UpdateTime)
来自 etl_stat table
并将按如下方式构建查询 SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID WHERE table2.UpdateTime >= 1512010479 AND table2.G_ID > 5
, 因此查询仅返回 5 个增量记录,如下所示。
G_ID UpdateTime ID Name T_NAME
-------------------------------------------------------------------
6 1512010480 12591230 DUMMY_DATA DUMMY_ID
7 1512010485 12591231 DUMMY_DATA DUMMY_ID
8 1512010490 12591232 DUMMY_DATA DUMMY_ID
9 1512010493 12591233 DUMMY_DATA DUMMY_ID
10 1512010500 12591234 DUMMY_DATA DUMMY_ID
所以每次运行查询时,它应该首先读取 max(G_ID)
和 max(UpdateTime)
来自 etl_stat
表格和框架选择内部连接查询如上所示,并获取增量更改。
使用 SPARK SQL 的架构
我已经按如下方式实现了上述用例:
1) Spark JDBC读取phoenix表得到max(G_ID)
和 max(UpdateTime)
来自 etl_stat
表。
2) Spark JDBC 像这样构建选择内部连接查询 SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID WHERE table2.UpdateTime >= 1512010479 AND table2.G_ID > 5
3) Spark JDBC 运行第 2 步内连接查询,从 MS SQL 服务器读取增量消息处理记录并插入到 HBase。
4) 成功插入HBase后,Spark更新etl_stat
表与最新G_ID
即 10 和 UpdateTime
即 1512010500。
5) 此作业已按 cron 计划每 1 分钟运行一次。
成为使用 NIFI 的架构
我想把这个用例移到 Nifi 上,我想用 NiFi 从 MS SQL DB 中读取记录并将这条记录发送到 Kafka。
发布到Kafka成功后,NiFi会在数据库中保存G_ID和UpdateTime。
一旦消息到达 Kafka,Spark Streaming 将从 Kafka 读取消息并使用现有业务逻辑保存到 HBase。
在每次运行时,Nifi 处理器都应该使用 max(G_ID)
构建选择内部连接查询和 max(UpdateTime)
以获取增量记录并发布到 Kafka。
我是 Nifi/HDF 的新手。我需要您的帮助和指导才能使用 Nifi/HDF 实现这一点。如果您对此用例有更好的解决方案/架构,请提出建议。
抱歉发了这么长的帖子。
最佳答案
您所描述的是 JDBC Kafka Connect connector 的内容开箱即用。设置您的配置文件,加载它,然后开始。完毕。 Kafka Connect 是 Apache Kafka 的一部分。无需额外的工具和技术。
您可能还需要考虑适当的更改数据捕获 (CDC)。对于专有 RDBMS(Oracle、DB2、MS SQL 等),您可以使用 GoldenGate、Attunity、DBVisit 等商业工具。对于开源 RDBMS(例如 MySQL、PostgreSQL),您应该查看开源 Debezium工具。所有这些 CDC 工具都直接与 Kafka 集成。
关于sql-server - 如何使用 Nifi/HDF 从 MS SQL 读取增量记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47591418/
我有一个网站。 必须登录才能看到里面的内容。 但是,我使用此代码登录。 doc = Jsoup.connect("http://46.137.207.181/Account/Login.aspx")
我正在尝试为我的域创建一个 SPF 记录并使我的邮件服务器能够对其进行评估。我在邮件服务器上使用 Postfix 并使用 policyd-spf (Python) 来评估记录。目前,我通过我的私有(p
我需要为负载平衡的 AWS 站点 mywebsite.com 添加 CName 记录。记录应该是: @ CNAME mywebsite.us-east-1.elb.amazon
我目前正在开发一个相当大的多层应用程序,该应用程序将部署在海外。虽然我希望它在解聚后不会折叠或爆炸,但我不能 100% 确定这一点。因此,如果我知道我可以请求日志文件,以准确找出问题所在以及原因,那就
我使用以下命令从我的网络摄像头录制音频和视频 gst-launch-0.10 v4l2src ! video/x-raw-yuv,width=640,height=480,framerate=30/1
我刚刚开始使用 ffmpeg 将视频分割成图像。我想知道是否可以将控制台输出信息保存到日志文件中。我试过“-v 10”参数,也试过“-loglevel”参数。我在另一个 SO 帖子上看到使用 ffmp
我想针对两个日期查询我的表并检索其中的记录。 我这样声明我的变量; DECLARE @StartDate datetime; DECLARE @EndDate datetime; 并像这样设置我的变量
在 javascript 中,我可以使用简单的 for 循环访问对象的每个属性,如下所示 var myObj = {x:1, y:2}; var i, sum=0; for(i in myObj) s
最近加入了一个需要处理大量代码的项目,我想开始记录和可视化调用图的一些流程,让我更好地理解一切是如何组合在一起的。这是我希望在我的理想工具中看到的: 每个节点都是一个函数/方法 如果一个函数可以调用另
如何使用反射在F#中创建记录类型?谢谢 最佳答案 您可以使用 FSharpValue.MakeRecord [MSDN]创建一个记录实例,但是我认为F#中没有任何定义记录类型的东西。但是,记录会编译为
关闭。这个问题不满足Stack Overflow guidelines .它目前不接受答案。 想改善这个问题吗?更新问题,使其成为 on-topic对于堆栈溢出。 3年前关闭。 Improve thi
我是 Sequelize 的新手并且遇到了一些语法问题。我制作了以下模型: // User sequelize.define('user', { name: { type: DataTyp
${student.name} Notify 这是我的output.jsp。请注意,我已经放置了一个链接“Notify”以将其转发到 display.jsp 上。但我不确定如何将 Stud
例如,这是我要做的查询: server:"xxx.xxx.com" AND request_url:"/xxx/xxx/xxx" AND http_X_Forwarded_Proto:(https O
我一直在开发大量 Java、PHP 和 Python。所有这些都提供了很棒的日志记录包(分别是 Log4J、Log 或logging)。这在调试应用程序时有很大帮助。特别是当应用程序 headless
在我的Grails应用程序中,我异步运行一些批处理过程,并希望该过程记录各种状态消息,以便管理员以后可以检查它们。 我考虑过将log4j JDBC附加程序用作最简单的解决方案,但是据我所知,它不使用D
我想将进入 MQ 队列的消息记录到数据库/文件或其他日志队列,并且我无法修改现有代码。是否有任何方法可以实现某种类似于 HTTP 嗅探器的消息记录实用程序?或者也许 MQ 有一些内置的功能来记录消息?
如果我有一条包含通用字段的记录,在更改通用字段时是否有任何方法可以模仿方便的 with 语法? 即如果我有 type User = // 'photo can be Bitmap or Url {
假设我有一个名为 Car 的自定义对象。其中的所有字段都是私有(private)的。 public class Car { private String mName; private
当记录具有特定字段时,我需要返回 true 的函数,反之亦然。示例: -record(robot, {name, type=industrial, ho
我是一名优秀的程序员,十分优秀!