我要先做几点志愿者:
我是Flink的新手(和它一起工作了大约一个月)
我使用的是kinesis分析(aws托管的flink解决方案)。不管怎么说,这并没有真正限制flink的多功能性或容错选项,但我还是要说出来。
我们有一个相当直接的滑动窗口应用程序。密钥流通过特定的密钥(例如ip地址)组织事件,然后在processorfunction中处理它们。我们主要用它来记录事物的数量。例如,过去24小时内某个特定ip地址的登录次数。每隔30秒,我们计算窗口中每个键的事件数,并将该值保存到外部数据存储中。状态也会更新以反映该窗口中的事件,以便旧事件过期并且不会占用内存。
有趣的是,基数不是问题。如果我们有20万人登录,在24小时内,一切都是完美的。当一个ip在24小时内登录20万次时,事情开始变得棘手起来。此时,检查站的时间开始越来越长。一个普通的检查点需要2-3秒,但是在这种用户行为下,检查点开始需要5分钟,然后10分钟,然后15分钟,然后30分钟,然后40分钟,以此类推。
令人惊讶的是,应用程序可以在这种情况下平稳运行一段时间。大概10到12个小时。但是,检查点迟早会完全失败,然后我们的最大迭代器年龄开始激增,没有新的事件被处理等等。
在这一点上,我尝试了一些方法:
向问题投掷更多金属(同时启用自动缩放)
对检查点之间的间隔和最小次数的检查大惊小怪https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/api_checkpointconfiguration.html
重构以减少存储状态的占用
(1) 其实做的不多(2) 这似乎有帮助,但随后又出现了一个更大的流量高峰,而我们之前看到的情况压榨了任何好处(3)目前尚不清楚这是否有帮助。我认为我们的应用程序内存占用比你想象的yelp或airbnb都使用flink集群来处理大规模应用程序的内存占用要小,所以我不能想象我的状态真的有问题。
我想说的是,我希望我们不必对应用程序输出的期望进行深刻的改变。这个滑动窗口是非常有价值的数据。
编辑:有人问我的状态看起来是什么样的valuestate[foostate]
case class FooState(
entityType: String,
entityID: String,
events: List[BarStateEvent],
tableName: String,
baseFeatureName: String,
)
case class BarStateEvent(target: Double, eventID: String, timestamp: Long)
编辑:我想强调一下用户davidanderson在评论中说的话:
有时用于实现滑动窗口的一种方法是使用mapstate,其中键是切片的时间戳,值是事件列表。
这是必要的。对于其他试图走这条路的人来说,我找不到一个可行的解决方案,不把事件分成一些时间段。我的最终解决方案是将事件分成30秒的批,然后按照david的建议将它们写入map状态。这似乎很管用。对于我们的高负载时期,检查点保持在3mb,它们总是在不到一秒钟的时间内完成。
1条答案
按热度按时间zz2j4svz1#
如果您有一个24小时长的滑动窗口,它会滑动30秒,那么每个登录都会分配给2880个单独的窗口。没错,Flink的滑动窗口可以复制。在这种情况下,24602份。
如果您只是计算登录事件,那么在窗口关闭之前不需要实际缓冲登录事件。你可以使用
ReduceFunction
执行增量聚合。我的猜测是,您没有利用这种优化,因此,当您有一个热键(ip地址)时,处理该热键的示例的数据量不成比例,并且需要很长时间来检查点。
另一方面,如果您已经在进行增量聚合,并且检查点像您描述的那样有问题,那么有必要更深入地了解原因。
一种可能的补救方法是使用
ProcessFunction
. 这样做可以避免维护2880个单独的窗口,并使用更高效的数据结构。编辑(基于更新的问题):
我认为问题在于:当使用rocksdb state后端时,state以序列化字节的形式存在。每个状态访问和更新都必须通过ser/de
List[BarStateEvent]
正在反序列化,然后在每次修改时重新序列化。对于一个列表中有20万个事件的ip地址,这将是非常昂贵的。你应该做的是
ListState
或者MapState
. 这些状态类型针对rocksdb进行了优化。rocksdb状态后端可以附加到ListState
不反序列化列表。和MapState
,Map中的每个键/值对都是一个单独的rocksdb对象,允许高效的查找和修改。有时用于实现滑动窗口的一种方法是使用mapstate,其中键是切片的时间戳,值是事件列表。在flink文档中有一个类似的例子(但是有翻滚的窗口)。
或者,如果您的状态可以放入内存,您可以使用fsstatebackend。那么您的所有状态都将是jvm堆上的对象,而ser/de只在检查点和恢复期间起作用。