如何在kafka流上实现分组变换

pdkcd3nj  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(416)

在kafka流上应用groupby()或groupbykey()时,会得到一个kgroupedstream对象。有没有可能在一个时间窗口内具体化该对象,并将分组数据写入一个主题?

vsdwdz23

vsdwdz231#

您可以编写自定义聚合器将groupedstream具体化为ktable。它将有列表格式的分组记录。以后可以发表到Kafka的题目上。

KTable<K, ArrayList<V>> groupedTable = streamObject.groupByKey().aggregate(
            // Custom Initializer
            ArrayList::new,
            // aggregator
            (key, value, list) -> {
                list.add(value);
                return list;
            }, new ArrayListSerde<V>(serdeType), storageName);
groupedTable.through(newTopic);
// Or convert into stream and publish
groupedTable.toStream().to(newTopic);

相关问题