Kafka 是否可以在ksqlDB中创建一个流来发出一个ID,该ID可以检测连接在一起的3个不同表的更改?

cxfofazt  于 2023-02-11  发布在  Apache
关注(0)|答案(1)|浏览(120)

是否可以创建一个流来检测3个不同表上的更改?例如,我有一个表A,其中包含表B和表C的ID。如果我正确构造了连接查询,那么如果表B或表C中有更改,我是否可以发出一个包含表A ID的事件?

    • 表A**
  • 身份证
  • b_id
  • c_id
  • 字段_abc
  • 字段_xyz
    • 表B**
  • 身份证
    • 表C**
  • 身份证

我想要一个流,将发出表A的id的,如果有任何变化,这3个表。这是可能的吗?
例如,如果字段field_abc、foo或bar要更改,我希望将Table A的id发送到流。

wnavrhmk

wnavrhmk1#

我最近遇到了一个和你描述的类似的问题。由于ksqlDB的限制,目前使用流或表是不可能的。尽管如此,我们还是找到了一种方法来达到同样的结果。
我们的解决方案是使用连接器创建一个自定义查询,该连接器创建一个3向连接表并组合这3个表中更新的字段。

CREATE SOURCE CONNECTOR xyz_change WITH (
    'connector.class'          = '${v_connector_class}',
    'connection.url'           = '${v_connection_url}',
    'connection.user'          = '${v_connection_user}',
    'connection.password'      = '${v_connection_pass}',
    'topic.prefix'             = 'jdbc_abc_change',
    'mode'                     = 'timestamp+incrementing',
    'numeric.mapping'          = 'best_fit',
    'incrementing.column.name' = 'id',
    'timestamp.column.name'    = 'last_modified',
    'key'                      = 'id',
    'key.converter'            = '${v_converter_long}',
    'query'                    = 'select id, last_modified from(select a.id as id, GREATEST(a.last_modified, COALESCE(b.last_modified,from_unixtime(0)), COALESCE(c.last_modified,from_unixtime(0))) as last_modified  from aaa a  LEFT JOIN bbb b on a.fk_id = b.id  LEFT JOIN ccc c on a.fk_id = c.id ) sub'
);

有了它,你就可以创建任何你需要的流/表了。

相关问题