搞懂AbstractFetcherThread的processPartitionData、truncate、buildFetch等方法,就掌握了拉取线程的处理逻辑。串联起这三个方法的doWork方法就能完整理解Follower副本应用拉取线程(即ReplicaFetcherThread线程),从Leader副本获取消息并处理的流程了。
doWork,AbstractFetcherThread的核心方法,线程的主逻辑运行方法:
AbstractFetcherThread线程只要一直处运行状态,就会不断重复这俩操作。
因为分区的Leader可能随时变化。每当有新Leader产生,Follower副本就必须主动执行截断,将自己的本地日志裁剪成与Leader一模一样的消息序列,甚至,Leader副本也要执行截断,将LEO调整到分区高水位处。
先对分区状态进行分组。既然是做截断,则该方法操作的就只能是处于【截断中】状态的分区。
Leader Epoch机制,替换高水位值在日志截断中的作用:
doTruncate调用抽象方法truncate,而truncate实现在ReplicaFetcherThread。
第1步,为partitionStates中的分区构造FetchRequest.Builder对象,之后调用其build方法创建FetchRequest请求对象。这里的partitionStates保存要去获取消息的一组分区及对应状态信息。该步的输出结果是两个对象:
第2步,处理出错分区:将这组分区加入到有序Map末尾,等待后续重试。若发现当前无可读取分区,会阻塞等待一段时间
第3步,发送FETCH请求给对应Leader副本,并处理相应Response,即processFetchRequest要做的事。
搞清processFetchRequest的核心逻辑,就能明白拉取线程是如何执行拉取动作:
拿到Response后,从中取出分区的核心信息:
比较要读取的位移值==当前AbstractFetcherThread线程缓存的、该分区下一条待读取的位移值
当前分区是否处于可获取状态
若不满足这俩条件,说明该Request可能是个之前等待了许久都未处理的请求,就不用处理了。
相反,若满足这两个条件且:
ReplicaFetcherThread继承自AbstractFetcherThread,是Follower副本端创建的线程,用于向Leader副本拉取消息数据。
ReplicaFetcherThread的定义代码有些长,但构造器中大部分字段都解析过了。现在,只需学习ReplicaFetcherThread类的字段:
消息获相关字段:
都是FETCH请求的参数,主要控制Follower副本拉取Leader副本消息的行为,如:
Follower副本拉取线程要做的最重要的三件事:
AbstractFetcherThread线程从Leader副本拉取回消息后,要调用processPartitionData执行后续动作:
processPartitionData中的process就是写入Follower副本本地日志。因此,该方法的主体逻辑就是调用分区对象Partition的appendRecordsToFollowerOrFutureReplica写入获取到的消息。沿着这个写入方法追踪,就会发现它调用appendAsFollower。
仅写入日志还不够,还要做一些更新。如更新Follower副本的高水位值:将FETCH请求Response中包含的高水位值作为新的高水位值,还要尝试更新Follower副本的Log Start Offset值。
为何Log Start Offset值也可能变化?因为Leader的Log Start Offset可能发生变化,如用户手动执行删除消息的操作。Follower副本的日志要和Leader保持严格一致,因此,若Leader的该值发生变化,Follower自然也要发生变化。
此外还会更新其他一些统计指标值,最后将写入结果返回。
构建发送给Leader副本所在Broker的FETCH请求:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ccGd2xLJ-1641571213281)(/Users/apple/Library/Application Support/typora-user-images/image-20220107235217950.png)]
构造FETCH请求的Builder对象然后返回。有Builder对象,就能构造出FETCH请求,仅需调用builder.build()。
该方法的一个副产品是汇总出错分区,调用方后续可统一处理这些出错分区。
构造Builder的过程中,会用到ReplicaFetcherThread类定义的那些与消息获取相关的字段,如maxWait、minBytes和maxBytes。
对给定分区执行日志截断操作:
override def truncate(
tp: TopicPartition,
offsetTruncationState: OffsetTruncationState): Unit = {
// 拿到分区对象
val partition = replicaMgr.nonOfflinePartition(tp).get
//拿到分区本地日志
val log = partition.localLogOrException
// 执行截断操作,截断到的位置由offsetTruncationState的offset指定
partition.truncateTo(offsetTruncationState.offset, isFuture = false)
if (offsetTruncationState.offset < log.highWatermark)
warn(s"Truncating $tp to offset ${offsetTruncationState.offset} below high watermark " +
s"${log.highWatermark}")
if (offsetTruncationState.truncationCompleted)
replicaMgr.replicaAlterLogDirsManager
.markPartitionsForTruncation(brokerConfig.brokerId, tp,
offsetTruncationState.offset)
}
利用给定的offsetTruncationState的offset值,对给定分区的本地日志进行截断操作。该操作由Partition对象的truncateTo方法完成,但实际上底层调用的是Log#truncateTo:将日志截断到小于给定值的最大位移值处。
AbstractFetcherThread线程的doWork完整了拉取线程要执行的逻辑,即日志截断(truncate)+日志获取(buildFetch)+日志处理(processPartitionData),而其子类ReplicaFetcherThread是真正实现这3个方法:Follower副本利用ReplicaFetcherThread线程实时地从Leader副本拉取消息并写入到本地日志,从而实现了与Leader副本之间的同步。
要点:
Follower副本正是利用它来获取对应分区Partition对象的,然后依靠该对象执行消息写入。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://javaedge.blog.csdn.net/article/details/122374591
内容来源于网络,如有侵权,请联系作者删除!