使用ktable时,当示例/使用者的数量等于分区的数量时,kafka streams不允许示例从特定主题的多个分区读取。我试着用globalktable实现这个功能,问题是数据会被覆盖,聚合也不能应用于它。
假设我有一个名为“data\u in”的主题,有3个分区(p1、p2、p3)。当我运行kafka流应用程序的3个示例(i1、i2、i3)时,我希望每个示例都从“data\u in”的所有分区读取数据。我的意思是i1可以从p1,p2和p3读取,i2可以从p1,p2和p3读取,i2等等。
编辑:请记住,生产者可以将两个相似的id发布到“data\u in”中的两个不同分区中。因此,当运行两个不同的示例时,globalktable将被覆盖。
请问,如何做到这一点?这是我代码的一部分
private KTable<String, theDataList> globalStream() {
// KStream of records from data-in topic using String and theDataSerde deserializers
KStream<String, Data> trashStream = getBuilder().stream("data_in",Consumed.with(Serdes.String(), SerDes.theDataSerde));
// Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
KGroupedStream<String, Data> KGS = trashStream.groupByKey();
Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);
// Return a KTable
return KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
if (!value.getValideData())
aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
else
aggregate.getList().add(value);
return aggregate;
}, materialized);
}
1条答案
按热度按时间ocebsuys1#
将输入主题“data\u in”的分区数更改为1个分区或使用
GlobalKtable
从主题中的所有分区获取数据,然后可以将流与之连接。这样,你的应用示例就不必再位于不同的用户群中了。代码如下所示:
编辑:我编辑了上面的代码,以强制对名为“new\u data\u in”的主题进行重新分区。