Flink优化更大的聚合或更多的任务

xyhw6mcr  于 2023-05-05  发布在  Apache
关注(0)|答案(3)|浏览(105)

对于Apache Flink聚合,最好是具有复杂状态的聚合,还是具有更小的聚合但更多的任务。
例如,如果我有一个用户通过Web界面观看视频的数据流。我需要以下项目的聚合:

  • 用户观看多少视频
  • 一个用户从多少个不同的ip地址观看视频
  • 用户观看视频的不同登录会话有多少个
  • 等(约10个不同方面)

对于Flink资源来说,为用户创建一个聚合对象并收集每个对象的统计数据(在内部跟踪不同的值)更好,还是为每个键组合创建多个流更好?

inputStream
  .keyBy("accountId")
  .window(TumblingProcessingTimeWindows.of(Time.minutes(1))
  .aggregate(new UberAggregator());

其中UberAggregator函数可以跟踪不同方面的所有不同值

inputStream
  .keyBy("accountId", "ipAddress)
  .window(TumblingProcessingTimeWindows.of(Time.minutes(1))
  .aggregate(new SumAggregator());

inputStream
  .keyBy("accountId", "videoId")
  .window(TumblingProcessingTimeWindows.of(Time.minutes(1))
  .aggregate(new SumAggregator());

inputStream
  .keyBy("accountId", "sessionId")
  .window(TumblingProcessingTimeWindows.of(Time.minutes(1))
  .aggregate(new SumAggregator());

...

其中SumAggregator是一个简单的聚合函数,用于跟踪一件事。

zsohkypk

zsohkypk1#

这实际上取决于数据的规模,以及流中密钥的分布。
对于帐户ID范围有限、每个帐户的IP地址/会话ID有限的小数据集,在流中使用UberAgg完全可以。Flink内部可以管理状态中的所有数据(唯一IP,会话ID)。
对于具有较大帐户ID范围的大型日期集,但IP地址/会话ID有限(所以不存在某些帐户具有非常大的唯一IP地址/会话ID集),我更喜欢使用UberAgg。为了避免状态变得太大,我们可以基于对总帐户ID的估计来设置更高的并行度。Flink将很好地处理可扩展性。
对于具有倾斜帐户ID的大型数据集或某些帐户具有非常大的IP集,这将使某些操作员具有大型状态(这将导致流式作业中的缓慢任务),那么我更喜欢第二种解决方案。虽然3 keyBy会引入额外的数据混洗,但它们也可以通过在键中添加额外的字段来处理倾斜数据问题。

fzsnzjdm

fzsnzjdm2#

从状态管理的Angular 来看,UberAggregator更好。让我们假设Flink部署使用RocksDB支持的存储来进行状态持久化(这是常见的情况)。
在Flink中,键控操作符状态被分区并分发到所有并行操作符示例。每个操作符示例被分配一个键组,该键组是所有键的子集。
在UberAggregator示例中,有一个键控运算符。该操作符的状态将由作为键的accountId组成,它们的值状态将存储为该键的列族。
如果我们假设有100万个可能的accountId,UberAggregator方法将产生100万个带值的键。
将其与使用10个不同的SumAggregator运算符进行比较。每个操作员将有其唯一的100万个键(用于accountId)。这意味着RocksDB后端将维护1000万个密钥。
这种增加的状态大小将对检查点设置、失败的任务恢复等产生影响。
UberAggregator更好。

cwtwac6a

cwtwac6a3#

最有效的是单个UberAggregator。网络 Shuffle 是昂贵的,所以做10个keyBy()并不是最佳的,这就是为什么有一个keyBy()会更有效。
这也是最简单的工作流程,您希望从可能有效的最简单解决方案开始,然后进行测量/优化。

相关问题