对于Apache Flink聚合,最好是具有复杂状态的聚合,还是具有更小的聚合但更多的任务。
例如,如果我有一个用户通过Web界面观看视频的数据流。我需要以下项目的聚合:
- 用户观看多少视频
- 一个用户从多少个不同的ip地址观看视频
- 用户观看视频的不同登录会话有多少个
- 等(约10个不同方面)
对于Flink资源来说,为用户创建一个聚合对象并收集每个对象的统计数据(在内部跟踪不同的值)更好,还是为每个键组合创建多个流更好?
inputStream
.keyBy("accountId")
.window(TumblingProcessingTimeWindows.of(Time.minutes(1))
.aggregate(new UberAggregator());
其中UberAggregator函数可以跟踪不同方面的所有不同值
或
inputStream
.keyBy("accountId", "ipAddress)
.window(TumblingProcessingTimeWindows.of(Time.minutes(1))
.aggregate(new SumAggregator());
inputStream
.keyBy("accountId", "videoId")
.window(TumblingProcessingTimeWindows.of(Time.minutes(1))
.aggregate(new SumAggregator());
inputStream
.keyBy("accountId", "sessionId")
.window(TumblingProcessingTimeWindows.of(Time.minutes(1))
.aggregate(new SumAggregator());
...
其中SumAggregator是一个简单的聚合函数,用于跟踪一件事。
3条答案
按热度按时间zsohkypk1#
这实际上取决于数据的规模,以及流中密钥的分布。
对于帐户ID范围有限、每个帐户的IP地址/会话ID有限的小数据集,在流中使用UberAgg完全可以。Flink内部可以管理状态中的所有数据(唯一IP,会话ID)。
对于具有较大帐户ID范围的大型日期集,但IP地址/会话ID有限(所以不存在某些帐户具有非常大的唯一IP地址/会话ID集),我更喜欢使用UberAgg。为了避免状态变得太大,我们可以基于对总帐户ID的估计来设置更高的并行度。Flink将很好地处理可扩展性。
对于具有倾斜帐户ID的大型数据集或某些帐户具有非常大的IP集,这将使某些操作员具有大型状态(这将导致流式作业中的缓慢任务),那么我更喜欢第二种解决方案。虽然3 keyBy会引入额外的数据混洗,但它们也可以通过在键中添加额外的字段来处理倾斜数据问题。
fzsnzjdm2#
从状态管理的Angular 来看,UberAggregator更好。让我们假设Flink部署使用RocksDB支持的存储来进行状态持久化(这是常见的情况)。
在Flink中,键控操作符状态被分区并分发到所有并行操作符示例。每个操作符示例被分配一个键组,该键组是所有键的子集。
在UberAggregator示例中,有一个键控运算符。该操作符的状态将由作为键的accountId组成,它们的值状态将存储为该键的列族。
如果我们假设有100万个可能的accountId,UberAggregator方法将产生100万个带值的键。
将其与使用10个不同的SumAggregator运算符进行比较。每个操作员将有其唯一的100万个键(用于accountId)。这意味着RocksDB后端将维护1000万个密钥。
这种增加的状态大小将对检查点设置、失败的任务恢复等产生影响。
UberAggregator更好。
cwtwac6a3#
最有效的是单个
UberAggregator
。网络 Shuffle 是昂贵的,所以做10个keyBy()
并不是最佳的,这就是为什么有一个keyBy()
会更有效。这也是最简单的工作流程,您希望从可能有效的最简单解决方案开始,然后进行测量/优化。