I have a Flink application that queries a database every 30 minutes, performs some transformations, and persists the results back to the database. When processing 100k records, backpressure from the JDBC write task prevents the database-querying task and one of the transformation tasks from completing their checkpoints, and the checkpoints time out. Enabling unaligned checkpoints doesn't help either. I'm using Flink 1.14.5 with parallelism 15.
Task manager config:
taskmanager.memory.network.min: 128MB
taskmanager.memory.network.max: 128MB
taskmanager.memory.managed.size: 128MB
taskmanager.memory.task.heap.size: 768MB
taskmanager.memory.jvm-overhead.max: 256MB
taskmanager.memory.jvm-overhead.min: 256MB
taskmanager.memory.jvm-metaspace.size: 256MB
taskmanager.memory.framework.off-heap.size: 128MB
taskmanager.memory.framework.heap.size: 128MB
taskmanager.memory.task.off-heap.size: 256MB
Checkpointing config: (screenshot not shown)
Checkpoint latency: (screenshot not shown)
Checkpoint status, task by task: (screenshot not shown)
Execution flow: (screenshot not shown)
I see the same issue even with 10k records. Am I missing something here?
Another Flink application uses Solace as the input source and the same JDBC write task. That one works fine even with unaligned checkpoints disabled and the same Flink configuration.
Thanks for the help.
1 Answer
If the JDBC database is the bottleneck, it may simply be under-provisioned.
Ideally, the database queries should be done asynchronously, so that the tasks involved don't block on I/O. You can accomplish this with Flink's async I/O operator, or with a Flink SQL lookup join. Making that change may help.
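In Flink, async I/O is implemented by wrapping the lookup in a `RichAsyncFunction` and applying it with `AsyncDataStream.unorderedWait`. The sketch below shows the underlying non-blocking pattern in plain Java so it stands alone without Flink on the classpath; `queryDb` is a hypothetical stand-in for the blocking JDBC call, and the thread pool plays the role Flink's async operator plays for in-flight requests.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class AsyncLookupSketch {
    // Hypothetical stand-in for a blocking JDBC query.
    static String queryDb(int id) {
        return "row-" + id;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);

        // Fire all lookups without blocking the calling thread,
        // analogous to what asyncInvoke() does in a RichAsyncFunction:
        // the operator thread stays free while queries are in flight.
        List<CompletableFuture<String>> futures =
            List.of(1, 2, 3).stream()
                .map(id -> CompletableFuture.supplyAsync(() -> queryDb(id), pool))
                .collect(Collectors.toList());

        // Wait for completion; joining in list order preserves input order,
        // similar to AsyncDataStream.orderedWait.
        List<String> results = futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());

        System.out.println(results); // [row-1, row-2, row-3]
        pool.shutdown();
    }
}
```

Because the operator thread is never parked on a database round trip, checkpoint barriers can flow through the task instead of queuing behind blocked records, which is exactly the backpressure symptom described above.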
An even better approach might be to convert the job to be CDC-based. That more decoupled, purely streaming architecture is a better fit for Flink.