flink 1.10.1中使用rocksdb状态后端的检查点问题

mepcadol  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(672)

我们的flink工作遇到了一个很难观察到的问题。
这项工作相当简单,它:
使用flink kinesis连接器从kinesis读取消息
对消息进行键控,并将它们分发给大约30个不同的cep操作符,以及一些自定义的windowfunction
从cep/windows发出的消息被转发到sink函数,该函数将消息写入sqs
我们正在运行flink 1.10.1 fargate,使用2个4vcpus/8gb容器,我们使用rocksdb状态后端,配置如下:

state.backend: rocksdb
state.backend.async: true
state.backend.incremental: false
state.backend.rocksdb.localdir: /opt/flink/rocksdb
state.backend.rocksdb.ttl.compaction.filter.enabled: true
state.backend.rocksdb.files.open: 130048

作业以8的并行度运行。
当作业从冷启动时,它使用很少的cpu,检查点在2秒内完成。随着时间的推移,检查点的大小会增加,但时间在几秒钟内仍然非常合理:

在这段时间里,我们可以观察到TaskManager的cpu使用率由于某些原因而缓慢增长:

最终,检查点时间将开始增加到几分钟,然后将开始反复超时(10分钟)。此时:
检查点大小(完成时)约为60mb
cpu使用率很高,但不是100%(通常在60-80%左右)
查看进行中的检查点,通常95%以上的操作员在30秒内完成检查点,但少数人只会坚持,永远不会完成。sqsFlume将始终包含在本文件中,但 SinkFunction 他并不富有,也没有国家。
在这些操作员上使用背压监视器报告背压高
最终,这种情况解决了以下两种方法之一:
由于失败的检查点比例阈值,有足够多的检查点无法触发作业失败
检查点最终开始成功,但永远不会回到最初的5-10秒(当状态大小更像30mb而不是60mb时)
我们真的不知道如何调试这个。与你在这里的一些问题中看到的那种状态相比,我们的状态似乎非常小。我们的音量也很低,我们经常低于100条记录/秒。
我们非常感谢您对我们可以进行调试的领域的任何意见。
谢谢,

bf1o4zei

bf1o4zei1#

将此属性添加到配置中:

state.backend.rocksdb.checkpoint.transfer.thread.num: {threadNumberAccordingYourProjectSize}

如果不添加此项,它将为1(默认值)
链接:https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/rocksdboptions.java#l62

lo8azlld

lo8azlld2#

有几点:
随着时间的推移,这个国家逐渐发展起来并不罕见。也许您的密钥空间正在增长,并且您正在为每个密钥保留一些状态。如果您依赖于statettl使过时状态过期,那么可能它的配置方式不允许它像您期望的那样快速清除过期状态。在排除某些可能的匹配之前,不经意地创建cep模式也相对容易,这些模式需要将某些状态保持很长时间。
下一个好的步骤是确定背压的原因。最常见的原因是作业没有足够的资源。随着时间的推移,大多数工作逐渐需要更多的资源,例如,随着被管理的用户数量的增加。例如,您可能需要增加并行性,或为示例提供更多内存,或增加接收器的容量(或网络到接收器的速度),或为rocksdb提供更快的磁盘。
除了供给不足外,造成背压的其他原因还包括
正在用户函数中执行阻塞i/o
大量定时器同时启动
不同源之间的事件时间偏差导致大量状态被缓冲
数据倾斜(热键)压倒了一个子任务或插槽
冗长的gc暂停
争夺关键资源(例如,使用nas作为rocksdb的本地磁盘)
启用rocksdb本机度量可以提供一些见解。

相关问题