我在一个股票市场项目中使用apache flink来计算当前的价格变化。公式是
price_change = (current_price - previous_close_price) / previous_close_price
``` `previous_close_price` 指证券在前一交易日的收盘价。每天开市前,我都要更新 `previous_close_price` .
现在我想出了几个解决办法,但我不知道哪一个是最好的。
商店 `previous_close_price` 在redis中,每次计算都会得到价格。更新价格既简单又灵活,但此解决方案可能会降低性能。
将状态的ttl设置为1天。当旧状态过期时获取新状态。但由于ttl是硬编码的,所以它并不灵活。
广播状态模式。我不确定这个解决方案是否有效。
给Flink发个特别消息。当flink收到消息时,它会更新 `previous_close_price` .
任何建议都将被采纳。
1条答案
按热度按时间voase2hg1#
我建议在#4上使用一种变体:
有两个来源,一个只用于收盘价,另一个用于交易流。通过安全机制为两个流设置密钥,并使用协处理函数将它们连接起来。在协处理函数中以键控状态存储上一个\u关闭\u价格。
每天开盘前,输入最新收盘价。
这可以通过richcoflatmap完成,但我建议使用协处理函数,因为您可能希望使用一个side输出来报告错误(例如,缺少上一个收盘价的证券)。
至于其他方法:
我认为把以前的价格数据保存在外部数据存储中没有任何好处。
我觉得这个效果不太好。没有用于触发加载新数据的钩子,而且,只有在访问时才会清除状态。
这感觉不是广播状态的好用例,除非集群中的每个人都需要知道所有证券的收盘价。