我使用flink从postgresql数据库中读取数据,该数据库不断更新新数据。目前,我可以使用flink的jdbccatalog从这个数据库进行一次性查询。
我想对这个数据库运行一个连续查询,但是因为sql源不是一个无界输入,所以我的查询只运行一次就停止了。据我所知,我可以:
以暴力的方式重复运行这些查询,可能使用迭代。
使用jdbcdynamictablesource,因为连续查询是在动态表上完成的。
第二种解决方案是理想的。但是,我不知道如何在jdbcdynamictablesource上运行连续查询。我能够用所需的连接选项示例化jdbcdynamictablesource,但是我应该如何对其运行连续查询以生成动态表呢?从本质上讲,如何使用jdbcdynamictablesource?
任何建议/代码样本将不胜感激!
1条答案
按热度按时间ubby3x7f1#
我认为有4种方法可以解决你的问题:
将changelog数据捕获(cdc)与debezium一起使用。疾病预防控制中心会查看你的博士后档案,并产生一系列的变化。一些flink连接器已经可以用来解释它,并从中构建一个表。这应该是你喜欢的方式,但它需要一些管理权,你的postgres的示例我相信。
使用postgres的listen/notify,通过管道将其传输到消息队列,在flink中进行解释,并进行一些重复数据消除。不过,这种技术似乎复杂而脆弱。
使用kafka connect的jdbc连接器,配置为使用
incrementing.column.name
设置为递增的主键,或使用触发器更新的上次更改时间戳。但你需要Kafka。不是实时的,但您可以将轮询间隔缩短到每秒一次(确保轮询列上有索引)。这里的冰山就是当失败时会发生什么。我认为1)和3)应该可以。此外,还有一些性能问题:1)将减慢对postgres的写入速度(来自复制i/o开销),3)可能会减慢对postgres的读取速度(来自常量轮询)
所有的解决方案都涉及kafka或消息队列。您也可以尝试4):
实现3)并在flink sourcefunction中自己轮询数据库。确保使用有状态的源函数,并将查询偏移量作为valuestate,以便在发生故障时可以在正确的偏移量处重新启动。处理重复查询的一些想法:将源代码的并行度设置为1,或者轮询对并行度进行模化的键。
编辑:5。就像1.中的debezium cdc,但在flink中被 Package 为来源。这可能是最好的选择。