ApacheFlink—每小时聚合数据的每日聚合

rks48beu  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(493)

我有一个窗口每小时聚合的数据流。
数据流 <RawData> ds=。。。。。

SingleOutputStreamOperator<HourlyAggregated> hourly =  
  ds.keyBy(HourlyCountersAggregation.KEY_SELECTOR)
             .timeWindow(Time.hours(1))
             .aggregate(new HourlyCountersAggregation());

每小时这个数据流 <HourlyAggregated> 沉入Cassandra。
我还想每天聚合相同的数据流 <HourlyAggregated> .
建议使用什么方法从每小时聚合的数据流中执行此操作,以避免保持数据流的大量每日流状态 <RawData> ds。。。。

yxyvkwin

yxyvkwin1#

我认为您可以尝试分叉作业图,以便在接收器和下一个窗口中使用每小时聚合数据的输出(这样您可以在每小时聚合的基础上进行每日聚合,从而节省计算时间)。比如:

SingleOutputStreamOperator<HourlyAggregated> hourly = ds.keyBy(HourlyCountersAggregation.KEY_SELECTOR)
         .timeWindow(Time.hours(1))
         .aggregate(new HourlyCountersAggregation());

hourly.addSink(...);

SingleOutputStreamOperator<HourlyAggregated> daily = hourly.timeWindow(Time.hours(24))
         .trigger(CountEventTrigger.of(24))
         .aggregate(...);

相关问题