Zookeeper 3.9.4部署

words: 1.3k    views:    time: 6min

演示一些常用的场景

下载apache-zookeeper-3.9.4-bin.tar.gz,https://zookeeper.apache.org/releases.html

集群部署

  • 分别配置zookeeper

重命名zoo_sample.cfg为zoo.cfg,具体配置内容如下示例:

根据协议内容,选举需要提前确定好有哪些参与节点,所以zoo.cfg要提前列好所有节点,想动态添加节点的话比较麻烦

zookeeper1/conf/zoo.cfg
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# The number of milliseconds of each tick                                                                                             
tickTime=2000
# The number of ticks that the initial synchronization phase can take
initLimit=10
# The number of ticks that can pass between sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/Users/shanhuiming/zk/zookeeper1/data
# the port at which the clients will connect
clientPort=2181

server.101=127.0.0.1:2888:3888
server.102=127.0.0.1:2889:3889
server.103=127.0.0.1:2890:3890

对应dataDir配置,创建data目录并在目录中创建myid文件设置节点id

zookeeper1/data/myid
1
101
  • 分别启动zookeeper
1
bin/zkServer.sh start

可以通过四字命令查看状态

1
2
3
4
5
6
7
8
9
10
11
echo srvr | nc localhost 2181

Zookeeper version: 3.9.4-7246445ec281f3dbf53dc54e970c914f39713903, built on 2025-08-19 19:53 UTC
Latency min/avg/max: 0/0.0/0
Received: 1
Sent: 0
Connections: 1
Outstanding: 0
Zxid: 0x1000001c7
Mode: follower
Node count: 10

Java客户端

文档:https://curator.apache.org/docs/about

pom.xml
1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.9.4</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.9.0</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
@Bean
public CuratorFramework curatorFramework() {
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
.sessionTimeoutMs(5000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
curatorFramework.start();
return curatorFramework;
}

场景示例

https://curator.apache.org/docs/recipes

服务注册监听

Cache事件监听可以理解为一个本地缓存视图与远程Zookeeper视图的对比过程,可以监听节点的创建,修改和删除。Curator会在本地内存维护一份ZooKeeper的节点快照,如果断开重连后,会根据本地快照和远程状态重新对齐,对期间丢失的watch事件进行补偿,虽然不能保证“重放”事件,但是也弥补了watch一次性触发的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public void node() throws Exception {
// 创建根节点(持久节点)
if (curatorFramework.checkExists().forPath("/services") == null) {
curatorFramework.create().creatingParentsIfNeeded()
.forPath("/services", "Service Registry".getBytes());
}

// 注册服务,创建临时节点
String service = InetAddress.getLocalHost().getHostAddress() + ":" + port;
curatorFramework.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath("/services/instance-", service.getBytes());

// 监听节点事件
CuratorCacheListener listener = CuratorCacheListener.builder()
.forAll((type, oldData, data) -> {
switch (type) {
case NODE_CREATED -> {
System.out.println("服务上线 " + data.getPath());
}
case NODE_DELETED -> {
System.out.println("服务下线 " + oldData.getPath());
}
}
}).build();
CuratorCache cache = CuratorCache.build(curatorFramework, "/services");
cache.listenable().addListener(listener);
cache.start();

// 获取服务
if (curatorFramework.checkExists().forPath("/services") != null) {
List<String> instances = curatorFramework.getChildren().forPath("/services");
for (String instance : instances) {
String instancePath = "/services/" + instance;
byte[] data = curatorFramework.getData().forPath(instancePath);
System.out.println("在线服务 " + new String(data));
}
}
}
集群配置管理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void config() throws Exception {
// 创建根节点(持久节点)
CuratorCacheListener listener = CuratorCacheListener.builder()
.forAll((type, oldData, data) -> {
switch (type) {
case NODE_CREATED -> System.out.println("创建配置 " + data.getPath()
+ " " + new String(data.getData()));
case NODE_CHANGED -> System.out.println("修改配置 " + data.getPath()
+ " " + new String(data.getData()) + " -> " +new String(data.getData()));
case NODE_DELETED -> System.out.println("删除配置 " + oldData.getPath());
}
}).build();
CuratorCache cache = CuratorCache.build(curatorFramework, "/config/app1/db");
cache.listenable().addListener(listener);
cache.start();

// 新增
curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
.forPath("/config/app1/db/user", "root".getBytes(StandardCharsets.UTF_8));
// 修改
curatorFramework.setData().forPath("/config/app1/db/user", "satncs".getBytes(StandardCharsets.UTF_8));
// 删除
curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath("/config/app1/db/user");
}
Leader选举

轮换Leader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void leaderSelector() {
LeaderSelectorListener listener = new LeaderSelectorListenerAdapter() {
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
System.out.println(Thread.currentThread().getName() + " 获取Leader");
try {
// 模拟执行业务逻辑
Thread.sleep(5000);
} finally {
System.out.println(Thread.currentThread().getName() + " 释放Leader");
}
}
};
LeaderSelector selector = new LeaderSelector(curatorFramework, "/election", listener);
selector.autoRequeue(); // 自动再次参与选举
selector.start();
}

长期Leader

LeaderLatch在启动期间选择一个leader并保持leader状态,直到显式地关闭leader或发生客户端断开连接事件。适用于需要稳定、长期存在的Leader场景,例如Kafka Controller、集群协调节点等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void leaderLatch() throws Exception {
LeaderLatchListener listener = new LeaderLatchListener() {
@Override
public void isLeader() {
System.out.println("获取Leader");
}

@Override
public void notLeader() {
System.out.println("失去Leader");
}
};
LeaderLatch latch = new LeaderLatch(curatorFramework, "/controller");
latch.addListener(listener);
latch.start();
}
分布式锁

InterProcessMutex是Curator实现的分布式锁,基本思路是按顺序创建临时节点,如果当前节点序号最小,则获得锁,否则watch排在自己前面的节点。InterProcessMutex内部做了重试和锁竞争机制,它不会依赖单次Watcher通知。不然有些场景下会有问题,比如A先创建了临时借点,B后创建了临时借点,然后A宕机了,但是通知B的事件丢失了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void lock() throws Exception {
if (Objects.isNull(curatorFramework.checkExists().forPath("/lock"))) {
curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/lock");
}
InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/lock");

// 100个并行处理
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 100; i++) {
list.add(i);
}
list.parallelStream().forEach(integer -> {
try {
lock.acquire();
Integer order = orderService.createOrder();
log.info("->" + order);
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
});
}


参考: