我们需要在两个不同的键上聚合数据。作业的输入记录如下。
{
.....
.....
"key1" : "093570lells99345dfklsfkd",
"key2" : "8587656783487535nxdghljd",
.....
.....
}
记录存储在沙发底座中。现在Flink数据管道看起来像。
env.addSource(readFromCouchBase...)
.name("couchbase-flinkjob")
.assignTimestampsAndWatermarks(
new TimestampExtractorAndWatermarkEmitter(60 * 1000, false))
.keyBy[(String)](key1)
.timeWindow(Time.seconds(10))
.aggregate(new CountGroupFunctionWithEventTimeProcessing, new CountGroupWindowFunction)
.addSink(new IdempotentPostgresSqlSinkFunction).name("postgres-sink")
现在,我们还需要对key 2进行分组。为此,我们是否需要创建新的管道,或者是否有一种方法可以修改当前管道,以便于对key 1和key 2分别进行分组?有效的方法是使用当前管道本身来获得所需的分组-因为这将避免从沙发基座阅读两次。
1条答案
按热度按时间n8ghc7c11#
您可以这样做: