zookeeper源码分析之ZooKeeperServer

x33g5p2x  于2021-12-20 转载在 其他  
字(12.6k)|赞(0)|评价(0)|浏览(466)

ZooKeeperServer是ZooKeeper提供的简单单机服务器,它的请求处理链只包含PrepRequestProcessor、SyncRequestProcessor、FinalRequestProcessor三个处理器。它是所有ZooKeeper服务器的父类,其子类包括只读模式的ReadOnlyZooKeeperServer,以及从机服务器FollowerZooKeeperServer和ObserverZooKeeperServer等。

一、构造函数

public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {

    public static final int DEFAULT_TICK_TIME = 3000;
    // Tick time; defaults to DEFAULT_TICK_TIME and is overwritten by the constructor argument.
    protected int tickTime = DEFAULT_TICK_TIME;
    // Minimum session timeout; starts at -1 and is set through setMinSessionTimeout().
    protected int minSessionTimeout = -1;
    // Maximum session timeout; starts at -1 and is set through setMaxSessionTimeout().
    protected int maxSessionTimeout = -1;
    // Listen backlog for the client port socket (taken from clientPortListenBacklog).
    // This is a backlog size, NOT the port number; how it is applied to the socket is
    // outside this excerpt.
    protected int listenBacklog = -1;
    // Session tracker; created later if still null when the server starts.
    protected SessionTracker sessionTracker;
    // Access to the transaction log and snapshots (FileTxnSnapLog).
    private FileTxnSnapLog txnLogFactory = null;
    // Cache for read responses.
    private ResponseCache readResponseCache;
    // In-memory database; may be null until startdata() creates it.
    private ZKDatabase zkDb;

    /**
     * Creates the server and wires up the objects it owns. Constructing the server does not
     * start serving clients; the server only switches to RUNNING in startup().
     *
     * @param txnLogFactory transaction log / snapshot access; must be non-null because it is
     *        dereferenced immediately to attach serverStats
     * @param tickTime tick time stored in {@link #tickTime}
     * @param minSessionTimeout passed to setMinSessionTimeout
     * @param maxSessionTimeout passed to setMaxSessionTimeout
     * @param clientPortListenBacklog listen backlog for the client port, stored in
     *        {@link #listenBacklog}
     * @param zkDb database to use; may be null, in which case startdata() creates one
     * @param initialConfig NOTE(review): not used anywhere in this excerpt; the complete
     *        source presumably stores it — confirm before relying on it
     */
    public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
            int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog,
            ZKDatabase zkDb, String initialConfig) {
        // Server statistics object; handed to txnLogFactory right below.
        serverStats = new ServerStats(this);
        // FileTxnSnapLog gives access to the transaction log and DataTree snapshots.
        this.txnLogFactory = txnLogFactory;
        this.txnLogFactory.setServerStats(this.serverStats);
        this.zkDb = zkDb;
        this.tickTime = tickTime;
        setMinSessionTimeout(minSessionTimeout);
        setMaxSessionTimeout(maxSessionTimeout);
        // Listen backlog of the client port socket (not the port itself).
        this.listenBacklog = clientPortListenBacklog;

        listener = new ZooKeeperServerListenerImpl(this);
        // Read-response cache.
        readResponseCache = new ResponseCache();

        connThrottle = new BlueThrottle();
    }
}

二、SessionExpirer

ZooKeeperServer实现了SessionExpirer接口,提供处理过期session的方法,如下。

public class ZooKeeperServer implements SessionExpirer {

    /**
     * Expires {@code session}: logs that its timeout was exceeded and submits a
     * closeSession request for its id.
     *
     * @param session the session whose timeout has elapsed
     */
    public void expire(Session session) {
        long sessionId = session.getSessionId();
        // Parameterized logging: the message is only formatted when INFO is enabled.
        LOG.info("Expiring session 0x{}, timeout of {}ms exceeded",
                Long.toHexString(sessionId), session.getTimeout());
        close(sessionId);
    }

    /**
     * Builds a closeSession request for {@code sessionId} (no connection, xid 0, no body)
     * and submits it to the request processor chain via submitRequest.
     */
    private void close(long sessionId) {
        Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null);
        setLocalSessionFlag(si);
        submitRequest(si);
    }

    /** Returns the server id; this standalone implementation always returns 0. */
    public long getServerId() {
        return 0;
    }
}

三、ServerStats.Provider

实现ServerStats.Provider,提供服务状态相关方法。

public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {

    /**
     * Returns the total size in bytes of the files under {@code zkDb.snapLog.getDataDir()},
     * or 0 when no database exists yet.
     */
    public long getDataDirSize() {
        if (zkDb == null) {
            return 0L;
        }
        File path = zkDb.snapLog.getDataDir();
        return getDirSize(path);
    }

    /**
     * Returns the total size in bytes of the files under {@code zkDb.snapLog.getSnapDir()},
     * or 0 when no database exists yet.
     *
     * <p>NOTE(review): despite the "log dir" name this reads getSnapDir(), while
     * getDataDirSize() reads getDataDir(). Confirm in FileTxnSnapLog which directory holds
     * the transaction logs before relying on either name.
     */
    public long getLogDirSize() {
        if (zkDb == null) {
            return 0L;
        }
        File path = zkDb.snapLog.getSnapDir();
        return getDirSize(path);
    }

    /**
     * Recursively sums file sizes under {@code file}. A non-directory contributes
     * {@code file.length()}; a directory whose listing fails ({@code listFiles() == null})
     * contributes 0.
     *
     * <p>NOTE(review): File.isDirectory() follows symbolic links, so a symlink cycle under
     * the directory would recurse without bound.
     */
    private long getDirSize(File file) {
        long size = 0L;
        if (file.isDirectory()) {
            File[] files = file.listFiles();
            if (files != null) {
                for (File f : files) {
                    size += getDirSize(f);
                }
            }
        } else {
            size = file.length();
        }
        return size;
    }

    /** Returns the number of alive connections summed over the plain and secure factories. */
    public int getNumAliveConnections() {
        int numAliveConnections = 0;
        if (serverCnxnFactory != null) {
            numAliveConnections += serverCnxnFactory.getNumAliveConnections();
        }
        if (secureServerCnxnFactory != null) {
            numAliveConnections += secureServerCnxnFactory.getNumAliveConnections();
        }
        return numAliveConnections;
    }

    /**
     * Returns the last zxid processed by the in-memory DataTree, or 0 when no database
     * exists yet. The null guard matches getDataDirSize()/getLogDirSize(), so stats queries
     * made before the database is created no longer throw a NullPointerException.
     */
    public long getLastProcessedZxid() {
        if (zkDb == null) {
            return 0L;
        }
        return zkDb.getDataTreeLastProcessedZxid();
    }

    /**
     * Returns getInProcess(), the in-process request counter. submitRequest increments it
     * when a request that has a connection is handed to the processor chain. This counts
     * outstanding requests, not completed ones; the decrement side is outside this excerpt.
     */
    public long getOutstandingRequests() {
        return getInProcess();
    }
}

四、startdata

startdata是ZooKeeperServer的初始化方法,它在zkDb为空时创建ZKDatabase,并在数据库尚未初始化时加载dataDir目录下的文件至内存中的DataTree。

/**
 * Prepares the in-memory database. Creates a ZKDatabase backed by txnLogFactory when none
 * was supplied, then calls loadData() unless the database reports itself as initialized.
 */
public void startdata() throws IOException, InterruptedException {
        if (zkDb == null) {
            zkDb = new ZKDatabase(txnLogFactory);
        }
        if (zkDb.isInitialized()) {
            // Already populated; nothing left to load.
            return;
        }
        loadData();
    }

    /**
     * Establishes the server zxid from the database and cleans up sessions, then writes a
     * snapshot.
     *
     * <p>If the database is already initialized (for example, a leader that loaded its data
     * during election and is now leading again), the in-memory DataTree's last processed
     * zxid is reused. Otherwise the database is loaded from disk and the zxid it returns is
     * used. Sessions reported by zkDb.getSessions() that have no entry in the
     * session-timeout map are then killed at the DataTree's current zxid.
     */
    public void loadData() throws IOException, InterruptedException {
        long zxid = zkDb.isInitialized()
                ? zkDb.getDataTreeLastProcessedZxid()
                : zkDb.loadDataBase();
        setZxid(zxid);

        // Sessions with no recorded timeout are treated as dead.
        List<Long> orphaned = new ArrayList<>();
        for (Long candidate : zkDb.getSessions()) {
            boolean hasTimeout = zkDb.getSessionWithTimeOuts().get(candidate) != null;
            if (!hasTimeout) {
                orphaned.add(candidate);
            }
        }

        for (int i = 0; i < orphaned.size(); i++) {
            long sessionId = orphaned.get(i);
            killSession(sessionId, zkDb.getDataTreeLastProcessedZxid());
        }

        // Persist the resulting state as a fresh snapshot.
        takeSnapshot();
    }

    /** Takes a snapshot without requesting a synchronous save; same as takeSnapshot(false). */
    public void takeSnapshot() {
        takeSnapshot(false);
    }

    /**
     * Saves the current DataTree and session-timeout map through txnLogFactory, then logs the
     * elapsed time and records it in the SNAPSHOT_TIME metric.
     *
     * <p>An IOException while saving is treated as unrecoverable: it is logged and the JVM
     * exits with ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.
     *
     * @param syncSnap passed through unchanged to txnLogFactory.save
     */
    public void takeSnapshot(boolean syncSnap) {
        long start = Time.currentElapsedTime();
        try {
            txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
        } catch (IOException e) {
            LOG.error("Severe unrecoverable error, exiting", e);
            // This is a severe error that we cannot recover from,
            // so we need to exit
            System.exit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue());
        }
        long elapsed = Time.currentElapsedTime() - start;
        // Parameterized logging instead of string concatenation.
        LOG.info("Snapshot taken in {} ms", elapsed);
        ServerMetrics.getMetrics().SNAPSHOT_TIME.add(elapsed);
    }

五、startup

startup在startdata方法之后调用:它在需要时创建并启动SessionTracker(负责监控session是否过期),设置请求处理链,注册JMX,启动JVM暂停监控器(JvmPauseMonitor)并注册指标,最后将服务器状态设置为RUNNING。

/**
 * Starts the server after startdata(). In order, it:
 * creates a SessionTracker if none exists and starts it, builds the request processor chain,
 * registers JMX, starts the JVM pause monitor, registers metrics, and finally switches the
 * state to RUNNING.
 *
 * <p>The method is synchronized on this server so that the closing notifyAll() can wake
 * threads blocked in wait() on the same monitor; submitRequest waits there while the state
 * is still INITIAL.
 */
public synchronized void startup() {
        if (sessionTracker == null) {
            createSessionTracker();
        }
        // Start the SessionTracker.
        startSessionTracker();
        // Build the request processor chain.
        setupRequestProcessors();
        // Register with JMX.
        registerJMX();
        // Start the JVM pause monitor.
        startJvmPauseMonitor();

        registerMetrics();
        // Initialization is complete: move to the RUNNING state.
        setState(State.RUNNING);
        // Wake threads waiting on this monitor, e.g. submitRequest, which must not dispatch
        // anything until the processor chain has been set up.
        notifyAll();
    }

    /**
     * Builds and starts the standalone request processor chain:
     * PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor.
     * Each processor is constructed with its downstream neighbour. The head of the chain is
     * published in firstProcessor before it is started, as before.
     */
    protected void setupRequestProcessors() {
        RequestProcessor tail = new FinalRequestProcessor(this);
        SyncRequestProcessor sync = new SyncRequestProcessor(this, tail);
        sync.start();
        PrepRequestProcessor head = new PrepRequestProcessor(this, sync);
        firstProcessor = head;
        head.start();
    }

六、processConnectRequest

processConnectRequest由服务端连接对象NIOServerCnxn或NettyServerCnxn调用,用于处理客户端的连接请求:反序列化ConnectRequest、协商session超时时间,并根据请求创建新session或重新打开已有session。

/**
 * Handles a connect request read from {@code incomingBuffer} on behalf of {@code cnxn}.
 *
 * <p>Steps:
 * <ol>
 *   <li>Consult the connection throttle.</li>
 *   <li>Deserialize the ConnectRequest and, for newer clients, the trailing readOnly flag.</li>
 *   <li>Refuse clients this server must not serve.</li>
 *   <li>Clamp the requested session timeout to [getMinSessionTimeout(),
 *       getMaxSessionTimeout()].</li>
 *   <li>Create a new session (session id 0) or reopen the requested one.</li>
 * </ol>
 * Receiving on {@code cnxn} is disabled before session setup starts.
 *
 * @throws ClientCnxnLimitException if connThrottle.checkLimit(1) refuses the connection
 * @throws CloseRequestException if a read-only server gets a client that did not ask for
 *         read-only mode, or the client has seen a newer zxid than this server has processed
 * @throws IOException if deserialization or session setup fails
 */
public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
        throws IOException, ClientCnxnLimitException {

        if (!connThrottle.checkLimit(1)) {
            throw new ClientCnxnLimitException();
        }
        ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit());

        // Deserialize the client's ConnectRequest from the incoming bytes.
        BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
        ConnectRequest connReq = new ConnectRequest();
        connReq.deserialize(bia, "connect");

        ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1);
        boolean readOnly = false;
        try {
            // Newer clients append a readOnly flag after the ConnectRequest.
            readOnly = bia.readBool("readOnly");
            cnxn.isOldClient = false;
        } catch (IOException e) {
            // Older clients do not send the flag, so this read fails. That is expected:
            // keep readOnly == false and leave cnxn.isOldClient unchanged.
        }
        // A read-only server only accepts clients that explicitly asked for read-only mode.
        if (!readOnly && this instanceof ReadOnlyZooKeeperServer) {
            String msg = "Refusing session request for not-read-only client "
                + cnxn.getRemoteSocketAddress();
            LOG.info(msg);
            throw new CloseRequestException(msg);
        }
        // Read the zxid once so that the comparison and the log/exception message agree.
        long lastProcessedZxid = zkDb.getDataTreeLastProcessedZxid();
        // The client has already seen a newer zxid than this server; it must try another server.
        if (connReq.getLastZxidSeen() > lastProcessedZxid) {
            String msg = "Refusing session request for client "
                + cnxn.getRemoteSocketAddress()
                + " as it has seen zxid 0x"
                + Long.toHexString(connReq.getLastZxidSeen())
                + " our last zxid is 0x"
                + Long.toHexString(lastProcessedZxid)
                + " client must try another server";

            LOG.info(msg);
            throw new CloseRequestException(msg);
        }
        // Negotiate the session timeout: clamp the requested value into the configured range.
        int sessionTimeout = connReq.getTimeOut();
        byte[] passwd = connReq.getPasswd();
        int minTimeout = getMinSessionTimeout();
        if (sessionTimeout < minTimeout) {
            sessionTimeout = minTimeout;
        }
        int maxTimeout = getMaxSessionTimeout();
        if (sessionTimeout > maxTimeout) {
            sessionTimeout = maxTimeout;
        }
        cnxn.setSessionTimeout(sessionTimeout);
        // Stop reading further packets from this connection while the session is being set up.
        cnxn.disableRecv();
        long sessionId = connReq.getSessionId();
        if (sessionId == 0) {
            // No session id supplied: create a brand-new session.
            long id = createSession(cnxn, passwd, sessionTimeout);
            LOG.debug("Client attempting to establish new session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
                    Long.toHexString(id),
                    Long.toHexString(connReq.getLastZxidSeen()),
                    connReq.getTimeOut(),
                    cnxn.getRemoteSocketAddress());
        } else {
            // Reconnect to an existing session: first ask both connection factories to close
            // whatever they still hold for this session id, then reopen it on this connection.
            if (serverCnxnFactory != null) {
                serverCnxnFactory.closeSession(sessionId);
            }
            if (secureServerCnxnFactory != null) {
                secureServerCnxnFactory.closeSession(sessionId);
            }
            cnxn.setSessionId(sessionId);
            reopenSession(cnxn, sessionId, passwd, sessionTimeout);
            ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1);
        }
    }

    /**
     * Re-attaches {@code cnxn} to an existing session.
     *
     * <p>If checkPasswd accepts {@code passwd} for {@code sessionId}, the session is
     * revalidated with {@code sessionTimeout}. Otherwise a warning is logged and
     * finishSessionInit(cnxn, false) sends an invalid-session response followed by a
     * close-connection request.
     *
     * @throws IOException propagated from revalidateSession
     */
    public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd,
            int sessionTimeout) throws IOException {
        if (checkPasswd(sessionId, passwd)) {
            revalidateSession(cnxn, sessionId, sessionTimeout);
        } else {
            LOG.warn("Incorrect password from {} for session 0x{}",
                    cnxn.getRemoteSocketAddress(), Long.toHexString(sessionId));
            // Reject: reply with an invalid-session response and close the connection.
            finishSessionInit(cnxn, false);
        }
    }

    /**
     * Sends the ConnectResponse for a connect attempt. Afterwards it either resumes reading
     * from the connection (valid session) or queues the close-connection marker (invalid
     * session).
     *
     * <p>For an invalid session the response carries timeout 0, session id 0 and an all-zero
     * 16-byte password. Any exception while building or sending the response is logged and
     * the connection is closed.
     *
     * @param cnxn  connection the response is written to
     * @param valid whether the session was accepted
     */
    public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
        try {
            int negotiatedTimeout;
            long assignedSessionId;
            byte[] sessionPasswd;
            if (valid) {
                negotiatedTimeout = cnxn.getSessionTimeout();
                assignedSessionId = cnxn.getSessionId();
                sessionPasswd = generatePasswd(cnxn.getSessionId());
            } else {
                // Zeroed values tell the client the session is no longer valid.
                negotiatedTimeout = 0;
                assignedSessionId = 0;
                sessionPasswd = new byte[16];
            }
            ConnectResponse rsp = new ConnectResponse(0, negotiatedTimeout, assignedSessionId, sessionPasswd);

            ByteArrayOutputStream payload = new ByteArrayOutputStream();
            BinaryOutputArchive archive = BinaryOutputArchive.getArchive(payload);
            // Placeholder for the length prefix; patched once the full size is known.
            archive.writeInt(-1, "len");
            rsp.serialize(archive, "connect");
            if (!cnxn.isOldClient) {
                // Only clients that sent the readOnly flag get it echoed back.
                archive.writeBool(this instanceof ReadOnlyZooKeeperServer, "readOnly");
            }
            payload.close();

            ByteBuffer frame = ByteBuffer.wrap(payload.toByteArray());
            // Overwrite the placeholder with the body length (total minus the 4-byte prefix).
            int bodyLength = frame.remaining() - 4;
            frame.putInt(bodyLength);
            frame.rewind();
            cnxn.sendBuffer(frame);

            if (!valid) {
                cnxn.sendBuffer(ServerCnxnFactory.closeConn);
            } else {
                cnxn.enableRecv();
            }
        } catch (Exception e) {
            LOG.warn("Exception while establishing session, closing", e);
            cnxn.close();
        }
    }

七、processPacket

processPacket方法处理来自ServerCnxn的请求数据ByteBuffer。

/**
 * Processes one request packet received on {@code cnxn}.
 *
 * <p>The RequestHeader is deserialized first, and the rest of the buffer is dispatched by
 * header type:
 * <ul>
 *   <li>{@code OpCode.auth} is authenticated inline through the registered
 *       ServerAuthenticationProvider.</li>
 *   <li>{@code OpCode.sasl} is handled by processSasl.</li>
 *   <li>Every other type is wrapped in a Request and passed to submitRequest.</li>
 * </ul>
 *
 * @param cnxn connection the packet arrived on
 * @param incomingBuffer serialized RequestHeader followed by the request body
 * @throws IOException propagated from deserialization or from sending responses
 */
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
        // Deserialize the request header from the front of the buffer.
        InputStream bais = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
        RequestHeader h = new RequestHeader();
        h.deserialize(bia, "header");
        // slice() starts at the buffer's current position; reading the header is expected to
        // have advanced it, so this keeps only the request body.
        incomingBuffer = incomingBuffer.slice();
        if (h.getType() == OpCode.auth) {
            LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress());
            AuthPacket authPacket = new AuthPacket();
            ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
            String scheme = authPacket.getScheme();
            ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
            // Fail closed: anything other than an explicit OK from the provider is AUTHFAILED.
            Code authReturn = KeeperException.Code.AUTHFAILED;
            if (ap != null) {
                try {
                    // handleAuthentication may close the connection, to allow the client to choose
                    // a different server to connect to.
                    authReturn = ap.handleAuthentication(
                            new ServerAuthenticationProvider.ServerObjs(this, cnxn), authPacket.getAuth());
                } catch (RuntimeException e) {
                    // Previously swallowed silently; log it so provider failures are diagnosable.
                    LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e);
                    authReturn = KeeperException.Code.AUTHFAILED;
                }
            }
            if (authReturn == KeeperException.Code.OK) {
                LOG.debug("Authentication succeeded for scheme: {}", scheme);
                LOG.info("auth success {}", cnxn.getRemoteSocketAddress());
                ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
                        KeeperException.Code.OK.intValue());
                cnxn.sendResponse(rh, null, null);
            } else {
                if (ap == null) {
                    LOG.warn("No authentication provider for scheme: {} has {}",
                            scheme, ProviderRegistry.listProviders());
                } else {
                    LOG.warn("Authentication failed for scheme: {}", scheme);
                }
                // Reply with AUTHFAILED, then request a close and stop reading from the connection.
                ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
                        KeeperException.Code.AUTHFAILED.intValue());
                cnxn.sendResponse(rh, null, null);
                cnxn.sendBuffer(ServerCnxnFactory.closeConn);
                cnxn.disableRecv();
            }
        } else if (h.getType() == OpCode.sasl) {
            Record rsp = processSasl(incomingBuffer, cnxn);
            ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
            // NOTE(review): the third argument is presumably the serialization tag for rsp;
            // confirm against ServerCnxn.sendResponse.
            cnxn.sendResponse(rh, rsp, "response");
        } else {
            // Every other request type goes through the request processor chain.
            Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
                    h.getType(), incomingBuffer, cnxn.getAuthInfo());
            si.setOwner(ServerCnxn.me);
            setLocalSessionFlag(si);
            submitRequest(si);
        }
    }

    /**
     * Hands a request to the request processor chain.
     *
     * <p>If the chain has not been set up yet ({@code firstProcessor == null}), this blocks
     * on this server's monitor while the state is still INITIAL, re-checking at least once
     * per second; startup() calls notifyAll() after switching to RUNNING. Requests whose type
     * fails Request.isValid are routed to UnimplementedRequestProcessor instead of the chain.
     *
     * @throws RuntimeException "Not started" if, after waiting, the chain is still missing or
     *         the server is not RUNNING (for example when interrupted before startup finished)
     */
    public void submitRequest(Request si) {
        if (firstProcessor == null) {
            synchronized (this) {
                try {
                    // The processor chain is only built when the server leaves INITIAL,
                    // so wait for that before dispatching anything.
                    while (state == State.INITIAL) {
                        wait(1000);
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected interruption", e);
                    // Restore the interrupt status so callers further up can still observe it.
                    Thread.currentThread().interrupt();
                }
                // Either the chain is still missing, or the server left INITIAL for a state
                // other than RUNNING: refuse the request.
                if (firstProcessor == null || state != State.RUNNING) {
                    throw new RuntimeException("Not started");
                }
            }
        }
        try {
            // Touch the connection's session; the MissingSessionException handler below
            // covers sessions that cannot be found.
            touch(si.cnxn);
            boolean validpacket = Request.isValid(si.type);
            if (validpacket) {
                firstProcessor.processRequest(si);
                // Only requests that arrived on a connection are counted as in process.
                if (si.cnxn != null) {
                    incInProcess();
                }
            } else {
                new UnimplementedRequestProcessor().processRequest(si);
            }
        } catch (MissingSessionException e) {
            LOG.debug("Dropping request: {}", e.getMessage());
        } catch (RequestProcessorException e) {
            LOG.error("Unable to process request:{}", e.getMessage(), e);
        }
    }

相关文章