文章24 | 阅读 13294 | 点赞0
首先nacos服务端分为集群部署和单机部署两种模式,我们以单机部署为例。
具体部署方式参考官方文档。链接
这里默认启动本地单机模式服务端。
我们以com.alibaba.nacos.example.ConfigExample这个测试类进行分析。
public static void main(String[] args) throws NacosException, InterruptedException {
String serverAddr = "localhost";
String dataId = "test";
String group = "DEFAULT_GROUP";
Properties properties = new Properties();
properties.put("serverAddr", serverAddr);
ConfigService configService = NacosFactory.createConfigService(properties);
String content = configService.getConfig(dataId, group, 5000);
System.out.println(content);
configService.addListener(dataId, group, new Listener() {
@Override
public void receiveConfigInfo(String configInfo) {
System.out.println("receive:" + configInfo);
}
@Override
public Executor getExecutor() {
return null;
}
});
boolean isPublishOk = configService.publishConfig(dataId, group, "content");
System.out.println(isPublishOk);
Thread.sleep(3000);
content = configService.getConfig(dataId, group, 5000);
System.out.println(content);
boolean isRemoveOk = configService.removeConfig(dataId, group);
System.out.println(isRemoveOk);
Thread.sleep(3000);
content = configService.getConfig(dataId, group, 5000);
System.out.println(content);
Thread.sleep(300000);
}
看一下如何得到ConfigService实例, ConfigService configService = NacosFactory.createConfigService(properties); 这行代码基本上包含了所有客户端需要做的事情。
public static ConfigService createConfigService(Properties properties) throws NacosException {
return ConfigFactory.createConfigService(properties);
}
里面调用ConfigFactory工厂类获得实例对象:
public static ConfigService createConfigService(Properties properties) throws NacosException {
try {
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
Constructor constructor = driverImplClass.getConstructor(Properties.class);
ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
return vendorImpl;
} catch (Throwable e) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
}
}
下面看一下NacosConfigService这个的构造方法:
public NacosConfigService(Properties properties) throws NacosException {
ValidatorUtils.checkInitParam(properties);
String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
if (StringUtils.isBlank(encodeTmp)) {
this.encode = Constants.ENCODE;
} else {
this.encode = encodeTmp.trim();
}
initNamespace(properties);
// ServerHttpAgent http代理
// MetricsHttpAgent 又代理了一次
// 另外,屏蔽了集群逻辑。提供的方法只是统一的http调用。
this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
this.agent.start();
this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
}
MetricsHttpAgent类实际上就是对ServerHttpAgent进行代理,包装了一层prometheus用于指标采集,以httpGet为例:
@Override
public HttpRestResult<String> httpGet(String path, Map<String, String> headers, Map<String, String> paramValues,
String encode, long readTimeoutMs) throws Exception {
Histogram.Timer timer = MetricsMonitor.getConfigRequestMonitor("GET", path, "NA");
HttpRestResult<String> result;
try {
result = httpAgent.httpGet(path, headers, paramValues, encode, readTimeoutMs);
} catch (IOException e) {
throw e;
} finally {
timer.observeDuration();
timer.close();
}
return result;
}
主要是针对时间数据采集。
我们看ServerHttpAgent类:
public ServerHttpAgent(Properties properties) throws NacosException {
// 集群管理类
this.serverListMgr = new ServerListManager(properties);
// 安全认证
this.securityProxy = new SecurityProxy(properties, NACOS_RESTTEMPLATE);
// 命名空间
this.namespaceId = properties.getProperty(PropertyKeyConst.NAMESPACE);
// 初始化配置 encoding、maxRetry、ak、sk
// ak、sk在登录认证时未用到。发送get、post、delete请求时需要验证
init(properties);
// 登录认证
this.securityProxy.login(this.serverListMgr.getServerUrls());
// init executorService
// daemon线程
this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.config.security.updater");
t.setDaemon(true);
return t;
}
});
// 5秒一次的登录认证(会校验是否在token窗口内,如果不在,则重新获取token),刷新token和token窗口
this.executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
securityProxy.login(serverListMgr.getServerUrls());
}
}, 0, this.securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);
}
我们看一下securityProxy.login:
public boolean login(List<String> servers) {
try {
// token刷新窗口范围内,返回true
if ((System.currentTimeMillis() - lastRefreshTime) < TimeUnit.SECONDS
.toMillis(tokenTtl - tokenRefreshWindow)) {
return true;
}
// 集群每个server进行登录验证
for (String server : servers) {
if (login(server)) {
lastRefreshTime = System.currentTimeMillis();
return true;
}
}
} catch (Throwable ignore) {
}
return false;
}
这里有个时间窗口。如果当前任务执行的时间还在token窗口内,说明token还没有过期,此时不需要重新认证,否则重新登录认证并更新lastRefreshTime。
public boolean login(String server) throws UnsupportedEncodingException {
if (StringUtils.isNotBlank(username)) {
Map<String, String> params = new HashMap<String, String>(2);
Map<String, String> bodyMap = new HashMap<String, String>(2);
params.put("username", username);
bodyMap.put("password", URLEncoder.encode(password, "utf-8"));
String url = "http://" + server + contextPath + LOGIN_URL;
if (server.contains(Constants.HTTP_PREFIX)) {
url = server + contextPath + LOGIN_URL;
}
try {
HttpRestResult<String> restResult = nacosRestTemplate
.postForm(url, Header.EMPTY, Query.newInstance().initParams(params), bodyMap, String.class);
if (!restResult.ok()) {
SECURITY_LOGGER.error("login failed: {}", JacksonUtils.toJson(restResult));
return false;
}
JsonNode obj = JacksonUtils.toObj(restResult.getData());
if (obj.has(Constants.ACCESS_TOKEN)) {
accessToken = obj.get(Constants.ACCESS_TOKEN).asText();
tokenTtl = obj.get(Constants.TOKEN_TTL).asInt();
tokenRefreshWindow = tokenTtl / 10;
}
} catch (Exception e) {
SECURITY_LOGGER.error("[SecurityProxy] login http request failed"
+ " url: {}, params: {}, bodyMap: {}, errorMsg: {}", url, params, bodyMap, e.getMessage());
return false;
}
}
return true;
}
最终的登录认证实现,url是http://server/contextPath/v1/auth/users/login
nacosRestTemplate是在初始化securityProxy时构造的:
private static final NacosRestTemplate NACOS_RESTTEMPLATE = ConfigHttpClientManager.getInstance()
.getNacosRestTemplate();
private static final NacosRestTemplate NACOS_REST_TEMPLATE;
static {
NACOS_REST_TEMPLATE = HttpClientBeanHolder.getNacosRestTemplate(HTTP_CLIENT_FACTORY);
NACOS_REST_TEMPLATE.getInterceptors().add(new LimiterHttpClientRequestInterceptor());
}
主要是加了一个拦截器。
对象的来源由HttpClientBeanHolder.getNacosRestTemplate得到,最终是:
com.alibaba.nacos.common.http.AbstractHttpClientFactory#createNacosRestTemplate
@Override
public NacosRestTemplate createNacosRestTemplate() {
HttpClientConfig httpClientConfig = buildHttpClientConfig();
final JdkHttpClientRequest clientRequest = new JdkHttpClientRequest(httpClientConfig);
// enable ssl
initTls(new BiConsumer<SSLContext, HostnameVerifier>() {
@Override
public void accept(SSLContext sslContext, HostnameVerifier hostnameVerifier) {
clientRequest.setSSLContext(loadSSLContext());
clientRequest.replaceSSLHostnameVerifier(hostnameVerifier);
}
}, new TlsFileWatcher.FileChangeListener() {
@Override
public void onChanged(String filePath) {
clientRequest.setSSLContext(loadSSLContext());
}
});
return new NacosRestTemplate(assignLogger(), clientRequest);
}
也就是requestClient的类型是JdkHttpClientRequest。
我们以get方法为例:com.alibaba.nacos.common.http.client.NacosRestTemplate#get(java.lang.String, com.alibaba.nacos.common.http.param.Header, com.alibaba.nacos.common.http.param.Query, java.lang.reflect.Type)
public <T> HttpRestResult<T> get(String url, Header header, Query query, Type responseType) throws Exception {
return execute(url, HttpMethod.GET, new RequestHttpEntity(header, query), responseType);
}
private <T> HttpRestResult<T> execute(String url, String httpMethod, RequestHttpEntity requestEntity,
Type responseType) throws Exception {
URI uri = HttpUtils.buildUri(url, requestEntity.getQuery());
if (logger.isDebugEnabled()) {
logger.debug("HTTP method: {}, url: {}, body: {}", httpMethod, uri, requestEntity.getBody());
}
ResponseHandler<T> responseHandler = super.selectResponseHandler(responseType);
HttpClientResponse response = null;
try {
response = this.requestClient().execute(uri, httpMethod, requestEntity);
return responseHandler.handle(response);
} finally {
if (response != null) {
response.close();
}
}
}
执行requestClient的execute方法:
public HttpClientResponse execute(URI uri, String httpMethod, RequestHttpEntity requestHttpEntity)
throws Exception {
final Object body = requestHttpEntity.getBody();
final Header headers = requestHttpEntity.getHeaders();
replaceDefaultConfig(requestHttpEntity.getHttpClientConfig());
HttpURLConnection conn = (HttpURLConnection) uri.toURL().openConnection();
Map<String, String> headerMap = headers.getHeader();
if (headerMap != null && headerMap.size() > 0) {
for (Map.Entry<String, String> entry : headerMap.entrySet()) {
conn.setRequestProperty(entry.getKey(), entry.getValue());
}
}
conn.setConnectTimeout(this.httpClientConfig.getConTimeOutMillis());
conn.setReadTimeout(this.httpClientConfig.getReadTimeOutMillis());
conn.setRequestMethod(httpMethod);
if (body != null) {
String contentType = headers.getValue(HttpHeaderConsts.CONTENT_TYPE);
String bodyStr = JacksonUtils.toJson(body);
if (MediaType.APPLICATION_FORM_URLENCODED.equals(contentType)) {
Map<String, String> map = JacksonUtils.toObj(bodyStr, HashMap.class);
bodyStr = HttpUtils.encodingParams(map, headers.getCharset());
}
if (bodyStr != null) {
conn.setDoOutput(true);
byte[] b = bodyStr.getBytes();
conn.setRequestProperty("Content-Length", String.valueOf(b.length));
conn.getOutputStream().write(b, 0, b.length);
conn.getOutputStream().flush();
conn.getOutputStream().close();
}
}
conn.connect();
return new JdkHttpClientResponse(conn);
}
再往里就是jdk的了。至此nacos的客户端到服务端的网络通信模型就明确了。
回来继续看NacosConfigService的初始化,当创建完ServerHttpAgent后,紧接着调用了start方法:
@Override
public void start() throws NacosException {
serverListMgr.start();
}
就是服务管理列表的start方法:
public synchronized void start() throws NacosException {
if (isStarted || isFixed) {
return;
}
GetServerListTask getServersTask = new GetServerListTask(addressServerUrl);
// 这里不是新线程,是直接调用run方法。
for (int i = 0; i < initServerlistRetryTimes && serverUrls.isEmpty(); ++i) {
getServersTask.run();
try {
this.wait((i + 1) * 100L);
} catch (Exception e) {
LOGGER.warn("get serverlist fail,url: {}", addressServerUrl);
}
}
if (serverUrls.isEmpty()) {
LOGGER.error("[init-serverlist] fail to get NACOS-server serverlist! env: {}, url: {}", name,
addressServerUrl);
throw new NacosException(NacosException.SERVER_ERROR,
"fail to get NACOS-server serverlist! env:" + name + ", not connnect url:" + addressServerUrl);
}
// executor schedules the timer task
// 30秒执行一次任务,更新服务状态
this.executorService.scheduleWithFixedDelay(getServersTask, 0L, 30L, TimeUnit.SECONDS);
isStarted = true;
}
先run一下,然后丢掉线程池里每隔30秒再run一下
class GetServerListTask implements Runnable {
final String url;
GetServerListTask(String url) {
this.url = url;
}
@Override
public void run() {
/*
get serverlist from nameserver
*/
try {
updateIfChanged(getApacheServerList(url, name));
} catch (Exception e) {
LOGGER.error("[" + name + "][update-serverlist] failed to update serverlist from address server!", e);
}
}
}
获取服务列表,如果有变更就更新
private void updateIfChanged(List<String> newList) {
if (null == newList || newList.isEmpty()) {
LOGGER.warn("[update-serverlist] current serverlist from address server is empty!!!");
return;
}
List<String> newServerAddrList = new ArrayList<String>();
for (String server : newList) {
if (server.startsWith(HTTP) || server.startsWith(HTTPS)) {
newServerAddrList.add(server);
} else {
newServerAddrList.add(HTTP + server);
}
}
/*
no change
*/
if (newServerAddrList.equals(serverUrls)) {
return;
}
serverUrls = new ArrayList<String>(newServerAddrList);
iterator = iterator();
currentServerAddr = iterator.next();
// Using unified event processor, NotifyCenter
// 服务列表发生变更,发布事件消息
NotifyCenter.publishEvent(new ServerlistChangeEvent());
LOGGER.info("[{}] [update-serverlist] serverlist updated to {}", name, serverUrls);
}
通知中心发布服务列表变更事件ServerlistChangeEvent
总结一下:
下篇继续分析ClientWorker的创建过程
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/qq_19414183/article/details/112228975
内容来源于网络,如有侵权,请联系作者删除!