kafka jdbc连接器加载所有数据,然后增量加载

bpzcxfmw  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(394)

我试图弄清楚如何从一个查询中首先获取所有数据,然后使用kafka连接器以增量方式只获取更改。原因是我想将所有数据加载到ElasticSearch中,然后使es与我的kafka流保持同步。目前,我首先使用mode=bulk的连接器,然后将其更改为timestamp。这个很好用。
但是,如果我们想将所有数据重新加载到流和es,这意味着我们必须编写一些脚本,以某种方式清除或删除kafka流和es索引数据,修改connect ini以将模式设置为bulk,重新启动一切,给它时间加载所有数据,然后再次将脚本修改为timestamp模式,然后再次重新启动所有内容(之所以需要这样一个脚本,是因为有时,批量更新会通过一个我们还不能控制的etl过程纠正历史数据,而且这个过程不会更新时间戳)
有没有人做了类似的事情,找到了更优雅的解决方案?

jgovgodb

jgovgodb1#

如何首先从查询中获取所有数据,然后使用kafka连接器以增量方式只进行更改。
也许这对你有帮助。例如,我有一个表:

╔════╦═════════════╦═══════════╗
║ Id ║    Name     ║  Surname  ║
╠════╬═════════════╬═══════════╣
║  1 ║ Martin      ║ Scorsese  ║
║  2 ║ Steven      ║ Spielberg ║
║  3 ║ Christopher ║ Nolan     ║
╚════╩═════════════╩═══════════╝

在这种情况下,我将创建一个视图:

CREATE OR REPLACE VIEW EDGE_DIRECTORS AS
SELECT 0 AS EXID, ID, NAME, SURNAME
FROM DIRECTORS WHERE ID =< 2
UNION ALL
SELECT ID AS EXID, ID, NAME, SURNAME
FROM DIRECTORS WHERE ID > 2;

在kafka jdbc连接器的属性文件中,您可以使用:

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
mode=incrementing
incrementing.column.name=EXID
topic.prefix=
tasks.max=1
name=gv-jdbc-source-connector
connection.url=
table.types=VIEW
table.whitelist=EDGE_DIRECTORS

因此,kafka jdbc连接器将采取以下步骤:
首先是exid=0的所有数据;
它将在connector.offsets文件中存储偏移值=0;
新行将插入到directors表中。
kafka jdbc连接器将执行: Select EXID, ID, NAME, SURNAME FROM EDGE_DIRECTORS 并且会注意到exid已经增加了。
数据将在Kafka流中更新。

j2datikz

j2datikz2#

很长一段时间后又回到这个主题。该方法能够解决这个问题,而且不必使用批量模式
停止连接器
擦除每个连接器jvm的偏移文件
(可选)如果您想执行完整的擦除和加载,您可能还想使用kafka/connect-utils/restapi删除主题(不要忘记state主题)
重新启动连接。

相关问题