从kafka中间主题中删除消息

6jjcrrmo  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(393)

我有一个名为source的主题,它包含两种消息流:a和b。我已经编写了一个kafka streams应用程序,它使用该主题,找到具有相同关联id的a和b,并将它们聚合到一个新消息c中,并将其放在输出主题目标上
有时一个a没有一个b(反之亦然)会放在源主题上。我已经创建了一个可查询的状态存储,这样我就可以查看这些悬空消息,但是现在我想从中间主题中删除一条特定的消息。我猜这只是一个用正确的键(我有)得到一条消息的问题,然后将正文插入中间主题。问题是什么是最好的方法?
向源生成一个特殊的clear消息,这将导致聚合的消息变为null
使用空数据直接向中间主题写入消息
另一方面,也许kafka streams已经有了这个api调用?
附加问题:如果我知道我不想让消息在中间主题中停留超过6个月,我可以指示kafka streams创建保留时间为6个月的中间主题,还是应该在运行应用程序之前手动创建主题?

hpcdzsge

hpcdzsge1#

附加问题:如果我知道我不想让消息在中间主题中停留超过6个月,我可以指示kafka streams创建保留时间为6个月的中间主题,还是应该在运行应用程序之前手动创建主题?
是,您可以设置保留时间,例如:

kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my_topic --alter --add-config retention.ms=16070400000

或者在创建主题时:

kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 10 --if-not-exists --config retention.ms=16070400000 --topic my_topic

相关问题