如何使用自定义数据源从关系数据库中读取数据。我对Flink是个新手。添加新的自定义数据源时遇到问题。所以请帮助我添加自定义数据源,并从源数据库中连续读取数据。
qmelpv7a1#
正如承志所建议的,关系数据库的设计不是以流式方式处理的,最好使用kafka、kinesis或其他系统。但是,您可以编写一个自定义源函数,使用jdbc连接来获取数据。它必须不断地向数据库查询任何新数据。这里的问题是,您需要一种方法来确定哪些数据已经读取/处理,哪些没有。在我的脑海中,你可以使用一些东西,比如记住上次处理的主键是什么,然后在后续的查询中使用它,比如: SELECT * FROM events WHERE event_id > $last_processed_event_id; 或者你可以清除 events 某些事务中的表,如:SELECT * FROM unprocessed_events; DELETE FROM unprocessed_events WHERE event_id IN $PROCESSED_EVENT_IDS; event_id 可以是任何允许您唯一标识记录的内容,也可以是某个时间戳或一组字段。另一件需要考虑的事情是,您必须手动处理(的)检查点 last_processed_even_id 如果你想提供任何合理的 at-least-once 或者 exactly-once 保证。
SELECT * FROM events WHERE event_id > $last_processed_event_id;
events
event_id
last_processed_even_id
at-least-once
exactly-once
1条答案
按热度按时间qmelpv7a1#
正如承志所建议的,关系数据库的设计不是以流式方式处理的,最好使用kafka、kinesis或其他系统。
但是,您可以编写一个自定义源函数,使用jdbc连接来获取数据。它必须不断地向数据库查询任何新数据。这里的问题是,您需要一种方法来确定哪些数据已经读取/处理,哪些没有。在我的脑海中,你可以使用一些东西,比如记住上次处理的主键是什么,然后在后续的查询中使用它,比如:
SELECT * FROM events WHERE event_id > $last_processed_event_id;
或者你可以清除events
某些事务中的表,如:SELECT * FROM unprocessed_events; DELETE FROM unprocessed_events WHERE event_id IN $PROCESSED_EVENT_IDS;
event_id
可以是任何允许您唯一标识记录的内容,也可以是某个时间戳或一组字段。另一件需要考虑的事情是,您必须手动处理(的)检查点
last_processed_even_id
如果你想提供任何合理的at-least-once
或者exactly-once
保证。