(1)服务器1启动,发起一次选举。服务器1投自己一票。此时服务器1票数一票,不够半数以上(3票),选举无法完成,服务器1状态保持为 LOOKING;
(2)服务器2启动,再发起一次选举。服务器1和2分别投自己一票并交换选票信息:此时服务器1发现服务器2的myid比自己目前投票推举的(服务器1) 大,更改选票为推举服务器2。此时服务器1票数0票,服务器2票数2票,没有半数以上结果,选举无法完成,服务器1,2状态保持LOOKING
(3)服务器3启动,发起一次选举。此时服务器1和2都会更改选票为服务器3。此次投票结果:服务器1为0票,服务器2为0票,服务器3为3票。此时服务器3的票数已经超过半数,服务器3当选Leader。服务器1,2更改状态为FOLLOWING,服务器3更改状态为LEADING; LOOKING
4)服务器4启动,发起一次选举。此时服务器1,2,3已经不是LOOKING状态,不会更改选票信息。交换选票信息结果:服务器3为3票,服务器4为 1票。此时服务器4服从多数,更改选票信息为服务器3,并更改状态为FOLLOWING;
(5)服务器5启动,同4一样当小弟。
(1)当ZooKeeper集群中的一台服务器出现以下两种情况之一时,就会开始进入Leader选举:
(2)而当一台机器进入Leader选举流程时,当前集群也可能会处于以下两种状态:
对于第一种已经存在Leader的情况,机器试图去选举Leader时,会被告知当前服务器的Leader信息,对于该机器来说,仅仅需要和Leader机器建立连接,并进行状态同步即可。
假设ZooKeeper由5台服务器组成,SID分别为1、2、3、4、5,ZXID分别为8、8、8、7、7,并且此时SID为3的服务器是Leader。某一时刻,3和5服务器出现故障,因此开始进行Leader选举。
选举Leader规则:
****①EPOCH大的直接胜出 ②EPOCH相同,事务id大的胜出 ③事务id相同,服务器id大的胜出
如果不是使用Docker搭建的zookeeper集群,那么每次启动zookeeper集群会非常繁琐,因此一般使用启停脚本,快速完成zookeeper集群的启停
1)在 hadoop102 的/home/atguigu/bin 目录下创建脚本
vim zk.sh
在脚本中编写如下内容
#!/bin/bash
case $1 in
"start"){
for i in hadoop102 hadoop103 hadoop104
do
echo ---------- zookeeper $i 启动 ------------
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh
start"
done
};;
"stop"){
for i in hadoop102 hadoop103 hadoop104
do
echo ---------- zookeeper $i 停止 ------------
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh
stop"
done
};;
"status"){
for i in hadoop102 hadoop103 hadoop104
do
echo ---------- zookeeper $i 状态 ------------
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh
status"
done
};;
esac
2)增加脚本执行权限
chmod u+x zk.sh
3)Zookeeper 集群启动脚本
zk.sh start
4)Zookeeper 集群停止脚本
zk.sh stop
1)启动客户端
bin/zkCli.sh -server hadoop102:2181
2)显示所有操作命令
help
ls /
默认顶层有一个zookeeper节点
ls / -s
[zookeeper]cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
(1)czxid:创建节点的事务 zxid
每次修改 ZooKeeper 状态都会产生一个 ZooKeeper 事务 ID。事务 ID 是 ZooKeeper 中所
有修改总的次序。每次修改都有唯一的 zxid,如果 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之
前发生。
(2)ctime:znode 被创建的毫秒数(从 1970 年开始)
(3)mzxid:znode 最后更新的事务 zxid
(4)mtime:znode 最后修改的毫秒数(从 1970 年开始)
(5)pZxid:znode 最后更新的子节点 zxid
(6)cversion:znode 子节点变化号,znode 子节点修改次数(7)dataversion:znode 数据变化号
(8)aclVersion:znode 访问控制列表的变化号
(9)ephemeralOwner:如果是临时节点,这个是 znode 拥有者的 session id。如果不是
临时节点则是 0。(10)dataLength:znode 的数据长度
(11)numChildren:znode 子节点数量
create /sanguo "diaochan"
Created /sanguo
create /sanguo/shuguo "liubei"
Created /sanguo/shuguo
注意:创建节点时,要赋值
get /sanguo -s
diaochan
cZxid = 0x100000003
ctime = Wed Aug 29 00:03:23 CST 2018
mZxid = 0x100000003
mtime = Wed Aug 29 00:03:23 CST 2018
pZxid = 0x100000004
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 7
numChildren = 1
get /sanguo/shuguo -s
liubei
cZxid = 0x100000004
ctime = Wed Aug 29 00:04:35 CST 2018
mZxid = 0x100000004
mtime = Wed Aug 29 00:04:35 CST 2018
pZxid = 0x100000004
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0
(1)先创建一个普通的根节点/sanguo/weiguo
create /sanguo/weiguo "caocao"
Created /sanguo/weiguo
(2)创建带序号的节点
create -s /sanguo/weiguo/zhangliao "zhangliao"
Created /sanguo/weiguo/zhangliao0000000000
create -s /sanguo/weiguo/zhangliao "zhangliao"
Created /sanguo/weiguo/zhangliao0000000001
create -s /sanguo/weiguo/xuchu "xuchu"
Created /sanguo/weiguo/xuchu0000000002
如果原来没有序号节点,序号从 0 开始依次递增。如果原节点下已有 2 个节点,则再排序时从 2 开始,以此类推。
(1)创建短暂的不带序号的节点
create -e /sanguo/wuguo "zhouyu"
Created /sanguo/wuguo
(2)创建短暂的带序号的节点
create -e -s /sanguo/wuguo "zhouyu"
Created /sanguo/wuguo0000000001
(3)在当前客户端是能查看到的
ls /sanguo
[wuguo, wuguo0000000001, shuguo]
(4)退出当前客户端然后再重启客户端
quit
bin/zkCli.sh
(5)再次查看根目录下短暂节点已经删除
ls /sanguo
[shuguo]
set /sanguo/weiguo "simayi"
客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、节点删除、子目录节点增加删除)时,ZooKeeper 会通知客户端。监听机制保证 ZooKeeper 保存的任何的数据的任何改变都能快速的响应到监听了该节点的应用程序。
1)首先要有一个main()线程
2)在main线程中创建Zookeeper客户端,这时就会创建两个线程,一个负责网络连接通信(connet),一个负责监听(listener)。
3)通过connect线程将注册的监听事件发送给Zookeeper。
4)在Zookeeper的注册监听器列表中将注册的监听事件添加到列表中。
5)Zookeeper监听到有数据或路径变化,就会将这个消息发送给listener线程。
6)listener线程内部调用了process()方法。
(1)在 hadoop104 主机上注册监听/sanguo 节点数据变化
get -w /sanguo
(2)在 hadoop103 主机上修改/sanguo 节点的数据
set /sanguo "xisi"
(3)观察 hadoop104 主机收到数据变化的监听
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged
path:/sanguo
注意:在hadoop103再多次修改/sanguo的值,hadoop104上不会再收到监听。因为注册 一次,只能监听一次。想再次监听,需要再次注册。
(1)在 hadoop104 主机上注册监听/sanguo 节点的子节点变化
ls -w /sanguo
[shuguo, weiguo]
(2)在 hadoop103 主机/sanguo 节点上创建子节点
create /sanguo/jin "simayi"
Created /sanguo/jin
(3)观察 hadoop104 主机收到子节点变化的监听
WATCHER::
WatchedEvent state:SyncConnected type:NodeChildrenChanged
path:/sanguo
注意:节点的路径变化,也是注册一次,生效一次。想多次生效,就需要多次注册。
1)删除节点
delete /sanguo/jin
2)递归删除节点
deleteall /sanguo/shuguo
3)查看节点状态
stat /sanguo
cZxid = 0x100000003
ctime = Wed Aug 29 00:03:23 CST 2018
mZxid = 0x100000011
mtime = Wed Aug 29 00:21:23 CST 2018
pZxid = 0x100000014
cversion = 9
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 1
前提:保证 hadoop102、hadoop103、hadoop104 服务器上 Zookeeper 集群服务端启动
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.0</version>
</dependency>
</dependencies>
需要在项目的 src/main/resources 目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c]- %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c]- %m%n
创建 ZooKeeper 客户端
@Before
public void init() throws Exception {
zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
//zookeeper注册一次,就监听一次
@Override
public void process(WatchedEvent watchedEvent) {
// 收到事件通知后的回调函数(用户的业务逻辑)
System.out.println(watchedEvent.getType() + "--"
+ watchedEvent.getPath());
// 再次启动监听
List<String> children = null;
try {
children = zooKeeper.getChildren("/",true);
for (String child : children) {
System.out.println(child);
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
@Test
public void create() throws InterruptedException, KeeperException {
//第三个参数是权限
//OPEN_ACL_UNSAFE:任何人都可以访问
//第四个参数: 创建持久节点
// 参数 1:要创建的节点的路径; 参数 2:节点数据 ; 参数 3:节点权限 ;参数 4:节点的类型
zooKeeper.create("/dhy","xpylikedhy".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// 获取子节点
@Test
public void getChildren() throws Exception {
//获取某个路径下的孩子,是否开启监听
//注册一次监听,只会生效一次
List<String> children = zooKeeper.getChildren("/",true);
for (String child : children) {
System.out.println(child);
}
// 延时阻塞
Thread.sleep(Long.MAX_VALUE);
}
// 判断 znode 是否存在
@Test
public void exist() throws Exception {
//是否存在/dhy节点,不进行监听
Stat stat = zooKeeper.exists("/dhy", false);
System.out.println(stat == null ? "not exist" : "exist");
}
C:\Windows\System32\drivers\etc
public class zkClient
{
//这里不能加空格
//必须在本机的host文件中做了域名和ip的映射
//否则这里不能使用zoo1代替19.168.112.11的实际ip
private static String connectString="192.168.112.128:2181";
private int sessionTimeout=5000;
private ZooKeeper zooKeeper;
@Before
public void init() throws Exception {
zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
//zookeeper注册一次,就监听一次
@Override
public void process(WatchedEvent watchedEvent) {
// 收到事件通知后的回调函数(用户的业务逻辑)
System.out.println(watchedEvent.getType() + "--"
+ watchedEvent.getPath());
// 再次启动监听
List<String> children = null;
try {
children = zooKeeper.getChildren("/",true);
for (String child : children) {
System.out.println(child);
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
@Test
public void create() throws InterruptedException, KeeperException {
//第三个参数是权限
//OPEN_ACL_UNSAFE:任何人都可以访问
//第四个参数: 创建持久节点
// 参数 1:要创建的节点的路径; 参数 2:节点数据 ; 参数 3:节点权限 ;参数 4:节点的类型
String nodeCreate = zooKeeper.create("/dhy", "xpylikedhy".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// 获取子节点
@Test
public void getChildren() throws Exception {
//获取某个路径下的孩子,是否开启监听
//注册一次监听,只会生效一次
List<String> children = zooKeeper.getChildren("/",true);
for (String child : children) {
System.out.println(child);
}
// 延时阻塞
Thread.sleep(Long.MAX_VALUE);
}
// 判断 znode 是否存在
@Test
public void exist() throws Exception {
//是否存在/dhy节点,不进行监听
Stat stat = zooKeeper.exists("/dhy", false);
System.out.println(stat == null ? "not exist" : "exist");
}
}
1、首先客户端向zookeeper注册中心已经在监听的服务器写数据
2、如果此时该客户端所连接的服务器不是leader,那么接收到数据的server就会将该请求转发给集群中的leader,而担当leader的服务器又会进一步将数据广播给集群中的其他follower,让所有的follower都将数据写入自己的服务器中
3、每台follower写成功后就会通知给leader
4、如果leader接收到了半数以上的通知时,就表示数据已经写成功了,这时leader就会通知给原本给他转发数据的那台服务器:数据已经写成功了
5、服务器又会将成功通知发送给客户端,整个过程也就结束了。
某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。
服务器去zk里面创建节点,并存储数据,数据记录自己的主机名,当前连接数等等信息
客户端获取节点,如果节点不存在,表名对应的服务器下线
create /servers "servers"
Created /servers
向zk中注册一台服务器,其实就是创建一个节点,然后传入主机名作为节点保存的数据,这个节点通常是临时带序号的
每启动一台服务器就传入对应的主机名称,向服务器中进行注册
public class DistributeServer {
private static String connectString =
"hadoop102:2181,hadoop103:2181,hadoop104:2181";
private static int sessionTimeout = 2000;
private ZooKeeper zk = null;
private String parentNode = "/servers";
// 创建zk连接
public void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new
Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
}
// 注册服务器
public void registServer(String hostname) throws Exception{
String create = zk.create(
parentNode + "/"+hostname,,
hostname.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
//创建临时有序号的节点
CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname +" is online "+ create);
}
// 业务功能
public void business(String hostname) throws Exception{
System.out.println(hostname + " is working ...");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
// 1 获取 zk 连接
DistributeServer server = new DistributeServer();
server.getConnect();
// 2 利用 zk 连接注册服务器信息
server.registServer(args[0]);
// 3 启动业务功能
server.business(args[0]);
}
}
public class DistributeClient {
private static String connectString =
"hadoop102:2181,hadoop103:2181,hadoop104:2181";
private static int sessionTimeout = 2000;
private ZooKeeper zk = null;
private String parentNode = "/servers";
// 创建到 zk 的客户端连接
public void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new
Watcher() {
@Override
public void process(WatchedEvent event) {
// 再次启动监听
try {
getServerList();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
// 获取服务器列表信息
public void getServerList() throws Exception {
// 1 获取服务器子节点信息,并且对父节点进行监听
//这里如果自己new一个watcher,那么就会走自己new的watcher,而不是getConnect()里面的watcher了
//如果是true,则表明使用getConnect()方法里面的watcher,这样可以实现持续监听
List<String> children = zk.getChildren(parentNode, true);
// 2 存储服务器信息列表
ArrayList<String> servers = new ArrayList<>();
// 3 遍历所有节点,获取节点中的主机名称信息
for (String child : children) {
//不进行监听
byte[] data = zk.getData(parentNode + "/" + child,
false, null);
servers.add(new String(data));
}
// 4 打印服务器列表信息
System.out.println(servers);
}
// 业务功能
public void business() throws Exception {
System.out.println("client is working ...");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
// 1 获取 zk 连接
DistributeClient client = new DistributeClient();
client.getConnect();
// 2 获取 servers 的子节点信息,从中获取服务器信息列表
client.getServerList();
// 3 业务进程启动
client.business();
}
}
(1)启动 DistributeClient 客户端
(2)在 hadoop102 上 zk 的客户端/servers 目录上创建临时带序号节点
[zk: localhost:2181(CONNECTED) 1] create -e -s /servers/hadoop102 "hadoop102"
[zk: localhost:2181(CONNECTED) 2] create -e -s /servers/hadoop103 "hadoop103"
(3)观察 Idea 控制台变化
[hadoop102, hadoop103]
(4)执行删除操作
[zk: localhost:2181(CONNECTED) 8] delete
/servers/hadoop1020000000000
(5)观察 Idea 控制台变化
[hadoop103]
(1)启动 DistributeClient 客户端(如果已经启动过,不需要重启)
(2)启动 DistributeServer 服务
①点击 Edit Configurations…
②在弹出的窗口中(Program arguments)输入想启动的主机,例如,hadoop102
③回到 DistributeServer 的 main 方 法 , 右 键 , 在 弹 出 的 窗 口 中 点 击 Run “DistributeServer.main()”
④观察 DistributeServer 控制台,提示 hadoop102 is working
⑤观察 DistributeClient 控制台,提示 hadoop102 已经上线
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/m0_53157173/article/details/120732024
内容来源于网络,如有侵权,请联系作者删除!