文章13 | 阅读 5137 | 点赞0
// Contract for persisting a DataTree and its session map as snapshots.
// NOTE(review): the closing brace of this interface is not present in this excerpt.
public interface SnapShot {
// Deserializes into the DataTree and sessions; returns the last zxid that was serialized.
long deserialize(DataTree dt, Map<Long, Integer> sessions)
throws IOException;
// Serializes the DataTree and sessions into the file given by name.
void serialize(DataTree dt, Map<Long, Integer> sessions,
File name, boolean fsync)
throws IOException;
// Finds the most recent snapshot file.
File findMostRecentSnapshot() throws IOException;
// Returns information about the most recent snapshot.
SnapshotInfo getLastSnapshotInfo();
// Releases resources.
void close() throws IOException;
// File-based SnapShot implementation that works on the snapshot files under snapDir.
// NOTE(review): closing braces are not present in this excerpt.
public class FileSnap implements SnapShot {
File snapDir;
SnapshotInfo lastSnapshotInfo = null;
private volatile boolean close = false;
private static final int VERSION = 2;
private static final long dbId = -1;
// Magic number (the bytes of "ZKSN" read as an int), placed in the file header for validation.
public final static int SNAP_MAGIC = ByteBuffer.wrap("ZKSN".getBytes()).getInt();
public static final String SNAPSHOT_FILE_PREFIX = "snapshot";
// Constructor; snapDir is the snapshot file directory.
public FileSnap(File snapDir) {
this.snapDir = snapDir;
// Returns the value currently held in the lastSnapshotInfo field.
public SnapshotInfo getLastSnapshotInfo() {
return this.lastSnapshotInfo;
// Deserialization entry point: restores dt and sessions from a snapshot file in snapDir.
// Returns -1 when no candidate snapshot file exists; otherwise returns the zxid parsed
// from the name of the snapshot file held in snap. Throws IOException when candidates
// exist but none was flagged as valid.
// NOTE(review): this excerpt omits closing braces and possibly some statements
// (e.g. whatever ends the loop after a valid snapshot is found).
public long deserialize(DataTree dt, Map<Long, Integer> sessions)
throws IOException {
// Find up to 100 snapshot files, ordered by zxid descending.
List<File> snapList = findNValidSnapshots(100);
if (snapList.size() == 0) {
return -1L;
File snap = null;
boolean foundValid = false;
// Iterate over the candidate files.
for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
// The file currently being tried.
snap = snapList.get(i);
LOG.info("Reading snapshot " + snap);
try (CheckedInputStream snapIS = SnapStream.getInputStream(snap)) {
// Build an InputArchive on top of the file stream.
InputArchive ia = BinaryInputArchive.getArchive(snapIS);
// Deserialize dt and sessions through ia.
deserialize(dt, sessions, ia);
// Verify the integrity seal of the stream.
SnapStream.checkSealIntegrity(snapIS, ia);
foundValid = true;
// Stop as soon as a valid snapshot has been found.
} catch (IOException e) {
LOG.warn("problem reading snap file " + snap, e);
if (!foundValid) {
throw new IOException("Not able to find valid snapshots in " + snapDir);
// Take the zxid from the name of the file that was found and record it on the tree.
dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
// Build the latest SnapshotInfo (zxid plus the file's last-modified time divided by 1000).
lastSnapshotInfo = new SnapshotInfo(dt.lastProcessedZxid, snap.lastModified() / 1000);
return dt.lastProcessedZxid;
// Deserializes dt and sessions through the given InputArchive.
// Throws IOException when the magic number read from the file header differs from SNAP_MAGIC.
// NOTE(review): closing braces and the final deserialization call are not present in this excerpt.
public void deserialize(DataTree dt, Map<Long, Integer> sessions, InputArchive ia) throws IOException {
// First deserialize the FileHeader and check its magic number.
FileHeader header = new FileHeader();
header.deserialize(ia, "fileheader");
if (header.getMagic() != SNAP_MAGIC) {
throw new IOException("mismatching magic headers "
+ header.getMagic() +
" != " + FileSnap.SNAP_MAGIC);
// Then deserialize via the util class (that call is not shown in this excerpt).
// Finds the most recent snapshot file; returns null when none is found.
// NOTE(review): closing braces are not present in this excerpt.
public File findMostRecentSnapshot() throws IOException {
// Ask for a single file, ordered by zxid descending, and return it.
List<File> files = findNValidSnapshots(1);
if (files.size() == 0) {
return null;
return files.get(0);
// Parses the zxid out of each file name and collects the n most recent files in descending order.
// NOTE(review): this excerpt omits closing braces and the statements that add to list,
// advance count and leave the loop; confirm against the full source.
private List<File> findNValidSnapshots(int n) throws IOException {
// List every file under snapDir and sort by zxid, descending.
List<File> files = Util.sortDataDir(snapDir.listFiles(), SNAPSHOT_FILE_PREFIX, false);
int count = 0;
List<File> list = new ArrayList<File>();
for (File f : files) {
try {
if (SnapStream.isValidSnapshot(f)) {
// Validate until n snapshot files have been collected.
if (count == n) {
} catch (IOException e) {
// A file that fails validation with an IOException is only logged.
LOG.info("invalid snapshot " + f, e);
return list;
// Synchronized method: serializes the DataTree and sessions into the file snapShot.
// Nothing is written when the close flag is already set.
// NOTE(review): closing braces are not present in this excerpt, and the fsync
// parameter is not referenced in the lines shown.
public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot, boolean fsync)
throws IOException {
if (!close) {
try (CheckedOutputStream snapOS = SnapStream.getOutputStream(snapShot)) {
// Build an OutputArchive on top of the file stream.
OutputArchive oa = BinaryOutputArchive.getArchive(snapOS);
// Build the file header carrying the SNAP_MAGIC magic number.
FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
// Delegate to the archive-based serialize method.
serialize(dt, sessions, oa, header);
SnapStream.sealStream(snapOS, oa);
// Update lastSnapshotInfo from the file name's zxid and the file's last-modified time.
lastSnapshotInfo = new SnapshotInfo(
Util.getZxidFromName(snapShot.getName(), SNAPSHOT_FILE_PREFIX),
snapShot.lastModified() / 1000);
// Serializes the data tree and sessions to the given OutputArchive, header first.
// NOTE(review): the condition guarding the IllegalStateException, the final
// serialization call and the closing braces are not present in this excerpt.
protected void serialize(DataTree dt,Map<Long, Integer> sessions,
OutputArchive oa, FileHeader header) throws IOException {
// this is really a programmatic error and not something that can
// happen at runtime
throw new IllegalStateException(
"Snapshot's not open for writing: uninitialized header");
// Serialize the file header first.
header.serialize(oa, "fileheader");
// Then serialize via the util class (that call is not shown in this excerpt).
// Sets the close flag to true.
public synchronized void close() throws IOException {
close = true;
// Interface for reading and writing transaction-log (TxnLog) data.
// NOTE(review): closing braces are not present in this excerpt.
public interface TxnLog extends Closeable {
// Sets the ServerStats.
void setServerStats(ServerStats serverStats);
// Rolls the log currently being appended to.
void rollLog() throws IOException;
// Appends a log entry; returns true on success.
boolean append(TxnHeader hdr, Record r) throws IOException;
// Reads the log starting from the given zxid.
TxnIterator read(long zxid) throws IOException;
// Returns the zxid of the last logged transaction.
long getLastLoggedZxid() throws IOException;
// Truncates the log at the given zxid.
boolean truncate(long zxid) throws IOException;
// Returns the dbId of the log.
long getDbId() throws IOException;
// Commits the transactions so that they are persisted.
void commit() throws IOException;
// Returns the elapsed time of the transaction-log sync.
long getTxnLogSyncElapsedTime();
void setTotalLogSize(long size);
long getTotalLogSize();
// Iterator for reading the transaction log.
public interface TxnIterator extends Closeable {
TxnHeader getHeader();
Record getTxn();
boolean next() throws IOException;
long getStorageSize() throws IOException;
LogFile: FileHeader TxnList ZeroPad
FileHeader: {
magic 4bytes (ZKLG)
version 4bytes
dbid 8bytes
}
TxnList: Txn || Txn TxnList
Txn: checksum Txnlen TxnHeader Record 0x42
checksum: 8bytes Adler32(通过Txnlen, TxnHeader, Record和0x42计算得出)
Txnlen: len 4bytes
TxnHeader: {
sessionid 8bytes
cxid 4bytes
zxid 8bytes
time 8bytes
type 4bytes
}
// File-based TxnLog implementation that works on the log files under logDir.
// NOTE(review): closing braces and some declarations referenced further down
// (e.g. prevLogsRunningTotal, fsyncWarningThresholdMS, txnLogSizeLimit) are not
// present in this excerpt.
public class FileTxnLog implements TxnLog {
private static final Logger LOG;
// Magic number (the bytes of "ZKLG" read as an int) used in the FileHeader.
public final static int TXNLOG_MAGIC = ByteBuffer.wrap("ZKLG".getBytes()).getInt();
public final static int VERSION = 2;
long lastZxidSeen;
volatile BufferedOutputStream logStream = null;
volatile OutputArchive oa;
volatile FileOutputStream fos = null;
File logDir;
// True unless the system property zookeeper.forceSync is set to "no".
private final boolean forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals("no");
long dbId;
private final Queue<FileOutputStream> streamsToFlush = new ArrayDeque<>();
File logFileWrite = null;
private FilePadding filePadding = new FilePadding();
private ServerStats serverStats;
private volatile long syncElapsedMS = -1L;
// Constructor; takes the log directory.
public FileTxnLog(File logDir) {
this.logDir = logDir;
// Rolls the log: drops the references to the current stream and archive.
// NOTE(review): closing braces and the flush call are not present in this excerpt.
public synchronized void rollLog() throws IOException {
// If the current log stream is non-null, flush it.
if (logStream != null) {
prevLogsRunningTotal += getCurrentLogSize();
this.logStream = null;
oa = null;
// Rolling only needs to add the current log file's size to prevLogsRunningTotal.
// Closes the file streams that have been opened.
// NOTE(review): the close calls themselves and the closing braces are not present in this excerpt.
public synchronized void close() throws IOException {
if (logStream != null) {
for (FileOutputStream log : streamsToFlush) {
// Appends one transaction entry to the log.
// Returns false when hdr is null and true once the entry has been written.
// Throws IOException when the marshalled entry is null or empty.
// NOTE(review): this excerpt omits closing braces and some statements
// (e.g. the flush and padding steps mentioned in the comments below).
public synchronized boolean append(TxnHeader hdr, Record txn)
throws IOException {
// Validate the incoming header.
if (hdr == null) {
return false;
// Check whether the transaction's zxid is greater than the last one seen.
if (hdr.getZxid() <= lastZxidSeen) {
LOG.warn("Current zxid " + hdr.getZxid()
+ " is <= " + lastZxidSeen + " for "
+ hdr.getType());
} else {
lastZxidSeen = hdr.getZxid();
// No open stream: a new log file named after this zxid is started.
if (logStream==null) {
LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
// Create the file object.
logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
// Open the log file stream.
fos = new FileOutputStream(logFileWrite);
// Wrap it in an in-memory buffered stream.
logStream=new BufferedOutputStream(fos);
// Build the BinaryOutputArchive.
oa = BinaryOutputArchive.getArchive(logStream);
// Build the FileHeader.
FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
// Serialize the file header first.
fhdr.serialize(oa, "fileheader");
// Flush to the file so that the magic number reaches the file before filePadding.
// Marshal hdr and txn via the util class.
byte[] buf = Util.marshallTxnEntry(hdr, txn);
if (buf == null || buf.length == 0) {
throw new IOException("Faulty serialization for header " +
"and txn");
// Build the checksum over the marshalled bytes.
Checksum crc = makeChecksumAlgorithm();
crc.update(buf, 0, buf.length);
// Write the checksum value first.
oa.writeLong(crc.getValue(), "txnEntryCRC");
// Then write the buf data.
Util.writeTxnBytes(oa, buf);
return true;
// Returns the zxid of the last logged transaction.
// NOTE(review): this excerpt is truncated — the maxLog expression is incomplete,
// and the loop exit, the iterator close and the closing braces are not shown.
public long getLastLoggedZxid() {
// Get every file in the directory whose zxid is greater than 0.
File[] files = getLogFiles(logDir.listFiles(), 0);
long maxLog=files.length>0?
long zxid = maxLog;
TxnIterator itr = null;
try {
// Create a new TxnIterator that reads the files from maxLog onward to find the largest zxid.
FileTxnLog txn = new FileTxnLog(logDir);
itr = txn.read(maxLog);
while (true) {
TxnHeader hdr = itr.getHeader();
zxid = hdr.getZxid();
} catch (IOException e) {
// An IOException during the scan is only logged.
LOG.warn("Unexpected exception", e);
} finally {
return zxid;
// Quietly closes the transaction-log TxnIterator: an IOException is logged, not rethrown.
// NOTE(review): the close call itself and the closing braces are not present in this excerpt.
private void close(TxnIterator itr) {
if (itr != null) {
try {
} catch (IOException ioe) {
LOG.warn("Error closing file iterator", ioe);
// Commits the log, making sure all data is flushed to the file.
// NOTE(review): this excerpt omits the flush/force calls, the bodies of several
// branches and all closing braces; confirm details against the full source.
public synchronized void commit() throws IOException {
// Flush the current logStream.
if (logStream != null) {
// Flush every pending log stream.
for (FileOutputStream log : streamsToFlush) {
if (forceSync) {
// Time the sync and store the elapsed milliseconds in syncElapsedMS.
long startSyncNS = System.nanoTime();
FileChannel channel = log.getChannel();
syncElapsedMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
if (syncElapsedMS > fsyncWarningThresholdMS) {
if(serverStats != null) {
while (streamsToFlush.size() > 1) {
// Roll the log file if we exceed the size limit
if(txnLogSizeLimit > 0) {
long logSize = getCurrentLogSize();
if (logSize > txnLogSizeLimit) {
LOG.debug("Log size limit reached: {}", logSize);
// Truncates the current transaction logs at the given zxid.
// Throws IOException when the iterator has no input stream (no log file to truncate).
// NOTE(review): this excerpt omits the setLength/close calls and all closing braces.
public boolean truncate(long zxid) throws IOException {
FileTxnIterator itr = null;
try {
// Build an iterator over the files in logDir positioned by zxid.
itr = new FileTxnIterator(this.logDir, zxid);
PositionInputStream input = itr.inputStream;
if(input == null) {
throw new IOException("No log files found to truncate! This could " +
"happen if you still have snapshots from an old setup or " +
"log files were deleted accidentally or dataLogDir was changed in zoo.cfg.");
long pos = input.getPosition();
// Use a RandomAccessFile to set the file length to the current position.
RandomAccessFile raf=new RandomAccessFile(itr.logFile,"rw");
while(itr.goToNextLog()) {
// Delete each subsequent log file; a failed delete is only logged.
if (!itr.logFile.delete()) {
LOG.warn("Unable to truncate {}", itr.logFile);
} finally {
// Close the iterator.
return true;
// Returns the dbId: reads the first file in the directory and takes the dbId from its file header.
// NOTE(review): the condition guarding the "Unsupported Format." exception and the
// closing braces are not present in this excerpt.
public long getDbId() throws IOException {
FileTxnIterator itr = new FileTxnIterator(logDir, 0);
FileHeader fh=readHeader(itr.logFile);
throw new IOException("Unsupported Format.");
return fh.getDbid();
// Iterator over the transaction log files of a directory.
// NOTE(review): closing braces are not present in this excerpt.
public static class FileTxnIterator implements TxnLog.TxnIterator {
// Log directory.
File logDir;
// Files with a zxid greater than this value are to be iterated.
long zxid;
// TxnHeader of the current entry.
TxnHeader hdr;
// Record of the current entry.
Record record;
// Current file.
File logFile;
// InputArchive built on the current file.
InputArchive ia;
PositionInputStream inputStream=null;
// Files found so far whose zxid is greater than zxid.
private ArrayList<File> storedFiles;
// Constructor: logDir is the directory, zxid the lower bound, and fastForward selects
// whether construction advances ahead to the entries beyond zxid.
// NOTE(review): the initialization call, the loop exit and the closing braces are not
// present in this excerpt.
public FileTxnIterator(File logDir, long zxid, boolean fastForward)
throws IOException {
this.logDir = logDir;
this.zxid = zxid;
// On initialization, open the first file and initialize hdr and record.
// When fastForward is true, keep reading the current hdr's zxid and advancing until
// the first entry that is not below zxid is reached.
if (fastForward && hdr != null) {
while (hdr.getZxid() < zxid) {
if (!next())
// Constructor with fastForward set to true.
public FileTxnIterator(File logDir, long zxid) throws IOException {
this(logDir, zxid, true);
// Initialization: opens the first file.
// NOTE(review): the statements that add to storedFiles, leave the loop and open the
// first log, as well as the closing braces, are not present in this excerpt.
void init() throws IOException {
storedFiles = new ArrayList<File>();
// Get every file under logDir whose zxid is greater than 0, sorted by zxid descending.
List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), LOG_FILE_PREFIX, false);
for (File f: files) {
// Files whose zxid is greater than or equal to zxid go into storedFiles.
if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) >= zxid) {
// The first file whose zxid is less than zxid is added to storedFiles as well.
else if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) < zxid) {
// Move to the next log.
// Opens the next file as logFile and creates ia for it.
// Takes the last element of storedFiles; returns false when storedFiles is empty.
// NOTE(review): closing braces are not present in this excerpt.
private boolean goToNextLog() throws IOException {
if (storedFiles.size() > 0) {
this.logFile = storedFiles.remove(storedFiles.size()-1);
ia = createInputArchive(this.logFile);
return true;
return false;
// Creates the InputArchive for logFile: opens a buffered, position-tracking stream,
// stores it in inputStream and wraps it in a BinaryInputArchive stored in ia.
// NOTE(review): any header validation and the closing brace are not present in this excerpt.
protected InputArchive createInputArchive(File logFile) throws IOException {
inputStream= new PositionInputStream(new BufferedInputStream(new FileInputStream(logFile)));
LOG.debug("Created new input stream " + logFile);
ia = BinaryInputArchive.getArchive(inputStream);
LOG.debug("Created new input archive " + logFile);
return ia;
// Creates the Checksum implementation used for log entries: a new Adler32.
protected Checksum makeChecksumAlgorithm(){
return new Adler32();
// Advances the iterator over the currently opened data.
// Returns false when there is no open archive or no further log file; returns true after
// an entry has been read into hdr and record. Throws IOException on a checksum mismatch.
// NOTE(review): closing braces are not present in this excerpt.
public boolean next() throws IOException {
if (ia == null) {
return false;
try {
// Read the stored CRC value first.
long crcValue = ia.readLong("crcvalue");
// Then read the entry bytes.
byte[] bytes = Util.readTxnBytes(ia);
// Since we preallocate, a null or empty entry is treated as end-of-file.
if (bytes == null || bytes.length==0) {
throw new EOFException("Failed to read " + logFile);
// Verify the checksum against the bytes just read.
Checksum crc = makeChecksumAlgorithm();
crc.update(bytes, 0, bytes.length);
if (crcValue != crc.getValue())
throw new IOException(CRC_ERROR);
hdr = new TxnHeader();
// Deserialize hdr and record.
record = SerializeUtils.deserializeTxn(bytes, hdr);
} catch (EOFException e) {
LOG.debug("EOF exception " + e);
// Drop the references to the exhausted file's stream, archive and header.
inputStream = null;
ia = null;
hdr = null;
// this means that the file has ended
// we should go to the next file
if (!goToNextLog()) {
return false;
// When the current file has ended, automatically continue with the next one.
return next();
} catch (IOException e) {
throw e;
return true;
// Helper that pairs a TxnLog with a SnapShot, each rooted in its own directory.
// NOTE(review): closing braces and the declarations of version and VERSION are not
// present in this excerpt.
public class FileTxnSnapLog {
// the directory containing
// the transaction logs
final File dataDir;
// the directory containing
// the snapshots
final File snapDir;
TxnLog txnLog;
SnapShot snapLog;
// Resolves both directories to a (version + VERSION) subdirectory of the given paths
// and creates the file-based txn log and snapshot objects on them.
public FileTxnSnapLog(File dataDir, File snapDir) throws IOException {
this.dataDir = new File(dataDir, version + VERSION);
this.snapDir = new File(snapDir, version + VERSION);
txnLog = new FileTxnLog(this.dataDir);
snapLog = new FileSnap(this.snapDir);
// Restores dt and sessions: deserializes the snapshot through snapLog, then returns the
// result of fastForwardFromEdits.
// NOTE(review): this excerpt is truncated — the statement ending on the elapsed-time
// line, the use of deserializeResult and trustEmptyDB, and all closing braces are not shown.
public long restore(DataTree dt, Map<Long, Integer> sessions,
PlayBackListener listener) throws IOException {
long snapLoadingStartTime = Time.currentElapsedTime();
long deserializeResult = snapLog.deserialize(dt, sessions);
Time.currentElapsedTime() - snapLoadingStartTime);
FileTxnLog txnLog = new FileTxnLog(dataDir);
boolean trustEmptyDB;
// An "initialize" marker file next to dataDir is deleted if present; when it was
// present, trustEmptyDB is true, otherwise it takes the value of autoCreateDB.
File initFile = new File(dataDir.getParent(), "initialize");
if (Files.deleteIfExists(initFile.toPath())) {
LOG.info("Initialize file found, an empty database will not block voting participation");
trustEmptyDB = true;
} else {
trustEmptyDB = autoCreateDB;
return fastForwardFromEdits(dt, sessions, listener);
// Saves dataTree and sessionsWithTimeouts to a snapshot file in snapDir named after
// dataTree.lastProcessedZxid.
// NOTE(review): this excerpt is truncated — several log statements are cut off
// mid-expression and closing braces are missing.
public void save(DataTree dataTree,
ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,
boolean syncSnap)
throws IOException {
long lastZxid = dataTree.lastProcessedZxid;
File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid),
try {
snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);
} catch (IOException e) {
// On failure, a zero-length snapshot file is deleted and the outcome is logged.
if (snapshotFile.length() == 0) {
if (snapshotFile.delete()) {
LOG.info("Deleted empty snapshot file: " +
} else {
LOG.warn("Could not delete empty snapshot file: " +
} else {
throw e;
// Truncates the transaction log at zxid using a fresh FileTxnLog, then replaces txnLog
// and snapLog with new instances; returns the result of the truncate call.
// NOTE(review): the close calls mentioned in the first comment and the closing brace
// are not present in this excerpt.
public boolean truncateLog(long zxid) throws IOException {
// close the existing txnLog and snapLog
// truncate it
FileTxnLog truncLog = new FileTxnLog(dataDir);
boolean truncated = truncLog.truncate(zxid);
txnLog = new FileTxnLog(dataDir);
snapLog = new FileSnap(snapDir);
return truncated;
// Returns the most recent snapshot file by delegating to a new FileSnap on snapDir.
public File findMostRecentSnapshot() throws IOException {
FileSnap snaplog = new FileSnap(snapDir);
return snaplog.findMostRecentSnapshot();
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/qq_33513250/article/details/103873391