大纲
1.基于Curator进行基本的zk数据操作
2.基于Curator实现集群元数据管理
3.基于Curator实现HA主备自动切换
4.基于Curator实现Leader选举
5.基于Curator实现分布式Barrier
6.基于Curator实现分布式计数器
7.基于Curator实现zk的节点和子节点监听机制
8.基于Curator创建客户端实例的源码分析
9.Curator在启动时是如何跟zk建立连接的
10.基于Curator进行增删改查节点的源码分析
11.基于Curator的节点监听回调机制的实现源码
12.基于Curator的Leader选举机制的实现源码
1.基于Curator进行基本的zk数据操作
Guava is to Java what Curator is to ZooKeeper,引入依赖如下:
<dependencies><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>2.12.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>2.12.0</version></dependency>
</dependencies>
Curator实现对znode进行增删改查的示例如下,其中CuratorFramework代表一个客户端实例。注意:可以通过creatingParentsIfNeeded()方法进行指定节点的级联创建。
public class CrudDemo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);client.start();//启动客户端并建立连接System.out.println("已经启动Curator客户端");client.create().creatingParentsIfNeeded()//进行级联创建.withMode(CreateMode.PERSISTENT)//指定节点类型.forPath("/my/path", "10".getBytes());//增byte[] dataBytes = client.getData().forPath("/my/path");//查System.out.println(new String(dataBytes));client.setData().forPath("/my/path", "11".getBytes());//改dataBytes = client.getData().forPath("/my/path");System.out.println(new String(dataBytes));List<String> children = client.getChildren().forPath("/my");//查System.out.println(children);client.delete().forPath("/my/path");//删Thread.sleep(Integer.MAX_VALUE);}
}
2.基于Curator实现集群元数据管理
Curator可以操作zk。比如自研了一套分布式系统类似于Kafka、Canal,想把集群运行的核心元数据都放到zk里去。此时就可以通过Curator创建一些znode,往里面写入对应的值。
写入的值推荐用json格式,比如Kafka就是往zk写json格式数据。这样,其他客户端在需要的时候,就可以从里面读取出集群元数据了。
3.基于Curator实现HA主备自动切换
HDFS、Kafka、Canal都使用了zk进行Leader选举,所以可以基于Curator实现HA主备自动切换。
HDFS的NameNode是可以部署HA架构的,有主备两台机器。如果主机器宕机了,备用的机器可以感知到并选举为Leader,这样备用的机器就可以作为新的NameNode对外提供服务。
Kafka里的Controller负责管理整个集群的协作,Kafka中任何一个Broker都可以变成Controller,类似于Leader的角色。
Canal也会部署主备两台机器,主机器挂掉了,备用机器就可以跟上去。
4.基于Curator实现Leader选举
(1)Curator实现Leader选举的第一种方式之LeaderLatch
(2)Curator实现Leader选举的第二种方式之LeaderSelector
(1)Curator实现Leader选举的第一种方式之LeaderLatch
通过Curator的LeaderLatch来实现Leader选举:
public class LeaderLatchDemo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);client.start();//"/leader/latch"这其实是一个znode顺序节点LeaderLatch leaderLatch = new LeaderLatch(client, "/leader/latch");leaderLatch.start();leaderLatch.await();//直到等待他成为Leader再往后执行//类似于HDFS里,两台机器,其中一台成为了Leader就开始工作//另外一台机器可以通过await阻塞在这里,直到Leader挂了,自己就会成为Leader继续工作Boolean hasLeaderShip = leaderLatch.hasLeadership();//判断是否成为LeaderSystem.out.println("是否成为leader:" + hasLeaderShip);Thread.sleep(Integer.MAX_VALUE);}
}
(2)Curator实现Leader选举的第二种方式之LeaderSelector
通过Curator的LeaderSelector来实现Leader选举如下:其中,LeaderSelector有两个监听器,可以关注连接状态。
public class LeaderSelectorDemo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);client.start();LeaderSelector leaderSelector = new LeaderSelector(client,"/leader/election",new LeaderSelectorListener() {public void takeLeadership(CuratorFramework curatorFramework) throws Exception {System.out.println("你已经成为了Leader......");//在这里干Leader所有的事情,此时方法不能退出Thread.sleep(Integer.MAX_VALUE);}public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {System.out.println("连接状态的变化,已经不是Leader......");if (connectionState.equals(ConnectionState.LOST)) {throw new CancelLeadershipException();}}});leaderSelector.start();//尝试和其他节点在节点"/leader/election"上进行竞争成为LeaderThread.sleep(Integer.MAX_VALUE);}
}
5.基于Curator实现的分布式Barrier
(1)分布式Barrier
(2)分布式双重Barrier
(1)分布式Barrier
很多台机器都可以创建一个Barrier,此时它们都被阻塞了。除非满足一个条件(setBarrier()或removeBarrier()),才能不再阻塞它们。
//DistributedBarrier
public class DistributedBarrierDemo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);client.start();DistributedBarrier barrier = new DistributedBarrier(client, "/barrier");barrier.waitOnBarrier();}
}
(2)分布式双重Barrier
//DistributedDoubleBarrier
public class DistributedDoubleBarrierDemo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);client.start();DistributedDoubleBarrier doubleBarrier = new DistributedDoubleBarrier(client, "/barrier/double", 10);doubleBarrier.enter();//每台机器都会阻塞在enter这里//直到10台机器都调用了enter,就会从enter这里往下执行//此时可以做一些计算任务doubleBarrier.leave();//每台机器都会阻塞在leave这里,直到10台机器都调用了leave//此时就可以继续往下执行}
}
6.基于Curator实现分布式计数器
如果真的要实现分布式计数器,最好用Redis来实现。因为Redis的并发量更高,性能更好,功能更加的强大,而且还可以使用lua脚本嵌入进去实现复杂的业务逻辑。但是Redis天生的异步同步机制,存在机器宕机导致的数据不同步风险。然而zk在ZAB协议下的数据同步机制,则不会出现宕机导致数据不同步的问题。
//SharedCount:通过一个节点的值来实现
public class SharedCounterDemo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);client.start();SharedCount sharedCount = new SharedCount(client, "/shared/count", 0);sharedCount.start();sharedCount.addListener(new SharedCountListener() {public void countHasChanged(SharedCountReader sharedCountReader, int i) throws Exception {System.out.println("分布式计数器变化了......");}public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {System.out.println("连接状态变化了.....");}});Boolean result = sharedCount.trySetCount(1);System.out.println(sharedCount.getCount());}
}
7.基于Curator实现zk的节点和子节点监听机制
(1)基于Curator实现zk的子节点监听机制
(2)基于Curator实现zk的节点数据监听机制
我们使用zk主要用于:
一.对元数据进行增删改查、监听元数据的变化
二.进行Leader选举
有三种类型的节点可以监听:
一.子节点监听PathCache
二.节点监听NodeCache
三.整个节点以下的树监听TreeCache
(1)基于Curator实现zk的子节点监听机制
下面是PathCache实现的子节点监听示例:
public class PathCacheDemo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);client.start();PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/cluster", true);//cache就是把zk里的数据缓存到客户端里来//可以针对这个缓存的数据加监听器,去观察zk里的数据的变化pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {}});pathChildrenCache.start();}
}
(2)基于Curator实现zk的节点数据监听机制
下面是NodeCache实现的节点监听示例:
public class NodeCacheDemo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);final CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);client.start();final NodeCache nodeCache = new NodeCache(client, "/cluster");nodeCache.getListenable().addListener(new NodeCacheListener() {public void nodeChanged() throws Exception {Stat stat = client.checkExists().forPath("/cluster");if (stat == null) {} else {nodeCache.getCurrentData();}}});nodeCache.start();}
}
8.基于Curator创建客户端实例的源码分析
(1)创建CuratorFramework实例使用了构造器模式
(2)创建CuratorFramework实例会初始化CuratorZooKeeperClient实例
(1)创建CuratorFramework实例使用了构造器模式
CuratorFrameworkFactory.newClient()方法使用了构造器模式。首先通过builder()方法创建出Builder实例对象,然后把参数都设置成Builder实例对象的属性,最后通过build()方法把Builder实例对象传入目标类的构造方法中。
public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println("已经启动Curator客户端");}
}public class CuratorFrameworkFactory {//创建CuratorFramework实例使用了构造器模式public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) {return builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();}...public static Builder builder() {return new Builder();}public static class Builder {...private EnsembleProvider ensembleProvider;private int sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT_MS;private int connectionTimeoutMs = DEFAULT_CONNECTION_TIMEOUT_MS;private RetryPolicy retryPolicy;...public Builder connectString(String connectString) {ensembleProvider = new FixedEnsembleProvider(connectString);return this;}public Builder sessionTimeoutMs(int sessionTimeoutMs) {this.sessionTimeoutMs = sessionTimeoutMs;return this;}public Builder connectionTimeoutMs(int connectionTimeoutMs) {this.connectionTimeoutMs = connectionTimeoutMs;return this;}public Builder retryPolicy(RetryPolicy retryPolicy) {this.retryPolicy = retryPolicy;return this;}...public CuratorFramework build() {return new CuratorFrameworkImpl(this);}}...
}public class CuratorFrameworkImpl implements CuratorFramework {...public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder) {ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());this.client = new CuratorZookeeperClient(localZookeeperFactory,builder.getEnsembleProvider(),builder.getSessionTimeoutMs(),builder.getConnectionTimeoutMs(),builder.getWaitForShutdownTimeoutMs(),new Watcher() {//这里注册了一个zk的watcher@Overridepublic void process(WatchedEvent watchedEvent) {CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);processEvent(event);}},builder.getRetryPolicy(),builder.canBeReadOnly(),builder.getConnectionHandlingPolicy());...}...
}
(2)创建CuratorFramework实例会初始化CuratorZooKeeperClient实例
CuratorFramework实例代表了一个zk客户端,CuratorFramework初始化时会初始化一个CuratorZooKeeperClient实例。
CuratorZooKeeperClient是Curator封装ZooKeeper的客户端。
初始化CuratorZooKeeperClient时会传入一个Watcher监听器。
所以CuratorFrameworkFactory的newClient()方法的主要工作是:初始化CuratorFramework -> 初始化CuratorZooKeeperClient -> 初始化ZookeeperFactory + 注册一个Watcher。
客户端发起与zk的连接,以及注册Watcher监听器,则是由CuratorFramework的start()方法触发的。
9.Curator启动时是如何跟zk建立连接的
ConnectionStateManager的start()方法会启动一个线程处理eventQueue。eventQueue里存放了与zk的网络连接变化事件,eventQueue收到这种事件便会通知ConnectionStateListener。
CuratorZookeeperClient的start()方法会初始化好原生zk客户端,和zk服务器建立一个TCP长连接,而且还会注册一个ConnectionState类型的Watcher监听器,以便能收到zk服务端发送的通知事件。
public class CuratorFrameworkImpl implements CuratorFramework {private final CuratorZookeeperClient client;private final ConnectionStateManager connectionStateManager;private volatile ExecutorService executorService;...public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder) {...this.client = new CuratorZookeeperClient(...);connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerDecorator());...}...@Overridepublic void start() {log.info("Starting");if (!state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED)) {throw new IllegalStateException("Cannot be started more than once");}...//1.启动一个线程监听和zk网络连接的变化事件connectionStateManager.start();//2.添加一个监听器监听和zk网络连接的变化final ConnectionStateListener listener = new ConnectionStateListener() {@Overridepublic void stateChanged(CuratorFramework client, ConnectionState newState) {if (ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState) {logAsErrorConnectionErrors.set(true);}}@Overridepublic boolean doNotDecorate() {return true;}};this.getConnectionStateListenable().addListener(listener);//3.创建原生zk客户端client.start();//4.创建一个线程池,执行后台的操作executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);executorService.submit(new Callable<Object>() {@Overridepublic Object call() throws Exception {backgroundOperationsLoop();return null;}});if (ensembleTracker != null) {ensembleTracker.start();}log.info(schemaSet.toDocumentation());}...
}public class ConnectionStateManager implements Closeable {private final ExecutorService service;private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);...public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, int sessionExpirationPercent, ConnectionStateListenerDecorator connectionStateListenerDecorator) {...service = Executors.newSingleThreadExecutor(threadFactory);...}...public void start() {Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");//启动一个线程service.submit(new Callable<Object>() {@Overridepublic Object call() throws Exception {processEvents();return null;}});}private void processEvents() {while (state.get() == State.STARTED) {int useSessionTimeoutMs = getUseSessionTimeoutMs();long elapsedMs = startOfSuspendedEpoch == 0 ? useSessionTimeoutMs / 2 : System.currentTimeMillis() - startOfSuspendedEpoch;long pollMaxMs = useSessionTimeoutMs - elapsedMs;final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);if (newState != null) {if (listeners.size() == 0) {log.warn("There are no ConnectionStateListeners registered.");}listeners.forEach(listener -> listener.stateChanged(client, newState));} else if (sessionExpirationPercent > 0) {synchronized(this) {checkSessionExpiration();}}}}...
}public class CuratorZookeeperClient implements Closeable {private final ConnectionState state;...public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,int sessionTimeoutMs, int connectionTimeoutMs, int waitForShutdownTimeoutMs, Watcher watcher,RetryPolicy retryPolicy, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy) {...state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly, connectionHandlingPolicy);...}...public void start() throws Exception {log.debug("Starting");if (!started.compareAndSet(false, true)) {throw new IllegalStateException("Already started");}state.start();}...
}class ConnectionState implements Watcher, Closeable {private final HandleHolder zooKeeper;ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy) {this.ensembleProvider = ensembleProvider;this.sessionTimeoutMs = sessionTimeoutMs;this.connectionTimeoutMs = connectionTimeoutMs;this.tracer = tracer;this.connectionHandlingPolicy = connectionHandlingPolicy;if (parentWatcher != null) {parentWatchers.offer(parentWatcher);}//把自己作为Watcher注册给HandleHolderzooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);}...void start() throws Exception {log.debug("Starting");ensembleProvider.start();reset();}synchronized void reset() throws Exception {log.debug("reset");instanceIndex.incrementAndGet();isConnected.set(false);connectionStartMs = System.currentTimeMillis();//创建客户端与zk的连接zooKeeper.closeAndReset();zooKeeper.getZooKeeper();//initiate connection}...
}class HandleHolder {private final ZookeeperFactory zookeeperFactory;private final Watcher watcher;private final EnsembleProvider ensembleProvider;private final int sessionTimeout;private final boolean canBeReadOnly;private volatile Helper helper;...HandleHolder(ZookeeperFactory zookeeperFactory, Watcher watcher, EnsembleProvider ensembleProvider, int sessionTimeout, boolean canBeReadOnly) {this.zookeeperFactory = zookeeperFactory;this.watcher = watcher;this.ensembleProvider = ensembleProvider;this.sessionTimeout = sessionTimeout;this.canBeReadOnly = canBeReadOnly;}private interface Helper {ZooKeeper getZooKeeper() throws Exception;String getConnectionString();int getNegotiatedSessionTimeoutMs();}ZooKeeper getZooKeeper() throws Exception {return (helper != null) ? helper.getZooKeeper() : null;}void closeAndReset() throws Exception {internalClose(0);helper = new Helper() {private volatile ZooKeeper zooKeeperHandle = null;private volatile String connectionString = null;@Overridepublic ZooKeeper getZooKeeper() throws Exception {synchronized(this) {if (zooKeeperHandle == null) {connectionString = ensembleProvider.getConnectionString();//创建和zk的连接,初始化变量zooKeeperHandlezooKeeperHandle = zookeeperFactory.newZooKeeper(connectionString, sessionTimeout, watcher, canBeReadOnly);}...return zooKeeperHandle;}}@Overridepublic String getConnectionString() {return connectionString;}@Overridepublic int getNegotiatedSessionTimeoutMs() {return (zooKeeperHandle != null) ? zooKeeperHandle.getSessionTimeout() : 0;}};}...
}//创建客户端与zk的连接
public class DefaultZookeeperFactory implements ZookeeperFactory {@Overridepublic ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception {return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);}
}
10.基于Curator进行增删改查节点的源码分析
(1)基于Curator创建znode节点
(2)基于Curator查询znode节点
(3)基于Curator修改znode节点
(4)基于Curator删除znode节点
Curator的CURD操作,底层都是通过调用zk原生的API来完成的。
(1)基于Curator创建znode节点
创建节点也使用了构造器模式:首先通过CuratorFramework的create()方法创建一个CreateBuilder实例,然后通过CreateBuilder的withMode()等方法设置CreateBuilder的变量,最后通过CreateBuilder的forPath()方法 + 重试调用来创建znode节点。
创建节点时会调用CuratorFramework的getZooKeeper()方法获取zk客户端实例,之后就是通过原生zk客户端的API去创建节点了。
public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println("已经启动Curator客户端");//创建节点client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/my/path", "100".getBytes());}
}public class CuratorFrameworkImpl implements CuratorFramework {...@Overridepublic CreateBuilder create() {checkState();return new CreateBuilderImpl(this);}...
}public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, BackgroundOperation<PathAndBytes>, ErrorListenerPathAndBytesable<String> { private final CuratorFrameworkImpl client;private CreateMode createMode;private Backgrounding backgrounding;private boolean createParentsIfNeeded;...CreateBuilderImpl(CuratorFrameworkImpl client) {this.client = client;createMode = CreateMode.PERSISTENT;backgrounding = new Backgrounding();acling = new ACLing(client.getAclProvider());createParentsIfNeeded = false;createParentsAsContainers = false;compress = false;setDataIfExists = false;storingStat = null;ttl = -1;}@Overridepublic String forPath(final String givenPath, byte[] data) throws Exception {if (compress) {data = client.getCompressionProvider().compress(givenPath, data);}final String adjustedPath = adjustPath(client.fixForNamespace(givenPath, createMode.isSequential()));List<ACL> aclList = acling.getAclList(adjustedPath);client.getSchemaSet().getSchema(givenPath).validateCreate(createMode, givenPath, data, aclList);String returnPath = null;if (backgrounding.inBackground()) {pathInBackground(adjustedPath, data, givenPath);} else {//创建节点String path = protectedPathInForeground(adjustedPath, data, aclList);returnPath = client.unfixForNamespace(path);}return returnPath;}private String protectedPathInForeground(String adjustedPath, byte[] data, List<ACL> aclList) throws Exception {return pathInForeground(adjustedPath, data, aclList);}private String pathInForeground(final String path, final byte[] data, final List<ACL> aclList) throws Exception {OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("CreateBuilderImpl-Foreground");final AtomicBoolean firstTime = new AtomicBoolean(true);//重试调用String returnPath = RetryLoop.callWithRetry(client.getZookeeperClient(),new Callable<String>() {@Overridepublic String call() throws Exception {boolean localFirstTime = firstTime.getAndSet(false) && !debugForceFindProtectedNode;protectedMode.checkSetSessionId(client, createMode);String createdPath = null;if (!localFirstTime && protectedMode.doProtected()) {debugForceFindProtectedNode = false;createdPath = findProtectedNodeInForeground(path);}if (createdPath == null) {//在创建znode节点的时候,首先会调用CuratorFramework.getZooKeeper()获取zk客户端实例//之后就是通过原生zk客户端的API去创建节点了try {if (client.isZk34CompatibilityMode()) {createdPath = client.getZooKeeper().create(path, data, aclList, createMode);} else {createdPath = client.getZooKeeper().create(path, data, aclList, createMode, storingStat, ttl);}} catch (KeeperException.NoNodeException e) {if (createParentsIfNeeded) {//这就是级联创建节点的实现ZKPaths.mkdirs(client.getZooKeeper(), path, false, acling.getACLProviderForParents(), createParentsAsContainers);if (client.isZk34CompatibilityMode()) {createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);} else {createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode, storingStat, ttl);}} else {throw e;}} catch (KeeperException.NodeExistsException e) {if (setDataIfExists) {Stat setStat = client.getZooKeeper().setData(path, data, setDataIfExistsVersion);if (storingStat != null) {DataTree.copyStat(setStat, storingStat);}createdPath = path;} else {throw e;}}}if (failNextCreateForTesting) {failNextCreateForTesting = false;throw new KeeperException.ConnectionLossException();}return createdPath;}});trace.setRequestBytesLength(data).setPath(path).commit();return returnPath;}...
}public class CuratorFrameworkImpl implements CuratorFramework {private final CuratorZookeeperClient client;public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder) {ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());this.client = new CuratorZookeeperClient(localZookeeperFactory,builder.getEnsembleProvider(),builder.getSessionTimeoutMs(),builder.getConnectionTimeoutMs(),builder.getWaitForShutdownTimeoutMs(),new Watcher() {...},builder.getRetryPolicy(),builder.canBeReadOnly(),builder.getConnectionHandlingPolicy());...}...ZooKeeper getZooKeeper() throws Exception {return client.getZooKeeper();}...
}public class CuratorZookeeperClient implements Closeable {private final ConnectionState state;...public ZooKeeper getZooKeeper() throws Exception {Preconditions.checkState(started.get(), "Client is not started");return state.getZooKeeper();}...
}class ConnectionState implements Watcher, Closeable {private final HandleHolder zooKeeper;...ZooKeeper getZooKeeper() throws Exception {if (SessionFailRetryLoop.sessionForThreadHasFailed()) {throw new SessionFailRetryLoop.SessionFailedException();}Exception exception = backgroundExceptions.poll();if (exception != null) {new EventTrace("background-exceptions", tracer.get()).commit();throw exception;}boolean localIsConnected = isConnected.get();if (!localIsConnected) {checkTimeouts();}//通过HandleHolder获取ZooKeeper实例return zooKeeper.getZooKeeper();}...
}
(2)基于Curator查询znode节点
查询节点也使用了构造器模式:首先通过CuratorFramework的getData()方法创建一个GetDataBuilder实例,然后通过GetDataBuilder的forPath()方法 + 重试调用来查询znode节点。
public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println("已经启动Curator客户端");//查询节点byte[] dataBytes = client.getData().forPath("/my/path");System.out.println(new String(dataBytes));//查询子节点List<String> children = client.getChildren().forPath("/my");System.out.println(children);}
}public class CuratorFrameworkImpl implements CuratorFramework {...@Overridepublic GetDataBuilder getData() {checkState();return new GetDataBuilderImpl(this);}@Overridepublic GetChildrenBuilder getChildren() {checkState();return new GetChildrenBuilderImpl(this);}...
}public class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String>, ErrorListenerPathable<byte[]> {private final CuratorFrameworkImpl client;...@Overridepublic byte[] forPath(String path) throws Exception {client.getSchemaSet().getSchema(path).validateWatch(path, watching.isWatched() || watching.hasWatcher());path = client.fixForNamespace(path);byte[] responseData = null;if (backgrounding.inBackground()) {client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null);} else {//查询节点responseData = pathInForeground(path);}return responseData;}private byte[] pathInForeground(final String path) throws Exception {OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("GetDataBuilderImpl-Foreground");//重试调用byte[] responseData = RetryLoop.callWithRetry(client.getZookeeperClient(),new Callable<byte[]>() {@Overridepublic byte[] call() throws Exception {byte[] responseData;//通过CuratorFramework获取原生zk客户端实例,然后调用其getData()获取节点if (watching.isWatched()) {responseData = client.getZooKeeper().getData(path, true, responseStat);} else {responseData = client.getZooKeeper().getData(path, watching.getWatcher(path), responseStat);watching.commitWatcher(KeeperException.NoNodeException.Code.OK.intValue(), false);}return responseData;}});trace.setResponseBytesLength(responseData).setPath(path).setWithWatcher(watching.hasWatcher()).setStat(responseStat).commit();return decompress ? client.getCompressionProvider().decompress(path, responseData) : responseData;}...
}
(3)基于Curator修改znode节点
修改节点也使用了构造器模式:首先通过CuratorFramework的setData()方法创建一个SetDataBuilder实例,然后通过SetDataBuilder的forPath()方法 + 重试调用来修改znode节点。
public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println("已经启动Curator客户端");//修改节点client.setData().forPath("/my/path", "110".getBytes());byte[] dataBytes = client.getData().forPath("/my/path");System.out.println(new String(dataBytes));}
}public class CuratorFrameworkImpl implements CuratorFramework {...@Overridepublic SetDataBuilder setData() {checkState();return new SetDataBuilderImpl(this);}...
}public class SetDataBuilderImpl implements SetDataBuilder, BackgroundOperation<PathAndBytes>, ErrorListenerPathAndBytesable<Stat> {private final CuratorFrameworkImpl client;...@Overridepublic Stat forPath(String path, byte[] data) throws Exception {client.getSchemaSet().getSchema(path).validateGeneral(path, data, null);if (compress) {data = client.getCompressionProvider().compress(path, data);}path = client.fixForNamespace(path);Stat resultStat = null;if (backgrounding.inBackground()) {client.processBackgroundOperation(new OperationAndData<>(this, new PathAndBytes(path, data), backgrounding.getCallback(), null, backgrounding.getContext(), null), null);} else {//修改节点resultStat = pathInForeground(path, data);}return resultStat;}private Stat pathInForeground(final String path, final byte[] data) throws Exception {OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("SetDataBuilderImpl-Foreground");//重试调用Stat resultStat = RetryLoop.callWithRetry(client.getZookeeperClient(),new Callable<Stat>() {@Overridepublic Stat call() throws Exception {//通过CuratorFramework获取原生zk客户端实例,然后调用其setData()修改节点return client.getZooKeeper().setData(path, data, version);}});trace.setRequestBytesLength(data).setPath(path).setStat(resultStat).commit();return resultStat;}...
}
(4)基于Curator删除znode节点
删除节点也使用了构造器模式:首先通过CuratorFramework的delete()方法创建一个DeleteBuilder实例,然后通过DeleteBuilder的forPath()方法 + 重试调用来删除znode节点。
public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println("已经启动Curator客户端");//删除节点client.delete().forPath("/my/path");}
}public class CuratorFrameworkImpl implements CuratorFramework {...@Overridepublic DeleteBuilder delete() {checkState();return new DeleteBuilderImpl(this);}...
}public class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String>, ErrorListenerPathable<Void> {private final CuratorFrameworkImpl client;...@Overridepublic Void forPath(String path) throws Exception {client.getSchemaSet().getSchema(path).validateDelete(path);final String unfixedPath = path;path = client.fixForNamespace(path);if (backgrounding.inBackground()) {OperationAndData.ErrorCallback<String> errorCallback = null;if (guaranteed) {errorCallback = new OperationAndData.ErrorCallback<String>() {@Overridepublic void retriesExhausted(OperationAndData<String> operationAndData) {client.getFailedDeleteManager().addFailedOperation(unfixedPath);}};}client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), errorCallback, backgrounding.getContext(), null), null);} else {//删除节点pathInForeground(path, unfixedPath);}return null;}private void pathInForeground(final String path, String unfixedPath) throws Exception {OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("DeleteBuilderImpl-Foreground");//重试调用RetryLoop.callWithRetry(client.getZookeeperClient(),new Callable<Void>() {@Overridepublic Void call() throws Exception {try {//通过CuratorFramework获取原生zk客户端实例,然后调用其delete()删除节点client.getZooKeeper().delete(path, version);} catch (KeeperException.NoNodeException e) {if (!quietly) {throw e;}} catch (KeeperException.NotEmptyException e) {if (deletingChildrenIfNeeded) {ZKPaths.deleteChildren(client.getZooKeeper(), path, true);} else {throw e;}}return null;}});trace.setPath(path).commit();}
}