文章24 | 阅读 12822 | 点赞0
然后是RaftConsistencyServiceImpl CP强制一致性实现。
同样是put方法:
@Override
public void put(String key, Record value) throws NacosException {
try {
raftCore.signalPublish(key, value);
} catch (Exception e) {
Loggers.RAFT.error("Raft put failed.", e);
throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value,
e);
}
}
然后是raftCore.signalPublish:
public void signalPublish(String key, Record value) throws Exception {
//不是leader
if (!isLeader()) {
ObjectNode params = JacksonUtils.createEmptyJsonNode();
params.put("key", key);
params.replace("value", JacksonUtils.transferToJsonNode(value));
Map<String, String> parameters = new HashMap<>(1);
parameters.put("key", key);
final RaftPeer leader = getLeader();
//交给leader去做/v1/ns/raft/datum
raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
return;
}
try {
// 是leader
// 加锁
OPERATE_LOCK.lock();
final long start = System.currentTimeMillis();
final Datum datum = new Datum();
datum.key = key;
datum.value = value;
if (getDatum(key) == null) {
datum.timestamp.set(1L);
} else {
datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
}
ObjectNode json = JacksonUtils.createEmptyJsonNode();
json.replace("datum", JacksonUtils.transferToJsonNode(datum));
json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
//发布数据改变通知 peers是所有节点集合. peers.local获取本机
onPublish(datum, peers.local());
final String content = json.toString();
// 过半同步成功才会响应,也就是说put操作需要过半同步成功,强一致性 CP模型
//只要过半的结点数
final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
//遍历所有结点
for (final String server : peers.allServersIncludeMyself()) {
//自己算一次
if (isLeader(server)) {
latch.countDown();
continue;
}
// /v1/ns/raft/datum/commit
final String url = buildUrl(server, API_ON_PUB);
// 异步同步数据
HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content,
new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.RAFT
.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
datum.key, server, response.getStatusCode());
return 1;
}
// 处理完成coundDown
latch.countDown();
return 0;
}
@Override
public STATE onContentWriteCompleted() {
return STATE.CONTINUE;
}
});
}
//等待半数完成 还有个5秒超时时间
if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
// only majority servers return success can we consider this update success
Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
}
long end = System.currentTimeMillis();
Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
} finally {
// 解锁
OPERATE_LOCK.unlock();
}
}
此时有两个分支,当前节点是leader的情况,和当前节点不是leader的情况。
此时通过raftProxy.proxyPostLarge将消息转发给leader,RaftController的publish方法接收:
@PostMapping("/datum")
public String publish(HttpServletRequest request, HttpServletResponse response) throws Exception {
response.setHeader("Content-Type", "application/json; charset=" + getAcceptEncoding(request));
response.setHeader("Cache-Control", "no-cache");
response.setHeader("Content-Encode", "gzip");
String entity = IoUtils.toString(request.getInputStream(), "UTF-8");
String value = URLDecoder.decode(entity, "UTF-8");
JsonNode json = JacksonUtils.toObj(value);
String key = json.get("key").asText();
if (KeyBuilder.matchInstanceListKey(key)) {
raftConsistencyService.put(key, JacksonUtils.toObj(json.get("value").toString(), Instances.class));
return "ok";
}
if (KeyBuilder.matchSwitchKey(key)) {
raftConsistencyService.put(key, JacksonUtils.toObj(json.get("value").toString(), SwitchDomain.class));
return "ok";
}
if (KeyBuilder.matchServiceMetaKey(key)) {
raftConsistencyService.put(key, JacksonUtils.toObj(json.get("value").toString(), Service.class));
return "ok";
}
throw new NacosException(NacosException.INVALID_PARAM, "unknown type publish key: " + key);
}
可以看到就是调用raftConsistencyService.put方法,回到最初开始的地方了。
public void onPublish(Datum datum, RaftPeer source) throws Exception {
RaftPeer local = peers.local();
if (datum.value == null) {
Loggers.RAFT.warn("received empty datum");
throw new IllegalStateException("received empty datum");
}
//不是leader不能干这个事
if (!peers.isLeader(source.ip)) {
Loggers.RAFT
.warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source),
JacksonUtils.toJson(getLeader()));
throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");
}
//过时了
if (source.term.get() < local.term.get()) {
Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source),
JacksonUtils.toJson(local));
throw new IllegalStateException(
"out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());
}
//重置任期
local.resetLeaderDue();
// if data should be persisted, usually this is true:
if (KeyBuilder.matchPersistentKey(datum.key)) {
raftStore.write(datum);
}
//放入数据
datums.put(datum.key, datum);
if (isLeader()) {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
} else {
if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
//set leader term:
getLeader().term.set(source.term.get());
local.term.set(getLeader().term.get());
} else {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
}
}
raftStore.updateTerm(local.term.get());
//添加任务
notifier.addTask(datum.key, ApplyAction.CHANGE);
Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}
这里做了几件事
集群下的所有节点都同步一下,调用/v1/ns/raft/datum/commit接口,CountDownLatch的数量是集群数量/2+1,这样保证半数通过。
final String content = json.toString();
// 过半同步成功才会响应,也就是说put操作需要过半同步成功,强一致性 CP模型
//只要过半的结点数
final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
//遍历所有结点
for (final String server : peers.allServersIncludeMyself()) {
//自己算一次
if (isLeader(server)) {
latch.countDown();
continue;
}
// /v1/ns/raft/datum/commit
final String url = buildUrl(server, API_ON_PUB);
//同步数据
HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content,
new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.RAFT
.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
datum.key, server, response.getStatusCode());
return 1;
}
// 处理完成coundDown
latch.countDown();
return 0;
}
@Override
public STATE onContentWriteCompleted() {
return STATE.CONTINUE;
}
});
}
//等待半数完成 还有个5秒超时时间
if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
// only majority servers return success can we consider this update success
Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
}
我们看一下同步数据的接收方,RaftController的onPublish方法:
@PostMapping("/datum/commit")
public String onPublish(HttpServletRequest request, HttpServletResponse response) throws Exception {
response.setHeader("Content-Type", "application/json; charset=" + getAcceptEncoding(request));
response.setHeader("Cache-Control", "no-cache");
response.setHeader("Content-Encode", "gzip");
String entity = IoUtils.toString(request.getInputStream(), "UTF-8");
String value = URLDecoder.decode(entity, "UTF-8");
JsonNode jsonObject = JacksonUtils.toObj(value);
String key = "key";
RaftPeer source = JacksonUtils.toObj(jsonObject.get("source").toString(), RaftPeer.class);
JsonNode datumJson = jsonObject.get("datum");
Datum datum = null;
if (KeyBuilder.matchInstanceListKey(datumJson.get(key).asText())) {
datum = JacksonUtils.toObj(jsonObject.get("datum").toString(), new TypeReference<Datum<Instances>>() {
});
} else if (KeyBuilder.matchSwitchKey(datumJson.get(key).asText())) {
datum = JacksonUtils.toObj(jsonObject.get("datum").toString(), new TypeReference<Datum<SwitchDomain>>() {
});
} else if (KeyBuilder.matchServiceMetaKey(datumJson.get(key).asText())) {
datum = JacksonUtils.toObj(jsonObject.get("datum").toString(), new TypeReference<Datum<Service>>() {
});
}
// 还是调用RaftConsistencyServiceImpl的onPublish
raftConsistencyService.onPut(datum, source);
return "ok";
}
调用raftConsistencyService.onPut方法
public void onPut(Datum datum, RaftPeer source) throws NacosException {
try {
raftCore.onPublish(datum, source);
} catch (Exception e) {
Loggers.RAFT.error("Raft onPut failed.", e);
throw new NacosException(NacosException.SERVER_ERROR,
"Raft onPut failed, datum:" + datum + ", source: " + source, e);
}
}
然后是raftCore.onPublish方法,回到上面了。
AP和CP模式下的数据一致性同步我们分析完了。对照着流程图看还是很容易搞明白的。下面我们开始讨论nacos的raft选举。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/qq_19414183/article/details/112469966
内容来源于网络,如有侵权,请联系作者删除!