如何在单个Flink管道中聚合不同的密钥

dwbf0jvd  于 2022-12-09  发布在  Apache
关注(0)|答案(1)|浏览(106)

我们需要在两个不同的键上聚合数据。作业的输入记录如下。

{
   .....
   .....
   "key1" : "093570lells99345dfklsfkd",
   "key2" : "8587656783487535nxdghljd",
   .....
   .....
}

记录存储在沙发底座中。现在Flink数据管道看起来像。

env.addSource(readFromCouchBase...)
  .name("couchbase-flinkjob")
  .assignTimestampsAndWatermarks(
    new TimestampExtractorAndWatermarkEmitter(60 * 1000, false))
  .keyBy[(String)](key1)
  .timeWindow(Time.seconds(10))
  .aggregate(new CountGroupFunctionWithEventTimeProcessing, new CountGroupWindowFunction)
  .addSink(new IdempotentPostgresSqlSinkFunction).name("postgres-sink")

现在,我们还需要对key 2进行分组。为此,我们是否需要创建新的管道,或者是否有一种方法可以修改当前管道,以便于对key 1和key 2分别进行分组?有效的方法是使用当前管道本身来获得所需的分组-因为这将避免从沙发基座阅读两次。

n8ghc7c1

n8ghc7c11#

您可以这样做:

val events = env.addSource(readFromCouchBase...)
  .name("couchbase-flinkjob")
  .assignTimestampsAndWatermarks(
    new TimestampExtractorAndWatermarkEmitter(60 * 1000, false));

events
  .keyBy[String](key1)
  .timeWindow(Time.seconds(10))
  .aggregate(new CountGroupFunctionWithEventTimeProcessing, new CountGroupWindowFunction)
  .name("window-for-key1")
  .addSink(new IdempotentPostgresSqlSinkFunction)
  .name("postgres-sink");

events
  .keyBy[String](key2)
  ...

相关问题