我们正在运行一个liststate在300gb到400gb之间的作业,有时列表会增长到几千个。在我们的用例中,每个项都必须有自己的ttl,因此我们使用rocksdb后端在s3上为这个liststate的每个新项创建一个新计时器。
目前大约有1.4亿多个计时器(将在event.timestamp+40天触发)。
我们的问题是,作业的检查点突然被卡住,或者非常慢(比如几个小时内1%),直到它最终超时。通常会停止(flink Jmeter 盘显示 0/12 (0%)
前面的几行显示 12/12 (100%)
)在一段非常简单的代码上:
[...]
val myStream = env.addSource(someKafkaConsumer)
.rebalance
.map(new CounterMapFunction[ControlGroup]("source.kafkaconsumer"))
.uid("src_kafka_stream")
.name("some_name")
myStream.process(new MonitoringProcessFunction()).uid("monitoring_uuid").name(monitoring_name)
.getSideOutput(outputTag)
.keyBy(_.name)
.addSink(sink)
[...]
更多信息:
至少一次检查点模式似乎比一次更容易卡住
几个月前,这个州的数据容量达到了1.5tb,我认为数十亿的计时器没有任何问题。
运行两个任务管理器的机器上的ram、cpu和网络看起来都很正常 state.backend.rocksdb.thread.num = 4
第一个事件发生在我们收到大量事件(大约几百万分钟)的时候,而不是上一个事件。
所有的事件都来自Kafka的主题。
在至少一次检查点模式下,作业仍然正常运行和使用。
这是第二次发生在我们身上,拓扑运行得非常好,每天有几百万个事件,突然停止检查点。我们不知道是什么原因造成的。
任何人都能想到什么会突然导致检查站卡住?
1条答案
按热度按时间x3naxklr1#
一些想法:
如果有多个计时器或多或少同时触发,那么这场计时器风暴将阻止其他任何事情的发生——任务将循环调用计时器,直到没有更多的计时器要触发为止,在此期间,它们的输入队列将被忽略,检查点障碍将不会继续。
如果这是你的麻烦的原因,你可能会添加一些随机抖动到你的计时器,使事件风暴不会变成风暴以后计时器。重新组织使用状态ttl可能是另一种选择。
如果堆上有很多计时器,这会导致非常高的gc开销。这不一定会使工作失败,但会使检查点不稳定。在这种情况下,将计时器移到rocksdb中可能会有所帮助。
另外:由于您使用的是rocksdb,因此以时间为键从liststate切换到mapstate,可以删除单个条目,而无需在每次更新后重新序列化整个列表(对于rocksdb,mapstate中的每个键/值对都是一个单独的rocksdb对象。)通过这种方式提高清理效率可能是最好的补救方法。