java—如何在ApacheFlink流媒体中从关系数据库读取数据

myzjeezk  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(324)

如何使用自定义数据源从关系数据库中读取数据。我对Flink是个新手。添加新的自定义数据源时遇到问题。所以请帮助我添加自定义数据源,并从源数据库中连续读取数据。

qmelpv7a

qmelpv7a1#

正如承志所建议的,关系数据库的设计不是以流式方式处理的,最好使用kafka、kinesis或其他系统。
但是,您可以编写一个自定义源函数,使用jdbc连接来获取数据。它必须不断地向数据库查询任何新数据。这里的问题是,您需要一种方法来确定哪些数据已经读取/处理,哪些没有。在我的脑海中,你可以使用一些东西,比如记住上次处理的主键是什么,然后在后续的查询中使用它,比如: SELECT * FROM events WHERE event_id > $last_processed_event_id; 或者你可以清除 events 某些事务中的表,如:
SELECT * FROM unprocessed_events; DELETE FROM unprocessed_events WHERE event_id IN $PROCESSED_EVENT_IDS; event_id 可以是任何允许您唯一标识记录的内容,也可以是某个时间戳或一组字段。
另一件需要考虑的事情是,您必须手动处理(的)检查点 last_processed_even_id 如果你想提供任何合理的 at-least-once 或者 exactly-once 保证。

相关问题