ZooKeeper

Official documentation

ZooKeeper is a distributed, open-source coordination service for distributed applications, built for high performance, high availability, and high reliability.

(figures: ZooKeeper architecture and the official throughput/reliability charts)

In the benchmark on the official site, when the leader is killed (failure events ③ and ⑤ in the reliability chart), ZooKeeper elects a new leader in under 200 ms. And as followers recover, ZooKeeper's throughput rises again as soon as it resumes processing requests.

Data Model

The namespace ZooKeeper provides is much like that of a standard file system: a name is a sequence of path elements separated by slashes (/), and every node in the ZooKeeper namespace is identified by its path.

(figure: ZooKeeper's hierarchical namespace)

Scenarios this data model fits:

1. Unified configuration management ← the 1 MB per-znode data limit
2. Group management ← the path structure
3. Unified naming ← sequential (-s) nodes
4. Synchronization ← ephemeral nodes
5. Distributed locks ← ephemeral nodes
A lock hangs off a parent node and uses -s sequential children, so one parent can queue multiple lock waiters; each waiter watches the ID node just ahead of it (see the sketch below).
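
A minimal sketch of the sequential, ephemeral children that the lock in item 5 relies on. The connect string and the /lock parent are hypothetical, and the sleep is a crude stand-in for proper connection handling; the full lock recipe appears in the Distributed Lock section below.

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class SequentialDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical connect string; assumes a persistent /lock parent already exists.
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 3000, event -> {});
        Thread.sleep(1000); // crude wait for the asynchronous connect (sketch only)
        // Each create under the same parent gets an increasing suffix,
        // e.g. /lock/seq-0000000000, /lock/seq-0000000001, ...
        for (int i = 0; i < 2; i++) {
            String path = zk.create("/lock/seq-", new byte[0],
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println("created " + path);
        }
        zk.close(); // the ephemeral children vanish with the session
    }
}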

Features

(figure: ZooKeeper's features)

Usage

# ZooKeeper requires a Java runtime
[root@localhost zookeeper]# wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz

[root@localhost zookeeper]# tar xf apache-zookeeper-3.5.8-bin.tar.gz

[root@localhost zookeeper]# cd apache-zookeeper-3.5.8-bin

[root@localhost zookeeper]# cp conf/zoo_sample.cfg conf/zoo.cfg

[root@localhost zookeeper]# vi conf/zoo.cfg
# The number of milliseconds of each tick
# heartbeat between the leader and the followers
tickTime=2000

# The number of ticks that the initial
# synchronization phase can take
# during initial sync, the leader tolerates a follower lagging up to 2000 ms * 10; beyond that the follower is considered faulty
initLimit=10

# The number of ticks that can pass between
# sending a request and getting an acknowledgement
# if a request to a follower gets no acknowledgement within 2000 ms * 5, the follower is considered faulty
syncLimit=5

# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
# data persistence directory; change it from the default to somewhere outside /tmp
dataDir=/local/zookeeper/data

# the port at which the clients will connect
# port clients use to connect to ZooKeeper
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients

# maximum number of client connections
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

#unlike Redis, whose sentinels discover the cluster automatically via pub/sub, a ZK ensemble must be planned out by hand
server.1=192.168.163.128:2888:3888
server.2=192.168.163.129:2888:3888
server.3=192.168.163.130:2888:3888
server.4=192.168.163.131:2888:3888

#server.X: once a quorum of 3 servers is up, the largest X among them becomes the leader
# enter the dataDir configured above
[root@localhost zookeeper]# cd data/
# write the myid file; the value 1 marks this host as server.1 (use 1-4 on the other machines accordingly)
[root@localhost data]# echo 1 > myid

[root@localhost data]# vi /etc/profile

···
export ZOOKEEPER_HOME=/home/local/zookeeper/apache-zookeeper-3.5.8-bin
export PATH=$ZOOKEEPER_HOME/bin:$PATH
···

#now the zk commands work from any directory
[root@localhost data]# source /etc/profile
[root@localhost zookeeper]# zkServer.sh help
ZooKeeper JMX enabled by default
Using config: /home/local/zookeeper/apache-zookeeper-3.5.8/bin/../conf/zoo.cfg
Usage: /home/local/zookeeper/apache-zookeeper-3.5.8/bin/zkServer.sh [--config <conf-dir>] {start|start-foreground|stop|restart|status|print-cmd}

#by default the server starts in the background; start it in the foreground here to watch the logs
[root@localhost zookeeper]# zkServer.sh start-foreground
#startup order: 128, 129, 130, 131
#this server (130) is the leader: three servers are already up, which satisfies the quorum
[root@localhost ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/local/zookeeper/apache-zookeeper-3.5.8/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader

Suppose 130 is now shut down deliberately. ZooKeeper first checks which follower holds the newest data; if several followers are equally up to date, the one with the largest server.X is elected.

#connect to ZK from any of the machines
[root@localhost ~]# zkCli.sh
[zk: localhost:2181(CONNECTED) 0] help
ZooKeeper -server host:port cmd args
addauth scheme auth
close
config [-c] [-w] [-s]
connect host:port
create [-s] [-e] [-c] [-t ttl] path [data] [acl]
delete [-v version] path
deleteall path
delquota [-n|-b] path
get [-s] [-w] path
getAcl [-s] path
history
listquota path
ls [-s] [-w] [-R] path
ls2 path [watch]
printwatches on|off
quit
reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*]
redo cmdno
removewatches path [-c|-d|-a] [-l]
rmr path
set [-s] [-v version] path data
setAcl [-s] [-v version] [-R] path acl
setquota -n|-b val path
stat [-w] path
sync path
Command not found: Command not found help
#list the root
[zk: localhost:2181(CONNECTED) 2] ls /
[zookeeper]

#create a node (persistent by default)
[zk: localhost:2181(CONNECTED) 3] create /ooxx ""
Created /ooxx

#list the root again
[zk: localhost:2181(CONNECTED) 4] ls /
[ooxx, zookeeper]

#create a child node
[zk: localhost:2181(CONNECTED) 5] create /ooxx/xxoo ""
Created /ooxx/xxoo

#list the node's children
[zk: localhost:2181(CONNECTED) 6] ls /ooxx
[xxoo]

#set the node's data
[zk: localhost:2181(CONNECTED) 7] set /ooxx "hello"

#get the node's data
[zk: localhost:2181(CONNECTED) 8] get /ooxx
hello
#inspect the node's metadata
[zk: localhost:2181(CONNECTED) 11] stat /ooxx
# a zxid is 64 bits: the high 32 bits are the leader epoch, the low 32 the transaction counter
# ZK executes requests in order; the leader keeps an auto-incrementing counter, here 00000002
# the leading 0x2 is the leader epoch
# the c in cZxid stands for create: the zxid of the transaction that created this node
cZxid = 0x200000002
# creation time
ctime = Fri Nov 27 22:19:42 CST 2020
# zxid of the last modification of this node
mZxid = 0x200000004
mtime = Fri Nov 27 22:20:53 CST 2020
# zxid of the last change to this node's children, i.e. the cZxid of /ooxx/xxoo
pZxid = 0x200000003
cversion = 1
dataVersion = 1
aclVersion = 0
#ephemeral owner: 0x0 means a persistent node; otherwise it is the owning sessionId
ephemeralOwner = 0x0
dataLength = 5
numChildren = 1

#delete the node and its children
[zk: localhost:2181(CONNECTED) 12] deleteall /ooxx
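
A minimal sketch of the epoch/counter split described in the comments above; the constant is the cZxid from the stat output.

public class ZxidDecode {
    public static void main(String[] args) {
        long cZxid = 0x200000002L;          // value taken from the stat output above
        long epoch = cZxid >>> 32;          // high 32 bits: leader epoch
        long counter = cZxid & 0xFFFFFFFFL; // low 32 bits: transaction counter
        System.out.printf("epoch=0x%x counter=0x%x%n", epoch, counter);
    }
}
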
#disconnect the client and reconnect; the log shows sessionid = 0x1000092017d0004
2020-11-27 22:37:43,097 [myid:localhost:2181] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1394] - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x1000092017d0004, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null

# create [-s] [-e] [-c] [-t ttl] path [data] [acl]
# create an ephemeral node
[zk: localhost:2181(CONNECTED) 0] create -e /xoxo "aaa"
Created /xoxo
#inspect the ephemeral node
[zk: localhost:2181(CONNECTED) 1] stat /xoxo
cZxid = 0x20000000d
ctime = Fri Nov 27 22:38:40 CST 2020
mZxid = 0x20000000d
mtime = Fri Nov 27 22:38:40 CST 2020
pZxid = 0x20000000d
cversion = 0
dataVersion = 0
aclVersion = 0
#the ephemeral node is owned by session 0x1000092017d0004
ephemeralOwner = 0x1000092017d0004
dataLength = 3
numChildren = 0
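
As an aside, the ephemeralOwner above is simply the owning client's session id, which a client can print with getSessionId(). A sketch, with a hypothetical connect string and a crude sleep in place of proper connection handling:

import org.apache.zookeeper.ZooKeeper;

public class SessionIdDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical connect string; any server in the ensemble works.
        ZooKeeper zk = new ZooKeeper("192.168.163.128:2181", 3000, event -> {});
        Thread.sleep(1000); // crude wait for the asynchronous connect (sketch only)
        // Prints the value that appears as ephemeralOwner on nodes this
        // session creates, e.g. 0x1000092017d0004 above.
        System.out.println("sessionId = 0x" + Long.toHexString(zk.getSessionId()));
        zk.close();
    }
}
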
#now connect to zk from another machine, getting sessionid = 0x400002966df0000
#listing the root still shows xoxo
[zk: localhost:2181(CONNECTED) 0] ls /
[ooxx, xoxo, zookeeper]

#inspect the ephemeral node
[zk: localhost:2181(CONNECTED) 1] stat /xoxo
cZxid = 0x20000000d
ctime = Fri Nov 27 22:38:40 CST 2020
mZxid = 0x20000000d
mtime = Fri Nov 27 22:38:40 CST 2020
pZxid = 0x20000000d
cversion = 0
dataVersion = 0
aclVersion = 0
#from another server the node still shows the same owning session
ephemeralOwner = 0x1000092017d0004
dataLength = 3
numChildren = 0
# now close the connection that owns sessionid = 0x1000092017d0004
# and list again from the sessionid = 0x400002966df0000 connection:
# /xoxo is gone; it disappeared along with its session
[zk: localhost:2181(CONNECTED) 1] ls /
[ooxx, zookeeper]
#creating a child under an ephemeral node fails: ephemerals cannot have children
[zk: localhost:2181(CONNECTED) 8] create /aaa/xxx
Ephemerals cannot have children: /aaa/xxx

ZooKeeper maintains a unified view. When a client connects and a session is created, adding that session to the unified view is itself a transaction, so the transaction ID increases by 1; when the session ends, removing it is another transaction, and the ID increases by 1 again.

#on the sessionid = 0x400002966df0000 connection
[zk: localhost:2181(CONNECTED) 4] create /xoxo ""
Created /xoxo
[zk: localhost:2181(CONNECTED) 5] stat /xoxo
#note the current transaction counter: 00000010
cZxid = 0x200000010
ctime = Fri Nov 27 22:59:37 CST 2020
mZxid = 0x200000010
mtime = Fri Nov 27 22:59:37 CST 2020
pZxid = 0x200000010
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 0
#now open one more client connection, which creates sessionid = 0x400002966df0001

#back on the sessionid = 0x400002966df0000 connection
[zk: localhost:2181(CONNECTED) 6] create /oxox "bbb"
Created /oxox
[zk: localhost:2181(CONNECTED) 7] stat /oxox
# the counter is now 00000012: the session created in between consumed 00000011
cZxid = 0x200000012
ctime = Fri Nov 27 23:04:14 CST 2020
mZxid = 0x200000012
mtime = Fri Nov 27 23:04:14 CST 2020
pZxid = 0x200000012
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 3
numChildren = 0

If the ZooKeeper server a client is connected to goes down, the client's connection fails over to another server. Does that create a brand-new session?

No. Once a session is created, every ZooKeeper server can see it in the unified view, so when one server dies, the client keeps its original session as long as it reconnects to another server within the session timeout. The timeout is negotiated when the session is created (30,000 ms in the zkCli log above; the Java client below requests 3,000 ms).

Communication

#check the socket connections on each of the four machines
[root@localhost ~]# netstat -natp | egrep '(2888|3888)'

(figure: the 2888/3888 socket connections among the four servers)

Principles

Paxos

Paxos is the classic majority-consensus protocol. ZooKeeper does not run Paxos itself; its own ZAB protocol, described next, plays that role.

ZAB

ZAB: ZooKeeper Atomic Broadcast, the atomic broadcast protocol ZooKeeper uses to order and replicate writes.

Creation

(figure: creation flow)

Election

(figure: a four-node cluster where node04 is the leader at Zxid=08 and node03 still holds Zxid=07)

Each node carries its own myid and Zxid.

Choosing the new leader:
1. the most up-to-date Zxid wins;
2. on a tie, the larger myid wins;
3. every former follower's zxid can be trusted, because its data was produced by majority-approved transactions.
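
A minimal sketch (not ZooKeeper's actual FastLeaderElection code) of the comparison rule just listed: newest zxid first, myid as the tie-breaker.

import java.util.Comparator;

class Vote {
    final long myid;
    final long zxid;

    Vote(long myid, long zxid) {
        this.myid = myid;
        this.zxid = zxid;
    }
}

class VoteComparator implements Comparator<Vote> {
    @Override
    public int compare(Vote a, Vote b) {
        int byZxid = Long.compare(a.zxid, b.zxid); // most up-to-date data wins
        return byZxid != 0 ? byZxid : Long.compare(a.myid, b.myid);
    }
}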

In the cluster in the figure above, node04 is the leader, chosen by a majority vote, and node03 has not yet synchronized the latest data (Zxid=07 instead of 08). Suppose the leader now dies and node03's heartbeat check is the first to notice, triggering an election.

Steps:
1. node03 nominates itself as leader and sends its own information to the other nodes.
2. The other nodes compare Zxids, find node03's older than their own, and reject it.
3. Each rejecting node then nominates itself and sends its information to the others.
4. On receiving a rejecting node's information, each node compares Zxids (and, on a tie, myids) and picks the best candidate.
5. Each node sends its vote to that candidate.
6. Each node also forwards the best candidate's information to the other nodes.

(figures: the rounds of vote exchange among node01, node02, and node03)

The final tallies:
at node01:
node02: +1 (node02 voted for itself) +1 (node01 voted for node02) +1 (node03 forwarded node02)

at node02:
node02: +1 (node02 voted for itself) +1 (node01 voted for node02) +1 (node03 voted for node02)

at node03:
node02: +1 (node02 voted for itself) +1 (node03 voted for node02) +1 (node01 forwarded node02)
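
Each tally gives node02 three votes; with four configured servers, three clears the majority threshold. A one-line check, as a sketch:

public class Quorum {
    // True when votes exceed half the ensemble: 3 > 4 / 2, so node02 wins.
    static boolean hasQuorum(int votes, int ensembleSize) {
        return votes > ensembleSize / 2;
    }

    public static void main(String[] args) {
        System.out.println(hasQuorum(3, 4)); // true
    }
}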

Watch

(figure: the watch mechanism)

Code

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.8</version>
</dependency>
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.CountDownLatch;

public class App {
    public static void main(String[] args) throws Exception {

        // ZK has sessions, not connection pools.
        // watch: observe + callback.
        // Watches are registered by read-type calls: getData, exists, ...
        // First kind: the watcher passed to new ZooKeeper() is session-level
        // and is unrelated to any path or node.
        final CountDownLatch cd = new CountDownLatch(1);
        final ZooKeeper zk = new ZooKeeper("192.168.150.11:2181,192.168.150.12:2181,192.168.150.13:2181,192.168.150.14:2181",
                3000, sessionWatcher(cd));

        // Block until the connection completes; the client is an asynchronous model.
        cd.await();
        // Print the current state: CONNECTING without the latch, CONNECTED with it.
        printState(zk);

        // Create a node.
        String pathName = zk.create("/ooxx", "olddata".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        // Register a watch while reading; it fires when the node is modified.
        final Stat stat = new Stat();
        byte[] node = zk.getData("/ooxx", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("getData watch: " + event.toString());
                try {
                    // true would re-register the default watch: the one passed to new ZooKeeper()
                    //zk.getData("/ooxx", true, stat);

                    // this re-registers the current watch with zk
                    zk.getData("/ooxx", this, stat);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, stat);

        System.out.println(new String(node));

        // Trigger the callback.
        Stat stat1 = zk.setData("/ooxx", "newdata".getBytes(), 0);
        // Without re-registration the watch would not fire again; since it was
        // re-registered above, this second write triggers it once more.
        Stat stat2 = zk.setData("/ooxx", "newdata01".getBytes(), stat1.getVersion());

        System.out.println("-------async start----------");
        // Fetch data asynchronously: reactive style.
        zk.getData("/ooxx", false, new AsyncCallback.DataCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                System.out.println("-------async call back----------");
                System.out.println(ctx.toString());
                System.out.println(new String(data));
            }
        }, "abc");
        System.out.println("-------async over----------");

        System.in.read();
    }

    public static void printState(ZooKeeper zk) {
        ZooKeeper.States state = zk.getState();
        switch (state) {
            case CONNECTING:
                System.out.println("ing......");
                break;
            case ASSOCIATING:
                break;
            case CONNECTED:
                System.out.println("ed........");
                break;
            case CONNECTEDREADONLY:
                break;
            case CLOSED:
                break;
            case AUTH_FAILED:
                break;
            case NOT_CONNECTED:
                break;
        }
    }

    public static Watcher sessionWatcher(CountDownLatch cd) {
        return new Watcher() {
            // The watch's callback method.
            @Override
            public void process(WatchedEvent event) {
                Event.KeeperState state = event.getState();
                Event.EventType type = event.getType();
                String path = event.getPath();
                System.out.println("new zk watch: " + event.toString());

                switch (state) {
                    case Unknown:
                        break;
                    case Disconnected:
                        break;
                    case NoSyncConnected:
                        break;
                    case SyncConnected:
                        System.out.println("connected");
                        cd.countDown();
                        break;
                    case AuthFailed:
                        break;
                    case ConnectedReadOnly:
                        break;
                    case SaslAuthenticated:
                        break;
                    case Expired:
                        break;
                }

                switch (type) {
                    case None:
                        break;
                    case NodeCreated:
                        break;
                    case NodeDeleted:
                        break;
                    case NodeDataChanged:
                        break;
                    case NodeChildrenChanged:
                        break;
                }
            }
        };
    }
}

Configuration Center

A small config-center demo: the client blocks until /AppConf exists, caches its value in MyConf, and tracks later updates and deletions through watches.

import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

public class ZKUtils {

    private static ZooKeeper zk;

    private static String address = "192.168.150.11:2181,192.168.150.12:2181,192.168.150.13:2181,192.168.150.14:2181/testLock";

    // A custom session-level watcher.
    private static DefaultWatch watch = new DefaultWatch();

    private static CountDownLatch init = new CountDownLatch(1);

    public static ZooKeeper getZK() {
        try {
            // Hand the latch to the watcher before connecting, so the
            // SyncConnected event cannot arrive while cc is still null.
            watch.setCc(init);
            zk = new ZooKeeper(address, 1000, watch);
            init.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return zk;
    }
}
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

import java.util.concurrent.CountDownLatch;

public class DefaultWatch implements Watcher {

    CountDownLatch cc;

    public void setCc(CountDownLatch cc) {
        this.cc = cc;
    }

    @Override
    public void process(WatchedEvent event) {

        System.out.println(event.toString());

        switch (event.getState()) {
            case Unknown:
                break;
            case Disconnected:
                break;
            case NoSyncConnected:
                break;
            case SyncConnected:
                // Connection established: release whoever is waiting in getZK().
                cc.countDown();
                break;
            case AuthFailed:
                break;
            case ConnectedReadOnly:
                break;
            case SaslAuthenticated:
                break;
            case Expired:
                break;
        }
    }
}
public class MyConf {

    private String conf;

    public String getConf() {
        return conf;
    }

    public void setConf(String conf) {
        this.conf = conf;
    }
}
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class TestConfig {

    ZooKeeper zk;

    @Before
    public void conn() {
        zk = ZKUtils.getZK();
    }

    @After
    public void close() {
        try {
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void getConf() {

        WatchCallBack watchCallBack = new WatchCallBack();
        watchCallBack.setZk(zk);
        MyConf myConf = new MyConf();
        watchCallBack.setConf(myConf);

        // If the data does not exist yet, block and wait for it.
        watchCallBack.aWait();
        // Two cases on return:
        // 1. the node did not exist and has just been created
        // 2. the node already existed

        while (true) {

            if (myConf.getConf().equals("")) {
                System.out.println("conf lost ......");
                watchCallBack.aWait();
            } else {
                System.out.println(myConf.getConf());
            }

            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.CountDownLatch;

public class WatchCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {

    ZooKeeper zk;
    MyConf conf;
    CountDownLatch cc = new CountDownLatch(1);

    public MyConf getConf() {
        return conf;
    }

    public void setConf(MyConf conf) {
        this.conf = conf;
    }

    public ZooKeeper getZk() {
        return zk;
    }

    public void setZk(ZooKeeper zk) {
        this.zk = zk;
    }

    // Block until the config data is available.
    public void aWait() {
        zk.exists("/AppConf", this, this, "ABC");
        try {
            cc.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // DataCallback: invoked by getData.
    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
        // If data exists, store it in the config object, then release the latch.
        if (data != null) {
            String s = new String(data);
            conf.setConf(s);
            cc.countDown();
        }
    }

    // StatCallback: invoked by exists.
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        // If the node exists, fetch its data, re-registering this watcher and callback.
        if (stat != null) {
            zk.getData("/AppConf", this, this, "sdfs");
        }
    }

    // Watch events.
    @Override
    public void process(WatchedEvent event) {

        switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                // Node created: fetch the data, which will release the latch.
                zk.getData("/AppConf", this, this, "sdfs");
                break;
            case NodeDeleted:
                // Tolerance for config loss:
                // clear the value and reset the latch so readers block again.
                conf.setConf("");
                cc = new CountDownLatch(1);
                break;
            case NodeDataChanged:
                // Data changed: fetch the new value.
                zk.getData("/AppConf", this, this, "sdfs");
                break;
            case NodeChildrenChanged:
                break;
        }
    }
}
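
To exercise the demo, something has to create and update /AppConf. A minimal driver sketch using the same client API; the connect string matches ZKUtils above, the payload strings are arbitrary, and the sleep is a crude stand-in for proper connection handling.

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ConfWriter {
    public static void main(String[] args) throws Exception {
        // Same chroot-style connect string as ZKUtils above.
        ZooKeeper zk = new ZooKeeper(
                "192.168.150.11:2181,192.168.150.12:2181,192.168.150.13:2181,192.168.150.14:2181/testLock",
                3000, event -> {});
        Thread.sleep(1000); // crude wait for the asynchronous connect (sketch only)
        if (zk.exists("/AppConf", false) == null) {
            // TestConfig's aWait() unblocks once this node appears.
            zk.create("/AppConf", "hello config".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            // NodeDataChanged fires in WatchCallBack and the value is re-read.
            zk.setData("/AppConf", "hello config v2".getBytes(), -1);
        }
        zk.close();
    }
}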

Distributed Lock

Each thread creates an ephemeral sequential node under the lock root; the holder of the smallest sequence number owns the lock, and every other node watches only the node immediately ahead of it.

import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class TestLock {

    ZooKeeper zk;
    ZKConf zkConf;
    DefaultWatch defaultWatch;

    // ZKConf and the setConf/setWatch/closeZK variants of ZKUtils are assumed
    // from the project; the ZKUtils shown earlier is a simpler version.
    @Before
    public void conn() {
        zkConf = new ZKConf();
        zkConf.setAddress("192.168.163.128:2181,192.168.163.129:2181,192.168.163.130:2181,192.168.163.131:2181/testLock");
        zkConf.setSessionTime(1000);
        defaultWatch = new DefaultWatch();
        ZKUtils.setConf(zkConf);
        ZKUtils.setWatch(defaultWatch);
        zk = ZKUtils.getZK();
    }

    @After
    public void close() {
        ZKUtils.closeZK();
    }

    @Test
    public void testlock() {
        for (int i = 0; i < 10; i++) {
            new Thread() {
                @Override
                public void run() {
                    WatchCallBack watchCallBack = new WatchCallBack();
                    watchCallBack.setZk(zk);
                    String name = Thread.currentThread().getName();
                    watchCallBack.setThreadName(name);

                    try {
                        // tryLock
                        watchCallBack.tryLock();
                        System.out.println(name + " at work");
                        watchCallBack.getRootData();
                        // Thread.sleep(1000);
                        // unLock
                        watchCallBack.unLock();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }.start();
        }
        // Keep the JUnit thread alive so the worker threads can run.
        while (true) {
        }
    }
}
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class WatchCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback, AsyncCallback.DataCallback {

    ZooKeeper zk;
    CountDownLatch cc = new CountDownLatch(1);
    String lockName;
    String threadName;

    public String getThreadName() {
        return threadName;
    }

    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    public ZooKeeper getZk() {
        return zk;
    }

    public void setZk(ZooKeeper zk) {
        this.zk = zk;
    }

    public void tryLock() {
        // Re-entrancy is not handled here.
        try {
            zk.create("/lock", threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, threadName);
            cc.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void getRootData() throws KeeperException, InterruptedException {
        byte[] data = zk.getData("/", false, new Stat());
        System.out.println(new String(data));
    }

    public void unLock() {
        try {
            zk.delete("/" + lockName, -1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    // Children2Callback: invoked by getChildren.
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        // Take every sequential node under the lock root, sort them,
        // then find our own index in the sorted list.
        if (children == null) {
            System.out.println(ctx.toString() + "list null");
        } else {
            try {
                Collections.sort(children);
                int i = children.indexOf(lockName);
                if (i < 1) {
                    // Smallest sequence number: we hold the lock.
                    System.out.println(threadName + " i am first...");
                    zk.setData("/", threadName.getBytes(), -1);
                    cc.countDown();
                } else {
                    // Otherwise watch only the node immediately ahead of us.
                    System.out.println(threadName + " watch " + children.get(i - 1));
                    zk.exists("/" + children.get(i - 1), this);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // StringCallback: invoked by create.
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        // Each thread creates its lock node, then lists the root's children
        // without registering a watch on the root itself.
        System.out.println(ctx.toString() + " create path: " + name);
        lockName = name.substring(1);
        zk.getChildren("/", false, this, ctx);
    }

    @Override
    public void process(WatchedEvent event) {

        Event.EventType type = event.getType();
        switch (type) {

            case NodeDeleted:
                // Our predecessor is gone; re-check where we stand.
                zk.getChildren("/", false, this, "");
                break;

            case NodeChildrenChanged:
                break;
        }
    }

    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
    }

    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        // What if registering the watch fails? Left unhandled here.
    }
}
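
A note on the design: because each waiter watches only its immediate predecessor rather than the lock root, releasing the lock wakes exactly one thread instead of stampeding every waiter (the herd effect).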
