kstream到ktable

jgzswidk  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(528)
@StreamListener("input")
@SendTo("output")
public KStream<?, MyObject> process(KStream<Object, IncomingObject> input) {
KTable table = input.flatMapValues(value -> this.getMylogic(value));            
return table.toStream();
}

我正在尝试将kstream转换为ktable,然后再转换为kstream,但无法从kstream转换为ktable
值是json。请帮忙,我怎样才能使用聚合呢?

{
"name":"test",
address{
"localAddress":"myaddress",
"businessAddress":"testAddress"
}
}

在mylogic方法中,我只取一个地址发送到另一个主题。请帮忙

z0qdvdin

z0qdvdin1#

您需要应用聚合函数,例如count,以获得ktable作为结果。否则无法执行kstream->ktable->kstream。
您需要并且可以做的是kstream.count()(例如)->ktable->kstream。所以聚合的结果基本上会被发布到kstream,这也可能会被发布到kafka主题中。

相关问题