我有一个简单的Apache·Flink的工作:
datasource(apache kafka)-filter-keyby-cep pattern(带计时器)-patternProcessFunction-keyedprocessfunction(这里我有valuestate(boolean)和注册计时器5分钟。如果valuestate不为null,我将更新valuestate(收集器中没有要发送的内容)并更新计时器。如果valuestate为null,我将保存为true,然后在收集器中发送输入事件并设置计时器。当ontimer方法就绪时,我将清除valuestate)-sink(apachekafka)。
作业设置:
检查点间隔:5000ms
增量检查点:true
语义:正好一次
状态后端:rocksdb
平行度:4
按理说,我的工作做得很好,但我有一些问题。
我在集群上进行了两个测试(2个作业管理器和3个任务管理器):
第一次测试:
我开始工作,连接到一个空的apachekafka主题,然后在flinkwebui中看到检查点统计:
1) 最新确认-触发时间=5000ms(如我的检查点间隔)
2) 状态大小=每5秒间隔340 kb
3) 所有状态均已完成(蓝色)。
第二次试验:
我开始用ApacheKafka主题中的其他键(从“1”到integer.max\u value)发送json消息。发送速度是:1000条消息/秒,然后我在flink web ui中看到检查点统计:
1) 最新确认-触发时间=1-6分钟
我的问题1:为什么时间在增长?是坏的还是好的?
2) 州的规模不断扩大。我用Kafka发了大约10分钟的信息(1000 x 60 x 10=600000条信息)。发送后的状态大小为100mb-150mb。
3) 送完信后,我等了大约一个小时,看到:
最新确认-触发时间=5000ms(如我的检查点间隔)
状态大小为:每5秒间隔100mb-150mb。
我的问题2:为什么不减少?毕竟,我检查了我的作业日志,看到了600000条记录:清除了键的valuestate(ontimer方法成功),作业逻辑(请参阅说明my keyedprocessfunction)运行良好
我想做什么?
1) 设置检查点之间的暂停
2) 禁用增量检查点
3) 启用异步检查点(在flink-conf.yml中)
它不给任何改变!!!
我的问题3:我该怎么办??因为在工业服务器上,速度是:1000万条消息/小时并且检查点大小会立即增加。
暂无答案!
目前还没有任何答案,快来回答吧!