我有一个名为source的主题,它包含两种消息流:a和b。我已经编写了一个kafka streams应用程序,它使用该主题,找到具有相同关联id的a和b,并将它们聚合到一个新消息c中,并将其放在输出主题目标上
有时一个a没有一个b(反之亦然)会放在源主题上。我已经创建了一个可查询的状态存储,这样我就可以查看这些悬空消息,但是现在我想从中间主题中删除一条特定的消息。我猜这只是一个用正确的键(我有)得到一条消息的问题,然后将正文插入中间主题。问题是什么是最好的方法?
向源生成一个特殊的clear消息,这将导致聚合的消息变为null
使用空数据直接向中间主题写入消息
另一方面,也许kafka streams已经有了这个api调用?
附加问题:如果我知道我不想让消息在中间主题中停留超过6个月,我可以指示kafka streams创建保留时间为6个月的中间主题,还是应该在运行应用程序之前手动创建主题?
1条答案
按热度按时间hpcdzsge1#
附加问题:如果我知道我不想让消息在中间主题中停留超过6个月,我可以指示kafka streams创建保留时间为6个月的中间主题,还是应该在运行应用程序之前手动创建主题?
是,您可以设置保留时间,例如:
或者在创建主题时: