如何使用kafka jdbc连接器跟踪具有特定列值的行(按id)?

cqoc49vn  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(233)

我有一个包含大量记录的表。有一列定义了记录的类型。我想收集该列中具有特定值的记录。有点:

Select * FROM myVeryOwnTable WHERE type = "VERY_IMPORTANT_TYPE"

我发现我不能用 WHERE 当我选择增量(+timestamp)模式时,在自定义查询中使用一个子句,否则我需要小心自己进行过滤。我想实现的背景是,我使用logstash将某种类型的数据从mysql传输到es。通过使用可以包含where子句的查询,很容易实现这一点。但是,使用kafka,在db中插入新行之后,我可以更快地(几乎立即)传输数据。
谢谢你的提示和建议。
多亏了@wardziniak,我才得以成功。

query=select * from (select * from myVeryOwnTable p where type = 'VERY_IMPORTANT_TYPE') p
topic.prefix=test-mysql-jdbc-
incrementing.column.name=id

然而,我期待着一个主题 test-mysql-jdbc-myVeryOwnTable 所以我已经注册了我的消费者。但是,使用上面显示的查询将跳过表名,因此我的主题的名称与上面定义的前缀完全相同。所以我刚刚更新了我的属性 topic.prefix=test-mysql-jdbc-myVeryOwnTable 它似乎工作得很好。

kkih6yb8

kkih6yb81#

您可以在jdbc源连接器中使用子查询 query 财产。
jdbc源连接器配置示例:

{
    ...
    "query": "select * from (select * from myVeryOwnTable p where type = 'VERY_IMPORTANT_TYPE') p",
    "incrementing.column.name": "id",
    ...
}

相关问题