https://zhuanlan.zhihu.com/p/26119361
所有的消费均是客户端发起Pull请求的,告诉消息的offset位置,broker去查询并返回。但是有一点需要非常明确的是,消息消费后,消息其实并没有物理地被清除,这是一个非常特殊的设计。本文来探索此设计的一些细节。
消息的存储是一直存在于CommitLog中的,由于CommitLog是以文件为单位(而非消息)存在的,而且CommitLog的设计是只允许顺序写,且每个消息大小不定长,所以这决定了消息文件几乎不可能按照消息为单位删除(否则性能会极具下降,逻辑也非常复杂)。
所以消息被消费了,消息所占据的物理空间也不会立刻被回收。但消息既然一直没有删除,那RocketMQ怎么知道应该投递过的消息就不再投递?——答案是客户端自身维护——客户端拉取完消息之后,在响应体中,broker会返回下一次应该拉取的位置,PushConsumer通过这一个位置,更新自己下一次的pull请求。这样就保证了正常情况下,消息只会被投递一次。
那消息文件到底删不删,什么时候删?
消息存储在CommitLog之后,的确是会被清理的,但是这个清理只会在以下任一条件成立才会批量删除消息文件(CommitLog):
注:若磁盘空间达到危险水位线(默认90%),出于保护自身的目的,broker会拒绝写入服务。
消息的物理文件一直存在,消费逻辑只是听客户端的决定而搜索出对应消息进行,这样做,笔者认为,有以下几个好处:
注:在消息清理的时候,由于消息文件默认是1GB,所以在清理的时候其实是在删除一个大文件操作,这对于IO的压力是非常大的,这时候如果有消息写入,写入的耗时会明显变高。这个现象可以在凌晨4点(默认删时间时点)后的附近观察得到。
RocketMQ官方建议Linux下文件系统改为Ext4,对于文件删除操作,相比Ext3有非常明显的提升。
由于消息本身是没有过期的概念,只有文件才有过期的概念。那么对于很多业务场景——一个消息如果太老,是无需要被消费的,是不合适的。
这种需要跳过历史消息的场景,在RocketMQ要怎么实现呢?
对于一个全新的消费组,PushConsumer默认就是跳过以前的消息而从最尾开始消费的,解析请参看 https://zhuanlan.zhihu.com/p/25265380 相关章节。
但对于已存在的消费组,RocketMQ没有内置的实现,但有以下手段可以解决:
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for(MessageExt msg: msgs){
if(System.currentTimeMillis()-msg.getBornTimestamp()>60*1000) {//一分钟之前的认为过期
continue;//过期消息跳过
}
//do consume here
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
@Override
public ConsumeConcurrentlyStatus consumeMessage(//
List<MessageExt> msgs, //
ConsumeConcurrentlyContext context) {
long offset = msgs.get(0).getQueueOffset();
String maxOffset = msgs.get(0).getProperty(MessageConst.PROPERTY_MAX_OFFSET);
long diff = Long. parseLong(maxOffset) - offset;
if (diff > 100000) { //消息堆积了10W情况的特殊处理
return ConsumeConcurrentlyStatus. CONSUME_SUCCESS;
}
//do consume here
return ConsumeConcurrentlyStatus. CONSUME_SUCCESS;
}
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/guolong1983811/article/details/78821917
内容来源于网络,如有侵权,请联系作者删除!