我有一个kafka集群,它有一个用户,每天都在处理tb的数据。一旦消息被使用并提交,就可以立即删除它(或者在保留几分钟之后)。
看起来像是 log.retention.bytes
以及 log.retention.hours
配置从消息创建开始计算。这对我不好。
如果消费者因维护/事故而停机,我希望保留数据,直到数据恢复联机。如果我碰巧用完了空间,我希望拒绝接受生产者提供的新数据,并且不删除尚未使用的数据(因此 log.retention.bytes
帮不了我)。
有什么想法吗?
我有一个kafka集群,它有一个用户,每天都在处理tb的数据。一旦消息被使用并提交,就可以立即删除它(或者在保留几分钟之后)。
看起来像是 log.retention.bytes
以及 log.retention.hours
配置从消息创建开始计算。这对我不好。
如果消费者因维护/事故而停机,我希望保留数据,直到数据恢复联机。如果我碰巧用完了空间,我希望拒绝接受生产者提供的新数据,并且不删除尚未使用的数据(因此 log.retention.bytes
帮不了我)。
有什么想法吗?
1条答案
按热度按时间ztmd8pv51#
如果可以确保邮件具有唯一密钥,则可以将主题配置为使用压缩而不是定时保留策略。然后让您的消费者在处理完每条消息后,使用消息键(但值为空)将一条消息发送回同一主题。Kafka会压缩这些信息。您可以根据需要调整压缩参数(以及日志段文件大小,因为头段从未压缩过,如果希望压缩更快开始,您可能需要将其设置为较小的大小)。
但是,正如我前面提到的,只有当消息具有唯一的密钥时,这才有效,否则您不能简单地打开压缩,因为这将导致在您的消费者停机(或落在标题段后面)期间丢失以前具有相同密钥的消息。