@StreamListener("input")
@SendTo("output")
public KStream<?, MyObject> process(KStream<Object, IncomingObject> input) {
KTable table = input.flatMapValues(value -> this.getMylogic(value));
return table.toStream();
}
我正在尝试将kstream转换为ktable,然后再转换为kstream,但无法从kstream转换为ktable
值是json。请帮忙,我怎样才能使用聚合呢?
{
"name":"test",
address{
"localAddress":"myaddress",
"businessAddress":"testAddress"
}
}
在mylogic方法中,我只取一个地址发送到另一个主题。请帮忙
1条答案
按热度按时间z0qdvdin1#
您需要应用聚合函数,例如count,以获得ktable作为结果。否则无法执行kstream->ktable->kstream。
您需要并且可以做的是kstream.count()(例如)->ktable->kstream。所以聚合的结果基本上会被发布到kstream,这也可能会被发布到kafka主题中。