This post demonstrates some common ZooKeeper usage scenarios.
Download apache-zookeeper-3.9.4-bin.tar.gz from https://zookeeper.apache.org/releases.html
Cluster Deployment
Rename zoo_sample.cfg to zoo.cfg; an example configuration is shown below.
Per the election protocol, the set of participating nodes must be known in advance, so zoo.cfg should list all nodes up front; adding nodes dynamically later is awkward (ZooKeeper 3.5+ does offer a dynamic reconfiguration feature, but a static server list remains the common setup).
zookeeper1/conf/zoo.cfg

```
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/Users/shanhuiming/zk/zookeeper1/data
clientPort=2181
server.101=127.0.0.1:2888:3888
server.102=127.0.0.1:2889:3889
server.103=127.0.0.1:2890:3890
```
Matching the dataDir setting, create the data directory and put a myid file in it containing the node id (e.g. 101 for server.101):
zookeeper1/data/myid
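For the three-node example above, the data directories and myid files can be created with a short script. This is just a sketch: the ids and relative paths follow the zoo.cfg example; adjust them to your own layout.

```shell
# Create data dirs and myid files for server.101/102/103
# (ids and paths follow the zoo.cfg example above).
for id in 101 102 103; do
  n=$((id - 100))
  mkdir -p "zookeeper${n}/data"
  echo "${id}" > "zookeeper${n}/data/myid"
done
```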
Server status can be checked with the four-letter commands:
```
echo srvr | nc localhost 2181
Zookeeper version: 3.9.4-7246445ec281f3dbf53dc54e970c914f39713903, built on 2025-08-19 19:53 UTC
Latency min/avg/max: 0/0.0/0
Received: 1
Sent: 0
Connections: 1
Outstanding: 0
Zxid: 0x1000001c7
Mode: follower
Node count: 10
```
Java Client
Docs: https://curator.apache.org/docs/about
pom.xml

```xml
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.9.4</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.9.0</version>
</dependency>
```
```java
@Bean
public CuratorFramework curatorFramework() {
    CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
            .sessionTimeoutMs(5000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();
    curatorFramework.start();
    return curatorFramework;
}
```
Scenario Examples
https://curator.apache.org/docs/recipes
Service Registration and Discovery
Cache event listening can be understood as an ongoing comparison between a local cached view and the remote ZooKeeper view; it can observe node creation, modification, and deletion. Curator keeps an in-memory snapshot of the ZooKeeper nodes; after a disconnect and reconnect, it realigns the local snapshot with the remote state and synthesizes compensating events for watches lost in between. This does not guarantee an exact replay of every event, but it mitigates the one-shot nature of native watches.
```java
public void node() throws Exception {
    // Ensure the registry root exists
    if (curatorFramework.checkExists().forPath("/services") == null) {
        curatorFramework.create().creatingParentsIfNeeded()
                .forPath("/services", "Service Registry".getBytes());
    }
    // Register this instance as an ephemeral sequential node
    String service = InetAddress.getLocalHost().getHostAddress() + ":" + port;
    curatorFramework.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
            .forPath("/services/instance-", service.getBytes());
    // Listen for instances coming online and going offline
    CuratorCacheListener listener = CuratorCacheListener.builder()
            .forAll((type, oldData, data) -> {
                switch (type) {
                    case NODE_CREATED -> System.out.println("Service online " + data.getPath());
                    case NODE_DELETED -> System.out.println("Service offline " + oldData.getPath());
                }
            }).build();
    CuratorCache cache = CuratorCache.build(curatorFramework, "/services");
    cache.listenable().addListener(listener);
    cache.start();
    // List the instances currently online
    if (curatorFramework.checkExists().forPath("/services") != null) {
        List<String> instances = curatorFramework.getChildren().forPath("/services");
        for (String instance : instances) {
            String instancePath = "/services/" + instance;
            byte[] data = curatorFramework.getData().forPath(instancePath);
            System.out.println("Online service " + new String(data));
        }
    }
}
```
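The realignment after a reconnect can be pictured as a plain diff between the local snapshot and the freshly fetched remote view. The sketch below is only a conceptual illustration of that compare step (the class and method names are made up for this post, not Curator's internals):

```java
import java.util.*;

class SnapshotDiff {
    enum Type { NODE_CREATED, NODE_CHANGED, NODE_DELETED }

    // Compare the local snapshot (path -> data) against the remote view and
    // synthesize the events that would have been missed while disconnected.
    static Map<String, Type> diff(Map<String, String> local, Map<String, String> remote) {
        Map<String, Type> events = new TreeMap<>();
        for (Map.Entry<String, String> e : remote.entrySet()) {
            if (!local.containsKey(e.getKey())) {
                events.put(e.getKey(), Type.NODE_CREATED);   // new on the server
            } else if (!local.get(e.getKey()).equals(e.getValue())) {
                events.put(e.getKey(), Type.NODE_CHANGED);   // data differs
            }
        }
        for (String path : local.keySet()) {
            if (!remote.containsKey(path)) {
                events.put(path, Type.NODE_DELETED);         // gone from the server
            }
        }
        return events;
    }
}
```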
Cluster Configuration Management

```java
public void config() throws Exception {
    // Watch the config subtree for create/update/delete events
    CuratorCacheListener listener = CuratorCacheListener.builder()
            .forAll((type, oldData, data) -> {
                switch (type) {
                    case NODE_CREATED -> System.out.println("Config created " + data.getPath()
                            + " " + new String(data.getData()));
                    case NODE_CHANGED -> System.out.println("Config changed " + data.getPath()
                            + " " + new String(oldData.getData()) + " -> " + new String(data.getData()));
                    case NODE_DELETED -> System.out.println("Config deleted " + oldData.getPath());
                }
            }).build();
    CuratorCache cache = CuratorCache.build(curatorFramework, "/config/app1/db");
    cache.listenable().addListener(listener);
    cache.start();
    // Exercise the listener: create, update, then delete a config node
    curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
            .forPath("/config/app1/db/user", "root".getBytes(StandardCharsets.UTF_8));
    curatorFramework.setData().forPath("/config/app1/db/user", "satncs".getBytes(StandardCharsets.UTF_8));
    curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath("/config/app1/db/user");
}
```
Leader Election
Rotating leader
LeaderSelector rotates leadership: leadership is held only while takeLeadership is executing, and with autoRequeue the client rejoins the election queue after each term.
```java
public void leaderSelector() {
    LeaderSelectorListener listener = new LeaderSelectorListenerAdapter() {
        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            // Leadership is held only while this method is running
            System.out.println(Thread.currentThread().getName() + " acquired leadership");
            try {
                Thread.sleep(5000);
            } finally {
                System.out.println(Thread.currentThread().getName() + " released leadership");
            }
        }
    };
    LeaderSelector selector = new LeaderSelector(curatorFramework, "/election", listener);
    selector.autoRequeue(); // rejoin the election after each leadership term
    selector.start();
}
```
Long-lived leader
LeaderLatch elects a leader when it starts and holds leadership until the latch is explicitly closed or the client disconnects. It suits scenarios that need a stable, long-lived leader, such as the Kafka Controller or a cluster coordination node.
```java
public void leaderLatch() throws Exception {
    LeaderLatchListener listener = new LeaderLatchListener() {
        @Override
        public void isLeader() {
            System.out.println("Acquired leadership");
        }

        @Override
        public void notLeader() {
            System.out.println("Lost leadership");
        }
    };
    LeaderLatch latch = new LeaderLatch(curatorFramework, "/controller");
    latch.addListener(listener);
    latch.start();
}
```
Distributed Lock
InterProcessMutex is Curator's distributed lock. The basic idea: each client creates an ephemeral sequential node; the client whose node has the lowest sequence number holds the lock, while every other client watches the node immediately before its own. Internally, InterProcessMutex adds retry and lock-contention handling rather than relying on a single watcher notification. Otherwise some scenarios would break, for example: A creates its ephemeral node first, B creates its node second, then A crashes, but the notification event to B is lost.
```java
public void lock() throws Exception {
    if (Objects.isNull(curatorFramework.checkExists().forPath("/lock"))) {
        curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/lock");
    }
    InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/lock");
    List<Integer> list = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
        list.add(i);
    }
    list.parallelStream().forEach(integer -> {
        try {
            lock.acquire();
            try {
                Integer order = orderService.createOrder();
                log.info("->" + order);
            } finally {
                lock.release(); // always release, even if createOrder throws
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
}
```
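The "watch the predecessor" rule described above can be shown in isolation. The helper below is only illustrative (the class and method names are invented for this post): given the children of the lock path and this client's own node, it returns either empty (lowest sequence, lock held) or the single node the client should watch.

```java
import java.util.*;

class LockWatchTarget {
    // Returns Optional.empty() if `self` holds the lock (lowest sequence),
    // otherwise the node immediately before it, which is the one to watch.
    static Optional<String> watchTarget(List<String> children, String self) {
        List<String> sorted = new ArrayList<>(children);
        Collections.sort(sorted); // zero-padded sequence suffixes sort correctly
        int idx = sorted.indexOf(self);
        if (idx <= 0) {
            return Optional.empty();
        }
        return Optional.of(sorted.get(idx - 1));
    }
}
```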