转载并且补充:Kafka源码解析 - LogSegment以及Log初始化
我们先回想一下Kafka的日志结构是怎样的?
Kafka 日志对象由多个日志段对象组成,而每个日志段对象会在磁盘上创建一组文件,包括消息日志文件(.log)、位移索引文件(.index)、时间戳索引文件(.timeindex)以及已中止(Aborted)事务的索引文件(.txnindex)
。当然,如果你没有使用 Kafka 事务,已中止事务的索引文件是不会被创建出来的
。
下面我们看一下LogSegment的实现情况,具体文件位置是 core/src/main/scala/kafka/log/LogSegment.scala。
LogSegment.scala这个文件里面定义了三个对象:
我这里贴一下LogSegment.scala这个文件上面的注释,介绍了LogSegment的构成:
A segment of the log. Each segment has two components: a log and an index. The log is a FileRecords containing the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in any previous segment
这段注释清楚的写了每个日志段由两个核心组件构成:日志和索引
。每个日志段都有一个起始位置:base offset
,而该位移值是此日志段所有消息中最小的位移值,同时,该值却又比前面任何日志段中消息的位移值都大
。
class LogSegment private[log] (val log: FileRecords,
val lazyOffsetIndex: LazyIndex[OffsetIndex],
val lazyTimeIndex: LazyIndex[TimeIndex],
val txnIndex: TransactionIndex,
val baseOffset: Long,
val indexIntervalBytes: Int,
val rollJitterMs: Long,
val time: Time) extends Logging { … }
FileRecords
是实际保存 Kafka 消息的对象。
lazyOffsetIndex、lazyTimeIndex 和 txnIndex
分别对应位移索引文件、时间戳索引文件、已中止事务索引文件。
baseOffset
是每个日志段对象的起始位移,每个 LogSegment
对象实例一旦被创建,它的起始位移就是固定
的了,不能再被更改。
indexIntervalBytes
值其实就是 Broker 端参数 log.index.interval.bytes
值,它控制了日志段对象新增索引项的频率。默认情况下,日志段至少新写入 4KB
的消息数据才会新增一条索引项。
time
是用于统计计时的一个实现类。
/**
* Append the given messages starting with the given offset. Add
* an entry to the index if needed.
*
* It is assumed this method is being called from within a lock.
*
* 在指定的 offset 处追加指定的 msgs, 需要的情况下追加相应的索引
*
* 将一组消息追加写入到以给定offset开始的日志段中。如果写入超过了4KB(默认的log.index.interval.bytes属性值)则额外写入一条新的索引
* 项记录到索引文件中。这个方法不是线程安全的,所以后面调用的时候需要有锁同步机制的保护
*
* 在LogSegment.append0方法中实现了追加消息的功能,可能有多个Handler线程并发写人同一个LogSegment,所以调用此方法时必须保证线程安全,
* 在后面分析Log类时会看到相应的同步代码。
*
* 这个方法主要做了那么几件事:
*
* 1. 判断日志段是否为空,不为空则往下进行操作
* 2. 调用ensureOffsetInRange方法,确保输入参数最大位移值是合法的。
* 3. 调用 FileRecords 的 append 方法执行真正的写入。
* 4. 更新日志段的最大时间戳以及最大时间戳所属消息的位移值属性。
* 5. 更新索引项和写入的字节数,日志段每写入 4KB 数据就要写入一个索引项。当已写入字节数超过了
* 4KB 之后,append 方法会调用索引对象的 append 方法新增索引项,同时清空已写入字节数。
*
* @param largestOffset The last offset in the message set
* @param largestTimestamp The largest timestamp in the message set.
* @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.
* @param records The log entries to append.
* @return the physical position in the file of the appended records
* @throws LogSegmentOffsetOverflowException if the largest offset causes index offset overflow
*
*
*/
@nonthreadsafe
def append(largestOffset: Long,
largestTimestamp: Long,
shallowOffsetOfMaxTimestamp: Long,
records: MemoryRecords): Unit = {
// 检测是否满足添加索引项的条件,判断是否日志段是否为空
if (records.sizeInBytes > 0) {
trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " +
s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp")
// 获取FileRecord 文件的末尾,他就是本次消息要写入的物理地址
val physicalPosition = log.sizeInBytes()
if (physicalPosition == 0)
rollingBasedTimestamp = Some(largestTimestamp)
// 确保输入参数最大位移值是合法的
ensureOffsetInRange(largestOffset)
// append the messages
// todo:写日志文件 追加到数据文件中 内存中
val appendedBytes = log.append(records)
trace(s"Appended $appendedBytes to ${log.file} at end offset $largestOffset")
// Update the in memory max timestamp and corresponding offset.
// 更新日志段的最大时间戳以及最大时间戳所属消息的位移值属性
if (largestTimestamp > maxTimestampSoFar) {
maxTimestampSoFar = largestTimestamp
offsetOfMaxTimestampSoFar = shallowOffsetOfMaxTimestamp
}
// append an entry to the index (if needed)
// 当一些如字节数超过4kb之后,append方法会调用索引对象的append方法 新增索引项 同时清空已经写入字节数
//note: 判断是否需要追加索引(数据每次都会添加到数据文件中,但不是每次都会添加索引的,间隔 indexIntervalBytes 大小才会写入一个索引文件)
// todo: 优化点 这里并不是来一条数据就写入一条数据,而是达到了一定的条件才写一次索引,我们管它叫稀疏索引
// indexIntervalBytes 默认是4096个字节,也就是说每次写了4096个字节的消息就更新一次索引
if (bytesSinceLastIndexEntry > indexIntervalBytes) {
// todo:添加数据索引
offsetIndex.append(largestOffset, physicalPosition)
//添加时间戳索引
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
//重置为0
bytesSinceLastIndexEntry = 0
}
//更新bytesSinceLastIndexEntry 添加消息的大小到 bytesSinceLastIndexEntry 变量中
bytesSinceLastIndexEntry += records.sizeInBytes
}
}
这个方法主要做了那么几件事:
我们下面再看看ensureOffsetInRange
方法是怎么校验最大位移的:
/**
* 这个方法会将offset和baseOffset做对比,当offset小于baseOffset或者当offset和baseOffset相减后大于Int的最大值,
* 那么都是异常的情况,那么这时就会抛出LogSegmentOffsetOverflowException异常。
* @param offset
* @return
*/
private def ensureOffsetInRange(offset: Long): Unit = {
if (!canConvertToRelativeOffset(offset))
throw new LogSegmentOffsetOverflowException(this, offset)
}
这个方法最终会调用到AbstractIndex的toRelative方法中:
/**
* 这个方法会将offset和baseOffset做对比,当offset小于baseOffset或者当offset和baseOffset相减后大于Int的最大值,
* 那么都是异常的情况,那么这时就会抛出LogSegmentOffsetOverflowException异常。
* @param offset
* @return
*/
private def toRelative(offset: Long): Option[Int] = {
val relativeOffset = offset - baseOffset
if (relativeOffset < 0 || relativeOffset > Int.MaxValue)
None
else
Some(relativeOffset.toInt)
}
可见这个方法会将offset和baseOffset做对比,当offset小于baseOffset或者当offset和baseOffset相减后大于Int的最大值,那么都是异常的情况,那么这时就会抛出LogSegmentOffsetOverflowException异常。
/**
* Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
* no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
*
* @param startOffset A lower bound on the first offset to include in the message set we read
* @param maxOffset An optional maximum offset for the message set we read
* @param maxSize The maximum number of bytes to include in the message set we read
* @param maxPosition The maximum position in the log segment that should be exposed for read
* @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxSize` (if one exists)
*
* @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
* or null if the startOffset is larger than the largest offset in this log
*
* 读取日志分段(副本同步不会设置 maxSize)
*
* startOffset : 指定读取的起始消息的offset
* maxOffset : 指定读取结束的offset,可以为空
* maxSize : 指定读取的最大字节数
* maxPosition : 指定读取的最大物理地址,可选参数,默认值是日志文件的大小
*/
@threadsafe
def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size,
minOneMessage: Boolean = false): FetchDataInfo = {
if (maxSize < 0)
throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log")
// 日志文件测长度,log 文件物理长度
val logSize = log.sizeInBytes // this may change, need to save a consistent copy
//将startOffset转成对应的物理地址,以字节为单位
val startOffsetAndSize = translateOffset(startOffset)
// if the start position is already off the end of the log, return null
if (startOffsetAndSize == null)
return null
val startPosition = startOffsetAndSize.position
// 组装offset元数据信息
val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition)
val adjustedMaxSize =
if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
else maxSize
// return a log segment but with zero size in the case below
// 如果设置了 adjustedMaxSize ,则根据其具体计算实际需要读取的字节数
if (adjustedMaxSize == 0)
return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)
// calculate the length of the message set to read based on whether or not they gave us a maxOffset
// 计算读取的字节数
val fetchSize: Int = maxOffset match {
//note: 副本同步时的计算方式
case None =>
// no max offset, just read until the max position
min((maxPosition - startPosition).toInt, adjustedMaxSize)
//note: consumer 拉取时,计算方式
case Some(offset) =>
// there is a max offset, translate it to a file position and use that to calculate the max read size;
// when the leader of a partition changes, it's possible for the new leader's high watermark to be less than the
// true high watermark in the previous leader for a short window. In this window, if a consumer fetches on an
// offset between new leader's high watermark and the log end offset, we want to return an empty response.
if (offset < startOffset)
return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false)
//将maxOffset转成物理地址
val mapping = translateOffset(offset, startPosition)
val endPosition =
// maxOffset超出此日志文件时,则使用日志文件长度
if (mapping == null)
logSize // the max offset is off the end of the log, use the end of the file
else
mapping.position
//由maxOffsetmaxPosition和maxSize共同决定读取的长度
min(min(maxPosition, endPosition) - startPosition, adjustedMaxSize).toInt
}
//maxOffset通常是Replica的HW,消费者最多只能读到HW这个位置的消息。
// todo: 封装结果 通过 log.slice 读取数据 读取数据的方法是 log.slice
//按照读取起始位置和长度生成的一个分片的FileRecords对象,根据起始的物理位置和读取长度读取数据文件
FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
}
这段代码中,主要做了这几件事:
这个方法是恢复日志段,Broker 在启动时会从磁盘上加载所有日志段信息到内存中,并创建相应的 LogSegment 对象实例。在这个过程中,它需要执行一系列的操作。
/**
* Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes
* from the end of the log and index.
*
* @param producerStateManager Producer state corresponding to the segment's base offset. This is needed to recover
* the transaction index.
* @param leaderEpochCache Optionally a cache for updating the leader epoch during recovery.
* @return The number of bytes truncated from the log
* @throws LogSegmentOffsetOverflowException if the log segment contains an offset that causes the index offset to overflow
*
* 根据日志文件重建索引文件,同时验证日志文件中消息的合法性
*
* 在重建索引文件过程中,如果遇到了压缩消息需要进行解压,主要原因是因为索引项中保存的相对offset 是第一条消息的offset
* ,而外层消息的offiset是压缩消息集合中的最后一条消息的offset。
*
* 恢复一个日志段——即根据日志文件重建索引并砍掉那些无效的字节,所谓的无效字节就是由参数限定的,任何在maxMessageSize之外的字节都是为无效状态。
* 该方法实现也很简单,就是先将索引项全部截断并将索引文件重置为原来的大小,然后遍历该消息集合,超过indexIntervalBytes之后就追加一条索引记录从而达到重建索引的目的
*
* 这个方法是恢复日志段,Broker 在启动时会从磁盘上加载所有日志段信息到内存中,并创建相应的
* LogSegment 对象实例。在这个过程中,它需要执行一系列的操作。
*
* 这个方法主要做了以下几件事:
* 1. 清空索引文件
* 2. 遍历日吹端中多有消息集合
* 1. 校验日志段中的消息
* 2. 获取最大时间戳及所属消息位移
* 3. 更新索引项
* 4. 更新总消息字节数
* 5. 更新Porducer状态和Leader Epoch缓存
* 3. 执行消息日志索引文件截断
* 4. 调整索引文件大小
*/
@nonthreadsafe
def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = {
offsetIndex.reset()
timeIndex.reset()
txnIndex.reset()
var validBytes = 0
var lastIndexEntry = 0
maxTimestampSoFar = RecordBatch.NO_TIMESTAMP
try {
//遍历日志段中所有消息集合
for (batch <- log.batches.asScala) {
// 验证消息是否合法
batch.ensureValid()
// 校验消息中最后一条消息的位移不能越界
ensureOffsetInRange(batch.lastOffset)
// The max timestamp is exposed at the batch level, so no need to iterate the records
// 获取最大时间戳及所属消息位移
if (batch.maxTimestamp > maxTimestampSoFar) {
maxTimestampSoFar = batch.maxTimestamp
offsetOfMaxTimestampSoFar = batch.lastOffset
}
// Build offset index
// 当已写入字节数超过了 4KB 之后,调用索引对象的 append 方法新增索引项,同时清空已写入字节数
if (validBytes - lastIndexEntry > indexIntervalBytes) {
offsetIndex.append(batch.lastOffset, validBytes)
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
lastIndexEntry = validBytes
}
// 更新总消息字节数
validBytes += batch.sizeInBytes()
// 更新Porducer状态和Leader Epoch缓存
if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
leaderEpochCache.foreach { cache =>
if (batch.partitionLeaderEpoch > 0 && cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
}
updateProducerState(producerStateManager, batch)
}
}
} catch {
case e: CorruptRecordException =>
warn("Found invalid messages in log segment %s at byte offset %d: %s."
.format(log.file.getAbsolutePath, validBytes, e.getMessage))
}
// 遍历完后将 遍历累加的值和日志总字节数比较,
val truncated = log.sizeInBytes - validBytes
if (truncated > 0)
debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery")
// 执行日志截断操作,对日志文件进行拦截,抛弃后面验证失败的Message
log.truncateTo(validBytes)
// 对索引文件进行相应的截断,调整索引文件大小
offsetIndex.trimToValidSize()
// A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well.
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, skipFullCheck = true)
// 对索引文件进行相应的拦截
timeIndex.trimToValidSize()
// 返回截掉的字节数
truncated
}
下面我们进入到truncateTo方法中,看一下截断操作是怎么做的:
/**
* Truncate this file message set to the given size in bytes. Note that this API does no checking that the
* given size falls on a valid message boundary.
* In some versions of the JDK truncating to the same size as the file message set will cause an
* update of the files mtime, so truncate is only performed if the targetSize is smaller than the
* size of the underlying FileChannel.
* It is expected that no other threads will do writes to the log when this function is called.
* @param targetSize The size to truncate to. Must be between 0 and sizeInBytes.
* @return The number of bytes truncated off
*
* 将文件消息集合截断成指定的字节大小,这个方法不保证截断位置的Message的完整性。
*
* Kafka 会将日志段当前总字节数和刚刚累加的已读取字节数进行比较,如果发现前者比后者大,
* 说明日志段写入了一些非法消息,需要执行截断操作,将日志段大小调整回合法的数值。
*/
public int truncateTo(int targetSize) throws IOException {
int originalSize = sizeInBytes();
// 检测 targetSize 的有效性, 要截断的目标大小不能超过当前文件的大小
if (targetSize > originalSize || targetSize < 0)
throw new KafkaException("Attempt to truncate log segment " + file + " to " + targetSize + " bytes failed, " +
" size of this log segment is " + originalSize + " bytes.");
//如果目标大小小于当前文件大小,那么执行截断
if (targetSize < (int) channel.size()) {
// 裁剪文件
channel.truncate(targetSize);
// 修改 size
size.set(targetSize);
}
// 返回裁剪掉的字节数
return originalSize - targetSize;
}
Kafka 会将日志段当前总字节数和刚刚累加的已读取字节数进行比较,如果发现前者比后者大,说明日志段写入了一些非法消息,需要执行截断操作,将日志段大小调整回合法的数值
这个方法会将日志段中的数据强制截断到指定的位移处。
/**
* Truncate off all index and log entries with offsets >= the given offset.
* If the given offset is larger than the largest message in this segment, do nothing.
*
* @param offset The offset to truncate to
* @return The number of log bytes truncated
*
* 给定一个位移,将位于该位移之后的所有索引项和日志项全部清除,如果给定的位移大于日志段本身的最大位移就什么都不做。
* 最后函数返回日志数据总共截断的字节数。值得注意的是,如果把所有日志数据都截断了,那么需要更新这个日志段的创建日期。
* 同时还会将检查是否增加索引项的指针清零。
*
* 日志截断使之保存的最大offset不会超过给定的targetOffset。当然,如果targetOffset就比现有日志的结束位移还要大自然什么都不做。
* 另外在截断的过程中,还需要判断该log的最小位移(也就是第一个日志段的基础位移)如果比targetOffset大的话,那么直接调用
* truncateFullyAndStartAt方法删除所有日志数据并设置新的位移点,否则逐一删除那些起始位移比targetOffset大的日志段。
* 此时activeSegment会自动变成当前删除之后最新的那个日志段,所以还要对activeSegment进行截断操作。这些做完之后更新下一条消息offset并重设恢复点位移
*
* 这个方法会将日志段中的数据强制截断到指定的位移处。
* 1. 将位置值转换成物理文件位置
* 2. 移动索引到指定位置,位移索引文件、时间戳索引文件、已中止事务索引文件等位置
* 3. 将索引做一次resize操作,节省内存空间
* 4. 调整日志段日志位置
*/
@nonthreadsafe
def truncateTo(offset: Long): Int = {
// Do offset translation before truncating the index to avoid needless scanning
// in case we truncate the full index
// 将位置值转换成物理文件位置
val mapping = translateOffset(offset)
// 移动索引到指定位置
offsetIndex.truncateTo(offset)
timeIndex.truncateTo(offset)
txnIndex.truncateTo(offset)
// After truncation, reset and allocate more space for the (new currently active) index
// 因为位置变了,为了节省内存,做一次resize操作
offsetIndex.resize(offsetIndex.maxIndexSize)
timeIndex.resize(timeIndex.maxIndexSize)
val bytesTruncated = if (mapping == null) 0 else log.truncateTo(mapping.position)
// 如果调整到初始位置,那么重新记录一下创建时间
if (log.sizeInBytes == 0) {
created = time.milliseconds
rollingBasedTimestamp = None
}
//调整索引项
bytesSinceLastIndexEntry = 0
//调整最大的索引位置
if (maxTimestampSoFar >= 0)
loadLargestTimestamp()
bytesTruncated
}
我们到OffsetIndex的truncateTo方法中看一下:
/**
* 1. 根据指定位移返回消息中的槽位。
* 2. 如果返回的槽位小于零,说明没有消息位移小于指定位移,所以newEntries返回0。
* 3. 如果指定位移在消息位移中,那么返回slot槽位。
* 4. 如果指定位移位置大于消息中所有位移,那么跳到消息位置中最大的一个的下一个位置。
*
* @param offset
*/
override def truncateTo(offset: Long) {
inLock(lock) {
val idx = mmap.duplicate
//根据指定位移返回消息中位移
val slot = largestLowerBoundSlotFor(idx, offset, IndexSearchType.KEY)
/* There are 3 cases for choosing the new size
* 1) if there is no entry in the index <= the offset, delete everything
* 2) if there is an entry for this exact offset, delete it and everything larger than it
* 3) if there is no entry for this offset, delete everything larger than the next smallest
*/
val newEntries =
//如果没有消息的位移值小于指定位移值,那么就直接从头开始
if(slot < 0)
0
// 跳到执行的位移位置
else if(relativeOffset(idx, slot) == offset - baseOffset)
slot
// 指定位移位置大于消息中所有位移,那么跳到消息位置中最大的一个的下一个位置
else
slot + 1
// 执行位置跳转
truncateToEntries(newEntries)
}
}
讲完了LogSegment之后,我们在来看看Log。
Log.scala定义了 10 个类和对象,图中括号里的 C 表示 Class,O 表示 Object。
我们主要看的是Log类:
/**
* An append-only log for storing messages.
*
* The log is a sequence of LogSegments, each with a base offset denoting the first message in the segment.
*
* New log segments are created according to a configurable policy that controls the size in bytes or time interval
* for a given segment.
*
* @param dir The directory in which log segments are created.
* @param config The log configuration settings
* @param logStartOffset The earliest offset allowed to be exposed to kafka client.
* The logStartOffset can be updated by :
* - user's DeleteRecordsRequest
* - broker's log retention
* - broker's log truncation
* The logStartOffset is used to decide the following:
* - Log deletion. LogSegment whose nextOffset <= log's logStartOffset can be deleted.
* It may trigger log rolling if the active segment is deleted.
* - Earliest offset of the log in response to ListOffsetRequest. To avoid OffsetOutOfRange exception after user seeks to earliest offset,
* we make sure that logStartOffset <= log's highWatermark
* Other activities such as log cleaning are not affected by logStartOffset.
* @param recoveryPoint The offset at which to begin recovery--i.e. the first offset which has not been flushed to disk
* @param scheduler The thread pool scheduler used for background actions
* @param brokerTopicStats Container for Broker Topic Yammer Metrics
* @param time The time instance used for checking the clock
* @param maxProducerIdExpirationMs The maximum amount of time to wait before a producer id is considered expired
* @param producerIdExpirationCheckIntervalMs How often to check for producer ids which need to be expired
*
* param dir Log对应的磁盘目录,此目录下存放了每个LogSegment对应的日志文件和索引文件。
* param config : 日志配置信息
* param logStartOffset 日志段集合中第一个日志段的基础位移,也就是这个日志对象的基础位移logEndOffsetMetadata
* The logStartOffset can be updated by :
* - user's DeleteRecordsRequest
* - broker's log retention
* - broker's log truncation
* The logStartOffset is used to decide the following:
* - Log deletion. LogSegment whose nextOffset <= log's logStartOffset can be deleted.
* It may trigger log rolling if the active segment is deleted.
* - Earliest offset of the log in response to ListOffsetRequest. To avoid OffsetOutOfRange exception after user seeks to earliest offset,
* we make sure that logStartOffset <= log's highWatermark
* Other activities such as log cleaning are not affected by logStartOffset.
*
* param recoveryPoint : 恢复的起始offset——即尚未被写入磁盘的第一个offset,指定恢复操作的起始offset, recoveryPoint 之前的Message已经刷新到磁盘上持久存储,而其后的消息则不一-定,
* 出现宕机时可能会丢失。所以只需要恢复recoveryPoint之后的消息即可。
* param scheduler : 用于后台操作的一个调度器线程池。主要用于异步地删除日志段和日志段切分时使用
* param brokerTopicStats Container for Broker Topic Yammer Metrics
* param time : 提供时间服务的对象实例
* param maxProducerIdExpirationMs The maximum amount of time to wait before a producer id is considered expired
* param producerIdExpirationCheckIntervalMs How often to check for producer ids which need to be expired
*
*
*
* Log是对多个LogSegment对象的顺序组合,形成一个逻辑的日志。为了实现快速定位LogSegment, Log使用跳表( SkipList )对LogSegment进行管理。
*
* 在Log中,将每个LogSegment的baseOffset作为key, LogSegment 对象作为value,放人segments这个跳表中管理
*
* 向Log中追加消息时是顺序写人的,那么只有最后一个LogSegment能够进行写人操作,在其之前的所有LogSegment都不能写入数据。
* 最后一个LogSegment使用Log.activeSegment()方法获取,即segments集合中最后一个元素,为了描述方便,我们将此Segment对象称为“activeSegment” 。
* 随着数据的不断写入,当activeSegment的日志文件大小到达一定阈值时,就需要创建新的activeSegment, 之后追加的消息将写人新的activeSegment。
*
*/
@threadsafe
class Log(@volatile var dir: File,
@volatile var config: LogConfig,
@volatile var logStartOffset: Long,
@volatile var recoveryPoint: Long,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
val time: Time,
val maxProducerIdExpirationMs: Int,
val producerIdExpirationCheckIntervalMs: Int,
val topicPartition: TopicPartition,
val producerStateManager: ProducerStateManager,
logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
主要的属性有两个dir和logStartOffset,分别表示个日志所在的文件夹路径,也就是主题分区的路径以及日志的当前最早位移。
在kafka中,我们用Log End Offset(LEO)表示日志下一条待插入消息的位移值,也就是日志的末端位移。
Log Start Offset表示日志当前对外可见的最早一条消息的位移值。
再看看其他属性:
@volatile private var nextOffsetMetadata: LogOffsetMetadata = _
@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
@volatile var leaderEpochCache: Option[LeaderEpochFileCache] = None
nextOffsetMetadata基本上等同于LEO。
highWatermarkMetadata是分区日志高水位值。
segments保存了分区日志下所有的日志段信息。
Leader Epoch Cache 对象保存了分区 Leader 的 Epoch 值与对应位移值的映射关系。
/**
* 参考文章:https://www.cnblogs.com/luozhiyun/p/13062835.html
*/
locally {
val startMs = time.milliseconds
// create the log directory if it doesn't exist
//创建分区日志路径
Files.createDirectories(dir.toPath)
//初始化Leader Epoch Cache
initializeLeaderEpochCache()
//将topic所有的分片加载到segments集合了.并做一些topic分片文件检查工作.
//加载所有日志段对象
val nextOffset = loadSegments()
/** Calculate the offset of the next message
* activeSegment表示当前最后一个分片.因为分片是按大小分布.最大的就是最新的.也就是活跃的分片.这里生成下一个offsetmetadata
* "下一个偏移量数据",它的数据都是从当前活动的日志分段获取的
* 下一个偏移量元数据
* 第一个参数:下一条消息的偏移量;第二个参数:日志分段的基准偏移量;第三个参数:日志分段大小
* */
nextOffsetMetadata = new LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)
//更新Leader Epoch Cache,清除无效数据
leaderEpochCache.foreach(_.truncateFromEnd(nextOffsetMetadata.messageOffset))
logStartOffset = math.max(logStartOffset, segments.firstEntry.getValue.baseOffset)
// The earliest leader epoch may not be flushed during a hard failure. Recover it here.
leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
// Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
// from scratch.
if (!producerStateManager.isEmpty)
throw new IllegalStateException("Producer state must be empty during log initialization")
loadProducerState(logEndOffset, reloadFromCleanShutdown = hasCleanShutdownFile)
info(s"Completed load of log with ${segments.size} segments, log start offset $logStartOffset and " +
s"log end offset $logEndOffset in ${time.milliseconds() - startMs} ms")
}
这个代码里面主要做了这几件事:
Leader Epoch暂且不表,我们看看loadSegments是如何加载日志段的。
/**
* Load the log segments from the log files on disk and return the next offset.
* This method does not need to convert IOException to KafkaStorageException because it is only called before all logs
* are loaded.
* @throws LogSegmentOffsetOverflowException if we encounter a .swap file with messages that overflow index offset; or when
* we find an unexpected number of .log files with overflow
*
* (1)删除“.delete” 和“.cleaned” 文件。因为存在“.cleaned” 后缀的文件表示是在
* 日志压缩过程中宕机的,“.cleaned”文件中数据的状态不明确,无法进行恢复。而“.swap'
* 后缀的文件表示日志压缩已经完成了,但在swap过程中宕机,“ .swap”文件中保存了日
* 志压缩后的完整消息,可进行恢复。“ .delete”后缀的文件则是本来就要删除的日志文件
* 或索引文件。
*
* (2)加载全部的日志文件和索引文件。如果索引文件没有配对的日志文件,则删除索引文件;如果日志文件没有对应的索引文件,则重建索引文件。
*
*
* 该方法就是加载磁盘上的日志文件。具体逻辑如下:
* a. 如果给定的路径不存在则创建出来;
* b. 遍历该目录路径下的所有文件删除掉那些临时文件(包括后缀名是.deleted和.cleaned);
* c. 如果发现是以.swap结尾的文件,说明在上一次的swap过程中Kafka失败了,需要执行恢复操作。针对上面的情况,
* 先去掉结尾的.swap然后判断是.log还是.index结尾。如果是索引文件(.index结尾)则直接删除,反正后面可以重建;
* 如果是日志数据文件(.log结尾),那么先删除对应的索引文件,然后将.swap去掉表示修复成功;
* d. 第一遍遍历之后再次进行第二遍遍历。对目录下的每个文件,如果它是索引文件,则寻找对应的.log文件,
* 如果不存在抛出告警信息并直接该索引文件; 如果存在的话不做任何处理;
* 但如果该文件本身就是日志数据文件,则必然是000000...0000【offset】.log这样的形式;
* e. 提取基础offset,并判断是否存在对应的索引文件,然后就创建新的日志段对象。
* f. 创建日志段之后判断是否存在索引文件,如果没有的话重建索引;
* g. 最后将新创建的日志段加入到日志段map中,至此第二遍遍历完成;
* h. 此时判断日志段map中是否存在任何日志段,如果没有的话则创建一个offset为0的空日志段——因为每个日志都至少要有一个日志段。
* 如果map中的确有日志段,先调用recoverLog方法(稍后会说)恢复日志段然后重设activetSegment的索引长度(否则容易引发日志段切分);
* j. 最后为每个日志段检查对应的索引文件(确保索引文件为空以及索引长度一定要是8的倍数,因为索引项长度总是位移的整数倍)
*
*
* 这个方法首先会调用removeTempFilesAndCollectSwapFiles方法移除上次 Failure 遗留下来的各种
* 临时文件(包括.cleaned、.swap、.deleted 文件等)。
*
* 然后它会清空所有日志段对象,并且再次遍历分区路径,重建日志段 segments Map 并删除无对应日志
* 段文件的孤立索引文件。
*
* 待执行完这两次遍历之后,它会完成未完成的 swap 操作,即调用 completeSwapOperations 方法。
* 等这些都做完之后,再调用 recoverLog 方法恢复日志段对象,然后返回恢复之后的分区日志 LEO 值。
*/
private def loadSegments(): Long = {
// first do a pass through the files in the log directory and remove any temporary files
// and find any interrupted swap operations
//移除上次 Failure 遗留下来的各种临时文件(包括.cleaned、.swap、.deleted 文件等)
val swapFiles = removeTempFilesAndCollectSwapFiles()
// Now do a second pass and load all the log and index files.
// We might encounter legacy log segments with offset overflow (KAFKA-6264). We need to split such segments. When
// this happens, restart loading segment files from scratch.
// 清空所有日志段对象,并且再次遍历分区路径,重建日志段 segments Map 并删除无对应
// 日志段文件的孤立索引文件。
retryOnOffsetOverflow {
// In case we encounter a segment with offset overflow, the retry logic will split it after which we need to retry
// loading of segments. In that case, we also need to close all segments that could have been left open in previous
// call to loadSegmentFiles().
//先清空日志段信息
logSegments.foreach(_.close())
segments.clear()
//从文件中装载日志段
loadSegmentFiles()
}
// Finally, complete any interrupted swap operations. To be crash-safe,
// log files that are replaced by the swap segment should be renamed to .deleted
// before the swap file is restored as the new segment file.
//完成未完成的 swap 操作
completeSwapOperations(swapFiles)
if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
val nextOffset = retryOnOffsetOverflow {
recoverLog()
}
// reset the index size of the currently active log segment to allow more entries
activeSegment.resizeIndexes(config.maxIndexSize)
nextOffset
} else {
0
}
}
这个方法首先会调用removeTempFilesAndCollectSwapFiles
方法移除上次 Failure 遗留下来的各种临时文件(包括.cleaned、.swap、.deleted 文件等
)。
然后它会清空所有日志段对象
,并且再次遍历分区路径,重建日志段 segments Map 并删除无对应日志段文件的孤立索引文件。
待执行完这两次遍历之后,它会完成未完成的 swap 操作,即调用 completeSwapOperations 方法。等这些都做完之后,再调用 recoverLog 方法恢复日志段对象,然后返回恢复之后的分区日志 LEO 值
。
/**
* Removes any temporary files found in log directory, and creates a list of all .swap files which could be swapped
* in place of existing segment(s). For log splitting, we know that any .swap file whose base offset is higher than
* the smallest offset .clean file could be part of an incomplete split operation. Such .swap files are also deleted
* by this method.
* @return Set of .swap files that are valid to be swapped in as segment files
*
* 删除日志目录中找到的任何临时文件,并创建一个所有.swap文件的列表,这些文件可以替换现有的段。对于日志分割,我们知道,
* 任何.swap文件的基偏移量大于最小偏移量.clean文件,都可能是不完整分割操作的一部分。这样的.swap文件也会被这个方法删除。
*
* 1. 定义了一个内部方法deleteIndicesIfExist,用于删除日志文件对应的索引文件。
* 2. 遍历文件列表删除遗留文件,并筛选出.cleaned结尾的文件和.swap结尾的文件。
* 3. 根据minCleanedFileOffset删除无效的.swap文件。
* 4. 最后返回当前有效的.swap文件集合
*/
private def removeTempFilesAndCollectSwapFiles(): Set[File] = {
// 在方法内部定义一个名为deleteIndicesIfExist的方法,用于删除日志文件对应的索引文件
def deleteIndicesIfExist(baseFile: File, suffix: String = ""): Unit = {
info(s"Deleting index files with suffix $suffix for baseFile $baseFile")
val offset = offsetFromFile(baseFile)
Files.deleteIfExists(Log.offsetIndexFile(dir, offset, suffix).toPath)
Files.deleteIfExists(Log.timeIndexFile(dir, offset, suffix).toPath)
Files.deleteIfExists(Log.transactionIndexFile(dir, offset, suffix).toPath)
}
var swapFiles = Set[File]()
var cleanFiles = Set[File]()
var minCleanedFileOffset = Long.MaxValue
for (file <- dir.listFiles if file.isFile) {
if (!file.canRead)
throw new IOException(s"Could not read file $file")
val filename = file.getName
//如果是以.deleted结尾的文件
if (filename.endsWith(DeletedFileSuffix)) {
debug(s"Deleting stray temporary file ${file.getAbsolutePath}")
// 说明是上次Failure遗留下来的文件,直接删除
Files.deleteIfExists(file.toPath)
} else if (filename.endsWith(CleanedFileSuffix)) {
// 如果是以.cleaned结尾的文件
minCleanedFileOffset = Math.min(offsetFromFileName(filename), minCleanedFileOffset)
cleanFiles += file
} else if (filename.endsWith(SwapFileSuffix)) {
// .swap结尾的文件
// we crashed in the middle of a swap operation, to recover:
// if a log, delete the index files, complete the swap operation later
// if an index just delete the index files, they will be rebuilt
//更改文件名
val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
info(s"Found file ${file.getAbsolutePath} from interrupted swap operation.")
//如果该.swap文件原来是索引文件
if (isIndexFile(baseFile)) {
// 删除原来的索引文件
deleteIndicesIfExist(baseFile)
} else if (isLogFile(baseFile)) {
// 如果该.swap文件原来是日志文件
// 删除掉原来的索引文件
deleteIndicesIfExist(baseFile)
// 加入待恢复的.swap文件集合中
swapFiles += file
}
}
}
// KAFKA-6264: Delete all .swap files whose base offset is greater than the minimum .cleaned segment offset. Such .swap
// files could be part of an incomplete split operation that could not complete. See Log#splitOverflowedSegment
// for more details about the split operation.
// 从待恢复swap集合中找出那些起始位移值大于minCleanedFileOffset值的文件,直接删掉这些无效的.swap文件
val (invalidSwapFiles, validSwapFiles) = swapFiles.partition(file => offsetFromFile(file) >= minCleanedFileOffset)
invalidSwapFiles.foreach { file =>
debug(s"Deleting invalid swap file ${file.getAbsoluteFile} minCleanedFileOffset: $minCleanedFileOffset")
val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
deleteIndicesIfExist(baseFile, SwapFileSuffix)
Files.deleteIfExists(file.toPath)
}
// Now that we have deleted all .swap files that constitute an incomplete split operation, let's delete all .clean files
// 清除所有待删除文件集合中的文件
cleanFiles.foreach { file =>
debug(s"Deleting stray .clean file ${file.getAbsolutePath}")
Files.deleteIfExists(file.toPath)
}
// 最后返回当前有效的.swap文件集合
validSwapFiles
}
deleteIndicesIfExist
,用于删除日志文件对应的索引文件。.cleaned结尾的文件和.swap结尾
的文件。minCleanedFileOffset
删除无效的.swap
文件。.swap
文件集合处理完了removeTempFilesAndCollectSwapFiles方法,然后进入到loadSegmentFiles方法中。
/**
* This method does not need to convert IOException to KafkaStorageException because it is only called before all logs are loaded
* It is possible that we encounter a segment with index offset overflow in which case the LogSegmentOffsetOverflowException
* will be thrown. Note that any segments that were opened before we encountered the exception will remain open and the
* caller is responsible for closing them appropriately, if needed.
* @throws LogSegmentOffsetOverflowException if the log directory contains a segment with messages that overflow the index offset
*
*
* 这个方法不需要将IOException转换为KafkaStorageException,因为它只在加载所有日志之前调用,我们可能会遇到索引偏移溢出的段,
* 在这种情况下会抛出LogSegmentOffsetOverflowException。请注意,在我们遇到异常之前打开的任何段都将保持打开状态,如果需要,
* 调用者负责适当地关闭它们。
*
* 1. 遍历文件目录
* 2. 如果文件是索引文件,那么检查一下是否存在相应的日志文件。
* 3. 如果是日志文件,那么创建对应的LogSegment对象实例,并加入segments中。
*/
private def loadSegmentFiles(): Unit = {
// load segments in ascending order because transactional data from one segment may depend on the
// segments that come before it
for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) {
//如果不是以.log结尾的文件,如.index、.timeindex、.txnindex
if (isIndexFile(file)) {
// if it is an index file, make sure it has a corresponding .log file
val offset = offsetFromFile(file)
val logFile = Log.logFile(dir, offset)
// 确保存在对应的日志文件,否则记录一个警告,并删除该索引文件
if (!logFile.exists) {
warn(s"Found an orphaned index file ${file.getAbsolutePath}, with no corresponding log file.")
Files.deleteIfExists(file.toPath)
}
} else if (isLogFile(file)) {
// 如果是以.log结尾的文件
// if it's a log file, load the corresponding log segment
val baseOffset = offsetFromFile(file)
val timeIndexFileNewlyCreated = !Log.timeIndexFile(dir, baseOffset).exists()
// 创建对应的LogSegment对象实例,并加入segments中
val segment = LogSegment.open(dir = dir,
baseOffset = baseOffset,
config,
time = time,
fileAlreadyExists = true)
try segment.sanityCheck(timeIndexFileNewlyCreated)
catch {
case _: NoSuchFileException =>
error(s"Could not find offset index file corresponding to log file ${segment.log.file.getAbsolutePath}, " +
"recovering segment and rebuilding index files...")
recoverSegment(segment)
case e: CorruptIndexException =>
warn(s"Found a corrupted index file corresponding to log file ${segment.log.file.getAbsolutePath} due " +
s"to ${e.getMessage}}, recovering segment and rebuilding index files...")
recoverSegment(segment)
}
addSegment(segment)
}
}
}
接下来调用completeSwapOperations方法处理有效.swap 文件集合。
/**
* This method does not need to convert IOException to KafkaStorageException because it is only called before all logs
* are loaded.
* @throws LogSegmentOffsetOverflowException if the swap file contains messages that cause the log segment offset to
* overflow. Note that this is currently a fatal exception as we do not have
* a way to deal with it. The exception is propagated all the way up to
* KafkaServer#startup which will cause the broker to shut down if we are in
* this situation. This is expected to be an extremely rare scenario in practice,
* and manual intervention might be required to get out of it.
*
* 这个方法不需要将IOException转换为KafkaStorageException,因为它只在加载所有日志之前调用。
*
* LogSegmentOffsetOverflowException : 如果交换文件包含导致日志段偏移量溢出的消息。请注意,这目前是一个致命的异常,
* 因为我们没有办法处理它。异常将一直传播到KafkaServer#startup,如果我们处于这种情况,将导致代理关闭。这在实践中是非常罕见的情况,
* 可能需要手工干预才能摆脱它。
*
* 1. 遍历所有有效.swap文件
* 2. 创建对应的LogSegment实例;
* 3. 执行日志段恢复操作,恢复部分的源码已经在LogSegment里面讲了;
* 4. 把.swap文件重命名成.log;
*/
private def completeSwapOperations(swapFiles: Set[File]): Unit = {
// 遍历所有有效.swap文件
for (swapFile <- swapFiles) {
val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, ""))
// 拿到日志文件的起始位移值
val baseOffset = offsetFromFile(logFile)
// 创建对应的LogSegment实例
val swapSegment = LogSegment.open(swapFile.getParentFile,
baseOffset = baseOffset,
config,
time = time,
fileSuffix = SwapFileSuffix)
info(s"Found log file ${swapFile.getPath} from interrupted swap operation, repairing.")
// 执行日志段恢复操作
recoverSegment(swapSegment)
// We create swap files for two cases:
// (1) Log cleaning where multiple segments are merged into one, and
// (2) Log splitting where one segment is split into multiple.
//
// Both of these mean that the resultant swap segments be composed of the original set, i.e. the swap segment
// must fall within the range of existing segment(s). If we cannot find such a segment, it means the deletion
// of that segment was successful. In such an event, we should simply rename the .swap to .log without having to
// do a replace with an existing segment.
// 确认之前删除日志段是否成功,是否还存在老的日志段文件
val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.readNextOffset).filter { segment =>
segment.readNextOffset > swapSegment.baseOffset
}
// 如果存在,直接把.swap文件重命名成.log
replaceSegments(Seq(swapSegment), oldSegments.toSeq, isRecoveredSwapFile = true)
}
}
最后是执行recoverLog部分代码。
/**
* Recover the log segments and return the next offset after recovery.
* This method does not need to convert IOException to KafkaStorageException because it is only called before all
* logs are loaded.
* @throws LogSegmentOffsetOverflowException if we encountered a legacy segment with offset overflow
*
* 主要为日志段map中自恢复点起的每个日志段重建索引文件并且砍掉那些位于日志和索引尾部的无效字节。如果发现确实存在无效字节,那么就把那些日志段全部删除掉
*/
private def recoverLog(): Long = {
// if we have the clean shutdown marker, skip recovery
// 如果不存在以.kafka_cleanshutdown结尾的文件。通常都不存在
if (!hasCleanShutdownFile) {
// okay we need to actually recover this log
// 获取到上次恢复点以外的所有unflushed日志段对象
val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
// 遍历这些unflushed日志段
while (unflushed.hasNext) {
val segment = unflushed.next
info(s"Recovering unflushed segment ${segment.baseOffset}")
val truncatedBytes =
try {
// 执行恢复日志段操作
recoverSegment(segment, leaderEpochCache)
} catch {
case _: InvalidOffsetException =>
val startOffset = segment.baseOffset
warn("Found invalid offset during recovery. Deleting the corrupt segment and " +
s"creating an empty one with starting offset $startOffset")
segment.truncateTo(startOffset)
}
// 如果有无效的消息导致被截断的字节数不为0,直接删除剩余的日志段对象
if (truncatedBytes > 0) {
// we had an invalid message, delete all remaining log
warn(s"Corruption found in segment ${segment.baseOffset}, truncating to offset ${segment.readNextOffset}")
unflushed.foreach(deleteSegment)
}
}
}
// 这些都做完之后,如果日志段集合不为空
if (logSegments.nonEmpty) {
val logEndOffset = activeSegment.readNextOffset
if (logEndOffset < logStartOffset) {
warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
"This could happen if segment files were deleted from the file system.")
logSegments.foreach(deleteSegment)
}
}
// 这些都做完之后,如果日志段集合为空了
if (logSegments.isEmpty) {
// no existing segments, create a new mutable segment beginning at logStartOffset
// 至少创建一个新的日志段,以logStartOffset为日志段的起始位移,并加入日志段集合中
addSegment(LogSegment.open(dir = dir,
baseOffset = logStartOffset,
config,
time = time,
fileAlreadyExists = false,
initFileSize = this.initFileSize,
preallocate = config.preallocate))
}
// 更新上次恢复点属性,并返回
recoveryPoint = activeSegment.readNextOffset
recoveryPoint
}
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/qq_21383435/article/details/123431127
内容来源于网络,如有侵权,请联系作者删除!