我有一个kafka流,它完成了ktable中丢失的值(leftjoin做得很好)。但有时,我必须将每个值连接到一个数组中,但我不知道如何正确地执行该操作。
例如(我以一个家庭为例):
{father: idFather, mother : idMother, children:[{child: id1},{child: id2}]
我可以与一个KT连接,以找到父亲和母亲的名字(身份证连接)。但是对于孩子们,我不知道如何将每个孩子循环到数组中(我不知道有多少孩子)。
目前,我为每个子级创建新的kstream: stream.flatMapValues(value -> value.get("children"))
我为每一个孩子加入。然后我必须groupbykey并减少或聚合我的数据,以用人名重建输入数据。
事实上,这是可行的,但我不确定这是最好的方法,我更喜欢避免使用内部kafka存储进行reduce和aggregation操作。
有人有更好的主意吗?谢谢你的帮助
1条答案
按热度按时间nx7onnlm1#
方法是合理的。
如果ktable数据很小,可以考虑使用
GlobalKTable
为了加入。这允许使用kstream中的非键字段来查找GlobalKTable
.