sql server—如何使用nifi/hdf从ms sql读取增量记录

bybem2ql  于 2021-06-01  发布在  Hadoop
关注(0)|答案(1)|浏览(646)

我在mssql中有一些表,这些表每秒钟都会更新一次,查询或多或少是这样的

SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID 
WHERE table2.UpdateTime >= ${lastUpdateTime} AND table2.G_ID > ${lastID}

假设selectinnerjoin查询结果为5条记录,如下所示。
如果查询第一次运行 ${lastUpdateTime} 以及 ${lastG_ID} 设置为0,将返回5条以下的记录。处理完记录后,查询将存储 max(G_ID) i、 e.5和 max(UpdateTime) i、 e.1512010479英寸 etl_stat table。

G_ID       UpdateTime   ID            Name             T_NAME 
-------------------------------------------------------------------
 1          1512010470  12591225      DUMMY_DATA       DUMMY_ID    
 2          1512096873  12591538      DUMMY_DATA       DUMMY_ID    
 3          1512096875  12591539      DUMMY_DATA       DUMMY_ID    
 4          1512010477  12591226      DUMMY_DATA       DUMMY_ID    
 5          1512010479  12591227      DUMMY_DATA       DUMMY_ID

如表所示,再添加5条新记录:

G_ID       UpdateTime   ID            Name             T_NAME 
-------------------------------------------------------------------
 1          1512010470  12591225      DUMMY_DATA       DUMMY_ID    
 2          1512096873  12591538      DUMMY_DATA       DUMMY_ID    
 3          1512096875  12591539      DUMMY_DATA       DUMMY_ID    
 4          1512010477  12591226      DUMMY_DATA       DUMMY_ID    
 5          1512010479  12591227      DUMMY_DATA       DUMMY_ID 
 6          1512010480  12591230      DUMMY_DATA       DUMMY_ID 
 7          1512010485  12591231      DUMMY_DATA       DUMMY_ID 
 8          1512010490  12591232      DUMMY_DATA       DUMMY_ID 
 9          1512010493  12591233      DUMMY_DATA       DUMMY_ID 
 10         1512010500  12591234      DUMMY_DATA       DUMMY_ID

查询将首先读取 max(G_ID) 以及 max(UpdateTime)etl_stat table 并将框架查询如下 SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID WHERE table2.UpdateTime >= 1512010479 AND table2.G_ID > 5 ,因此查询只返回5条delta记录,如下所示。

G_ID        UpdateTime   ID            Name             T_NAME 
-------------------------------------------------------------------
 6          1512010480  12591230      DUMMY_DATA       DUMMY_ID 
 7          1512010485  12591231      DUMMY_DATA       DUMMY_ID 
 8          1512010490  12591232      DUMMY_DATA       DUMMY_ID 
 9          1512010493  12591233      DUMMY_DATA       DUMMY_ID 
 10         1512010500  12591234      DUMMY_DATA       DUMMY_ID

因此,每次运行查询时,它都应该首先读取 max(G_ID) 以及 max(UpdateTime)etl_stat 表和框架中的selectinnerjoin查询,如上图所示,并获取增量更改。
使用sparksql的as-is体系结构
我实现了上述用例,如下所示:
1) spark jdbc读取phoenix表以获得 max(G_ID) 以及 max(UpdateTime)etl_stat table。
2) sparkjdbc将selectinnerjoin查询框架如下 SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID WHERE table2.UpdateTime >= 1512010479 AND table2.G_ID > 5 3) sparkjdbc运行步骤2内部连接查询,从mssqlserver读取delta消息,处理记录并插入hbase。
4) 成功插入hbase后,spark更新 etl_stat 最新版本的表格 G_ID i、 e.10和 UpdateTime i、 e.1512010500。
5) 此作业已安排为每1分钟运行一次。
使用nifi的准体系结构
我想将这个用例移到nifi,我想使用nifi从mssqldb读取记录并将这个记录发送给kafka。
在成功发布到kafka之后,nifi会在数据库中保存g\u id和updatetime。
消息到达kafka后,spark streaming将从kafka读取消息,并使用现有业务逻辑保存到hbase。
在每次运行时,nifi处理器都应该使用 max(G_ID) 以及 max(UpdateTime) 为了得到德尔塔的记录并出版给Kafka。
我是nifi/hdf的新手。我需要你的帮助和指导,以便使用nifi/hdf实现这一点。如果您对此用例有更好的解决方案/体系结构,请提出建议。
抱歉发了这么长的帖子。

f45qwnt8

f45qwnt81#

您所描述的是jdbc-kafka-connect连接器开箱即用的功能。设置你的配置文件,加载它,然后开始。完成。Kafka连接是ApacheKafka的一部分。不需要额外的工具和技术。
您可能还需要考虑适当的变更数据捕获(cdc)。对于专有的rdbms(oracle、db2、mssql等),您可以使用goldengate、attunity、dbvisit等商业工具。对于开源rdbms(例如mysql、postgresql),您应该看看开源debezium工具。所有这些cdc工具都直接与Kafka集成。

相关问题