文章13 | 阅读 5140 | 点赞0
ZooKeeperServer是zookeeper提供的简单单机服务器,它的请求处理链只有PrepRequestProcessor、SyncRequestProcessor、 FinalRequestProcessor三种。它是所有zookeeper服务器父类。有只读模式的ReadOnlyZooKeeperServer,从机服务器FollowerZooKeeperServer和ObserverZooKeeperServer。
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
public static final int DEFAULT_TICK_TIME = 3000;
protected int tickTime = DEFAULT_TICK_TIME;
// 最小session过期时间
protected int minSessionTimeout = -1;
// 最大session过期时间
protected int maxSessionTimeout = -1;
// 服务socket监听端口
protected int listenBacklog = -1;
// sessionTracker
protected SessionTracker sessionTracker;
// FileTxnSnapLog
private FileTxnSnapLog txnLogFactory = null;
// 响应缓存
private ResponseCache readResponseCache;
private ZKDatabase zkDb;
// 构造函数,初始化所有持有的对象,但只有running时,才开始启动对客户端请求端口的监听
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog,
ZKDatabase zkDb, String initialConfig) {
// 服务器状态对象
serverStats = new ServerStats(this);
// FileTxnSnapLog,提供对事务日志及dataTree的操作
this.txnLogFactory = txnLogFactory;
this.txnLogFactory.setServerStats(this.serverStats);
this.zkDb = zkDb;
// session心跳时间
this.tickTime = tickTime;
setMinSessionTimeout(minSessionTimeout);
setMaxSessionTimeout(maxSessionTimeout);
// 客户端监听端口
this.listenBacklog = clientPortListenBacklog;
listener = new ZooKeeperServerListenerImpl(this);
// 响应缓存
readResponseCache = new ResponseCache();
connThrottle = new BlueThrottle();
}
}
实现SessionExpirer提供对session过期的操作方法如下。
public class ZooKeeperServer implements SessionExpirer {
// expire session
public void expire(Session session) {
long sessionId = session.getSessionId();
LOG.info("Expiring session 0x" + Long.toHexString(sessionId)
+ ", timeout of " + session.getTimeout() + "ms exceeded");
// 获取sessionId,关闭
close(sessionId);
}
private void close(long sessionId) {
// 构造Request请求,并提交请求给处理器执行
Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null);
setLocalSessionFlag(si);
submitRequest(si);
}
// 单机模式,默认服务器id为0
public long getServerId() {
return 0;
}
}
实现ServerStats.Provider,提供服务状态相关方法。
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
// 获取数据目录大小
public long getDataDirSize() {
if (zkDb == null) {
return 0L;
}
// 获取数据文件目录
File path = zkDb.snapLog.getDataDir();
// 递归加上每个文件大小
return getDirSize(path);
}
// 获取日志目录大小
public long getLogDirSize() {
if (zkDb == null) {
return 0L;
}
File path = zkDb.snapLog.getSnapDir();
return getDirSize(path);
}
// 递归加上每个文件大小
private long getDirSize(File file) {
long size = 0L;
if (file.isDirectory()) {
File[] files = file.listFiles();
if (files != null) {
for (File f : files) {
size += getDirSize(f);
}
}
} else {
size = file.length();
}
return size;
}
// 服务器活跃连接数
public int getNumAliveConnections() {
int numAliveConnections = 0;
if (serverCnxnFactory != null) {
// 获取serverCnxnFactory的活跃连接数
numAliveConnections += serverCnxnFactory.getNumAliveConnections();
}
if (secureServerCnxnFactory != null) {
numAliveConnections += secureServerCnxnFactory.getNumAliveConnections();
}
return numAliveConnections;
}
// 返回dataTree的最大处理了的zxid
public long getLastProcessedZxid() {
return zkDb.getDataTreeLastProcessedZxid();
}
// 返回已经被处理过的请求个数
public long getOutstandingRequests() {
return getInProcess();
}
}
startdata是zookeeperServer的初始化方法,它初始化ZKDatabase并且加载dataDir目录下的文件至DataTree内存中。
public void startdata() throws IOException, InterruptedException {
//核对zkDb是否为null
if (zkDb == null) {
zkDb = new ZKDatabase(this.txnLogFactory);
}
// 初始化DataTree数据
if (!zkDb.isInitialized()) {
loadData();
}
}
public void loadData() throws IOException, InterruptedException {
// 一个leader重新lead时,也会调用此方法初始化数据,但是leader在
// 选取时为了拿到自己最大的zxid已经初始化过一次数据,所以使用
// initialize变量来区分
if(zkDb.isInitialized()){
// zxid为zkDb内存中的dataTree的lastProcessedZxid
setZxid(zkDb.getDataTreeLastProcessedZxid());
}
else {
// zxid为zkDb重新从文件中加载反序列化为dataTree的zxid
setZxid(zkDb.loadDataBase());
}
// 清除已经过期的sessions
List<Long> deadSessions = new ArrayList<>();
for (Long session : zkDb.getSessions()) {
if (zkDb.getSessionWithTimeOuts().get(session) == null) {
deadSessions.add(session);
}
}
for (long session : deadSessions) {
killSession(session, zkDb.getDataTreeLastProcessedZxid());
}
// 并且拍一个快照至文件中
takeSnapshot();
}
public void takeSnapshot() {
takeSnapshot(false);
}
// 调用txnLogFactory的save方法,将当前的dataTree和sessions重新保存至文件中
public void takeSnapshot(boolean syncSnap){
long start = Time.currentElapsedTime();
try {
txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
} catch (IOException e) {
LOG.error("Severe unrecoverable error, exiting", e);
// This is a severe error that we cannot recover from,
// so we need to exit
System.exit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue());
}
long elapsed = Time.currentElapsedTime() - start;
LOG.info("Snapshot taken in " + elapsed + " ms");
ServerMetrics.getMetrics().SNAPSHOT_TIME.add(elapsed);
}
startup初始化在startdata方法之后,它启动SessionTracker,提供对session是否过期的监控,设置请求处理链,注册JMX,启动jvm监控器等操作。
public synchronized void startup() {
if (sessionTracker == null) {
createSessionTracker();
}
// 启动SessionTracker
startSessionTracker();
// 设置请求处理链
setupRequestProcessors();
// 注册JMX
registerJMX();
// 启动jvm监控器
startJvmPauseMonitor();
registerMetrics();
// 设置初始化完成,RUNNING状态
setState(State.RUNNING);
// 告知其他等待线程,可以操作
// 如submit方法,需要处理链初始化后才能执行
notifyAll();
}
// 设置请求处理链,单机模式server处理链顺序为PrepRequestProcessor,
// SyncRequestProcessor,FinalRequestProcessor
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this,
finalProcessor);
((SyncRequestProcessor)syncProcessor).start();
firstProcessor = new PrepRequestProcessor(this, syncProcessor);
((PrepRequestProcessor)firstProcessor).start();
}
processConnectRequest为服务连接NIOServerCnxn或NettyServerCnxn调用,提供给zookeeperServer根据ServerCnxn,处理相关session。
public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
throws IOException, ClientCnxnLimitException {
if (connThrottle.checkLimit(1) == false) {
throw new ClientCnxnLimitException();
}
ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit());
// 获取BinaryInputArchive流
BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
// 反序列化客户端流数据为连接请求对象ConnectRequest
ConnectRequest connReq = new ConnectRequest();
connReq.deserialize(bia, "connect");
ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1);
boolean readOnly = false;
try {
// 新版本获取是否只读readOnly
readOnly = bia.readBool("readOnly");
cnxn.isOldClient = false;
} catch (IOException e) {
// 旧版本没有,忽略错误
}
// 如果客户端请求只读,当前却不是只读服务器,抛错
if (!readOnly && this instanceof ReadOnlyZooKeeperServer) {
String msg = "Refusing session request for not-read-only client "
+ cnxn.getRemoteSocketAddress();
LOG.info(msg);
throw new CloseRequestException(msg);
}
// 请求的zxid大于当前服务器的zxid,则抛错,应该请求其他服务器
if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
String msg = "Refusing session request for client "
+ cnxn.getRemoteSocketAddress()
+ " as it has seen zxid 0x"
+ Long.toHexString(connReq.getLastZxidSeen())
+ " our last zxid is 0x"
+ Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
+ " client must try another server";
LOG.info(msg);
throw new CloseRequestException(msg);
}
// 获取sessionTimeout
int sessionTimeout = connReq.getTimeOut();
// 获取passwd
byte passwd[] = connReq.getPasswd();
int minSessionTimeout = getMinSessionTimeout();
if (sessionTimeout < minSessionTimeout) {
sessionTimeout = minSessionTimeout;
}
int maxSessionTimeout = getMaxSessionTimeout();
if (sessionTimeout > maxSessionTimeout) {
sessionTimeout = maxSessionTimeout;
}
cnxn.setSessionTimeout(sessionTimeout);
// 初始化session后才开始连接接受数据功能
cnxn.disableRecv();
// 没有sessionId,创建session
long sessionId = connReq.getSessionId();
if (sessionId == 0) {
long id = createSession(cnxn, passwd, sessionTimeout);
LOG.debug("Client attempting to establish new session:" +
" session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
Long.toHexString(id),
Long.toHexString(connReq.getLastZxidSeen()),
connReq.getTimeOut(),
cnxn.getRemoteSocketAddress());
} else {
// 有clientSessionId
long clientSessionId = connReq.getSessionId();
// 先关闭以前session
if (serverCnxnFactory != null) {
serverCnxnFactory.closeSession(sessionId);
}
if (secureServerCnxnFactory != null) {
secureServerCnxnFactory.closeSession(sessionId);
}
cnxn.setSessionId(sessionId);
// 再重新打开session
reopenSession(cnxn, sessionId, passwd, sessionTimeout);
ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1);
}
}
// 重新打开session
public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd,
int sessionTimeout) throws IOException {
if (checkPasswd(sessionId, passwd)) {
revalidateSession(cnxn, sessionId, sessionTimeout);
} else {
LOG.warn("Incorrect password from " + cnxn.getRemoteSocketAddress()
+ " for session 0x" + Long.toHexString(sessionId));
// 完成session初始化,返回给客户端响应数据
finishSessionInit(cnxn, false);
}
}
// 完成session初始化,返回给客户端响应数据
public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
try {
// 创建ConnectResponse连接响应对象
ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()
: 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no
// longer valid
valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
bos.writeInt(-1, "len");
// 序列化至内存bos中
rsp.serialize(bos, "connect");
if (!cnxn.isOldClient) {
bos.writeBool(
this instanceof ReadOnlyZooKeeperServer, "readOnly");
}
baos.close();
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
bb.putInt(bb.remaining() - 4).rewind();
// 通过当前连接发送出去
cnxn.sendBuffer(bb);
if (valid) {
// 验证通过,接受其他请求
cnxn.enableRecv();
} else {
// 验证不通过,发送关闭连接请求
cnxn.sendBuffer(ServerCnxnFactory.closeConn);
}
} catch (Exception e) {
LOG.warn("Exception while establishing session, closing", e);
cnxn.close();
}
}
processPacket方法处理来自ServerCnxn的请求数据ByteBuffer。
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
// 先反序列化请求头对象RequestHeader
InputStream bais = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
RequestHeader h = new RequestHeader();
h.deserialize(bia, "header");
incomingBuffer = incomingBuffer.slice();
// 如果请求类型是授权
if (h.getType() == OpCode.auth) {
LOG.info("got auth packet " + cnxn.getRemoteSocketAddress());
// 反序列化为AuthPacket对象
AuthPacket authPacket = new AuthPacket();
ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
// 获取授权scheme
String scheme = authPacket.getScheme();
ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
Code authReturn = KeeperException.Code.AUTHFAILED;
if(ap != null) {
try {
// handleAuthentication may close the connection, to allow the client to choose
// a different server to connect to.
authReturn = ap.handleAuthentication(new ServerAuthenticationProvider.ServerObjs(this, cnxn), authPacket.getAuth());
} catch(RuntimeException e) {
authReturn = KeeperException.Code.AUTHFAILED;
}
}
if (authReturn == KeeperException.Code.OK) {
if (LOG.isDebugEnabled()) {
LOG.debug("Authentication succeeded for scheme: " + scheme);
}
LOG.info("auth success " + cnxn.getRemoteSocketAddress());
ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
KeeperException.Code.OK.intValue());
// 生成ReplyHeader授权信息并返回
cnxn.sendResponse(rh, null, null);
} else {
if (ap == null) {
LOG.warn("No authentication provider for scheme: "
+ scheme + " has "
+ ProviderRegistry.listProviders());
} else {
LOG.warn("Authentication failed for scheme: " + scheme);
}
// send a response...
ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
KeeperException.Code.AUTHFAILED.intValue());
cnxn.sendResponse(rh, null, null);
// ... and close connection
cnxn.sendBuffer(ServerCnxnFactory.closeConn);
cnxn.disableRecv();
}
return;
} else if (h.getType() == OpCode.sasl) {
Record rsp = processSasl(incomingBuffer,cnxn);
ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it?
return;
} else {
// 如果是其他的请求类型,则生成Request对象
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
h.getType(), incomingBuffer, cnxn.getAuthInfo());
si.setOwner(ServerCnxn.me);
setLocalSessionFlag(si);
// 提交请求
submitRequest(si);
return;
}
}
// submitRequest请求,交由请求处理链处理
public void submitRequest(Request si) {
if (firstProcessor == null) {
synchronized (this) {
try {
// 因为所有的请求处理链都是在INITIAL之后开始设置的,
// 所以我们需要等待处理链初始化好了之后再处理
while (state == State.INITIAL) {
wait(1000);
}
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption", e);
}
// 处理链设置好了,firstProcessor仍然为null,异常
if (firstProcessor == null || state != State.RUNNING) {
throw new RuntimeException("Not started");
}
}
}
try {
// touch它的连接
touch(si.cnxn);
boolean validpacket = Request.isValid(si.type);
if (validpacket) {
firstProcessor.processRequest(si);
if (si.cnxn != null) {
incInProcess();
}
} else {
new UnimplementedRequestProcessor().processRequest(si);
}
} catch (MissingSessionException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Dropping request: " + e.getMessage());
}
} catch (RequestProcessorException e) {
LOG.error("Unable to process request:" + e.getMessage(), e);
}
}
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/qq_33513250/article/details/103919583
内容来源于网络,如有侵权,请联系作者删除!