使用scs删除kafka日志中已消耗的消息

t3psigkw  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(420)

我在使用Kafka和Spring云流新。需要帮助。
设置
我有两个spring-boot应用程序app-1和app-2。
我使用springcloudstream和springcloudstreambinder kafka进行异步通信。
有一个主题-1
用例
假设app-1发送了一条关于topic-1的消息,而app-2正在监听。
app-2使用了该消息并成功地处理了它。
现在这个主题的偏移量增加了。

问题

如何实现一种机制,在指定的时间段后从kafka日志中删除唯一成功使用的消息数据?
在Kafka看来,被消费的东西的责任就是消费者的责任。所以我猜,在spring云流kafka中一定有一些kafka消息日志控制机制我不知道。
注1:我知道kafka日志保留时间和磁盘属性。但Kafka日志将被删除,即使是非消费的消息。
注2:这个问题我已经问过了,但没什么帮助。

q1qsirdb

q1qsirdb1#

据我所知,Kafka没有这种机制;当然不是在springcloudstream或它所基于的库中。kafka客户机无法访问这种低级构造。
此外,消费者补偿完全独立于主题日志;在现代经纪商中,它们存储在一个特殊的主题中。
编辑
根据下面的评论 kafka-delete-records.sh 可以使用命令行工具。
注意,这使用scala AdminClient 默认情况下,它不在scst类路径上(从2.0开始)。
然而,java AdminClient 支持类似功能:

/**
 * Delete records whose offset is smaller than the given offset of the corresponding partition.
 *
 * This is a convenience method for {@link #deleteRecords(Map, DeleteRecordsOptions)} with default options.
 * See the overload for more details.
 *
 * This operation is supported by brokers with version 0.11.0.0 or higher.
 *
 * @param recordsToDelete       The topic partitions and related offsets from which records deletion starts.
 * @return                      The DeleteRecordsResult.
 */
public DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete) {
    return deleteRecords(recordsToDelete, new DeleteRecordsOptions());
}

您可以创建 AdminClient 使用 Boot
AutoConfiguration KafkaAdmin .

AdminClient client = AdminClient.create(kafkaAdmin.getConfig());

相关问题