我在使用Kafka和Spring云流新。需要帮助。
设置
我有两个spring-boot应用程序app-1和app-2。
我使用springcloudstream和springcloudstreambinder kafka进行异步通信。
有一个主题-1
用例
假设app-1发送了一条关于topic-1的消息,而app-2正在监听。
app-2使用了该消息并成功地处理了它。
现在这个主题的偏移量增加了。
问题
如何实现一种机制,在指定的时间段后从kafka日志中删除唯一成功使用的消息数据?
在Kafka看来,被消费的东西的责任就是消费者的责任。所以我猜,在spring云流kafka中一定有一些kafka消息日志控制机制我不知道。
注1:我知道kafka日志保留时间和磁盘属性。但Kafka日志将被删除,即使是非消费的消息。
注2:这个问题我已经问过了,但没什么帮助。
1条答案
按热度按时间q1qsirdb1#
据我所知,Kafka没有这种机制;当然不是在springcloudstream或它所基于的库中。kafka客户机无法访问这种低级构造。
此外,消费者补偿完全独立于主题日志;在现代经纪商中,它们存储在一个特殊的主题中。
编辑
根据下面的评论
kafka-delete-records.sh
可以使用命令行工具。注意,这使用scala
AdminClient
默认情况下,它不在scst类路径上(从2.0开始)。然而,java
AdminClient
支持类似功能:您可以创建
AdminClient
使用 BootAutoConfiguration
KafkaAdmin
.