如何在flink kinesis流中共享缓存

llew8vvj  于 2021-06-26  发布在  Flink
关注(0)|答案(1)|浏览(447)

我最近一直在使用flink和kinesis分析。我有一个数据流,我还需要一个缓存来与流共享。
为了与kinesis流共享缓存数据,它连接到广播流。缓存源扩展source函数并实现processingtimecallback。每隔300秒从dynamodb获取数据,并使用keyedBroadcastProcessFunction将其广播到下一个流。
但是在添加广播流之后(在以前的版本中,我没有缓存,我使用keyedprocessfaction for kinesis stream),当我在kinesis analytics中执行它时,它会毫无例外地每1000秒重新启动一次!
我没有这个值的配置和方案之间的工作良好!
有人能帮我什么问题吗?

gc0ot86w

gc0ot86w1#

我的第一个想法是想知道这是否与检查点有关。您有权访问服务器日志吗?flink的日志应该可以让我们清楚地知道是什么导致了重启。
我之所以怀疑检查点是因为它发生在可预测的时间(并且超时时间很长),而使用广播状态会给检查点带来很大的压力。每个并行示例将检查广播状态的完整副本。
广播状态必须保持在堆上,所以另一种可能是内存不足。

相关问题