zookeeper源码分析之Record

x33g5p2x  于2021-12-20 转载在 其他  
字(7.0k)|赞(0)|评价(0)|浏览(318)

一、Record

zookeeper通过维护内存中的DataTree和DataNode来向外提供数据节点服务。这些数据既可以序列化后持久化到文件,也可以从流中反序列化回内存,还可以用于网络请求传输;zookeeper中所有的数据请求和响应都实现了Record接口。

/**
 * Contract for an object that can write itself to an {@link OutputArchive}
 * and read itself back from an {@link InputArchive}.
 */
public interface Record {

    /**
     * Writes this record to the given archive.
     *
     * @param archive archive to write to
     * @param tag     label supplied by the caller for this record
     * @throws IOException declared so implementations can report write failures
     */
    void serialize(OutputArchive archive, String tag) throws IOException;

    /**
     * Populates this record from the given archive.
     *
     * @param archive archive to read from
     * @param tag     label supplied by the caller for this record
     * @throws IOException declared so implementations can report read failures
     */
    void deserialize(InputArchive archive, String tag) throws IOException;
}

二、OutputArchive

所有序列化者都需要实现的公共接口,提供写各种类型的数据功能。

/**
 * Common interface for serializers: one write method per supported value
 * type, plus start/end hooks for records, vectors and maps.
 *
 * <p>Every method takes a {@code tag} label; whether and how it is used is
 * left to the implementation.
 */
public interface OutputArchive {

    // ---- scalar and buffer values ----

    /** Writes a single byte. */
    void writeByte(byte b, String tag) throws IOException;

    /** Writes a boolean. */
    void writeBool(boolean b, String tag) throws IOException;

    /** Writes a 32-bit integer. */
    void writeInt(int i, String tag) throws IOException;

    /** Writes a 64-bit integer. */
    void writeLong(long l, String tag) throws IOException;

    /** Writes a float. */
    void writeFloat(float f, String tag) throws IOException;

    /** Writes a double. */
    void writeDouble(double d, String tag) throws IOException;

    /** Writes a string. */
    void writeString(String s, String tag) throws IOException;

    /** Writes a byte array. */
    void writeBuffer(byte[] buf, String tag) throws IOException;

    // ---- nested records ----

    /** Writes a nested record. */
    void writeRecord(Record r, String tag) throws IOException;

    /** Hook invoked when serialization of record {@code r} begins. */
    void startRecord(Record r, String tag) throws IOException;

    /** Hook invoked when serialization of record {@code r} ends. */
    void endRecord(Record r, String tag) throws IOException;

    // ---- collections ----

    /** Hook invoked before the elements of list {@code v} are written. */
    void startVector(List<?> v, String tag) throws IOException;

    /** Hook invoked after the elements of list {@code v} are written. */
    void endVector(List<?> v, String tag) throws IOException;

    /** Hook invoked before the entries of map {@code v} are written. */
    void startMap(TreeMap<?, ?> v, String tag) throws IOException;

    /** Hook invoked after the entries of map {@code v} are written. */
    void endMap(TreeMap<?, ?> v, String tag) throws IOException;
}

三、InputArchive 

所有反序列化者都需要实现的公共接口,提供读各种类型的数据功能。

/**
 * Common interface for deserializers: one read method per supported value
 * type, plus start/end hooks for records, vectors and maps.
 *
 * <p>Every method takes a {@code tag} label; whether and how it is used is
 * left to the implementation.
 */
public interface InputArchive {

    // ---- scalar and buffer values ----

    /** Reads a single byte. */
    byte readByte(String tag) throws IOException;

    /** Reads a boolean. */
    boolean readBool(String tag) throws IOException;

    /** Reads a 32-bit integer. */
    int readInt(String tag) throws IOException;

    /** Reads a 64-bit integer. */
    long readLong(String tag) throws IOException;

    /** Reads a float. */
    float readFloat(String tag) throws IOException;

    /** Reads a double. */
    double readDouble(String tag) throws IOException;

    /** Reads a string. */
    String readString(String tag) throws IOException;

    /** Reads a byte array. */
    byte[] readBuffer(String tag) throws IOException;

    // ---- nested records ----

    /** Reads a nested record into {@code r}. */
    void readRecord(Record r, String tag) throws IOException;

    /** Hook invoked when deserialization of a record begins. */
    void startRecord(String tag) throws IOException;

    /** Hook invoked when deserialization of a record ends. */
    void endRecord(String tag) throws IOException;

    // ---- collections ----

    /**
     * Hook invoked before the elements of a vector are read.
     *
     * @return an {@link Index} for stepping through the elements
     */
    Index startVector(String tag) throws IOException;

    /** Hook invoked after the elements of a vector are read. */
    void endVector(String tag) throws IOException;

    /**
     * Hook invoked before the entries of a map are read.
     *
     * @return an {@link Index} for stepping through the entries
     */
    Index startMap(String tag) throws IOException;

    /** Hook invoked after the entries of a map are read. */
    void endMap(String tag) throws IOException;
}

四、BinaryOutputArchive

序列化器和反序列化器需要成对实现,并保证各个方法一一对应,这样序列化和反序列化的结果才能一致。zookeeper提供3对实现类(BinaryOutputArchive/BinaryInputArchive,XmlOutputArchive/XmlInputArchive,CsvOutputArchive/CsvInputArchive),
默认使用Binary这一对进行数据持久化。

/**
 * {@link OutputArchive} that forwards every value to a {@link DataOutput}.
 *
 * <p>The {@code tag} arguments are accepted for interface compatibility but
 * are never written to the output. Strings, byte arrays, vectors and maps are
 * preceded by a 32-bit length/size; a {@code null} string, byte array or
 * vector is written as the single value {@code -1}.
 */
public class BinaryOutputArchive implements OutputArchive {
    // Heap buffer with an initial capacity of 1024 bytes. It is not referenced by
    // the code shown here; presumably stringToByteBuffer (defined outside this
    // excerpt) uses it as its scratch space - TODO confirm.
    private ByteBuffer bb = ByteBuffer.allocate(1024);
    private DataOutput out;

    /**
     * Creates an archive that writes to {@code strm} through a
     * {@link DataOutputStream}.
     *
     * @param strm stream to wrap
     * @return a new archive writing to {@code strm}
     */
    public static BinaryOutputArchive getArchive(OutputStream strm) {
        return new BinaryOutputArchive(new DataOutputStream(strm));
    }

    /**
     * Creates an archive that writes to the given {@link DataOutput}.
     *
     * @param out sink that receives all written values
     */
    public BinaryOutputArchive(DataOutput out) {
        this.out = out;
    }

    // ---- primitives: straight delegation to the DataOutput ----

    public void writeByte(byte b, String tag) throws IOException {
        out.writeByte(b);
    }

    public void writeBool(boolean b, String tag) throws IOException {
        out.writeBoolean(b);
    }

    public void writeInt(int i, String tag) throws IOException {
        out.writeInt(i);
    }

    public void writeLong(long l, String tag) throws IOException {
        out.writeLong(l);
    }

    public void writeFloat(float f, String tag) throws IOException {
        out.writeFloat(f);
    }

    public void writeDouble(double d, String tag) throws IOException {
        out.writeDouble(d);
    }

    /**
     * Writes a string as a 32-bit byte length followed by the encoded bytes.
     * A {@code null} string is written as the length {@code -1} with no
     * payload.
     *
     * @param s   string to write, may be {@code null}
     * @param tag unused
     * @throws IOException if the underlying output fails
     */
    public void writeString(String s, String tag) throws IOException {
        if (s == null) {
            writeInt(-1, "len");
            return;
        }
        // stringToByteBuffer is defined outside this excerpt; the result is
        // assumed to be array-backed because array() is called on it.
        ByteBuffer encoded = stringToByteBuffer(s);
        int len = encoded.remaining();
        writeInt(len, "len");
        // DataOutput.write(byte[], off, len) takes a byte COUNT as its third
        // argument. Passing limit() is only right when position() is 0, so use
        // remaining() to keep the payload in step with the length prefix.
        out.write(encoded.array(), encoded.arrayOffset() + encoded.position(), len);
    }

    /**
     * Writes a byte array as a 32-bit length followed by the bytes.
     * A {@code null} array is written as the length {@code -1} with no
     * payload.
     *
     * @param barr bytes to write, may be {@code null}
     * @param tag  unused
     * @throws IOException if the underlying output fails
     */
    public void writeBuffer(byte[] barr, String tag) throws IOException {
        if (barr == null) {
            out.writeInt(-1);
            return;
        }
        out.writeInt(barr.length);
        out.write(barr);
    }

    /**
     * Writes a nested record by asking it to serialize itself into this
     * archive.
     */
    public void writeRecord(Record r, String tag) throws IOException {
        r.serialize(this, tag);
    }

    /** No-op: nothing is written at the start of a record. */
    public void startRecord(Record r, String tag) throws IOException {}

    /** No-op: nothing is written at the end of a record. */
    public void endRecord(Record r, String tag) throws IOException {}

    /**
     * Writes the element count of {@code v}, or {@code -1} when {@code v} is
     * {@code null}. The elements themselves are not written here.
     */
    public void startVector(List<?> v, String tag) throws IOException {
        if (v == null) {
            writeInt(-1, tag);
            return;
        }
        writeInt(v.size(), tag);
    }

    /** No-op: nothing is written at the end of a vector. */
    public void endVector(List<?> v, String tag) throws IOException {}

    /**
     * Writes the entry count of {@code v}. Unlike {@link #startVector},
     * {@code v} must not be {@code null}: its size is read unconditionally.
     */
    public void startMap(TreeMap<?, ?> v, String tag) throws IOException {
        writeInt(v.size(), tag);
    }

    /** No-op: nothing is written at the end of a map. */
    public void endMap(TreeMap<?, ?> v, String tag) throws IOException {}

}

五、BinaryInputArchive

/**
 * {@link InputArchive} that pulls every value from a {@link DataInput}.
 *
 * <p>The {@code tag} arguments are accepted for interface compatibility but
 * do not influence what is read. Strings, byte arrays, vectors and maps are
 * expected to be preceded by a 32-bit length/size.
 */
public class BinaryInputArchive implements InputArchive {
    public static final String UNREASONBLE_LENGTH = "Unreasonable length = ";
    private DataInput in;

    /**
     * Creates an archive that reads from {@code strm} through a
     * {@link DataInputStream}.
     *
     * @param strm stream to wrap
     * @return a new archive reading from {@code strm}
     */
    public static BinaryInputArchive getArchive(InputStream strm) {
        return new BinaryInputArchive(new DataInputStream(strm));
    }

    /**
     * Countdown over the elements of a vector or map: it starts at the
     * element count, {@link #incr()} decrements it, and {@link #done()}
     * reports true once it is zero or negative.
     */
    private static class BinaryIndex implements Index {
        private int nelems;

        BinaryIndex(int nelems) {
            this.nelems = nelems;
        }

        public boolean done() {
            return nelems <= 0;
        }

        public void incr() {
            nelems--;
        }
    }

    /**
     * Creates an archive that reads from the given {@link DataInput}.
     *
     * @param in source of all values
     */
    public BinaryInputArchive(DataInput in) {
        this.in = in;
    }

    // ---- primitives: straight delegation to the DataInput ----

    public byte readByte(String tag) throws IOException {
        return in.readByte();
    }

    public boolean readBool(String tag) throws IOException {
        return in.readBoolean();
    }

    public int readInt(String tag) throws IOException {
        return in.readInt();
    }

    public long readLong(String tag) throws IOException {
        return in.readLong();
    }

    public float readFloat(String tag) throws IOException {
        return in.readFloat();
    }

    public double readDouble(String tag) throws IOException {
        return in.readDouble();
    }

    /**
     * Reads a 32-bit length followed by that many bytes and decodes them
     * with the "UTF8" charset.
     *
     * @return the decoded string, or {@code null} when the length is -1
     */
    public String readString(String tag) throws IOException {
        int len = in.readInt();
        if (len == -1) {
            return null;
        }
        // checkLength is defined outside this excerpt.
        checkLength(len);
        byte[] raw = new byte[len];
        in.readFully(raw);
        return new String(raw, "UTF8");
    }

    /**
     * Reads a 32-bit length followed by that many bytes.
     *
     * @return the bytes read, or {@code null} when the length is -1
     */
    public byte[] readBuffer(String tag) throws IOException {
        int len = readInt(tag);
        if (len == -1) {
            return null;
        }
        // checkLength is defined outside this excerpt.
        checkLength(len);
        byte[] payload = new byte[len];
        in.readFully(payload);
        return payload;
    }

    /**
     * Reads a nested record by asking it to deserialize itself from this
     * archive.
     */
    public void readRecord(Record r, String tag) throws IOException {
        r.deserialize(this, tag);
    }

    /** No-op: nothing is read at the start of a record. */
    public void startRecord(String tag) throws IOException {}

    /** No-op: nothing is read at the end of a record. */
    public void endRecord(String tag) throws IOException {}

    /**
     * Reads the element count of a vector.
     *
     * @return an index counting down from that size, or {@code null} when
     *         the size read is -1
     */
    public Index startVector(String tag) throws IOException {
        int size = readInt(tag);
        return size == -1 ? null : new BinaryIndex(size);
    }

    /** No-op: nothing is read at the end of a vector. */
    public void endVector(String tag) throws IOException {}

    /**
     * Reads the entry count of a map.
     *
     * @return an index counting down from that size
     */
    public Index startMap(String tag) throws IOException {
        int size = readInt(tag);
        return new BinaryIndex(size);
    }

    /** No-op: nothing is read at the end of a map. */
    public void endMap(String tag) throws IOException {}
}

六、DataNode

DataNode节点数据是zookeeper维护的核心数据,由顶级的DataTree持有,下面以它为例说明序列化和反序列化的实现。

/**
 * A data node that implements {@link Record}: it holds a data buffer, an ACL
 * value and persisted stat information, and reads/writes them in that order.
 */
public class DataNode implements Record {

    /** Data buffer of this node; written and read via the buffer methods. */
    byte[] data;

    /** ACL value; boxed, so it is unboxed when passed to {@code writeLong}. */
    Long acl;

    /** Persisted stat of this node. */
    public StatPersisted stat;

    /**
     * Loads this node from the archive: data buffer, then ACL, then stat.
     * The incoming {@code tag} is ignored; the record is always labelled
     * "node". Synchronized on this instance.
     *
     * @param archive archive to read from
     * @param tag     unused
     * @throws IOException if the archive reports a failure
     */
    public synchronized void deserialize(InputArchive archive, String tag)
            throws IOException {
        archive.startRecord("node");
        this.data = archive.readBuffer("data");
        this.acl = archive.readLong("acl");
        // A fresh stat object is installed before it is populated.
        this.stat = new StatPersisted();
        this.stat.deserialize(archive, "statpersisted");
        archive.endRecord("node");
    }

    /**
     * Stores this node into the archive: data buffer, then ACL, then stat.
     * The incoming {@code tag} is ignored; the record is always labelled
     * "node". Synchronized on this instance.
     *
     * @param archive archive to write to
     * @param tag     unused
     * @throws IOException if the archive reports a failure
     */
    public synchronized void serialize(OutputArchive archive, String tag)
            throws IOException {
        archive.startRecord(this, "node");
        archive.writeBuffer(this.data, "data");
        archive.writeLong(this.acl, "acl");
        this.stat.serialize(archive, "statpersisted");
        archive.endRecord(this, "node");
    }
}

相关文章